In [24]:
### USING PYSPARK TO TRANSFORM DATA FETCHED VIA SQOOP AS PER TARGET SCHEMA REQUIREMENT
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"]="/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType , TimestampType

In [25]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("jupyter_Spark").setMaster("yarn-client")
sc = SparkContext.getOrCreate(conf=conf)
sc

In [26]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('etl').master("local").getOrCreate()
spark

In [27]:
### Create StructType according to target Schema
SourceSchema = StructType([StructField('year',IntegerType(),True),
                        StructField('month',StringType(),True),
                        StructField('day',IntegerType(),True),
                        StructField('weekday',StringType(),True),
                        StructField('hour',IntegerType(),True),
                        StructField('atm_status',StringType(),True),
                        StructField('atm_id',StringType(),True),
                        StructField('atm_manufacturer',StringType(),True),
                        StructField('atm_location',StringType(),True),
                        StructField('atm_streetname',StringType(),True),
                        StructField('atm_street_number',IntegerType(),True),
                        StructField('atm_zipcode',IntegerType(),True),
                        StructField('atm_lat',DoubleType(),True),
                        StructField('atm_lon',DoubleType(),True),
                        StructField('currency',StringType(),True),
                        StructField('card_type',StringType(),True),
                        StructField('transaction_amount',IntegerType(),True), 
                        StructField('service',StringType(),True),
                        StructField('message_code',StringType(),True),
                        StructField('message_text',StringType(),True),
                        StructField('weather_lat',DoubleType(),True),
                        StructField('weather_lon',DoubleType(),True),
                        StructField('weather_city_id',IntegerType(),True),
                        StructField('weather_city_name',StringType(),True), 
                        StructField('temp',DoubleType(),True),
                        StructField('pressure',IntegerType(),True),
                        StructField('humidity',IntegerType(),True),
                        StructField('wind_speed',IntegerType(),True),
                        StructField('wind_deg',IntegerType(),True),
                        StructField('rain_3h',DoubleType(),True),
                        StructField('clouds_all',IntegerType(),True),
                        StructField('weather_id',IntegerType(),True),
                        StructField('weather_main',StringType(),True),
                        StructField('weather_description',StringType(),True)])                               

In [28]:
#df=spark.read("source/part-m-00000",schema=SourceSchema).option("quote",'').format("csv")
df=spark.read.format("csv").option("quote", "").option("ignoreLeadingWhiteSpace","true").load("source/part-m-00000",schema=SourceSchema)

###Note :  Fixing the issue with " on the field message_text while loading the data to df using option("quote", "")

In [29]:
df.show(4)

+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-

In [30]:
### Validating Counts
df.count()

2468572

In [31]:
DIM_DATE_raw=df.select("year","month","day","hour","weekday").distinct()
DIM_DATE_raw.count()

8685

In [32]:
DIM_CARD_TYPE_raw=df.select("card_type").distinct()
DIM_CARD_TYPE_raw.count()

12

In [33]:
DIM_ATM_raw=df.select("atm_id","atm_manufacturer","atm_location","atm_streetname","atm_lat","atm_lon").distinct()
DIM_ATM_raw.count()

113

In [34]:
DIM_LOCATION_raw=df.select("atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon").distinct()
DIM_LOCATION_raw.count()

109

In [35]:
FACT_ATM_TRANS_raw=df.select("year","month","day","hour","card_type","atm_id","atm_manufacturer","atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon","atm_status","currency","service","transaction_amount","message_code","message_text","rain_3h","clouds_all","weather_id","weather_main","weather_description")
FACT_ATM_TRANS_raw.count()

2468572

In [36]:
### RENAME atm_id to atm_number. atm_id will be generated later.
DIM_ATM_renamed = DIM_ATM_raw.withColumnRenamed("atm_id","atm_number")
FACT_ATM_TRANS_raw=FACT_ATM_TRANS_raw.withColumnRenamed("atm_id","atm_number")

In [37]:
### generate *_id's 
from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
from pyspark.sql.window import Window
DIM_DATE = DIM_DATE_raw.withColumn('date_id', row_number().over(Window.orderBy(monotonically_increasing_id())))
DIM_CARD_TYPE = DIM_CARD_TYPE_raw.withColumn('card_id', row_number().over(Window.orderBy(monotonically_increasing_id())))
DIM_ATM = DIM_ATM_renamed.withColumn('atm_id', row_number().over(Window.orderBy(monotonically_increasing_id())))
DIM_LOCATION = DIM_LOCATION_raw.withColumn('location_id', row_number().over(Window.orderBy(monotonically_increasing_id())))
FACT_ATM_TRANS = FACT_ATM_TRANS_raw.withColumn('trans_id', row_number().over(Window.orderBy(monotonically_increasing_id())))

In [38]:
DIM_DATE.show(3)

+----+-----+---+----+--------+-------+
|year|month|day|hour| weekday|date_id|
+----+-----+---+----+--------+-------+
|2017| July| 18|   9| Tuesday|      1|
|2017| July| 18|  22| Tuesday|      2|
|2017| July| 20|   0|Thursday|      3|
+----+-----+---+----+--------+-------+
only showing top 3 rows



