### Initializing Spark Context

In [1]:
spark

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1659435808186_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7effcce3d850>

In [2]:
#Specifying custom schema for data frame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

atmTransSchema = StructType([StructField('year', IntegerType(),True),
                        StructField('month', StringType(),True),
                        StructField('day', IntegerType(),True),
                        StructField('weekday', StringType(),True),
                        StructField('hour', IntegerType(),True),
                        StructField('atm_status', StringType(),True) ,  
                        StructField('atm_id', StringType(),True),
                        StructField('atm_manufacturer', StringType(),True),
                        StructField('atm_location', StringType(),True) ,   
                        StructField('atm_streetname', StringType(),True),
                        StructField('atm_street_number', IntegerType(),True),
                        StructField('atm_zipcode', IntegerType(),True),    
                        StructField('atm_lat', DoubleType(),True),
                        StructField('atm_lon', DoubleType(),True),
                        StructField('currency', StringType(),True),
                        StructField('card_type', StringType(),True),
                        StructField('transaction_amount', IntegerType(),True),
                        StructField('service', StringType(),True),
                        StructField('message_code', StringType(),True),
                        StructField('message_text', StringType(),True),
                        StructField('weather_lat', DoubleType(),True),
                        StructField('weather_lon', DoubleType(),True),
                        StructField('weather_city_id', LongType(),True),
                        StructField('weather_city_name', StringType(),True),
                        StructField('temp', DoubleType(),True),
                        StructField('pressure', IntegerType(),True),
                        StructField('humidity', IntegerType(),True),
                        StructField('wind_speed', IntegerType(),True),
                        StructField('wind_deg', IntegerType(),True),
                        StructField('rain_3h', DoubleType(),True),
                        StructField('clouds_all', IntegerType(),True),
                        StructField('weather_id', IntegerType(),True),
                        StructField('weather_main', StringType(),True),
                        StructField('weather_description', StringType(),True)])

#setting header to False since there is no header
dfAtmTrans = spark.read.csv("Atm_Trans_data/part-m-00000", schema = atmTransSchema, header=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
#checking data frame schema
dfAtmTrans.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: long (nullable = true)
 |-- weather_city_name: strin

In [4]:
#Display some records in data frame
dfAtmTrans.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-------+---+-------+----+----------+------+----------------+------------------+--------------------+-----------------+-----------+-------+-------+--------+--------------------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|      atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency|           card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------------+--------------------+-----------------+-----------+-------+-------+--------+--------------------+------

## creating data frames for dimension and fact tables

In [5]:
#importing necessary files
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Location dimension data frame

In [6]:
#fetching unique records for location dimension data frame
dfLoc=dfAtmTrans.select('atm_location', 'atm_streetname','atm_street_number','atm_zipcode','atm_lat','atm_lon').distinct()
#creating unique row number for location_id column 
dfLoc=dfLoc.select(F.row_number().over(Window.partitionBy().orderBy(dfLoc['atm_location'])).alias("location_id"),'atm_location', 'atm_streetname','atm_street_number','atm_zipcode','atm_lat','atm_lon')

#renaming columns in location dimension
dfLoc = dfLoc.withColumnRenamed("atm_location","location") \
    .withColumnRenamed("atm_streetname","streetname")\
    .withColumnRenamed("atm_street_number","street_number")\
    .withColumnRenamed("atm_zipcode","zipcode")\
    .withColumnRenamed("atm_lat","lat")\
    .withColumnRenamed("atm_lon","lon")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# printing location schema
dfLoc.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- location_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)

In [8]:
#Display some records in location data frame
dfLoc.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------------------+------------------+-------------+-------+------+------+
|location_id|            location|        streetname|street_number|zipcode|   lat|   lon|
+-----------+--------------------+------------------+-------------+-------+------+------+
|          1|             Aabybro|      ÃƒËœstergade|            6|   9440|57.162|  9.73|
|          2|      Aalborg Hallen|      Europa Plads|            4|   9000|57.044| 9.913|
|          3|Aalborg Storcente...|          Hobrovej|          452|   9200|57.005| 9.876|
|          4|Aalborg Storcente...|          Hobrovej|          452|   9200|57.005| 9.876|
|          5|         Aalborg Syd|          Hobrovej|          440|   9200|57.005| 9.881|
|          6|           AalbÃƒÂ¦k|        Centralvej|            5|   9982|57.593|10.412|
|          7|              Aarhus|    SÃƒÂ¸nder Alle|           11|   8000|56.153|10.206|
|          8|              Aarhus|        Ceres Byen|           75|   8000|56.157|10.194|
|         

