In [1]:
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"]="/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("jupyter_Spark").setMaster("yarn-client")
sc = SparkContext(conf=conf)
sc


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('demo').master("local").enableHiveSupport().getOrCreate()
spark

In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

from pyspark.sql.functions import *

from pyspark.sql.window import *

In [5]:
# creating a customized schema to avoid errors. 
#used strutfield funcion to create the col, type, isnull

revisedSchema = StructType([StructField('year', IntegerType(),True),                            
                            StructField('month', StringType(),True),
                            StructField('day', IntegerType(),True),
                            StructField('weekday', StringType(),True),
                            StructField('hour', IntegerType(),True),
                            StructField('atm_status', StringType(),True),
                            StructField('atm_id', StringType(),True),
                            StructField('atm_manufacturer', StringType(),True),
                            StructField('atm_location', StringType(),True),
                            StructField('atm_streetname', StringType(),True),
                            StructField('atm_street_number', IntegerType(),True),
                            StructField('atm_zipcode', IntegerType(),True),
                            StructField('atm_lat', DoubleType(),True),
                            StructField('atm_lon', DoubleType(),True),
                            StructField('currency', StringType(),True),
                            StructField('card_type', StringType(),True),
                            StructField('transaction_amount', IntegerType(),True),
                            StructField('service', StringType(),True),
                            StructField('message_code', StringType(),True),
                            StructField('message_text', StringType(),True),
                            StructField('weather_lat', DoubleType(),True),
                            StructField('weather_lon', DoubleType(),True),
                            StructField('weather_city_id', LongType(),True),
                            StructField('weather_city_name', StringType(),True),
                            StructField('temp', DoubleType(),True),
                            StructField('pressure', IntegerType(),True),
                            StructField('humidity', IntegerType(),True),
                            StructField('wind_speed', IntegerType(),True),
                            StructField('wind_deg', IntegerType(),True),
                            StructField('rain_3h', DoubleType(),True),
                            StructField('clouds_all', IntegerType(),True),
                            StructField('weather_id', IntegerType(),True),
                            StructField('weather_main', StringType(),True),
                            StructField('weather_description', StringType(),True)                            
                           ])

# LOADING DATA FROM HDFS INTO DATAFRAME

In [6]:

# Loading data from the HDFS path into the data frame
#Using custom Schema
df = spark.read.load("/user/root/ETL_Proj_Data/part-m-00000.snappy", format="csv", schema = revisedSchema)

#printing the schema 
df.printSchema()

#Check count after importing data into a dataframe
df.select("*").count()



root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: long (nullable = true)
 |-- weather_city_name: strin

2468572

In [7]:
# Showing Top 5 rows 
df.show(5)

+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+-------------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-

# CREATING DIMENTION TABLES

# DIM_LOCATION DIMENTION TABLE

In [7]:
# dataframe for DIM_LOCATION dimention table
# Selected only the colums required for DIM_LOCATION table

DIM_LOCATION = df.select(col("atm_location").alias("location"),
                            col("atm_streetname").alias("streetname"),
                            col("atm_street_number").alias("street_number"), 
                            col("atm_zipcode").alias("zipcode"),
                            col("atm_lat").alias("lat"), 
                            col("atm_lon").alias("lon"))


#-----Check-------
#DIM_LOCATION.show(5)


# Cleaning 
# removing duplicates
DIM_LOCATION = DIM_LOCATION.dropDuplicates()


#show unique dataframe. --check
DIM_LOCATION.show(5)


+--------------------+--------------+-------------+-------+------+-----+
|            location|    streetname|street_number|zipcode|   lat|  lon|
+--------------------+--------------+-------------+-------+------+-----+
|NykÃƒÂ¸bing Mors ...|   Kirketorvet|            1|   7900|56.795| 8.86|
|                Nibe|        Torvet|            1|   9240|56.983|9.639|
|           Skipperen|   Vestre Alle|            2|   9000|57.034|9.908|
|              Viborg|     Toldboden|            3|   8800|56.448|9.401|
|               Vadum|Ellehammersvej|           43|   9430|57.118|9.861|
+--------------------+--------------+-------------+-------+------+-----+
only showing top 5 rows



In [8]:
# auto increasing  number 
id_num = Window.orderBy(monotonically_increasing_id())


#adding a column to the table DIM_LOCATION
DIM_LOCATION = DIM_LOCATION.withColumn("location_id", row_number().over(id_num))


#appending serialnumber with string 'location_id' to form a primary key 
def location_Id(value):
    return "location_" + str(value)
location_id_sNo = udf(location_Id, StringType())


# Creating a column (Primary Key) location_id 
#Updating the values in location_id col.
DIM_LOCATION = DIM_LOCATION.withColumn("location_id", location_id_sNo("location_id"))


