# 1. PySpark Setup

In [1]:
# Setting the environment for the PySpark
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"]="/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
# Creating a Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkETLCode').master("local").getOrCreate()
spark

In [3]:
# Accessing SparkContext from SparkSession
sc = spark.sparkContext
sc

# 2. Importing the Bank Data from HDFS

Data is loaded using the files in the provided location of HDFS and with a well-defined schema.
The total count, schema and data is verified.

In [4]:
# Creating a schema that matches with the data present in the HDFS.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, BinaryType, FloatType
fileSchema = StructType([StructField('year', IntegerType(),True),
                        StructField('month', StringType(),True),
                        StructField('day', IntegerType(),True),
                        StructField('weekday', StringType(),True),
                        StructField('hour', IntegerType(),True),
                        StructField('atm_status', StringType(),True),
                        StructField('atm_id', StringType(),True),
                        StructField('atm_manufacturer', StringType(),True),
                        StructField('atm_location', StringType(),True),
                        StructField('atm_streetname', StringType(),True),
                        StructField('atm_street_number', IntegerType(),True),
                        StructField('atm_zipcode', IntegerType(),True),
                        StructField('atm_lat', DoubleType(),True),
                        StructField('atm_lon', DoubleType(),True),
                        StructField('currency', StringType(),True),
                        StructField('card_type', StringType(),True),
                        StructField('transaction_amount', IntegerType(),True),
                        StructField('service', StringType(),True), 
                        StructField('message_code', StringType(),True),
                        StructField('message_text', StringType(),True),
                        StructField('weather_lat', DoubleType(),True),
                        StructField('weather_lon', DoubleType(),True),
                        StructField('weather_city_id', IntegerType(),True),
                        StructField('weather_city_name', StringType(),True),
                        StructField('temp', DoubleType(),True), 
                        StructField('pressure', IntegerType(),True),
                        StructField('humidity', IntegerType(),True),
                        StructField('wind_speed', IntegerType(),True),
                        StructField('wind_deg', IntegerType(),True),
                        StructField('rain_3h', DoubleType(),True),
                        StructField('clouds_all', IntegerType(),True),
                        StructField('weather_id', IntegerType(),True), 
                        StructField('weather_main', StringType(),True),
                        StructField('weather_description', StringType(),True),
                        ])

In [5]:
# Loading the data in the dataframe using schema as defined above.
bank_data = spark.read.load("/user/root/ETL_Project/bank_data_import/", format="csv", schema = fileSchema)

In [6]:
# Printing the schema for the imported data
bank_data.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [7]:
# Verifying the total number of rows loaded in spark
bank_data.count()

2468572

In [8]:
# Displaying the first row to verify if there are any errors in the schema or the parameters passed while importing.
bank_data.show(5, truncate=False)

+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-----------------------+
|year|month  |day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|atm_streetname     |atm_street_number|atm_zipcode|atm_lat|atm_lon|currency|card_type |transaction_amount|service   |message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|temp  |pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description    |
+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+--------

# 3. Creation of Dimension Tables using Loaded Data

#### 1. Creating DIM_LOCATION table using the bank_data dataframe

- The required columns are selected from the loaded data into a dataframe. Alias are provided wherever required.
- Duplicates are removed from the dataframe.
- A unique row id is assigned to every row in the dataframe.
- Columns are rearranged and count is verified after operations.

In [9]:
# Importing pyspark functions and windows
from pyspark.sql.functions import *
from pyspark.sql.window import *

In [10]:
# Selecting required columns from the dataframe to create the required table.
dim_location = bank_data.select(col("atm_location").alias("location"),col("atm_streetname").alias("streetname"),
                                col("atm_street_number").alias("street_number"), col("atm_zipcode").alias("zipcode"),
                                col("atm_lat").alias("lat"), col("atm_lon").alias("lon"))

