# ETL ASSIGNMENT: Danish ATM Transactions Data Set

In [1]:
#setting up environment variable

import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"]="/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
#creating a spark session

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ATM_TRANS').master('local').getOrCreate()

spark

### READING THE DATA

##### Command to create an input schema using StructType(We recommend you to create a custom schema using the StructType class of PySpark, to avoid any data type mismatch.)

##### Commands to read the data using the input schema created and verifying the data using the count function

In [3]:
#Reading the data from the files in HDFS by a specific schema
#creating an input schema using StructType

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, LongType

fileschema= StructType([ \
                        StructField("year",IntegerType(),True), \
                        StructField("month",StringType(),True), \
                        StructField("day",IntegerType(),True), \
                        StructField("weekday",StringType(),True), \
                        StructField("hour",IntegerType(),True), \
                        StructField("atm_status",StringType(),True), \
                        StructField("atm_id",IntegerType(),True), \
                        StructField("atm_manufacturer",StringType(),True), \
                        StructField("atm_location",StringType(),True), \
                        StructField("atm_streetname",StringType(),True), \
                        StructField("atm_street_number",IntegerType(),True), \
                        StructField("atm_zipcode",IntegerType(),True), \
                        StructField("atm_lat",DoubleType(),True), \
                        StructField("atm_lon",DoubleType(),True), \
                        StructField("currency",StringType(),True), \
                        StructField("card_type",StringType(),True), \
                        StructField("transaction_amount",IntegerType(),True), \
                        StructField("service",StringType(),True), \
                        StructField("message_code",StringType(),True), \
                        StructField("message_text",StringType(),True), \
                        StructField("weather_lat",DoubleType(),True), \
                        StructField("weather_lon",DoubleType(),True), \
                        StructField("weather_city_id",IntegerType(),True), \
                        StructField("weather_city_name",StringType(),True), \
                        StructField("temp",DoubleType(),True), \
                        StructField("pressure",IntegerType(),True), \
                        StructField("humidity",IntegerType(),True), \
                        StructField("wind_speed",IntegerType(),True), \
                        StructField("wind_deg",IntegerType(),True), \
                        StructField("rain_3h",DoubleType(),True), \
                        StructField("clouds_all",IntegerType(),True), \
                        StructField("weather_id",IntegerType(),True), \
                        StructField("weather_main",StringType(),True), \
                        StructField("weather_description",StringType(),True), \
])

In [4]:
# reading the data using the input schema created

data= spark.read.csv("atm_trans",schema=fileschema)

# verifying the data using the count function

data.count()

2468572

### CREATING DIMENSION TABLES

##### Command to create a data frame for the dimension according to the target schema(dimension model) provided

##### Commands to clean and transform the data: 
###### -Making sure that duplicate records are cleaned(Avoid duplicate info especially in the dimension tables.
###### -Making sure that appropriate primary keys are present for the dimensions( You need to generate a primary key for each dimension table. For example for the 'Date' dimension one way to generate the primary key can be by adding "date" as the prefix  to the row number i.e. 'date1', 'date2' and so on.) 
###### -Rearranging the fields if necessary(According to the target schema)

In [5]:
#Importing all the Spark Sql functions

from pyspark.sql.functions import *

#### LOCATION DIMENSION

In [6]:
#creating the location dimension keeping only the necessary fields.
#dropping duplicate values

dim_location=data.select('atm_location','atm_streetname','atm_street_number','atm_zipcode','atm_lat','atm_lon').distinct()

dim_location.show(20)

+--------------------+----------------+-----------------+-----------+-------+-------+
|        atm_location|  atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+--------------------+----------------+-----------------+-----------+-------+-------+
|               Vadum|  Ellehammersvej|               43|       9430| 57.118|  9.861|
|            Slagelse| Mariendals Alle|               29|       4200| 55.398| 11.342|
|          Fredericia|SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|
|             Kolding|        Vejlevej|              135|       6000| 55.505|  9.457|
|   HÃƒÂ¸rning Hallen|        Toftevej|               53|       8362| 56.091| 10.033|
|                Aars| Himmerlandsgade|               70|       9600| 56.803|  9.518|
|     Aarhus Lufthavn| Ny Lufthavnsvej|               24|       8560| 56.308| 10.627|
|                 Fur|      StenÃƒÂ¸re|               19|       7884| 56.805|   9.02|
|            Hasseris|     Hasserisvej|              1

