## ATM Transactions Assignment

## Setting up environment variables and parameters

In [1]:
# Below parameters needs to be updated with respective environments values

import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre" 
os.environ["SPARK_HOME"]="/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/" 
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

## Setting up Spark Session and Context

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ATM_Transactions_Project').master("local").getOrCreate()
spark

## Pulling the data from HDFS , setting up the schema as per the required output

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

atmSchema = StructType([StructField('year', IntegerType(),False),
                        StructField('month', StringType(),False),
                        StructField('day', IntegerType(),False),
                        StructField('weekday', StringType(),False),
                        StructField('hour', IntegerType(),False),
                        StructField('atm_status', StringType(),False),
                        StructField('atm_id', IntegerType(),False),
                        StructField('atm_manufacturer', StringType(),False),
                        StructField('atm_location', StringType(),False),
                        StructField('atm_streetname', StringType(),False),
                        StructField('atm_street_number', IntegerType(),False),
                        StructField('atm_zipcode', IntegerType(),False),
                        StructField('atm_lat', DoubleType(),False),
                        StructField('atm_lon', DoubleType(),False),
                        StructField('currency', StringType(),False),
                        StructField('card_type', StringType(),False),
                        StructField('transaction_amount', IntegerType(),False), 
                        StructField('service', StringType(),False),
                        StructField('message_code', StringType(),True),
                        StructField('message_text', StringType(),True),
                        StructField('weather_lat', DoubleType(),False),
                        StructField('weather_lon', DoubleType(),False),
                        StructField('weather_city_id', IntegerType(),False),
                        StructField('weather_city_name', StringType(),False), 
                        StructField('temp', DoubleType(),False),
                        StructField('pressure', IntegerType(),False), 
                        StructField('humidity', IntegerType(),False), 
                        StructField('wind_speed', IntegerType(),False), 
                        StructField('wind_deg', IntegerType(),False), 
                        StructField('rain_3h', DoubleType(),True), 
                        StructField('clouds_all', IntegerType(),False), 
                        StructField('weather_id', IntegerType(),False), 
                        StructField('weather_main', StringType(),False), 
                        StructField('weather_description', StringType(),False), 
                        ])

In [4]:

atm_staging = spark.read.csv("/user/root/etl_assignment",header=False,schema=atmSchema)


In [5]:
atm_staging.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: s

## Validate the count of the entries pulled from RDS to Spark(HDFS) 

In [6]:
atm_staging.count()

2468572

## Review the data

In [7]:
atm_staging.select("year","atm_id","atm_manufacturer","atm_lat","message_code","clouds_all").show(3)

+----+------+----------------+-------+------------+----------+
|year|atm_id|atm_manufacturer|atm_lat|message_code|clouds_all|
+----+------+----------------+-------+------------+----------+
|2017|     1|             NCR| 55.233|        null|        92|
|2017|     2|             NCR| 57.043|        null|        92|
|2017|     2|             NCR| 57.043|        null|        92|
+----+------+----------------+-------+------------+----------+
only showing top 3 rows



## Creating staging table

In [8]:
atm_staging.registerTempTable("atm_staging");
atm_staging.cache()

DataFrame[year: int, month: string, day: int, weekday: string, hour: int, atm_status: string, atm_id: int, atm_manufacturer: string, atm_location: string, atm_streetname: string, atm_street_number: int, atm_zipcode: int, atm_lat: double, atm_lon: double, currency: string, card_type: string, transaction_amount: int, service: string, message_code: string, message_text: string, weather_lat: double, weather_lon: double, weather_city_id: int, weather_city_name: string, temp: double, pressure: int, humidity: int, wind_speed: int, wind_deg: int, rain_3h: double, clouds_all: int, weather_id: int, weather_main: string, weather_description: string]

In [9]:
spark.sql("select count(1) from atm_staging").show();

+--------+
|count(1)|
+--------+
| 2468572|
+--------+



## Dimension Table Creation

### Location Dimension Table Creation - DIM_LOCATION

In [10]:
# Importing pacakges to create unique id 
from pyspark.sql.functions import *
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window

# Creating staging table for Location dimension table and removing the duplicates
atm_location_staging=spark.sql("select distinct atm_location location, \
          atm_streetname streetname, \
          atm_street_number street_number,\
          atm_zipcode zipcode, \
          atm_lat lat, \
          atm_lon lon \
          from atm_staging \
          order by location");

