In [2]:
import os
import sys

# Tell PySpark which Python interpreter, JRE and Spark 2 installation to use.
os.environ.update({
    "PYSPARK_PYTHON": "/opt/cloudera/parcels/Anaconda/bin/python",
    "JAVA_HOME": "/usr/java/jdk1.8.0_161/jre",
    "SPARK_HOME": "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/",
})
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

# Prepend the bundled archives to the import path: pyspark.zip first, then py4j.
sys.path[:0] = [
    os.environ["PYLIB"] + "/pyspark.zip",
    os.environ["PYLIB"] + "/py4j-0.10.6-src.zip",
]

In [3]:
# Libraries used throughout the notebook

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

# Reuse a running session if there is one, otherwise start one with a local master.
spark = (
    SparkSession.builder
    .master("local")
    .appName('pairRDD_demo')
    .getOrCreate()
)
spark


In [4]:
# Read the comma-separated part file and let Spark infer the column types

df = (
    spark.read
    .format("csv")
    .option("sep", ",")
    .option("inferSchema", "true")
    .load("new_etldata_data/part-m-00000")
)


In [5]:
# Count the rows loaded into df (the DataFrame read with an inferred schema)

df.count()

2468572

In [6]:
# Print the inferred schema; the columns carry Spark's positional names (_c0, _c1, ...)

