# SPAR NORD BANK ETL PROCESS AND DATA ANALYSIS

PROBLEM STATEMENT
Spar Nord Bank wants to observe ATM withdrawal behavior and the factors it depends on, in order to optimize how often its ATMs are refilled. Other insights also have to be drawn from the data.

The overall task in this project is to build a batch ETL pipeline that reads transactional data from RDS, transforms it, and loads it into target dimension and fact tables in a Redshift data mart (schema).

Note that the source data and target schema details are provided to clarify the source and the targets, which helps in designing the ETL pipeline. Once the data is loaded into Redshift, the analytical queries listed below have to be written.

We have data from more than 100 ATMs across Denmark. Data is captured for every transaction, including card type, location, date, time, ATM type, etc.

Note that the transaction amount field was added to the dataset separately, using a random function, for analysis purposes.

For the analysis part, the following analytical queries have to be computed:

- Top 10 ATMs where most transactions are in the 'inactive' state
- Number of ATM failures for the different weather conditions recorded at the time of the transactions
- Top 10 ATMs with the most transactions throughout the year
- Number of ATM transactions going inactive in each month
- Top 10 ATMs with the highest total amount withdrawn throughout the year
- Number of failed ATM transactions across the various card types
- Top 10 records with the number of transactions, ordered by ATM_number, ATM_manufacturer, location, weekend_flag and then total_transaction_count, on weekdays and on weekends throughout the year
- Most active day for each ATM in the location "Vejgaard"

CURRENT OBJECTIVE
So far we have extracted the data from the MySQL database using Apache Sqoop and placed it in HDFS on the Amazon EMR cluster, under the path /user/root/SRC_ATM_TRANS.

Now we have to:

1. Read the ATM transactions data from HDFS and store it in a Spark dataframe (PySpark).
2. Split the dataframe into dimension and fact dataframes, generate proper keys for each, and establish the relationships between them.
3. Store all the dimension and fact data in an S3 bucket so that it can be loaded into Amazon Redshift and used for data analysis.
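
Step 2 is the usual star-schema split: each distinct combination of descriptive attributes becomes one dimension row with a surrogate key, and each transaction keeps only its measures plus that key. A minimal plain-Python sketch of the idea, with a hypothetical helper `split_into_dim_and_fact` and dicts standing in for Spark rows; the notebook itself does this with Spark SQL and joins, and the sample values are illustrative only.

```python
# Hypothetical helper, not from the original notebook. Rows are plain dicts
# standing in for dataframe rows; natural-key columns are assumed non-null
# (Python cannot sort tuples that mix None with strings).
def split_into_dim_and_fact(rows, dim_cols, key_name):
    # Distinct natural keys, sorted so the surrogate keys are deterministic
    natural_keys = sorted({tuple(r[c] for c in dim_cols) for r in rows})
    key_of = {nk: i for i, nk in enumerate(natural_keys, start=1)}

    # Dimension table: surrogate key plus the descriptive attributes
    dim = [{key_name: key_of[nk], **dict(zip(dim_cols, nk))} for nk in natural_keys]

    # Fact table: the descriptive attributes are replaced by the foreign key
    fact = []
    for r in rows:
        nk = tuple(r[c] for c in dim_cols)
        measures = {c: v for c, v in r.items() if c not in dim_cols}
        fact.append({key_name: key_of[nk], **measures})
    return dim, fact


rows = [
    {"card_type": "Dankort", "transaction_amount": 100},
    {"card_type": "CIRRUS", "transaction_amount": 50},
    {"card_type": "Dankort", "transaction_amount": 20},
]
dim_card, fact = split_into_dim_and_fact(rows, ["card_type"], "card_type_id")
print(dim_card)  # [{'card_type_id': 1, 'card_type': 'CIRRUS'}, {'card_type_id': 2, 'card_type': 'Dankort'}]
print([f["card_type_id"] for f in fact])  # [2, 1, 2]
```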

## 1. Import PySpark Library And Establish Spark Session

In [1]:
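from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Use Spark's in-memory catalog instead of the Hive metastore
conf = SparkConf()
conf.set('spark.sql.catalogImplementation', 'in-memory')

# In a Livy-backed EMR notebook a session already exists, so getOrCreate() returns it;
# this is why the SparkContext below reports appName=livy-session-2
spark = SparkSession.builder.appName('ETLSparkCode').config(conf=conf).getOrCreate()
sc = spark.sparkContext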

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2,application_1690105962890_0003,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


In [2]:
sc

<SparkContext master=yarn appName=livy-session-2>

## 2. Read the ATM Transactions data from HDFS

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType
spar_nord_bank_schema = StructType([StructField('year', IntegerType(), nullable = True),
                        StructField('month', StringType(), nullable = True),
                        StructField('day', IntegerType(), nullable = True),
                        StructField('weekday', StringType(), nullable = True),
                        StructField('hour', IntegerType(), nullable = True),
                        StructField('atm_status', StringType(), nullable = True),
                        StructField('atm_id', StringType(), nullable = True),
                        StructField('atm_manufacturer', StringType(), nullable = True),
                        StructField('atm_location', StringType(), nullable = True),
                        StructField('atm_streetname', StringType(), nullable = True),
                        StructField('atm_street_number', IntegerType(), nullable = True),
                        StructField('atm_zipcode', IntegerType(), nullable = True),
                        StructField('atm_lat', DoubleType(), nullable = True),
                        StructField('atm_lon', DoubleType(), nullable = True),
                        StructField('currency', StringType(), nullable = True),
                        StructField('card_type', StringType(), nullable = True),
                        StructField('transaction_amount', IntegerType(), nullable = True),
                        StructField('service', StringType(), nullable = True),
                        StructField('message_code', StringType(), nullable = True),
                        StructField('message_text', StringType(), nullable = True),
                        StructField('weather_lat', DoubleType(), nullable = True),
                        StructField('weather_lon', DoubleType(), nullable = True),
                        StructField('weather_city_id', IntegerType(), nullable = True),
                        StructField('weather_city_name', StringType(), nullable = True),
                        StructField('temp', DoubleType(), nullable = True),
                        StructField('pressure', IntegerType(), nullable = True),
                        StructField('humidity', IntegerType(), nullable = True),
                        StructField('wind_speed', IntegerType(), nullable = True),
                        StructField('wind_deg', IntegerType(), nullable = True),
                        StructField('rain_3h', DoubleType(), nullable = True),
                        StructField('clouds_all', IntegerType(), nullable = True),
                        StructField('weather_id', IntegerType(), nullable = True),
                        StructField('weather_main', StringType(), nullable = True),
                        StructField('weather_description', StringType(), nullable = True)])

In [4]:
spar_nord_bank_df = spark.read.csv('hdfs:///user/root/SRC_ATM_TRANS/', header=False, schema=spar_nord_bank_schema)
spar_nord_bank_df.show(5)

+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-

In [5]:
spar_nord_bank_df.count()

2468572

In [6]:
spar_nord_bank_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [7]:
spar_nord_bank_df.createOrReplaceTempView('spar_nord_bank_trans')
spark.sql('SELECT * FROM spar_nord_bank_trans LIMIT 5').show()

+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-

Observation:
From the above results we can see that:

- The record count of the Spark dataframe (2468572) matches the MySQL source table.
- All attributes have the data types required by the data dictionary provided in the business requirement.

Now let's break this single dataframe into dimension and fact tables/files and build proper relationships between the dimension and fact data.

## 3. Create Dimensions and Fact tables data from the Original Dataframe

The temporary view `spar_nord_bank_trans`, created above on the original dataframe, is used to process the data further with Spark SQL.

### 3.1 Create Location Dimension Table

In [8]:
dim_location = spark.sql('''
                        WITH location_details AS
                        (
                            SELECT 
                                atm_location as location,
                                atm_streetname as streetname,
                                atm_street_number as street_number,
                                atm_zipcode as zipcode,
                                atm_lat as lat,
                                atm_lon as lon
                            FROM
                                spar_nord_bank_trans
                            GROUP BY
                                atm_location,
                                atm_streetname,
                                atm_street_number,
                                atm_zipcode,
                                atm_lat,
                                atm_lon
                        )
                        SELECT 
                            ROW_NUMBER() OVER (ORDER BY location, streetname, street_number, zipcode, lat, lon) AS location_id,
                            location,
                            streetname,
                            street_number,
                            zipcode,
                            lat,
                            lon
                        FROM location_details
                        ''')

In [9]:
dim_location.show(5, truncate=False)

+-----------+--------------------------+------------+-------------+-------+------+-----+
|location_id|location                  |streetname  |street_number|zipcode|lat   |lon  |
+-----------+--------------------------+------------+-------------+-------+------+-----+
|1          |Aabybro                   |ÃƒËœstergade|6            |9440   |57.162|9.73 |
|2          |Aalborg Hallen            |Europa Plads|4            |9000   |57.044|9.913|
|3          |Aalborg Storcenter  Afd   |Hobrovej    |452          |9200   |57.005|9.876|
|4          |Aalborg Storcenter indg. D|Hobrovej    |452          |9200   |57.005|9.876|
|5          |Aalborg Syd               |Hobrovej    |440          |9200   |57.005|9.881|
+-----------+--------------------------+------------+-------------+-------+------+-----+
only showing top 5 rows

In [10]:
dim_location.printSchema()

root
 |-- location_id: integer (nullable = false)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)

In [11]:
dim_location.count()

109

Observation:
We have successfully created the location dimension data with unique records and assigned a unique key to each location.
There are 109 locations in the dataset.
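
`ROW_NUMBER() OVER (ORDER BY ...)` fixes the key order only up to ties: if two rows shared the same ORDER BY values while differing in other columns (for example the same `location` with different street details), which one receives which `location_id` would depend on the order in which Spark happened to process them. Because `dim_location` is not cached, Spark may recompute it for each downstream use, so ordering by the full natural key keeps the keys reproducible. A plain-Python sketch of the effect, with hypothetical helpers `row_number` and `key_of` and illustrative rows:

```python
# Hypothetical helpers emulating ROW_NUMBER() OVER (ORDER BY ...) on dict rows.
def row_number(rows, order_cols):
    # Sort on the ORDER BY columns only, then number from 1. Python's sort is
    # stable, so rows tied on order_cols keep their arrival order, which is
    # exactly the part Spark leaves unspecified.
    ordered = sorted(rows, key=lambda r: tuple(r[c] for c in order_cols))
    return [(i, r) for i, r in enumerate(ordered, start=1)]


def key_of(numbered, row):
    # Return the row number assigned to a given row
    return next(i for i, r in numbered if r == row)


a = {"location": "X", "streetname": "A"}
b = {"location": "X", "streetname": "B"}

# Ordering by location alone: the key of `a` depends on input order
print(key_of(row_number([a, b], ["location"]), a))  # 1
print(key_of(row_number([b, a], ["location"]), a))  # 2

# Ordering by the full natural key: the key of `a` is stable
print(key_of(row_number([b, a], ["location", "streetname"]), a))  # 1
```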

### 3.2 Create Card Type Dimension Table

In [12]:
dim_card_type = spark.sql('''
                        WITH card_type_details AS
                        (
                            SELECT 
                                card_type
                            FROM
                                spar_nord_bank_trans
                            GROUP BY
                                card_type
                        )
                        SELECT 
                            ROW_NUMBER() OVER (ORDER BY card_type) AS card_type_id,
                            card_type
                        FROM card_type_details
                        ''')

In [13]:
dim_card_type.show(5, truncate=False)

+------------+-------------------+
|card_type_id|card_type          |
+------------+-------------------+
|1           |CIRRUS             |
|2           |Dankort            |
|3           |Dankort - on-us    |
|4           |HÃƒÂ¦vekort        |
|5           |HÃƒÂ¦vekort - on-us|
+------------+-------------------+
only showing top 5 rows

In [14]:
dim_card_type.printSchema()

root
 |-- card_type_id: integer (nullable = false)
 |-- card_type: string (nullable = true)

In [15]:
dim_card_type.count()

12

Observation:
We have successfully created the card type dimension data with unique records and assigned a unique key to each card type.
There are 12 card types in the dataset.
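
Some values are garbled, e.g. `HÃƒÂ¦vekort` in the card types and `ÃƒËœstergade` in the street names. The pattern is consistent with Danish text whose UTF-8 bytes were decoded as Windows-1252 twice somewhere upstream; that cause is an assumption inferred from the characters, not something the notebook confirms. Under that assumption, re-encoding as cp1252 and decoding as UTF-8 until the text stops changing repairs it. A plain-Python sketch (`repair_mojibake` is a hypothetical helper); in PySpark it could be wrapped with `pyspark.sql.functions.udf` and applied to the affected string columns before the dimensions are built.

```python
def repair_mojibake(text, max_passes=3):
    # Assumes the text is UTF-8 that was wrongly decoded as cp1252, possibly
    # more than once. Each pass undoes one wrong decode; stop as soon as the
    # round trip fails or changes nothing (i.e. the text is already clean).
    for _ in range(max_passes):
        try:
            fixed = text.encode("cp1252").decode("utf-8")
        except (UnicodeEncodeError, UnicodeDecodeError):
            break
        if fixed == text:
            break
        text = fixed
    return text


print(repair_mojibake("HÃƒÂ¦vekort"))   # Hævekort
print(repair_mojibake("ÃƒËœstergade"))  # Østergade
print(repair_mojibake("CIRRUS"))        # CIRRUS
```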

### 3.3 Create Date Dimension Table

In [16]:
date = spar_nord_bank_df.select('year', 'month', 'day', 'hour', 'weekday')

In [17]:
date.head(5)

[Row(year=2017, month='January', day=1, hour=0, weekday='Sunday'), Row(year=2017, month='January', day=1, hour=0, weekday='Sunday'), Row(year=2017, month='January', day=1, hour=0, weekday='Sunday'), Row(year=2017, month='January', day=1, hour=0, weekday='Sunday'), Row(year=2017, month='January', day=1, hour=0, weekday='Sunday')]

In [18]:
dim_date = spark.sql('''
                        WITH date_details AS
                        (
                            SELECT 
                                case when length(hour) = 1 then
                                CONCAT(year, '-', month, '-', day, ' ', '0', hour, ':', '00', ':', '00') 
                                else
                                CONCAT(year, '-', month, '-', day, ' ', hour, ':', '00', ':', '00') 
                                end AS full_date_time,
                                year,
                                month,
                                day,
                                hour,
                                weekday
                            FROM
                                spar_nord_bank_trans
                            GROUP BY
                                year,
                                month,
                                day,
                                hour,
                                weekday
                        )
                        SELECT 
                            full_date_time,
                            year,
                            month,
                            day,
                            hour,
                            weekday
                        FROM date_details
                        ''')

In [19]:
dim_date.show(10, truncate=False)

+------------------------+----+--------+---+----+---------+
|full_date_time          |year|month   |day|hour|weekday  |
+------------------------+----+--------+---+----+---------+
|2017-February-7 11:00:00|2017|February|7  |11  |Tuesday  |
|2017-March-4 19:00:00   |2017|March   |4  |19  |Saturday |
|2017-March-1 12:00:00   |2017|March   |1  |12  |Wednesday|
|2017-January-13 19:00:00|2017|January |13 |19  |Friday   |
|2017-March-7 20:00:00   |2017|March   |7  |20  |Tuesday  |
|2017-March-20 19:00:00  |2017|March   |20 |19  |Monday   |
|2017-March-22 22:00:00  |2017|March   |22 |22  |Wednesday|
|2017-January-25 14:00:00|2017|January |25 |14  |Wednesday|
|2017-March-11 08:00:00  |2017|March   |11 |8   |Saturday |
|2017-March-22 13:00:00  |2017|March   |22 |13  |Wednesday|
+------------------------+----+--------+---+----+---------+
only showing top 10 rows

In [20]:
dim_date.printSchema()

root
 |-- full_date_time: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
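
At this point `full_date_time` is still a string. The CASE expression left-pads single-digit hours so that every hour has the two digits the `HH` pattern letter expects when the column is converted to a timestamp, while `day` stays unpadded, as the output shows (`2017-February-7`). A plain-Python sketch of both steps, with hypothetical helper names; `MONTHS` hard-codes the English month names seen in the `month` column. In Spark SQL, `LPAD(hour, 2, '0')` would express the padding more compactly.

```python
from datetime import datetime

# English month names as they appear in the 'month' column (assumed complete)
MONTHS = (
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
)


def full_date_time_string(year, month, day, hour):
    # Same string as the CASE expression: single-digit hours get a leading '0'
    return f"{year}-{month}-{day} {hour:02d}:00:00"


def full_date_time(year, month, day, hour):
    # Build the timestamp straight from the parts, without a string round trip
    return datetime(year, MONTHS.index(month) + 1, day, hour)


print(full_date_time_string(2017, "March", 11, 8))  # 2017-March-11 08:00:00
print(full_date_time(2017, "March", 11, 8))          # 2017-03-11 08:00:00
```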

### Convert full_date_time to a timestamp and create a primary key for the date dim

In [22]:
# Convert full_date_time from string to timestamp.
# 'MMMM' parses the full month name and 'd' accepts one- or two-digit days.
from pyspark.sql.functions import col, to_timestamp, row_number
from pyspark.sql.window import Window
dim_date = dim_date.withColumn('full_date_time', to_timestamp(col('full_date_time'), 'yyyy-MMMM-d HH:mm:ss'))

# Create the primary key (date_id) for the date dim in chronological order
date_w = Window.orderBy('full_date_time')
dim_date = dim_date.withColumn('date_id', row_number().over(date_w))

# Put the key column first
dim_date1 = dim_date.select('date_id', 'full_date_time', 'year', 'month', 'day', 'hour', 'weekday')

In [23]:
dim_date1.show(5, truncate=False)

+-------+-------------------+----+-------+---+----+-------+
|date_id|full_date_time     |year|month  |day|hour|weekday|
+-------+-------------------+----+-------+---+----+-------+
|1      |2017-01-01 00:00:00|2017|January|1  |0   |Sunday |
|2      |2017-01-01 01:00:00|2017|January|1  |1   |Sunday |
|3      |2017-01-01 02:00:00|2017|January|1  |2   |Sunday |
|4      |2017-01-01 03:00:00|2017|January|1  |3   |Sunday |
|5      |2017-01-01 04:00:00|2017|January|1  |4   |Sunday |
+-------+-------------------+----+-------+---+----+-------+
only showing top 5 rows

In [24]:
dim_date = dim_date1

In [25]:
dim_date.show(5, truncate=False)

+-------+-------------------+----+-------+---+----+-------+
|date_id|full_date_time     |year|month  |day|hour|weekday|
+-------+-------------------+----+-------+---+----+-------+
|1      |2017-01-01 00:00:00|2017|January|1  |0   |Sunday |
|2      |2017-01-01 01:00:00|2017|January|1  |1   |Sunday |
|3      |2017-01-01 02:00:00|2017|January|1  |2   |Sunday |
|4      |2017-01-01 03:00:00|2017|January|1  |3   |Sunday |
|5      |2017-01-01 04:00:00|2017|January|1  |4   |Sunday |
+-------+-------------------+----+-------+---+----+-------+
only showing top 5 rows

In [26]:
dim_date.printSchema()

root
 |-- date_id: integer (nullable = false)
 |-- full_date_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)

In [27]:
dim_date.count()

8685


Observation:
We have successfully created the date dimension data with unique records and assigned a unique key to each date-hour combination.
The date dimension contains 8685 records.
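
If the data covers exactly the calendar year 2017 (an assumption: the rows shown are all from 2017, but the notebook does not state the date range), a complete hourly calendar would have 365 × 24 = 8760 rows, so 8685 rows would mean 75 hours with no recorded transaction. A sketch for listing such gaps, with a hypothetical helper `missing_hours`; the 8685 timestamps are few enough to bring to the driver first, e.g. `[r.full_date_time for r in dim_date.select('full_date_time').collect()]`.

```python
from datetime import datetime, timedelta


def missing_hours(observed, year):
    # Walk every hourly slot of the calendar year as naive local time
    # (DST ignored, matching the year/month/day/hour columns of the source)
    # and keep the slots that never appear in `observed`.
    observed = set(observed)
    missing = []
    t = datetime(year, 1, 1)
    end = datetime(year + 1, 1, 1)
    while t < end:
        if t not in observed:
            missing.append(t)
        t += timedelta(hours=1)
    return missing


# 2017 is not a leap year: 365 * 24 = 8760 hourly slots
print(len(missing_hours([], 2017)))  # 8760
```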

### 3.4 Create ATM Dimension Table

In [28]:
# Create a temp view for location dimension to perform lookup operation on original dataset
dim_location.createOrReplaceTempView('location_dim_vw')
spark.sql('SELECT * FROM location_dim_vw LIMIT 5').show(truncate=False)

+-----------+--------------------------+------------+-------------+-------+------+-----+
|location_id|location                  |streetname  |street_number|zipcode|lat   |lon  |
+-----------+--------------------------+------------+-------------+-------+------+-----+
|1          |Aabybro                   |ÃƒËœstergade|6            |9440   |57.162|9.73 |
|2          |Aalborg Hallen            |Europa Plads|4            |9000   |57.044|9.913|
|3          |Aalborg Storcenter  Afd   |Hobrovej    |452          |9200   |57.005|9.876|
|4          |Aalborg Storcenter indg. D|Hobrovej    |452          |9200   |57.005|9.876|
|5          |Aalborg Syd               |Hobrovej    |440          |9200   |57.005|9.881|
+-----------+--------------------------+------------+-------------+-------+------+-----+

In [29]:
# Create atm dimension table data with primary key generation
dim_atm = spark.sql('''
                        WITH atm_details AS
                        (
                            SELECT 
                                snt.atm_id AS atm_number,
                                snt.atm_manufacturer AS atm_manufacturer,
                                l.location_id AS atm_location_id
                            FROM
                                spar_nord_bank_trans snt
                                LEFT JOIN location_dim_vw l
                                    ON snt.atm_location = l.location
                                        AND snt.atm_streetname = l.streetname
                                        AND snt.atm_street_number = l.street_number
                                        AND snt.atm_zipcode = l.zipcode
                                        AND snt.atm_lat = l.lat
                                        AND snt.atm_lon = l.lon
                            GROUP BY
                                snt.atm_id,
                                snt.atm_manufacturer,
                                l.location_id
                        )
                        SELECT 
                            ROW_NUMBER() OVER (ORDER BY atm_number, atm_manufacturer, atm_location_id) AS atm_id,
                            atm_number,
                            atm_manufacturer,
                            atm_location_id
                        FROM atm_details
                        ''')

In [30]:
# Check first 5 records of atm dim
dim_atm.show(5, truncate=False)

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|1     |1         |NCR             |74             |
|2     |10        |NCR             |76             |
|3     |100       |NCR             |56             |
|4     |101       |NCR             |17             |
|5     |102       |NCR             |3              |
+------+----------+----------------+---------------+
only showing top 5 rows

In [31]:
# Check atm dim data structure
dim_atm.printSchema()

root
 |-- atm_id: integer (nullable = false)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location_id: integer (nullable = true)

In [32]:
# Check total no. of records in atm dim
dim_atm.count()

113
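
`atm_number` is a string column, so the window's ORDER BY sorts it lexicographically; this is why the output above assigns `atm_id` 1 to 5 to ATM numbers 1, 10, 100, 101 and 102. The keys are still unique and valid. If numeric order is preferred, ordering by `CAST(atm_number AS INT)` should give it, assuming every ATM number is numeric. A plain-Python illustration of the two orderings (`order_atm_numbers` is a hypothetical helper):

```python
def order_atm_numbers(atm_numbers, numeric=False):
    # A plain sort on strings is lexicographic: "10" and "100" sort before "2".
    # key=int sorts by numeric value instead (every value must parse as an int).
    return sorted(atm_numbers, key=int if numeric else None)


print(order_atm_numbers(["2", "1", "100", "10"]))                # ['1', '10', '100', '2']
print(order_atm_numbers(["2", "1", "100", "10"], numeric=True))  # ['1', '2', '10', '100']
```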

### 3.5 Create ATM Transactions Fact Table

### Fact Stage 1

Create the first stage of the ATM transactions fact table by left-joining the original dataset with the location dimension.
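
A left join against a dimension behaves like a dictionary lookup as long as the dimension's join columns are unique: every fact row comes out exactly once, and rows without a match get a null key. If the join columns were duplicated in the dimension, the join would silently multiply fact rows, which is what the count check after each stage guards against. A plain-Python sketch of that contract (`left_join_lookup` is a hypothetical helper; rows are dicts and the values are illustrative):

```python
def left_join_lookup(facts, dim, on, key_name):
    # Index the dimension by its join columns, refusing duplicates
    index = {}
    for d in dim:
        nk = tuple(d[c] for c in on)
        if nk in index:
            # Duplicate join keys would make a real join emit extra fact rows
            raise ValueError(f"duplicate join key in dimension: {nk}")
        index[nk] = d[key_name]

    joined = []
    for f in facts:
        nk = tuple(f[c] for c in on)
        # SQL equality never matches NULL, so a null join column means no match
        key = None if None in nk else index.get(nk)
        joined.append({**f, key_name: key})
    return joined


dim = [{"location_id": 1, "location": "Aabybro"}, {"location_id": 2, "location": "Aalborg Hallen"}]
facts = [
    {"location": "Aalborg Hallen", "transaction_amount": 5},
    {"location": "Unknown", "transaction_amount": 3},
    {"location": None, "transaction_amount": 1},
]
out = left_join_lookup(facts, dim, ["location"], "location_id")
print([r["location_id"] for r in out])  # [2, None, None]
```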

In [33]:
fact_atm_trans_stg1 = spar_nord_bank_df.withColumnRenamed('atm_location','location').withColumnRenamed('atm_streetname','streetname')\
                      .withColumnRenamed('atm_street_number', 'street_number').withColumnRenamed('atm_zipcode', 'zipcode')\
                      .withColumnRenamed('atm_lat', 'lat').withColumnRenamed('atm_lon', 'lon')

fact_atm_trans_stg1 = fact_atm_trans_stg1.join(dim_location, on=['location', 'streetname', 'street_number', 'zipcode','lat', 'lon'], how='left')
fact_atm_trans_stg1 = fact_atm_trans_stg1.withColumnRenamed('location_id', 'atm_location_id')

In [34]:
# Check first stage data structure
fact_atm_trans_stg1.printSchema()

root
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 

In [35]:
# Check first stage count
fact_atm_trans_stg1.count()

2468572

### Fact Stage 2

In [36]:
fact_atm_trans_stg2 = fact_atm_trans_stg1.join(dim_card_type, on=['card_type'], how='left')

In [37]:
# Check second stage data structure
fact_atm_trans_stg2.printSchema()

root
 |-- card_type: string (nullable = true)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 

In [38]:
# Check second stage count
fact_atm_trans_stg2.count()

2468572

### Fact Stage 3

In [39]:
fact_atm_trans_stg3 = fact_atm_trans_stg2.join(dim_date, on=['year', 'month', 'day', 'weekday', 'hour'], how='left')
fact_atm_trans_stg3 = fact_atm_trans_stg3.withColumnRenamed('atm_id', 'atm_number')

In [40]:
# Check third stage data structure
fact_atm_trans_stg3.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- card_type: string (nullable = true)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)

In [41]:
# Check third stage count
fact_atm_trans_stg3.count()

2468572

### Fact Stage 4

In [42]:
fact_atm_trans_stg4 = fact_atm_trans_stg3.join(dim_atm, on=['atm_number', 'atm_manufacturer', 'atm_location_id'], how='left')
fact_atm_trans_stg4 = fact_atm_trans_stg4.withColumnRenamed('atm_location_id', 'weather_loc_id')
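Because every dimension join here is a left join, a transaction whose key has no match in a dimension still survives the join, but with a null surrogate key. Counting such rows for each key column before writing the fact table is a cheap sanity check. In PySpark, one way to write it is `fact_df.filter(col('atm_id').isNull()).count()`. The plain-Python sketch below applies the same idea to hypothetical rows and also imitates the `atm_location_id` to `weather_loc_id` rename performed in this stage.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]

def rename_column(rows: List[Row], old: str, new: str) -> List[Row]:
    """Mimic DataFrame.withColumnRenamed: rows without `old` are unchanged."""
    return [{(new if k == old else k): v for k, v in r.items()} for r in rows]

def null_key_counts(rows: List[Row], key_cols: List[str]) -> Dict[str, int]:
    """Count rows whose foreign-key column is None (null) or absent."""
    return {c: sum(1 for r in rows if r.get(c) is None) for c in key_cols}

# Hypothetical stage-4 rows; the second ATM found no match in dim_atm.
stage4 = [
    {"atm_id": 24, "atm_location_id": 105, "date_id": 1, "card_type_id": 9},
    {"atm_id": None, "atm_location_id": 97, "date_id": 1, "card_type_id": 8},
]
stage4 = rename_column(stage4, "atm_location_id", "weather_loc_id")
print(null_key_counts(stage4, ["atm_id", "weather_loc_id", "date_id", "card_type_id"]))
# → {'atm_id': 1, 'weather_loc_id': 0, 'date_id': 0, 'card_type_id': 0}
```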

In [43]:
# Check fourth stage data structure
fact_atm_trans_stg4.printSchema()

root
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- card_type: string (nullable = true)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)

In [44]:
# Check fourth stage count
fact_atm_trans_stg4.count()

2468572

### Final Fact ATM Transaction table

In [45]:
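# Create a surrogate primary key (trans_id) for the fact table.
# Note: a window without partitionBy moves all rows into a single partition.
from pyspark.sql import Window
from pyspark.sql.functions import row_number
atm_trans_w = Window.orderBy('date_id')
fact_atm_trans = fact_atm_trans_stg4.withColumn('trans_id', row_number().over(atm_trans_w))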
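`row_number()` over a window ordered by `date_id` sorts the whole dataset by `date_id` and numbers the rows from 1 to n. Many transactions share a `date_id` (one exists per hour), and the order within such a tie is not guaranteed. As a result, which transaction receives which `trans_id` can change between runs. Adding a tiebreaker column to the ordering makes the numbering reproducible. The plain-Python sketch below illustrates this. Using `atm_id` as the tiebreaker is an assumption; a fully deterministic ordering needs enough columns to tell every row apart. If consecutive IDs are not required, `pyspark.sql.functions.monotonically_increasing_id()` avoids the single-partition sort, at the cost of gaps in the sequence.

```python
from typing import Any, Dict, List, Sequence

Row = Dict[str, Any]

def assign_row_numbers(rows: List[Row], order_by: Sequence[str], id_col: str = "trans_id") -> List[Row]:
    """Mimic row_number().over(Window.orderBy(*order_by)):
    sort by the given columns and number rows starting at 1."""
    ordered = sorted(rows, key=lambda r: tuple(r[c] for c in order_by))
    return [{id_col: i, **r} for i, r in enumerate(ordered, start=1)]

# Hypothetical rows: two transactions share date_id 1.
rows = [
    {"date_id": 2, "atm_id": 5},
    {"date_id": 1, "atm_id": 70},
    {"date_id": 1, "atm_id": 24},
]

# date_id alone: ties keep input order here only because Python's sort is
# stable; Spark gives no such guarantee across runs.
print([(r["trans_id"], r["atm_id"]) for r in assign_row_numbers(rows, ["date_id"])])
# With a tiebreaker, the numbering no longer depends on input order.
print([(r["trans_id"], r["atm_id"]) for r in assign_row_numbers(rows, ["date_id", "atm_id"])])
# → [(1, 24), (2, 70), (3, 5)]
```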

In [46]:
# Select the required columns from the final fact table
fact_atm_trans = fact_atm_trans.select('trans_id', 'atm_id', 'weather_loc_id', 'date_id', 'card_type_id', 
                                       'atm_status', 'currency', 'service', 'transaction_amount', 'message_code',
                                       'message_text', 'rain_3h', 'clouds_all', 'weather_id', 'weather_main',
                                       'weather_description')
fact_atm_trans = fact_atm_trans.withColumn('trans_id', col('trans_id').cast(LongType()))

In [47]:
# Check first 5 records in fact atm trans
fact_atm_trans.show(5)

+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|trans_id|atm_id|weather_loc_id|date_id|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|       1|    24|           105|      1|           9|    Active|     DKK|Withdrawal|              4062|        null|        null|    0.0|        75|       300|     Drizzle|light intensity d...|
|       2|    19|            97|      1|           8|    Active|     DKK|Withdrawal|              5024|        null|        null|  1.275|        92|       500|        Rain|          light rain|
|       3|    70|            2

In [48]:
# Check fact data structure
fact_atm_trans.printSchema()

root
 |-- trans_id: long (nullable = false)
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)

In [49]:
# Check fact table count
fact_atm_trans.count()

2468572


Observation:
We have successfully created the ATM transaction fact table and established primary-key/foreign-key relationships with the dimension tables.
The fact dataset contains a total of 2,468,572 records, the same count as after every join stage, so none of the left joins added or dropped rows.

### 4. Copy all Dimension and Fact table data to the S3 bucket

In [50]:
# Copying data of Location dim to S3 bucket in CSV format
dim_location.write.csv('s3://sparketl/dimention/DIM_LOCATION', header=False, mode='overwrite')

In [51]:
# Copying data of Card Type dim to S3 bucket in CSV format
dim_card_type.write.csv('s3://sparketl/dimention/DIM_CARD_TYPE', header=False, mode='overwrite')

In [52]:
# Copying data of Date dim to S3 bucket in CSV format
dim_date.write.csv('s3://sparketl/dimention/DIM_DATE', header=False, mode='overwrite')

In [53]:
# Copying data of ATM dim to S3 bucket in CSV format
dim_atm.write.csv('s3://sparketl/dimention/DIM_ATM', header=False, mode='overwrite')

In [54]:
# Copying data of ATM Transactions fact to S3 bucket in CSV format
fact_atm_trans.write.csv('s3://sparketl/dimention/FACT_ATM_TRANS', header=False, mode='overwrite')
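All tables are written with `header=False`, so each output file contains only data rows. The DataFrame's column order therefore becomes the contract with whatever loads the files. Redshift's `COPY`, for example, maps fields to target-table columns by position unless a column list is given. The sketch below uses Python's `csv` module to show what one headerless fact row looks like, following the column order selected for `fact_atm_trans`. The values come from the second sample row in the `show(5)` output above. Writing `None` as an empty field is an assumption intended to mirror Spark's default CSV null representation.

```python
import csv
import io
from typing import Any, List, Sequence

# Column order used in the final select of fact_atm_trans.
FACT_COLUMNS = [
    "trans_id", "atm_id", "weather_loc_id", "date_id", "card_type_id",
    "atm_status", "currency", "service", "transaction_amount", "message_code",
    "message_text", "rain_3h", "clouds_all", "weather_id", "weather_main",
    "weather_description",
]

def to_headerless_csv(rows: Sequence[Sequence[Any]]) -> str:
    """Serialise rows without a header line; None becomes an empty field."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerows(rows)
    return buf.getvalue()

# Second sample row from fact_atm_trans.show(5).
row: List[Any] = [2, 19, 97, 1, 8, "Active", "DKK", "Withdrawal", 5024,
                  None, None, 1.275, 92, 500, "Rain", "light rain"]
assert len(row) == len(FACT_COLUMNS)  # one value per target column, in order
print(to_headerless_csv([row]), end="")
# → 2,19,97,1,8,Active,DKK,Withdrawal,5024,,,1.275,92,500,Rain,light rain
```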

# CONCLUSION

We have successfully read the source data from HDFS, performed transformations using Spark, and split the data into dimension and fact tables.
We then copied all the dimension and fact data into the S3 bucket (s3://sparketl/dimention/), from where it can be loaded into Amazon Redshift for data analysis.
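As a sketch of the next step, the helper below builds a Redshift `COPY` statement for one of the S3 folders written above. The target table name, IAM role ARN and region are placeholders, not values from this project. `FORMAT AS CSV` matches the comma-delimited, headerless files that Spark produced. The inputs are assumed to be trusted, since they are interpolated into the SQL text without escaping.

```python
def build_copy_sql(table: str, s3_prefix: str, iam_role_arn: str, region: str) -> str:
    """Return a Redshift COPY statement that loads headerless CSV files
    from an S3 prefix into `table` (fields matched to columns by position).
    Inputs are assumed trusted: they are not escaped."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_prefix}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS CSV\n"
        f"REGION '{region}';"
    )

# Placeholder values: the table name, role ARN and region are hypothetical.
sql = build_copy_sql(
    table="fact_atm_trans",
    s3_prefix="s3://sparketl/dimention/FACT_ATM_TRANS/",
    iam_role_arn="arn:aws:iam::123456789012:role/hypothetical-redshift-role",
    region="us-east-1",
)
print(sql)
```

The same helper can be called once per dimension folder before the fact table is loaded, so that the foreign keys in the fact rows resolve against already-populated dimensions.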
