In [1]:
#importing all necessary system requirements
import os
import sys
# Tell PySpark which Python interpreter, JVM and Spark 2 installation to use on this host.
os.environ.update({
    "PYSPARK_PYTHON": "/opt/cloudera/parcels/Anaconda/bin/python",
    "JAVA_HOME": "/usr/java/jdk1.8.0_161/jre",
    "SPARK_HOME": "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/",
})
# PYLIB is the directory holding Spark's bundled Python archives.
# SPARK_HOME ends with '/', so the resulting path contains a (harmless) double slash.
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# Prepend the bundled py4j and pyspark archives to the import path;
# pyspark.zip is inserted last so it ends up first on sys.path.
for archive_suffix in ("/py4j-0.10.6-src.zip", "/pyspark.zip"):
    sys.path.insert(0, os.environ["PYLIB"] + archive_suffix)


In [2]:
#importing all necessary libraries
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
from pyspark.sql.window import Window

In [3]:

# Create (or reuse) a SparkSession named 'demo' that runs with the 'local' master.
spark = (
    SparkSession.builder
    .appName('demo')
    .master("local")
    .getOrCreate()
)

In [4]:
# Handle to the SparkContext behind the session; used later to set Hadoop (S3) options.
sc = spark.sparkContext

In [5]:
#Specifying the schema instead of inferring it 
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType, FloatType



In [6]:
# Explicit schema for the ATM transactions file, listed as (column name, Spark type)
# pairs in file order. Every column is declared nullable.
fileSchema = StructType([
    StructField(column_name, column_type(), True)
    for column_name, column_type in [
        ('year', IntegerType),
        ('month', StringType),
        ('day', IntegerType),
        ('weekday', StringType),
        ('hour', IntegerType),
        ('atm_status', StringType),
        ('atm_id', StringType),
        ('atm_manufacturer', StringType),
        ('atm_location', StringType),
        ('atm_streetname', StringType),
        ('atm_street_number', IntegerType),
        ('atm_zipcode', IntegerType),
        ('atm_la', FloatType),
        ('atm_lon', FloatType),
        ('currency', StringType),
        ('card_type', StringType),
        ('transaction_amount', IntegerType),
        ('service', StringType),
        ('message_code', StringType),
        ('message_text', StringType),
        ('weather_lat', FloatType),
        ('weather_lon', FloatType),
        ('weather_city_id', IntegerType),
        ('weather_city_name', StringType),
        ('temp', FloatType),
        ('pressure', IntegerType),
        ('humidity', IntegerType),
        ('wind_speed', IntegerType),
        ('wind_deg', IntegerType),
        ('rain_3h', FloatType),
        ('clouds_all', IntegerType),
        ('weather_id', IntegerType),
        ('weather_main', StringType),
        ('weather_description', StringType),
    ]
])

In [7]:
# Load the ATM transactions CSV from HDFS using the explicit schema defined above
# (nothing is inferred); the first line of the file is treated as a header.
df = (
    spark.read
    .schema(fileSchema)
    .option("header", "true")
    .csv("/user/root/atm_data/part-m-00000")
)

In [8]:
# Preview the first rows of the raw DataFrame (show() prints the top 20 rows by default).
df.show()

+----+-------+---+-------+----+----------+------+----------------+------------------+--------------------+-----------------+-----------+------+-------+--------+--------------------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+--------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|      atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_la|atm_lon|currency|           card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------------+--------------------+-----------------+-----------+------+-------+--------+--------------------+---------

In [None]:
# Print the column names and types of the raw DataFrame to confirm the explicit schema was applied.
df.printSchema()

In [9]:
# Count the rows that were loaded, as a sanity check on the import.
df.count()


2468571

The total number of rows ingested into HDFS through Sqoop is 2,468,571.

In [10]:
# Register the raw DataFrame as a temporary SQL view named 'atm_data'.
# createOrReplaceTempView is the replacement for registerTempTable, which has been
# deprecated since Spark 2.0; it also makes this cell safe to re-run.
df.createOrReplaceTempView('atm_data')

In [11]:

from pyspark.sql.functions import *

### Date Dimension Table

In [12]:
# Project the calendar columns that make up the date dimension.
dim_date = df.select('year', 'month', 'day', 'weekday', 'hour')


