In [1]:
spark

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
5,application_1653022088945_0007,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7f5204e6cc50>

# Define schema

In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
schema = StructType([
                        StructField(name='year', dataType=IntegerType(), nullable=True),
                        StructField(name='month', dataType=StringType(), nullable=True),
                        StructField(name='day', dataType=IntegerType(), nullable=True),
                        StructField(name='weekday', dataType=StringType(), nullable=True),
                        StructField(name='hour', dataType=IntegerType(), nullable=True),
                        StructField(name='atm_status', dataType=StringType(), nullable=True),
                        StructField(name='atm_id', dataType=StringType(), nullable=True),
                        StructField(name='atm_manufacturer', dataType=StringType(), nullable=True),
                        StructField(name='atm_location', dataType=StringType(), nullable=True),
                        StructField(name='atm_streetname', dataType=StringType(), nullable=True),
                        StructField(name='atm_street_number', dataType=IntegerType(), nullable=True),
                        StructField(name='atm_zipcode', dataType=IntegerType(), nullable=True),
                        StructField(name='atm_lat', dataType=DoubleType(), nullable=True),
                        StructField(name='atm_lon', dataType=DoubleType(), nullable=True),
                        StructField(name='currency', dataType=StringType(), nullable=True),
                        StructField(name='card_type', dataType=StringType(), nullable=True),
                        StructField(name='transaction_amount', dataType=IntegerType(), nullable=True),
                        StructField(name='service', dataType=StringType(), nullable=True),
                        StructField(name='message_code', dataType=StringType(), nullable=True),
                        StructField(name='message_text', dataType=StringType(), nullable=True),
                        StructField(name='weather_lat', dataType=DoubleType(), nullable=True),
                        StructField(name='weather_lon', dataType=DoubleType(), nullable=True),
                        StructField(name='weather_city_id', dataType=IntegerType(), nullable=True),
                        StructField(name='weather_city_name', dataType=StringType(), nullable=True),
                        StructField(name='temp', dataType=DoubleType(), nullable=True),
                        StructField(name='pressure', dataType=IntegerType(), nullable=True),
                        StructField(name='humidity', dataType=IntegerType(), nullable=True),
                        StructField(name='wind_speed', dataType=IntegerType(), nullable=True),
                        StructField(name='wind_deg', dataType=IntegerType(), nullable=True),
                        StructField(name='rain_3h', dataType=DoubleType(), nullable=True),
                        StructField(name='clouds_all', dataType=IntegerType(), nullable=True),
                        StructField(name='weather_id', dataType=IntegerType(), nullable=True),
                        StructField(name='weather_main', dataType=StringType(), nullable=True),
                        StructField(name='weather_description', dataType=StringType(), nullable=True)
                ])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Read the data into a dataframe and verify the records loaded

In [3]:
transactions = spark.read.csv("SRC_ATM_TRANS/part-m-00000", header=False, schema=schema)
transactions.printSchema()
transactions.select("*").count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

# Create the dimension and fact tables

### Dimension for card types

In [4]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

dim_card_type = transactions.select("card_type").distinct()\
                            .withColumn("card_type_id", 
                                        row_number().over(Window.partitionBy().orderBy(col("card_type"))))\
                            .select("card_type_id", "card_type")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Validate dim_card_type

In [5]:
dim_card_type.printSchema()
dim_card_type.show()
dim_card_type.select("*").count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- card_type_id: integer (nullable = true)
 |-- card_type: string (nullable = true)

+------------+--------------------+
|card_type_id|           card_type|
+------------+--------------------+
|           1|              CIRRUS|
|           2|             Dankort|
|           3|     Dankort - on-us|
|           4|         HÃƒÂ¦vekort|
|           5| HÃƒÂ¦vekort - on-us|
|           6|             Maestro|
|           7|          MasterCard|
|           8|  Mastercard - on-us|
|           9|                VISA|
|          10|        Visa Dankort|
|          11|Visa Dankort - on-us|
|          12|            VisaPlus|
+------------+--------------------+

12

### Dimension for dates

In [6]:
from pyspark.sql.functions import concat_ws, lit, unix_timestamp

timestamp_format = "yyyy-MMM-dd HH:mm:ss"

dim_date = transactions.select("year", "month", "day", "hour", "weekday")\
                       .withColumn("full_date_time", 
                                   unix_timestamp(
                                       concat_ws(" ", 
                                                 concat_ws('-', col("year"), col("month"), col("day")), 
                                                 concat_ws(':', col("hour"), lit("00:00"))
                                                ), 
                                       timestamp_format
                                   ).cast("timestamp")
                                  )\
                        .distinct()\
                        .withColumn("date_id", row_number().over(Window.partitionBy().orderBy(col("full_date_time"))))\
                        .select("date_id", "full_date_time", "year", "month", "day", "hour", "weekday")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Validate dim_date

