### PySpark Code for ETL Project

In [1]:
#setting all the necessary env variables for initiating Spark session
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
# initializing spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ETL Assignment').master("local").getOrCreate()
spark




In [3]:
# using StructType to define schema for the input file

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType, FloatType

In [4]:
#specifying the schema as per the data dictionary
fileSchema = StructType([StructField('year',IntegerType(),True),
                        StructField('month',StringType(),True),
                        StructField('day',IntegerType(),True),
                        StructField('weekday',StringType(),True),
                        StructField('hour',IntegerType(),True),
                        StructField('atm_status',StringType(),True),
                        StructField('atm_id',StringType(),True),
                        StructField('atm_manufacturer',StringType(),True),
                        StructField('atm_location',StringType(),True),
                        StructField('atm_streetname',StringType(),True),
                        StructField('atm_street_number',IntegerType(),True),
                        StructField('atm_zipcode',IntegerType(),True),
                        StructField('atm_lat',FloatType(),True),
                        StructField('atm_lon',FloatType(),True),
                        StructField('currency',StringType(),True),
                        StructField('card_type',StringType(),True), 
                        StructField('transaction_amount', IntegerType(),True),
                        StructField('service',StringType(),True),
                        StructField('message_code',StringType(),True),
                        StructField('message_text',StringType(),True),
                        StructField('weather_lat',FloatType(),True),
                        StructField('weather_lon',FloatType(),True),
                        StructField('weather_city_id',IntegerType(),True),
                        StructField('weather_city_name',StringType(),True), 
                        StructField('temp',FloatType(),True),
                        StructField('pressure',IntegerType(),True),
                        StructField('humidity',IntegerType(),True),
                        StructField('wind_speed', IntegerType(),True),
                        StructField('wind_deg',IntegerType(),True),
                        StructField('rain_3h',FloatType(),True),
                        StructField('clouds_all',IntegerType(),True),
                        StructField('weather_id',IntegerType(),True),
                        StructField('weather_main',StringType(),True),
                        StructField('weather_description',StringType(),True),
                        ])

In [5]:
# reading the imported input dataset in Hadoop. JHeader is false as the file does not have headers
inp_ds= spark.read.csv("hdfs:/user/root/Atm_data/part-m-00000", header = False, schema = fileSchema)