+----+-------+---+-------+----+
|year|  month|day|weekday|hour|
+----+-------+---+-------+----+
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
|2017|January|  1| Sunday|   0|
+----+-------+---+-------+----+
only showing top 20 rows



In [13]:
# Add a 'full_date_time' column to the distinct (year, month, day, weekday, hour) rows.
# The parts are concatenated into a string such as '2017January0109' (day and hour
# zero-padded to two digits), parsed with 'yyyyMMMMddHH', and re-formatted.
# The output pattern uses 'yyyy' (calendar year) and 'ss' (seconds): upper-case 'YYYY'
# is the week-based year, which mislabels dates around New Year, and upper-case 'SS'
# is fraction-of-second rather than seconds.
res_df = dim_date.distinct().withColumn(
    "full_date_time",
    from_unixtime(
        unix_timestamp(
            concat(
                dim_date.year.cast(StringType()),
                dim_date.month.cast(StringType()),
                lpad(dim_date.day.cast(StringType()), 2, '0'),
                lpad(dim_date.hour.cast(StringType()), 2, '0')
            ),
            'yyyyMMMMddHH'
        ),
        'yyyy-MM-dd HH:mm:ss'
    )
)
                           


In [14]:
# Assign a zero-based surrogate key 'date_id' and put the columns in their final order.
# NOTE(review): the window has no partitioning and orders by monotonically_increasing_id(),
# so the ids follow whatever row order distinct() produced — presumably not stable
# across recomputations; confirm before relying on them as keys.
date_id_window = Window.orderBy(monotonically_increasing_id())
dim_date = (
    res_df
    .withColumn('date_id', row_number().over(date_id_window) - 1)
    .select('date_id', 'full_date_time', 'year', 'month', 'day', 'hour', 'weekday')
)


In [15]:
# Preview the date dimension.
dim_date.show()

+-------+-------------------+----+--------+---+----+--------+
|date_id|     full_date_time|year|   month|day|hour| weekday|
+-------+-------------------+----+--------+---+----+--------+
|      0|2017-01-01 09:00:00|2017| January|  1|   9|  Sunday|
|      1|2017-01-03 05:00:00|2017| January|  3|   5| Tuesday|
|      2|2017-01-08 19:00:00|2017| January|  8|  19|  Sunday|
|      3|2017-01-21 03:00:00|2017| January| 21|   3|Saturday|
|      4|2017-01-23 21:00:00|2017| January| 23|  21|  Monday|
|      5|2017-02-02 19:00:00|2017|February|  2|  19|Thursday|
|      6|2017-02-05 16:00:00|2017|February|  5|  16|  Sunday|
|      7|2017-02-21 15:00:00|2017|February| 21|  15| Tuesday|
|      8|2017-03-02 08:00:00|2017|   March|  2|   8|Thursday|
|      9|2017-04-02 02:00:00|2017|   April|  2|   2|  Sunday|
|     10|2017-04-06 08:00:00|2017|   April|  6|   8|Thursday|
|     11|2017-04-30 10:00:00|2017|   April| 30|  10|  Sunday|
|     12|2017-05-02 02:00:00|2017|     May|  2|   2| Tuesday|
|     13

In [30]:
# Number of rows in the date dimension (one per distinct year/month/day/weekday/hour).
dim_date.count()

8685

### Location Dimension table

In [16]:
# Project the ATM address and coordinate columns that make up the location dimension.
dim_location = df.select(
    'atm_location',
    'atm_streetname',
    'atm_street_number',
    'atm_zipcode',
    'atm_la',
    'atm_lon',
)


In [17]:
# De-duplicate the locations, strip the 'atm_' prefix from the column names
# ('atm_la' becomes 'lat'), then assign a zero-based surrogate key 'location_id'.
res_df = dim_location.distinct()
for old_name, new_name in [
    ('atm_location', 'location'),
    ('atm_streetname', 'streetname'),
    ('atm_street_number', 'street_number'),
    ('atm_zipcode', 'zipcode'),
    ('atm_la', 'lat'),
    ('atm_lon', 'lon'),
]:
    res_df = res_df.withColumnRenamed(old_name, new_name)
location_id_window = Window.orderBy(monotonically_increasing_id())
dim_location = res_df.withColumn('location_id', row_number().over(location_id_window) - 1)


