# ETL Project - <font color='red'> Transformation </font>

In [1]:
# Setting up environment for the pyspark to run properly

import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"]="/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
# Initializing SparkSession

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('etl_project').master("local").getOrCreate()
spark

In [3]:
# Importing important libraries

from pyspark.sql.types import *
from pyspark.sql.functions import col,lit
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import concat
from pyspark.sql.functions import lpad

In [4]:
# Importing the data from HDFS
df = spark.read.csv("/user/ec2-user/ETL_Project/data", inferSchema= "True")

In [5]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: double (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: integer (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: integer (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: double (nullable = true)
 |-- _c21: double (nullable = true)
 |-- _c22: integer (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: double (nullable = true)
 |-- _c25: integer (nullable = true)
 |-- _c26: integer (nullable = true)
 |-- _c27: integer (nu

Inference:
- The column names are not present.
- The dataType for the columns are not specified.

In [6]:
# Setting the input schema for the downloaded dataset

input_schema = StructType([StructField('year', IntegerType(), True),
                           StructField('month', StringType(), True),
                           StructField('day', IntegerType(), True),
                           StructField('weekday', StringType(), True),
                           StructField('hour', IntegerType(), True),
                           StructField('atm_status', StringType(), True),
                           StructField('atm_id', IntegerType(), True),
                           StructField('atm_manufacturer', StringType(), True),
                           StructField('atm_location', StringType(), True),
                           StructField('atm_street', StringType(), True),
                           StructField('atm_street_number', IntegerType(), True),
                           StructField('atm_zipcode', IntegerType(), True),
                           StructField('atm_lat', FloatType(), True),
                           StructField('atm_lon', FloatType(), True),
                           StructField('currency', StringType(), True),
                           StructField('card_type', StringType(), True),
                           StructField('transaction_amount', IntegerType(), True),
                           StructField('service', StringType(), True),
                           StructField('message_code', StringType(), True),
                           StructField('message_text', StringType(), True),
                           StructField('weather_lat', FloatType(), True),
                           StructField('weather_lon', FloatType(), True),
                           StructField('weather_city_id', IntegerType(), True),
                           StructField('weather_city_name', StringType(), True),
                           StructField('temp', FloatType(), True),
                           StructField('pressure', IntegerType(), True),
                           StructField('humidity', IntegerType(), True),
                           StructField('wind_speed', IntegerType(), True),
                           StructField('wind_deg', IntegerType(), True),
                           StructField('rain_3h', FloatType(), True),
                           StructField('clouds_all', IntegerType(), True),
                           StructField('weather_id', IntegerType(), True),
                           StructField('weather_main', StringType(), True),
                           StructField('weather_description', StringType(), True)])

In [7]:
# Importing the data from the HDFS with the correct schema
df = spark.read.csv("/user/ec2-user/ETL_Project/data", schema = input_schema)

In [8]:
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_street: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: float (nullable = true)
 |-- atm_lon: float (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (n

In [9]:
df.count()

2468572

#### Validation after importing the data into dataframe
- Count of Records = 2468572

## Location Dimension Table

In [10]:
# Creating a Location Dimension Table from the dataset given.
location_dim = df.select(df.atm_location.alias('location'), df.atm_street.alias('streetname'),
          df.atm_street_number.alias('street_number'), df.atm_zipcode.alias('zipcode'),
          df.atm_lat.alias('lat'), df.atm_lon.alias('lon'))

In [11]:
location_dim.printSchema()

root
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: float (nullable = true)
 |-- lon: float (nullable = true)



In [12]:
location_dim.show(5)

+----------+-------------------+-------------+-------+------+------+
|  location|         streetname|street_number|zipcode|   lat|   lon|
+----------+-------------------+-------------+-------+------+------+
|NÃƒÂ¦stved|        Farimagsvej|            8|   4700|55.233|11.763|
|  Vejgaard|         Hadsundvej|           20|   9000|57.043|  9.95|
|  Vejgaard|         Hadsundvej|           20|   9000|57.043|  9.95|
|     Ikast|RÃƒÂ¥dhusstrÃƒÂ¦det|           12|   7430|56.139| 9.154|
|Svogerslev|       BrÃƒÂ¸nsager|            1|   4000|55.634|12.018|
+----------+-------------------+-------------+-------+------+------+
only showing top 5 rows



In [13]:
location_dim.count()

2468572

In [14]:
location_dim = location_dim.distinct()

In [15]:
location_dim.count()

109

#### Validation for Location Dimension Table
- Count of Records = 109

In [16]:
location_dim= location_dim.withColumn("new_column",lit("ABC"))

In [17]:
# creating primary key 
w = Window().partitionBy('new_column').orderBy(lit('A'))
location_dim = location_dim.withColumn("atm_location_id", row_number().over(w)).drop("new_column")

In [18]:
location_dim.printSchema()

root
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: float (nullable = true)
 |-- lon: float (nullable = true)
 |-- atm_location_id: integer (nullable = true)



In [19]:
location= location_dim.select('atm_location_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon')

In [20]:
location.show(5)

+---------------+--------------------+-----------+-------------+-------+------+------+
|atm_location_id|            location| streetname|street_number|zipcode|   lat|   lon|
+---------------+--------------------+-----------+-------------+-------+------+------+
|              1|Aalborg Storcente...|   Hobrovej|          452|   9200|57.005| 9.876|
|              2|            Hasseris|Hasserisvej|          113|   9000|57.044| 9.898|
|              3|         Bispensgade|Bispensgade|           35|   9800|57.453| 9.996|
|              4|           HolbÃƒÂ¦k|Slotsvolden|            7|   4300|55.718|11.704|
|              5|            Bindslev|NÃƒÂ¸rrebro|           18|   9881|57.541|  10.2|
+---------------+--------------------+-----------+-------------+-------+------+------+
only showing top 5 rows



## Card-Type Dimension Table

In [21]:
# Creating a card-type dimension table from card_type col in the dataset.
card_type_dim = df.select(df.card_type)

In [22]:
card_type_dim.count()

2468572

In [23]:
card_type_dim = card_type_dim.distinct()

In [24]:
card_type_dim.count()

12

#### Validation for Card-Type Dimension table
- Count of Records = 12

In [25]:
card_type_dim= card_type_dim.withColumn("new_column",lit("ABC"))

In [26]:
# creating primary key 
w = Window().partitionBy('new_column').orderBy(lit('A'))
card_type_dim= card_type_dim.withColumn("card_type_id", row_number().over(w)).drop("new_column")

In [27]:
card_type_dim.printSchema()

root
 |-- card_type: string (nullable = true)
 |-- card_type_id: integer (nullable = true)



In [28]:
card_type = card_type_dim.select('card_type_id', 'card_type')

In [29]:
card_type.show(5)

+------------+------------------+
|card_type_id|         card_type|
+------------+------------------+
|           1|   Dankort - on-us|
|           2|            CIRRUS|
|           3|       HÃƒÂ¦vekort|
|           4|              VISA|
|           5|Mastercard - on-us|
+------------+------------------+
only showing top 5 rows



## Date Dimension Table

In [30]:
# Creating Date dimension table from the date related columns from the dataset.
date_dim = df.select(df.year.alias('year'),
                     df.month.alias('month'),
                     df.day.alias('day'),
                     df.hour.alias('hour'),
                     df.weekday.alias('weekday')).distinct()

In [31]:
date_dim.show(5)

+----+-------+---+----+--------+
|year|  month|day|hour| weekday|
+----+-------+---+----+--------+
|2017|January|  1|   9|  Sunday|
|2017|January|  3|   5| Tuesday|
|2017|January|  8|  19|  Sunday|
|2017|January| 21|   3|Saturday|
|2017|January| 23|  21|  Monday|
+----+-------+---+----+--------+
only showing top 5 rows



In [32]:
date_dim.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)



In [33]:
date_dim.count()

8685

#### Validation Date Dimension Table
- Count of Records = 8685

In [34]:
from pyspark.sql.functions  import date_format
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import to_date

In [35]:
# createing New month column with Integer values
date_dim = date_dim.withColumn('month_new', date_format(to_date(col('month'), 'MMMMM'), 'MM').cast(IntegerType()))

In [36]:
## adding new Month, day and hours columns with Zeroes 
date_dim = date_dim.withColumn('month_new', lpad(col('month_new'),2,'0')).withColumn('day_new', lpad(col('day'),2,'0')).withColumn('hour_new', lpad(col('hour'),2,'0'))

In [37]:
# Create a new column Full_Date_time by combining Year, new month, day, hour and "00" value to create timestamp in YYYYMMDDMI24HHMI format.
date_dim_final = date_dim.withColumn("full_date_time",concat(col('year'),col('month_new'),col('day_new'),col('hour_new'),lit('00')))

In [38]:
date_dim_final.show(5)

+----+-------+---+----+--------+---------+-------+--------+--------------+
|year|  month|day|hour| weekday|month_new|day_new|hour_new|full_date_time|
+----+-------+---+----+--------+---------+-------+--------+--------------+
|2017|January|  1|   9|  Sunday|       01|     01|      09|  201701010900|
|2017|January|  3|   5| Tuesday|       01|     03|      05|  201701030500|
|2017|January|  8|  19|  Sunday|       01|     08|      19|  201701081900|
|2017|January| 21|   3|Saturday|       01|     21|      03|  201701210300|
|2017|January| 23|  21|  Monday|       01|     23|      21|  201701232100|
+----+-------+---+----+--------+---------+-------+--------+--------------+
only showing top 5 rows



In [39]:
# Creating primary key for the dimension with name date_id
date_dim_final= date_dim_final.withColumn("new_column",lit("ABC"))
w = Window().partitionBy('new_column').orderBy(lit('A'))
date_dim_final= date_dim_final.withColumn("date_id", row_number().over(w)).drop("new_column")

In [40]:
# creating data from dim_date_final
date = date_dim_final.select("date_id","full_date_time","year","month","day","hour","weekday")

In [41]:
date.count()

8685

In [42]:
date.printSchema()

root
 |-- date_id: integer (nullable = true)
 |-- full_date_time: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)



In [43]:
date.show(5)

+-------+--------------+----+-------+---+----+--------+
|date_id|full_date_time|year|  month|day|hour| weekday|
+-------+--------------+----+-------+---+----+--------+
|      1|  201701010900|2017|January|  1|   9|  Sunday|
|      2|  201701030500|2017|January|  3|   5| Tuesday|
|      3|  201701081900|2017|January|  8|  19|  Sunday|
|      4|  201701210300|2017|January| 21|   3|Saturday|
|      5|  201701232100|2017|January| 23|  21|  Monday|
+-------+--------------+----+-------+---+----+--------+
only showing top 5 rows



## ATM Dimension Table

In [49]:
# Creating the dimension table with ATM details
atm_dim = df.select(df.atm_id.alias('atm_number'),
                    df.atm_manufacturer.alias('atm_manufacturer'),
                    df.atm_lat.alias('lat'),
                    df.atm_lon.alias('lon'))

In [50]:
atm_dim.count()

2468572

In [51]:
atm_dim.show(5)

+----------+----------------+------+------+
|atm_number|atm_manufacturer|   lat|   lon|
+----------+----------------+------+------+
|         1|             NCR|55.233|11.763|
|         2|             NCR|57.043|  9.95|
|         2|             NCR|57.043|  9.95|
|         3|             NCR|56.139| 9.154|
|         4|             NCR|55.634|12.018|
+----------+----------------+------+------+
only showing top 5 rows



In [52]:
atm_dim.printSchema()

root
 |-- atm_number: integer (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- lat: float (nullable = true)
 |-- lon: float (nullable = true)



In [53]:
#To add atm_location_id of dim_location df as a foreign key to the atm table, adding left join to the atm table and locatino table.
atm_dim = atm_dim.join(location_dim, aon = ["lat","lon"],how = "leftouter")


In [55]:
# Taking distinct values to avoid repeated values 
atm_distinct =atm_dim.distinct()

In [56]:
# Checking dataframe 
atm_distinct.show(5)

+------+------+----------+----------------+--------------------+----------------+-------------+-------+---------------+
|   lat|   lon|atm_number|atm_manufacturer|            location|      streetname|street_number|zipcode|atm_location_id|
+------+------+----------+----------------+--------------------+----------------+-------------+-------+---------------+
|56.448| 9.401|        18| Diebold Nixdorf|              Viborg|       Toldboden|            3|   8800|             13|
|55.705| 9.532|       101|             NCR|      Bryggen  Vejle|SÃƒÂ¸nderbrogade|            2|   7100|              6|
|56.716|10.114|         9| Diebold Nixdorf|             Hadsund|       Storegade|           12|   9560|             25|
|55.859| 9.854|        64|             NCR|             Horsens| GrÃƒÂ¸nlandsvej|            5|   8700|             88|
|56.795|  8.86|        30|             NCR|NykÃƒÂ¸bing Mors ...|     Kirketorvet|            1|   7900|             10|
+------+------+----------+--------------

In [57]:
#adding primary key
atm_distinct= atm_distinct.withColumn("new_column",lit("ABC"))
w = Window().partitionBy('new_column').orderBy(lit('A'))
atm_distinct= atm_distinct.withColumn("atm_id", row_number().over(w)).drop("new_column")

In [58]:
# Checking newly created primary key 
atm_distinct.show(5)

+------+------+----------+----------------+--------------------+----------------+-------------+-------+---------------+------+
|   lat|   lon|atm_number|atm_manufacturer|            location|      streetname|street_number|zipcode|atm_location_id|atm_id|
+------+------+----------+----------------+--------------------+----------------+-------------+-------+---------------+------+
|56.448| 9.401|        18| Diebold Nixdorf|              Viborg|       Toldboden|            3|   8800|             13|     1|
|55.705| 9.532|       101|             NCR|      Bryggen  Vejle|SÃƒÂ¸nderbrogade|            2|   7100|              6|     2|
|56.716|10.114|         9| Diebold Nixdorf|             Hadsund|       Storegade|           12|   9560|             25|     3|
|55.859| 9.854|        64|             NCR|             Horsens| GrÃƒÂ¸nlandsvej|            5|   8700|             88|     4|
|56.795|  8.86|        30|             NCR|NykÃƒÂ¸bing Mors ...|     Kirketorvet|            1|   7900|        

In [59]:
# creating atm from atm_distinct
atm = atm_distinct.select('atm_id','atm_number','atm_manufacturer','atm_location_id')

In [60]:
# Checking top rows of atm
atm.show(5)

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|     1|        18| Diebold Nixdorf|             13|
|     2|       101|             NCR|              6|
|     3|         9| Diebold Nixdorf|             25|
|     4|        64|             NCR|             88|
|     5|        30|             NCR|             10|
+------+----------+----------------+---------------+
only showing top 5 rows



In [61]:
atm.count()

156

#### Validation for the ATM Dimension Table
- Count of Records = 156

### Fact Table

In [62]:
# Creating alias
df = df.alias('df')
location = location.alias('location')
card_type = card_type.alias('card_type')
date = date.alias('date')
atm = atm.alias('atm')

#### - Creating fact table will take 4 steps by outer left joining the input table with dimension tables
#### - Dropping columns as required except primary keys of dimension table as they will act as foreign key

### Creating first_df

In [63]:
date.printSchema()

root
 |-- date_id: integer (nullable = true)
 |-- full_date_time: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)



In [64]:
first_df = df.join(date, on = ['year', 'month', 'day', 'hour', 'weekday'], how = 'left').select('df.*', 'date.date_id').drop(*['year', 'month', 'day', 'hour', 'weekday'])

In [65]:
first_df = first_df.alias('first_df')

In [66]:
first_df.count()

2468572

### Creating second_df

In [67]:
card_type.printSchema()

root
 |-- card_type_id: integer (nullable = true)
 |-- card_type: string (nullable = true)



In [77]:
second_df = first_df.join(card_type, on = ['card_type'], how = 'left').select('first_df.*', 'card_type.card_type_id').drop(*['card_type'])

In [78]:
second_df.count()

2468572

In [79]:
second_df.printSchema()

root
 |-- atm_status: string (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_street: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: float (nullable = true)
 |-- atm_lon: float (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 |-- temp: float (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- rain_

In [80]:
second_df = second_df.alias('second_df')

### Creating third_df

In [81]:
location.printSchema()

root
 |-- atm_location_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: float (nullable = true)
 |-- lon: float (nullable = true)



In [82]:
second_df.printSchema()

root
 |-- atm_status: string (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_street: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: float (nullable = true)
 |-- atm_lon: float (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 |-- temp: float (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- rain_

In [83]:
second_df = second_df.withColumnRenamed('atm_location', 'location').withColumnRenamed('atm_lat', 'lat').withColumnRenamed('atm_lon', 'lon').withColumnRenamed('atm_street', 'streetname').withColumnRenamed('atm_street_number', 'street_number').withColumnRenamed('atm_zipcode', 'zipcode')
third_df = second_df.join(location, on = ['location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon'], how = 'left').select('second_df.*', 'location.atm_location_id').drop(*['location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon'])

In [84]:
third_df.count()

2468572

In [85]:
third_df = third_df.alias('third_df')

In [86]:
third_df.printSchema()

root
 |-- atm_status: string (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 |-- temp: float (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- rain_3h: float (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- ca

- We can see here that there is already a atm_id and the primary key for atm dimenstion table is atm_id too, so we will change this to atm_number

In [87]:
third_df = third_df.withColumnRenamed('atm_id', 'atm_number')

### Creating fourth_df

In [88]:
atm.printSchema()

root
 |-- atm_id: integer (nullable = true)
 |-- atm_number: integer (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location_id: integer (nullable = true)



In [89]:
fourth_df = third_df.join(atm, on = ['atm_number', 'atm_manufacturer', 'atm_location_id'], how = 'left').select('third_df.*', 'atm.atm_id').drop(*['atm_number', 'atm_manufacturer'])

In [90]:
fourth_df.count()

2468572

In [91]:
fourth_df.printSchema()

root
 |-- atm_location_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 |-- temp: float (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- rain_3h: float (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 

### Creating final fact table <font color='red'> fact_atm_trans </font>

In [92]:
fact_atm_trans = fourth_df.alias('fact_atm_trans')

In [93]:
fact_atm_trans.count()

2468572

In [94]:
#adding our primary key to fact table
fact_atm_trans= fact_atm_trans.withColumn("new_column",lit("ABC"))
w = Window().partitionBy('new_column').orderBy(lit('A'))
fact_atm_trans= fact_atm_trans.withColumn("trans_id", row_number().over(w)).drop("new_column")

In [95]:
# Checking schema of fact table
fact_atm_trans.printSchema()

root
 |-- atm_location_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 |-- temp: float (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- rain_3h: float (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 

In [96]:
 # dropping irrelevant columns as per schema 
fact_atm_trans = fact_atm_trans.drop('weather_lat','weather_lon','weather_city_id','weather_city_name','temp','pressure','humidity','wind_speed','wind_deg')

In [97]:
# Checking final schema
fact_atm_trans.printSchema()

root
 |-- atm_location_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: float (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- trans_id: integer (nullable = true)



In [98]:
# Renaming atm_location_id as weather_loaction_id as per schema 
fact_atm_trans = fact_atm_trans.withColumnRenamed("atm_location_id","weather_loc_id")

In [99]:
# Checking final schema
fact_atm_trans.printSchema()

root
 |-- weather_loc_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: float (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- trans_id: integer (nullable = true)



In [100]:
fact_atm_trans1 = fact_atm_trans.select('trans_id','atm_id','weather_loc_id','date_id','card_type_id','atm_status','currency','service','transaction_amount','message_code','message_text','rain_3h','clouds_all','weather_id','weather_main','weather_description')

In [101]:
fact_atm_trans1 = fact_atm_trans1.alias('fact_atm_trans1')

In [102]:
fact_atm_trans1.show(2)

+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|trans_id|atm_id|weather_loc_id|date_id|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|       1|    56|            85|   2559|           1|  Inactive|     DKK|Withdrawal|              9131|        null|        null|    0.0|        92|       520|        Rain|light intensity s...|
|       2|   139|            94|   7776|          10|    Active|     DKK|Withdrawal|              2001|        null|        null|    0.0|         0|       800|       Clear|        Sky is Clear|
+--------+------+-------------

In [103]:
fact_atm_trans1.printSchema()

root
 |-- trans_id: integer (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: float (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)



### Saving the tables to s3

#### Saving ATM Fact Table

In [105]:
fact_atm_trans1.coalesce(1).write.save('s3a://shnkreddy/fact-table', format = 'csv', header = 'false')

#### Saving ATM Dimension Table

In [106]:
atm.coalesce(1).write.save('s3a://shnkreddy/dim-atm', format = 'csv', header = 'false')

#### Saving Card Type Dimension Table

In [107]:
card_type.coalesce(1).write.save('s3a://shnkreddy/dim-card-type', format = 'csv', header = 'false')

#### Saving Date Dimension Table

In [108]:
date.coalesce(1).write.save('s3a://shnkreddy/dim-date', format = 'csv', header = 'false')

#### Saving Location Dimension Table

In [109]:
location.coalesce(1).write.save('s3a://shnkreddy/dim-location', format = 'csv', header = 'false')