df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: double (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: integer (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: integer (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: double (nullable = true)
 |-- _c21: double (nullable = true)
 |-- _c22: integer (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: double (nullable = true)
 |-- _c25: integer (nullable = true)
 |-- _c26: integer (nullable = true)
 |-- _c27: integer (nu

In [7]:
# Preview the first five rows of the schema-inferred data

df.show(5)

+----+-------+---+------+---+--------+---+---------------+------------------+--------------------+----+----+------+------+----+--------------------+----+----------+----+----+------+------+-------+--------------+------+----+----+----+----+-----+----+----+-------+--------------------+
| _c0|    _c1|_c2|   _c3|_c4|     _c5|_c6|            _c7|               _c8|                 _c9|_c10|_c11|  _c12|  _c13|_c14|                _c15|_c16|      _c17|_c18|_c19|  _c20|  _c21|   _c22|          _c23|  _c24|_c25|_c26|_c27|_c28| _c29|_c30|_c31|   _c32|                _c33|
+----+-------+---+------+---+--------+---+---------------+------------------+--------------------+----+----+------+------+----+--------------------+----+----------+----+----+------+------+-------+--------------+------+----+----+----+----+-----+----+----+-------+--------------------+
|2017|January|  1|Sunday|  0|  Active|  1|            NCR|        NÃƒÂ¦stved|         Farimagsvej|   8|4700|55.233|11.763| DKK|          MasterCard|

In [11]:
# Explicit schema for the transaction file, used instead of schema inference.
# Each entry is (column name, Spark type); every column is declared nullable.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

fileSchema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        # date / time of the transaction
        ('year', IntegerType()),
        ('month', StringType()),
        ('day', IntegerType()),
        ('weekday', StringType()),
        ('hour', IntegerType()),
        # ATM attributes
        ('atm_status', StringType()),
        ('atm_id', StringType()),
        ('atm_manufacturer', StringType()),
        ('atm_location', StringType()),
        ('atm_streetname', StringType()),
        ('atm_street_number', IntegerType()),
        ('atm_zipcode', IntegerType()),
        ('atm_lat', DoubleType()),
        ('atm_lon', DoubleType()),
        # transaction attributes
        ('currency', StringType()),
        ('card_type', StringType()),
        ('transaction_amount', IntegerType()),
        ('service', StringType()),
        ('message_code', StringType()),
        ('message_text', StringType()),
        # weather attributes
        ('weather_lat', DoubleType()),
        ('weather_lon', DoubleType()),
        ('weather_city_id', IntegerType()),
        ('weather_city_name', StringType()),
        ('temp', DoubleType()),
        ('pressure', IntegerType()),
        ('humidity', IntegerType()),
        ('wind_speed', IntegerType()),
        ('wind_deg', IntegerType()),
        ('rain_3h', DoubleType()),
        ('clouds_all', IntegerType()),
        ('weather_id', IntegerType()),
        ('weather_main', StringType()),
        ('weather_description', StringType()),
    ]
])



In [12]:
# Read the same part file again, this time applying the explicit fileSchema

df_main = (
    spark.read
    .format("csv")
    .schema(fileSchema)
    .option("sep", ",")
    .load("new_etldata_data/part-m-00000")
)

In [13]:
# Print the schema to confirm the explicit column names and types were applied

df_main.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [14]:
# Preview the first five rows under the explicit schema

df_main.show(5)

+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-

In [15]:
# Count the rows in df_main (the DataFrame read with the explicit schema)

df_main.count()

2468572

In [16]:
# List the column names of df_main

df_main.columns

['year',
 'month',
 'day',
 'weekday',
 'hour',
 'atm_status',
 'atm_id',
 'atm_manufacturer',
 'atm_location',
 'atm_streetname',
 'atm_street_number',
 'atm_zipcode',
 'atm_lat',
 'atm_lon',
 'currency',
 'card_type',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_lat',
 'weather_lon',
 'weather_city_id',
 'weather_city_name',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_deg',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description']

<h3> Creating DIM_LOCATION Dimension </h3>

In [18]:
# One row per distinct combination of the ATM address and coordinate columns
DIM_LOCATION = df_main.select(
    "atm_location",
    "atm_streetname",
    "atm_street_number",
    "atm_zipcode",
    "atm_lat",
    "atm_lon",
).dropDuplicates()

In [19]:
# Add the key column location_id = <atm_location>_<atm_streetname>_<atm_street_number>

DIM_LOCATION = DIM_LOCATION.withColumn(
    'location_id',
    sf.concat('atm_location', sf.lit('_'), 'atm_streetname', sf.lit('_'), 'atm_street_number'),
)

In [20]:
# Preview five rows of the location dimension, including the derived location_id

DIM_LOCATION.show(5)

+-----------------+----------------+-----------------+-----------+-------+-------+--------------------+
|     atm_location|  atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|         location_id|
+-----------------+----------------+-----------------+-----------+-------+-------+--------------------+
|            Vadum|  Ellehammersvej|               43|       9430| 57.118|  9.861|Vadum_Ellehammers...|
|         Slagelse| Mariendals Alle|               29|       4200| 55.398| 11.342|Slagelse_Marienda...|
|       Fredericia|SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|Fredericia_SjÃƒÂ¦...|
|          Kolding|        Vejlevej|              135|       6000| 55.505|  9.457|Kolding_Vejlevej_135|
|HÃƒÂ¸rning Hallen|        Toftevej|               53|       8362| 56.091| 10.033|HÃƒÂ¸rning Hallen...|
+-----------------+----------------+-----------------+-----------+-------+-------+--------------------+
only showing top 5 rows



In [21]:
# Count the rows in the location dimension

DIM_LOCATION.count()

109

<h3>Creating DIM_CARD_TYPE Dimension</h3>

In [23]:
# Preview the card_type column of the source data (first five rows)
df_main.select("card_type").show(5);

+----------+
| card_type|
+----------+
|MasterCard|
|MasterCard|
|      VISA|
|      VISA|
|MasterCard|
+----------+
only showing top 5 rows



In [24]:
# One row per distinct card_type value
DIM_CARD_TYPE = df_main.select("card_type").dropDuplicates()

In [25]:
# Add the key column card_type_id = <card_type>_1

DIM_CARD_TYPE = DIM_CARD_TYPE.withColumn(
    'card_type_id',
    sf.concat('card_type', sf.lit('_'), sf.lit('1')),
)

In [26]:
# Preview five rows of the card type dimension with the derived card_type_id

DIM_CARD_TYPE.show(5)

+------------------+--------------------+
|         card_type|        card_type_id|
+------------------+--------------------+
|   Dankort - on-us|   Dankort - on-us_1|
|            CIRRUS|            CIRRUS_1|
|       HÃƒÂ¦vekort|       HÃƒÂ¦vekort_1|
|              VISA|              VISA_1|
|Mastercard - on-us|Mastercard - on-us_1|
+------------------+--------------------+
only showing top 5 rows



In [27]:
# Count the rows in the card type dimension

DIM_CARD_TYPE.count()

12

<h3>Creating DIM_ATM Dimension
</h3>

In [29]:
# Preview the atm_id and atm_manufacturer columns of the source data (first five rows)
df_main.select("atm_id","atm_manufacturer").show(5);

+------+----------------+
|atm_id|atm_manufacturer|
+------+----------------+
|     1|             NCR|
|     2|             NCR|
|     2|             NCR|
|     3|             NCR|
|     4|             NCR|
+------+----------------+
only showing top 5 rows



In [30]:
# Distinct (atm_id, atm_manufacturer) pairs from the source data.
# NOTE(review): the following cell reassigns DIM_ATM starting again from df_main,
# so this projection is not carried into the later DIM_ATM steps.

DIM_ATM = df_main.select(
    "atm_id",
    "atm_manufacturer",
).dropDuplicates()

In [31]:
# Start from the full source frame and add atm_location_id, built as
# <atm_location>_<atm_streetname>_<atm_street_number>

DIM_ATM = df_main.withColumn(
    'atm_location_id',
    sf.concat('atm_location', sf.lit('_'), 'atm_streetname', sf.lit('_'), 'atm_street_number'),
)

In [32]:

# Keep the source ATM identifier under the name atm_number
DIM_ATM = DIM_ATM.withColumnRenamed(existing="atm_id", new="atm_number")

In [33]:
# Add the key column atm_id = <atm_number>_<atm_manufacturer>

DIM_ATM = DIM_ATM.withColumn(
    'atm_id',
    sf.concat('atm_number', sf.lit('_'), 'atm_manufacturer'),
)

In [34]:
# Keep only the ATM dimension columns and drop duplicate rows

DIM_ATM = DIM_ATM.select(
    "atm_id",
    "atm_manufacturer",
    "atm_number",
    "atm_location_id",
).dropDuplicates()

In [35]:
# Print the schema of the ATM dimension

DIM_ATM.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_location_id: string (nullable = true)



In [36]:
# Preview five rows of the ATM dimension
DIM_ATM.show(5)

+-----------------+----------------+----------+--------------------+
|           atm_id|atm_manufacturer|atm_number|     atm_location_id|
+-----------------+----------------+----------+--------------------+
|7_Diebold Nixdorf| Diebold Nixdorf|         7|Hjallerup_Hjaller...|
|           57_NCR|             NCR|        57|HillerÃƒÂ¸d_KÃƒÂ¸...|
|           76_NCR|             NCR|        76|DAYZ Feriecenter_...|
|           42_NCR|             NCR|        42|Vinderup_SÃƒÂ¸nde...|
|          101_NCR|             NCR|       101|Bryggen  Vejle_SÃ...|
+-----------------+----------------+----------+--------------------+
only showing top 5 rows



In [38]:
# Count the rows in the ATM dimension

DIM_ATM.count()

113

<h3>Creating DIM_DATE Dimension </h3>

In [40]:
# Preview the date/time columns of the source data (first five rows)
df_main.select("year","month","day","hour","weekday").show(5);

+----+-------+---+----+-------+
|year|  month|day|hour|weekday|
+----+-------+---+----+-------+
|2017|January|  1|   0| Sunday|
|2017|January|  1|   0| Sunday|
|2017|January|  1|   0| Sunday|
|2017|January|  1|   0| Sunday|
|2017|January|  1|   0| Sunday|
+----+-------+---+----+-------+
only showing top 5 rows



In [41]:
# One row per distinct (year, month, day, hour, weekday) combination

DIM_DATE = df_main.select(
    "year",
    "month",
    "day",
    "hour",
    "weekday",
).dropDuplicates()

In [42]:
# Create full_date_time by concatenating year, month, day, a space and hour
# (the result is a string such as "2017January1 9")

from pyspark.sql import functions as sf

DIM_DATE = DIM_DATE.withColumn(
    'full_date_time',
    sf.concat('year', 'month', 'day', sf.lit(' '), 'hour'),
)

In [43]:
# Convert full_date_time from its string form to a timestamp.
#
# The string is built as <year><month name><day> <hour>, e.g. "2017January1 9":
# the day and hour are not zero-padded and a space separates them. The pattern
# below describes exactly that layout ('d' and 'H' accept one or two digits, and
# the space is matched literally) instead of relying on lenient parsing of
# 'yyyyMMMMddHH', which does not mention the space at all.

# NOTE(review): this star import shadows Python builtins such as sum/min/max.
# It is kept only in case later cells rely on the names it brings in; the call
# below goes through the explicit `sf` alias instead.
from pyspark.sql.functions import *
pattern1 = 'yyyyMMMMd H'

DIM_DATE = DIM_DATE.withColumn(
    'full_date_time',
    sf.unix_timestamp(DIM_DATE["full_date_time"], pattern1).cast('timestamp'),
)



In [44]:
# Print the schema to confirm full_date_time is now a timestamp column

DIM_DATE.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- full_date_time: timestamp (nullable = true)



In [45]:
# Add the key column date_id = <year><month><day> <hour>, e.g. "2017January1 9"

DIM_DATE = DIM_DATE.withColumn(
    'date_id',
    sf.concat('year', 'month', 'day', sf.lit(' '), 'hour'),
)

In [46]:
# Preview five rows of the date dimension with full_date_time and date_id

DIM_DATE.show(5)

+----+-------+---+----+--------+-------------------+----------------+
|year|  month|day|hour| weekday|     full_date_time|         date_id|
+----+-------+---+----+--------+-------------------+----------------+
|2017|January|  1|   9|  Sunday|2017-01-01 09:00:00|  2017January1 9|
|2017|January|  3|   5| Tuesday|2017-01-03 05:00:00|  2017January3 5|
|2017|January|  8|  19|  Sunday|2017-01-08 19:00:00| 2017January8 19|
|2017|January| 21|   3|Saturday|2017-01-21 03:00:00| 2017January21 3|
|2017|January| 23|  21|  Monday|2017-01-23 21:00:00|2017January23 21|
+----+-------+---+----+--------+-------------------+----------------+
only showing top 5 rows



In [47]:
# Count the rows in the date dimension

DIM_DATE.count()

8685

<h3>Creating FACT_ATM_TRANS Fact Table</h3>

In [49]:
# Project the measure/attribute columns plus the raw columns from which the
# foreign keys are derived in the following cells

FACT_ATM_TRANS = df_main.select(
    # transaction and weather attributes
    "atm_status", "currency", "service", "transaction_amount",
    "message_code", "message_text",
    "rain_3h", "clouds_all", "weather_id", "weather_main", "weather_description",
    # inputs for weather_loc_id
    "atm_location", "atm_streetname", "atm_street_number",
    # inputs for date_id
    "year", "month", "day", "hour",
    # inputs for atm_id
    "atm_id", "atm_manufacturer",
    # input for card_type_id
    "card_type",
)

In [50]:
# Add weather_loc_id = <atm_location>_<atm_streetname>_<atm_street_number>,
# the same expression DIM_LOCATION uses for location_id

FACT_ATM_TRANS = FACT_ATM_TRANS.withColumn(
    'weather_loc_id',
    sf.concat('atm_location', sf.lit('_'), 'atm_streetname', sf.lit('_'), 'atm_street_number'),
)

In [51]:
# Add date_id = <year><month><day> <hour>, the same expression DIM_DATE uses

FACT_ATM_TRANS = FACT_ATM_TRANS.withColumn(
    'date_id',
    sf.concat('year', 'month', 'day', sf.lit(' '), 'hour'),
)

In [52]:
# Replace atm_id with <atm_id>_<atm_manufacturer>, the same form as DIM_ATM.atm_id.
# The column is overwritten in place, so this cell should be run once per
# freshly built FACT_ATM_TRANS.

FACT_ATM_TRANS = FACT_ATM_TRANS.withColumn(
    'atm_id',
    sf.concat('atm_id', sf.lit('_'), 'atm_manufacturer'),
)

In [53]:
# Add card_type_id = <card_type>_1, the same expression DIM_CARD_TYPE uses

FACT_ATM_TRANS = FACT_ATM_TRANS.withColumn(
    'card_type_id',
    sf.concat('card_type', sf.lit('_'), sf.lit('1')),
)

In [54]:
# Create trans_id by joining the four key columns with '_':
#   <weather_loc_id>_<date_id>_<atm_id>_<card_type_id>
#
# The key parts must be column references (sf.col). Wrapping the names in sf.lit
# appends the constant text "date_id", "atm_id" and "card_type_id" to every row,
# so trans_id would carry no date, ATM or card information at all.
#
# NOTE(review): several transactions can share the same ATM, hour and card type,
# so this composite value is not guaranteed to be unique per row - confirm
# whether a surrogate key is required.

FACT_ATM_TRANS = FACT_ATM_TRANS.withColumn('trans_id',
                    sf.concat(sf.col('weather_loc_id'),
                                        sf.lit('_'),
                                        sf.col('date_id'),
                                        sf.lit('_'),
                                        sf.col('atm_id'),
                                        sf.lit('_'),
                                        sf.col('card_type_id')))

In [55]:
# Preview five rows of the fact frame, still including the raw columns used to build the keys

FACT_ATM_TRANS.show(5)

+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+------------+-------------------+-----------------+----+-------+---+----+------+----------------+----------+--------------------+--------------+------------+--------------------+
|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main| weather_description|atm_location|     atm_streetname|atm_street_number|year|  month|day|hour|atm_id|atm_manufacturer| card_type|      weather_loc_id|       date_id|card_type_id|            trans_id|
+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+------------+-------------------+-----------------+----+-------+---+----+------+----------------+----------+--------------------+--------------+------------+--------------------+
|    Active|     DKK|Withdrawal|        

In [56]:
# Keep the key columns followed by the transaction and weather attributes;
# the raw columns that were only needed to build the keys are dropped here

FACT_ATM_TRANS = FACT_ATM_TRANS.select(
    # keys
    "trans_id", "atm_id", "weather_loc_id", "date_id", "card_type_id",
    # transaction attributes
    "atm_status", "currency", "service", "transaction_amount",
    "message_code", "message_text",
    # weather attributes
    "rain_3h", "clouds_all", "weather_id", "weather_main", "weather_description",
)

In [57]:
# Preview five rows of the final fact table layout

FACT_ATM_TRANS.show(5)

+--------------------+------+--------------------+--------------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|            trans_id|atm_id|      weather_loc_id|       date_id|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+--------------------+------+--------------------+--------------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|NÃƒÂ¦stved_Farima...| 1_NCR|NÃƒÂ¦stved_Farima...|2017January1 0|MasterCard_1|    Active|     DKK|Withdrawal|              5643|        null|        null|  0.215|        92|       500|        Rain|          light rain|
|Vejgaard_Hadsundv...| 2_NCR|Vejgaard_Hadsundv...|2017January1 0|MasterCard_1|  Inactive|     DKK|Withdrawal|              1

In [58]:
# Count the rows in the fact table

FACT_ATM_TRANS.count()

2468572

<h2>Number of records that are imported after the Sqoop Job</h2>

In [60]:
# Rows in df, the DataFrame read with the inferred schema
df.count()

2468572

<h2>Count after importing data into a dataframe</h2>

In [61]:
# Rows in df_main, the DataFrame read with the explicit schema
df_main.count()

2468572

<h2>Count for the Location Dimension</h2>

In [63]:
# Rows in the location dimension
DIM_LOCATION.count()

109

<h2>Count for the Card Type Dimension</h2>

In [64]:
# Rows in the card type dimension
DIM_CARD_TYPE.count()

12

<h2>Count for the ATM Dimension</h2>

In [65]:
# Rows in the ATM dimension
DIM_ATM.count()

113

<h2>Count for the Date Dimension</h2>

In [66]:
# Rows in the date dimension
DIM_DATE.count()

8685

<h2>Count for the Transaction Fact table after all stages of its creation</h2>

In [67]:
# Rows in the transaction fact table
FACT_ATM_TRANS.count()

2468572

<h1> Loading data into S3 bucket </h1>

In [None]:
# Write the location dimension to S3 as CSV with a header row.
# coalesce(1) collapses the data to one partition so a single part file is written.
# DataFrameWriter.options() accepts keyword arguments only, so passing
# ("header", "true") positionally raises TypeError; option(key, value) is the
# single-pair form.
(
    DIM_LOCATION.coalesce(1)
    .write
    .format("csv")
    .option("header", "true")
    .save("s3a://sbetlcasestudy/tables/DIM_LOCATION")
)

In [None]:
# Write the card type dimension to S3 as CSV with a header row.
# coalesce(1) collapses the data to one partition so a single part file is written.
# DataFrameWriter.options() accepts keyword arguments only, so passing
# ("header", "true") positionally raises TypeError; option(key, value) is the
# single-pair form.
(
    DIM_CARD_TYPE.coalesce(1)
    .write
    .format("csv")
    .option("header", "true")
    .save("s3a://sbetlcasestudy/tables/DIM_CARD_TYPE")
)

In [None]:
# Write the ATM dimension to S3 as CSV with a header row.
# coalesce(1) collapses the data to one partition so a single part file is written.
# DataFrameWriter.options() accepts keyword arguments only, so passing
# ("header", "true") positionally raises TypeError; option(key, value) is the
# single-pair form.
(
    DIM_ATM.coalesce(1)
    .write
    .format("csv")
    .option("header", "true")
    .save("s3a://sbetlcasestudy/tables/DIM_ATM")
)

In [None]:
# Write the date dimension to S3 as CSV with a header row.
# coalesce(1) collapses the data to one partition so a single part file is written.
# DataFrameWriter.options() accepts keyword arguments only, so passing
# ("header", "true") positionally raises TypeError; option(key, value) is the
# single-pair form.
(
    DIM_DATE.coalesce(1)
    .write
    .format("csv")
    .option("header", "true")
    .save("s3a://sbetlcasestudy/tables/DIM_DATE")
)

In [None]:
# Write the transaction fact table to S3 as CSV with a header row.
# coalesce(1) collapses the data to one partition so a single part file is written.
# DataFrameWriter.options() accepts keyword arguments only, so passing
# ("header", "true") positionally raises TypeError; option(key, value) is the
# single-pair form.
(
    FACT_ATM_TRANS.coalesce(1)
    .write
    .format("csv")
    .option("header", "true")
    .save("s3a://sbetlcasestudy/tables/FACT_ATM_TRANS")
)