In [7]:
# creating location_id column as primary key
#renaming the columns as per the target schema

from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id,row_number

dim_location=dim_location.withColumn("location_id",row_number().over(Window.orderBy(monotonically_increasing_id())))
dim_location=dim_location.withColumnRenamed('atm_location','location') \
.withColumnRenamed('atm_streetname','streetname') \
.withColumnRenamed('atm_street_number','street_number') \
.withColumnRenamed('atm_zipcode','zipcode') \
.withColumnRenamed('atm_lat','lat') \
.withColumnRenamed('atm_lon','lon')

dim_location=dim_location.select('location_id','location','streetname','street_number','zipcode','lat','lon')

dim_location.show()

+-----------+--------------------+----------------+-------------+-------+------+------+
|location_id|            location|      streetname|street_number|zipcode|   lat|   lon|
+-----------+--------------------+----------------+-------------+-------+------+------+
|          1|               Vadum|  Ellehammersvej|           43|   9430|57.118| 9.861|
|          2|            Slagelse| Mariendals Alle|           29|   4200|55.398|11.342|
|          3|          Fredericia|SjÃƒÂ¦llandsgade|           33|   7000|55.564| 9.757|
|          4|             Kolding|        Vejlevej|          135|   6000|55.505| 9.457|
|          5|   HÃƒÂ¸rning Hallen|        Toftevej|           53|   8362|56.091|10.033|
|          6|                Aars| Himmerlandsgade|           70|   9600|56.803| 9.518|
|          7|     Aarhus Lufthavn| Ny Lufthavnsvej|           24|   8560|56.308|10.627|
|          8|                 Fur|      StenÃƒÂ¸re|           19|   7884|56.805|  9.02|
|          9|            Hasseri

In [8]:
# verifying location dimension using count function

dim_location.count()

109

In [9]:
# verifying schema using printSchema

dim_location.printSchema()

root
 |-- location_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)



#### ATM DIMENSION

In [10]:
#using dict to generate foreign key atm_location_id referencing location dimension.
#creating the ATM dimension keeping only the necessary fields
#dropping duplicate values


id_location_dict = dict()

for location_id,location in dim_location.select('location_id','location').collect():
  id_location_dict[location]=location_id

add_location_id=udf(lambda location: id_location_dict[location])

dim_atm=data.select('atm_id','atm_manufacturer','atm_location').withColumn('atm_location', add_location_id('atm_location'))\
.withColumnRenamed('atm_location','atm_location_id').distinct().orderBy('atm_id')

#creating atm_number field

dim_atm=dim_atm.withColumn('atm_number',row_number().over(Window.orderBy(monotonically_increasing_id())))\
                    .select('atm_id','atm_number','atm_manufacturer','atm_location_id')

#casting the datatype of the columns as per the target schema

dim_atm=dim_atm.withColumn('atm_location_id',col('atm_location_id').cast(IntegerType()))\
.withColumn('atm_number',col('atm_number').cast(StringType()))


dim_atm.show()

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|     1|         1|             NCR|             98|
|     2|         2|             NCR|            103|
|     3|         3|             NCR|             47|
|     4|         4|             NCR|             75|
|     5|         5|             NCR|             50|
|     6|         6|             NCR|              3|
|     7|         7| Diebold Nixdorf|             87|
|     8|         8|             NCR|             21|
|     9|         9| Diebold Nixdorf|             76|
|    10|        10|             NCR|             26|
|    11|        11|             NCR|             77|
|    12|        12|             NCR|             25|
|    13|        13|             NCR|             14|
|    14|        14|             NCR|             95|
|    15|        15|             NCR|             36|
|    16|        16|             NCR|          

In [11]:
# verifying ATM dimension using count function

dim_atm.count()

113

In [12]:
# verifying schema using printSchema

dim_atm.printSchema()

