In [1]:
#Importing configuration paths.
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
#Initialising SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('demo').master("local").getOrCreate()
spark


In [3]:
#Importing nesseccary functions.
from pyspark.sql import functions as func
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,BooleanType,DoubleType,LongType

### Reading the data from files in HDFS

##### Creating schema to load data

In [4]:
#Creating schema to load data
fileSchema=StructType([StructField('year',IntegerType(),False),
                       StructField('month',StringType(),False),
					   StructField('day',IntegerType(),False),
					   StructField('weekday',StringType(),False),
					   StructField('hour',IntegerType(),False),
					   StructField('atm_status',StringType(),False),
					   StructField('atm_id',StringType(),False),
					   StructField('atm_manufacturer',StringType(),False),
					   StructField('atm_location',StringType(),False),
					   StructField('atm_street_name',StringType(),False),
					   StructField('atm_street_number',IntegerType(),False),
					   StructField('atm_zipcode',IntegerType(),False),
					   StructField('atm_lat',DoubleType(),False),
					   StructField('atm_lon',DoubleType(),False),
					   StructField('currency',StringType(),False),
					   StructField('card_type',StringType(),False),
					   StructField('transaction_amount',IntegerType(),False),
					   StructField('service',StringType(),False),
					   StructField('message_code',StringType(),True),
					   StructField('message_text',StringType(),True),
					   StructField('weather_lat',DoubleType(),False),
					   StructField('weather_lon',DoubleType(),False),
					   StructField('weather_city_id',IntegerType(),False),
					   StructField('weather_city_name',StringType(),False),
					   StructField('temp',DoubleType(),False),
					   StructField('pressure',IntegerType(),False),
					   StructField('humidity',IntegerType(),False),
					   StructField('wind_speed',IntegerType(),False),
                       StructField('wind_deg',IntegerType(),False),	
                       StructField('rain_3h',DoubleType(),True),
					   StructField('clouds_all',IntegerType(),False),
					   StructField('weather_id',IntegerType(),False),
					   StructField('weather_main',StringType(),False),
					   StructField('weather_description',StringType(),False)])

##### Loading Data from HDFS to pyspark

In [5]:
#Loading Data from HDFS to pyspark
Bank_Details=spark.read.load("hdfs:/user/root/Bank_ATM_Data",format="csv",sep=',',schema=fileSchema,header=True,inferSchema = True)

In [7]:
#Checking number of rows imported
Bank_Details.count()

In [8]:
Bank_Details.show(5)

+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|    atm_street_name|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-

##### Converting data to get full_date_time column

In [6]:
from pyspark.sql.functions import *

In [7]:
#Month is represented in string, converting it to digits to form date column.
Bank1=Bank_Details.withColumn("month_num",from_unixtime(unix_timestamp(col("month"),'MMMMMMMMMMM'),'MM'))

In [8]:
#Checking month column
Bank1.select('month_num').show(5)

+---------+
|month_num|
+---------+
|       01|
|       01|
|       01|
|       01|
|       01|
+---------+
only showing top 5 rows



##### Forming full_date_time  column

In [9]:
#Forming Date column
cols=["year","month_num","day"]
Bank_Details_with_Date=Bank1.withColumn("full_date_time",concat_ws("-",*cols).cast("date"))

In [10]:
#Checking datatype of full_date_time
Bank_Details_with_Date.select('full_date_time').printSchema

<bound method DataFrame.printSchema of DataFrame[full_date_time: date]>

### Creating stage tables to create Dimension tables.

###### 1) Creating stage_location table and removing duplicate rows.

In [11]:
#Creating Stage_location table 
STG_LOCTN=Bank_Details_with_Date.select('atm_location','atm_street_name','atm_street_number','atm_zipcode','atm_lat','atm_lon')
#STG_LOCTN.show(5)

In [12]:
#Dropping Duplicate columns and checking count value.
STG_LOCTN_Drop_Duplicate=STG_LOCTN.dropDuplicates()
STG_LOCTN_Drop_Duplicate.count()

109