In [18]:
# Final column order of the location dimension, with the surrogate key first.
location_columns = ['location_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon']
dim_location = dim_location.select(*location_columns)


In [19]:
# Preview the location dimension.
dim_location.show()

+-----------+--------------------+------------------+-------------+-------+------+------+
|location_id|            location|        streetname|street_number|zipcode|   lat|   lon|
+-----------+--------------------+------------------+-------------+-------+------+------+
|          0|             Jebjerg|         Kirkegade|            4|   7870|56.671| 9.013|
|          1|             KÃƒÂ¸ge|    SÃƒÂ¸ndre Alle|            1|   4600|55.454|12.181|
|          2|          NÃƒÂ¦stved|       Farimagsvej|            8|   4700|55.233|11.763|
|          3|Menu KÃƒÂ¸bmand K...|         Klarupvej|           52|   9270|57.013|10.046|
|          4|           HolbÃƒÂ¦k|       Slotsvolden|            7|   4300|55.718|11.704|
|          5|      NÃƒÂ¸rresundby|            Torvet|            6|   9400|57.059| 9.922|
|          6|           Skipperen|       Vestre Alle|            2|   9000|57.034| 9.908|
|          7|            Vejgaard|        Hadsundvej|           20|   9000|57.043|  9.95|
|         

In [20]:
# Number of distinct locations in the location dimension.
dim_location.count()

109

### ATM dimension table

In [21]:
# Alias the raw DataFrame as 'df' and rename its location columns to match dim_location,
# so that later joins can use the shared column names.
# NOTE: this rebinds `df`; cells above that read df.atm_location etc. will no longer
# work if re-run after this cell.
df = df.alias('df')
for source_name, target_name in [
    ('atm_location', 'location'),
    ('atm_streetname', 'streetname'),
    ('atm_street_number', 'street_number'),
    ('atm_zipcode', 'zipcode'),
    ('atm_la', 'lat'),
    ('atm_lon', 'lon'),
]:
    df = df.withColumnRenamed(source_name, target_name)

In [22]:
# Project the ATM identity columns plus the coordinates used to look up the location.
dim_atm = df.select('atm_id', 'atm_manufacturer', 'lat', 'lon')

In [23]:
# Left-outer join the ATM rows to the location dimension on (lat, lon) and keep only
# atm_id, atm_manufacturer and the matched location_id (exposed as 'atm_location_id').
# NOTE(review): the join key is the coordinates alone; if two locations shared the same
# lat/lon this would duplicate ATM rows — confirm coordinates are unique per location.
dim_atm=dim_atm.join(dim_location,on=['lat','lon'],how='leftouter').select(dim_atm.atm_id,dim_atm.atm_manufacturer,dim_location.location_id.alias('atm_location_id'))

In [24]:
# De-duplicate the ATMs, keep the source identifier as 'atm_number', and assign a
# zero-based surrogate key 'atm_id'; then put the columns in their final order.
res_df = dim_atm.distinct().withColumnRenamed('atm_id', 'atm_number')
atm_id_window = Window.orderBy(monotonically_increasing_id())
dim_atm = (
    res_df
    .withColumn('atm_id', row_number().over(atm_id_window) - 1)
    .select('atm_id', 'atm_number', 'atm_manufacturer', 'atm_location_id')
)

In [25]:
# Preview the first 3 rows of the ATM dimension.
dim_atm.show(3)

+------+----------+----------------+---------------+
|atm_id|atm_number|atm_manufacturer|atm_location_id|
+------+----------+----------------+---------------+
|     0|        30|             NCR|             22|
|     1|        12|             NCR|             77|
|     2|        92|             NCR|             15|
+------+----------+----------------+---------------+
only showing top 3 rows



In [26]:
# Number of rows in the ATM dimension.
dim_atm.count()

156

### Card_type dimension table

In [27]:
# Project the card_type column that makes up the card type dimension.
dim_card_type=df.select('card_type')


In [28]:
# De-duplicate the card types, assign a zero-based surrogate key 'card_type_id',
# and put the key first.
card_type_id_window = Window.orderBy(monotonically_increasing_id())
dim_card_type = (
    dim_card_type.distinct()
    .withColumn('card_type_id', row_number().over(card_type_id_window) - 1)
    .select('card_type_id', 'card_type')
)