# Dropping duplicates from the dim_location dataframe
dim_location_distincts = dim_location.dropDuplicates()
dim_location_distincts.show(5, truncate=False)

+----------------------+--------------+-------------+-------+------+-----+
|location              |streetname    |street_number|zipcode|lat   |lon  |
+----------------------+--------------+-------------+-------+------+-----+
|NykÃƒÂ¸bing Mors Lobby|Kirketorvet   |1            |7900   |56.795|8.86 |
|Nibe                  |Torvet        |1            |9240   |56.983|9.639|
|Skipperen             |Vestre Alle   |2            |9000   |57.034|9.908|
|Viborg                |Toldboden     |3            |8800   |56.448|9.401|
|Vadum                 |Ellehammersvej|43           |9430   |57.118|9.861|
+----------------------+--------------+-------------+-------+------+-----+
only showing top 5 rows



In [11]:
# Assigning a unique value (primary key) to the location_id column 
window = Window.orderBy(monotonically_increasing_id())
dim_location = dim_location_distincts.withColumn("location_id", row_number().over(window))
def location_Id(value):
    return "location_" + str(value)
location_id_udf = udf(location_Id, StringType())
dim_location = dim_location.withColumn("location_id", location_id_udf("location_id"))

# Rearranging the columns
dim_location = dim_location.select('location_id','location','streetname','street_number','zipcode','lat','lon')
dim_location.show(5, truncate=False)

+-----------+----------------------+--------------+-------------+-------+------+-----+
|location_id|location              |streetname    |street_number|zipcode|lat   |lon  |
+-----------+----------------------+--------------+-------------+-------+------+-----+
|location_1 |NykÃƒÂ¸bing Mors Lobby|Kirketorvet   |1            |7900   |56.795|8.86 |
|location_2 |Nibe                  |Torvet        |1            |9240   |56.983|9.639|
|location_3 |Skipperen             |Vestre Alle   |2            |9000   |57.034|9.908|
|location_4 |Viborg                |Toldboden     |3            |8800   |56.448|9.401|
|location_5 |Vadum                 |Ellehammersvej|43           |9430   |57.118|9.861|
+-----------+----------------------+--------------+-------------+-------+------+-----+
only showing top 5 rows



In [12]:
# Checking the count of the rows in the dim_location table
dim_location.count()

109

#### 2. Creating DIM_ATM table using the bank_data dataframe

- The required columns are selected from the loaded data into a dataframe. Alias are provided wherever required.
- Duplicates are removed from the dataframe.
- The dataframe is joined with the location dataframe using lat and lon columns.
- A unique row id is assigned to every row in the dataframe.
- Columns are rearranged and count is verified after operations.

In [13]:
# Selecting required columns from the dataframe to create the required table.
dim_atm = bank_data.select(col("atm_id").alias("atm_number"), col("atm_manufacturer"),
                           col('atm_lat').alias('lat'),col('atm_lon').alias('lon'))

# Dropping duplicates from the dim_atm dataframe
dim_atm_distincts = dim_atm.dropDuplicates()
dim_atm_distincts.show(5, truncate=False)

+----------+----------------+------+------+
|atm_number|atm_manufacturer|lat   |lon   |
+----------+----------------+------+------+
|113       |Diebold Nixdorf |55.398|11.342|
|54        |NCR             |56.745|8.949 |
|104       |NCR             |57.049|9.922 |
|18        |Diebold Nixdorf |56.448|9.401 |
|8         |NCR             |56.762|8.867 |
+----------+----------------+------+------+
only showing top 5 rows



In [14]:
# Joining the table with Location Table to get the required location_id.
condition = [dim_atm_distincts.lat == dim_location.lat, dim_atm_distincts.lon == dim_location.lon]
dim_atm_joined = dim_atm_distincts.join(dim_location, condition, 'left_outer')