In [6]:
# checking the input file schema
inp_ds.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: float (nullable = true)
 |-- atm_lon: float (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string

In [7]:
#checking the top 5 rows 
inp_ds.show(5)

+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-

In [8]:
# verifying the count of the inp_ds
inp_ds.count()

2468572

##### as seen above, the inp_ds has 2468572 records which is equal to the number of records imported from RDS to HDFS via sqoop

## Building dimension tables using spark dataframes

#### 1) DIM_LOC

In [9]:
from pyspark.sql.functions import lit

#selecting required columns from source and removing duplicates
df_0=inp_ds.select('atm_location','atm_streetname','atm_street_number','atm_zipcode','atm_lat','atm_lon').distinct()
df_0.printSchema()


root
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: float (nullable = true)
 |-- atm_lon: float (nullable = true)



In [10]:
df_0.count()

109

* we have 109 records in DIM_LOC table

In [11]:
# genearting surrogate key
df_0_schema = df_0.withColumn("location_id", lit(1))
df_0_schema.printSchema()


root
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: float (nullable = true)
 |-- atm_lon: float (nullable = true)
 |-- location_id: integer (nullable = false)



In [12]:
#converting df into RDD to generate SK's
rdd_1 = df_0.rdd.zipWithIndex().map(lambda (row,rowId): ( list(row) + [rowId+1]))

In [13]:
#convert RDD to DF using defined schema

rdd_to_df0 = spark.createDataFrame(rdd_1, schema=df_0_schema.schema)
rdd_to_df0.printSchema()
rdd_to_df0.show(5)

root
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: float (nullable = true)
 |-- atm_lon: float (nullable = true)
 |-- location_id: integer (nullable = false)

+--------------+--------------------+-----------------+-----------+-------+-------+-----------+
|  atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|location_id|
+--------------+--------------------+-----------------+-----------+-------+-------+-----------+
|   SÃƒÂ¦by Syd|Trafikcenter SÃƒÂ...|                1|       9300| 57.313|  10.45|          1|
|   GlyngÃƒÂ¸re|         FÃƒÂ¦rgevej|                1|       7870| 56.762|  8.867|          2|
|Skelagervej 15|         Skelagervej|               15|       9000| 57.023|  9.891|          3|
|         Durup|              Torvet|                4|       7870| 56.745|  8.949|          4|
|     Svendborg| 

In [14]:
# rearrange columns as per target def
df0_rearg=rdd_to_df0.select("location_id","atm_location","atm_streetname","atm_street_number","atm_zipcode", 
"atm_lat","atm_lon")

In [15]:
df0_rearg.show(5)

+-----------+--------------+--------------------+-----------------+-----------+-------+-------+
|location_id|  atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+-----------+--------------+--------------------+-----------------+-----------+-------+-------+
|          1|   SÃƒÂ¦by Syd|Trafikcenter SÃƒÂ...|                1|       9300| 57.313|  10.45|
|          2|   GlyngÃƒÂ¸re|         FÃƒÂ¦rgevej|                1|       7870| 56.762|  8.867|
|          3|Skelagervej 15|         Skelagervej|               15|       9000| 57.023|  9.891|
|          4|         Durup|              Torvet|                4|       7870| 56.745|  8.949|
|          5|     Svendborg|  Sankt Nicolai Gade|                1|       5700| 55.058| 10.609|
+-----------+--------------+--------------------+-----------------+-----------+-------+-------+
only showing top 5 rows



In [16]:
#final dim_loc df
dim_loc= df0_rearg.withColumnRenamed("atm_location","location").withColumnRenamed("atm_streetname","streetname").withColumnRenamed("atm_street_number","street_number").withColumnRenamed("atm_zipcode","zipcode").withColumnRenamed("atm_lat","lat").withColumnRenamed("atm_lon","lon")


In [17]:
dim_loc.show(5)

+-----------+--------------+--------------------+-------------+-------+------+------+
|location_id|      location|          streetname|street_number|zipcode|   lat|   lon|
+-----------+--------------+--------------------+-------------+-------+------+------+
|          1|   SÃƒÂ¦by Syd|Trafikcenter SÃƒÂ...|            1|   9300|57.313| 10.45|
|          2|   GlyngÃƒÂ¸re|         FÃƒÂ¦rgevej|            1|   7870|56.762| 8.867|
|          3|Skelagervej 15|         Skelagervej|           15|   9000|57.023| 9.891|
|          4|         Durup|              Torvet|            4|   7870|56.745| 8.949|
|          5|     Svendborg|  Sankt Nicolai Gade|            1|   5700|55.058|10.609|
+-----------+--------------+--------------------+-------------+-------+------+------+
only showing top 5 rows



In [None]:
#load table to s3 bucket
#Setting S3 access key and security key to upload data to S3
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key","AKIAQN3JW5X4EDXA53GE")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key","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")

#S3 (CSV)
dim_loc.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("s3a://pshrutis3bucket/etlproj/dim_loc.csv")

#### 2) DIM_ATM

In [19]:
# To get the location_id from dim_location table, we are joining the input dataset with dim_loc table on latitue and longitude columns

joined_df=inp_ds.join(dim_loc,(inp_ds["atm_lat"]==dim_loc["lat"]) & (inp_ds["atm_lon"]==dim_loc["lon"]),"left_outer")

In [20]:
joined_df.count()


3398293

In [21]:
  #select direct move src cols and remove duplicates
df_2=joined_df.withColumnRenamed("atm_id","atm_number").select('atm_number','atm_manufacturer','location_id').distinct()


In [22]:
df_2.printSchema()
df_2.count()

root
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- location_id: integer (nullable = true)



156

* we have 156 records in the DIM_ATM table

In [23]:
df_2_schema = df_2.withColumn("atm_id", lit(1)) #create new SK atm_id
df_2_schema.printSchema()


root
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- atm_id: integer (nullable = false)