In [9]:
# Rearranging columns according to the dimention table schema
DIM_LOCATION = DIM_LOCATION.select('location_id','location','streetname','street_number','zipcode','lat','lon')

#Showing final dimention table 'DIM_LOCATION' data 
DIM_LOCATION.show(5)

#Verifying count of records in the table - 109
DIM_LOCATION.count()

+-----------+--------------------+--------------+-------------+-------+------+-----+
|location_id|            location|    streetname|street_number|zipcode|   lat|  lon|
+-----------+--------------------+--------------+-------------+-------+------+-----+
| location_1|NykÃƒÂ¸bing Mors ...|   Kirketorvet|            1|   7900|56.795| 8.86|
| location_2|                Nibe|        Torvet|            1|   9240|56.983|9.639|
| location_3|           Skipperen|   Vestre Alle|            2|   9000|57.034|9.908|
| location_4|              Viborg|     Toldboden|            3|   8800|56.448|9.401|
| location_5|               Vadum|Ellehammersvej|           43|   9430|57.118|9.861|
+-----------+--------------------+--------------+-------------+-------+------+-----+
only showing top 5 rows



109

# DIM_ATM dimention table

In [16]:
# Creating DIM_ATM table with required cols as mentioned in the schema
DIM_ATM = df.select(col("atm_id").alias("atm_number"), 
                    col("atm_manufacturer"),
                    col('atm_lat').alias('lat'),
                    col('atm_lon').alias('lon'))


# Cleaning 
# Removing duplicates
DIM_ATM = DIM_ATM.dropDuplicates()


# Showing top 5 rows
DIM_ATM.show(5 , truncate=False)

+----------+----------------+------+------+
|atm_number|atm_manufacturer|lat   |lon   |
+----------+----------------+------+------+
|113       |Diebold Nixdorf |55.398|11.342|
|54        |NCR             |56.745|8.949 |
|104       |NCR             |57.049|9.922 |
|18        |Diebold Nixdorf |56.448|9.401 |
|8         |NCR             |56.762|8.867 |
+----------+----------------+------+------+
only showing top 5 rows



In [17]:
# Joining the table with Location Table to get the required location_id.
condition = [DIM_ATM.lat == DIM_LOCATION.lat, 
             DIM_ATM.lon == DIM_LOCATION.lon]

DIM_ATM = DIM_ATM.join(DIM_LOCATION, condition, 'left_outer')


# Assigning a unique value (primary key) to the atm_id column 
window = Window.orderBy(monotonically_increasing_id())
DIM_ATM = DIM_ATM.withColumn("atm_id", row_number().over(window))

def atm_Id(value):
    return "atm_" + str(value)
atm_id_udf = udf(atm_Id, StringType())


# Creating a column (Primary Key) atm_id
#Updating the values in atm_id col.
DIM_ATM = DIM_ATM.withColumn("atm_id", atm_id_udf("atm_id"))

In [18]:
# Rearranging columns according to the dimention table schema
DIM_ATM = DIM_ATM.select('atm_id',
                         'atm_number',
                         'atm_manufacturer',
                         col('location_id').alias('atm_location_id'))


#Showing final dimention table 'DIM_ATM' data 
DIM_ATM.show(5, truncate=False)


# Verifing the total count for the dim_atm dimension.  156
DIM_ATM.count()

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|atm_1 |113       |Diebold Nixdorf |location_104   |
|atm_2 |113       |Diebold Nixdorf |location_90    |
|atm_3 |54        |NCR             |location_93    |
|atm_4 |104       |NCR             |location_63    |
|atm_5 |104       |NCR             |location_25    |
+------+----------+----------------+---------------+
only showing top 5 rows



156

# DIM_DATE

In [19]:

# Selecting required columns from the dataframe to create DIM_DATE dimention table
DIM_DATE = df.select(col("year"), 
                     "month", 
                     "day", 
                     "hour", 
                     "weekday")

# Dropping duplicates from the DIM_DATE dataframe
DIM_DATE = DIM_DATE.dropDuplicates()


#DIM_DATE.show(5, truncate=False)

In [20]:
# Create the date from year, month, and day column
DIM_DATE = DIM_DATE.withColumn('month_int', from_unixtime(
    unix_timestamp(col("month"),'MMM'),'MM')).withColumn(
    'date',date_format(concat_ws("-",col('year'),col('month_int'),col('day')),"yyyy-MM-dd").cast('date'))

# Creating the required timestamp column after merging the date
DIM_DATE = DIM_DATE.withColumn('date',expr("date || '-' || hour || ':00:00'")
                   ).withColumn('date',to_timestamp('date','yyyy-MM-dd-HH:mm:ss'))