In [7]:
dim_date.printSchema()
dim_date.show()
dim_date.select("*").count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- date_id: integer (nullable = true)
 |-- full_date_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)

+-------+-------------------+----+-------+---+----+-------+
|date_id|     full_date_time|year|  month|day|hour|weekday|
+-------+-------------------+----+-------+---+----+-------+
|      1|2017-01-01 00:00:00|2017|January|  1|   0| Sunday|
|      2|2017-01-01 01:00:00|2017|January|  1|   1| Sunday|
|      3|2017-01-01 02:00:00|2017|January|  1|   2| Sunday|
|      4|2017-01-01 03:00:00|2017|January|  1|   3| Sunday|
|      5|2017-01-01 04:00:00|2017|January|  1|   4| Sunday|
|      6|2017-01-01 05:00:00|2017|January|  1|   5| Sunday|
|      7|2017-01-01 06:00:00|2017|January|  1|   6| Sunday|
|      8|2017-01-01 07:00:00|2017|January|  1|   7| Sunday|
|      9|2017-01-01 08:00:00|2017|January|  1|   8| Sunday|

### Dimension for locations

In [8]:
dim_location = transactions.select("atm_location", "atm_streetname", "atm_street_number", 
                                   "atm_zipcode", "atm_lat", "atm_lon")\
                           .withColumnRenamed("atm_location", "location")\
                           .withColumnRenamed("atm_streetname", "streetname")\
                           .withColumnRenamed("atm_street_number", "street_number")\
                           .withColumnRenamed("atm_zipcode", "zipcode")\
                           .withColumnRenamed("atm_lat", "lat")\
                           .withColumnRenamed("atm_lon", "lon")\
                           .distinct()\
                           .withColumn("location_id", row_number().over(Window.partitionBy().orderBy(col("zipcode"))))\
                           .select("location_id", "location", "streetname",
                                  "street_number", "zipcode", "lat", "lon")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Validate dim_location

In [9]:
dim_location.printSchema()
dim_location.show()
dim_location.select("*").count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- location_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)

+-----------+--------------------+------------------+-------------+-------+------+------+
|location_id|            location|        streetname|street_number|zipcode|   lat|   lon|
+-----------+--------------------+------------------+-------------+-------+------+------+
|          1|        KÃƒÂ¸benhavn|    Regnbuepladsen|            5|   1550|55.676|12.571|
|          2|Intern  KÃƒÂ¸benhavn|  RÃƒÂ¥dhuspladsen|           75|   1550|55.676|12.571|
|          3|       Frederiksberg|   Gammel Kongevej|          157|   1850|55.677|12.537|
|          4|              Lyngby|       Jernbanevej|            6|   2800|55.772|  12.5|
|          5|        HelsingÃƒÂ¸r|    Sct. Olai Gade|           39|   3000|56.036|

### Dimension for ATMs

In [10]:
from pyspark.sql.functions import concat

dim_atm = transactions.select("atm_id", "atm_manufacturer", "atm_lat", "atm_lon")\
                      .withColumnRenamed("atm_id", "atm_number")\
                      .join(other=dim_location, 
                            on=(concat(col("atm_lat"), col("atm_lon")) == concat(col("lat"), col("lon"))), 
                            how="left")\
                      .select("location_id", "atm_number", "atm_manufacturer")\
                      .withColumnRenamed("location_id", "atm_location_id")\
                      .distinct()\
                      .withColumn("atm_id", row_number().over(Window.partitionBy().orderBy(col("atm_location_id"))))\
                      .select("atm_id", "atm_location_id", "atm_number", "atm_manufacturer")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Validate dim_atm

In [11]:
dim_atm.printSchema()
dim_atm.show()
dim_atm.select("*").count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- atm_id: integer (nullable = true)
 |-- atm_location_id: integer (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)

+------+---------------+----------+----------------+
|atm_id|atm_location_id|atm_number|atm_manufacturer|
+------+---------------+----------+----------------+
|     1|              1|        95|             NCR|
|     2|              1|        85| Diebold Nixdorf|
|     3|              2|        85| Diebold Nixdorf|
|     4|              2|        95|             NCR|
|     5|              3|        47|             NCR|
|     6|              4|        44|             NCR|
|     7|              5|        46| Diebold Nixdorf|
|     8|              6|        86|             NCR|
|     9|              7|        57|             NCR|
|    10|              8|        22|             NCR|
|    11|              8|       106|             NCR|
|    12|              9|         4|             NCR|
|    13|             1