In [24]:
rdd_2 = df_2.rdd.zipWithIndex().map(lambda (row,rowId): ( list(row) + [rowId+1]))

rdd_to_df2 = spark.createDataFrame(rdd_2, schema=df_2_schema.schema)



In [25]:
#rearranging and renaming cols
#final dim_atm df
dim_atm=rdd_to_df2.withColumnRenamed("location_id","atm_location_id").select("atm_id","atm_number","atm_manufacturer","atm_location_id")

In [26]:
dim_atm.show(5)

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|     1|        74|             NCR|             71|
|     2|        33|             NCR|             77|
|     3|       101|             NCR|            104|
|     4|        50|             NCR|             19|
|     5|        36|             NCR|             30|
+------+----------+----------------+---------------+
only showing top 5 rows



In [27]:
#load table to s3 bucket
#Setting S3 access key and security key to upload data to S3
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key","AKIAQN3JW5X4EDXA53GE")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key","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")

#S3 (CSV)
dim_atm.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("s3a://pshrutis3bucket/etlproj/dim_atm.csv")

#### 3) DIM_DATE

In [28]:
# selecting all the reuqired src cols and removing duplicates
df_3=inp_ds.select('year','month','day','hour','weekday').distinct()
df_3.printSchema()
df_3.count()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)



8685

* Dim_date has 8685 records

In [29]:
import pyspark.sql.functions as F
#full_date_time is a dervied column 

df_derv=df_3.withColumn("full_date_time",F.from_unixtime(F.unix_timestamp(F.concat(df_3.year.cast(StringType()),df_3.month.cast(StringType()),F.lpad(df_3.day.cast(StringType()),2,'0'),
F.lpad(df_3.hour.cast(StringType()),2,'0')),'yyyyMMMMMddHH'),'YYYY-MM-dd HH:mm:SS'))

In [30]:
df_derv.show(5)

+----+-------+---+----+--------+-------------------+
|year|  month|day|hour| weekday|     full_date_time|
+----+-------+---+----+--------+-------------------+
|2017|January|  1|   9|  Sunday|2017-01-01 09:00:00|
|2017|January|  3|   5| Tuesday|2017-01-03 05:00:00|
|2017|January|  8|  19|  Sunday|2017-01-08 19:00:00|
|2017|January| 21|   3|Saturday|2017-01-21 03:00:00|
|2017|January| 23|  21|  Monday|2017-01-23 21:00:00|
+----+-------+---+----+--------+-------------------+
only showing top 5 rows



In [31]:
df_3_schema = df_derv.withColumn("date_id", lit(1)) #generate SK date_id
df_3_schema.printSchema()




root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- full_date_time: string (nullable = true)
 |-- date_id: integer (nullable = false)



In [32]:
rdd_3 = df_derv.rdd.zipWithIndex().map(lambda (row,rowId): ( list(row) + [rowId+1]))
rdd_to_df3 = spark.createDataFrame(rdd_3, schema=df_3_schema.schema)
rdd_to_df3.show(5)


+----+-------+---+----+--------+-------------------+-------+
|year|  month|day|hour| weekday|     full_date_time|date_id|
+----+-------+---+----+--------+-------------------+-------+
|2017|January|  1|   9|  Sunday|2017-01-01 09:00:00|      1|
|2017|January|  3|   5| Tuesday|2017-01-03 05:00:00|      2|
|2017|January|  8|  19|  Sunday|2017-01-08 19:00:00|      3|
|2017|January| 21|   3|Saturday|2017-01-21 03:00:00|      4|
|2017|January| 23|  21|  Monday|2017-01-23 21:00:00|      5|
+----+-------+---+----+--------+-------------------+-------+
only showing top 5 rows



In [36]:
#genearte final df dim_date by rearranging necessary columns
dim_date=rdd_to_df3.select("date_id","full_date_time","year","month","day","hour","weekday")
dim_date.count()
dim_date.show(5)

