# ETL Spark Project - Spar Nord Bank

## Setup

Create a new Spark session

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1651580483378_0003,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

- Identify the hdfs path to where data has been extracted using Sqoop
- Define the schema for transactional data
- Load the data and verify schema

In [2]:
from pyspark.sql.types import *

hdfsPath = "hdfs:/user/root/atm_trans"
dataSchema = StructType([
    StructField('year', IntegerType(), False),
    StructField('month', StringType(), False),
    StructField('day', IntegerType(), False),
    StructField('weekday', StringType(), False),
    StructField('hour', IntegerType(), False),
    StructField('atm_status', StringType(), False),
    StructField('atm_id', StringType(), False),
    StructField('atm_manufacturer', StringType(), False),
    StructField('atm_location', StringType(), False),
    StructField('atm_streetname', StringType(), False),
    StructField('atm_street_number', IntegerType(), False),
    StructField('atm_zipcode', IntegerType(), False),
    StructField('atm_lat', DoubleType(), False),
    StructField('atm_lon', DoubleType(), False),
    StructField('currency', StringType(), False),
    StructField('card_type', StringType(), False),
    StructField('transaction_amount', IntegerType(), False),
    StructField('service', StringType(), False),
    StructField('message_code', StringType(), True),
    StructField('message_text', StringType(), True),
    StructField('weather_lat', DoubleType(), False),
    StructField('weather_lon', DoubleType(), False),
    StructField('weather_city_id', IntegerType(), False),
    StructField('weather_city_name', StringType(), False),
    StructField('temp', DoubleType(), False),
    StructField('pressure', IntegerType(), False),
    StructField('humidity', IntegerType(), False),
    StructField('wind_speed', IntegerType(), False),
    StructField('wind_deg', IntegerType(), False),
    StructField('rain_3h', DoubleType(), True),
    StructField('clouds_all', IntegerType(), False),
    StructField('weather_id', IntegerType(), False),
    StructField('weather_main', StringType(), False),
    StructField('weather_description', StringType(), False)
])
transactions = spark.read.format("csv").option(
    "header", "false").schema(dataSchema).load(hdfsPath)
transactions.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

- Check the number of records imported for later reference

In [3]:
transactions.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

## Create Dimensions 

### 1. Create Location Dimension

In [4]:
locationDim = transactions.select('atm_location', 'atm_streetname',
                                  'atm_street_number', 'atm_zipcode', 'atm_lat',
                                  'atm_lon').distinct()

#number of unique records
locationDim.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

109

In [5]:
from pyspark.sql.functions import *
from pyspark.sql import Window

#create a primary key for the location dimension
locationDim = locationDim.withColumn(
    'location_id',
    row_number().over(Window.orderBy(monotonically_increasing_id())))
locationDim.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+-------------------+-----------------+-----------+-------+-------+-----------+
|    atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|location_id|
+----------------+-------------------+-----------------+-----------+-------+-------+-----------+
|         Kolding|           Vejlevej|              135|       6000| 55.505|  9.457|          1|
|  Skelagervej 15|        Skelagervej|               15|       9000| 57.023|  9.891|          2|
|Intern HolbÃƒÂ¦k|        Slotsvolden|                7|       4300| 55.718| 11.704|          3|
|          Odense|       FÃƒÂ¦lledvej|                3|       5000| 55.394|  10.37|          4|
|           Ikast|RÃƒÂ¥dhusstrÃƒÂ¦det|               12|       7430| 56.139|  9.154|          5|
+----------------+-------------------+-----------------+-----------+-------+-------+-----------+
only showing top 5 rows

### 2. Create ATM Dimension 

In [6]:
atmDimTemp = transactions.select('atm_id', 'atm_manufacturer', 'atm_lat',
                                 'atm_lon', 'atm_location', 'atm_streetname',
                                 'atm_street_number', 'atm_zipcode').distinct()

#number of unique records
atmDimTemp.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

113

- Join ATM dimension with location on common columns
- Set location dimension's reference key in ATM dimension
- Clean ATM dimension by dropping all location columns from the join

In [7]:
atmDimTemp = atmDimTemp.join(locationDim, 
                ['atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon'], 
                'left').withColumn('location_ref_key', 
                                   col('location_id')).drop(*locationDim.columns)

#verify the number of unique records is still same
atmDimTemp.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

113

In [8]:
#create a primary key for ATM dimension
atmDim = atmDimTemp.withColumn(
    'atm_prim_key',
    row_number().over(Window.orderBy(monotonically_increasing_id())))
atmDim.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

113

### 3. Create Date Dimension 

In [9]:
dateDimTemp = transactions.select('year', 'month', 'day', 'weekday',
                                  'hour').distinct()

#number of unique records
dateDimTemp.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

8685

- Create timestamp using other date/time columns
- Verify the newly created timestamp data