In [42]:
#Displaying stage_location table
STG_LOCTN_Drop_Duplicate.show(10)

+--------------------+----------------+-----------------+-----------+-------+-------+
|        atm_location| atm_street_name|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+--------------------+----------------+-----------------+-----------+-------+-------+
|               Vadum|  Ellehammersvej|               43|       9430| 57.118|  9.861|
|            Slagelse| Mariendals Alle|               29|       4200| 55.398| 11.342|
|          Fredericia|SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|
|             Kolding|        Vejlevej|              135|       6000| 55.505|  9.457|
|   HÃƒÂ¸rning Hallen|        Toftevej|               53|       8362| 56.091| 10.033|
|                Aars| Himmerlandsgade|               70|       9600| 56.803|  9.518|
|     Aarhus Lufthavn| Ny Lufthavnsvej|               24|       8560| 56.308| 10.627|
|                 Fur|      StenÃƒÂ¸re|               19|       7884| 56.805|   9.02|
|            Hasseris|     Hasserisvej|              1

###### 2)Creating stage_ATM table and removing duplicate rows.

In [13]:
#Creating stage_ATM table
STG_ATM=Bank_Details_with_Date.select('atm_id','atm_manufacturer','atm_lat','atm_lon')
#STG_ATM.show(5)

In [15]:
#Removing duplicated and displaying count
STG_ATM_Drop_Duplicate=STG_ATM.dropDuplicates()
STG_ATM_Drop_Duplicate.count()

113

###### 3)Creating stage_DATE table and removing duplicate rows.

In [16]:
#Creating stage_DATE table
STG_DATE=Bank_Details_with_Date.select('full_date_time','year','month','day','weekday','hour')
STG_DATE.show(10)

+--------------+----+-------+---+-------+----+
|full_date_time|year|  month|day|weekday|hour|
+--------------+----+-------+---+-------+----+
|    2017-01-01|2017|January|  1| Sunday|   0|
|    2017-01-01|2017|January|  1| Sunday|   0|
|    2017-01-01|2017|January|  1| Sunday|   0|
|    2017-01-01|2017|January|  1| Sunday|   0|
|    2017-01-01|2017|January|  1| Sunday|   0|
|    2017-01-01|2017|January|  1| Sunday|   0|
|    2017-01-01|2017|January|  1| Sunday|   0|
|    2017-01-01|2017|January|  1| Sunday|   0|
|    2017-01-01|2017|January|  1| Sunday|   0|
|    2017-01-01|2017|January|  1| Sunday|   0|
+--------------+----+-------+---+-------+----+
only showing top 10 rows



In [18]:
#removing duplicate rows
STG_DATE_Drop_Duplicate=STG_DATE.dropDuplicates()
STG_DATE_Drop_Duplicate.count()

8685

###### 4)Creating stage_CARD_TYPE table and removing duplicate rows.

In [19]:
#Creating stage_CARD_TYPE table removing duplicate rows.
STG_CARD_TYPE=Bank_Details.select('card_type')
STG_CARD_TYPE.show(10)

+------------------+
|         card_type|
+------------------+
|        MasterCard|
|              VISA|
|              VISA|
|        MasterCard|
|        MasterCard|
|        MasterCard|
|Mastercard - on-us|
|        MasterCard|
|              VISA|
|           Dankort|
+------------------+
only showing top 10 rows



In [20]:
#removing duplicate rows.
STG_CARD_TYPE_Drop_Duplicate=STG_CARD_TYPE.dropDuplicates()
STG_CARD_TYPE_Drop_Duplicate.count()

12

#### 5)Adding Row_IDs to stage table and re-arranging columns.

###### 5.1)Adding Row_IDs to stage_CARD_TYPE_TABLE

In [21]:
#Writing window function to add card_type_id column
from pyspark.sql.window import Window
windowSpec  = Window.partitionBy().orderBy('card_type')
STG_CARD_TYPE_Drop_Duplicate=STG_CARD_TYPE_Drop_Duplicate.withColumn("card_type_id",3000+(rank().over(windowSpec)))