+-------+-------------------+----+-------+---+----+--------+
|date_id|     full_date_time|year|  month|day|hour| weekday|
+-------+-------------------+----+-------+---+----+--------+
|      1|2017-01-01 09:00:00|2017|January|  1|   9|  Sunday|
|      2|2017-01-03 05:00:00|2017|January|  3|   5| Tuesday|
|      3|2017-01-08 19:00:00|2017|January|  8|  19|  Sunday|
|      4|2017-01-21 03:00:00|2017|January| 21|   3|Saturday|
|      5|2017-01-23 21:00:00|2017|January| 23|  21|  Monday|
+-------+-------------------+----+-------+---+----+--------+
only showing top 5 rows



In [34]:
#load table to s3 bucket
#Setting S3 access key and security key to upload data to S3
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key","AKIAQN3JW5X4EDXA53GE")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key","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")

#S3 (CSV)
dim_date.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("s3a://pshrutis3bucket/etlproj/dim_date.csv")

#### 4) DIM_CARD_TYPE

In [37]:
df_4=inp_ds.select('card_type').distinct()

In [38]:
df_4.printSchema()
df_4.count()

root
 |-- card_type: string (nullable = true)



12

* dim_card_type has 12 records

In [39]:
df_4_schema = df_4.withColumn("card_type_id", lit(1))
df_4_schema.printSchema()

root
 |-- card_type: string (nullable = true)
 |-- card_type_id: integer (nullable = false)



In [40]:
rdd_4 = df_4.rdd.zipWithIndex().map(lambda (row,rowId): ( list(row) + [rowId+1]))
rdd_to_df4 = spark.createDataFrame(rdd_4, schema=df_4_schema.schema)

In [41]:
rdd_to_df4.printSchema()
rdd_to_df4.show()

root
 |-- card_type: string (nullable = true)
 |-- card_type_id: integer (nullable = false)

+--------------------+------------+
|           card_type|card_type_id|
+--------------------+------------+
|     Dankort - on-us|           1|
|              CIRRUS|           2|
|         HÃƒÂ¦vekort|           3|
|                VISA|           4|
|  Mastercard - on-us|           5|
|             Maestro|           6|
|Visa Dankort - on-us|           7|
|        Visa Dankort|           8|
|            VisaPlus|           9|
|          MasterCard|          10|
|             Dankort|          11|
| HÃƒÂ¦vekort - on-us|          12|
+--------------------+------------+



In [42]:
# dim_card_type_df final df
dim_card_type=rdd_to_df4.select("card_type_id","card_type")

In [43]:
dim_card_type.show()

+------------+--------------------+
|card_type_id|           card_type|
+------------+--------------------+
|           1|     Dankort - on-us|
|           2|              CIRRUS|
|           3|         HÃƒÂ¦vekort|
|           4|                VISA|
|           5|  Mastercard - on-us|
|           6|             Maestro|
|           7|Visa Dankort - on-us|
|           8|        Visa Dankort|
|           9|            VisaPlus|
|          10|          MasterCard|
|          11|             Dankort|
|          12| HÃƒÂ¦vekort - on-us|
+------------+--------------------+



In [44]:
#load table to s3 bucket
#Setting S3 access key and security key to upload data to S3
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key","AKIAQN3JW5X4EDXA53GE")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key","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")

#S3 (CSV)
dim_card_type.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("s3a://pshrutis3bucket/etlproj/dim_card_type.csv")

#### Generation of Fact table - join with  Dimension  tables to get corresponding Primary keys from dimension tables

* join with dim_date

In [45]:
dim_date.printSchema()

root
 |-- date_id: integer (nullable = false)
 |-- full_date_time: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)



In [46]:
dim_date_join1 = inp_ds.join(dim_date,(['year','month','day','hour','weekday']),"left_outer") #as the col names are same in both tables,it will retain only single join cols

In [47]:
dim_date_join1.count()

2468572

