### ETL Project (Spar Nord Bank ATM Data Mart) by Simran Singh

#### Importing necessary libraries and setting up SparkSession

In [1]:
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"]="/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('jupyter_Spark').master("local").getOrCreate()
spark

### Reading the data to Spark

#### Reading the data from HDFS, using default schema at first, just to make sure data is getting loaded

In [3]:
df = spark.read.csv("/user/root/SRC_ATM_TRANS/part-m-00000", header = False, inferSchema = True)

In [4]:
df.show(1)

+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+----+----------+----+----+-----+------+-------+--------+------+----+----+----+----+-----+----+----+----+----------+
| _c0|    _c1|_c2|   _c3|_c4|   _c5|_c6|_c7|       _c8|        _c9|_c10|_c11|  _c12|  _c13|_c14|      _c15|_c16|      _c17|_c18|_c19| _c20|  _c21|   _c22|    _c23|  _c24|_c25|_c26|_c27|_c28| _c29|_c30|_c31|_c32|      _c33|
+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+----+----------+----+----+-----+------+-------+--------+------+----+----+----+----+-----+----+----+----+----------+
|2017|January|  1|Sunday|  0|Active|  1|NCR|NÃƒÂ¦stved|Farimagsvej|   8|4700|55.233|11.763| DKK|MasterCard|5643|Withdrawal|null|null|55.23|11.761|2616038|Naestved|281.15|1014|  87|   7| 260|0.215|  92| 500|Rain|light rain|
+----+-------+---+------+---+------+---+---+----------+-----------+----+----+------+------+----+----------+-

####  Creating custom input schema using StrucType and reading the data 

In [5]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

In [6]:
Schema = StructType([StructField('year', IntegerType(), nullable = True),
                        StructField('month', StringType(), True),
                        StructField('day', IntegerType(), True),
                        StructField('weekday', StringType(), True),
                        StructField('hour', IntegerType(), True),
                        StructField('atm_status', StringType(), True),
                        StructField('atm_id', StringType(), True),
                        StructField('atm_manufacturer', StringType(), True),
                        StructField('atm_location', StringType(), True),
                        StructField('atm_streetname', StringType(), True),
                        StructField('atm_street_number', IntegerType(), True),
                        StructField('atm_zipcode', IntegerType(), True),
                        StructField('atm_lat', DoubleType(), True),
                        StructField('atm_lon', DoubleType(), True),
                        StructField('currency', StringType(), True),
                        StructField('card_type', StringType(), True),
                        StructField('transaction_amount', IntegerType(), True),
                        StructField('service', StringType(), True),
                        StructField('message_code', StringType(), True),
                        StructField('message_text', StringType(), True),
                        StructField('weather_lat', DoubleType(), True),
                        StructField('weather_lon', DoubleType(), True),
                        StructField('weather_city_id', IntegerType(), True),
                        StructField('weather_city_name', StringType(), True),
                        StructField('temp', DoubleType(), True),
                        StructField('pressure', IntegerType(), True),
                        StructField('humidity', IntegerType(), True),
                        StructField('wind_speed', IntegerType(), True),
                        StructField('wind_deg', IntegerType(), True),
                        StructField('rain_3h', DoubleType(), True),
                        StructField('clouds_all', IntegerType(), True),
                        StructField('weather_id', IntegerType(), True),
                        StructField('weather_main', StringType(), True),
                        StructField('weather_description', StringType(), True)])

In [7]:
df = spark.read.csv("/user/root/SRC_ATM_TRANS/part-m-00000", header = False, schema = Schema)

#### Verifying the count of the records loaded into the Dataframe

In [8]:
df.select('*').count()

2468572

#### Taking a glance at the Schema, Dataframe and the Columns

In [9]:
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [10]:
df.show(1)

+----+-------+---+-------+----+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+------

In [11]:
df.columns

['year',
 'month',
 'day',
 'weekday',
 'hour',
 'atm_status',
 'atm_id',
 'atm_manufacturer',
 'atm_location',
 'atm_streetname',
 'atm_street_number',
 'atm_zipcode',
 'atm_lat',
 'atm_lon',
 'currency',
 'card_type',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_lat',
 'weather_lon',
 'weather_city_id',
 'weather_city_name',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_deg',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description']

---------

### Creating the Dimension and Fact tables

#### Creating a dataframe for Location Dimension according to Target Dimension Model

In [12]:
# creating a temporary df and selecting required columns & making sure records are distinct
location = df.select('atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon').distinct()

In [13]:
from pyspark.sql.functions import *

In [14]:
# creating the primary key column
df_temp = location.rdd.zipWithIndex().toDF()
dim_location = df_temp.select(col("_1.*"),col("_2").alias('location_id'))
dim_location.show(5)