###### 5.2)Adding Row_IDs to stage_LOCATION_TABLE

In [22]:
#Writing window function to add location_id column
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
loctn_windw_spec = Window.partitionBy().orderBy(["atm_location","atm_street_name","atm_street_number"])

In [23]:
#Creating and showing location-ID data
STG_LOCTN_Drop_Duplicate=STG_LOCTN_Drop_Duplicate.withColumn("LOCATION_ID",1000+row_number().over(loctn_windw_spec))
STG_LOCTN_Drop_Duplicate.show(5)

+--------------------+---------------+-----------------+-----------+-------+-------+-----------+
|        atm_location|atm_street_name|atm_street_number|atm_zipcode|atm_lat|atm_lon|LOCATION_ID|
+--------------------+---------------+-----------------+-----------+-------+-------+-----------+
|             Aabybro|   ÃƒËœstergade|                6|       9440| 57.162|   9.73|       1001|
|      Aalborg Hallen|   Europa Plads|                4|       9000| 57.044|  9.913|       1002|
|Aalborg Storcente...|       Hobrovej|              452|       9200| 57.005|  9.876|       1003|
|Aalborg Storcente...|       Hobrovej|              452|       9200| 57.005|  9.876|       1004|
|         Aalborg Syd|       Hobrovej|              440|       9200| 57.005|  9.881|       1005|
+--------------------+---------------+-----------------+-----------+-------+-------+-----------+
only showing top 5 rows



In [24]:
#Renaming columns in stage Location table and checking columns
STG_LOCTN_Drop_Duplicate=STG_LOCTN_Drop_Duplicate.withColumnRenamed("atm_location","LOCATION") \
.withColumnRenamed("atm_street_name","STREET_NAME") \
.withColumnRenamed("atm_street_number","STREET_NUMBER") \
.withColumnRenamed("atm_zipcode","ZIPCODE") \
.withColumnRenamed("atm_lat","LAT") \
.withColumnRenamed("atm_lon","LON")
STG_LOCTN_Drop_Duplicate.show(5)

+--------------------+------------+-------------+-------+------+-----+-----------+
|            LOCATION| STREET_NAME|STREET_NUMBER|ZIPCODE|   LAT|  LON|LOCATION_ID|
+--------------------+------------+-------------+-------+------+-----+-----------+
|             Aabybro|ÃƒËœstergade|            6|   9440|57.162| 9.73|       1001|
|      Aalborg Hallen|Europa Plads|            4|   9000|57.044|9.913|       1002|
|Aalborg Storcente...|    Hobrovej|          452|   9200|57.005|9.876|       1003|
|Aalborg Storcente...|    Hobrovej|          452|   9200|57.005|9.876|       1004|
|         Aalborg Syd|    Hobrovej|          440|   9200|57.005|9.881|       1005|
+--------------------+------------+-------------+-------+------+-----+-----------+
only showing top 5 rows



### Creating DIMENSION_LOCATION Table

In [25]:
#Rearranging columns in location table and creating final DIM_LOCATION table
DIM_LOCATION=STG_LOCTN_Drop_Duplicate.select('LOCATION_ID','LOCATION','STREET_NAME','STREET_NUMBER','ZIPCODE','LAT','LON')
DIM_LOCATION.show(5)

+-----------+--------------------+------------+-------------+-------+------+-----+
|LOCATION_ID|            LOCATION| STREET_NAME|STREET_NUMBER|ZIPCODE|   LAT|  LON|
+-----------+--------------------+------------+-------------+-------+------+-----+
|       1001|             Aabybro|ÃƒËœstergade|            6|   9440|57.162| 9.73|
|       1002|      Aalborg Hallen|Europa Plads|            4|   9000|57.044|9.913|
|       1003|Aalborg Storcente...|    Hobrovej|          452|   9200|57.005|9.876|
|       1004|Aalborg Storcente...|    Hobrovej|          452|   9200|57.005|9.876|
|       1005|         Aalborg Syd|    Hobrovej|          440|   9200|57.005|9.881|
+-----------+--------------------+------------+-------------+-------+------+-----+
only showing top 5 rows