# Adding the primary key column by generating unique id 
atm_location_staging = atm_location_staging.withColumn(
    "location_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)

# Creating the dimension table by rearragning the columns as per expectation
DIM_LOCATION=atm_location_staging.select("location_id","location","streetname","street_number","zipcode","lat","lon")

#Registering the df to use in join with sparlsql
DIM_LOCATION.registerTempTable("DIM_LOCATION");


### DIM_LOCATION Validate the count -109

In [11]:
spark.sql("select count(*) from DIM_LOCATION").show()

+--------+
|count(1)|
+--------+
|     109|
+--------+



In [12]:
DIM_LOCATION.count()

109

### Validate the DIM_LOCATION Data

In [13]:
spark.sql("select * from DIM_LOCATION").show(109,truncate=False)

+-----------+---------------------------+------------------------+-------------+-------+------+------+
|location_id|location                   |streetname              |street_number|zipcode|lat   |lon   |
+-----------+---------------------------+------------------------+-------------+-------+------+------+
|1          |Aabybro                    |ÃƒËœstergade            |6            |9440   |57.162|9.73  |
|2          |Aalborg Hallen             |Europa Plads            |4            |9000   |57.044|9.913 |
|3          |Aalborg Storcenter  Afd    |Hobrovej                |452          |9200   |57.005|9.876 |
|4          |Aalborg Storcenter indg. D |Hobrovej                |452          |9200   |57.005|9.876 |
|5          |Aalborg Syd                |Hobrovej                |440          |9200   |57.005|9.881 |
|6          |AalbÃƒÂ¦k                  |Centralvej              |5            |9982   |57.593|10.412|
|7          |Aarhus                     |SÃƒÂ¸nder Alle          |11     

### ATM Dimension Table Creation - DIM_ATM

In [14]:
# Importing pacakges to create unique id 
from pyspark.sql.functions import *
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window

# Creating staging table for ATM dimension table and removing the duplicates
atm_data_staging=spark.sql("select distinct atm_id atm_number, \
          atm_manufacturer, \
          atm_lat, \
          atm_lon \
          from atm_staging");

#Generating the temp table to perfor join later with the DIM_LOCATION table
atm_data_staging.registerTempTable("atm_data_staging");

#Joining the tables to get normalized data based on location of atm
atm_data_staging=spark.sql("select distinct atm_number , \
          atm_manufacturer, \
          location_id atm_location_id\
          from atm_data_staging \
          left outer join DIM_LOCATION \
          on atm_data_staging.atm_lat = lat \
          and atm_data_staging.atm_lon = lon")

# Adding the primary key column by generating unique id 
atm_data_staging = atm_data_staging.withColumn(
    "atm_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)

# Creating the dimension table by rearragning the columns as per expectation
DIM_ATM=atm_data_staging.select("atm_id","atm_number","atm_manufacturer","atm_location_id");

#Registering the df to use in join with sparlsql
DIM_ATM.registerTempTable("DIM_ATM");



### DIM_ATM Validate the count -156

In [15]:
spark.sql("select count(1) from DIM_ATM").show();

+--------+
|count(1)|
+--------+
|     156|
+--------+



### Validate the DIM_ATM Data

In [16]:
spark.sql("select * from DIM_ATM").show(156,truncate=False);

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|1     |29        |NCR             |84             |
|2     |110       |Diebold Nixdorf |59             |
|3     |105       |Diebold Nixdorf |76             |
|4     |111       |Diebold Nixdorf |8              |
|5     |87        |NCR             |3              |
|6     |100       |NCR             |87             |
|7     |2         |NCR             |103            |
|8     |21        |NCR             |108            |
|9     |91        |NCR             |86             |
|10    |18        |Diebold Nixdorf |105            |
|11    |6         |NCR             |25             |
|12    |36        |NCR             |52             |
|13    |82        |NCR             |83             |
|14    |41        |Diebold Nixdorf |82             |
|15    |59        |Diebold Nixdorf |72             |
|16    |88        |NCR             |4         

### ATM Transaction date Dimension Table Creation - DIM_DATE

In [17]:
# Importing pacakges to create unique id 
from pyspark.sql import functions as sf
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window

#Extrating the date attribtues to create the date staging dimension table
atm_date_staging = spark.sql("select distinct year,month,day,hour,weekday from atm_staging")

#Converting the Month values to euivalent numbered months values to ease addition of unix timestamp column full_date_time
atm_date_staging= atm_date_staging.withColumn("tmonth",from_unixtime(unix_timestamp(col("Month"),'MMM'),'MM'))
atm_date_staging = atm_date_staging.withColumn('full_date_time',sf.concat(sf.col('year'),sf.lit('/'),sf.col('tmonth'),sf.lit('/'),sf.col('day'),sf.lit(' '),sf.col('hour'),sf.lit(':'),sf.lit('00:00')))
atm_date_staging = atm_date_staging.withColumn('full_date_time', unix_timestamp(atm_date_staging['full_date_time'], 'yyyy/MM/dd HH:mm:ss').cast('timestamp'))

# Adding the primary key column by generating unique id 
atm_date_staging = atm_date_staging.withColumn(
    "date_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)

# Creating the dimension table by rearragning the columns as per expectation
DIM_DATE=atm_date_staging.select("date_id","full_date_time","year","month","day","hour","weekday");

#Registering the df to use in join with sparlsql
DIM_DATE.registerTempTable("DIM_DATE");


### DIM_DATE Validate the count -8685

In [18]:
spark.sql("select count(1) from DIM_DATE").show();

+--------+
|count(1)|
+--------+
|    8685|
+--------+



### Validate the DIM_DATE Data

In [19]:
spark.sql("select * from DIM_DATE").show(5)

+-------+-------------------+----+--------+---+----+---------+
|date_id|     full_date_time|year|   month|day|hour|  weekday|
+-------+-------------------+----+--------+---+----+---------+
|      1|2017-01-02 16:00:00|2017| January|  2|  16|   Monday|
|      2|2017-01-15 09:00:00|2017| January| 15|   9|   Sunday|
|      3|2017-01-24 17:00:00|2017| January| 24|  17|  Tuesday|
|      4|2017-02-07 11:00:00|2017|February|  7|  11|  Tuesday|
|      5|2017-02-08 20:00:00|2017|February|  8|  20|Wednesday|
+-------+-------------------+----+--------+---+----+---------+
only showing top 5 rows



### ATM Card Type Dimension Table Creation - DIM_CARD_TYPE

In [20]:
#Extrating the card type attribtues to create the card type staging dimension table
atm_card_type = spark.sql("select distinct card_Type from atm_staging ");

# Adding the primary key column by generating unique id 
atm_card_type = atm_card_type.withColumn(
    "card_type_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)

# Creating the dimension table by rearragning the columns as per expectation
DIM_CARD_TYPE=atm_card_type.select("card_type_id","card_type");

#Registering the df to use in join with sparlsql
DIM_CARD_TYPE.registerTempTable("DIM_CARD_TYPE");

### DIM_CARD_TYPE Validate the count -12

In [21]:
spark.sql("select count(*) from DIM_CARD_TYPE").show()

+--------+
|count(1)|
+--------+
|      12|
+--------+



### Validate the DIM_CARD_TYPE Data

In [22]:
spark.sql("select * from DIM_CARD_TYPE").show()

+------------+--------------------+
|card_type_id|           card_type|
+------------+--------------------+
|           1|     Dankort - on-us|
|           2|              CIRRUS|
|           3|         HÃƒÂ¦vekort|
|           4|                VISA|
|           5|  Mastercard - on-us|
|           6|             Maestro|
|           7|Visa Dankort - on-us|
|           8|        Visa Dankort|
|           9|            VisaPlus|
|          10|          MasterCard|
|          11|             Dankort|
|          12| HÃƒÂ¦vekort - on-us|
+------------+--------------------+



## Fact Table Creation

### Stage 1 : Left Outer Join of staging table and the location dimension table 

In [23]:
atm_ljoin_location = spark.sql("select dim_location.location_id,atm_staging.* \
                        from atm_staging \
                        left join dim_location\
                        on dim_location.location = atm_staging.atm_location \
                        and dim_location.streetname = atm_staging.atm_streetname \
                        and dim_location.street_number = atm_staging.atm_street_number \
                        and dim_location.lat = atm_staging.atm_lat \
                        and dim_location.lon = atm_staging.atm_lon");

### Renaming the atm_id column to avoid conflict while joining with the dimension table DIM_ATM

In [24]:
atm_ljoin_location=atm_ljoin_location.withColumnRenamed('atm_id','atm_id_temp')

atm_ljoin_location.registerTempTable("atm_ljoin_location");

### Stage 2: Left Joining the above resultset with the DIM_ATM table

In [25]:
atm_ljoin_atm = spark.sql("select dim_atm.atm_id,atm_ljoin_location.* \
                            from atm_ljoin_location \
                            left join dim_atm \
                            on dim_atm.atm_number = atm_ljoin_location.atm_id_temp \
                            and dim_atm.atm_manufacturer = atm_ljoin_location.atm_manufacturer \
                            and dim_atm.atm_location_id = atm_ljoin_location.location_id")

In [26]:
atm_ljoin_atm.registerTempTable("atm_ljoin_atm")

### Stage 3 : Left Join above resultset with the Date Dimensional Table DIM_DATE

In [27]:
atm_ljoin_date = spark.sql("select dim_date.date_id,dim_date.full_date_time,atm_ljoin_atm.* \
                            from atm_ljoin_atm left join dim_date \
                            on dim_date.year = atm_ljoin_atm.year \
                            and dim_date.month = atm_ljoin_atm.month \
                            and dim_date.day = atm_ljoin_atm.day \
                            and dim_date.hour = atm_ljoin_atm.hour \
                            and dim_date.weekday = atm_ljoin_atm.weekday")

### Renaming the card_type column to avoid conflict

In [28]:
atm_ljoin_date=atm_ljoin_date.withColumnRenamed('card_type','card_type_temp')
atm_ljoin_date.registerTempTable("atm_ljoin_date")

### Stage 4: Left Join the above resultset with the card type dimensional table DIM_CARD_TYPE

In [29]:
atm_ljoin_card = spark.sql("select dim_card_type.card_type_id,atm_ljoin_date.* \
                            from atm_ljoin_date \
                            left join dim_card_type \
                            on dim_card_type.card_type = atm_ljoin_date.card_type_temp")

### Cleaning up the staging fact table to hold only relevant columns

In [30]:

atm_ljoin_card=atm_ljoin_card.drop("year","month","day","weekday","hour","atm_id_temp","atm_manufacturer","atm_location",
                    "atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon","card_type_temp",
                   "weather_lat","weather_lon","weather_city_id","weather_city_name","temp","pressure",
                   "humidity","wind_speed","wind_deg")

In [31]:
atm_ljoin_card.printSchema();

root
 |-- card_type_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- full_date_time: timestamp (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)



### Validating the record in the staging fact table - 2468572

In [32]:
atm_ljoin_card.count()

2468572

### Setting up the primary key for fact table 

In [33]:
atm_ljoin_card=atm_ljoin_card.withColumnRenamed('location_id','weather_loc_id')

atm_ljoin_card = atm_ljoin_card.withColumn(
    "trans_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)

FACT_ATM_TRANS = atm_ljoin_card.select("trans_id","atm_id","weather_loc_id","date_id","card_type_id","atm_status",
                                      "currency","service","transaction_amount","message_code","message_text",
                                      "rain_3h","clouds_all","weather_id","weather_main","weather_description")

In [34]:
FACT_ATM_TRANS.cache()

DataFrame[trans_id: int, atm_id: int, weather_loc_id: int, date_id: int, card_type_id: int, atm_status: string, currency: string, service: string, transaction_amount: int, message_code: string, message_text: string, rain_3h: double, clouds_all: int, weather_id: int, weather_main: string, weather_description: string]

### Fact Table Validation wrt to count and unique values for the respective foriegn keys

In [35]:
FACT_ATM_TRANS.count()

2468572

In [36]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.trans_id.isNotNull()).count()

2468572

In [37]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.atm_id.isNotNull()).count()

2468572

In [38]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.weather_loc_id.isNotNull()).count()

2468572

In [39]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.date_id.isNotNull()).count()

2468572

In [40]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.card_type_id.isNotNull()).count()