In [10]:
#dateDimTemp = dateDimTemp.withColumn('month', substring(col('month'), 1, 3))
#dateDimTemp = dateDimTemp.withColumn('day', when(col('day') < 10, concat(lit('0'), col('day'))).otherwise(col('day')))
#dateDimTemp = dateDimTemp.withColumn('weekday', substring(col('weekday'), 1, 3))
#dateDimTemp = dateDimTemp.withColumn('hour', when(col('hour') < 10, concat(lit('0'), col('hour'))).otherwise(col('hour')))

dateDimTemp = dateDimTemp.withColumn(
    'full_date_time',
    to_timestamp(
        concat(dateDimTemp.year, lit('-'), dateDimTemp.month, lit('-'),
               dateDimTemp.day, lit(' '), dateDimTemp.weekday, lit(' '),
               dateDimTemp.hour), 'yyyy-MMMM-d EEEE HH'))
dateDimTemp.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+------+---+---------+----+-------------------+
|year| month|day|  weekday|hour|     full_date_time|
+----+------+---+---------+----+-------------------+
|2017|August|  1|  Tuesday|  11|2017-08-01 11:00:00|
|2017|August|  3| Thursday|  21|2017-08-03 21:00:00|
|2017|  July| 29| Saturday|  17|2017-07-29 17:00:00|
|2017|August|  3| Thursday|  15|2017-08-03 15:00:00|
|2017|August| 13|   Sunday|   2|2017-08-13 02:00:00|
|2017|  July|  6| Thursday|  14|2017-07-06 14:00:00|
|2017|  July| 13| Thursday|  16|2017-07-13 16:00:00|
|2017|  July| 30|   Sunday|  13|2017-07-30 13:00:00|
|2017|August|  3| Thursday|   5|2017-08-03 05:00:00|
|2017|August| 12| Saturday|  22|2017-08-12 22:00:00|
|2017|  July| 14|   Friday|   2|2017-07-14 02:00:00|
|2017|August|  2|Wednesday|  22|2017-08-02 22:00:00|
|2017|August| 10| Thursday|  18|2017-08-10 18:00:00|
|2017|  July|  3|   Monday|  10|2017-07-03 10:00:00|
|2017|  July| 27| Thursday|  18|2017-07-27 18:00:00|
|2017|  July| 11|  Tuesday|  18|2017-07-11 18:

In [11]:
#create primary key for Date Dimension
dateDim = dateDimTemp.withColumn(
    'date_id',
    row_number().over(Window.orderBy(monotonically_increasing_id())))

#verify no change in unique count
dateDim.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

8685

### 4. Create Card Type Dimension 

In [12]:
#create primary key for unique card types
cardDim = transactions.select('card_type').distinct().withColumn(
    'card_type_id',
    row_number().over(Window.orderBy(monotonically_increasing_id())))

#verify the results
cardDim.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------+
|           card_type|card_type_id|
+--------------------+------------+
|Visa Dankort - on-us|           1|
|  Mastercard - on-us|           2|
|         HÃƒÂ¦vekort|           3|
|            VisaPlus|           4|
|     Dankort - on-us|           5|
|        Visa Dankort|           6|
| HÃƒÂ¦vekort - on-us|           7|
|              CIRRUS|           8|
|                VISA|           9|
|             Maestro|          10|
|          MasterCard|          11|
|             Dankort|          12|
+--------------------+------------+

In [13]:
#unique number of records
cardDim.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

12

## Create Fact Table 

- Join transaction table on each dimesnion on common columns
- Set the reference key for each dimension in fact table
- Drop dimension table columns from join
- Verify the number of records in fact table is same as the original transactions table

### 1. Link with Location Dimension 

In [14]:
transFactTemp = transactions.join(
    locationDim, ['atm_location', 'atm_streetname', 'atm_street_number', 
                  'atm_zipcode', 'atm_lat', 'atm_lon'],
                  'left').withColumn('weather_loc_id', 
                                     col('location_id')).drop(*locationDim.columns)

#number of records same as transactions table
transFactTemp.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

### 2. Link with ATM Dimension 

In [15]:
transFactTemp = transFactTemp.join(
    atmDim, ['atm_id', 'atm_manufacturer'], 
    'left').withColumn('atm_ref_key', 
                       col('atm_prim_key')).drop(*atmDim.columns)

#number of records same as transactions table
transFactTemp.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

### 3. Link with Date Dimension 

In [16]:
transFactTemp = transFactTemp.join(
    dateDim, ['year', 'month', 'day', 'hour', 'weekday'],
    'left').withColumn('date_ref_key', 
                       col('date_id')).drop(*dateDim.columns)

#number of records same as transactions table
transFactTemp.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

### 4. Link with Card Type Dimension 

In [17]:
transFactTemp = transFactTemp.join(cardDim, ['card_type'], 'left').withColumn(
    'card_type_ref_key',
    col('card_type_id')).drop(*cardDim.columns)

#number of records same as transactions table
transFactTemp.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

### 5. Create Primary Key for the fact table

