## ETL Pre-assignment
 - After succefully loading data from Amazon RDS to Hadoop using Sqoop
 - Tranforming data into dimension and fact tables
 - And, store it into Amazon S3 bucket using pyspark

In [1]:
# Importing necessary packages and declare variables
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
# Importing pyspark libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType
from pyspark.sql.functions import *
from pyspark.sql.window import Window


In [3]:
# Creating Spark session
spark = SparkSession.builder.appName('demo').master("local").enableHiveSupport().getOrCreate()
spark

In [4]:
# Reading data from hadoop
df = spark.read.load("/user/root/ETL/SRC_ATM_TRANS", format="csv", sep=",", inferSchema="true")

In [5]:
# Checking column size
len(df.columns)

34

In [6]:
# Printing schema
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: double (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: integer (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: integer (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: double (nullable = true)
 |-- _c21: double (nullable = true)
 |-- _c22: integer (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: double (nullable = true)
 |-- _c25: integer (nullable = true)
 |-- _c26: integer (nullable = true)
 |-- _c27: integer (nu

In [7]:
df.show(5)

+----+-------+---+------+---+--------+---+---+----------+-------------------+----+----+------+------+----+----------+----+----------+----+----+------+------+-------+--------------+------+----+----+----+----+-----+----+----+-------+--------------------+
| _c0|    _c1|_c2|   _c3|_c4|     _c5|_c6|_c7|       _c8|                _c9|_c10|_c11|  _c12|  _c13|_c14|      _c15|_c16|      _c17|_c18|_c19|  _c20|  _c21|   _c22|          _c23|  _c24|_c25|_c26|_c27|_c28| _c29|_c30|_c31|   _c32|                _c33|
+----+-------+---+------+---+--------+---+---+----------+-------------------+----+----+------+------+----+----------+----+----------+----+----+------+------+-------+--------------+------+----+----+----+----+-----+----+----+-------+--------------------+
|2017|January|  1|Sunday|  0|  Active|  1|NCR|NÃƒÂ¦stved|        Farimagsvej|   8|4700|55.233|11.763| DKK|MasterCard|5643|Withdrawal|null|null| 55.23|11.761|2616038|      Naestved|281.15|1014|  87|   7| 260|0.215|  92| 500|   Rain|          

In [8]:
# Creating fileSchema for dataframe
fileSchema = StructType([StructField('year', IntegerType(),True),
                        StructField('month', StringType(),True),
                        StructField('day', IntegerType(),True),
                        StructField('weekday', StringType(),True),
                        StructField('hour', IntegerType(),True),
                        StructField('atm_status', StringType(),True),
                        StructField('atm_number', IntegerType(),True),
                        StructField('atm_manufacturer', StringType(),True),
                        StructField('atm_location', StringType(),True),
                        StructField('atm_streetname', StringType(),True),
                        StructField('atm_street_number', IntegerType(),True),
                        StructField('atm_zipcode', IntegerType(),True),
                        StructField('atm_lat', DoubleType(),True),
                        StructField('atm_lon', DoubleType(),True),
                        StructField('currency', StringType(),True),
                        StructField('card_type', StringType(),True),
                        StructField('transaction_amount', IntegerType(),True), 
                        StructField('service', StringType(),True),
                        StructField('message_code', StringType(),True),
                        StructField('message_text', StringType(),True),
                        StructField('weather_lat', DoubleType(),True),
                        StructField('weather_lon', DoubleType(),True),
                        StructField('weather_city_id', IntegerType(),True),
                        StructField('weather_city_name', StringType(),True), 
                        StructField('temp', DoubleType(),True),
                        StructField('pressure', IntegerType(),True),
                        StructField('humidity', IntegerType(),True),
                        StructField('wind_speed', IntegerType(),True),
                        StructField('wind_deg', IntegerType(),True),
                        StructField('rain_3h', DoubleType(),True),
                        StructField('clouds_all', IntegerType(),True),
                        StructField('weather_id', IntegerType(),True), 
                        StructField('weather_main', StringType(),True),
                        StructField('weather_description', StringType(),True)
                        ])

In [9]:
# Reading data from hadoop with defined schema
df = spark.read.load("/user/root/ETL/SRC_ATM_TRANS", format="csv", sep=",", schema = fileSchema)

In [10]:
# Printing Schema
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_number: integer (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_nam

In [11]:
df.show(2)

+----+-------+---+-------+----+----------+----------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+
|year|  month|day|weekday|hour|atm_status|atm_number|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+----+-------+---+-------+----+----------+----------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+------

In [12]:
# Creating temperary table
df.registerTempTable("ATM_DATA")

# Checking count of dataframe
spark.sql("select count(*) from ATM_DATA").show()

+--------+
|count(1)|
+--------+
| 2468572|
+--------+



In [13]:
# checking location data in dataframe
spark.sql("select atm_location, atm_streetname, atm_street_number, atm_zipcode, atm_lat, atm_lon from ATM_DATA").show(10)

+------------+-------------------+-----------------+-----------+-------+-------+
|atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+------------+-------------------+-----------------+-----------+-------+-------+
|  NÃƒÂ¦stved|        Farimagsvej|                8|       4700| 55.233| 11.763|
|    Vejgaard|         Hadsundvej|               20|       9000| 57.043|   9.95|
|    Vejgaard|         Hadsundvej|               20|       9000| 57.043|   9.95|
|       Ikast|RÃƒÂ¥dhusstrÃƒÂ¦det|               12|       7430| 56.139|  9.154|
|  Svogerslev|       BrÃƒÂ¸nsager|                1|       4000| 55.634| 12.018|
|        Nibe|             Torvet|                1|       9240| 56.983|  9.639|
|  Fredericia|   SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|
|   Hjallerup|  Hjallerup Centret|               18|       9320| 57.168| 10.148|
| GlyngÃƒÂ¸re|        FÃƒÂ¦rgevej|                1|       7870| 56.762|  8.867|
|     Hadsund|          Stor

#### Table - Dim Location

In [14]:
# Creating dataframe for dim_location
df_dim_location = df.select('atm_location', 
                            'atm_streetname', 
                            'atm_street_number', 
                            'atm_zipcode', 
                            'atm_lat', 
                            'atm_lon').distinct().withColumn("new_c",lit("ABC"))
df_dim_location = df_dim_location.withColumn("location_id", row_number().over(Window().partitionBy('new_c')\
                                                                         .orderBy(lit('A')))).drop("new_c")
df_dim_location = df_dim_location.select('location_id',
                                         'atm_location',
                                         'atm_streetname',
                                         'atm_street_number',
                                         'atm_zipcode',
                                         'atm_lat',
                                         'atm_lon')
df_dim_location.show()

+-----------+--------------------+----------------+-----------------+-----------+-------+-------+
|location_id|        atm_location|  atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+-----------+--------------------+----------------+-----------------+-----------+-------+-------+
|          1|               Vadum|  Ellehammersvej|               43|       9430| 57.118|  9.861|
|          2|            Slagelse| Mariendals Alle|               29|       4200| 55.398| 11.342|
|          3|          Fredericia|SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|
|          4|             Kolding|        Vejlevej|              135|       6000| 55.505|  9.457|
|          5|   HÃƒÂ¸rning Hallen|        Toftevej|               53|       8362| 56.091| 10.033|
|          6|                Aars| Himmerlandsgade|               70|       9600| 56.803|  9.518|
|          7|     Aarhus Lufthavn| Ny Lufthavnsvej|               24|       8560| 56.308| 10.627|
|          8|       

In [15]:
# Checking count of atm location dimension table
df_dim_location.count()

109

In [16]:
# Merging location dim to original df
df = df.join(df_dim_location, on=['atm_location',
                                  'atm_streetname',
                                  'atm_street_number',
                                  'atm_zipcode',
                                  'atm_lat',
                                  'atm_lon'], how='left')

In [17]:
# Checking the count of distinct location
df.select('location_id').distinct().count()

109

In [18]:
# Verifying total row count
df.count()

2468572

#### Table - Dim Card Type

In [19]:
# Creating dataframe for dim_card_type
df_dim_card_type = df.select('card_type').distinct().withColumn("new_c",lit("ABC"))
df_dim_card_type = df_dim_card_type.withColumn('card_type_id', row_number().over(Window().partitionBy('new_c')\
                                                                         .orderBy(lit('A')))).drop("new_c")
df_dim_card_type = df_dim_card_type.select('card_type_id',
                                           'card_type')
df_dim_card_type.show()

+------------+--------------------+
|card_type_id|           card_type|
+------------+--------------------+
|           1|     Dankort - on-us|
|           2|              CIRRUS|
|           3|         HÃƒÂ¦vekort|
|           4|                VISA|
|           5|  Mastercard - on-us|
|           6|             Maestro|
|           7|Visa Dankort - on-us|
|           8|        Visa Dankort|
|           9|            VisaPlus|
|          10|          MasterCard|
|          11|             Dankort|
|          12| HÃƒÂ¦vekort - on-us|
+------------+--------------------+



In [20]:
# Merging card type dim to original df
df = df.join(df_dim_card_type, on=['card_type'], how='left')

In [21]:
# Checking the count of distinct card type
df.select('card_type_id').distinct().count()

12

In [22]:
# Verifying total row count
df.count()

2468572

#### Table - Dim ATM

In [23]:
# Creating dataframe for dim_atm
df_dim_atm = df.select('atm_number',
                       'atm_manufacturer',
                       'atm_lat',
                       'atm_lon').join(df_dim_location, on=['atm_lat',
                                                            'atm_lon'], how='left').distinct().withColumn("new_c",lit("ABC"))
df_dim_atm = df_dim_atm.withColumn('atm_id', row_number().over(Window().partitionBy('new_c').orderBy(lit('A')))).drop("new_c")
df_dim_atm = df_dim_atm.select('atm_id',
                               'atm_number',
                               'atm_manufacturer',
                               'location_id')
df_dim_atm.show(10)

+------+----------+----------------+-----------+
|atm_id|atm_number|atm_manufacturer|location_id|
+------+----------+----------------+-----------+
|     1|       109| Diebold Nixdorf|         35|
|     2|         3|             NCR|         47|
|     3|        82|             NCR|         79|
|     4|        95|             NCR|         10|
|     5|        95|             NCR|        106|
|     6|        85| Diebold Nixdorf|         10|
|     7|        85| Diebold Nixdorf|        106|
|     8|        63|             NCR|         78|
|     9|        41| Diebold Nixdorf|         40|
|    10|        41| Diebold Nixdorf|        105|
+------+----------+----------------+-----------+
only showing top 10 rows



In [24]:
# Verifying count of dim_atm
df_dim_atm.count()

156

In [25]:
# To resolve "resolve(d) attribute(s)" error
df_dim_atm = df_dim_atm.select([col(column_name).alias(column_name) for column_name in df_dim_atm.columns])

In [26]:
# Merging atm dim to original df
df = df.join(df_dim_atm, on=['atm_number', 'atm_manufacturer', 'location_id'], how='left')

In [27]:
# Verifying total row count
df.count()

2468572

#### Table - Dim Date

In [28]:
# Creating dataframe for dim_date
df_dim_date = df.select('year', 'month', 'day', 'hour', 'weekday')
cols = ['year', 'month', 'day', 'hour']
df_dim_date = df_dim_date.withColumn("full_date_time",to_timestamp(concat_ws("-",*cols),"yyyy-MMM-dd-HH").cast("timestamp"))
df_dim_date = df_dim_date.distinct().withColumn("new_c",lit("ABC"))

df_dim_date = df_dim_date.withColumn('date_id', row_number().over(Window().partitionBy('new_c')\
                                                                  .orderBy(lit('A')))).drop("new_c")
df_dim_date = df_dim_date.select('date_id',
                                 'full_date_time',
                                 'year',
                                 'month',
                                 'day',
                                 'hour',
                                 'weekday')
df_dim_date.show(10)

+-------+-------------------+----+--------+---+----+---------+
|date_id|     full_date_time|year|   month|day|hour|  weekday|
+-------+-------------------+----+--------+---+----+---------+
|      1|2017-02-27 13:00:00|2017|February| 27|  13|   Monday|
|      2|2017-03-22 14:00:00|2017|   March| 22|  14|Wednesday|
|      3|2017-04-23 19:00:00|2017|   April| 23|  19|   Sunday|
|      4|2017-05-04 18:00:00|2017|     May|  4|  18| Thursday|
|      5|2017-08-11 14:00:00|2017|  August| 11|  14|   Friday|
|      6|2017-10-07 11:00:00|2017| October|  7|  11| Saturday|
|      7|2017-11-14 12:00:00|2017|November| 14|  12|  Tuesday|
|      8|2017-04-04 09:00:00|2017|   April|  4|   9|  Tuesday|
|      9|2017-06-14 10:00:00|2017|    June| 14|  10|Wednesday|
|     10|2017-03-24 08:00:00|2017|   March| 24|   8|   Friday|
+-------+-------------------+----+--------+---+----+---------+
only showing top 10 rows



In [29]:
# Verifying count of dim_date
df_dim_date.count()

8685

In [30]:
# Merging date dim to original df
df = df.join(df_dim_date, on=['year', 'month', 'day', 'hour', 'weekday'], how='left')

In [31]:
# Verifying total row count
df.count()

2468572

#### Table - Fact ATM Trans

In [32]:
# Creating dataframe for fact ATM trans table
df = df.withColumn("new_c",lit("ABC"))
df = df.withColumn('trans_id', row_number().over(Window().partitionBy('new_c')\
                                                                  .orderBy(lit('A')))).drop("new_c")
df_fact_atm_trans = df.select('trans_id','atm_id','location_id','date_id','card_type_id','atm_status','currency',
                              'service','transaction_amount','message_code','message_text','rain_3h','clouds_all',
                              'weather_id','weather_main','weather_description')

In [33]:
# Renaming column from location id to weather_loc_id
df_fact_atm_trans = df_fact_atm_trans.withColumnRenamed('location_id','weather_loc_id')

In [34]:
# Verifying count of fact_atm_trans
df_fact_atm_trans.count()

2468572

In [35]:
df_fact_atm_trans.show(5)

+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|trans_id|atm_id|weather_loc_id|date_id|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|       1|    13|           108|     59|           8|    Active|     DKK|Withdrawal|              3999|        null|        null|    0.0|         0|       800|       Clear|       Sky is Clear|
|       2|   116|            88|   3922|           1|    Active|     DKK|Withdrawal|              9164|        null|        null|    0.0|        40|       802|      Clouds|   scattered clouds|
|       3|    13|           108|   

#### Loading data to S3

 - DIM_LOCATION
 - DIM_ATM
 - DIM_DATE
 - DIM_CARD_TYPE
 - FACT_ATM_TRANS

In [36]:
# dim_location to s3
df_dim_location.write.option('header','true').csv('s3a://etl-proj/csv/dim_location.csv', mode='overwrite')

In [37]:
# dim_atm to s3
df_dim_atm.write.option('header','true').csv('s3a://etl-proj/csv/dim_atm.csv', mode='overwrite')

In [41]:
# dim_date to s3
df_dim_date.write.option('header','true').option('timestampFormat', 'yyyy-MM-dd HH:mm:ss').csv('s3a://etl-proj/csv/dim_date.csv',mode='overwrite')

In [39]:
# dim_card_type to s3
df_dim_card_type.write.option('header','true').csv('s3a://etl-proj/csv/dim_card_type.csv',mode='overwrite')

In [40]:
# fact_atm_trans to s3
df_fact_atm_trans.write.option('header','true').csv('s3a://etl-proj/csv/fact_atm_trans.csv',mode='overwrite')