# Assigning a unique value (primary key) to the atm_id column 
window = Window.orderBy(monotonically_increasing_id())
dim_atm_joined = dim_atm_joined.withColumn("atm_id", row_number().over(window))
def atm_Id(value):
    return "atm_" + str(value)
atm_id_udf = udf(atm_Id, StringType())
dim_atm_joined = dim_atm_joined.withColumn("atm_id", atm_id_udf("atm_id"))

# Rearranging the columns
dim_atm_joined = dim_atm_joined.select('atm_id','atm_number','atm_manufacturer',col('location_id').alias('atm_location_id'))
dim_atm_joined.show(5, truncate=False)

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|atm_1 |113       |Diebold Nixdorf |location_104   |
|atm_2 |113       |Diebold Nixdorf |location_90    |
|atm_3 |54        |NCR             |location_93    |
|atm_4 |104       |NCR             |location_63    |
|atm_5 |104       |NCR             |location_25    |
+------+----------+----------------+---------------+
only showing top 5 rows



In [15]:
# Verifing the total count for the dim_atm dimension.
dim_atm_joined.count()

156

#### 3. Creating DIM_CARD_TYPE table using the bank_data dataframe

- The required columns are selected from the loaded data into a dataframe. Alias are provided wherever required.
- Duplicates are removed from the dataframe.
- A unique row id is assigned to every row in the dataframe.
- Columns are rearranged and count is verified after operations.

In [16]:
# Selecting required columns from the dataframe to create the required table.
dim_card_type = bank_data.select(col("card_type"))

# Dropping duplicates from the dim_card_type dataframe
dim_card_type = dim_card_type.dropDuplicates()
dim_card_type.show(5, truncate=False)

+------------------+
|card_type         |
+------------------+
|Dankort - on-us   |
|CIRRUS            |
|HÃƒÂ¦vekort       |
|VISA              |
|Mastercard - on-us|
+------------------+
only showing top 5 rows



In [17]:
# Assigning a unique value (primary key) to the atm_id column 

window = Window.orderBy(monotonically_increasing_id())
dim_card_type = dim_card_type.withColumn("card_type_id", row_number().over(window))
def card_type_Id(value):
    return "card_type_" + str(value)
card_type_udf = udf(card_type_Id, StringType())
dim_card_type = dim_card_type.withColumn("card_type_id", card_type_udf("card_type_id"))

#Rearranging the columns
dim_card_type_old = dim_card_type.select('card_type_id','card_type')
dim_card_type.show(5, truncate=False)

+------------------+------------+
|card_type         |card_type_id|
+------------------+------------+
|Dankort - on-us   |card_type_1 |
|CIRRUS            |card_type_2 |
|HÃƒÂ¦vekort       |card_type_3 |
|VISA              |card_type_4 |
|Mastercard - on-us|card_type_5 |
+------------------+------------+
only showing top 5 rows



In [18]:
# Verifing the total count for the dim_atm dimension.
dim_card_type.count()

12

#### 4. Creating DIM_DATE table using the bank_data dataframe

- The required columns are selected from the loaded data into a dataframe. Alias are provided wherever required.
- Duplicates are removed from the dataframe.
- Date is created with year, month and day first. Then the date is converted in timestamp using expr and to_timestamp function.
- A unique row id is assigned to every row in the dataframe.
- Columns are rearranged and count is verified after operations.

In [19]:
# Selecting required columns from the dataframe to create the required table.
dim_date = bank_data.select(col("year"), "month", "day", "hour", "weekday")

# Dropping duplicates from the dim_date dataframe
dim_date = dim_date.dropDuplicates()
dim_date.show(5, truncate=False)

+----+-------+---+----+--------+
|year|month  |day|hour|weekday |
+----+-------+---+----+--------+
|2017|January|1  |9   |Sunday  |
|2017|January|3  |5   |Tuesday |
|2017|January|8  |19  |Sunday  |
|2017|January|21 |3   |Saturday|
|2017|January|23 |21  |Monday  |
+----+-------+---+----+--------+
only showing top 5 rows