root
 |-- atm_id: integer (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location_id: integer (nullable = true)



#### DATE DIMENSION

In [13]:
#creating the date dimension keeping only the necessary fields
#dropping duplicate values

date=data.select('year','month','day','hour','weekday').distinct()

date.show()

+----+--------+---+----+--------+
|year|   month|day|hour| weekday|
+----+--------+---+----+--------+
|2017| January|  1|   9|  Sunday|
|2017| January|  3|   5| Tuesday|
|2017| January|  8|  19|  Sunday|
|2017| January| 21|   3|Saturday|
|2017| January| 23|  21|  Monday|
|2017|February|  2|  19|Thursday|
|2017|February|  5|  16|  Sunday|
|2017|February| 21|  15| Tuesday|
|2017|   March|  2|   8|Thursday|
|2017|   April|  2|   2|  Sunday|
|2017|   April|  6|   8|Thursday|
|2017|   April| 30|  10|  Sunday|
|2017|     May|  2|   2| Tuesday|
|2017|     May| 20|  16|Saturday|
|2017|     May| 21|  19|  Sunday|
|2017|    June| 27|   0| Tuesday|
|2017|    July| 18|   9| Tuesday|
|2017|    July| 18|  22| Tuesday|
|2017|    July| 20|   0|Thursday|
|2017|    July| 21|  19|  Friday|
+----+--------+---+----+--------+
only showing top 20 rows



In [14]:
from datetime import datetime

# merging date and casting to timestamp.

merge_date= udf(lambda day, month, year, hour:"{}/{}/{} {}".format(day, datetime.strptime(month, '%B').month, year, hour))

dim_date=date.withColumn('full_date_time', merge_date('day','month','year','hour'))
dim_date=dim_date.withColumn('full_date_time', to_timestamp("full_date_time", format='d/M/yyyy H'))

#creating date_id as primary key

dim_date=dim_date.withColumn('date_id',row_number().over(Window.orderBy(monotonically_increasing_id())))\
.select('date_id','full_date_time','year','month','day','hour','weekday')

dim_date.show()

+-------+-------------------+----+--------+---+----+--------+
|date_id|     full_date_time|year|   month|day|hour| weekday|
+-------+-------------------+----+--------+---+----+--------+
|      1|2017-01-01 09:00:00|2017| January|  1|   9|  Sunday|
|      2|2017-01-03 05:00:00|2017| January|  3|   5| Tuesday|
|      3|2017-01-08 19:00:00|2017| January|  8|  19|  Sunday|
|      4|2017-01-21 03:00:00|2017| January| 21|   3|Saturday|
|      5|2017-01-23 21:00:00|2017| January| 23|  21|  Monday|
|      6|2017-02-02 19:00:00|2017|February|  2|  19|Thursday|
|      7|2017-02-05 16:00:00|2017|February|  5|  16|  Sunday|
|      8|2017-02-21 15:00:00|2017|February| 21|  15| Tuesday|
|      9|2017-03-02 08:00:00|2017|   March|  2|   8|Thursday|
|     10|2017-04-02 02:00:00|2017|   April|  2|   2|  Sunday|
|     11|2017-04-06 08:00:00|2017|   April|  6|   8|Thursday|
|     12|2017-04-30 10:00:00|2017|   April| 30|  10|  Sunday|
|     13|2017-05-02 02:00:00|2017|     May|  2|   2| Tuesday|
|     14

In [15]:
# verifying date dimension using count function

dim_date.count()

8685

In [16]:
# verifying schema using printSchema

dim_date.printSchema()

root
 |-- date_id: integer (nullable = true)
 |-- full_date_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)



#### CARD TYPE DIMENSION

In [17]:
# creating the card type dimension keeping only the necessary fields
# dropping duplicate values

card=data.select('card_type').distinct()

# creating card_type_id as primary key

dim_card=card.withColumn("card_type_id",row_number().over(Window.orderBy(monotonically_increasing_id())))\
.select('card_type_id','card_type')

dim_card.show()

+------------+--------------------+
|card_type_id|           card_type|
+------------+--------------------+
|           1|     Dankort - on-us|
|           2|              CIRRUS|
|           3|         HÃƒÂ¦vekort|
|           4|                VISA|
|           5|  Mastercard - on-us|
|           6|             Maestro|
|           7|Visa Dankort - on-us|
|           8|        Visa Dankort|
|           9|            VisaPlus|
|          10|          MasterCard|
|          11|             Dankort|
|          12| HÃƒÂ¦vekort - on-us|
+------------+--------------------+