In [26]:
#Renaming Stage_CARD_TYPE Table 
STG_CARD_TYPE_Drop_Duplicate=STG_CARD_TYPE_Drop_Duplicate.withColumnRenamed('card_type','CARD_TYPE') \
.withColumnRenamed('card_type_id','CARD_TYPE_ID')
STG_CARD_TYPE_Drop_Duplicate.show(5)

+-------------------+------------+
|          CARD_TYPE|CARD_TYPE_ID|
+-------------------+------------+
|             CIRRUS|        3001|
|            Dankort|        3002|
|    Dankort - on-us|        3003|
|        HÃƒÂ¦vekort|        3004|
|HÃƒÂ¦vekort - on-us|        3005|
+-------------------+------------+
only showing top 5 rows



### Creating DIMENSION_CARD_TYPE table 


In [27]:
DIM_CARD_TYPE=STG_CARD_TYPE_Drop_Duplicate.select('CARD_TYPE_ID','CARD_TYPE')
DIM_CARD_TYPE.show(5)

+------------+-------------------+
|CARD_TYPE_ID|          CARD_TYPE|
+------------+-------------------+
|        3001|             CIRRUS|
|        3002|            Dankort|
|        3003|    Dankort - on-us|
|        3004|        HÃƒÂ¦vekort|
|        3005|HÃƒÂ¦vekort - on-us|
+------------+-------------------+
only showing top 5 rows



In [28]:
STG_ATM_Drop_Duplicate.show(5)

+------+----------------+-------+-------+
|atm_id|atm_manufacturer|atm_lat|atm_lon|
+------+----------------+-------+-------+
|   113| Diebold Nixdorf| 55.398| 11.342|
|    54|             NCR| 56.745|  8.949|
|   104|             NCR| 57.049|  9.922|
|    18| Diebold Nixdorf| 56.448|  9.401|
|     8|             NCR| 56.762|  8.867|
+------+----------------+-------+-------+
only showing top 5 rows



### Joining Dimension_Location and stage table of Dimension_ATM to get Location_ID

#### Creating intermediate table to get foreign key into Dimension_ATM table

In [29]:
#Joining two tables.
Intermediate_STG_ATM=DIM_LOCATION.join(STG_ATM_Drop_Duplicate,(DIM_LOCATION["LON"] ==  STG_ATM_Drop_Duplicate["atm_lon"]) & (DIM_LOCATION["LAT"] ==  STG_ATM_Drop_Duplicate["atm_lat"]),"leftouter")
Intermediate_STG_ATM.show()

+-----------+--------------------+------------------+-------------+-------+------+------+------+----------------+-------+-------+
|LOCATION_ID|            LOCATION|       STREET_NAME|STREET_NUMBER|ZIPCODE|   LAT|   LON|atm_id|atm_manufacturer|atm_lat|atm_lon|
+-----------+--------------------+------------------+-------------+-------+------+------+------+----------------+-------+-------+
|       1014|         Bispensgade|       Bispensgade|           35|   9800|57.453| 9.996|    20|             NCR| 57.453|  9.996|
|       1094|           Svendborg|Sankt Nicolai Gade|            1|   5700|55.058|10.609|    84|             NCR| 55.058| 10.609|
|       1025|          Fredericia|  SjÃƒÂ¦llandsgade|           33|   7000|55.564| 9.757|     6|             NCR| 55.564|  9.757|
|       1008|              Aarhus|    SÃƒÂ¸nder Alle|           11|   8000|56.153|10.206|    50|             NCR| 56.153| 10.206|
|       1072|    NykÃƒÂ¸bing Mors|       Kirketorvet|            1|   7900|56.795|  8.86| 

In [30]:
#Checking unique count of stage_ATM table
Intermediate_STG_ATM_1=Intermediate_STG_ATM.select('atm_id','atm_manufacturer','LOCATION_ID')
Intermediate_STG_ATM_1.distinct().count()

156