In [48]:
dim_date_join1.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: float (nullable = true)
 |-- atm_lon: float (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string

* join above result with dim_card_type table

In [49]:
dim_card_join=dim_date_join1.join(dim_card_type,["card_type"],"left_outer")

In [50]:
dim_card_join.printSchema()

root
 |-- card_type: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: float (nullable = true)
 |-- atm_lon: float (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string

In [52]:
dim_card_join.count()

2468572

* join above result with dim_loc table

In [53]:
# perform join and drop columns to avoid duplicate col names 
dim_loc_join=dim_card_join.join(dim_loc,(dim_card_join["atm_location"]==dim_loc["location"]) & (dim_card_join["atm_lat"]==dim_loc["lat"]) & (dim_card_join["atm_lon"]==dim_loc["lon"]) & (dim_card_join["atm_streetname"]==dim_loc["streetname"])& (dim_card_join["atm_street_number"]==dim_loc["street_number"])& (dim_card_join["atm_zipcode"]==dim_loc["zipcode"]),"left_outer").drop(dim_card_join["atm_location"]).drop(dim_card_join["atm_lat"]).drop(dim_card_join["atm_lon"]).drop(dim_card_join["atm_street_number"]).drop(dim_card_join["atm_streetname"]).drop(dim_card_join["atm_zipcode"])



In [54]:
dim_loc_join.count()

2468572

In [55]:
dim_loc_join.printSchema()

root
 |-- card_type: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 |-- temp: float (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- rain_3h: float (nullable = true)
 |

* join above result set with dim_atm table

In [56]:
# join with atm_dima nd drop redundant columns
dim_atm_join=dim_loc_join.join(dim_atm,(dim_loc_join["atm_id"]==dim_atm["atm_number"]) & (dim_loc_join["atm_manufacturer"]==dim_atm["atm_manufacturer"]) &(dim_loc_join["location_id"]==dim_atm["atm_location_id"]),"left_outer").drop(dim_loc_join['atm_id']).drop(dim_loc_join['atm_manufacturer']).drop(dim_atm["atm_location_id"])



In [57]:
dim_atm_join.printSchema()

root
 |-- card_type: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 |-- temp: float (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- rain_3h: float (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |

In [58]:
dim_atm_join.count()

2468572

In [59]:
#select and rename cols as per target table def
fact_df=dim_atm_join.withColumnRenamed("location_id","weather_loc_id").select("atm_id","weather_loc_id","date_id","card_type_id","atm_status","currency","service","transaction_amount","message_code","message_text","rain_3h","clouds_all","weather_id","weather_main","weather_description")



In [60]:
fact_df.printSchema()

root
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: float (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)



In [61]:
# generate trans_id SK

df_0=inp_ds.select('atm_location','atm_streetname','atm_street_number','atm_zipcode','atm_lat','atm_lon').distinct()


fact_df_schema = fact_df.withColumn("trans_id", lit(1))
fact_df_schema.printSchema()
rdd_fact = fact_df.rdd.zipWithIndex().map(lambda (row,rowId): ( list(row) + [rowId+1]))

rdd_to_fact = spark.createDataFrame(rdd_fact, schema=fact_df_schema.schema)
rdd_to_fact.show(5)



root
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: float (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- trans_id: integer (nullable = false)

root
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- 

In [64]:
#rearrange cols for final load
fact_dim_load=rdd_to_fact.select("trans_id","atm_id","weather_loc_id","date_id","card_type_id","atm_status","currency","service","transaction_amount","message_code","message_text","rain_3h","clouds_all","weather_id","weather_main","weather_description")



fact_dim_load.show(5)

+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|trans_id|atm_id|weather_loc_id|date_id|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|       1|     1|            71|     61|           1|  Inactive|     DKK|Withdrawal|              2770|        null|        null|    0.0|         0|       800|       Clear|       Sky is Clear|
|       2|     1|            71|    711|           1|  Inactive|     DKK|Withdrawal|              3623|        null|        null|    0.0|        40|       802|      Clouds|   scattered clouds|
|       3|     1|            71|   

In [65]:
fact_dim_load.count()

2468572

### Loading fact table to S3 bucket

In [66]:
#load table to s3 bucket
#Setting S3 access key and security key to upload data to S3
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key","AKIAQN3JW5X4EDXA53GE")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key","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")

#S3 (CSV)
fact_dim_load.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("s3a://pshrutis3bucket/etlproj/fact_dim_load.csv")