In [39]:
DIM_CARD_TYPE.show(3)

+---------------+-------+
|      card_type|card_id|
+---------------+-------+
|Dankort - on-us|      1|
|         CIRRUS|      2|
|    HÃƒÂ¦vekort|      3|
+---------------+-------+
only showing top 3 rows



In [40]:
DIM_ATM.show(3)

+----------+----------------+------------+----------------+-------+-------+------+
|atm_number|atm_manufacturer|atm_location|  atm_streetname|atm_lat|atm_lon|atm_id|
+----------+----------------+------------+----------------+-------+-------+------+
|        57|             NCR| HillerÃƒÂ¸d|KÃƒÂ¸benhavnsvej| 55.933| 12.314|     1|
|        17|             NCR|     Randers|    ÃƒËœstervold| 56.462| 10.038|     2|
|        23| Diebold Nixdorf|     Vodskov|      Vodskovvej| 57.104| 10.027|     3|
+----------+----------------+------------+----------------+-------+-------+------+
only showing top 3 rows



In [41]:
DIM_LOCATION.show(3)

+------------+----------------+-----------------+-----------+-------+-------+-----------+
|atm_location|  atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|location_id|
+------------+----------------+-----------------+-----------+-------+-------+-----------+
|       Vadum|  Ellehammersvej|               43|       9430| 57.118|  9.861|          1|
|    Slagelse| Mariendals Alle|               29|       4200| 55.398| 11.342|          2|
|  Fredericia|SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|          3|
+------------+----------------+-----------------+-----------+-------+-------+-----------+
only showing top 3 rows



In [42]:
FACT_ATM_TRANS.show(3)

+----+-------+---+----+----------+----------+----------------+------------+--------------+-----------------+-----------+-------+-------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+--------+
|year|  month|day|hour| card_type|atm_number|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main|weather_description|trans_id|
+----+-------+---+----+----------+----------+----------------+------------+--------------+-----------------+-----------+-------+-------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+--------+
|2017|January|  1|   0|MasterCard|         1|             NCR|  NÃƒÂ¦stved|   Farimagsvej|                8|       4700| 55.233| 11.763|    Activ

In [43]:
### Create TimeStamp in Date Dimention Table from year,month,date and hour
from pyspark.sql.functions import *
### Convert month into numeric format
DIM_DATE=DIM_DATE.withColumn("month",from_unixtime(unix_timestamp(col("month"),'MMM'),'MM'))
from pyspark.sql import functions as func
cols=["year","month","day"]
DIM_DATE=DIM_DATE.withColumn("date",concat_ws("-",*cols).cast("date"))
### Create full_date_time from date and hour 
DIM_DATE=DIM_DATE.withColumn("full_date_time",func.date_format(func.to_timestamp(func.concat("date","hour"),"yyyy-MM-ddHH"),"yyyy-MM-dd hh:ss:SSa"))
#Dropped extra field 
DIM_DATE=DIM_DATE.drop("date")
###revert the month from numeric back to string
DIM_DATE=DIM_DATE.withColumn("month",from_unixtime(unix_timestamp(col("month"),'MM'),'MMM'))
DIM_DATE=DIM_DATE.withColumn('full_date_time',func.unix_timestamp('full_date_time','yyyy-MM-dd HH:mm:ss').cast(TimestampType()))
DIM_DATE.show(3)

+----+-----+---+----+-------+-------+-------------------+
|year|month|day|hour|weekday|date_id|     full_date_time|
+----+-----+---+----+-------+-------+-------------------+
|2017|  Jan|  1|   9| Sunday|      1|2017-01-01 09:00:00|
|2017|  Jan|  3|   5|Tuesday|      2|2017-01-03 05:00:00|
|2017|  Jan|  8|  19| Sunday|      3|2017-01-08 07:00:00|
+----+-----+---+----+-------+-------+-------------------+
only showing top 3 rows



In [44]:
### Perform the same transformation on FACT_ATM_TRANS as in DIM_DATE for the upcoming join function
### Needed because in DIM_DATE the transformation is Jaunary ---> 01 ---> Jan
FACT_ATM_TRANS=FACT_ATM_TRANS.withColumn("month",from_unixtime(unix_timestamp(col("month"),'MMM'),'MM'))
FACT_ATM_TRANS=FACT_ATM_TRANS.withColumn("month",from_unixtime(unix_timestamp(col("month"),'MM'),'MMM'))
FACT_ATM_TRANS.show(3)

+----+-----+---+----+----------+----------+----------------+------------+--------------+-----------------+-----------+-------+-------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+--------+
|year|month|day|hour| card_type|atm_number|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main|weather_description|trans_id|
+----+-----+---+----+----------+----------+----------------+------------+--------------+-----------------+-----------+-------+-------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+--------+
|2017|  Jan|  1|   0|MasterCard|         1|             NCR|  NÃƒÂ¦stved|   Farimagsvej|                8|       4700| 55.233| 11.763|    Active|     D