### Creating DIM_ATM table




In [31]:
#Renaming columns
STG_ATM_Drop_Duplicate_copy=Intermediate_STG_ATM_1.withColumnRenamed("atm_id","ATM_NUMBER") \
.withColumnRenamed("atm_manufacturer","ATM_MANUFACTURER") \
.withColumnRenamed("LOCATION_ID","ATM_LOCATION_ID")
STG_ATM_Drop_Duplicate_copy.show(5)

+----------+----------------+---------------+
|ATM_NUMBER|ATM_MANUFACTURER|ATM_LOCATION_ID|
+----------+----------------+---------------+
|        20|             NCR|           1014|
|        84|             NCR|           1094|
|         6|             NCR|           1025|
|        50|             NCR|           1008|
|        59| Diebold Nixdorf|           1072|
+----------+----------------+---------------+
only showing top 5 rows



In [32]:
#Checking unique count
STG_ATM_Drop_Duplicate_copy.distinct().count()

156

###### Writing window function to add ATM_ID

In [33]:
Atm_windw_spec = Window.partitionBy().orderBy(["ATM_NUMBER"])
STG_ATM_Drop_Duplicate_copy=STG_ATM_Drop_Duplicate_copy.withColumn("ATM_ID",5000+row_number().over(Atm_windw_spec))
STG_ATM_Drop_Duplicate_copy.show(5)


+----------+----------------+---------------+------+
|ATM_NUMBER|ATM_MANUFACTURER|ATM_LOCATION_ID|ATM_ID|
+----------+----------------+---------------+------+
|         1|             NCR|           1075|  5001|
|        10|             NCR|           1076|  5002|
|       100|             NCR|           1056|  5003|
|       100|             NCR|           1086|  5004|
|       100|             NCR|           1087|  5005|
+----------+----------------+---------------+------+
only showing top 5 rows



In [34]:
#Re-arranging columns in required order and checking the data
DIM_ATM=STG_ATM_Drop_Duplicate_copy.select('ATM_ID','ATM_NUMBER','ATM_MANUFACTURER','ATM_LOCATION_ID')
DIM_ATM.show(5)

+------+----------+----------------+---------------+
|ATM_ID|ATM_NUMBER|ATM_MANUFACTURER|ATM_LOCATION_ID|
+------+----------+----------------+---------------+
|  5001|         1|             NCR|           1075|
|  5002|        10|             NCR|           1076|
|  5003|       100|             NCR|           1056|
|  5004|       100|             NCR|           1086|
|  5005|       100|             NCR|           1087|
+------+----------+----------------+---------------+
only showing top 5 rows



In [35]:
#Checking the count of rows.
DIM_ATM.count()

156

### Creating Dimension_DATE table

In [36]:
#Renaming column names
STG_DATE_Drop_Duplicate=STG_DATE_Drop_Duplicate.withColumnRenamed("full_date_time","FULL_DATE_TIME") \
.withColumnRenamed("month","MONTH") \
.withColumnRenamed("year","YEAR") \
.withColumnRenamed("day","DAY") \
.withColumnRenamed("hour","HOUR") \
.withColumnRenamed("weekday","WEEKDAY")
#STG_DATE_Drop_Duplicate.show(5)

###### Writing window function to create DATE_ID

In [37]:
date_windw_spec = Window.partitionBy().orderBy(["MONTH"])
STG_DATE_Drop_Duplicate=STG_DATE_Drop_Duplicate.withColumn("DATE_ID",1000+row_number().over(date_windw_spec))
#STG_DATE_Drop_Duplicate.show(5)

In [38]:
# Creating Final Dimension_Date table
DIM_DATE=STG_DATE_Drop_Duplicate.select('DATE_ID','FULL_DATE_TIME','YEAR','MONTH','DAY','HOUR','WEEKDAY')
DIM_DATE.show(5)