In [9]:
#checking count of location data frame
dfLoc.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

109

In [10]:
# creating intermediate dataframe for Atm dimension data frame
dfLocAtm=dfAtmTrans.select('atm_id','atm_location', 'atm_streetname','atm_street_number','atm_manufacturer',"atm_zipcode","atm_lat","atm_lon").distinct()
dfLocAtm.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

113

### Atm dimension data frame

In [11]:
#fetching unique records for atm dimension data frame
dfAtm = dfLocAtm.join(dfLoc,(dfLocAtm.atm_location==dfLoc.location)&(dfLocAtm.atm_streetname==dfLoc.streetname)\
                      &(dfLocAtm.atm_street_number==dfLoc.street_number)\
                      &(dfLocAtm.atm_lat==dfLoc.lat) &(dfLocAtm.atm_lon==dfLoc.lon)\
                      &(dfLocAtm.atm_zipcode==dfLoc.zipcode)\
                      ,"left").select("atm_id","atm_manufacturer","location_id").distinct()
#creating unique row number for atm_id column 
dfAtm=dfAtm.select(F.row_number().over(Window.partitionBy().orderBy(dfAtm['atm_id'])).alias("atmID"),"atm_id","atm_manufacturer","location_id").distinct()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
#renaming columns in Atm dimension
dfAtm = dfAtm.withColumnRenamed("atm_id","atm_number")\
    .withColumnRenamed("atmID","atm_id") \
    .withColumnRenamed("location_id","atm_location_id")\

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# printing Atm schema
dfAtm.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- atm_id: integer (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location_id: integer (nullable = true)

In [14]:
#Display some records in atm data frame
dfAtm.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|     1|         1|             NCR|             74|
|     2|        10|             NCR|             76|
|     3|       100|             NCR|             56|
|     4|       101|             NCR|             17|
|     5|       102|             NCR|              3|
|     6|       103| Diebold Nixdorf|            103|
|     7|       104|             NCR|             58|
|     8|       105| Diebold Nixdorf|             76|
|     9|       106|             NCR|             55|
|    10|       107| Diebold Nixdorf|             62|
|    11|       108|             NCR|             47|
|    12|       109| Diebold Nixdorf|              5|
|    13|        11|             NCR|             80|
|    14|       110| Diebold Nixdorf|             41|
|    15|       111| Diebold Nixdorf|              8|
|    16|       112| Diebold Nixdorf|          

In [15]:
#checking count of Atm data frame
dfAtm.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

113

### Date dimension data frame

In [16]:
#fetching unique records for date dimension data frame
dfDate=dfAtmTrans.select("year","month","day","hour","weekday").distinct()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# extracting month number from month name to derive date field

from pyspark.sql.functions import *
dfDate=dfDate.withColumn("mnth_no",from_unixtime(unix_timestamp(col("month"),'MMM'),'MM'))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
#creating Date column using day, month, year fields
dfDate=dfDate.withColumn("Date",concat_ws("-",col("year"),col("mnth_no"),col("day")).cast("date"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
#creating date time column using date and hour fields

dfDate=dfDate.withColumn("full_date_time", \
     F.to_timestamp(F.concat("Date","hour"),"yyyy-MM-ddHH"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
#converting to timestamp type
dfDate.withColumn("full_date_time",to_timestamp("full_date_time"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[year: int, month: string, day: int, hour: int, weekday: string, mnth_no: string, Date: date, full_date_time: timestamp]

In [21]:
#creating unique row number for date_id column 
dfDate=dfDate.select(F.row_number().over(Window.partitionBy().orderBy(dfDate['full_date_time'])).alias("date_id"),"full_date_time","year","month","day","hour","weekday").distinct()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
#Display some records in date data frame
dfDate.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-------------------+----+-------+---+----+-------+
|date_id|     full_date_time|year|  month|day|hour|weekday|
+-------+-------------------+----+-------+---+----+-------+
|      1|2017-01-01 00:00:00|2017|January|  1|   0| Sunday|
|      2|2017-01-01 01:00:00|2017|January|  1|   1| Sunday|
|      3|2017-01-01 02:00:00|2017|January|  1|   2| Sunday|
|      4|2017-01-01 03:00:00|2017|January|  1|   3| Sunday|
|      5|2017-01-01 04:00:00|2017|January|  1|   4| Sunday|
|      6|2017-01-01 05:00:00|2017|January|  1|   5| Sunday|
|      7|2017-01-01 06:00:00|2017|January|  1|   6| Sunday|
|      8|2017-01-01 07:00:00|2017|January|  1|   7| Sunday|
|      9|2017-01-01 08:00:00|2017|January|  1|   8| Sunday|
|     10|2017-01-01 09:00:00|2017|January|  1|   9| Sunday|
|     11|2017-01-01 10:00:00|2017|January|  1|  10| Sunday|
|     12|2017-01-01 11:00:00|2017|January|  1|  11| Sunday|
|     13|2017-01-01 12:00:00|2017|January|  1|  12| Sunday|
|     14|2017-01-01 13:00:00|2017|Januar

In [23]:
# printing date schema
dfDate.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- date_id: integer (nullable = true)
 |-- full_date_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)

In [24]:
#checking count of Date data frame
dfDate.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

8685

### Card dimension data frame

In [25]:
#fetching unique records for card dimension data frame
dfCard=dfAtmTrans.select("card_type").distinct()
#creating unique row number for card_type_id column 
dfCard=dfCard.select(F.row_number().over(Window.partitionBy().orderBy(dfCard['card_type'])).alias("card_type_id"),"card_type")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
#checking count of Card data frame
dfCard.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

12

In [27]:
#printing schema of Card dimension
dfCard.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- card_type_id: integer (nullable = true)
 |-- card_type: string (nullable = true)

In [28]:
#Display some records in card data frame
dfCard.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+--------------------+
|card_type_id|           card_type|
+------------+--------------------+
|           1|              CIRRUS|
|           2|             Dankort|
|           3|     Dankort - on-us|
|           4|         HÃƒÂ¦vekort|
|           5| HÃƒÂ¦vekort - on-us|
|           6|             Maestro|
|           7|          MasterCard|
|           8|  Mastercard - on-us|
|           9|                VISA|
|          10|        Visa Dankort|
|          11|Visa Dankort - on-us|
|          12|            VisaPlus|
+------------+--------------------+

### Creating fact table                      
                        

#### create fact table data frame by joining with each dimension data frame

In [29]:
# left join with location table
df_fact_atm_trans = dfAtmTrans.join(dfLoc,(dfAtmTrans.atm_location==dfLoc.location)&\
                                    (dfAtmTrans.atm_streetname==dfLoc.streetname)\
                    &(dfAtmTrans.atm_street_number==dfLoc.street_number)&(dfAtmTrans.atm_zipcode==dfLoc.zipcode)\
                    &(dfAtmTrans.atm_lat==dfLoc.lat)&(dfAtmTrans.atm_lon==dfLoc.lon)\
                        ,"left")\
.select("location_id","atm_status","currency","service","transaction_amount","message_code","message_text","rain_3h","clouds_all"\
       ,"weather_id","weather_main","weather_description","year","month","day","hour","weekday","card_type",
       "atm_manufacturer","atm_id")


                        
df_fact_atm_trans.printSchema()                   
                        
                        

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- location_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_id: string (nullable = true)

In [30]:
#Checking Count of Records in each stage 
df_fact_atm_trans.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

In [31]:
# left join with Date table
df_fact_atm_trans = df_fact_atm_trans.join(dfDate,(df_fact_atm_trans.year==dfDate.year)&\
                                    (df_fact_atm_trans.month==dfDate.month)\
                    &(df_fact_atm_trans.day==dfDate.day)
                               &(df_fact_atm_trans.hour==dfDate.hour)\
                    &(df_fact_atm_trans.weekday==dfDate.weekday)\
                        ,"left")\
.select("location_id","date_id","atm_status","currency","service","transaction_amount","message_code","message_text","rain_3h","clouds_all"\
       ,"weather_id","weather_main","weather_description"
        ,"card_type","atm_manufacturer","atm_id")


                        
df_fact_atm_trans.printSchema()  

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- location_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_id: string (nullable = true)

In [32]:
#Checking Count of Records in each stage 
df_fact_atm_trans.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

In [33]:
# left join with Card table
df_fact_atm_trans = df_fact_atm_trans.join(dfCard,(df_fact_atm_trans.card_type==dfCard.card_type),"left")\
.select("location_id","date_id","card_type_id","atm_status","currency","service","transaction_amount",\
        "message_code","message_text","rain_3h","clouds_all"\
       ,"weather_id","weather_main","weather_description"
        ,"atm_manufacturer","atm_id")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
df_fact_atm_trans.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- location_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_id: string (nullable = true)

In [35]:
#Checking Count of Records in each stage 
df_fact_atm_trans.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

In [36]:
# left join with Atm table
#creating unique row number for trans_id

df_fact_atm_trans = df_fact_atm_trans.join(dfAtm,(df_fact_atm_trans.location_id==dfAtm.atm_location_id)&\
                               (df_fact_atm_trans.atm_manufacturer==dfAtm.atm_manufacturer)&\
                               (df_fact_atm_trans.atm_id==dfAtm.atm_number)
                               ,"left")\
.select(F.row_number().over(Window.partitionBy().orderBy(df_fact_atm_trans["atm_id"])).alias("trans_id"),
        dfAtm["atm_id"],"location_id","date_id","card_type_id","atm_status",\
        "currency","service","transaction_amount",\
        "message_code", "message_text","rain_3h","clouds_all"\
       ,"weather_id","weather_main","weather_description")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
#renaming columns in fact data frame
df_fact_atm_trans = df_fact_atm_trans.withColumnRenamed("location_id","weather_loc_id") 

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
#printing schema of fact data frame
df_fact_atm_trans.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- trans_id: integer (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)

In [39]:
#handling invalid values for message_text column and assigning it to appropriate fields in fact data frame
df_fact_atm_trans=df_fact_atm_trans.withColumn('rain_3h',when(df_fact_atm_trans.message_text.contains(','),F.split(df_fact_atm_trans['message_text'], ',')[11]).otherwise(df_fact_atm_trans['rain_3h']))
df_fact_atm_trans=df_fact_atm_trans.withColumn('weather_id',when(df_fact_atm_trans.message_text.contains(','),F.split(df_fact_atm_trans['message_text'], ',')[13]).otherwise(df_fact_atm_trans['weather_id']))
df_fact_atm_trans=df_fact_atm_trans.withColumn('clouds_all',when(df_fact_atm_trans.message_text.contains(','),F.split(df_fact_atm_trans['message_text'], ',')[12]).otherwise(df_fact_atm_trans['clouds_all']))
df_fact_atm_trans=df_fact_atm_trans.withColumn('weather_main',when(df_fact_atm_trans.message_text.contains(','),F.split(df_fact_atm_trans['message_text'], ',')[14]).otherwise(df_fact_atm_trans['weather_main']))
df_fact_atm_trans=df_fact_atm_trans.withColumn('weather_description',when(df_fact_atm_trans.message_text.contains(','),F.split(df_fact_atm_trans['message_text'], ',')[14]).otherwise(df_fact_atm_trans['weather_description']))

df_fact_atm_trans=df_fact_atm_trans.withColumn('message_text',when(df_fact_atm_trans.message_text.contains(','),F.split(df_fact_atm_trans['message_text'], ',')[0]).otherwise(df_fact_atm_trans['message_text']))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
#checking count of fact table data frame
df_fact_atm_trans.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

### Saving dimension and fact tables to s3 bucket

In [41]:
dfLoc.write.format('csv').option('header','true')\
.save('s3a://atmtransdata/dimLoc',mode='overwrite')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [42]:
dfAtm.write.format('csv').option('header','true')\
.save('s3a://atmtransdata/dimAtm',mode='overwrite')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [43]:
dfDate.write.format('csv').option('header','true')\
.save('s3a://atmtransdata/dimDate',mode='overwrite')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [44]:
dfCard.write.format('csv').option('header','true')\
.save('s3a://atmtransdata/dimCard',mode='overwrite')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [45]:
df_fact_atm_trans.write.format('csv').option('header','true')\
.save('s3a://atmtransdata/factAtmTrans',mode='overwrite')


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…