In [45]:
### REARRANGING COLUMNS [ Dimensions Table]
DIM_DATE = DIM_DATE.select("date_id","full_date_time","year","month","day","hour","weekday")
DIM_CARD_TYPE = DIM_CARD_TYPE.select("card_id","card_type")
DIM_ATM = DIM_ATM.select("atm_id","atm_number","atm_manufacturer","atm_location","atm_streetname","atm_lat","atm_lon")
DIM_LOCATION = DIM_LOCATION.select("location_id","atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon")

In [46]:
### Perform join to import *_id's to FACT_ATM_TRANS
FACT_ATM_TRANS=FACT_ATM_TRANS.join(DIM_DATE,on=['year','month','day','hour'],how="left")
FACT_ATM_TRANS.show(3)
FACT_ATM_TRANS=FACT_ATM_TRANS.join(DIM_CARD_TYPE,on=['card_type'],how="left")
FACT_ATM_TRANS=FACT_ATM_TRANS.join(DIM_ATM,on=['atm_number','atm_manufacturer'],how="left")
FACT_ATM_TRANS=FACT_ATM_TRANS.join(DIM_LOCATION,on=['atm_location','atm_streetname','atm_lat','atm_lon'],how="left")

+----+-----+---+----+------------------+----------+----------------+------------+--------------+-----------------+-----------+-------+-------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+--------+-------+-------------------+--------+
|year|month|day|hour|         card_type|atm_number|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main| weather_description|trans_id|date_id|     full_date_time| weekday|
+----+-----+---+----+------------------+----------+----------------+------------+--------------+-----------------+-----------+-------+-------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+--------+-------+-------------------+--------+
|2017|  Apr| 

In [47]:
FACT_ATM_TRANS.drop("year","month","day","hour","card_type","atm_number","atm_manufacturer","atm_location","atm_streetname","atm_street_number","atm_zipcode","atm_lat","atm_lon")
###Rearranging Fact table and rename columns per target schema
FACT_ATM_TRANS = FACT_ATM_TRANS.select("trans_id","atm_id","location_id","date_id","card_id","atm_status","currency","service","transaction_amount","message_code","message_text","rain_3h","clouds_all","weather_id","weather_main","weather_description")
FACT_ATM_TRANS=FACT_ATM_TRANS.withColumnRenamed("location_id","weather_loc_id")
FACT_ATM_TRANS=FACT_ATM_TRANS.withColumnRenamed("card_id","card_type_id")

In [48]:
### Perform join to import location_id to DIM_ATM
DIM_ATM=DIM_ATM.join(DIM_LOCATION,on=['atm_location','atm_streetname','atm_lat','atm_lon'],how="left")
DIM_ATM=DIM_ATM.select("atm_id","atm_number","atm_manufacturer","location_id")
DIM_ATM=DIM_ATM.withColumnRenamed("location_id","atm_location_id")

In [49]:
### Rename according to target schema
DIM_LOCATION=DIM_LOCATION.withColumnRenamed("atm_location","location")
DIM_LOCATION=DIM_LOCATION.withColumnRenamed("atm_streetname","streetname")
DIM_LOCATION=DIM_LOCATION.withColumnRenamed("atm_zipcode","zipcode")
DIM_LOCATION=DIM_LOCATION.withColumnRenamed("atm_lat","lat")
DIM_LOCATION=DIM_LOCATION.withColumnRenamed("atm_lon","lon")

In [50]:
### Verify the columns - per target schema
DIM_DATE.printSchema()
DIM_ATM.printSchema()
DIM_LOCATION.printSchema()
DIM_CARD_TYPE.printSchema()
FACT_ATM_TRANS.printSchema()

root
 |-- date_id: integer (nullable = true)
 |-- full_date_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)

root
 |-- atm_id: integer (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location_id: integer (nullable = true)

root
 |-- location_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)

root
 |-- card_id: integer (nullable = true)
 |-- card_type: string (nullable = true)

root
 |-- trans_id: integer (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nu

In [51]:
### validating final counts 
DIM_DATE.count()

8685

In [52]:
DIM_ATM.count()

113

In [53]:
DIM_LOCATION.count()

109

In [54]:
DIM_CARD_TYPE.count()

12

In [55]:
FACT_ATM_TRANS.count()

2468572

In [56]:
### Write to HDFS
#DIM_DATE.write.csv('dim_date')
#DIM_ATM.write.csv('dim_atm')
#DIM_LOCATION.write.csv('dim_location')
#DIM_CARD_TYPE.write.csv('dim_card_type')
#FACT_ATM_TRANS.write.csv('fact_atm_trans')


### Write to S3 , access key and secret key was configured in spark-defaults.conf
DIM_DATE.write.csv('s3a://satheeshgopalan/dim_date/')
DIM_ATM.write.csv('s3a://satheeshgopalan/dim_atm/')
DIM_LOCATION.write.csv('s3a://satheeshgopalan/dim_location/')
DIM_CARD_TYPE.write.csv('s3a://satheeshgopalan/dim_card_type/')
FACT_ATM_TRANS.write.csv('s3a://satheeshgopalan/fact_atm_trans/')