In [30]:
# Preview the card type dimension.
dim_card_type.show()

+------------+--------------------+
|card_type_id|           card_type|
+------------+--------------------+
|           0|     Dankort - on-us|
|           1|              CIRRUS|
|           2|         HÃƒÂ¦vekort|
|           3|                VISA|
|           4|  Mastercard - on-us|
|           5|             Maestro|
|           6|Visa Dankort - on-us|
|           7|        Visa Dankort|
|           8|            VisaPlus|
|           9|          MasterCard|
|          10|             Dankort|
|          11| HÃƒÂ¦vekort - on-us|
+------------+--------------------+



In [31]:
# Number of distinct card types in the card type dimension.
dim_card_type.count()

12

### Fact table creation

In [33]:
# Fact table preparation: give each dimension DataFrame an alias before joining.
dim_date=dim_date.alias('dim_date')
dim_location=dim_location.alias('dim_location')

dim_card_type=dim_card_type.alias('dim_card_type')
# NOTE: the ATM dimension is aliased 'atm_df', unlike the others, which reuse the variable name.
dim_atm=dim_atm.alias('atm_df')

In [34]:
# Build j1: left-outer join the transactions to the location dimension on all six
# location columns, keep the transaction columns plus the matched 'location_id',
# and drop the descriptive location columns now that the key is attached.
# The drop list names 'lat' (not 'la'): drop() silently ignores unknown column
# names, so a misspelt entry would leave the latitude column in place.
j1 = (
    df.join(
        dim_location,
        on=['location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon'],
        how='leftouter',
    )
    .select('df.*', 'location_id')
    .drop('location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon')
)

In [69]:
# Preview j1 (transactions with location_id attached).
j1.show()

In [35]:
# Build j2: alias j1 so its columns can be selected as 'j1.*', left-outer join it to the
# date dimension on the five calendar columns, keep j1's columns plus the matched
# 'date_id', and drop the calendar columns now that the key is attached.
j1=j1.alias('j1')
j2=j1.join(dim_date,on=['year','month','day','hour','weekday'],how='leftouter').select('j1.*','date_id').drop(*['year','month','day','hour','weekday'])


In [None]:
# Preview the first 5 rows of j2 (transactions with location_id and date_id attached).
j2.show(5)

In [36]:
# Build j3: alias j2 so its columns can be selected as 'j2.*', left-outer join it to the
# card type dimension on 'card_type', keep j2's columns plus the matched 'card_type_id',
# and drop the descriptive 'card_type' column.
j2=j2.alias('j2')
j3=j2.join(dim_card_type,on=['card_type'],how='leftouter').select('j2.*','card_type_id').drop(*['card_type'])


In [37]:
# Alias j3, then rename its columns to match the ATM dimension's join keys:
# the source 'atm_id' becomes 'atm_number' and 'location_id' becomes 'atm_location_id'.
# NOTE(review): the alias is applied before the renames and the next join selects 'j3.*';
# presumably the order matters for which columns that wildcard picks up — confirm
# before reordering these statements.
j3=j3.alias('j3')

j3=j3.withColumnRenamed('atm_id','atm_number').withColumnRenamed('location_id','atm_location_id')


In [None]:
# Preview the first 5 rows of j3 (transactions with location, date and card type keys attached).
j3.show(5)

In [38]:
# Build j4: left-outer join j3 to the ATM dimension on (atm_number, atm_manufacturer,
# atm_location_id), select 'j3.*' plus the surrogate 'atm_id' and 'atm_location_id',
# and drop the descriptive 'atm_number' and 'atm_manufacturer' columns.
j4=j3.join(dim_atm,on=['atm_number','atm_manufacturer','atm_location_id'],how='leftouter').select('j3.*','atm_id','atm_location_id').drop(*['atm_number','atm_manufacturer'])

In [74]:
# Preview the first 5 rows of j4 (transactions with all four dimension keys attached).
j4.show(5)

In [39]:
# Expose the location key as 'weather_loc_id' and assign a zero-based surrogate
# key 'trans_id' to every transaction row.
# NOTE(review): the window has no partitioning, so Spark presumably pulls all rows
# into a single partition to number them — confirm this is acceptable for the full dataset.
res_df = j4.withColumnRenamed('atm_location_id', 'weather_loc_id')
trans_id_window = Window.orderBy(monotonically_increasing_id())
j4 = res_df.withColumn(
    'trans_id',
    row_number().over(trans_id_window) - 1,
)


