 # <div align="center"> <span style="font-size:larger; font-weight:bold;" >ETL CASE STUDY USING PYSPARK</span></div>  
 
## PROBLEM STATEMENT

In our project, Spar Nord Bank is trying to observe the withdrawal behaviour and the corresponding dependent factors to optimally manage the refill frequency. Apart from this, other insights also have to be drawn from the data.

We have data from more than 100 ATMs across Denmark. Data is captured for every transaction, including card type, location, date, time, ATM type, etc.

Also, the transaction amount field in the data set was added separately using a random function for analysis purposes.

Spar Nord Bank has also built a dimensional model datastore (ATM Data Mart) on this ATM transaction data to understand the ATM usage pattern.

## Steps followed:

1. Extracting the transactional data from a MySQL RDS server into HDFS on an EC2 instance using Sqoop.

1. Transforming the transactional data into the given target schema using PySpark.

1. Loading the transformed data into an S3 bucket.

1. Creating the Redshift tables according to the given schema.

1. Loading the data from Amazon S3 into the Redshift tables.

1. Running the analysis queries in the Redshift Query Editor.
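
Step 1 runs outside this notebook. The sketch below shows roughly what it looks like. It uses a hypothetical helper, `build_sqoop_import`, that only assembles the `sqoop import` argument list. The JDBC URL, database, source table and user name are placeholders, not the values used in this project. With a single mapper (`-m 1`), Sqoop writes one output file named `part-m-00000`. That matches the file read later in this notebook.

```python
import shlex

def build_sqoop_import(jdbc_url, table, username, target_dir, mappers=1):
    """Assemble a `sqoop import` command as an argument list (hypothetical helper).

    -P makes Sqoop prompt for the password instead of putting it on the command line.
    """
    return [
        "sqoop", "import",
        "--connect", jdbc_url,
        "--table", table,
        "--username", username,
        "-P",
        "--target-dir", target_dir,
        "-m", str(mappers),  # one mapper -> a single part-m-00000 file
    ]

# Placeholder connection details, for illustration only
cmd = build_sqoop_import(
    jdbc_url="jdbc:mysql://<rds-endpoint>:3306/<database>",
    table="<source_table>",
    username="<user>",
    target_dir="ETL_Case_Study",
)
print(shlex.join(cmd))
```

The list form can be passed to `subprocess.run(cmd, check=True)` on a machine where Sqoop is installed. Keeping the arguments in a list avoids shell-quoting problems with the JDBC URL.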

As a preliminary step, we make sure that all the necessary environment variables are in place and assigned correctly.

In [1]:
# Importing packages and setting up the environment variables needed to run PySpark from the Jupyter notebook

import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

Once the environment variables are set, we create a SparkSession and its SparkContext to work with the Spark data-processing engine. We also lower the number of shuffle partitions from Spark's default of 200 to 11 so that shuffles on this modest dataset run smoothly.

In [2]:
# Importing SparkSession and creating the app and starting sparkContext

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Etl_case_study').getOrCreate()
sc = spark.sparkContext

# Setting 11 shuffle partitions (the default is 200) so that the garbage collector does not get stuck

spark.conf.set("spark.sql.shuffle.partitions", 11)
sc

Next, we import the types needed to define a custom schema for the Sqoop-imported file in HDFS.

In [3]:
# Importing StructType/StructField and the data types needed for a custom DF schema

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, FloatType

# Importing Pandas only to display DataFrames neatly (showing all columns)

import pandas as pd
pd.set_option('display.max_columns', None)

# Importing the window and SQL functions modules
# Note: the wildcard import shadows Python built-ins such as round, sum, max and min;
# later cells rely on Spark's round() and count()

from pyspark.sql.window import *
from pyspark.sql.functions import *

# Assigning the required data type to every field; all fields allow nulls

fileSchema = StructType([StructField('year',IntegerType(),True),
                    StructField('month',StringType(),True),
                    StructField('day',IntegerType(),True),
                    StructField('weekday',StringType(),True),
                    StructField('hour',IntegerType(),True),
                    StructField('atm_status',StringType(),True),
                    StructField('atm_id',StringType(),True),
                    StructField('atm_manufacturer',StringType(),True),
                    StructField('atm_location',StringType(),True),
                    StructField('atm_streetname',StringType(),True),
                    StructField('atm_street_number',IntegerType(),True),
                    StructField('atm_zipcode',IntegerType(),True),
                    StructField('atm_lat',DecimalType(10,3),True),
                    StructField('atm_lon',DecimalType(10,3),True),
                    StructField('currency',StringType(),True),
                    StructField('card_type',StringType(),True),
                    StructField('transaction_amount',IntegerType(),True),
                    StructField('service',StringType(),True),
                    StructField('message_code',StringType(),True),
                    StructField('message_text',StringType(),True),
                    StructField('weather_lat',DecimalType(10,3),True),
                    StructField('weather_lon',DecimalType(10,3),True),
                    StructField('weather_city_id',IntegerType(),True),
                    StructField('weather_city_name',StringType(),True),
                    StructField('temp',DecimalType(10,3),True),
                    StructField('pressure',IntegerType(),True),
                    StructField('humidity',IntegerType(),True),
                    StructField('wind_speed',IntegerType(),True),
                    StructField('wind_deg',IntegerType(),True),
                    StructField('rain_3h',DecimalType(10,3),True),
                    StructField('clouds_all',IntegerType(),True),
                    StructField('weather_id',IntegerType(),True),
                    StructField('weather_main',StringType(),True),
                    StructField('weather_description',StringType(),True)
                    ])

We load the file with the CSV reader and pass our custom schema, so Spark does not have to infer the column types.

In [4]:
# Loading the Sqoop output (a comma-delimited text file) with the custom schema

table = spark.read.csv("ETL_Case_Study/part-m-00000",schema=fileSchema)

Once the loading is done, we run some general checks on the DF to understand it better.

In [5]:
# Printing the Schema created

table.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: decimal(10,3) (nullable = true)
 |-- atm_lon: decimal(10,3) (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: decimal(10,3) (nullable = true)
 |-- weather_lon: decimal(10,3) (nullable = true)
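 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 |-- temp: decimal(10,3) (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- rain_3h: decimal(10,3) (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)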

In [6]:
# A Glimpse of Data Frame loaded

table.limit(5).toPandas().head(5)

| | year | month | day | weekday | hour | atm_status | atm_id | atm_manufacturer | atm_location | atm_streetname | atm_street_number | atm_zipcode | atm_lat | atm_lon | currency | card_type | transaction_amount | service | message_code | message_text | weather_lat | weather_lon | weather_city_id | weather_city_name | temp | pressure | humidity | wind_speed | wind_deg | rain_3h | clouds_all | weather_id | weather_main | weather_description |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2017 | January | 1 | Sunday | 0 | Active | 1 | NCR | Næstved | Farimagsvej | 8 | 4700 | 55.233 | 11.763 | DKK | MasterCard | 5643 | Withdrawal | | | 55.23 | 11.761 | 2616038 | Naestved | 281.15 | 1014 | 87 | 7 | 260 | 0.215 | 92 | 500 | Rain | light rain |
| 1 | 2017 | January | 1 | Sunday | 0 | Inactive | 2 | NCR | Vejgaard | Hadsundvej | 20 | 9000 | 57.043 | 9.95 | DKK | MasterCard | 1764 | Withdrawal | | | 57.048 | 9.935 | 2616235 | Nørresundby | 280.64 | 1020 | 93 | 9 | 250 | 0.59 | 92 | 500 | Rain | light rain |
| 2 | 2017 | January | 1 | Sunday | 0 | Inactive | 2 | NCR | Vejgaard | Hadsundvej | 20 | 9000 | 57.043 | 9.95 | DKK | VISA | 1891 | Withdrawal | | | 57.048 | 9.935 | 2616235 | Nørresundby | 280.64 | 1020 | 93 | 9 | 250 | 0.59 | 92 | 500 | Rain | light rain |
| 3 | 2017 | January | 1 | Sunday | 0 | Inactive | 3 | NCR | Ikast | Rådhusstrædet | 12 | 7430 | 56.139 | 9.154 | DKK | VISA | 4166 | Withdrawal | | | 56.139 | 9.158 | 2619426 | Ikast | 281.15 | 1011 | 100 | 6 | 240 | 0.0 | 75 | 300 | Drizzle | light intensity drizzle |
| 4 | 2017 | January | 1 | Sunday | 0 | Active | 4 | NCR | Svogerslev | Brønsager | 1 | 4000 | 55.634 | 12.018 | DKK | MasterCard | 5153 | Withdrawal | | | 55.642 | 12.08 | 2614481 | Roskilde | 280.61 | 1014 | 87 | 7 | 260 | 0.0 | 88 | 701 | Mist | mist |


In [7]:
# Total Count of the DF

print("There are {0} records in total in the DF".format(table.count()))

There are 2468572 records in total in the DF


In [8]:
# Counting the columns of the DF

print("There are {0} columns in total in the DF".format(len(table.columns)))

There are 34 columns in total in the DF


In [9]:
# Inspecting the column name created

print("The columns created are as follows")
print("=="*30)

for i in table.columns:
    print(i)

The columns created are as follows
year
month
day
weekday
hour
atm_status
atm_id
atm_manufacturer
atm_location
atm_streetname
atm_street_number
atm_zipcode
atm_lat
atm_lon
currency
card_type
transaction_amount
service
message_code
message_text
weather_lat
weather_lon
weather_city_id
weather_city_name
temp
pressure
humidity
wind_speed
wind_deg
rain_3h
clouds_all
weather_id
weather_main
weather_description


## Dimensions and Facts

Fact and dimension tables are the building blocks of a dimensional schema. Here we use a star schema with one fact table and four dimension tables.

### Fact
 A fact table stores the measurable business events (here, one row per ATM transaction, with measures such as the transaction amount) together with foreign keys that point to the dimension tables.  
 It lets users analyse those measures, from a detailed to a highly aggregated level, along the various business dimensions, which supports decision-making.
 
### Dimension
 A dimension table holds the descriptive attributes (location, ATM, date, card type) along which the measures in the fact table are analysed.
 
 The two kinds of table are used together by joining the fact table's foreign keys to the matching key columns of the dimension tables.  
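
The plain-Python sketch below (no Spark) makes the fact/dimension relationship concrete. Each fact row carries only foreign keys and a measure. Its descriptive attributes are recovered by looking the keys up in the dimension tables, and the measure is then aggregated along a dimension attribute. The dimension values are taken from outputs in this notebook. The first fact row mirrors a transaction shown in this notebook, and the other two fact rows are invented for illustration.

```python
# Dimension tables keyed by their surrogate ids (values taken from this notebook's outputs)
dim_card_type = {1: "CIRRUS", 9: "VISA"}
dim_atm = {2: {"atm_number": "2VejNCR", "atm_manufacturer": "NCR"},
           109: {"atm_number": "109AalDie", "atm_manufacturer": "Diebold Nixdorf"}}

# Fact rows: only foreign keys plus the measure (the last two rows are hypothetical)
fact_rows = [
    {"atm_id": 2, "card_type_id": 9, "transaction_amount": 1891},
    {"atm_id": 109, "card_type_id": 1, "transaction_amount": 500},
    {"atm_id": 109, "card_type_id": 9, "transaction_amount": 700},
]

def denormalise(fact, atm_dim, card_dim):
    """Inner-join each fact row to its dimensions by key, like a star-schema query."""
    out = []
    for row in fact:
        atm = atm_dim.get(row["atm_id"])
        card = card_dim.get(row["card_type_id"])
        if atm is None or card is None:  # inner join: drop rows with no matching key
            continue
        out.append({**row, **atm, "card_type": card})
    return out

# Aggregate the measure along a dimension attribute: total amount per manufacturer
totals = {}
for r in denormalise(fact_rows, dim_atm, dim_card_type):
    totals[r["atm_manufacturer"]] = totals.get(r["atm_manufacturer"], 0) + r["transaction_amount"]
print(totals)
```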
 
 
___DIMENSION 1 - LOCATION___

Here we select the location columns from the main table, drop duplicate rows, and assign a location_id by numbering the rows in ascending alphabetical order of atm_location.

In [10]:
# Creating a dimension table with Id and necessary columns applied

table1=table.select('atm_location','atm_streetname','atm_street_number','atm_zipcode','atm_lat','atm_lon').distinct()
location =table1.withColumn("location_id",row_number().over(Window.orderBy(col('atm_location'))))
dim_location = location.select(['location_id','atm_location','atm_streetname','atm_street_number','atm_zipcode','atm_lat','atm_lon']).\
            withColumnRenamed('atm_location','location').\
            withColumnRenamed('atm_streetname','streetname').\
            withColumnRenamed('atm_street_number','street_number').\
            withColumnRenamed('atm_zipcode','zipcode').\
            withColumnRenamed('atm_lat','lat').\
            withColumnRenamed('atm_lon','lon')
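
`row_number()` over `Window.orderBy(col('atm_location'))` numbers the distinct rows 1..n. If two distinct rows ever share the same `atm_location`, however, their relative order (and therefore their ids) is not guaranteed to be stable between runs. A window without `partitionBy` also moves all rows into a single partition. That is harmless for about 100 rows but not for large tables. The plain-Python sketch below mirrors the same idea with a full tie-breaker: it sorts the distinct rows by every key column and numbers them from 1. The helper name and the input rows are hypothetical. In PySpark the equivalent would be passing all the columns to `Window.orderBy(...)`.

```python
def assign_surrogate_ids(rows, key_columns, id_name="location_id"):
    """Deduplicate rows and number them from 1 in a deterministic order.

    Sorting by every column in key_columns acts as a tie-breaker, so rows that share the
    first column (e.g. the same location name) still get reproducible ids.
    Assumes no None values, since None cannot be compared with str or int.
    """
    distinct_rows = {tuple(r[c] for c in key_columns) for r in rows}
    ordered = sorted(distinct_rows)  # lexicographic: first column, then the next, ...
    return [dict(zip([id_name, *key_columns], (i, *values)))
            for i, values in enumerate(ordered, start=1)]

cols = ["atm_location", "atm_streetname", "atm_street_number"]
# Hypothetical raw rows: one exact duplicate and two distinct rows sharing a location name
raw = [
    {"atm_location": "Viborg", "atm_streetname": "Toldboden", "atm_street_number": 3},
    {"atm_location": "Aalborg", "atm_streetname": "Hobrovej", "atm_street_number": 452},
    {"atm_location": "Aalborg", "atm_streetname": "Europa Plads", "atm_street_number": 4},
    {"atm_location": "Viborg", "atm_streetname": "Toldboden", "atm_street_number": 3},
]
for row in assign_surrogate_ids(raw, cols):
    print(row)
```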
            

Once it is created, we look at the top and bottom rows to check the quality of the DF.

In [11]:
# A Glance of created dimension DF

dim_location.toPandas().head()

| | location_id | location | streetname | street_number | zipcode | lat | lon |
|---|---|---|---|---|---|---|---|
| 0 | 1 | Aabybro | Østergade | 6 | 9440 | 57.162 | 9.73 |
| 1 | 2 | Aalborg Hallen | Europa Plads | 4 | 9000 | 57.044 | 9.913 |
| 2 | 3 | Aalborg Storcenter Afd | Hobrovej | 452 | 9200 | 57.005 | 9.876 |
| 3 | 4 | Aalborg Storcenter indg. D | Hobrovej | 452 | 9200 | 57.005 | 9.876 |
| 4 | 5 | Aalborg Syd | Hobrovej | 440 | 9200 | 57.005 | 9.881 |


In [12]:
# A Glance of created dimension DF

dim_location.toPandas().tail()

| | location_id | location | streetname | street_number | zipcode | lat | lon |
|---|---|---|---|---|---|---|---|
| 104 | 105 | Viborg | Toldboden | 3 | 8800 | 56.448 | 9.401 |
| 105 | 106 | Vinderup | Søndergade | 5 | 7830 | 56.481 | 8.779 |
| 106 | 107 | Vodskov | Vodskovvej | 27 | 9310 | 57.104 | 10.027 |
| 107 | 108 | Østerå Duus | Østerå | 12 | 9000 | 57.049 | 9.922 |
| 108 | 109 | Østerå Møller | Østerå | 12 | 9000 | 57.049 | 9.922 |


Next, we check the schema and the total record count of the DF.

In [13]:
# Schema of Dimension Created

print("The Schema of the DF DIM_LOCATION is")
print("=="*30)
dim_location.printSchema()

The Schema of the DF DIM_LOCATION is
root
 |-- location_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: decimal(10,3) (nullable = true)
 |-- lon: decimal(10,3) (nullable = true)



In [14]:
# Count of the dimension created

print("The count of records in DIM_LOCATION table is {0}".format(dim_location.count()))

The count of records in DIM_LOCATION table is 109


Finally, we run a few filter operations to spot-check the values.

In [15]:
# A filter operation over DF

dim_location.filter(dim_location['location_id']==106).toPandas().head()

| | location_id | location | streetname | street_number | zipcode | lat | lon |
|---|---|---|---|---|---|---|---|
| 0 | 106 | Vinderup | Søndergade | 5 | 7830 | 56.481 | 8.779 |


In [16]:
# A filter operation over DF

dim_location.filter((dim_location['zipcode']>6000) & (dim_location['zipcode']<7000)).toPandas().head()

| | location_id | location | streetname | street_number | zipcode | lat | lon |
|---|---|---|---|---|---|---|---|
| 0 | 23 | Esbjerg | Strandbygade | 20 | 6700 | 55.468 | 8.44 |
| 1 | 99 | Sædding | Tarphagevej | 59 | 6710 | 55.498 | 8.408 |


___DIMENSION 2 - ATM___

Here we select the ATM columns from the main table and drop duplicates. We then build an atm_number from the ATM id and the first three letters of the location and of the manufacturer, and look up each ATM's location_id in dim_location.

In [17]:
# Creating a dimension table with new column and some of necessary columns applied

table2=table.select('atm_id','atm_manufacturer','atm_location','atm_lat','atm_lon','atm_streetname'\
                    ,'atm_street_number','atm_zipcode').distinct()

# atm_number = atm_id + first 3 letters of the location + first 3 letters of the manufacturer

table2_df=table2.withColumn("atm_number",concat(col("atm_id"),substring('atm_location',1,3),\
                                                substring("atm_manufacturer",1,3)))

# Looking up location_id in dim_location on all the location attributes, then casting atm_id to int

dim_atm = table2_df.join(dim_location, ((table2_df['atm_location']==dim_location['location']) & \
            (table2_df['atm_lat']==dim_location['lat']) & (table2_df['atm_lon']==dim_location['lon']) & \
                                        (table2_df['atm_streetname']==dim_location['streetname']) & \
                                        (table2_df['atm_street_number'] == dim_location['street_number'])&\
                                        (table2_df['atm_zipcode']==dim_location['zipcode'])),how='inner') \
        .select(col('atm_id'),col('atm_number'),col('atm_manufacturer'),col("location_id")) \
        .withColumn('atm_id',col('atm_id').cast(IntegerType()))\
        .orderBy('atm_id')
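
The `atm_number` expression can be checked outside Spark. The plain-Python sketch below mirrors it. `make_atm_number` is a hypothetical helper, not part of the notebook. Spark's `substring(str, pos, len)` is 1-based, so `substring(x, 1, 3)` corresponds to Python's `x[:3]`. The two calls reproduce values shown in this section: ATM 2 is in Vejgaard, and ATM 109 maps to location_id 5 (Aalborg Syd).

```python
def make_atm_number(atm_id, atm_location, atm_manufacturer):
    """Mirror concat(atm_id, substring(atm_location, 1, 3), substring(atm_manufacturer, 1, 3)).

    Spark's substring is 1-based, so substring(x, 1, 3) is x[:3] in Python.
    """
    return f"{atm_id}{atm_location[:3]}{atm_manufacturer[:3]}"

print(make_atm_number(2, "Vejgaard", "NCR"))                   # 2VejNCR
print(make_atm_number(109, "Aalborg Syd", "Diebold Nixdorf"))  # 109AalDie
```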

Once it is created, we look at the top and bottom rows to check the quality of the DF.

In [18]:
# A Glance of created dimension DF

dim_atm.toPandas().head()

| | atm_id | atm_number | atm_manufacturer | location_id |
|---|---|---|---|---|
| 0 | 1 | 1NÃƒNCR | NCR | 74 |
| 1 | 2 | 2VejNCR | NCR | 103 |
| 2 | 3 | 3IkaNCR | NCR | 48 |
| 3 | 4 | 4SvoNCR | NCR | 96 |
| 4 | 5 | 5NibNCR | NCR | 69 |


In [19]:
# A Glance of created dimension DF

dim_atm.toPandas().tail()

| | atm_id | atm_number | atm_manufacturer | location_id |
|---|---|---|---|---|
| 108 | 109 | 109AalDie | Diebold Nixdorf | 5 |
| 109 | 110 | 110HolDie | Diebold Nixdorf | 41 |
| 110 | 111 | 111AarDie | Diebold Nixdorf | 8 |
| 111 | 112 | 112NÃƒDie | Diebold Nixdorf | 75 |
| 112 | 113 | 113SlaDie | Diebold Nixdorf | 88 |


Next, we check the schema and the total record count of the DF.

In [20]:
# Schema of Dimension Created

print("The Schema of the DF DIM_ATM is")
print("=="*30)
dim_atm.printSchema()

The Schema of the DF DIM_ATM is
root
 |-- atm_id: integer (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- location_id: integer (nullable = true)



In [21]:
# Count of the dimension created

print("The count of records in DIM_ATM table is {0}".format(dim_atm.count()))

The count of records in DIM_ATM table is 113


Finally, we run a few filter operations to spot-check the values.

In [22]:
# A filter operation over DF

dim_atm.filter(dim_atm['atm_id']==111).toPandas().head()

| | atm_id | atm_number | atm_manufacturer | location_id |
|---|---|---|---|---|
| 0 | 111 | 111AarDie | Diebold Nixdorf | 8 |


In [23]:
# A filter operation over DF

dim_atm.filter(dim_atm['atm_manufacturer']=='NCR').toPandas().head()

| | atm_id | atm_number | atm_manufacturer | location_id |
|---|---|---|---|---|
| 0 | 1 | 1NÃƒNCR | NCR | 74 |
| 1 | 2 | 2VejNCR | NCR | 103 |
| 2 | 3 | 3IkaNCR | NCR | 48 |
| 3 | 4 | 4SvoNCR | NCR | 96 |
| 4 | 5 | 5NibNCR | NCR | 69 |


___DIMENSION 3 - DATE___

Here we select the distinct year, month, day, hour and weekday combinations, build a full_date_time timestamp from them, and assign a date_id in chronological order.

In [24]:
# Taking only the distinct date/hour combinations (chronological order is applied later by the window)

table3 = table.select('year','month','day','hour','weekday').distinct()

# Building a "year-month-day hour:00:00.000" string (month name converted to its number)
# and parsing it into a timestamp.
# Note: this relies on Spark 2.x's lenient parser (single-digit fields, trailing ".000");
# Spark 3's stricter datetime parser may reject these patterns.

month_num = date_format(to_date(col('month'), 'MMMMM'), 'MM').cast('int')
table31 = table3.withColumn("merge", concat(concat_ws("-", col("year"), month_num, col("day")), lit(" "),
                                            concat_ws(":", col("hour"), lit("00"), lit("00")), lit(".000"))) \
    .withColumn("full_date_time", to_timestamp("merge", format="yyyy-MM-dd HH:mm:ss")) \
    .drop("merge")

# Numbering the rows chronologically to create date_id

date = table31.withColumn("date_id", row_number().over(Window.orderBy(col('full_date_time'))))

dim_date = date.select(['date_id','full_date_time','year','month','day','hour','weekday'])
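
The string-building step turns each (year, month name, day, hour) tuple into a timestamp. The plain-Python sketch below does the same conversion with `datetime.strptime`, which reads the month name directly via `%B`. This assumes English month names, as in the default C locale. `to_full_date_time` is a hypothetical helper. The sketch also explains why `dim_date` has 8685 rows rather than one per hour of the year. 2017 has 365 × 24 = 8760 hours, and because the dimension is built from the distinct values in the data, the 75 hourly slots that never occur in the source do not get a row.

```python
from datetime import datetime, timedelta

def to_full_date_time(year, month_name, day, hour):
    """Build the same value as full_date_time from the separate columns.

    %B matches full month names such as 'January' (locale-dependent).
    """
    return datetime.strptime(f"{year} {month_name} {day} {hour}", "%Y %B %d %H")

print(to_full_date_time(2017, "January", 1, 0))     # 2017-01-01 00:00:00
print(to_full_date_time(2017, "December", 31, 23))  # 2017-12-31 23:00:00

# Every hour of 2017: the upper bound for the number of rows in dim_date
hours_in_2017 = int((datetime(2018, 1, 1) - datetime(2017, 1, 1)) / timedelta(hours=1))
print(hours_in_2017)          # 8760
print(hours_in_2017 - 8685)   # 75 hourly slots absent from the data
```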

Once it is created, we look at the top and bottom rows to check the quality of the DF.

In [25]:
# A snap of DF created

dim_date.toPandas().head()

| | date_id | full_date_time | year | month | day | hour | weekday |
|---|---|---|---|---|---|---|---|
| 0 | 1 | 2017-01-01 00:00:00 | 2017 | January | 1 | 0 | Sunday |
| 1 | 2 | 2017-01-01 01:00:00 | 2017 | January | 1 | 1 | Sunday |
| 2 | 3 | 2017-01-01 02:00:00 | 2017 | January | 1 | 2 | Sunday |
| 3 | 4 | 2017-01-01 03:00:00 | 2017 | January | 1 | 3 | Sunday |
| 4 | 5 | 2017-01-01 04:00:00 | 2017 | January | 1 | 4 | Sunday |


In [26]:
# A snap of DF created

dim_date.toPandas().tail()

| | date_id | full_date_time | year | month | day | hour | weekday |
|---|---|---|---|---|---|---|---|
| 8680 | 8681 | 2017-12-31 19:00:00 | 2017 | December | 31 | 19 | Sunday |
| 8681 | 8682 | 2017-12-31 20:00:00 | 2017 | December | 31 | 20 | Sunday |
| 8682 | 8683 | 2017-12-31 21:00:00 | 2017 | December | 31 | 21 | Sunday |
| 8683 | 8684 | 2017-12-31 22:00:00 | 2017 | December | 31 | 22 | Sunday |
| 8684 | 8685 | 2017-12-31 23:00:00 | 2017 | December | 31 | 23 | Sunday |


Next, we check the schema and the total record count of the DF.

In [27]:
# Schema of Dimension Created

print("The Schema of the DF DIM_DATE is")
print("=="*30)
dim_date.printSchema()

The Schema of the DF DIM_DATE is
root
 |-- date_id: integer (nullable = true)
 |-- full_date_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)



In [28]:
# Count of Dimension created

print("The count of records in DIM_DATE table is {0}".format(dim_date.count()))

The count of records in DIM_DATE table is 8685


Finally, we run a few filter operations to spot-check the values.

In [29]:
# A filter operation over DF

dim_date.filter((dim_date.weekday == 'Saturday')|(dim_date.weekday == 'Sunday')).toPandas().head()

| | date_id | full_date_time | year | month | day | hour | weekday |
|---|---|---|---|---|---|---|---|
| 0 | 1 | 2017-01-01 00:00:00 | 2017 | January | 1 | 0 | Sunday |
| 1 | 2 | 2017-01-01 01:00:00 | 2017 | January | 1 | 1 | Sunday |
| 2 | 3 | 2017-01-01 02:00:00 | 2017 | January | 1 | 2 | Sunday |
| 3 | 4 | 2017-01-01 03:00:00 | 2017 | January | 1 | 3 | Sunday |
| 4 | 5 | 2017-01-01 04:00:00 | 2017 | January | 1 | 4 | Sunday |


___DIMENSION 4 - CARD TYPE___

Here we take the distinct card types from the main table and number them in ascending alphabetical order to create card_type_id.

In [30]:
# Creating a dimension table with Id and necessary columns applied

table4 = table.select("card_type").distinct()
card_type =table4.withColumn("card_type_id",row_number().over(Window.orderBy(col("card_type"))))
dim_card_type = card_type.select(["card_type_id","card_type"])
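
`Window.orderBy(col("card_type"))` compares strings by their binary (UTF-8 byte) value, so uppercase letters sort before lowercase ones. That is why `VISA` (id 9) comes before `Visa Dankort` (id 10), and `MasterCard` before `Mastercard - on-us`. Python's default string ordering (by code point) gives the same order, because UTF-8 preserves code-point order. The sketch below therefore reproduces the card_type_id values listed in this section. The unordered input list, including its duplicate, is constructed for illustration.

```python
card_types = ["VISA", "Maestro", "Dankort - on-us", "CIRRUS", "VisaPlus", "Hævekort",
              "MasterCard", "Visa Dankort", "Dankort", "Mastercard - on-us",
              "Hævekort - on-us", "Visa Dankort - on-us", "VISA"]  # unordered, with a duplicate

# distinct() followed by row_number() over an ascending ordering, starting at 1
dim_card_type = {name: i for i, name in enumerate(sorted(set(card_types)), start=1)}
for name, card_type_id in dim_card_type.items():
    print(card_type_id, name)
```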

Once it is created, we look at all the rows to check the quality of the DF.

In [31]:
# A snap of DF created

dim_card_type.toPandas().head(12)

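| | card_type_id | card_type |
|---|---|---|
| 0 | 1 | CIRRUS |
| 1 | 2 | Dankort |
| 2 | 3 | Dankort - on-us |
| 3 | 4 | Hævekort |
| 4 | 5 | Hævekort - on-us |
| 5 | 6 | Maestro |
| 6 | 7 | MasterCard |
| 7 | 8 | Mastercard - on-us |
| 8 | 9 | VISA |
| 9 | 10 | Visa Dankort |
| 10 | 11 | Visa Dankort - on-us |
| 11 | 12 | VisaPlus |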


Next, we check the schema and the total record count of the DF.

In [32]:
# Schema of Dimension Created

print("The Schema of the DF DIM_CARD_TYPE is")
print("=="*30)
dim_card_type.printSchema()

The Schema of the DF DIM_CARD_TYPE is
root
 |-- card_type_id: integer (nullable = true)
 |-- card_type: string (nullable = true)



In [33]:
# Count of Dimension Created

print("The total count of records in this table is {0}".format(dim_card_type.count()))

The total count of records in this table is 12


Finally, we run a few filter operations to spot-check the values.

In [34]:
# A filter operation over DF

dim_card_type.filter(dim_card_type.card_type_id == 12).toPandas().head()

| | card_type_id | card_type |
|---|---|---|
| 0 | 12 | VisaPlus |


In [35]:
# A filter operation over DF

dim_card_type.filter(dim_card_type.card_type.contains("Visa")).toPandas().head()

| | card_type_id | card_type |
|---|---|---|
| 0 | 10 | Visa Dankort |
| 1 | 11 | Visa Dankort - on-us |
| 2 | 12 | VisaPlus |


___FACT TABLE___

Here we build the fact table by replacing the descriptive columns of each transaction with the surrogate keys from the dimension tables:

* __Step 0 :__ Taking the necessary columns from the source table (atm_id is kept as-is, since it is already the key of dim_atm)  
* __Step 1 :__ Joining with the dim_location dimension to get location_id  
* __Step 2 :__ Joining with the dim_date dimension to get date_id  
* __Step 3 :__ Joining with the dim_card_type dimension to get card_type_id  
* __Step 4 :__ Creating a trans_id column for each record so that records can be filtered easily, and renaming location_id to weather_loc_id
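
Each join in these steps replaces a group of descriptive columns with the matching surrogate key. Because the joins are inner joins, a transaction whose attributes do not match a dimension row exactly would be dropped silently. A null attribute or a differently spelled street name would both cause this. The fact table count reported later equals the source count (2468572), which shows that no rows were lost here. The plain-Python sketch below performs the same key substitution with a dictionary keyed by the natural key. It counts unmatched rows instead of dropping them silently. `substitute_keys` and the input rows are hypothetical.

```python
def substitute_keys(source_rows, dimension, natural_key, surrogate_name):
    """Replace natural-key columns with a surrogate id, inner-join style.

    dimension maps a natural-key tuple to its surrogate id. Rows whose key contains None
    are treated as unmatched, because Spark's `==` join condition never matches nulls.
    Returns the joined rows and the number of rows an inner join would drop.
    """
    joined, dropped = [], 0
    for row in source_rows:
        key = tuple(row[c] for c in natural_key)
        if None in key or key not in dimension:
            dropped += 1
            continue
        new_row = {c: v for c, v in row.items() if c not in natural_key}
        new_row[surrogate_name] = dimension[key]
        joined.append(new_row)
    return joined, dropped

# Hypothetical slice of the date dimension: (year, month, day, hour) -> date_id
dim_date = {(2017, "January", 1, 0): 1, (2017, "January", 1, 1): 2}
source = [
    {"atm_id": 1, "year": 2017, "month": "January", "day": 1, "hour": 0, "transaction_amount": 5643},
    {"atm_id": 2, "year": 2017, "month": "January", "day": 1, "hour": 1, "transaction_amount": 1764},
    {"atm_id": 3, "year": 2017, "month": None, "day": 1, "hour": 1, "transaction_amount": 100},
]
rows, lost = substitute_keys(source, dim_date, ("year", "month", "day", "hour"), "date_id")
print(rows)
print("rows an inner join would drop:", lost)  # 1
```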

In [36]:
# Taking necessary columns from Table

table_f = table.select('atm_id',"atm_status","currency",'service','transaction_amount','message_code',\
                       'message_text','rain_3h','clouds_all','weather_id','weather_main','weather_description','atm_location','atm_lat','atm_lon','atm_streetname','atm_street_number','atm_zipcode',\
                      "year","month","day","hour","weekday","card_type")

In [37]:
# Stage 1 - Joining with dim_location to get location_id

table_f1 = table_f.join(dim_location, ((table_f['atm_location']==dim_location['location']) & \
            (table_f['atm_lat']==dim_location['lat']) & (table_f['atm_lon']==dim_location['lon']) & \
                                        (table_f['atm_streetname']==dim_location['streetname']) & \
                                        (table_f['atm_street_number'] == dim_location['street_number'])&\
                                        (table_f['atm_zipcode']==dim_location['zipcode'])),how='inner')\
        .select ('location_id','atm_id',"atm_status","currency",'service','transaction_amount','message_code',\
                       'message_text','rain_3h','clouds_all','weather_id','weather_main','weather_description',"year",\
                 "month","day","hour","weekday","card_type")

In [38]:
# Stage 2 - Joining with dim_date to get date_id

table_f2 = table_f1.join(dim_date,((table_f1['year']==dim_date['year'])&(table_f1['month']==dim_date['month'])& \
                                   (table_f1['day']==dim_date['day'])&\
                                  (table_f1['hour']==dim_date['hour']) &(table_f1['weekday']==dim_date['weekday'])))\
    .select('atm_id','location_id','date_id',"atm_status","currency",'service','transaction_amount','message_code',\
                       'message_text','rain_3h','clouds_all','weather_id','weather_main','weather_description','card_type')

In [39]:
# Stage 3 - Joining with dim_card_type to get card_type_id

table_f3 = table_f2.join(dim_card_type,table_f2['card_type']==dim_card_type['card_type'],how='inner')\
    .select('atm_id','location_id','date_id','card_type_id',"atm_status","currency",'service','transaction_amount','message_code',\
                       'message_text','rain_3h','clouds_all','weather_id','weather_main','weather_description')

In [40]:
# Stage 4 - Creating trans_id, casting atm_id to int and renaming location_id to weather_loc_id

table_f4 = table_f3.withColumn("trans_id",concat(col("atm_id"),col("location_id"),col("date_id"),col("card_type_id"),col("transaction_amount")))\
        .select('trans_id','atm_id','location_id','date_id','card_type_id',"atm_status","currency",'service','transaction_amount','message_code',\
                       'message_text','rain_3h','clouds_all','weather_id','weather_main','weather_description')\
        .withColumn('atm_id',col('atm_id').cast(IntegerType()))\
        .withColumnRenamed('location_id','weather_loc_id')
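
`trans_id` concatenates the ids and the amount with no separator, so different combinations can produce the same string. For example, card type 1 with amount 1234 and card type 11 with amount 234 at the same ATM and hour give identical ids. The key is also not unique when the same card type withdraws the same amount at the same ATM within the same hour. The plain-Python sketch below demonstrates the first collision and one possible mitigation: joining the parts with a delimiter, which `concat_ws('-', ...)` would do in PySpark. Both transactions and the helper names are hypothetical, and this is a suggestion rather than what the notebook does. If a guaranteed-unique key is needed, `monotonically_increasing_id()` produces unique (though not consecutive) values.

```python
def concat_id(*parts):
    """Mirror concat(col(...), ...): the parts glued together with no separator."""
    return "".join(str(p) for p in parts)

def delimited_id(*parts, sep="-"):
    """Alternative that keeps the parts separable, like concat_ws('-', ...) in PySpark."""
    return sep.join(str(p) for p in parts)

# (atm_id, weather_loc_id, date_id, card_type_id, transaction_amount)
a = (28, 66, 2155, 1, 1234)   # hypothetical transaction with card type 1
b = (28, 66, 2155, 11, 234)   # hypothetical transaction with card type 11

print(concat_id(*a), concat_id(*b))        # identical: 2866215511234 2866215511234
print(delimited_id(*a), delimited_id(*b))  # 28-66-2155-1-1234 28-66-2155-11-234
```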

Once it is created, we look at a few rows to check the quality of the DF.

In [41]:
# A glance of Fact table

table_f4.limit(10).toPandas().head()

| | trans_id | atm_id | weather_loc_id | date_id | card_type_id | atm_status | currency | service | transaction_amount | message_code | message_text | rain_3h | clouds_all | weather_id | weather_main | weather_description |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2866215593692 | 28 | 66 | 2155 | 9 | Active | DKK | Withdrawal | 3692 | | | 0.0 | 24 | 801 | Clouds | few clouds |
| 1 | 5973215592942 | 59 | 73 | 2155 | 9 | Active | DKK | Withdrawal | 2942 | | | 0.0 | 8 | 800 | Clear | sky is clear |
| 2 | 4182215593941 | 41 | 82 | 2155 | 9 | Active | DKK | Withdrawal | 3941 | | | 0.0 | 90 | 701 | Mist | mist |
| 3 | 4182215598434 | 41 | 82 | 2155 | 9 | Active | DKK | Withdrawal | 8434 | | | 0.0 | 90 | 701 | Mist | mist |
| 4 | 863621559561 | 86 | 36 | 2155 | 9 | Inactive | DKK | Withdrawal | 561 | | | 0.0 | 75 | 701 | Mist | mist |


In [42]:
# A glance of Fact table

table_f4.limit(20).toPandas().tail()

| | trans_id | atm_id | weather_loc_id | date_id | card_type_id | atm_status | currency | service | transaction_amount | message_code | message_text | rain_3h | clouds_all | weather_id | weather_main | weather_description |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 15 | 69100218092031 | 69 | 100 | 2180 | 9 | Active | DKK | Withdrawal | 2031 | | | 0.0 | 20 | 801 | Clouds | few clouds |
| 16 | 69100218098043 | 69 | 100 | 2180 | 9 | Active | DKK | Withdrawal | 8043 | | | 0.0 | 20 | 801 | Clouds | few clouds |
| 17 | 1962218091279 | 19 | 62 | 2180 | 9 | Active | DKK | Withdrawal | 1279 | | | 0.0 | 40 | 802 | Clouds | scattered clouds |
| 18 | 1962218095624 | 19 | 62 | 2180 | 9 | Active | DKK | Withdrawal | 5624 | | | 0.0 | 40 | 802 | Clouds | scattered clouds |
| 19 | 3832218092435 | 38 | 32 | 2180 | 9 | Active | DKK | Withdrawal | 2435 | | | 0.0 | 20 | 801 | Clouds | few clouds |


Next, we check the schema and the total record count of the DF.

In [43]:
# Schema of Facts Created

print("The Schema of the DF FACT_ATM_TRANS is")
print("=="*30)
table_f4.printSchema()

The Schema of the DF FACT_ATM_TRANS is
root
 |-- trans_id: string (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: decimal(10,3) (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)



In [44]:
# Count of Fact table

print("The total count of records after joining with the dimensions is {0}".format(table_f4.count()))

The total count of records after joining with the dimensions is 2468572


Finally, we run a few filter operations to spot-check the values.

In [45]:
# A filter operation over DF

table_f4.filter((table_f4.date_id==2250)&(table_f4.atm_id==11)).toPandas().head()

| | trans_id | atm_id | weather_loc_id | date_id | card_type_id | atm_status | currency | service | transaction_amount | message_code | message_text | rain_3h | clouds_all | weather_id | weather_main | weather_description |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 118022503567 | 11 | 80 | 2250 | 3 | Active | DKK | Withdrawal | 567 | | | 0.0 | 40 | 802 | Clouds | scattered clouds |
| 1 | 11802250116243 | 11 | 80 | 2250 | 11 | Active | DKK | Withdrawal | 6243 | | | 0.0 | 40 | 802 | Clouds | scattered clouds |
| 2 | 118022502765 | 11 | 80 | 2250 | 2 | Active | DKK | Withdrawal | 765 | | | 0.0 | 40 | 802 | Clouds | scattered clouds |
| 3 | 1180225055141 | 11 | 80 | 2250 | 5 | Active | DKK | Withdrawal | 5141 | | | 0.0 | 40 | 802 | Clouds | scattered clouds |
| 4 | 1180225055479 | 11 | 80 | 2250 | 5 | Active | DKK | Withdrawal | 5479 | | | 0.0 | 40 | 802 | Clouds | scattered clouds |


In [46]:
# A filter operation over DF

table_f4.filter((table_f4.weather_loc_id==25)&(table_f4.card_type_id==9)).toPandas().head()

| | trans_id | atm_id | weather_loc_id | date_id | card_type_id | atm_status | currency | service | transaction_amount | message_code | message_text | rain_3h | clouds_all | weather_id | weather_main | weather_description |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 62521609919 | 6 | 25 | 2160 | 9 | Active | DKK | Withdrawal | 919 | | | 0.0 | 32.0 | 802.0 | Clouds | scattered clouds |
| 1 | 625220598493 | 6 | 25 | 2205 | 9 | Active | DKK | Withdrawal | 8493 | | | 0.0 | 0.0 | 800.0 | Clear | Sky is Clear |
| 2 | 625221394796 | 6 | 25 | 2213 | 9 | Active | DKK | Withdrawal | 4796 | | | 0.0 | 0.0 | 701.0 | Mist | mist |
| 3 | 625222992141 | 6 | 25 | 2229 | 9 | Active | DKK | Withdrawal | 2141 | | | 0.0 | 75.0 | 803.0 | Clouds | broken clouds |
| 4 | 625223593255 | 6 | 25 | 2235 | 9 | Active | DKK | Withdrawal | 3255 | | | 0.0 | 0.0 | 800.0 | Clear | Sky is Clear |


## Loading Facts and Dimensions into S3 Objects


### S3 Bucket:
  Amazon Simple Storage Service (S3) is AWS's object storage service in the public cloud.   
  Data is stored in S3 buckets, which play a role similar to top-level folders. Each bucket holds objects, and each object consists of the data itself plus its descriptive metadata.
  
Here we repartition the DF into a single partition so that all the data is written as one CSV object.

In [48]:
# Writing as Csv in S3 with the new folder created

dim_card_type.repartition(1).write.option("header","true")\
.csv("s3a://upgrad-sathyanarayananrv-pyspark/dim_card_type_2")

Here we repartition into 5 partitions, producing 5 CSV objects. We also set the timestamp format to yyyy-MM-dd HH:mm:ss so that Redshift can load the values into a TIMESTAMP column.

In [49]:
# Writing as Csv in S3 with the new folder created

dim_date.repartition(5).write.option("header","true").option("timestampFormat", "yyyy-MM-dd HH:mm:ss").\
 csv("s3a://upgrad-sathyanarayananrv-pyspark/dim_date_2")

Here we repartition the DF into a single partition so that all the data is written as one CSV object.

In [50]:
# Writing as Csv in S3 with the new folder created

dim_atm.repartition(1).write.option("header","true")\
.csv("s3a://upgrad-sathyanarayananrv-pyspark/dim_atm_2")

Here we repartition the DF into a single partition so that all the data is written as one CSV object.

In [51]:
# Writing as Csv in S3 with the new folder created

dim_location.repartition(1).write.option("header","true")\
.csv("s3a://upgrad-sathyanarayananrv-pyspark/dim_location_2")

Here we repartition the fact table into 10 partitions, producing 10 CSV objects. Splitting a large table into several files lets Redshift's COPY command load them in parallel.

In [52]:
# Writing as Csv in S3 with the new folder created

table_f4.repartition(10).write.option("header","true")\
.csv("s3a://upgrad-sathyanarayananrv-pyspark/fact_2")
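
Creating the Redshift tables and loading them from S3 (steps 4 and 5 of the pipeline) happen outside this notebook. As a sketch of the load step, the hypothetical helper below generates one Redshift `COPY` statement per S3 folder written above. It assumes the target table names shown in the mapping, IAM-role-based access (the ARN is a placeholder), and CSV files with a header row, which `IGNOREHEADER 1` skips. `COPY` takes `s3://` URLs rather than Spark's `s3a://` scheme, so the helper rewrites the prefix. The `TIMEFORMAT` matches the `timestampFormat` used when writing dim_date.

```python
def copy_statement(table, s3a_path, iam_role_arn):
    """Build a Redshift COPY statement for a folder of CSV files with headers written by Spark."""
    s3_path = s3a_path.replace("s3a://", "s3://", 1)  # COPY expects the s3:// scheme
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS CSV\n"
        "IGNOREHEADER 1\n"
        "TIMEFORMAT 'YYYY-MM-DD HH:MI:SS';"  # same layout as timestampFormat yyyy-MM-dd HH:mm:ss
    )

bucket = "s3a://upgrad-sathyanarayananrv-pyspark"
role = "arn:aws:iam::<account-id>:role/<redshift-role>"  # placeholder
# Hypothetical target table names mapped to the folders written above
targets = {"dim_location": "dim_location_2", "dim_atm": "dim_atm_2", "dim_date": "dim_date_2",
           "dim_card_type": "dim_card_type_2", "fact_atm_trans": "fact_2"}
for table_name, folder in targets.items():
    print(copy_statement(table_name, f"{bucket}/{folder}", role), end="\n\n")
```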

## Analytical Queries 

Besides loading the data into S3, I also reproduce the analysis queries in the notebook itself with PySpark.

1. Top 10 ATMs where most transactions are in the ’inactive’ state

In [53]:
# Joining and filtering records based on the expected results

aq1 = table_f4.join(dim_atm, table_f4['atm_id']==dim_atm['atm_id']).\
    select(table_f4['atm_id'],'atm_manufacturer','transaction_amount','weather_loc_id','atm_status')

aq12 = aq1.join(dim_location, aq1['weather_loc_id']==dim_location['location_id']).\
    select('atm_id','atm_manufacturer','location','transaction_amount','atm_status').\
withColumn('inactive',when(aq1['atm_status'] =='Inactive',1)).\
groupBy('atm_id','atm_manufacturer','location')

print("The output of the query performed is ")
print("=="*30)

aq12.agg(count('transaction_amount').alias('total_transaction_count'),count('inactive').alias('inactive_count'),\
                              round(count('inactive')/count('transaction_amount')*100.0,4).alias('inactive_count_percent')).\
    orderBy(['inactive_count_percent','inactive_count','total_transaction_count'],ascending=False).toPandas().head(10)

The output of the query performed is 


|   | atm_id | atm_manufacturer | location | total_transaction_count | inactive_count | inactive_count_percent |
|---|---|---|---|---|---|---|
| 0 | 16 | NCR | Skive | 44043 | 44043 | 100.0 |
| 1 | 12 | NCR | Østerå Duus | 33982 | 33982 | 100.0 |
| 2 | 2 | NCR | Vejgaard | 33725 | 33725 | 100.0 |
| 3 | 88 | NCR | Storcenter indg. A | 32183 | 32183 | 100.0 |
| 4 | 30 | NCR | Nykøbing Mors | 30883 | 30883 | 100.0 |
| 5 | 52 | NCR | Farsø | 27361 | 27361 | 100.0 |
| 6 | 50 | NCR | Aarhus | 23416 | 23416 | 100.0 |
| 7 | 29 | NCR | Skelagervej 15 | 20773 | 20773 | 100.0 |
| 8 | 81 | NCR | Spar Købmand Tornhøj | 20148 | 20148 | 100.0 |
| 9 | 102 | NCR | Aalborg Storcenter Afd | 18297 | 18297 | 100.0 |
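The inactive counts above rely on SQL null semantics: `when(atm_status == 'Inactive', 1)` without `otherwise()` yields null for every other row, and `count()` only counts non-null values. The plain-Python sketch below reproduces that arithmetic with `None` standing in for null. `inactive_summary` is a hypothetical helper, and it assumes `transaction_amount` is never null, so that `count('transaction_amount')` equals the number of rows.

```python
def inactive_summary(statuses):
    """Mimic count(transaction_amount), count(inactive) and the rounded
    inactive percentage from the query, using None in place of SQL null."""
    # when(...) without otherwise(...) produces null for non-matching rows
    flags = [1 if status == "Inactive" else None for status in statuses]
    # count('transaction_amount'), assuming no null amounts
    total = len(statuses)
    # count('inactive') ignores nulls, so only the flagged rows are counted
    inactive = sum(1 for flag in flags if flag is not None)
    percent = round(inactive / total * 100.0, 4) if total else None
    return total, inactive, percent


print(inactive_summary(["Active", "Inactive", "Inactive", "Active"]))  # (4, 2, 50.0)
```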


2. Number of ATM failures corresponding to the different weather conditions recorded at the time of the transactions

In [54]:
# Flag inactive transactions, drop rows with an empty weather_main and group by weather condition

aq2 =table_f4.select('weather_main','transaction_amount','atm_status').\
    withColumn('inactive',when(table_f4['atm_status']=='Inactive',1)).\
    filter(table_f4['weather_main']!='').\
    groupBy('weather_main')

print("The output of the query performed is ")
print("=="*30)

aq2.agg(count('inactive').alias('inactive_count'),count('transaction_amount').alias('total_transaction_count'),\
             round(count('inactive')/count('transaction_amount')*100.0,4).alias('inactive_count_percent')).\
    orderBy('inactive_count_percent',ascending=False).toPandas().head(10)

The output of the query performed is 


|   | weather_main | inactive_count | total_transaction_count | inactive_count_percent |
|---|---|---|---|---|
| 0 | Snow | 4813 | 23405 | 20.564 |
| 1 | Fog | 3729 | 18174 | 20.5183 |
| 2 | Clouds | 194027 | 1181901 | 16.4165 |
| 3 | Rain | 86017 | 545135 | 15.779 |
| 4 | Clear | 85531 | 543949 | 15.7241 |
| 5 | Mist | 12864 | 82801 | 15.536 |
| 6 | Thunderstorm | 361 | 2549 | 14.1624 |
| 7 | Drizzle | 8670 | 62530 | 13.8653 |
| 8 | TORNADO | 1 | 38 | 2.6316 |
| 9 | Haze | 0 | 3 | 0.0 |
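The weather query combines the inactive flag with a filter and a group-by. Below is a plain-Python sketch of the same steps, assuming each transaction has been reduced to a hypothetical `(weather_main, atm_status)` pair; `inactive_percent_by_weather` and the sample rows are illustrative only. Spark's `weather_main != ''` filter also drops null values, because comparing null with anything yields null, so the sketch skips `None` as well.

```python
from collections import defaultdict


def inactive_percent_by_weather(transactions):
    """transactions: iterable of (weather_main, atm_status) pairs.
    Returns [(weather, inactive, total, percent)] sorted by percent, descending."""
    totals = defaultdict(int)
    inactive = defaultdict(int)
    for weather, status in transactions:
        # mirrors filter(weather_main != ''), which also removes nulls in Spark
        if weather is None or weather == "":
            continue
        totals[weather] += 1
        if status == "Inactive":
            inactive[weather] += 1
    rows = [(w, inactive[w], totals[w], round(inactive[w] / totals[w] * 100.0, 4))
            for w in totals]
    return sorted(rows, key=lambda r: r[3], reverse=True)


sample = [("Snow", "Inactive"), ("Snow", "Active"), ("Rain", "Active"),
          ("", "Inactive"), ("Rain", "Active"), ("Rain", "Inactive")]
print(inactive_percent_by_weather(sample))
```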


3. Top 10 ATMs with the most transactions throughout the year

In [55]:
# Join with dim_atm and dim_location, then group by ATM to count transactions

aq3 = table_f4.join(dim_atm,table_f4['atm_id']==dim_atm['atm_id']).\
    select(table_f4['atm_id'],'atm_manufacturer','weather_loc_id','transaction_amount')

aq31=aq3.join(dim_location,aq3['weather_loc_id']==dim_location['location_id']).\
    select('atm_id','atm_manufacturer','location','transaction_amount').\
    groupBy('atm_id','atm_manufacturer','location')

print("The output of the query performed is ")
print("=="*30)

aq31.agg(count('transaction_amount').alias('total_transaction_count')).orderBy('total_transaction_count',ascending=False).toPandas().head(10)

The output of the query performed is 


|   | atm_id | atm_manufacturer | location | total_transaction_count |
|---|---|---|---|---|
| 0 | 39 | NCR | Svenstrup | 55380 |
| 1 | 20 | NCR | Bispensgade | 54211 |
| 2 | 10 | NCR | Nørresundby | 53794 |
| 3 | 24 | NCR | Hobro | 53378 |
| 4 | 45 | NCR | Abildgaard | 53198 |
| 5 | 16 | NCR | Skive | 44043 |
| 6 | 40 | Diebold Nixdorf | Frederikshavn | 43767 |
| 7 | 1 | NCR | Næstved | 42787 |
| 8 | 41 | Diebold Nixdorf | Skagen | 42732 |
| 9 | 48 | Diebold Nixdorf | Brønderslev | 42493 |
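Ranking ATMs by transaction count is a count followed by a sort. The query orders by `total_transaction_count` alone, so ATMs with equal counts may come back in any order. The sketch below (hypothetical helper `top_atms_by_transactions`, made-up sample IDs) adds `atm_id` as a tie-breaker to make the top 10 deterministic; in PySpark the same effect would come from ordering by `['total_transaction_count', 'atm_id']` with `ascending=[False, True]`.

```python
from collections import Counter


def top_atms_by_transactions(atm_ids, n=10):
    """Count transactions per ATM and return the n busiest as (atm_id, count),
    breaking ties by atm_id so the result is deterministic."""
    counts = Counter(atm_ids)
    # sort by count descending, then atm_id ascending
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return ranked[:n]


print(top_atms_by_transactions([39, 20, 39, 10, 20, 39, 10], n=2))  # [(39, 3), (10, 2)]
```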


4. Number of ATM transactions going inactive in each month

In [56]:
# Join with dim_date, flag inactive transactions and group by year and month

aq4=table_f4.join(dim_date,table_f4['date_id']==dim_date['date_id']).\
    select('year','month','transaction_amount','atm_status').\
    withColumn('inactive',when(table_f4['atm_status']=='Inactive',1)).\
    groupBy('year','month')

print("The output of the query performed is ")
print("=="*30)

aq4.agg(count('inactive').alias('inactive_count'),count('transaction_amount').alias('total_transaction_count'),\
             round(count('inactive')/count('transaction_amount')*100.0,4).alias('inactive_count_percent')).\
    orderBy(['month']).toPandas().head(12)

The output of the query performed is 


|   | year | month | inactive_count | total_transaction_count | inactive_count_percent |
|---|---|---|---|---|---|
| 0 | 2017 | April | 41830 | 218865 | 19.1122 |
| 1 | 2017 | August | 36713 | 217218 | 16.9015 |
| 2 | 2017 | December | 20476 | 197048 | 10.3914 |
| 3 | 2017 | February | 36656 | 182659 | 20.068 |
| 4 | 2017 | January | 35953 | 180195 | 19.9523 |
| 5 | 2017 | July | 38139 | 227682 | 16.751 |
| 6 | 2017 | June | 36789 | 225166 | 16.3386 |
| 7 | 2017 | March | 41046 | 209586 | 19.5843 |
| 8 | 2017 | May | 37679 | 222418 | 16.9406 |
| 9 | 2017 | November | 21684 | 193967 | 11.1792 |
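The month query orders by the `month` column, which holds month names, so the rows above come out alphabetically (April, August, December, ...) rather than in calendar order. One way to get calendar order is to sort on each name's position in the year. The plain-Python sketch below does that with a hard-coded English month list (an assumption about how `dim_date` spells months, based on the output above) and a few rows copied from the table; `MONTH_NUMBER` and `sort_chronologically` are hypothetical names. If `dim_date` carries a numeric month column, ordering by it in Spark would achieve the same; no such column is shown in this section.

```python
# English month names in calendar order (assumed to match dim_date's values)
MONTHS = ["January", "February", "March", "April", "May", "June", "July",
          "August", "September", "October", "November", "December"]
MONTH_NUMBER = {name: i for i, name in enumerate(MONTHS, start=1)}


def sort_chronologically(rows):
    """rows: (year, month_name, ...) tuples; sort by year, then calendar month."""
    return sorted(rows, key=lambda r: (r[0], MONTH_NUMBER[r[1]]))


rows = [(2017, "April", 41830), (2017, "August", 36713),
        (2017, "February", 36656), (2017, "January", 35953)]
print([r[1] for r in sort_chronologically(rows)])  # ['January', 'February', 'April', 'August']
```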


5. Top 10 ATMs with the highest total amount withdrawn throughout the year

In [57]:
# Join with dim_atm and dim_location, then group by ATM to sum the amounts withdrawn

aq5=table_f4.join(dim_atm,table_f4['atm_id']==dim_atm['atm_id']).\
    select(table_f4['atm_id'],'atm_manufacturer','location_id','transaction_amount')

aq51 = aq5.join(dim_location,aq5['location_id']==dim_location['location_id']).\
    select('atm_id','atm_manufacturer','location','transaction_amount').\
    groupBy('atm_id','atm_manufacturer','location')

print("The output of the query performed is ")
print("=="*30)

aq51.agg(sum('transaction_amount').alias('total_transaction_amount')).\
    orderBy('total_transaction_amount',ascending=False).toPandas().head(10)

The output of the query performed is 


|   | atm_id | atm_manufacturer | location | total_transaction_amount |
|---|---|---|---|---|
| 0 | 39 | NCR | Svenstrup | 277097637 |
| 1 | 20 | NCR | Bispensgade | 271008803 |
| 2 | 24 | NCR | Hobro | 268289882 |
| 3 | 10 | NCR | Nørresundby | 267379103 |
| 4 | 45 | NCR | Abildgaard | 265639616 |
| 5 | 16 | NCR | Skive | 220677013 |
| 6 | 40 | Diebold Nixdorf | Frederikshavn | 219812287 |
| 7 | 41 | Diebold Nixdorf | Skagen | 214127315 |
| 8 | 1 | NCR | Næstved | 213721117 |
| 9 | 48 | Diebold Nixdorf | Brønderslev | 212883099 |
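The amount ranking is a sum per group followed by a top-N selection. In the sketch below, `top_atms_by_amount` and the sample pairs are hypothetical; `heapq.nlargest` picks the largest totals without sorting every ATM. The query above, by contrast, sorts and collects every ATM before pandas keeps the first 10; adding `.limit(10)` before `toPandas()` would let Spark avoid bringing the remaining rows to the driver.

```python
import heapq
from collections import defaultdict


def top_atms_by_amount(transactions, n=10):
    """transactions: iterable of (atm_id, transaction_amount) pairs.
    Returns the n ATMs with the largest total amount as (atm_id, total)."""
    totals = defaultdict(int)
    for atm_id, amount in transactions:
        totals[atm_id] += amount  # like sum('transaction_amount') per group
    # nlargest keeps only the top n instead of fully sorting all totals
    return heapq.nlargest(n, totals.items(), key=lambda kv: kv[1])


sample = [(39, 5000), (20, 3000), (39, 2500), (24, 4000), (20, 2000)]
print(top_atms_by_amount(sample, n=2))  # [(39, 7500), (20, 5000)]
```

In the query itself, `sum` and `round` must resolve to the `pyspark.sql.functions` versions; importing that module under an alias such as `F` and writing `F.sum` and `F.round` avoids shadowing Python's built-ins.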


6. Number of failed ATM transactions across various card types

In [58]:
# Inner-join with dim_card_type, flag inactive transactions and aggregate per card type

aq6=table_f4.join(dim_card_type, table_f4['card_type_id']==dim_card_type['card_type_id'],how='inner').\
        select('card_type','transaction_amount','atm_status').\
        withColumn('inactive',when(table_f4['atm_status'] =='Inactive',1))

aq61=aq6.groupBy('card_type').agg(count('transaction_amount').alias('total_transaction_count'),count('inactive').alias('inactive_count'),\
                              round(count('inactive')/count('transaction_amount')*100.0,4).alias('inactive_count_percent'))

print("The output of the query performed is ")
print("=="*30)

aq61.orderBy('inactive_count_percent',ascending=False).toPandas().head(12)

The output of the query performed is 


|   | card_type | total_transaction_count | inactive_count | inactive_count_percent |
|---|---|---|---|---|
| 0 | Mastercard - on-us | 458226 | 86000 | 18.768 |
| 1 | VISA | 170828 | 30713 | 17.9789 |
| 2 | Dankort - on-us | 143813 | 24680 | 17.1612 |
| 3 | CIRRUS | 17362 | 2953 | 17.0084 |
| 4 | Hævekort - on-us | 62487 | 10331 | 16.533 |
| 5 | Dankort | 28581 | 4557 | 15.9442 |
| 6 | MasterCard | 400507 | 63482 | 15.8504 |
| 7 | Visa Dankort - on-us | 748805 | 112972 | 15.087 |
| 8 | Hævekort | 8459 | 1208 | 14.2806 |
| 9 | Visa Dankort | 427840 | 60547 | 14.1518 |
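Query 6 joins with `how='inner'`, so fact rows whose `card_type_id` has no match in `dim_card_type` are silently dropped from the counts. The plain-Python hash join below (hypothetical `inner_join` helper and toy rows) shows that behaviour: it builds a lookup on the dimension key, which it assumes is unique, and keeps only the fact rows that find a match. Spark may execute such a join as a broadcast hash join when the dimension table is small enough.

```python
def inner_join(facts, dim, key):
    """Hash inner join: build a lookup on the dimension table, then probe it
    with each fact row. Fact rows whose key has no match are dropped.
    Assumes `key` is unique within `dim`."""
    lookup = {row[key]: row for row in dim}
    joined = []
    for fact in facts:
        match = lookup.get(fact[key])
        if match is not None:
            joined.append({**fact, **match})  # merge fact and dimension columns
    return joined


dim_card = [{"card_type_id": 1, "card_type": "VISA"},
            {"card_type_id": 2, "card_type": "Dankort"}]
facts = [{"card_type_id": 1, "atm_status": "Active"},
         {"card_type_id": 3, "atm_status": "Inactive"},  # no matching card type
         {"card_type_id": 2, "atm_status": "Inactive"}]
print([r["card_type"] for r in inner_join(facts, dim_card, "card_type_id")])  # ['VISA', 'Dankort']
```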


7. Top 10 records of the number of transactions per ATM on weekdays and on weekends throughout the year, ordered by ATM_number, ATM_manufacturer, location, weekend_flag and then total_transaction_count

In [59]:
# Join with dim_atm, dim_location and dim_date, flag weekend days and count transactions per ATM and weekend_flag

aq7=table_f4.join(dim_atm,table_f4['atm_id']==dim_atm['atm_id']).\
    select(table_f4['atm_id'],'atm_manufacturer','location_id','transaction_amount','date_id')

aq71=aq7.join(dim_location,aq7['location_id']==dim_location['location_id']).\
    select('atm_id','atm_manufacturer','location','transaction_amount','date_id')

aq72= aq71.join(dim_date,aq71['date_id']==dim_date['date_id']).\
    select('atm_id','atm_manufacturer','location','transaction_amount','weekday')

aq73=aq72.withColumn('weekend_flag',when(((aq72['weekday']=='Saturday')|(aq72['weekday']=='Sunday')),1).otherwise(0)).\
    groupBy('atm_id','atm_manufacturer','location','weekend_flag')

aq73.agg(count('transaction_amount').alias('total_transaction_count')).\
    where((col('atm_id').like('10%'))|(col('atm_id')==1)).\
    orderBy(['atm_id','atm_manufacturer','location',"weekend_flag"]).toPandas().head(10)
    

|   | atm_id | atm_manufacturer | location | weekend_flag | total_transaction_count |
|---|---|---|---|---|---|
| 0 | 1 | NCR | Næstved | 0 | 32711 |
| 1 | 1 | NCR | Næstved | 1 | 10076 |
| 2 | 10 | NCR | Nørresundby | 0 | 41667 |
| 3 | 10 | NCR | Nørresundby | 1 | 12127 |
| 4 | 100 | NCR | Intern Skive | 0 | 17812 |
| 5 | 100 | NCR | Intern Skive | 1 | 1 |
| 6 | 101 | NCR | Bryggen Vejle | 0 | 11693 |
| 7 | 101 | NCR | Bryggen Vejle | 1 | 3247 |
| 8 | 102 | NCR | Aalborg Storcenter Afd | 0 | 14556 |
| 9 | 102 | NCR | Aalborg Storcenter Afd | 1 | 3741 |
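The weekend flag treats Saturday and Sunday as the weekend. Below is a plain-Python sketch of the same rule, applied both to a weekday name (as the query does) and to a calendar date via `date.weekday()`, where Monday is 0 and Sunday is 6; both helper names are hypothetical. Separately, the output order 1, 10, 100, 101, 102 suggests that `atm_id` is stored as a string and sorted lexicographically.

```python
from datetime import date


def weekend_flag(weekday_name):
    """Same rule as the query: 1 for Saturday or Sunday, otherwise 0."""
    return 1 if weekday_name in ("Saturday", "Sunday") else 0


def weekend_flag_from_date(d):
    """Equivalent rule on a date: weekday() is 5 for Saturday and 6 for Sunday."""
    return 1 if d.weekday() >= 5 else 0


print(weekend_flag("Sunday"), weekend_flag("Friday"))  # 1 0
print(weekend_flag_from_date(date(2017, 1, 1)))        # 1 (1 January 2017 was a Sunday)
```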


8. Most active day of the week for each ATM at location "Vejgaard"

In [60]:
# Join with dim_atm, dim_location and dim_date, keep the Vejgaard ATMs and count transactions per ATM and weekday

aq8=table_f4.join(dim_atm,table_f4['atm_id']==dim_atm['atm_id']).\
    select(table_f4['atm_id'],'atm_manufacturer','location_id','transaction_amount','date_id')

aq81=aq8.join(dim_location,aq8['location_id']==dim_location['location_id']).\
    select('atm_id','atm_manufacturer','location','transaction_amount','date_id')

aq82= aq81.join(dim_date,aq81['date_id']==dim_date['date_id']).\
    select('atm_id','atm_manufacturer','location','transaction_amount','weekday').\
    filter(aq81.location=='Vejgaard').\
    groupBy('atm_id','atm_manufacturer','location','weekday')

aq82.agg(count('transaction_amount').alias('total_transaction_count')).\
    orderBy(['weekday','total_transaction_count']).distinct().toPandas().head(2)

|   | atm_id | atm_manufacturer | location | weekday | total_transaction_count |
|---|---|---|---|---|---|
| 0 | 103 | Diebold Nixdorf | Vejgaard | Friday | 4757 |
| 1 | 2 | NCR | Vejgaard | Friday | 6290 |
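As written, query 8 orders by `weekday` (alphabetically) and then by count in ascending order before taking `head(2)`. Friday sorts first alphabetically, so the two rows returned are simply the Friday rows of the two Vejgaard ATMs, not necessarily each ATM's busiest day; the `distinct()` applied after `orderBy` also does not guarantee that the order is kept. Selecting each ATM's busiest day is an arg-max per group, which in PySpark is commonly done with `row_number()` over `Window.partitionBy('atm_id').orderBy(desc('total_transaction_count'))`, keeping row 1. The plain-Python sketch below shows the arg-max itself; the helper name and the counts are hypothetical, not query output.

```python
def busiest_day_per_atm(counts):
    """counts: iterable of (atm_id, weekday, total_transaction_count) rows.
    Returns {atm_id: (weekday, count)} holding each ATM's highest-count day.
    On ties, the first row seen for that ATM is kept."""
    best = {}
    for atm_id, weekday, count in counts:
        # keep the row with the largest count seen so far for this ATM
        if atm_id not in best or count > best[atm_id][1]:
            best[atm_id] = (weekday, count)
    return best


# hypothetical counts, not taken from the query output
rows = [(2, "Friday", 60), (2, "Monday", 75), (103, "Friday", 48),
        (103, "Thursday", 41)]
print(busiest_day_per_atm(rows))  # {2: ('Monday', 75), 103: ('Friday', 48)}
```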


## NEXT STEP / CONCLUSION:

We have completed the transformation step here using PySpark; next, we will load the data into AWS Redshift and run the analysis in the Redshift Query Editor UI.