+-----------------+----------------+-----------------+-----------+-------+-------+-----------+
|     atm_location|  atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|location_id|
+-----------------+----------------+-----------------+-----------+-------+-------+-----------+
|            Vadum|  Ellehammersvej|               43|       9430| 57.118|  9.861|          0|
|         Slagelse| Mariendals Alle|               29|       4200| 55.398| 11.342|          1|
|       Fredericia|SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|          2|
|          Kolding|        Vejlevej|              135|       6000| 55.505|  9.457|          3|
|HÃƒÂ¸rning Hallen|        Toftevej|               53|       8362| 56.091| 10.033|          4|
+-----------------+----------------+-----------------+-----------+-------+-------+-----------+
only showing top 5 rows



In [15]:
# renaming the colums as per requirement
DIM_LOCATION = dim_location.withColumnRenamed('atm_location','location').withColumnRenamed('atm_streetname','streetname').withColumnRenamed('atm_street_number','street_number').withColumnRenamed('atm_zipcode','zipcode').withColumnRenamed('atm_lat','lat').withColumnRenamed('atm_lon','lon')

In [16]:
# rearranging the columns according to the target model
DIM_LOCATION = DIM_LOCATION.select('location_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon')

In [17]:
# checking that all required columns are present and named correctly
DIM_LOCATION.columns

['location_id',
 'location',
 'streetname',
 'street_number',
 'zipcode',
 'lat',
 'lon']

In [18]:
# validating the count of the dataframe
DIM_LOCATION.select('*').count()

109

--------------

#### Creating a dataframe for ATM Dimension according to Target Dimension Model

In [19]:
# creating a temporary df and selecting required columns
atm = df.select('atm_id', 'atm_manufacturer', 'atm_lat', 'atm_lon')

In [20]:
# renaming the column atm_id to atm_number as per requirement
atm = atm.withColumnRenamed('atm_id', 'atm_number')

In [21]:
# joining the dim_location and atm dataframes
atm = atm.join(dim_location, on = ['atm_lat', 'atm_lon'], how = "left")

In [22]:
# checking to columns in the joined df
atm.columns

['atm_lat',
 'atm_lon',
 'atm_number',
 'atm_manufacturer',
 'atm_location',
 'atm_streetname',
 'atm_street_number',
 'atm_zipcode',
 'location_id']

In [23]:
# selecting the required columns and making sure records are distinct
atm = atm.select('atm_number', 'atm_manufacturer', 'location_id').distinct()

In [24]:
# renaming the colums as per requirement
atm = atm.withColumnRenamed('location_id', 'atm_location_id')

In [25]:
# viewing changes in columns
atm.columns

['atm_number', 'atm_manufacturer', 'atm_location_id']

In [26]:
# creating the primary key column
df_temp = atm.rdd.zipWithIndex().toDF()
dim_atm = df_temp.select(col("_1.*"),col("_2").alias('atm_id'))
dim_atm.show(5)

+----------+----------------+---------------+------+
|atm_number|atm_manufacturer|atm_location_id|atm_id|
+----------+----------------+---------------+------+
|        93|             NCR|            100|     0|
|        68|             NCR|             80|     1|
|        99|             NCR|             80|     2|
|       106|             NCR|             53|     3|
|        59| Diebold Nixdorf|             89|     4|
+----------+----------------+---------------+------+
only showing top 5 rows



In [27]:
# rearranging the columns according to the target model
DIM_ATM = dim_atm.select('atm_id', 'atm_number', 'atm_manufacturer', 'atm_location_id')

In [28]:
# checking that all required columns are present and named correctly
DIM_ATM.columns

['atm_id', 'atm_number', 'atm_manufacturer', 'atm_location_id']

In [29]:
# validating the count of the dataframe
DIM_ATM.select('*').count()

156

------------

#### Creating a dataframe for Date Dimension according to Target Dimension Model

In [30]:
# creating a temporary df and selecting required columns
date = df.select('year', 'month', 'day', 'hour', 'weekday')

In [31]:
date = date.withColumn('full_date', concat_ws('-', date.year, date.month, date.day))

In [32]:
date = date.withColumn('full_time', concat_ws(':', date.hour, lit('00'), lit('00')))

In [33]:
date = date.withColumn('full_date_time', concat_ws(' ', date.full_date, date.full_time))

In [34]:
pattern = 'yyyy-MMM-dd HH:mm:ss'
date = date.withColumn('full_date_time', unix_timestamp(date.full_date_time, pattern).cast('timestamp'))

In [35]:
date.show(5, truncate = False)