In [20]:
# Create the date from year, month, and day column
dim_date = dim_date.withColumn('month_int', from_unixtime(
    unix_timestamp(col("month"),'MMM'),'MM')).withColumn(
    'date',date_format(concat_ws("-",col('year'),col('month_int'),col('day')),"yyyy-MM-dd").cast('date'))

# Creating the required timestamp column after merging the date
dim_date = dim_date.withColumn('date',expr("date || '-' || hour || ':00:00'")
                   ).withColumn('date',to_timestamp('date','yyyy-MM-dd-HH:mm:ss'))

# Assigning a unique value (primary key) to the atm_id column 
window = Window.orderBy(monotonically_increasing_id())
dim_date = dim_date.withColumn("date_id", row_number().over(window))
def date_Id(value):
    return "date_" + str(value)
date_udf = udf(date_Id, StringType())
dim_date = dim_date.withColumn("date_id", date_udf("date_id"))

# Rearranging the columns
dim_date = dim_date.select(col('date_id'),col('date').alias('full_date_time'),col("year"), "month", "day", "hour", "weekday")
dim_date.show(5,truncate=False)

+-------+-------------------+----+-------+---+----+--------+
|date_id|full_date_time     |year|month  |day|hour|weekday |
+-------+-------------------+----+-------+---+----+--------+
|date_1 |2017-01-01 09:00:00|2017|January|1  |9   |Sunday  |
|date_2 |2017-01-03 05:00:00|2017|January|3  |5   |Tuesday |
|date_3 |2017-01-08 19:00:00|2017|January|8  |19  |Sunday  |
|date_4 |2017-01-21 03:00:00|2017|January|21 |3   |Saturday|
|date_5 |2017-01-23 21:00:00|2017|January|23 |21  |Monday  |
+-------+-------------------+----+-------+---+----+--------+
only showing top 5 rows



In [21]:
# Verifying the schema
dim_date.printSchema()

root
 |-- date_id: string (nullable = true)
 |-- full_date_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)



In [22]:
# Verifing the total count for the dim_atm dimension.
dim_date.count()

8685

# 4. Creation of Fact Table using Loaded Data and Dimension Tables

- A new dataframe is created for the fact table using the previous loaded data.
- The dataframe is joined with DIM_LOCATION table using all the columns in the dataframe.
- The resultant is joined with DIM_CARD_TYPE and DIM_DATE and DIM_ATM table respectively using the candidate keys for the table.
- The extra columns that might get created after join are dropped off.
- Unique ID is assigned to the fact table and the columns are rearranged according to the target schema.
- The result is verified printing the data and matching the count.

In [23]:
# Copying the loaded into another dataframe to create the fact table
fact_atm_trans = bank_data.select('*')

In [24]:
# Joining the table with DIM_LOCATION table to get the required location_id.
condition = [fact_atm_trans.atm_location == dim_location.location, fact_atm_trans.atm_lat == dim_location.lat,
             fact_atm_trans.atm_lon == dim_location.lon, fact_atm_trans.atm_streetname == dim_location.streetname, 
            fact_atm_trans.atm_street_number == dim_location.street_number, fact_atm_trans.atm_zipcode == dim_location.zipcode]
fact_atm_trans = fact_atm_trans.join(dim_location, condition, 'left_outer').select('*')

# Joining the table with DIM_CARD_TYPE table to get the required card_type_id.
condition = [fact_atm_trans.card_type == dim_card_type.card_type]
fact_atm_trans = fact_atm_trans.join(dim_card_type, condition, 'left_outer').select('*')

# Joining the table with DIM_DATE table to get the required date_id.
condition = [fact_atm_trans.year == dim_date.year, fact_atm_trans.month == dim_date.month,
             fact_atm_trans.day == dim_date.day, fact_atm_trans.hour == dim_date.hour,
            fact_atm_trans.weekday == dim_date.weekday]