2468572

### Validate the data for the FACT_ATM_TRANS DF

In [41]:
FACT_ATM_TRANS.select("trans_id","atm_id","weather_loc_id","date_id","card_type_id","atm_status").show(5)

+--------+------+--------------+-------+------------+----------+
|trans_id|atm_id|weather_loc_id|date_id|card_type_id|atm_status|
+--------+------+--------------+-------+------------+----------+
|       1|    14|            82|     10|           1|    Active|
|       2|    21|            85|     10|           1|    Active|
|       3|    26|             4|     10|           1|    Active|
|       4|    30|            16|     10|           1|    Active|
|       5|    30|            16|     10|           1|    Active|
+--------+------+--------------+-------+------------+----------+
only showing top 5 rows



## Transfer the dimension and fact table to S3

### Assumption : Necessay configuration are done to integrate S3 and cdh

In [62]:
DIM_LOCATION.write.save("s3a://upgradjjassignment/ETL/DIM_LOCATION",format='csv',header='true')

In [55]:
DIM_ATM.write.save("s3a://upgradjjassignment/ETL/DIM_ATM",format='csv',header='true')

In [42]:
DIM_DATE.write.save("s3a://upgradjjassignment/ETL/DIM_DATE",format='csv',header='true')

In [57]:
DIM_CARD_TYPE.write.save("s3a://upgradjjassignment/ETL/DIM_CARD_TYPE",format='csv',header='true')

In [43]:
FACT_ATM_TRANS.write.save("s3a://upgradjjassignment/ETL/FACT_ATM_TRANS",format='csv',header='true')