+----+-------+---+----+-------+--------------+---------+-------------------+
|year|month  |day|hour|weekday|full_date     |full_time|full_date_time     |
+----+-------+---+----+-------+--------------+---------+-------------------+
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
|2017|January|1  |0   |Sunday |2017-January-1|0:00:00  |2017-01-01 00:00:00|
+----+-------+---+----+-------+--------------+---------+-------------------+
only showing top 5 rows



In [36]:
# selecting the required columns and making sure records are distinct
date = date.select('year', 'month', 'day', 'hour', 'weekday', 'full_date_time').distinct()

In [37]:
# creating the primary key column
df_temp = date.rdd.zipWithIndex().toDF()
DIM_DATE = df_temp.select(col("_1.*"),col("_2").alias('date_id'))
DIM_DATE.show(5)

+----+-------+---+----+---------+-------------------+-------+
|year|  month|day|hour|  weekday|     full_date_time|date_id|
+----+-------+---+----+---------+-------------------+-------+
|2017|January|  1|  12|   Sunday|2017-01-01 12:00:00|      0|
|2017|January|  6|   8|   Friday|2017-01-06 08:00:00|      1|
|2017|January|  8|  16|   Sunday|2017-01-08 16:00:00|      2|
|2017|January| 16|  18|   Monday|2017-01-16 18:00:00|      3|
|2017|January| 18|   7|Wednesday|2017-01-18 07:00:00|      4|
+----+-------+---+----+---------+-------------------+-------+
only showing top 5 rows



In [38]:
# rearranging the columns according to the target model
DIM_DATE = DIM_DATE.select('date_id', 'full_date_time', 'year', 'month', 'day', 'hour', 'weekday')

In [39]:
# checking that all required columns are present and named correctly
DIM_DATE.columns

['date_id', 'full_date_time', 'year', 'month', 'day', 'hour', 'weekday']

In [40]:
# validating the count of the dataframe
DIM_DATE.select('*').count()

8685

--------------

#### Creating a dataframe for Card Type Dimension according to Target Dimension Model

In [41]:
# creating a temporary df and selecting required columns and making sure records are distinct
card_type = df.select('card_type').distinct()

In [42]:
# creating the primary key column
df_temp = card_type.rdd.zipWithIndex().toDF()
DIM_CARD_TYPE = df_temp.select(col("_1.*"),col("_2").alias('card_type_id'))
DIM_CARD_TYPE.show(5)

+------------------+------------+
|         card_type|card_type_id|
+------------------+------------+
|   Dankort - on-us|           0|
|            CIRRUS|           1|
|       HÃƒÂ¦vekort|           2|
|              VISA|           3|
|Mastercard - on-us|           4|
+------------------+------------+
only showing top 5 rows



In [43]:
# rearranging the columns according to the target model
DIM_CARD_TYPE = DIM_CARD_TYPE.select('card_type_id', 'card_type')

In [44]:
# checking that all required columns are present and named correctly
DIM_CARD_TYPE.columns

['card_type_id', 'card_type']

In [45]:
# validating the count of the dataframe
DIM_CARD_TYPE.select('*').count()

12

-------

#### Creating the Transaction Fact Table according to Target Model

In [46]:
# Stage 1 of FACT_ATM_TRANS Table -> joining original dataframe with DIM_LOC

In [47]:
# renaming the colums as per requirement
fact_loc = df.withColumnRenamed('atm_location','location').withColumnRenamed('atm_streetname','streetname').withColumnRenamed('atm_street_number','street_number').withColumnRenamed('atm_zipcode','zipcode').withColumnRenamed('atm_lat','lat').withColumnRenamed('atm_lon','lon')

In [48]:
# joining the dfs
fact_loc = fact_loc.join(DIM_LOCATION, on = ['location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon'], how = "left")

In [49]:
# viewing the columns
fact_loc.columns

['location',
 'streetname',
 'street_number',
 'zipcode',
 'lat',
 'lon',
 'year',
 'month',
 'day',
 'weekday',
 'hour',
 'atm_status',
 'atm_id',
 'atm_manufacturer',
 'currency',
 'card_type',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_lat',
 'weather_lon',
 'weather_city_id',
 'weather_city_name',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_deg',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description',
 'location_id']

In [50]:
# Validating the count of the df at the end of Stage 1
fact_loc.select('*').count()

2468572

In [51]:
# Stage 2 of FACT_ATM_TRANS Table -> joining the dataframe with DIM_ATM

In [52]:
# renaming the colums as per requirement
fact_loc = fact_loc.withColumnRenamed('atm_id', 'atm_number').withColumnRenamed('location_id', 'atm_location_id')

In [53]:
# joining the dfs
fact_atm = fact_loc.join(DIM_ATM, on = ['atm_number', 'atm_manufacturer', 'atm_location_id'], how = "left")

In [54]:
# performing necessary transformations, same as done to atm table
fact_atm = fact_atm.withColumnRenamed('atm_location_id', 'weather_loc_id')

