# ETL Project - Bank ATM Case Study

### Initializing Spark Session

In [1]:
spark

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|---|---|---|---|---|---|---|---|
| 2 | application_1699251209521_0004 | pyspark | idle | Link | Link | | ✔ |


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7fb359cd0dd0>

### Reading the data to Spark from HDFS location

In [2]:
hdfsPath = '/user/livy/part-m-00000'

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

### Creating input file schema (as per given format) using StructType

In [4]:
fileSchema = StructType([StructField('Year', IntegerType(),True),
                        StructField('Month', StringType(),True),
                        StructField('Day', IntegerType(),True),
                        StructField('Weekday', StringType(),True),
                        StructField('Hour', IntegerType(),True),
                        StructField('atm_status', StringType(),True),
                        StructField('atm_id', StringType(),True),
                        StructField('atm_manufacturer', StringType(),True),
                        StructField('atm_location', StringType(),True),
                        StructField('atm_street_name', StringType(),True),
                        StructField('atm_street_number', IntegerType(),True),
                        StructField('atm_zipcode', IntegerType(),True),
                        StructField('atm_lat', DoubleType(),True),
                        StructField('atm_lon', DoubleType(),True),
                        StructField('currency', StringType(),True),
                        StructField('card_type', StringType(),True),
                        StructField('transaction_amount', IntegerType(), True),
                        StructField('service', StringType(),True),
                        StructField('message_code', StringType(),True),
                        StructField('message_text', StringType(),True),
                        StructField('weather_lat', DoubleType(),True),
                        StructField('weather_long', DoubleType(),True),
                        StructField('weather_city_id', IntegerType(),True), 
                        StructField('weather_city_name', StringType(),True),
                        StructField('temp', DoubleType(),True),
                        StructField('pressure', IntegerType(),True),
                        StructField('humidity', IntegerType(),True),
                        StructField('wind_speed', IntegerType(),True),
                        StructField('wind_deg', IntegerType(),True),
                        StructField('rain_3h', DoubleType(),True), 
                        StructField('clouds_all', IntegerType(),True),
                        StructField('weather_id', IntegerType(),True),
                        StructField('weather_main', StringType(),True),
                        StructField('weather_description', StringType(),True)
                        ])
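
As a rough illustration of what the schema does for a headerless file, the sketch below maps fields to names by position and casts each one to its declared type, using only the first five columns. It is plain Python, not Spark's CSV parser; `SAMPLE_SCHEMA` and `apply_schema` are hypothetical names, and turning empty or unparseable fields into `None` is an assumption of the sketch.

```python
import csv
import io

# First five columns of fileSchema, as (name, Python type) pairs.
SAMPLE_SCHEMA = [
    ("Year", int),
    ("Month", str),
    ("Day", int),
    ("Weekday", str),
    ("Hour", int),
]


def apply_schema(line, schema=SAMPLE_SCHEMA):
    """Map one headerless CSV line onto named, typed fields by position."""
    fields = next(csv.reader(io.StringIO(line)))
    record = {}
    for (name, cast), raw in zip(schema, fields):
        try:
            record[name] = cast(raw) if raw != "" else None
        except ValueError:
            record[name] = None  # assumption: unparseable values become None
    return record


row = apply_schema("2017,January,1,Sunday,0")
```

Because the file has no header, the order of the `StructField` entries is the only thing that ties a value to its column name.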

In [5]:
file1 = spark.read.csv(hdfsPath, header = False, schema = fileSchema)

In [6]:
file1.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Weekday: string (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_street_name: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_long: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: 

In [7]:
file1.show(2)

+----+-------+---+-------+----+----------+------+----------------+------------+---------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+------------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+
|Year|  Month|Day|Weekday|Hour|atm_status|atm_id|atm_manufacturer|atm_location|atm_street_name|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_long|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+---------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-

### Total count of imported data

In [8]:
file1.count()

2468572

## Creating the Dimension and Fact tables according to given schema

### 1. Location Dimension - first dimension table

##### - Creating the first dimension dataframe, Location, pertaining to all the details of an ATM location

In [9]:
dim_table1 = file1.select("atm_location","atm_street_name","atm_street_number","atm_zipcode","atm_lat","atm_lon").distinct()

In [10]:
dim_table1.count()

109

In [11]:
from pyspark.sql.window import Window
from pyspark.sql.functions import concat_ws, lit, row_number, unix_timestamp

##### - Creating primary key (**location_id**) for Location dimension (**DIM_LOCATION**)

In [12]:
window_spec = Window.orderBy("atm_location")
dim_location = dim_table1.withColumn("location_id", row_number().over(window_spec))

In [13]:
dim_location.show(5)

+--------------------+---------------+-----------------+-----------+-------+-------+-----------+
|        atm_location|atm_street_name|atm_street_number|atm_zipcode|atm_lat|atm_lon|location_id|
+--------------------+---------------+-----------------+-----------+-------+-------+-----------+
|             Aabybro|      Østergade|                6|       9440| 57.162|   9.73|          1|
|      Aalborg Hallen|   Europa Plads|                4|       9000| 57.044|  9.913|          2|
|Aalborg Storcente...|       Hobrovej|              452|       9200| 57.005|  9.876|          3|
|Aalborg Storcente...|       Hobrovej|              452|       9200| 57.005|  9.876|          4|
|         Aalborg Syd|       Hobrovej|              440|       9200| 57.005|  9.881|          5|
+--------------------+---------------+-----------------+-----------+-------+-------+-----------+
only showing top 5 rows
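
`row_number()` over a window ordered by a single column does not define an order among rows that tie on that column, so the key given to such rows can differ between runs. The plain-Python sketch below shows one way around that, assuming it is acceptable to order by every column; `assign_surrogate_keys` and the sample rows are hypothetical and only mimic the idea on a small in-memory list.

```python
def assign_surrogate_keys(rows, key_name):
    """Return copies of rows with a 1-based key, ordered by every column."""
    # Sorting on the full tuple of values breaks ties between rows that
    # share the first column, so reruns give the same key to the same row.
    ordered = sorted(rows, key=lambda r: tuple(r.values()))
    return [{**row, key_name: i} for i, row in enumerate(ordered, start=1)]


sample = [
    {"atm_location": "Aalborg Syd", "atm_street_number": 440},
    {"atm_location": "Aabybro", "atm_street_number": 6},
    {"atm_location": "Aalborg Hallen", "atm_street_number": 4},
]
keyed = assign_surrogate_keys(sample, "location_id")
```

In Spark the same effect would come from passing all six location columns to `Window.orderBy` instead of `atm_location` alone.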

##### - Renaming the columns as per requirement

In [14]:
DIM_LOCATION = dim_location.withColumnRenamed('atm_location','location')\
                            .withColumnRenamed('atm_street_name','streetname')\
                            .withColumnRenamed('atm_street_number','street_number')\
                            .withColumnRenamed('atm_zipcode','zipcode')\
                            .withColumnRenamed('atm_lat','lat')\
                            .withColumnRenamed('atm_lon','lon')

##### - Rearranging the columns according to the given target model

In [15]:
DIM_LOCATION = DIM_LOCATION.select("location_id",'location','streetname',"street_number","zipcode","lat","lon")

In [16]:
DIM_LOCATION.show(5)

+-----------+--------------------+------------+-------------+-------+------+-----+
|location_id|            location|  streetname|street_number|zipcode|   lat|  lon|
+-----------+--------------------+------------+-------------+-------+------+-----+
|          1|             Aabybro|   Østergade|            6|   9440|57.162| 9.73|
|          2|      Aalborg Hallen|Europa Plads|            4|   9000|57.044|9.913|
|          3|Aalborg Storcente...|    Hobrovej|          452|   9200|57.005|9.876|
|          4|Aalborg Storcente...|    Hobrovej|          452|   9200|57.005|9.876|
|          5|         Aalborg Syd|    Hobrovej|          440|   9200|57.005|9.881|
+-----------+--------------------+------------+-------------+-------+------+-----+
only showing top 5 rows

##### - List of columns in the final data set

In [17]:
DIM_LOCATION.columns

['location_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon']

##### - Total count of distinct records in the final data set (DIM_LOCATION): the given data set contains 109 different ATM locations

In [18]:
DIM_LOCATION.count()

109

### 2. ATM Dimension - second dimension table

##### - Creating the second dimension dataframe, ATM, with all details for the ATM identifier

In [19]:
dim_table2 = file1.select('atm_id', 'atm_manufacturer', 'atm_lat', 'atm_lon')

##### - Renaming the column atm_id to atm_number as per requirement

In [20]:
dim_table2 = dim_table2.withColumnRenamed('atm_id','atm_number')

##### - Joining the ATM dataframe (dim_table2) with the location dataframe (dim_location) on columns atm_lat and atm_lon

In [21]:
dim_table2 = dim_table2.join(dim_location, on = ['atm_lat', 'atm_lon'], how = 'left')

In [22]:
dim_table2.columns

['atm_lat', 'atm_lon', 'atm_number', 'atm_manufacturer', 'atm_location', 'atm_street_name', 'atm_street_number', 'atm_zipcode', 'location_id']

##### - Selecting only required columns for final data set ATM Dimension 

In [23]:
DIM_ATM = dim_table2.select("atm_number","atm_manufacturer","location_id").distinct()

In [24]:
DIM_ATM.count()

156

##### - Renaming the location_id to atm_location_id for ATM data set

In [25]:
DIM_ATM = DIM_ATM.withColumnRenamed("location_id","atm_location_id")

In [26]:
DIM_ATM.columns

['atm_number', 'atm_manufacturer', 'atm_location_id']

##### - Creating Primary key (**atm_id**) for ATM dimension (**DIM_ATM**)

In [27]:
window_spec = Window.orderBy("atm_number")
DIM_ATM = DIM_ATM.withColumn("atm_id", row_number().over(window_spec))

##### - Rearranging the columns according to the given target model

In [28]:
DIM_ATM = DIM_ATM.select("atm_id","atm_number","atm_manufacturer","atm_location_id")

##### - List of columns in the final data set

In [29]:
DIM_ATM.columns

['atm_id', 'atm_number', 'atm_manufacturer', 'atm_location_id']

In [30]:
DIM_ATM.show(5)

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|     1|         1|             NCR|             74|
|     2|        10|             NCR|             76|
|     3|       100|             NCR|             56|
|     4|       100|             NCR|             86|
|     5|       100|             NCR|             87|
+------+----------+----------------+---------------+
only showing top 5 rows
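
In the rows shown above, atm_number 100 appears with three different atm_location_id values. One possible cause, not confirmed by the notebook, is that the join on `atm_lat` and `atm_lon` alone matches several locations that share coordinates. The sketch below is a plain-Python way to list such cases; `keys_with_multiple_values` is a hypothetical helper, and the sample pairs are copied from the five displayed rows.

```python
from collections import defaultdict


def keys_with_multiple_values(pairs):
    """Return {key: sorted values} for keys paired with more than one value."""
    seen = defaultdict(set)
    for key, value in pairs:
        seen[key].add(value)
    return {k: sorted(v) for k, v in seen.items() if len(v) > 1}


# (atm_number, atm_location_id) pairs copied from the five rows shown above.
sample_pairs = [("1", 74), ("10", 76), ("100", 56), ("100", 86), ("100", 87)]
ambiguous = keys_with_multiple_values(sample_pairs)
```

An empty result would mean every ATM number maps to exactly one location; here `ambiguous` holds the single entry for atm_number 100.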

##### - Total count of distinct records in the final data set (DIM_ATM): there are 156 distinct combinations of atm_number, atm_manufacturer and atm_location_id

In [31]:
DIM_ATM.distinct().count()

156

### 3. Date Dimension - third dimension table

##### - Creating the third dimension dataframe, Date, with the date and time details of the transactions

In [32]:
dim_table3 = file1.select("Year","Month","Day","Weekday","Hour")

##### - Creating a new column "full_date_time" as per requirement by concatenating year, month, day, hour columns of data set

In [33]:
dim_table3 = dim_table3.withColumn("full_date", concat_ws('-', dim_table3.Year, dim_table3.Month, dim_table3.Day))

In [34]:
dim_table3 = dim_table3.withColumn("full_time", concat_ws(':', dim_table3.Hour, lit('00'), lit('00')))

In [35]:
dim_table3.show(2)

+----+-------+---+-------+----+--------------+---------+
|Year|  Month|Day|Weekday|Hour|     full_date|full_time|
+----+-------+---+-------+----+--------------+---------+
|2017|January|  1| Sunday|   0|2017-January-1|  0:00:00|
|2017|January|  1| Sunday|   0|2017-January-1|  0:00:00|
+----+-------+---+-------+----+--------------+---------+
only showing top 2 rows

In [36]:
dim_table3 = dim_table3.withColumn("full_date_time", concat_ws(' ' , dim_table3.full_date, dim_table3.full_time))

In [37]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [38]:
pattern = 'yyyy-MMM-dd HH:mm:ss'
dim_table3 = dim_table3.withColumn('full_date_time', unix_timestamp(dim_table3['full_date_time'],pattern).cast('timestamp'))

In [39]:
dim_table3.show(2)

+----+-------+---+-------+----+--------------+---------+-------------------+
|Year|  Month|Day|Weekday|Hour|     full_date|full_time|     full_date_time|
+----+-------+---+-------+----+--------------+---------+-------------------+
|2017|January|  1| Sunday|   0|2017-January-1|  0:00:00|2017-01-01 00:00:00|
|2017|January|  1| Sunday|   0|2017-January-1|  0:00:00|2017-01-01 00:00:00|
+----+-------+---+-------+----+--------------+---------+-------------------+
only showing top 2 rows
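
The concatenate-then-parse step can be checked outside Spark on a single row. The sketch below uses Python's `datetime` rather than Spark's parser, so its format codes differ from the `yyyy-MMM-dd HH:mm:ss` pattern used above; `parse_full_date_time` is a hypothetical helper.

```python
from datetime import datetime


def parse_full_date_time(year, month_name, day, hour):
    """Build a datetime from the Year, Month, Day and Hour columns."""
    text = f"{year}-{month_name}-{day} {hour}:00:00"
    # %B matches the full English month name under the default C locale.
    return datetime.strptime(text, "%Y-%B-%d %H:%M:%S")


ts = parse_full_date_time(2017, "January", 1, 0)
```

For the first row shown above, `ts` corresponds to 2017-01-01 00:00:00, matching the `full_date_time` column.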

In [46]:
dim_date = dim_table3.select("full_date_time","Year","Month","Day","Weekday","Hour").distinct()

##### - Creating primary key (date_id) over timestamp

In [47]:
window_spec = Window.orderBy("full_date_time")
DIM_DATE = dim_date.withColumn("date_id", row_number().over(window_spec))

In [48]:
DIM_DATE = DIM_DATE.select("date_id","full_date_time","Year","Month","Day","Weekday","Hour")

In [49]:
DIM_DATE.show(5)

+-------+-------------------+----+-------+---+-------+----+
|date_id|     full_date_time|Year|  Month|Day|Weekday|Hour|
+-------+-------------------+----+-------+---+-------+----+
|      1|2017-01-01 00:00:00|2017|January|  1| Sunday|   0|
|      2|2017-01-01 01:00:00|2017|January|  1| Sunday|   1|
|      3|2017-01-01 02:00:00|2017|January|  1| Sunday|   2|
|      4|2017-01-01 03:00:00|2017|January|  1| Sunday|   3|
|      5|2017-01-01 04:00:00|2017|January|  1| Sunday|   4|
+-------+-------------------+----+-------+---+-------+----+
only showing top 5 rows

##### - Total count of records in DIM_DATE dataframe

In [50]:
DIM_DATE.count()

8685
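
DIM_DATE holds one row per distinct hour that has at least one transaction, so the count can be compared with the number of hours in the period. The sketch below assumes the data covers calendar year 2017 only, which the sample rows suggest but the notebook does not confirm; `hours_in_year` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def hours_in_year(year):
    """List every hourly timestamp of the given calendar year."""
    current = datetime(year, 1, 1)
    end = datetime(year + 1, 1, 1)
    hours = []
    while current < end:
        hours.append(current)
        current += timedelta(hours=1)
    return hours


expected = len(hours_in_year(2017))   # 365 days * 24 hours
missing = expected - 8685             # 8685 is the DIM_DATE count above
```

Under that assumption, `missing` is the number of hours of the year in which no transaction was recorded.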

### 4. Card Dimension - fourth dimension table

##### - Creating the fourth dimension dataframe, Card Type, with the types of cards used for ATM transactions

In [72]:
dim_table4 = file1.select("card_type").distinct()

##### - Creating primary key (card_type_id) to identify unique card type

In [73]:
window_spec = Window.orderBy("card_type")
DIM_CARD_TYPE = dim_table4.withColumn("card_type_id", row_number().over(window_spec))

In [74]:
DIM_CARD_TYPE.show(5)

+-------------------+------------+
|          card_type|card_type_id|
+-------------------+------------+
|             CIRRUS|           1|
|            Dankort|           2|
|    Dankort - on-us|           3|
|           Hævekort|           4|
|   Hævekort - on-us|           5|
+-------------------+------------+
only showing top 5 rows

In [75]:
# rearranging the columns according to the given target model
DIM_CARD_TYPE = DIM_CARD_TYPE.select("card_type_id","card_type")

##### - Total count of card types available in the data set

In [76]:
DIM_CARD_TYPE.count()

12

### Creating the Transaction Fact Table as per given target model

##### - Creating fact table for transactions with transaction amount and unique transaction id (trans_id)

In [56]:
file1.columns

['Year', 'Month', 'Day', 'Weekday', 'Hour', 'atm_status', 'atm_id', 'atm_manufacturer', 'atm_location', 'atm_street_name', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon', 'currency', 'card_type', 'transaction_amount', 'service', 'message_code', 'message_text', 'weather_lat', 'weather_long', 'weather_city_id', 'weather_city_name', 'temp', 'pressure', 'humidity', 'wind_speed', 'wind_deg', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description']

##### - Renaming the columns of the original dataframe to match the DIM_LOCATION dataframe so that the two data sets can be joined

In [61]:
fact_table = file1.withColumnRenamed("atm_location","location").withColumnRenamed("atm_street_name","streetname")\
                  .withColumnRenamed("atm_street_number","street_number").withColumnRenamed("atm_zipcode","zipcode")\
                  .withColumnRenamed("atm_lat","lat").withColumnRenamed("atm_lon","lon")

##### - Joining the data frames (fact_table LEFT JOIN DIM_LOCATION)

In [62]:
fact_table = fact_table.join(DIM_LOCATION, on = ['location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon'], how = "left")

In [64]:
fact_table.columns

['location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon', 'Year', 'Month', 'Day', 'Weekday', 'Hour', 'atm_status', 'atm_id', 'atm_manufacturer', 'currency', 'card_type', 'transaction_amount', 'service', 'message_code', 'message_text', 'weather_lat', 'weather_long', 'weather_city_id', 'weather_city_name', 'temp', 'pressure', 'humidity', 'wind_speed', 'wind_deg', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description', 'location_id']

##### - Joining the data frames (fact_table LEFT JOIN DIM_ATM)

In [65]:
fact_table = fact_table.withColumnRenamed("atm_id","atm_number").withColumnRenamed("location_id","atm_location_id")

In [66]:
fact_table = fact_table.join(DIM_ATM, on = ['atm_number', 'atm_manufacturer', 'atm_location_id'], how = "left")

In [67]:
fact_table.show(2)

+----------+----------------+---------------+----------+-----------+-------------+-------+------+------+----+-------+---+-------+----+----------+--------+----------+------------------+----------+------------+------------+-----------+------------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+------+
|atm_number|atm_manufacturer|atm_location_id|  location| streetname|street_number|zipcode|   lat|   lon|Year|  Month|Day|Weekday|Hour|atm_status|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_long|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|atm_id|
+----------+----------------+---------------+----------+-----------+-------------+-------+------+------+----+-------+---+-------+----+----------+--------+----------+------------------+----------+------------+

In [68]:
fact_table.columns

['atm_number', 'atm_manufacturer', 'atm_location_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon', 'Year', 'Month', 'Day', 'Weekday', 'Hour', 'atm_status', 'currency', 'card_type', 'transaction_amount', 'service', 'message_code', 'message_text', 'weather_lat', 'weather_long', 'weather_city_id', 'weather_city_name', 'temp', 'pressure', 'humidity', 'wind_speed', 'wind_deg', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description', 'atm_id']

##### - Joining the data frames (fact_table LEFT JOIN DIM_DATE)

In [69]:
fact_table = fact_table.join(DIM_DATE, on = ["Year","Month","Day","Weekday","Hour"], how = "left")

In [70]:
fact_table.columns

['Year', 'Month', 'Day', 'Weekday', 'Hour', 'atm_number', 'atm_manufacturer', 'atm_location_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon', 'atm_status', 'currency', 'card_type', 'transaction_amount', 'service', 'message_code', 'message_text', 'weather_lat', 'weather_long', 'weather_city_id', 'weather_city_name', 'temp', 'pressure', 'humidity', 'wind_speed', 'wind_deg', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description', 'atm_id', 'date_id', 'full_date_time']

##### - Joining the data frames (fact_table LEFT JOIN DIM_CARD_TYPE)

In [77]:
fact_table = fact_table.join(DIM_CARD_TYPE, on = ['card_type'], how = "left")

In [78]:
fact_table.columns

['card_type', 'Year', 'Month', 'Day', 'Weekday', 'Hour', 'atm_number', 'atm_manufacturer', 'atm_location_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon', 'atm_status', 'currency', 'transaction_amount', 'service', 'message_code', 'message_text', 'weather_lat', 'weather_long', 'weather_city_id', 'weather_city_name', 'temp', 'pressure', 'humidity', 'wind_speed', 'wind_deg', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description', 'atm_id', 'date_id', 'full_date_time', 'card_type_id']

##### - Renaming the atm_location_id to (weather_loc_id) as per given fact table schema

In [84]:
fact_table = fact_table.withColumnRenamed("atm_location_id","weather_loc_id")

##### - Creating primary key for fact_table (trans_id) to uniquely identify each transaction

In [85]:
window_spec = Window.orderBy("date_id")
FACT_ATM_TRANS = fact_table.withColumn("trans_id", row_number().over(window_spec))

##### - Choosing and re-arranging the columns as per the required given model

In [86]:
FACT_ATM_TRANS = FACT_ATM_TRANS.select('trans_id', 'atm_id', 'weather_loc_id', 'date_id', 'card_type_id', 
                                       'atm_status', 'currency', 'service', 'transaction_amount', 'message_code',
                                       'message_text', 'rain_3h','clouds_all', 'weather_id', 'weather_main',
                                       'weather_description')

In [87]:
FACT_ATM_TRANS.show(3)

+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|trans_id|atm_id|weather_loc_id|date_id|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|       1|    37|           105|      1|           9|    Active|     DKK|Withdrawal|              4062|        null|        null|    0.0|        75|       300|     Drizzle|light intensity d...|
|       2|    95|            25|      1|           9|    Active|     DKK|Withdrawal|              7452|        null|        null|   0.29|        92|       500|        Rain|          light rain|
|       3|    95|            2

##### - List of columns in the final fact table (FACT_ATM_TRANS)

In [88]:
FACT_ATM_TRANS.columns

['trans_id', 'atm_id', 'weather_loc_id', 'date_id', 'card_type_id', 'atm_status', 'currency', 'service', 'transaction_amount', 'message_code', 'message_text', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main', 'weather_description']

##### - Total count of records in fact table

In [89]:
FACT_ATM_TRANS.count()

2468572
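
The fact table has the same 2468572 rows as the imported file. A left join never drops rows from the left side, so an unchanged count indicates that no join key matched more than one dimension row. The plain-Python sketch below illustrates that rule on join-key values only; `left_join_row_count` and the sample keys are hypothetical.

```python
from collections import Counter


def left_join_row_count(fact_keys, dim_keys):
    """Number of rows a left join yields, given each side's join-key values."""
    matches = Counter(dim_keys)
    # An unmatched fact row still yields one row (with nulls on the right);
    # a fact row matching n dimension rows yields n rows.
    return sum(max(matches[k], 1) for k in fact_keys)


fact = ["Dankort", "CIRRUS", "Dankort", "Unknown"]
unique_dim = ["CIRRUS", "Dankort"]
duplicated_dim = ["CIRRUS", "Dankort", "Dankort"]

same = left_join_row_count(fact, unique_dim)         # row count preserved
fanned = left_join_row_count(fact, duplicated_dim)   # Dankort rows doubled
```

With unique dimension keys the count stays at the number of fact rows; a duplicated key inflates it, which is the situation the count comparison rules out.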

## Loading the PySpark dimension and fact tables into Amazon S3 bucket

##### - Writing the DIM_LOCATION dataframe to the Amazon S3 bucket

In [91]:
DIM_LOCATION.write.format('csv').option('header','false').save('s3://atmdatasetforetl/dim_location/',mode='overwrite')

##### - Writing the DIM_ATM dataframe to the Amazon S3 bucket

In [92]:
DIM_ATM.write.format('csv').option('header','false').save('s3://atmdatasetforetl/dim_atm/',mode='overwrite')

##### - Writing the DIM_DATE dataframe to the Amazon S3 bucket

In [93]:
DIM_DATE.write.format('csv').option('header','false').save('s3://atmdatasetforetl/dim_date/',mode='overwrite')

##### - Writing the DIM_CARD_TYPE dataframe to the Amazon S3 bucket

In [94]:
DIM_CARD_TYPE.write.format('csv').option('header','false').save('s3://atmdatasetforetl/dim_card_type/',mode='overwrite')

##### - Writing the FACT_ATM_TRANS dataframe to the Amazon S3 bucket

In [96]:
FACT_ATM_TRANS.write.format('csv').option('header','false').save('s3://atmdatasetforetl/fact_atm_trans/',mode='overwrite')

#### - All the dataframes created above (4 dimension tables and 1 fact table) are written to Amazon S3 as headerless CSV files for later use in Redshift queries.
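
Since the files carry no header row, the Redshift tables would need to exist beforehand with columns in the same order as the dataframes. As a sketch of the follow-on load, the snippet below only builds `COPY` statement strings for the five folders written above. The target table names (assumed equal to the folder names), the `copy_statement` helper and the role ARN placeholder are assumptions, not part of the notebook.

```python
BUCKET = "s3://atmdatasetforetl"
TABLES = ["dim_location", "dim_atm", "dim_date", "dim_card_type", "fact_atm_trans"]


def copy_statement(table, iam_role):
    """Build a Redshift COPY statement for one headerless CSV folder."""
    return (
        f"COPY {table} FROM '{BUCKET}/{table}/' "
        f"IAM_ROLE '{iam_role}' CSV;"
    )


# Placeholder role ARN; replace with a role that can read the bucket.
ROLE = "arn:aws:iam::<account-id>:role/<role-name>"
statements = [copy_statement(t, ROLE) for t in TABLES]
```

Each string in `statements` can then be run from a Redshift SQL client once the placeholder is filled in.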