fact_atm_trans = fact_atm_trans.join(dim_date, condition, 'left_outer').select('*')

# Joining the table with DIM_ATM table to get the required atm_id.
fact_atm_trans = fact_atm_trans.withColumnRenamed("atm_id","atm_id_fact")
condition = [fact_atm_trans.atm_id_fact == dim_atm_joined.atm_number, fact_atm_trans.atm_manufacturer == dim_atm_joined.atm_manufacturer,
             fact_atm_trans.location_id == dim_atm_joined.atm_location_id]
fact_atm_trans = fact_atm_trans.join(dim_atm_joined, condition, 'left_outer').select('*')


columns_to_drop = ['atm_location','atm_streetname','atm_street_number','atm_zipcode',
                   'atm_lat','atm_lon','location','streetname','street_number','zipcode','lat','lon','card_type',
                  'year','month','day','hour','weekday','full_date_time','atm_manufacturer','atm_number','atm_id_fact','atm_location_id']
fact_atm_trans = fact_atm_trans.drop(*columns_to_drop)
fact_atm_trans.show(2, truncate=False)

+----------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+------------+------------+-------+------+
|atm_status|currency|transaction_amount|service   |message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|temp   |pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|location_id |card_type_id|date_id|atm_id|
+----------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+------------+------------+-------+------+
|Inactive  |DKK     |8933              |Withdrawal|null        |null        |57.464     |9.982      |2620214        |Hjorring         |283.921

In [25]:
# Assigning a unique value (primary key) to the atm_id column 
window = Window.orderBy(monotonically_increasing_id())
fact_atm_trans = fact_atm_trans.withColumn('trans_id', row_number().over(window))
def fact_atm_trans_Id(value):
    return "trans_" + str(value)
fact_atm_trans_udf = udf(fact_atm_trans_Id, StringType())
fact_atm_trans = fact_atm_trans.withColumn('trans_id', fact_atm_trans_udf('trans_id'))

#Rearranging the columns
fact_atm_trans = fact_atm_trans.select('trans_id','atm_id',col('location_id').alias('weather_location_id'),'date_id',
                                       'card_type_id','atm_status','currency','service','transaction_amount','message_code',
                                      'message_text','rain_3h','clouds_all','weather_id','weather_main','weather_description')
fact_atm_trans.show(2, truncate=False)

+--------+------+-------------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|trans_id|atm_id|weather_location_id|date_id|card_type_id|atm_status|currency|service   |transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+--------+------+-------------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|trans_1 |atm_40|location_101       |date_61|card_type_8 |Inactive  |DKK     |Withdrawal|8933              |null        |null        |0.0    |0         |800       |Clear       |Sky is Clear       |
|trans_2 |atm_40|location_101       |date_61|card_type_8 |Inactive  |DKK     |Withdrawal|274               |null        |null        |0.0    |0         |800       |Clear       |Sky is Clear       |
+--------+

In [26]:
# Verifying the count of the fact table:
fact_atm_trans.count()

2468572

# 5. Exporting the required dataframes into Amazon S3 buckets.

In [27]:
# Writing data to the s3 bucket using write method.
dim_location.write.csv("s3a://shardul-etl-bank-data/Dim_Location/Dim_Location.csv", mode="overwrite")
dim_atm_joined.write.csv("s3a://shardul-etl-bank-data/Dim_ATM/Dim_ATM.csv", mode="overwrite")
dim_card_type.write.csv("s3a://shardul-etl-bank-data/Dim_Card_Type/Dim_Card_Type.csv", mode="overwrite")
dim_date.write.parquet("s3a://shardul-etl-bank-data/Dim_Date/Dim_Date.parquet", mode="overwrite")
fact_atm_trans.write.csv("s3a://shardul-etl-bank-data/Fact_Transaction/Fact_Transaction.csv", mode="overwrite")