In [55]:
# Validating the count of the df at the end of Stage 2
fact_atm.select('*').count()

2468572

In [56]:
# Stage 3 of FACT_ATM_TRANS Table -> joining the dataframe with DIM_DATE

In [57]:
# joining the dfs
fact_date = fact_atm.join(DIM_DATE, on = ['year', 'month', 'day', 'hour', 'weekday'], how = "left")

In [58]:
# Validating the count of the df at the end of Stage 3
fact_date.select('*').count()

2468572

In [59]:
# Stage 4 of FACT_ATM_TRANS Table -> joining the dataframe with DIM_CARD_TYPE

In [60]:
# joining the dfs
fact_atm_trans = fact_date.join(DIM_CARD_TYPE, on = ['card_type'], how = "left")

In [61]:
# Validating the count of the df at the end of Stage 4
fact_atm_trans.select('*').count()

2468572

In [62]:
# creating primary key of fact table and viewing 1st record of the table
from pyspark.sql.window import Window

w = Window().orderBy('date_id')
FACT_ATM_TRANS = fact_atm_trans.withColumn("trans_id", row_number().over(w))
FACT_ATM_TRANS.show(1, True)

+---------------+----+-------+---+----+-------+----------+----------------+--------------+---------+--------------------+-------------+-------+------+----+----------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+------+-------+-------------------+------------+--------+
|      card_type|year|  month|day|hour|weekday|atm_number|atm_manufacturer|weather_loc_id| location|          streetname|street_number|zipcode|   lat| lon|atm_status|currency|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|   temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|atm_id|date_id|     full_date_time|card_type_id|trans_id|
+---------------+----+-------+---+----+-------+----------+----------------+-------------

In [63]:
# viewing the list of columns
FACT_ATM_TRANS.columns

['card_type',
 'year',
 'month',
 'day',
 'hour',
 'weekday',
 'atm_number',
 'atm_manufacturer',
 'weather_loc_id',
 'location',
 'streetname',
 'street_number',
 'zipcode',
 'lat',
 'lon',
 'atm_status',
 'currency',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_lat',
 'weather_lon',
 'weather_city_id',
 'weather_city_name',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_deg',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description',
 'atm_id',
 'date_id',
 'full_date_time',
 'card_type_id',
 'trans_id']

In [64]:
# selecting and arranging only the required columns according to the target model
FACT_ATM_TRANS = FACT_ATM_TRANS.select('trans_id', 'atm_id', 'weather_loc_id', 'date_id', 'card_type_id', 
'atm_status', 'currency', 'service', 'transaction_amount', 'message_code', 'message_text', 'rain_3h', 
'clouds_all', 'weather_id', 'weather_main', 'weather_description')

In [65]:
# checking that all required columns are present and named correctly
FACT_ATM_TRANS.columns

['trans_id',
 'atm_id',
 'weather_loc_id',
 'date_id',
 'card_type_id',
 'atm_status',
 'currency',
 'service',
 'transaction_amount',
 'message_code',
 'message_text',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description']

In [66]:
# validating the count of the dataframe
FACT_ATM_TRANS.select('*').count()

2468572

--------

### Writing the PySpark Dataframes to AWS S3 Storage in csv format

In [67]:
# writing data from pyspark df 'dim_location' in csv format to dim_location folder in S3 bucket 'etlprojectbysimran'
DIM_LOCATION.coalesce(1).write.format('csv').option('header','false').save('s3a://etlprojectbysimran/dim_location', mode='overwrite')

In [68]:
# writing data from pyspark df 'dim_atm' in csv format to dim_atm folder in S3 bucket 'etlprojectbysimran'
DIM_ATM.coalesce(1).write.format('csv').option('header','false').save('s3a://etlprojectbysimran/dim_atm', mode='overwrite')

In [69]:
# writing data from pyspark df 'dim_data' in csv format to dim_data folder in S3 bucket 'etlprojectbysimran'
DIM_DATE.coalesce(1).write.format('csv').option('header','false').save('s3a://etlprojectbysimran/dim_date', mode='overwrite')

In [70]:
# writing data from pyspark df 'dim_card_type' in csv format to dim_card_type folder in S3 bucket 'etlprojectbysimran'
DIM_CARD_TYPE.coalesce(1).write.format('csv').option('header','false').save('s3a://etlprojectbysimran/dim_card_type', mode='overwrite')

In [71]:
# writing data from pyspark df 'fact_atm_trans' in csv format to fact_atm_trans folder in S3 bucket 'etlprojectbysimran'
FACT_ATM_TRANS.coalesce(1).write.format('csv').option('header','false').save('s3a://etlprojectbysimran/fact_atm_trans', mode='overwrite')