+-------+--------------+----+-----+---+----+--------+
|DATE_ID|FULL_DATE_TIME|YEAR|MONTH|DAY|HOUR| WEEKDAY|
+-------+--------------+----+-----+---+----+--------+
|   1001|    2017-04-11|2017|April| 11|   2| Tuesday|
|   1002|    2017-04-16|2017|April| 16|  16|  Sunday|
|   1003|    2017-04-17|2017|April| 17|  10|  Monday|
|   1004|    2017-04-17|2017|April| 17|  19|  Monday|
|   1005|    2017-04-20|2017|April| 20|  14|Thursday|
+-------+--------------+----+-----+---+----+--------+
only showing top 5 rows



### Creating Fact Table

##### Checking count of 4 Dimension tables before Joining them.

In [39]:
DIM_LOCATION.show(5)

+-----------+--------------------+------------+-------------+-------+------+-----+
|LOCATION_ID|            LOCATION| STREET_NAME|STREET_NUMBER|ZIPCODE|   LAT|  LON|
+-----------+--------------------+------------+-------------+-------+------+-----+
|       1001|             Aabybro|ÃƒËœstergade|            6|   9440|57.162| 9.73|
|       1002|      Aalborg Hallen|Europa Plads|            4|   9000|57.044|9.913|
|       1003|Aalborg Storcente...|    Hobrovej|          452|   9200|57.005|9.876|
|       1004|Aalborg Storcente...|    Hobrovej|          452|   9200|57.005|9.876|
|       1005|         Aalborg Syd|    Hobrovej|          440|   9200|57.005|9.881|
+-----------+--------------------+------------+-------------+-------+------+-----+
only showing top 5 rows



In [40]:
DIM_LOCATION.count()

109

In [41]:
DIM_CARD_TYPE.show(5)

+------------+-------------------+
|CARD_TYPE_ID|          CARD_TYPE|
+------------+-------------------+
|        3001|             CIRRUS|
|        3002|            Dankort|
|        3003|    Dankort - on-us|
|        3004|        HÃƒÂ¦vekort|
|        3005|HÃƒÂ¦vekort - on-us|
+------------+-------------------+
only showing top 5 rows



In [42]:
DIM_CARD_TYPE.count()

12

In [43]:
DIM_ATM.show(5)

+------+----------+----------------+---------------+
|ATM_ID|ATM_NUMBER|ATM_MANUFACTURER|ATM_LOCATION_ID|
+------+----------+----------------+---------------+
|  5001|         1|             NCR|           1075|
|  5002|        10|             NCR|           1076|
|  5003|       100|             NCR|           1056|
|  5004|       100|             NCR|           1086|
|  5005|       100|             NCR|           1087|
+------+----------+----------------+---------------+
only showing top 5 rows



In [44]:
DIM_ATM.count()

156

In [45]:
DIM_DATE.show(5)

+-------+--------------+----+-----+---+----+--------+
|DATE_ID|FULL_DATE_TIME|YEAR|MONTH|DAY|HOUR| WEEKDAY|
+-------+--------------+----+-----+---+----+--------+
|   1001|    2017-04-11|2017|April| 11|   2| Tuesday|
|   1002|    2017-04-16|2017|April| 16|  16|  Sunday|
|   1003|    2017-04-17|2017|April| 17|  10|  Monday|
|   1004|    2017-04-17|2017|April| 17|  19|  Monday|
|   1005|    2017-04-20|2017|April| 20|  14|Thursday|
+-------+--------------+----+-----+---+----+--------+
only showing top 5 rows



In [46]:
DIM_DATE.count()

8685

##### Renaming column atm_id to atm_num to avoid data ambiguity issue.

In [47]:
#Renaming column
Bank_Details_with_Date=Bank_Details_with_Date.withColumnRenamed('atm_id','atm_num')

### Joining Dimension Tables to create Fact Table 

##### Joining Bank_Details table with DIM_LOCATION Table and creating Stage1_fact Table