In [21]:
# Assigning a unique value (primary key) to the atm_id column 
window = Window.orderBy(monotonically_increasing_id())


DIM_DATE = DIM_DATE.withColumn("date_id", row_number().over(window))


def date_Id(value):
    return "date_" + str(value)
date_udf = udf(date_Id, StringType())


# Creating a column (Primary Key) date_id
#Updating the values in date_id col.
DIM_DATE = DIM_DATE.withColumn("date_id", date_udf("date_id"))

#DIM_DATE.show(5,truncate=False)

In [22]:
# Rearranging columns according to the dimention table schema
DIM_DATE = DIM_DATE.select(col('date_id'),
                           col('date').alias('full_date_time'),
                           col("year"), 
                           "month", "day", "hour", "weekday")

DIM_DATE.show(5,truncate=False)

# Verifing the total count for the DIM_DATE dimension.   -8685
DIM_DATE.count()

+-------+-------------------+----+-------+---+----+--------+
|date_id|full_date_time     |year|month  |day|hour|weekday |
+-------+-------------------+----+-------+---+----+--------+
|date_1 |2017-01-01 09:00:00|2017|January|1  |9   |Sunday  |
|date_2 |2017-01-03 05:00:00|2017|January|3  |5   |Tuesday |
|date_3 |2017-01-08 19:00:00|2017|January|8  |19  |Sunday  |
|date_4 |2017-01-21 03:00:00|2017|January|21 |3   |Saturday|
|date_5 |2017-01-23 21:00:00|2017|January|23 |21  |Monday  |
+-------+-------------------+----+-------+---+----+--------+
only showing top 5 rows



8685

# DIM_CARD_TYPE

In [24]:
# Selecting required columns from the dataframe to create DIM_CARD_TYPE dimention table
DIM_CARD_TYPE = df.select(col("card_type"))

# Dropping duplicates from the dim_card_type dataframe
DIM_CARD_TYPE = DIM_CARD_TYPE.dropDuplicates()
#DIM_CARD_TYPE.show(5, truncate=False)

In [25]:
# Assigning a unique value (primary key) to the atm_id column 

window = Window.orderBy(monotonically_increasing_id())


DIM_CARD_TYPE = DIM_CARD_TYPE.withColumn("card_type_id", row_number().over(window))
def card_type_Id(value):
    return "card_type_" + str(value)
card_type_udf = udf(card_type_Id, StringType())



# Creating a column (Primary Key) card_type_id
#Updating the values in card_type_id col.
DIM_CARD_TYPE = DIM_CARD_TYPE.withColumn("card_type_id", card_type_udf("card_type_id"))


#DIM_CARD_TYPE.show(5, truncate=False)

In [26]:
# Rearranging columns according to the dimention table schema
DIM_CARD_TYPE = DIM_CARD_TYPE.select('card_type_id','card_type')

# showing top 5 rows of new dimention table DIM_CARD_TYPE
DIM_CARD_TYPE.show(5, truncate=False)

# Verifing the total count for the dim_atm dimension. 12
DIM_CARD_TYPE.count()

+------------+------------------+
|card_type_id|card_type         |
+------------+------------------+
|card_type_1 |Dankort - on-us   |
|card_type_2 |CIRRUS            |
|card_type_3 |HÃƒÂ¦vekort       |
|card_type_4 |VISA              |
|card_type_5 |Mastercard - on-us|
+------------+------------------+
only showing top 5 rows



12

# CREATING FACT TABLE

In [27]:

# Copying the full data into another dataframe to create the fact table
FACT_ATM_TRANS = df.select('*')

In [28]:
# Joining the table with DIM_LOCATION table to get the required location_id.
condition = [FACT_ATM_TRANS.atm_location == DIM_LOCATION.location, 
             FACT_ATM_TRANS.atm_lat == DIM_LOCATION.lat,
             FACT_ATM_TRANS.atm_lon == DIM_LOCATION.lon, 
             FACT_ATM_TRANS.atm_streetname == DIM_LOCATION.streetname, 
             FACT_ATM_TRANS.atm_street_number == DIM_LOCATION.street_number, 
             FACT_ATM_TRANS.atm_zipcode == DIM_LOCATION.zipcode]

FACT_ATM_TRANS = FACT_ATM_TRANS.join(DIM_LOCATION, condition, 'left_outer').select('*')


In [29]:
# Joining the table with DIM_CARD_TYPE table to get the required card_type_id.
condition = [FACT_ATM_TRANS.card_type == DIM_CARD_TYPE.card_type]


FACT_ATM_TRANS = FACT_ATM_TRANS.join(DIM_CARD_TYPE, condition, 'left_outer').select('*')