In [18]:
transFactTemp = transFactTemp.withColumn(
    'trans_id',
    row_number().over(Window.orderBy(monotonically_increasing_id())))

#number of records same as transactions table
transFactTemp.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

## Restructuring and Renaming columns

- Rename columns (if required)
- verify new schema
- verify the count matches with the initial count

In [19]:
locationDim = locationDim.withColumnRenamed('atm_location', 'location')\
                         .withColumnRenamed('atm_streetname', 'streetname')\
                         .withColumnRenamed('atm_street_number', 'street_number')\
                         .withColumnRenamed('atm_zipcode', 'zipcode')\
                         .withColumnRenamed('atm_lat', 'lat')\
                         .withColumnRenamed('atm_lon', 'lon').select('location_id', 'location', 'streetname', 
                                                                     'street_number', 'zipcode', 'lat', 'lon')

locationDim.printSchema()
locationDim.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- location_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)

109

In [20]:
atmDim = atmDim.withColumnRenamed('atm_id', 'atm_number')\
               .withColumnRenamed('location_ref_key', 'atm_location_id')\
               .withColumnRenamed('atm_prim_key', 'atm_id').select('atm_id', 'atm_number', 'atm_manufacturer', 'atm_location_id')

atmDim.printSchema()
atmDim.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- atm_id: integer (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location_id: integer (nullable = true)

113

In [21]:
dateDim = dateDim.select('date_id', 'full_date_time', 'year', 'month', 'day', 'hour', 'weekday')

dateDim.printSchema()
dateDim.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- date_id: integer (nullable = true)
 |-- full_date_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)

8685

In [22]:
cardDim = cardDim.select('card_type_id', 'card_type')

cardDim.printSchema()
cardDim.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- card_type_id: integer (nullable = true)
 |-- card_type: string (nullable = true)

12

In [23]:
transFact = transFactTemp.withColumnRenamed('atm_ref_key', 'atm_id')\
                         .withColumnRenamed('date_ref_key', 'date_id')\
                         .withColumnRenamed('card_type_ref_key', 'card_type_id')\
            .select('trans_id', 'atm_id', 'weather_loc_id', 'date_id', 'card_type_id', 
                    'atm_status', 'currency', 'service', 'transaction_amount', 'message_code',
                    'message_text', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description')


transFact.printSchema()
transFact.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- trans_id: integer (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)

2468572

## Sanity Checks on all the dimension and fact table transformations

In [24]:
#Maximum value of 'trans_id' should be same as total number of records in fact table
transFact.select(max('trans_id')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+
|max(trans_id)|
+-------------+
|      2468572|
+-------------+

In [25]:
#Maximum value of 'atm_id' should be same as total number of records in ATM dimension
transFact.select(max('atm_id')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+
|max(atm_id)|
+-----------+
|        113|
+-----------+

In [26]:
#Maximum value of 'weather_loc_id' should be same as total number of records in Location dimension
transFact.select(max('weather_loc_id')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+
|max(weather_loc_id)|
+-------------------+
|                109|
+-------------------+

In [27]:
#Maximum value of 'date_id' should be same as total number of records in Date dimension
transFact.select(max('date_id')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+
|max(date_id)|
+------------+
|        8685|
+------------+

In [28]:
#Maximum value of 'card_type_id' should be same as total number of records in Card Type dimension
transFact.select(max('card_type_id')).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------+
|max(card_type_id)|
+-----------------+
|               12|
+-----------------+

In [29]:
#All references to Location dimension must be not null
transFact.select(col('weather_loc_id').isNotNull()).count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

In [30]:
#All references to ATM dimension must be not null
transFact.select(col('atm_id').isNotNull()).count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

In [31]:
#All references to Date dimension must be not null
transFact.select(col('date_id').isNotNull()).count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

In [32]:
#All references to Card Type dimension must be not null
transFact.select(col('card_type_id').isNotNull()).count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

## Save the dimensions and facts in S3 bucket

### 1. Authorization 

In [33]:
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key",
                                                  "AKIA26QYRCTCE5NBJ4HE")
spark.sparkContext._jsc.hadoopConfiguration().set(
    "fs.s3a.secret.key", "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")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint",
                                                  "s3.amazonaws.com")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 2. Write Dimension tables to S3

In [34]:
locationDim.write.mode("overwrite").save('s3a://etlanalytics/dimensions/DIM_LOCATION',
                       format='csv',
                       header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
atmDim.write.mode("overwrite").save('s3a://etlanalytics/dimensions/DIM_ATM',
                  format='csv',
                  header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
dateDim.write.mode("overwrite").save('s3a://etlanalytics/dimensions/DIM_DATE',
                   format='csv',
                   header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
cardDim.write.mode("overwrite").save('s3a://etlanalytics/dimensions/DIM_CARD_TYPE',
                   format='csv',
                   header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Write Fact table to S3

In [42]:
transFact.write.mode("overwrite").save('s3a://etlanalytics/facts/FACT_ATM_TRANS',
                   format='csv',
                   header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…