1. Starting Spark Application
2. Creating Source Dataframe and Loading Data
3. Importing PySpark Libraries for Data Transformation
4. Creating dimension dataframes</br>
    4.1 Creating location dimension dataframe</br>
    4.2 Creating atm dimension dataframe</br>
    4.3 Creating date dimension dataframe</br>
    4.4 Creating card type dimension dataframe</br>
5. Creating fact dataframe</br>
    5.1 Stage 1 Dataframe - Combining location dimension dataframe to fact dataframe</br>
    5.2 Stage 2 Dataframe - Combining atm dimension dataframe to fact dataframe</br>
    5.3 Stage 3 Dataframe - Combining date dimension dataframe to fact dataframe</br>
    5.4 Stage 4 Dataframe - Combining card type dimension dataframe to fact dataframe</br>
    5.5 Adding transaction id to final fact dataframe</br>
    5.6 Transforming fact dataframe to standardize values</br>
6. Loading data to S3 bucket "etlproject-rit" as CSV files

<b>1. Starting Spark Application</b>

In [1]:
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1686143134162_0002,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7fc893f3db50>

<b>2. Creating Source Dataframe and Loading Data</b>

In [2]:
#Specifying the schema to create dataframe
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, LongType

fileSchema = StructType([StructField('year', IntegerType(),True),
                        StructField('month', StringType(),True),
                        StructField('day', IntegerType(),True),
                        StructField('weekday', StringType(),True),
                        StructField('hour', IntegerType(),True),
                        StructField('atm_status', StringType(),True),
                        StructField('atm_id', StringType(),True),
                        StructField('atm_manufacturer', StringType(),True),                
                        StructField('atm_location', StringType(),True),
                        StructField('atm_streetname', StringType(),True),
                        StructField('atm_street_number', IntegerType(),True),
                        StructField('atm_zipcode', IntegerType(),True),
                        StructField('atm_lat', DoubleType(),True), 
                        StructField('atm_lon', DoubleType(),True),
                        StructField('currency', StringType(),True),
                        StructField('card_type', StringType(),True),
                        StructField('transaction_amount', IntegerType(),True),
                        StructField('service', StringType(),True),
                        StructField('message_code', StringType(),True),
                        StructField('message_text', StringType(),True),
                        StructField('weather_lat', DoubleType(),True),
                        StructField('weather_lon', DoubleType(),True),
                        StructField('weather_city_id', IntegerType(),True),
                        StructField('weather_city_name', StringType(),True),
                        StructField('temp', DoubleType(),True),
                        StructField('pressure', IntegerType(),True),
                        StructField('humidity', IntegerType(),True),
                        StructField('wind_speed', IntegerType(),True),
                        StructField('wind_deg', IntegerType(),True),
                        StructField('rain_3h', DoubleType(),True),
                        StructField('clouds_all', IntegerType(),True),
                        StructField('weather_id', IntegerType(),True),
                        StructField('weather_main', StringType(),True),
                        StructField('weather_description', StringType(),True)])


df = spark.read.load("/user/root/atm_trans/part-m-00000", format="csv", sep=",", schema = fileSchema)
df.show(5)

+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-

In [3]:
df.count()

2468572
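Passing `schema=fileSchema` means Spark does not have to guess column types: without a schema, CSV columns are read as strings unless `inferSchema` is enabled, which costs an extra pass over the data. The plain-Python sketch below illustrates typed parsing of one record against a subset of the schema; `SCHEMA`, `parse_record` and the input line are hypothetical, and the `int`/`str` converters only stand in for `IntegerType`/`StringType`.

```python
import csv
import io

# Hypothetical subset of fileSchema as (column name, converter) pairs; the
# converters (int, str) stand in for IntegerType and StringType.
SCHEMA = [
    ("year", int),
    ("month", str),
    ("day", int),
    ("weekday", str),
    ("hour", int),
    ("atm_status", str),
    ("atm_id", str),
    ("atm_manufacturer", str),
]


def parse_record(line, schema=SCHEMA):
    """Split one CSV line and convert each field to its declared type.

    Empty fields and values that fail conversion become None, loosely like
    a nullable column holding null for unusable input.
    """
    fields = next(csv.reader(io.StringIO(line)))
    record = {}
    for (name, convert), raw in zip(schema, fields):
        try:
            record[name] = convert(raw) if raw != "" else None
        except ValueError:
            record[name] = None
    return record


# Hypothetical input line, for illustration only.
row = parse_record("2017,January,1,Sunday,0,Active,1,NCR")
```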

<b>3. Importing PySpark Libraries for Data Transformation</b>

In [4]:
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
import pyspark.sql.functions as F

<b>4. Creating dimension dataframes</b>

<b>4.1 Creating location dimension dataframe</b>

In [5]:
#creating location dimension
dim_location = df.select("atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon")
#dropping duplicate values
dim_location = dim_location.dropDuplicates()
#using row_number function to generate sequence for id column 
dim_location = dim_location.select(F.row_number().over(Window.partitionBy().orderBy(dim_location['atm_zipcode'])).alias("location_id"),
                                   F.col("atm_location").alias("location"),F.col("atm_streetname").alias("streetname"),
                                   F.col("atm_street_number").alias("street_number"),F.col("atm_zipcode").alias("zipcode"),
                                   F.col("atm_lat").alias("lat"),F.col("atm_lon").alias("lon"))
dim_location.show(5)

+-----------+--------------------+----------------+-------------+-------+------+------+
|location_id|            location|      streetname|street_number|zipcode|   lat|   lon|
+-----------+--------------------+----------------+-------------+-------+------+------+
|          1|        KÃƒÂ¸benhavn|  Regnbuepladsen|            5|   1550|55.676|12.571|
|          2|Intern  KÃƒÂ¸benhavn|RÃƒÂ¥dhuspladsen|           75|   1550|55.676|12.571|
|          3|       Frederiksberg| Gammel Kongevej|          157|   1850|55.677|12.537|
|          4|              Lyngby|     Jernbanevej|            6|   2800|55.772|  12.5|
|          5|        HelsingÃƒÂ¸r|  Sct. Olai Gade|           39|   3000|56.036|12.612|
+-----------+--------------------+----------------+-------------+-------+------+------+
only showing top 5 rows

In [6]:
dim_location.count()

109
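`dim_location` numbers rows with `row_number()` ordered by `atm_zipcode` only, and several locations share a zip code (the first two rows above are both 1550). Spark does not guarantee an order among tied rows, and dataframes are recomputed on each action unless persisted, so the IDs written to S3 and the IDs joined into the fact table could in principle disagree. Ordering by every column, for example `orderBy("atm_zipcode", "atm_location", "atm_streetname", "atm_street_number", "atm_lat", "atm_lon")`, is one possible fix; persisting `dim_location` before reuse lowers, but does not strictly remove, the risk. The plain-Python sketch below shows the deterministic variant; `assign_surrogate_keys` and `full_key` are hypothetical helpers and the rows are made up apart from the Frederiksberg entry.

```python
def assign_surrogate_keys(rows, sort_key):
    """Deduplicate rows, sort them and number them from 1, like row_number()."""
    ordered = sorted(set(rows), key=sort_key)
    return {row: position for position, row in enumerate(ordered, start=1)}


def full_key(row):
    """Sort by zipcode, then by every remaining column to break ties."""
    location, streetname, street_number, zipcode = row
    return (zipcode, location, streetname, street_number)


# Hypothetical (location, streetname, street_number, zipcode) tuples; the last
# two share zipcode 1550, so ordering by zipcode alone would leave a tie.
rows = [
    ("Frederiksberg", "Gammel Kongevej", 157, 1850),
    ("Location B", "Street B", 75, 1550),
    ("Location A", "Street A", 5, 1550),
]

# With a full tie-break, any input order (even with duplicates) gives the same ids.
keys = assign_surrogate_keys(rows, full_key)
keys_shuffled = assign_surrogate_keys(list(reversed(rows)) + rows, full_key)
```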

<b>4.2 Creating atm dimension dataframe</b>

In [7]:
#creating atm dimension
dim_atm = df.select("atm_id","atm_manufacturer","atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon")
#joining with location dimension
dim_atm = dim_atm.join(dim_location, (dim_atm.atm_location == dim_location.location) & (dim_atm.atm_streetname == dim_location.streetname) & (dim_atm.atm_street_number == dim_location.street_number) 
& (dim_atm.atm_zipcode == dim_location.zipcode) & (dim_atm.atm_lat == dim_location.lat) & (dim_atm.atm_lon == dim_location.lon), 'left')\
.select(dim_atm.atm_id, dim_atm.atm_manufacturer, dim_location.location_id.alias('atm_location_id'))
#dropping duplicate values
dim_atm = dim_atm.dropDuplicates()
#atm_id is a string: keep the original value as atm_number and cast atm_id to integer for use as the id column
dim_atm = dim_atm.select(F.col("atm_id").cast(IntegerType()), F.col("atm_id").alias("atm_number"), "atm_manufacturer", "atm_location_id")
dim_atm.show(5) 

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|    36|        36|             NCR|             98|
|    15|        15|             NCR|             65|
|    71|        71|             NCR|            107|
|   107|       107| Diebold Nixdorf|             23|
|    46|        46| Diebold Nixdorf|              5|
+------+----------+----------------+---------------+
only showing top 5 rows

In [8]:
dim_atm.count()

113
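The join that gives each ATM its `atm_location_id` behaves like a dictionary lookup on all six location columns: every column must match, and an ATM with no match keeps a null id because the join is `'left'`. The plain-Python sketch below models that with two rows taken from the `dim_location` output above; the ATM row and the helper names are hypothetical.

```python
from typing import Optional

# Rows of dim_location as shown above:
# (location_id, location, streetname, street_number, zipcode, lat, lon)
DIM_LOCATION = [
    (3, "Frederiksberg", "Gammel Kongevej", 157, 1850, 55.677, 12.537),
    (4, "Lyngby", "Jernbanevej", 6, 2800, 55.772, 12.5),
]


def build_location_index(dim_rows):
    """Map each natural key (every column except the id) to its location_id."""
    return {row[1:]: row[0] for row in dim_rows}


def lookup_location_id(index, atm_row) -> Optional[int]:
    """Return the matching location_id, or None like an unmatched left join."""
    key = (atm_row["atm_location"], atm_row["atm_streetname"],
           atm_row["atm_street_number"], atm_row["atm_zipcode"],
           atm_row["atm_lat"], atm_row["atm_lon"])
    return index.get(key)


index = build_location_index(DIM_LOCATION)
# Hypothetical ATM row whose location columns match the Lyngby entry.
atm = {"atm_location": "Lyngby", "atm_streetname": "Jernbanevej",
       "atm_street_number": 6, "atm_zipcode": 2800, "atm_lat": 55.772, "atm_lon": 12.5}
```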

<b>4.3 Creating date dimension dataframe</b>

In [9]:
#creating date dimension
dim_date = df.select("year","month","day","hour","weekday")
#dropping duplicate values
dim_date = dim_date.dropDuplicates()
#mapping month names to two-digit month numbers (unmatched names become null)
month_to_num = {'January': '01', 'February': '02', 'March': '03', 'April': '04',
                'May': '05', 'June': '06', 'July': '07', 'August': '08',
                'September': '09', 'October': '10', 'November': '11', 'December': '12'}
month_map = F.create_map(*[F.lit(x) for pair in month_to_num.items() for x in pair])
dim_date = dim_date.withColumn('month_num', month_map[F.trim(dim_date.month)])
#left-padding day and hour with 0 to two digits (e.g. 5 -> '05')
day_str = F.lpad(F.col('day').cast('string'), 2, '0')
hour_str = F.lpad(F.col('hour').cast('string'), 2, '0')
#timestamp string 'yyyy-MM-dd HH:00:00': date parts joined by '-', then hour after ' ', then default minutes and seconds after ':'
dim_date = dim_date.withColumn('full_date_time',
                               F.concat_ws(':',
                                           F.concat_ws(' ', F.concat_ws('-', F.col('year'), F.col('month_num'), day_str),
                                                       hour_str),
                                           F.lit('00:00')))
#id in yyyyMMddHH form, e.g. 2017010521
dim_date = dim_date.withColumn('date_id', F.concat(F.col('year'), F.col('month_num'), day_str, hour_str))
#casting date_id column to Integer
dim_date = dim_date.select(F.col("date_id").cast(IntegerType()),
                           "full_date_time", "year","month","day","hour","weekday")
dim_date.show(5) 

+----------+-------------------+----+-------+---+----+--------+
|   date_id|     full_date_time|year|  month|day|hour| weekday|
+----------+-------------------+----+-------+---+----+--------+
|2017010521|2017-01-05 21:00:00|2017|January|  5|  21|Thursday|
|2017012215|2017-01-22 15:00:00|2017|January| 22|  15|  Sunday|
|2017040709|2017-04-07 09:00:00|2017|  April|  7|   9|  Friday|
|2017012318|2017-01-23 18:00:00|2017|January| 23|  18|  Monday|
|2017031701|2017-03-17 01:00:00|2017|  March| 17|   1|  Friday|
+----------+-------------------+----+-------+---+----+--------+
only showing top 5 rows

In [11]:
dim_date.count()

8685
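The date cell builds two keys from the same parts: `full_date_time` (`yyyy-MM-dd HH:00:00`) and the integer `date_id` (`yyyyMMddHH`), with day and hour zero-padded to two digits. The plain-Python sketch below reproduces that formatting so the rule can be checked against the output rows, e.g. 5 January 2017 at 21:00 becoming 2017010521; `make_date_keys` is a hypothetical helper.

```python
MONTHS = ["January", "February", "March", "April", "May", "June", "July",
          "August", "September", "October", "November", "December"]
# Month name -> two-digit string, like the month_num column.
MONTH_NUM = {name: f"{i:02d}" for i, name in enumerate(MONTHS, start=1)}


def make_date_keys(year, month, day, hour):
    """Build (date_id, full_date_time) from the date dimension columns."""
    month_num = MONTH_NUM[month.strip()]  # the notebook trims the month too
    date_part = f"{year}-{month_num}-{day:02d}"
    full_date_time = f"{date_part} {hour:02d}:00:00"
    date_id = int(f"{year}{month_num}{day:02d}{hour:02d}")
    return date_id, full_date_time
```

A 10-digit `yyyyMMddHH` value fits in Spark's 32-bit `IntegerType` (maximum 2147483647) for any year up to 2147, so the cast to `IntegerType()` is safe for this data.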

<b>4.4 Creating card type dimension dataframe</b>

In [12]:
#creating card type dimension
dim_card_type = df.select("card_type")
#dropping duplicate values
dim_card_type = dim_card_type.dropDuplicates()
#using row_number function to generate sequence for id column 
dim_card_type = dim_card_type.select(F.row_number().over(Window.partitionBy().orderBy(dim_card_type['card_type'])).alias("card_type_id"), "card_type")
dim_card_type.show(5) 

+------------+-------------------+
|card_type_id|          card_type|
+------------+-------------------+
|           1|             CIRRUS|
|           2|            Dankort|
|           3|    Dankort - on-us|
|           4|        HÃƒÂ¦vekort|
|           5|HÃƒÂ¦vekort - on-us|
+------------+-------------------+
only showing top 5 rows

In [13]:
dim_card_type.count()

12

<b>5. Creating fact dataframe</b>

<b>5.1 Stage 1 Dataframe - Combining location dimension dataframe to fact dataframe</b>

In [15]:
#copy source dataframe to fact dataframe
fact = df
#joining location dimension to fact table
fact = fact.join(dim_location, (fact.atm_location == dim_location.location) & (fact.atm_streetname == dim_location.streetname) 
                 & (fact.atm_street_number == dim_location.street_number) & (fact.atm_zipcode == dim_location.zipcode) 
                 & (fact.atm_lat == dim_location.lat) & (fact.atm_lon == dim_location.lon), 'left')\
.select(fact.year,
 fact.month,
 fact.day,
 fact.weekday,
 fact.hour,
 fact.atm_status,
 fact.atm_id,
 fact.atm_manufacturer,
 fact.currency,
 fact.card_type,
 fact.transaction_amount,
 fact.service,
 fact.message_code,
 fact.message_text,
 fact.weather_lat,
 fact.weather_lon,
 fact.weather_city_id,
 fact.weather_city_name,
 fact.temp,
 fact.pressure,
 fact.humidity,
 fact.wind_speed,
 fact.wind_deg,
 fact.rain_3h,
 fact.clouds_all,
 fact.weather_id,
 fact.weather_main,
 fact.weather_description, dim_location.location_id.alias('weather_loc_id'))
fact.show(5)

+----+-------+---+-------+----+----------+------+----------------+--------+------------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+--------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|currency|   card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|weather_loc_id|
+----+-------+---+-------+----+----------+------+----------------+--------+------------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+--------------+
|2017|January|  1| Sun

In [16]:
fact.count()

2468572
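The unchanged count (2468572 before and after the join) is a useful check: a left join keeps every fact row, but repeats it once per matching dimension row, so a duplicate natural key in `dim_location` would inflate the fact table. The plain-Python sketch below counts rows the way a left join would; `duplicate_keys` and `left_join_count` are hypothetical helpers and the keys are toy values.

```python
from collections import Counter


def duplicate_keys(dim_keys):
    """Natural keys that appear more than once in a dimension."""
    return {key for key, n in Counter(dim_keys).items() if n > 1}


def left_join_count(fact_keys, dim_keys):
    """Rows produced by a left join: one per match, or one if nothing matches."""
    matches = Counter(dim_keys)
    return sum(max(1, matches[key]) for key in fact_keys)


# Toy fact-side keys; "c" has no dimension row and is still kept once.
fact_keys = ["a", "a", "b", "c"]
```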

<b>5.2 Stage 2 Dataframe - Combining atm dimension dataframe to fact dataframe</b>

In [18]:
#joining with atm dimension
fact = fact.join(dim_atm, (fact.atm_id == dim_atm.atm_id) & (fact.atm_manufacturer == dim_atm.atm_manufacturer)\
                 & (fact.weather_loc_id == dim_atm.atm_location_id), 'left')\
.select(fact.year,
 fact.month,
 fact.day,
 fact.weekday,
 fact.hour,
 fact.atm_status,
 fact.currency,
 fact.card_type,
 fact.transaction_amount,
 fact.service,
 fact.message_code,
 fact.message_text,
 fact.weather_lat,
 fact.weather_lon,
 fact.weather_city_id,
 fact.weather_city_name,
 fact.temp,
 fact.pressure,
 fact.humidity,
 fact.wind_speed,
 fact.wind_deg,
 fact.rain_3h,
 fact.clouds_all,
 fact.weather_id,
 fact.weather_main,
 fact.weather_description, fact.weather_loc_id, dim_atm.atm_id)
fact.show(5)

+----+-------+---+-------+----+----------+--------+---------------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+--------------+------+
|year|  month|day|weekday|hour|atm_status|currency|      card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|   temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|weather_loc_id|atm_id|
+----+-------+---+-------+----+----------+--------+---------------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+--------------+------+
|2017|January|  1| Sunday|   1|    Active|     DKK|     Maste

In [19]:
fact.count()

2468572
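Stage 2 matches on `weather_loc_id` as well as `atm_id` and `atm_manufacturer`. `weather_loc_id` is null for any fact row whose stage 1 lookup found nothing, and with `==` a null on either side never satisfies a join condition, so such rows get a null `atm_id` even when `dim_atm` holds the same null `atm_location_id`. PySpark's `Column.eqNullSafe` (SQL `<=>`) treats two nulls as equal. Whether this matters depends on whether the location columns contain nulls; the unchanged count does not reveal it, because a left join keeps unmatched rows, but counting rows with a null `atm_id` after the join would. The plain-Python sketch below models the two comparisons; the function names are hypothetical.

```python
def sql_equals(a, b):
    """SQL '=': NULL (None) if either side is NULL, otherwise a == b."""
    if a is None or b is None:
        return None
    return a == b


def null_safe_equals(a, b):
    """SQL '<=>' (Column.eqNullSafe): two NULLs compare equal, never NULL."""
    if a is None or b is None:
        return a is None and b is None
    return a == b


def keeps_pair(left_key, right_key, compare):
    """A join keeps a row pair only when the condition is exactly True."""
    return compare(left_key, right_key) is True
```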

<b>5.3 Stage 3 Dataframe - Combining date dimension dataframe to fact dataframe</b>

In [20]:
#joining with date dimension
fact = fact.join(dim_date, (fact.year == dim_date.year) & (fact.month == dim_date.month) & (fact.day == dim_date.day) & (fact.hour == dim_date.hour) & (fact.weekday == dim_date.weekday), 'left')\
.select(fact.atm_status,
 fact.currency,
 fact.card_type,
 fact.transaction_amount,
 fact.service,
 fact.message_code,
 fact.message_text,
 fact.weather_lat,
 fact.weather_lon,
 fact.weather_city_id,
 fact.weather_city_name,
 fact.temp,
 fact.pressure,
 fact.humidity,
 fact.wind_speed,
 fact.wind_deg,
 fact.rain_3h,
 fact.clouds_all,
 fact.weather_id,
 fact.weather_main,
 fact.weather_description, fact.weather_loc_id, fact.atm_id, dim_date.date_id)
fact.show(5)

+----------+--------+---------------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+--------------+------+----------+
|atm_status|currency|      card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|   temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|weather_loc_id|atm_id|   date_id|
+----------+--------+---------------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+--------------+------+----------+
|    Active|     DKK|     MasterCard|              7655|Withdrawal|        null|        null|     57.441|     10.537| 

In [21]:
fact.count()

2468572
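Stage 3 matches on year, month, day, hour and weekday, but `date_id` and the weekday are both determined by the first four, so the lookup can be cross-checked arithmetically. The plain-Python sketch below recomputes both; it needs the date columns, which the stage 3 `select` drops, so in the notebook such a check would run on a sample taken before that projection. The helpers are hypothetical, and the weekday names are spelled out so the result does not depend on the system locale.

```python
import datetime

MONTHS = ["January", "February", "March", "April", "May", "June", "July",
          "August", "September", "October", "November", "December"]
# Index matches datetime.date.weekday(): Monday == 0.
WEEKDAYS = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]


def expected_date_id(year, month, day, hour):
    """yyyyMMddHH as an integer, e.g. (2017, 'January', 5, 21) -> 2017010521."""
    month_num = MONTHS.index(month.strip()) + 1
    return ((year * 100 + month_num) * 100 + day) * 100 + hour


def expected_weekday(year, month, day):
    """Weekday name implied by the calendar date."""
    month_num = MONTHS.index(month.strip()) + 1
    return WEEKDAYS[datetime.date(year, month_num, day).weekday()]


def row_is_consistent(row):
    """True when a joined row's date_id and weekday agree with its date columns."""
    return (row["date_id"] == expected_date_id(row["year"], row["month"], row["day"], row["hour"])
            and row["weekday"] == expected_weekday(row["year"], row["month"], row["day"]))
```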

<b>5.4 Stage 4 Dataframe - Combining card type dimension dataframe to fact dataframe</b>

In [22]:
#joining with card type dimension
fact = fact.join(dim_card_type, (fact.card_type == dim_card_type.card_type), 'left')\
.select(fact.atm_status,
 fact.currency,
 fact.transaction_amount,
 fact.service,
 fact.message_code,
 fact.message_text,
 fact.weather_lat,
 fact.weather_lon,
 fact.weather_city_id,
 fact.weather_city_name,
 fact.temp,
 fact.pressure,
 fact.humidity,
 fact.wind_speed,
 fact.wind_deg,
 fact.rain_3h,
 fact.clouds_all,
 fact.weather_id,
 fact.weather_main,
 fact.weather_description, fact.weather_loc_id, fact.atm_id, fact.date_id, dim_card_type.card_type_id)
fact.show(5)

+----------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+--------------+------+----------+------------+
|atm_status|currency|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|   temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|weather_loc_id|atm_id|   date_id|card_type_id|
+----------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+--------------+------+----------+------------+
|    Active|     DKK|              7655|Withdrawal|        null|        null|     57.441|     10.537|        2621927|    Freder

In [23]:
fact.count()

2468572

<b>5.5 Adding transaction id to final fact dataframe</b>

In [24]:
#using row_number() function to create sequence for id column
fact_atm_trans = fact.select(F.row_number().over(Window.partitionBy().orderBy("atm_id")).alias("trans_id"),"atm_id",
                                       "weather_loc_id", "date_id","card_type_id", "atm_status","currency","service",
                                       "transaction_amount","message_code","message_text","rain_3h","clouds_all","weather_id",
                                       "weather_main","weather_description")
fact_atm_trans.show(5) 

+--------+------+--------------+----------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|trans_id|atm_id|weather_loc_id|   date_id|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+--------+------+--------------+----------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|       1|     1|            16|2017010100|           7|    Active|     DKK|Withdrawal|              5643|        null|        null|  0.215|        92|       500|        Rain|         light rain|
|       2|     1|            16|2017010101|           9|    Active|     DKK|Withdrawal|              1979|        null|        null|  0.215|        92|       500|        Rain|         light rain|
|       3|     1|   

In [25]:
fact_atm_trans.count()

2468572
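`row_number()` over `Window.partitionBy()` with no partition columns makes Spark move all 2,468,572 rows into a single partition to number them (Spark logs a warning about this), and rows that share an `atm_id` are numbered in no guaranteed order. If `trans_id` only needs to be unique, `F.monotonically_increasing_id()` avoids the single-partition step; per the PySpark documentation it puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits, so IDs are unique but not consecutive. The plain-Python sketch below reproduces that layout; `monotonic_ids` and the partition sizes are hypothetical.

```python
def monotonic_ids(partition_sizes):
    """Mimic the documented ID layout of monotonically_increasing_id().

    The partition index goes in the upper bits (shifted left by 33) and the
    record's position inside its partition goes in the lower 33 bits.
    """
    ids = []
    for partition_index, size in enumerate(partition_sizes):
        base = partition_index << 33
        ids.extend(base + offset for offset in range(size))
    return ids


print(monotonic_ids([3, 2]))  # [0, 1, 2, 8589934592, 8589934593]
```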

<b>5.6 Transforming fact dataframe to standardize values</b>

In [42]:
#checking values for weather_description
fact_atm_trans.select('weather_description').distinct().collect()

[Row(weather_description=None), Row(weather_description='shower sleet'), Row(weather_description='haze'), Row(weather_description='light intensity drizzle'), Row(weather_description='light intensity shower rain'), Row(weather_description='proximity thunderstorm'), Row(weather_description='sleet'), Row(weather_description='thunderstorm with heavy rain'), Row(weather_description='rain and snow'), Row(weather_description='light shower snow'), Row(weather_description='rain and drizzle'), Row(weather_description='ragged shower rain'), Row(weather_description='proximity shower rain'), Row(weather_description='light rain'), Row(weather_description='heavy intensity rain and drizzle'), Row(weather_description='broken clouds'), Row(weather_description='scattered clouds'), Row(weather_description='thunderstorm with light rain'), Row(weather_description='heavy intensity rain'), Row(weather_description='tornado'), Row(weather_description='light snow'), Row(weather_description='thunderstorm with rai

In [27]:
#Standardizing values for weather_description 
fact_atm_trans = fact_atm_trans.na.replace('sky is Clear', 'sky is clear', 'weather_description')
fact_atm_trans.select('weather_description').distinct().collect()

[Row(weather_description=None), Row(weather_description='shower sleet'), Row(weather_description='haze'), Row(weather_description='light intensity drizzle'), Row(weather_description='light intensity shower rain'), Row(weather_description='proximity thunderstorm'), Row(weather_description='sleet'), Row(weather_description='thunderstorm with heavy rain'), Row(weather_description='rain and snow'), Row(weather_description='light shower snow'), Row(weather_description='rain and drizzle'), Row(weather_description='ragged shower rain'), Row(weather_description='proximity shower rain'), Row(weather_description='light rain'), Row(weather_description='heavy intensity rain and drizzle'), Row(weather_description='broken clouds'), Row(weather_description='scattered clouds'), Row(weather_description='thunderstorm with light rain'), Row(weather_description='heavy intensity rain'), Row(weather_description='tornado'), Row(weather_description='light snow'), Row(weather_description='thunderstorm with rai
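`na.replace` swaps whole cell values that equal `to_replace` exactly; it does no substring or case-insensitive matching, so only 'sky is Clear' is rewritten. If other casing variants appeared, lower-casing the column (for example with `F.lower`) would be a broader alternative. The plain-Python sketch below compares the two approaches; the helper names and the 'Sky Is Clear' value are hypothetical.

```python
def replace_exact(values, to_replace, value):
    """Swap cells equal to to_replace, as na.replace does for one column."""
    return [value if v == to_replace else v for v in values]


def lower_all(values):
    """Alternative normalization: lower-case every non-null value."""
    return [v.lower() if v is not None else None for v in values]


sample = ["sky is Clear", "sky is clear", None, "Sky Is Clear"]
```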

In [29]:
#checking values for weather_main
fact_atm_trans.select('weather_main').distinct().collect()

[Row(weather_main=None), Row(weather_main='Clear'), Row(weather_main='Clouds'), Row(weather_main='Mist'), Row(weather_main='Thunderstorm'), Row(weather_main='Snow'), Row(weather_main='Haze'), Row(weather_main='TORNADO'), Row(weather_main='Drizzle'), Row(weather_main='Fog'), Row(weather_main='Rain')]

In [41]:
#Replacing null values with Other for weather_main
fact_atm_trans = fact_atm_trans.na.fill('Other', 'weather_main')
fact_atm_trans.select('weather_main').distinct().collect()

[Row(weather_main='Clear'), Row(weather_main='Other'), Row(weather_main='Clouds'), Row(weather_main='Mist'), Row(weather_main='Thunderstorm'), Row(weather_main='Snow'), Row(weather_main='Haze'), Row(weather_main='TORNADO'), Row(weather_main='Drizzle'), Row(weather_main='Fog'), Row(weather_main='Rain')]

In [45]:
#checking values for message_code
fact_atm_trans.select('message_code').distinct().collect()

[Row(message_code=None), Row(message_code='4014'), Row(message_code='4018'), Row(message_code='4006'), Row(message_code='4002'), Row(message_code='4019'), Row(message_code='0000'), Row(message_code='4017')]

In [46]:
#Replacing null values with '0000' for message_code
fact_atm_trans = fact_atm_trans.na.fill('0000', 'message_code')
fact_atm_trans.select('message_code').distinct().collect()

[Row(message_code='4014'), Row(message_code='4018'), Row(message_code='4006'), Row(message_code='4002'), Row(message_code='4019'), Row(message_code='0000'), Row(message_code='4017')]
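`na.fill(value, subset)` changes only null cells in the listed columns, and the PySpark documentation notes that subset columns whose type does not match the value are ignored (a string fill value leaves numeric columns alone). The plain-Python sketch below models the null-only behavior on rows represented as dicts; `fill_nulls` and the sample rows are hypothetical.

```python
def fill_nulls(rows, value, subset):
    """Return copies of rows where None in the subset columns becomes value."""
    filled = []
    for row in rows:
        new_row = dict(row)
        for column in subset:
            if column in new_row and new_row[column] is None:
                new_row[column] = value
        filled.append(new_row)
    return filled


rows = [{"message_code": None, "weather_main": None},
        {"message_code": "4014", "weather_main": "Rain"}]
rows = fill_nulls(rows, "0000", ["message_code"])
rows = fill_nulls(rows, "Other", ["weather_main"])
```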

<b>6. Loading data to S3 bucket "etlproject-rit" as CSV files</b>

In [53]:
dim_location.write.options(header='True', delimiter=',').csv("s3://etlproject-rit/dim_location")
dim_atm.write.options(header='True', delimiter=',').csv("s3://etlproject-rit/dim_atm")
dim_date.write.options(header='True', delimiter=',').csv("s3://etlproject-rit/dim_date")
dim_card_type.write.options(header='True', delimiter=',').csv("s3://etlproject-rit/dim_card_type")
fact_atm_trans.write.options(header='True', delimiter=',').csv("s3://etlproject-rit/fact_atm_trans")
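By default `DataFrameWriter` uses the `errorifexists` save mode, so re-running these writes after a successful load fails because the S3 prefixes already exist; `.mode("overwrite")`, `"append"` or `"ignore"` change that behavior. Each `.csv()` call also writes a directory of part files under the given prefix rather than a single CSV file. The plain-Python sketch below models the four modes against a dict standing in for storage; `save` and `store` are hypothetical names.

```python
def save(store, path, data, mode="errorifexists"):
    """Model DataFrameWriter save modes against a dict of path -> rows."""
    exists = path in store
    if mode in ("error", "errorifexists"):
        if exists:
            raise FileExistsError(f"path {path} already exists")
        store[path] = list(data)
    elif mode == "overwrite":
        store[path] = list(data)
    elif mode == "append":
        store.setdefault(path, []).extend(data)
    elif mode == "ignore":
        if not exists:  # silently skip when data is already there
            store[path] = list(data)
    else:
        raise ValueError(f"unknown save mode: {mode}")
    return store
```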