In [40]:
# Assemble the transaction fact table: surrogate keys first, then the transaction
# attributes, then the weather measurements.
fact_columns = [
    'trans_id',
    'atm_id',
    'weather_loc_id',
    'date_id',
    'card_type_id',
    'atm_status',
    'currency',
    'service',
    'transaction_amount',
    'message_code',
    'message_text',
    'rain_3h',
    'clouds_all',
    'weather_id',
    'weather_main',
    'weather_description',
]
fact_atm_trans = j4.select(*fact_columns)

In [41]:
# Preview the first 5 rows of the transaction fact table.
fact_atm_trans.show(5)

+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|trans_id|atm_id|weather_loc_id|date_id|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+--------+------+--------------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+-------------------+
|       0|     3|            57|    710|           0|    Active|     DKK|Withdrawal|               212|        null|        null|    0.0|        40|       802|      Clouds|   scattered clouds|
|       1|     3|            57|    710|           0|    Active|     DKK|Withdrawal|              8252|        null|        null|    0.0|        40|       802|      Clouds|   scattered clouds|
|       2|     3|            57|   

In [43]:
# Row count of the fact table; it should equal the raw row count if the left-outer
# joins did not duplicate any transactions.
fact_atm_trans.count()

2468571

### Writing files to S3 buckets


In [78]:
# Configure the AWS access key id for S3 from the AWS_ACCESS_KEY_ID environment variable
# instead of embedding it in the notebook (a key stored in a notebook should be treated
# as leaked and rotated). Raises KeyError if the variable is not set.
# The writes in this notebook use the s3a:// scheme, whose connector reads
# fs.s3a.access.key; the legacy fs.s3.awsAccessKeyId property is still set so any
# s3:// usage keeps working.
aws_access_key_id = os.environ["AWS_ACCESS_KEY_ID"]
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", aws_access_key_id)
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", aws_access_key_id)
 

In [79]:
# Configure the AWS secret access key for S3 from the AWS_SECRET_ACCESS_KEY environment
# variable instead of embedding it in the notebook (a secret stored in a notebook should
# be treated as leaked and rotated). Raises KeyError if the variable is not set.
# The writes in this notebook use the s3a:// scheme, whose connector reads
# fs.s3a.secret.key; the legacy fs.s3.awsSecretAccessKey property is still set so any
# s3:// usage keeps working.
aws_secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", aws_secret_access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", aws_secret_access_key)


In [21]:
# Write the location dimension as CSV under the dim_location prefix of the S3 bucket.
# NOTE(review): no header option or save mode is passed; presumably the write fails if
# the target path already exists (Spark's default save mode) — confirm before re-running.
dim_location.write.csv('s3a://myetlproject/dim_location/location.csv')

In [48]:
# Write the ATM dimension as CSV under the dim_atm prefix of the S3 bucket.
# NOTE(review): no header option or save mode is passed; presumably the write fails if
# the target path already exists (Spark's default save mode) — confirm before re-running.
dim_atm.write.csv('s3a://myetlproject/dim_atm/atm.csv')

In [58]:
# Write the date dimension as CSV under the dim_date prefix of the S3 bucket.
# NOTE(review): no header option or save mode is passed; presumably the write fails if
# the target path already exists (Spark's default save mode) — confirm before re-running.
dim_date.write.csv('s3a://myetlproject/dim_date/date.csv')

In [23]:
# Write the card type dimension as CSV under the dim_card_type prefix of the S3 bucket.
# NOTE(review): no header option or save mode is passed; presumably the write fails if
# the target path already exists (Spark's default save mode) — confirm before re-running.
dim_card_type.write.csv('s3a://myetlproject/dim_card_type/card_type.csv')

In [80]:
# Write the transaction fact table as CSV under the fact_atm_trans prefix of the S3 bucket.
# NOTE(review): no header option or save mode is passed; presumably the write fails if
# the target path already exists (Spark's default save mode) — confirm before re-running.
fact_atm_trans.write.csv('s3a://myetlproject/fact_atm_trans/trans.csv')