In [48]:
#Joining Bank_Details table with DIM_LOCATION Table
Stage1_Fact=Bank_Details_with_Date.join(DIM_LOCATION,(Bank_Details_with_Date["atm_location"] == DIM_LOCATION["LOCATION"]) & (Bank_Details_with_Date["atm_street_name"] == DIM_LOCATION["STREET_NAME"]) & (Bank_Details_with_Date["atm_street_number"] == DIM_LOCATION["STREET_NUMBER"]) & (Bank_Details_with_Date["atm_zipcode"] == DIM_LOCATION["ZIPCODE"]),"leftouter")

In [49]:
#Checking count
Stage1_Fact.count()

2468571

##### Joining Stage1_fact table with DIM_ATM Table and creating Stage2_fact Table

In [50]:
Stage2_Fact=Stage1_Fact.join(DIM_ATM,(Stage1_Fact["atm_num"]==DIM_ATM["ATM_NUMBER"]) & (Stage1_Fact["atm_manufacturer"]==DIM_ATM["ATM_MANUFACTURER"]) & (Stage1_Fact["LOCATION_ID"]==DIM_ATM["ATM_LOCATION_ID"]),"leftouter")

In [51]:
#Checking count
Stage2_Fact.count()

2468571

##### Joining Stage2_fact table with DIM_DATE Table and creating Stage3_fact Table

In [52]:
Stage3_Fact=Stage2_Fact.join(DIM_DATE,(Stage2_Fact["full_date_time"]==DIM_DATE["FULL_DATE_TIME"]) & (Stage2_Fact["weekday"]==DIM_DATE["WEEKDAY"]) & (Stage2_Fact["hour"]==DIM_DATE["HOUR"]),"leftouter" )

In [53]:
Stage3_Fact.count()

2468571

##### Joining Stage3_fact table with DIM_CARD_TYPE Table and creating Stage4_fact Table

In [54]:
Stage4_Fact=Stage3_Fact.join(DIM_CARD_TYPE,(Stage3_Fact["card_type"]==DIM_CARD_TYPE['CARD_TYPE']),"leftouter")

In [55]:
#checkin count
Stage4_Fact.count()

2468571

In [56]:
#Printing schema
Stage4_Fact.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_num: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_street_name: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: 

##### Adding Trans_ID to stage fact table 

In [57]:
#Writing window function to add Trans_Id
window1_spec = Window.partitionBy().orderBy(["DATE_ID","LOCATION_ID","CARD_TYPE_ID"])
Stage_Intermediate=Stage4_Fact.withColumn("TRANS_ID",row_number().over(window1_spec))
#Stage_Intermediate.show(5)


In [58]:
#Stage_Intermediate.count()

##### Selecting only required columns to create final fact table

In [59]:
#Selecting columns
Stage_Intermediate_copy=Stage_Intermediate.select('TRANS_ID','ATM_ID','LOCATION_ID','DATE_ID','CARD_TYPE_ID','atm_status','currency','service','transaction_amount','message_code','message_text','rain_3h','clouds_all','weather_id','weather_main','weather_description')

In [60]:
#Checking schema
Stage_Intermediate_copy.printSchema()

root
 |-- TRANS_ID: integer (nullable = true)
 |-- ATM_ID: integer (nullable = true)
 |-- LOCATION_ID: integer (nullable = true)
 |-- DATE_ID: integer (nullable = true)
 |-- CARD_TYPE_ID: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)



In [61]:
#Checking whether trans_id is assigned
Stage_Intermediate_copy.select('TRANS_ID').show(7)

+--------+
|TRANS_ID|
+--------+
|       1|
|       2|
|       3|
|       4|
|       5|
|       6|
|       7|
+--------+
only showing top 7 rows



In [62]:
#Renaming columns.
FACT_ATM_TRANS=Stage_Intermediate_copy.withColumnRenamed("LOCATION_ID","WEATHER_LOC_ID") \
.withColumnRenamed("atm_status","ATM_STATUS") \
.withColumnRenamed("message_code","MESSAGE_CODE") \
.withColumnRenamed("currency","CURRENCY") \
.withColumnRenamed("service","SERVICE") \
.withColumnRenamed("transaction_amount","TRANSACTION_AMOUNT") \
.withColumnRenamed("message_code","MESSAGE_CODE") \
.withColumnRenamed("message_text","MESSAGE_TEXT") \
.withColumnRenamed("rain_3h","RAIN_3H") \
.withColumnRenamed("clouds_all","CLOUDS_ALL") \
.withColumnRenamed("weather_id","WEATHER_ID") \
.withColumnRenamed("weather_main","WEATHER_MAIN") \
.withColumnRenamed("weather_description","WEATHER_DESCRIPTION") 