In [18]:
# verifying card type dimension using count function

dim_card.count()

12

In [19]:
# verifying schema using printSchema

dim_card.printSchema()

root
 |-- card_type_id: integer (nullable = true)
 |-- card_type: string (nullable = true)



### CREATING FACT TABLE
#### ATM TRANSACTION FACT TABLE

In [24]:
# using dict to generate foreign keys in the fact table.

weather_loc_dict=dict()
date_id_dict=dict()
cardType_id_dict=dict()


for location_id,location in dim_location.select('location_id','location').collect():
  weather_loc_dict[location]=location_id

for card_type_id, card_type in dim_card.select('card_type_id','card_type').collect():
  cardType_id_dict[card_type]=card_type_id

for date_id, date in dim_date.select('date_id','full_date_time').collect():
  date_id_dict[date]=date_id


add_weather_loc_id=udf(lambda location: weather_loc_dict[location])
add_date_id=udf(lambda date:date_id_dict[date])
add_cardType_id=udf(lambda card_type: cardType_id_dict[card_type])

In [25]:
# creating fact table using udf to generate foreign keys referencing to dimension tables.

fact=data.select('atm_id','atm_location','year','month','day','hour','card_type','atm_status','currency','service',\
                 'transaction_amount','message_code','message_text','rain_3h','clouds_all','weather_id','weather_main'\
                 ,'weather_description')\
.withColumn('atm_location',add_weather_loc_id('atm_location'))\
.withColumn('card_type',add_cardType_id('card_type'))\
.withColumn('date', merge_date('day','month','year','hour')).withColumn('date', to_timestamp("date", format='d/M/yyyy H'))\
.withColumn('date',add_date_id('date'))\
.withColumnRenamed('date','date_id')\
.withColumnRenamed('atm_location','weather_loc_id')\
.withColumnRenamed('card_type','card_type_id')

# creating primary key trans_id

fact_atm_trans=fact.withColumn('trans_id',row_number().over(Window.orderBy(monotonically_increasing_id())))\
.select('trans_id','atm_id','weather_loc_id','date_id','card_type_id','atm_status','currency','service','transaction_amount',\
        'message_code','message_text','rain_3h','clouds_all','weather_id','weather_main','weather_description')

#casting the datatype of the columns as per the target schema

fact_atm_trans=fact_atm_trans.withColumn('trans_id',col('trans_id').cast(LongType()))\
.withColumn('weather_loc_id',col('weather_loc_id').cast(IntegerType()))\
.withColumn('date_id',col('date_id').cast(IntegerType()))\
.withColumn('card_type_id',col('card_type_id').cast(IntegerType()))

fact_atm_trans.show()

+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|trans_id|atm_id|weather_loc_id|date_id|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|       1|     1|            98|   8339|          10|    Active|     DKK|Withdrawal|              5643|        null|        null|  0.215|        92|       500|        Rain|          light rain|
|       2|     2|           103|   8339|          10|  Inactive|     DKK|Withdrawal|              1764|        null|        null|   0.59|        92|       500|        Rain|          light rain|
|       3|     2|           10

In [26]:
# verifying atm fact table using count function

fact_atm_trans.count()

2468572

In [27]:
# verifying schema using printSchema

fact_atm_trans.printSchema()

root
 |-- trans_id: long (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)



### LOADING THE DIMENSION AND FACT TABLE INTO AMAZON S3 BUCKET

##### Write the DataFrames containing the dimensions and fact table directly to an S3 bucket folder. [You should create different folders on your S3 bucket for different dimensions and fact table.]



In [28]:
## loading all the tables to S3

dim_location.write.csv("s3a://atm-trans-tables/DIM_LOCATION")
dim_atm.write.csv("s3a://atm-trans-tables/DIM_ATM")
dim_date.write.csv("s3a://atm-trans-tables/DIM_DATE")
dim_card.write.csv("s3a://atm-trans-tables/DIM_CARD_TYPE")
fact_atm_trans.write.csv("s3a://atm-trans-tables/FACT_ATM_TRANS")