In [30]:
# Joining the table with DIM_DATE table to get the required date_id.
condition = [FACT_ATM_TRANS.year == DIM_DATE.year, 
             FACT_ATM_TRANS.month == DIM_DATE.month,
             FACT_ATM_TRANS.day == DIM_DATE.day, 
             FACT_ATM_TRANS.hour == DIM_DATE.hour,
            FACT_ATM_TRANS.weekday == DIM_DATE.weekday]


FACT_ATM_TRANS = FACT_ATM_TRANS.join(DIM_DATE, condition, 'left_outer').select('*')



In [31]:
# Joining the table with DIM_ATM table to get the required atm_id.
FACT_ATM_TRANS = FACT_ATM_TRANS.withColumnRenamed("atm_id","atm_id_fact")


condition = [FACT_ATM_TRANS.atm_id_fact == DIM_ATM.atm_number, 
             FACT_ATM_TRANS.atm_manufacturer == DIM_ATM.atm_manufacturer,
             FACT_ATM_TRANS.location_id == DIM_ATM.atm_location_id]


FACT_ATM_TRANS = FACT_ATM_TRANS.join(DIM_ATM, condition, 'left_outer').select('*')



In [32]:

# Keeping only the rows mentioned in the fact table schema
removedCol = ['atm_location',
              'atm_streetname',
              'atm_street_number',
              'atm_zipcode',
                   'atm_lat',
              'atm_lon',
              'location',
              'streetname','street_number',
              'zipcode','lat','lon','card_type',
                  'year','month','day','hour',
              'weekday','full_date_time',
              'atm_manufacturer','atm_number',
              'atm_id_fact','atm_location_id']


FACT_ATM_TRANS = FACT_ATM_TRANS.drop(*removedCol)


#FACT_ATM_TRANS.show(2, truncate=False)

In [33]:
# Assigning a unique value (primary key) to the trans_id column 
window = Window.orderBy(monotonically_increasing_id())


FACT_ATM_TRANS = FACT_ATM_TRANS.withColumn('trans_id', row_number().over(window))


def fact_atm_trans_Id(value):
    return "trans_" + str(value)
fact_atm_trans_udf = udf(fact_atm_trans_Id, StringType())


FACT_ATM_TRANS = FACT_ATM_TRANS.withColumn('trans_id', fact_atm_trans_udf('trans_id'))



In [34]:
# Rearranging columns according to the fact table schema

FACT_ATM_TRANS = FACT_ATM_TRANS.select('trans_id',
                                       'atm_id',
                                       col('location_id').alias('weather_location_id'),
                                       'date_id',
                                       'card_type_id',
                                       'atm_status',
                                       'currency',
                                       'service',
                                       'transaction_amount',
                                       'message_code',
                                      'message_text',
                                       'rain_3h',
                                       'clouds_all',
                                       'weather_id',
                                       'weather_main',
                                       'weather_description')




In [35]:
# showing top 2 rows of the final fact table FACT_ATM_TRANS
FACT_ATM_TRANS.show(2, truncate=False)

# Verifying the count of the fact table:  2468572
FACT_ATM_TRANS.count()

+--------+------+-------------------+---------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|trans_id|atm_id|weather_location_id|date_id  |card_type_id|atm_status|currency|service   |transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+--------+------+-------------------+---------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|trans_1 |atm_40|location_101       |date_4715|card_type_2 |Inactive  |DKK     |Withdrawal|5399              |null        |null        |0.0    |0         |800       |Clear       |Sky is Clear       |
|trans_2 |atm_40|location_101       |date_183 |card_type_4 |Inactive  |DKK     |Withdrawal|3774              |null        |null        |0.0    |92        |804       |Clouds      |overcast clouds    |


2468572

# WRITING DATA INTO S3 BUCKET

In [36]:
# Writing data to the s3 bucket using write method for DIM_LOCATION table
DIM_LOCATION.write.csv("s3a://etlproj/Dim_Location/Dim_Location.csv", mode="overwrite")


In [37]:

# Writing data to the s3 bucket using write method for DIM_CARD_TYPE table
DIM_CARD_TYPE.write.csv("s3a://etlproj/Dim_Card_Type/Dim_Card_Type.csv", mode="overwrite")


In [38]:

# Writing data to the s3 bucket using write method for DIM_ATM table
DIM_ATM.write.csv("s3a://etlproj/Dim_ATM/Dim_ATM.csv", mode="overwrite")


In [39]:

# Writing data to the s3 bucket using write method for DIM_DATE table
DIM_DATE.write.parquet("s3a://etlproj/Dim_Date/Dim_Date.parquet", mode="overwrite")


In [40]:

# Writing data to the s3 bucket using write method for FACT_ATM_TRANS table
FACT_ATM_TRANS.write.csv("s3a://etlproj/Fact_Transaction/Fact_Transaction.csv", mode="overwrite")