In [63]:
#Checking schema
FACT_ATM_TRANS.printSchema()

root
 |-- TRANS_ID: integer (nullable = true)
 |-- ATM_ID: integer (nullable = true)
 |-- WEATHER_LOC_ID: integer (nullable = true)
 |-- DATE_ID: integer (nullable = true)
 |-- CARD_TYPE_ID: integer (nullable = true)
 |-- ATM_STATUS: string (nullable = true)
 |-- CURRENCY: string (nullable = true)
 |-- SERVICE: string (nullable = true)
 |-- TRANSACTION_AMOUNT: integer (nullable = true)
 |-- MESSAGE_CODE: string (nullable = true)
 |-- MESSAGE_TEXT: string (nullable = true)
 |-- RAIN_3H: double (nullable = true)
 |-- CLOUDS_ALL: integer (nullable = true)
 |-- WEATHER_ID: integer (nullable = true)
 |-- WEATHER_MAIN: string (nullable = true)
 |-- WEATHER_DESCRIPTION: string (nullable = true)



In [82]:
#Showing fact_atm_trans table
FACT_ATM_TRANS.show(5)

+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|TRANS_ID|ATM_ID|WEATHER_LOC_ID|DATE_ID|CARD_TYPE_ID|ATM_STATUS|CURRENCY|   SERVICE|TRANSACTION_AMOUNT|MESSAGE_CODE|MESSAGE_TEXT|RAIN_3H|CLOUDS_ALL|WEATHER_ID|WEATHER_MAIN|WEATHER_DESCRIPTION|
+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|       1|  5084|          1008|   1001|        3001|  Inactive|     DKK|Withdrawal|              8543|        null|        null|    0.0|         8|       800|       Clear|       sky is clear|
|       2|  5084|          1008|   1001|        3011|  Inactive|     DKK|Withdrawal|              1443|        null|        null|    0.0|         8|       800|       Clear|       sky is clear|
|       3|  5111|          1020|   

In [83]:
#Checking count of the columns
FACT_ATM_TRANS.count()

2468571

##### Writing data from Dimension_ATM to s3

In [85]:
DIM_ATM.write.csv("s3a://swathimss3bucket/ETL_Project/ATM_DIMENSION/ATM_DIMENSION.csv",mode="overwrite",header=True)

##### Writing data from Dimension_CARD_TYPE to s3

In [86]:
DIM_CARD_TYPE.write.csv("s3a://swathimss3bucket/ETL_Project/CARD_TYPE_DIMENSION/CARD_TYPE_DIMENSION.csv",mode="overwrite",header=True)

##### Writing data from Dimension_DATE to s3

In [87]:
DIM_DATE.write.csv("s3a://swathimss3bucket/ETL_Project/DATETIME_DIMENSION/DATETIME_DIMENSION",mode="overwrite",header=True)


##### Writing data from Dimension_Location to s3

In [88]:
DIM_LOCATION.write.csv("s3a://swathimss3bucket/ETL_Project/LOCATION_DIMENSION/LOCATION_DIMENSION.csv",mode="overwrite",header=True)


##### Writing data from FACT_ATM_TRANS to s3

In [89]:

FACT_ATM_TRANS.write.options(quotes=',').csv("s3a://swathimss3bucket/ETL_Project/FACT_ATM_TRANS/FACT_ATM_TRANS",mode="overwrite",header=True)

In [90]:
#FACT_ATM_TRANS.write.csv("s3a://swathimss3bucket/ETL_Project/TRANSACTION_FACT/TRANSACTION_FACT.csv",mode="overwrite",header=True)