In [1]:
# Set spark environments
import os
import sys
# Point this kernel at the cluster's Python interpreter, JDK and Spark 2 install.
os.environ.update({
    "PYSPARK_PYTHON": "/opt/cloudera/parcels/Anaconda/bin/python",
    "JAVA_HOME": "/usr/java/jdk1.8.0_161/jre",
    "SPARK_HOME": "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/",
})
# PYLIB is plain string concatenation on SPARK_HOME (no path normalisation).
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# Each archive is pushed to the front of sys.path, so the one inserted last
# (pyspark.zip) ends up ahead of py4j.
for _archive in ("/py4j-0.10.6-src.zip", "/pyspark.zip"):
    sys.path.insert(0, os.environ["PYLIB"] + _archive)

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Build (or reuse) a Hive-enabled session named ETL_Project that runs in local mode.
spark = (
    SparkSession.builder
    .master("local")
    .appName('ETL_Project')
    .enableHiveSupport()
    .getOrCreate()
)
spark

In [4]:
# Exploratory load of the extract: the first line is treated as data (header=False)
# and Spark infers each column's type.
df1 = spark.read.options(header=False, inferSchema=True).csv("SRC_ATM_TRANS")


In [5]:
# Printing Schema
df1.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: double (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: integer (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: integer (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: double (nullable = true)
 |-- _c21: double (nullable = true)
 |-- _c22: integer (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: double (nullable = true)
 |-- _c25: integer (nullable = true)
 |-- _c26: integer (nullable = true)
 |-- _c27: integer (nu

In [6]:
# Showing the elements of the dataframe
df1.show(2)

+----+-------+---+------+---+--------+---+---+--------+-----------+----+----+------+------+----+----------+----+----------+----+----+------+------+-------+------------+------+----+----+----+----+-----+----+----+----+----------+
| _c0|    _c1|_c2|   _c3|_c4|     _c5|_c6|_c7|     _c8|        _c9|_c10|_c11|  _c12|  _c13|_c14|      _c15|_c16|      _c17|_c18|_c19|  _c20|  _c21|   _c22|        _c23|  _c24|_c25|_c26|_c27|_c28| _c29|_c30|_c31|_c32|      _c33|
+----+-------+---+------+---+--------+---+---+--------+-----------+----+----+------+------+----+----------+----+----------+----+----+------+------+-------+------------+------+----+----+----+----+-----+----+----+----+----------+
|2017|January|  1|Sunday|  0|  Active|  1|NCR|NÃ¦stved|Farimagsvej|   8|4700|55.233|11.763| DKK|MasterCard|5643|Withdrawal|null|null| 55.23|11.761|2616038|    Naestved|281.15|1014|  87|   7| 260|0.215|  92| 500|Rain|light rain|
|2017|January|  1|Sunday|  0|Inactive|  2|NCR|Vejgaard| Hadsundvej|  20|9000|57.043|  9.

In [7]:
#Specifying the schema instead of inferring it 
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, BooleanType, DoubleType, LongType

In [8]:
# Explicit schema for the SRC_ATM_TRANS extract, listed as (column name, Spark type)
# in file order. Every field is declared nullable.
_INT, _STR, _FLT = IntegerType(), StringType(), FloatType()
_file_columns = [
    ('year', _INT),
    ('month', _STR),
    ('day', _INT),
    ('weekday', _STR),
    ('hour', _INT),
    ('atm_status', _STR),
    ('atm_id', _STR),
    ('atm_manufacturer', _STR),
    ('atm_location', _STR),
    ('atm_streetname', _STR),
    ('atm_street_number', _INT),
    ('atm_zipcode', _INT),
    ('atm_lat', _FLT),
    ('atm_lon', _FLT),
    ('currency', _STR),
    ('card_type', _STR),
    ('transaction_amount', _INT),
    ('service', _STR),
    ('message_code', _STR),
    ('message_text', _STR),
    ('weather_lat', _FLT),
    ('weather_lon', _FLT),
    ('weather_city_id', _INT),
    ('weather_city_name', _STR),
    ('temp', _FLT),
    ('pressure', _INT),
    ('humidity', _INT),
    ('wind_speed', _INT),
    ('wind_deg', _INT),
    ('rain_3h', _FLT),
    ('clouds_all', _INT),
    ('weather_id', _INT),
    ('weather_main', _STR),
    ('weather_description', _STR),
]
fileSchema = StructType([StructField(_name, _type, True) for _name, _type in _file_columns])

In [9]:
# Re-read the same extract, this time applying the explicit fileSchema
# instead of letting Spark infer the column types.
df = spark.read.schema(fileSchema).csv("SRC_ATM_TRANS", header=False)

In [10]:
# Printing the schema 
df.printSchema()


root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: float (nullable = true)
 |-- atm_lon: float (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: float (nullable = true)
 |-- weather_lon: float (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string

In [11]:
# Showing the elements of the dataframe
df.show(1)


+----+-------+---+-------+----+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+
|year|  month|day|weekday|hour|atm_status|atm_id|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency| card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|
+----+-------+---+-------+----+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+----------+------------------+----------+------------+------------+-----------+------

In [12]:
# Total number of source rows, to be compared with the record count of the sqoop job.
df.count()

2468572

In [13]:
# dataframe which shows all the column names
df.columns

['year',
 'month',
 'day',
 'weekday',
 'hour',
 'atm_status',
 'atm_id',
 'atm_manufacturer',
 'atm_location',
 'atm_streetname',
 'atm_street_number',
 'atm_zipcode',
 'atm_lat',
 'atm_lon',
 'currency',
 'card_type',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_lat',
 'weather_lon',
 'weather_city_id',
 'weather_city_name',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_deg',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description']

In [14]:
######## 1. Location Dimension  ###############
# create dataframe for location_dimension which includes atm_location, atm_streetname, atm_street_number, atm_zipcode,
# atm_lat and atm_lon columns. Use alias method to convert original column name with given schema name.

In [15]:
# Preview the six ATM address/coordinate columns under their location-dimension names.
_location_preview = df.select(
    df["atm_location"].alias("location"),
    df["atm_streetname"].alias("streetname"),
    df["atm_street_number"].alias("street_number"),
    df["atm_zipcode"].alias("zipcode"),
    df["atm_lat"].alias("lat"),
    df["atm_lon"].alias("lon"),
)
_location_preview.show()

+--------------+--------------------+-------------+-------+------+------+
|      location|          streetname|street_number|zipcode|   lat|   lon|
+--------------+--------------------+-------------+-------+------+------+
|      NÃ¦stved|         Farimagsvej|            8|   4700|55.233|11.763|
|      Vejgaard|          Hadsundvej|           20|   9000|57.043|  9.95|
|      Vejgaard|          Hadsundvej|           20|   9000|57.043|  9.95|
|         Ikast|     RÃ¥dhusstrÃ¦det|           12|   7430|56.139| 9.154|
|    Svogerslev|          BrÃ¸nsager|            1|   4000|55.634|12.018|
|          Nibe|              Torvet|            1|   9240|56.983| 9.639|
|    Fredericia|      SjÃ¦llandsgade|           33|   7000|55.564| 9.757|
|     Hjallerup|   Hjallerup Centret|           18|   9320|57.168|10.148|
|     GlyngÃ¸re|           FÃ¦rgevej|            1|   7870|56.762| 8.867|
|       Hadsund|           Storegade|           12|   9560|56.716|10.114|
|  NÃ¸rresundby|              Torvet| 

In [16]:
## check the count of location_dimension table and as per below got the same count which is mentioned in the validation doc.

In [17]:
# df_loc is an integer (the number of distinct location rows), not a DataFrame.
_location_renames = [
    ("atm_location", "location"),
    ("atm_streetname", "streetname"),
    ("atm_street_number", "street_number"),
    ("atm_zipcode", "zipcode"),
    ("atm_lat", "lat"),
    ("atm_lon", "lon"),
]
_location_rows = df.select([df[_src].alias(_dst) for _src, _dst in _location_renames])
df_loc = _location_rows.distinct().count()

In [18]:
# count for the Location Dimension using count() and distinct() method.
df_loc

109

In [19]:
# One row per distinct (location, streetname, street_number, zipcode, lat, lon) combination.
_location_projection = df.select(
    df["atm_location"].alias("location"),
    df["atm_streetname"].alias("streetname"),
    df["atm_street_number"].alias("street_number"),
    df["atm_zipcode"].alias("zipcode"),
    df["atm_lat"].alias("lat"),
    df["atm_lon"].alias("lon"),
)
df_location = _location_projection.distinct()

In [20]:
df_location.show()

+--------------------+--------------------+-------------+-------+------+------+
|            location|          streetname|street_number|zipcode|   lat|   lon|
+--------------------+--------------------+-------------+-------+------+------+
|      NykÃ¸bing Mors|         Kirketorvet|            1|   7900|56.795|  8.86|
|Aalborg Storcente...|            Hobrovej|          452|   9200|57.005| 9.876|
|            Hasseris|         Hasserisvej|          113|   9000|57.044| 9.898|
|HillerÃ¸d IdrÃ¦ts...|          Milnersvej|           39|   3400|55.921|12.299|
|         Bispensgade|         Bispensgade|           35|   9800|57.453| 9.996|
|Intern  BrÃ¸nderslev|              Algade|            4|   9700|57.269| 9.945|
|  Intern  KÃ¸benhavn|      RÃ¥dhuspladsen|           75|   1550|55.676|12.571|
|               KÃ¸ge|        SÃ¸ndre Alle|            1|   4600|55.454|12.181|
|              Odense|          FÃ¦lledvej|            3|   5000|55.394| 10.37|
|            Slagelse|     Mariendals Al

In [21]:
df_location

DataFrame[location: string, streetname: string, street_number: int, zipcode: int, lat: float, lon: float]

In [22]:
## zipWithIndex() pairs every Row with its 0-based position; the map turns each
## (row, index) pair into a list of the row's values followed by index + 1.
## The pair is indexed instead of unpacked in the lambda signature because
## `lambda (row, rowId): ...` is Python 2-only syntax (a SyntaxError on Python 3).
df_location.rdd.zipWithIndex().map(lambda pair: list(pair[0]) + [pair[1] + 1]).take(4)

[[u'Nyk\xc3\xb8bing Mors',
  u'Kirketorvet',
  1,
  7900,
  56.79499816894531,
  8.859999656677246,
  1],
 [u'Aalborg Storcenter indg. D',
  u'Hobrovej',
  452,
  9200,
  57.005001068115234,
  9.87600040435791,
  2],
 [u'Hasseris',
  u'Hasserisvej',
  113,
  9000,
  57.04399871826172,
  9.89799976348877,
  3],
 [u'Hiller\xc3\xb8d Idr\xc3\xa6tscenter',
  u'Milnersvej',
  39,
  3400,
  55.92100143432617,
  12.298999786376953,
  4]]

In [23]:
# Each element becomes the location row's values followed by a 1-based running index.
# The (row, index) pair is indexed rather than unpacked in the lambda signature:
# tuple parameters are Python 2-only syntax and fail to parse on Python 3.
loc_rdd = df_location.rdd.zipWithIndex().map(lambda pair: list(pair[0]) + [pair[1] + 1])

In [24]:
# Template frame for the location dimension: df_location plus a constant location_id column.
# Only loc_dim.schema is reused (when loc_rdd is turned into a DataFrame); the constant
# itself is a placeholder.
# lit(1) is an integer literal, so location_id is typed as an integer like the other
# surrogate keys in this notebook (atm_id, date_id, card_type_id, trans_id); the earlier
# lit("1") made it a string column.
from pyspark.sql.functions import lit
loc_dim = df_location.withColumn("location_id", lit(1))

In [25]:
loc_dim.show(5)

+--------------------+-----------+-------------+-------+------+------+-----------+
|            location| streetname|street_number|zipcode|   lat|   lon|location_id|
+--------------------+-----------+-------------+-------+------+------+-----------+
|      NykÃ¸bing Mors|Kirketorvet|            1|   7900|56.795|  8.86|          1|
|Aalborg Storcente...|   Hobrovej|          452|   9200|57.005| 9.876|          1|
|            Hasseris|Hasserisvej|          113|   9000|57.044| 9.898|          1|
|HillerÃ¸d IdrÃ¦ts...| Milnersvej|           39|   3400|55.921|12.299|          1|
|         Bispensgade|Bispensgade|           35|   9800|57.453| 9.996|          1|
+--------------------+-----------+-------------+-------+------+------+-----------+
only showing top 5 rows



In [26]:
# Build the location dimension from loc_rdd, reusing loc_dim's schema so the appended
# running index lands in the location_id column.
# cache(): the ids come from zipWithIndex over a distinct() result, and Spark does not
# guarantee the same row order if that RDD is re-evaluated. Without caching, every action
# (show, count, the joins, the CSV write) may recompute the ids, so keys used in the joins
# are not guaranteed to match the keys written out. Caching is best-effort; a checkpoint
# would be the stricter option.
df_loc_dim = spark.createDataFrame(loc_rdd, schema=loc_dim.schema).cache()

In [27]:
df_loc_dim.show(5)

+--------------------+-----------+-------------+-------+------+------+-----------+
|            location| streetname|street_number|zipcode|   lat|   lon|location_id|
+--------------------+-----------+-------------+-------+------+------+-----------+
|      NykÃ¸bing Mors|Kirketorvet|            1|   7900|56.795|  8.86|          1|
|Aalborg Storcente...|   Hobrovej|          452|   9200|57.005| 9.876|          2|
|            Hasseris|Hasserisvej|          113|   9000|57.044| 9.898|          3|
|HillerÃ¸d IdrÃ¦ts...| Milnersvej|           39|   3400|55.921|12.299|          4|
|         Bispensgade|Bispensgade|           35|   9800|57.453| 9.996|          5|
+--------------------+-----------+-------------+-------+------+------+-----------+
only showing top 5 rows



In [28]:
df_loc_dim.count()

109

In [29]:
############## 2. ATM Dimension #################
## create ATM_Dimension dataframe with columns atm_id, atm_manufacturer, atm_location, atm_streetname, atm_street_number,
## atm_zipcode, atm_lat, atm_lon and use proper naming with alias method as per schema provided.

In [30]:
# Project the ATM attributes: atm_id is exposed as atm_number, atm_manufacturer keeps its
# name, and the address/coordinate columns take their location-dimension names.
df_atm = df.select(
    df["atm_id"].alias("atm_number"),
    df["atm_manufacturer"],
    df["atm_location"].alias("location"),
    df["atm_streetname"].alias("streetname"),
    df["atm_street_number"].alias("street_number"),
    df["atm_zipcode"].alias("zipcode"),
    df["atm_lat"].alias("lat"),
    df["atm_lon"].alias("lon"),
)

In [31]:
df_atm.count()

2468572

In [32]:
# De-duplicate the ATM rows, left-join them to the location dimension on (lat, lon) only,
# and keep atm_number, atm_manufacturer and the matched location_id.
# NOTE(review): the join key is just the coordinates, not the full location key. The output
# recorded below shows the same atm_number paired with several location_id values
# (e.g. 59 -> 1 and 16; 12 -> 35, 44 and 108), which looks like a fan-out where several
# location rows share coordinates. The resulting 156 rows are what the next cell compares
# with the validation document, so confirm whether this is intended before changing the key.
df_atm_new = df_atm.distinct().join(df_loc_dim, on=["lat","lon"],
                                    how='left').select(["atm_number","atm_manufacturer","location_id"])

In [33]:
# count is matching with validation documents.
df_atm_new.count()

156

In [34]:
df_atm_new.show()

+----------+----------------+-----------+
|atm_number|atm_manufacturer|location_id|
+----------+----------------+-----------+
|        18| Diebold Nixdorf|         19|
|       101|             NCR|         11|
|         9| Diebold Nixdorf|         36|
|        64|             NCR|         25|
|        59| Diebold Nixdorf|          1|
|        59| Diebold Nixdorf|         16|
|        30|             NCR|          1|
|        30|             NCR|         16|
|        12|             NCR|         35|
|        12|             NCR|         44|
|        12|             NCR|        108|
|       104|             NCR|         35|
|       104|             NCR|         44|
|       104|             NCR|        108|
|        21|             NCR|         35|
|        21|             NCR|         44|
|        21|             NCR|        108|
|        39|             NCR|         23|
|        55|             NCR|         52|
|        22|             NCR|         14|
+----------+----------------+-----

In [35]:
## zipWithIndex() pairs every Row with its 0-based position; each element of atm_rdd is the
## row's values followed by position + 1.
## The (row, index) pair is indexed instead of unpacked in the lambda signature because
## `lambda (row, rowId): ...` is Python 2-only syntax (a SyntaxError on Python 3).
atm_rdd = df_atm_new.rdd.zipWithIndex().map(lambda pair: list(pair[0]) + [pair[1] + 1])

In [36]:
# Template frame for the ATM dimension: df_atm_new plus an integer atm_id column filled
# with the constant 1. The constant is a placeholder; atm_dim.schema is what gets reused
# when atm_rdd (which carries the running index) is turned into a DataFrame.
atm_dim=df_atm_new.withColumn("atm_id",lit(1))
atm_dim.show()

+----------+----------------+-----------+------+
|atm_number|atm_manufacturer|location_id|atm_id|
+----------+----------------+-----------+------+
|        18| Diebold Nixdorf|         19|     1|
|       101|             NCR|         11|     1|
|         9| Diebold Nixdorf|         36|     1|
|        64|             NCR|         25|     1|
|        59| Diebold Nixdorf|          1|     1|
|        59| Diebold Nixdorf|         16|     1|
|        30|             NCR|          1|     1|
|        30|             NCR|         16|     1|
|        12|             NCR|         35|     1|
|        12|             NCR|         44|     1|
|        12|             NCR|        108|     1|
|       104|             NCR|         35|     1|
|       104|             NCR|         44|     1|
|       104|             NCR|        108|     1|
|        21|             NCR|         35|     1|
|        21|             NCR|         44|     1|
|        21|             NCR|        108|     1|
|        39|        

In [37]:
# Build the ATM dimension from atm_rdd, reusing atm_dim's schema so the appended running
# index lands in the atm_id column, then put atm_id first.
# cache(): the ids come from zipWithIndex, and Spark does not guarantee the same row order
# if the underlying RDD is re-evaluated. Caching keeps the ids used in the fact-table join
# consistent with the ids written to the ATM dimension file (best-effort; a checkpoint
# would be the stricter option).
_atm_columns = ["atm_id", "atm_number", "atm_manufacturer", "location_id"]
df_atm_dim = spark.createDataFrame(atm_rdd, schema=atm_dim.schema).select(_atm_columns).cache()

In [38]:
#count of atm_dim dataframe
df_atm_dim.count()

156

In [39]:
df_atm_dim.show()

+------+----------+----------------+-----------+
|atm_id|atm_number|atm_manufacturer|location_id|
+------+----------+----------------+-----------+
|     1|        18| Diebold Nixdorf|         19|
|     2|       101|             NCR|         11|
|     3|         9| Diebold Nixdorf|         36|
|     4|        64|             NCR|         25|
|     5|        59| Diebold Nixdorf|          1|
|     6|        59| Diebold Nixdorf|         16|
|     7|        30|             NCR|          1|
|     8|        30|             NCR|         16|
|     9|        12|             NCR|         35|
|    10|        12|             NCR|         44|
|    11|        12|             NCR|        108|
|    12|       104|             NCR|         35|
|    13|       104|             NCR|         44|
|    14|       104|             NCR|        108|
|    15|        21|             NCR|         35|
|    16|        21|             NCR|         44|
|    17|        21|             NCR|        108|
|    18|        39| 

In [40]:
###### 3. Date Dimension ##########
## creating Date_Dimension dataframe with columns year, month, day, hour and weekday.

In [41]:
# Keep only the calendar columns that make up the date dimension.
df_date = df.select("year", "month", "day", "hour", "weekday")

In [None]:
## Take the distinct (year, month, day, hour, weekday) combinations and add a full_date_time column.
## The year, month name, zero-padded day and zero-padded hour are concatenated into one string;
## unix_timestamp() parses that string and from_unixtime() formats the result as a date-time string.

In [43]:
# Import only the helpers this cell needs: a wildcard import from pyspark.sql.functions
# shadows Python builtins such as sum, min, max and round for the rest of the notebook.
from pyspark.sql.functions import col, concat, from_unixtime, lpad, unix_timestamp

# Text key "<year><month name><2-digit day><2-digit hour>", e.g. 2017January0109.
# Columns are referenced by name so they resolve against the de-duplicated frame itself,
# rather than mixing attributes taken from df and df_date.
_date_key = concat(
    col("year").cast(StringType()),
    col("month").cast(StringType()),
    lpad(col("day").cast(StringType()), 2, '0'),
    lpad(col("hour").cast(StringType()), 2, '0'),
)

# Parse the key and render it as a date-time string.
# The output pattern uses 'yyyy' (calendar year). The previous 'YYYY' is the week-based
# year in Java date patterns, so the last days of December could be rendered with the
# following year.
df_date_new = df_date.distinct().withColumn(
    "full_date_time",
    from_unixtime(unix_timestamp(_date_key, 'yyyyMMMMMddHH'), 'yyyy-MM-dd HH:mm:ss'),
)
df_date_new.show(5)

+----+-------+---+----+--------+-------------------+
|year|  month|day|hour| weekday|     full_date_time|
+----+-------+---+----+--------+-------------------+
|2017|January|  1|   9|  Sunday|2017-01-01 09:00:00|
|2017|January|  3|   5| Tuesday|2017-01-03 05:00:00|
|2017|January|  8|  19|  Sunday|2017-01-08 19:00:00|
|2017|January| 21|   3|Saturday|2017-01-21 03:00:00|
|2017|January| 23|  21|  Monday|2017-01-23 21:00:00|
+----+-------+---+----+--------+-------------------+
only showing top 5 rows



In [44]:
# getting exact count of date_dim dataframe as per validation document.
df_date_new.count()

8685

In [45]:
## zipWithIndex() pairs every Row with its 0-based position; each element of date_rdd is the
## row's values followed by position + 1.
## The (row, index) pair is indexed instead of unpacked in the lambda signature because
## `lambda (row, rowId): ...` is Python 2-only syntax (a SyntaxError on Python 3).
date_rdd = df_date_new.rdd.zipWithIndex().map(lambda pair: list(pair[0]) + [pair[1] + 1])

In [46]:
# Template frame for the date dimension: df_date_new plus an integer date_id column filled
# with the constant 1. The constant is a placeholder; date_dim.schema is what gets reused
# when date_rdd (which carries the running index) is turned into a DataFrame.
date_dim=df_date_new.withColumn("date_id",lit(1))
date_dim.show(5)

+----+-------+---+----+--------+-------------------+-------+
|year|  month|day|hour| weekday|     full_date_time|date_id|
+----+-------+---+----+--------+-------------------+-------+
|2017|January|  1|   9|  Sunday|2017-01-01 09:00:00|      1|
|2017|January|  3|   5| Tuesday|2017-01-03 05:00:00|      1|
|2017|January|  8|  19|  Sunday|2017-01-08 19:00:00|      1|
|2017|January| 21|   3|Saturday|2017-01-21 03:00:00|      1|
|2017|January| 23|  21|  Monday|2017-01-23 21:00:00|      1|
+----+-------+---+----+--------+-------------------+-------+
only showing top 5 rows



In [47]:
# Build the date dimension from date_rdd, reusing date_dim's schema so the appended running
# index lands in the date_id column, then order the columns with date_id first.
# cache(): the ids come from zipWithIndex, and Spark does not guarantee the same row order
# if the underlying RDD is re-evaluated. Caching keeps the ids used in the fact-table join
# consistent with the ids written to the date dimension file (best-effort; a checkpoint
# would be the stricter option).
_date_columns = ["date_id", "full_date_time", "year", "month", "day", "hour", "weekday"]
df_date_dim = spark.createDataFrame(date_rdd, schema=date_dim.schema).select(_date_columns).cache()
df_date_dim.show()

+-------+-------------------+----+--------+---+----+--------+
|date_id|     full_date_time|year|   month|day|hour| weekday|
+-------+-------------------+----+--------+---+----+--------+
|      1|2017-01-01 09:00:00|2017| January|  1|   9|  Sunday|
|      2|2017-01-03 05:00:00|2017| January|  3|   5| Tuesday|
|      3|2017-01-08 19:00:00|2017| January|  8|  19|  Sunday|
|      4|2017-01-21 03:00:00|2017| January| 21|   3|Saturday|
|      5|2017-01-23 21:00:00|2017| January| 23|  21|  Monday|
|      6|2017-02-02 19:00:00|2017|February|  2|  19|Thursday|
|      7|2017-02-05 16:00:00|2017|February|  5|  16|  Sunday|
|      8|2017-02-21 15:00:00|2017|February| 21|  15| Tuesday|
|      9|2017-03-02 08:00:00|2017|   March|  2|   8|Thursday|
|     10|2017-04-02 02:00:00|2017|   April|  2|   2|  Sunday|
|     11|2017-04-06 08:00:00|2017|   April|  6|   8|Thursday|
|     12|2017-04-30 10:00:00|2017|   April| 30|  10|  Sunday|
|     13|2017-05-02 02:00:00|2017|     May|  2|   2| Tuesday|
|     14

In [48]:
df_date_dim.count()

8685

In [49]:
########## 4. Card Type Dimension ##############
# creating dataframe with column card_type for card_type_dimension

In [50]:
# One row per distinct card_type value.
_card_types = df.select("card_type")
df_card = _card_types.distinct()
df_card.show()

+--------------------+
|           card_type|
+--------------------+
|     Dankort - on-us|
|              CIRRUS|
|                VISA|
|  Mastercard - on-us|
|             Maestro|
|Visa Dankort - on-us|
|        Visa Dankort|
|            VisaPlus|
|           HÃ¦vekort|
|          MasterCard|
|             Dankort|
|   HÃ¦vekort - on-us|
+--------------------+



In [51]:
# count is matching as per validation doc.
df_card.count()

12

In [52]:
## zipWithIndex() pairs every Row with its 0-based position; each element of card_rdd is the
## row's values followed by position + 1.
## The (row, index) pair is indexed instead of unpacked in the lambda signature because
## `lambda (row, rowId): ...` is Python 2-only syntax (a SyntaxError on Python 3).
card_rdd = df_card.rdd.zipWithIndex().map(lambda pair: list(pair[0]) + [pair[1] + 1])

In [53]:
# Template frame for the card type dimension: df_card plus an integer card_type_id column
# filled with the constant 1. The constant is a placeholder; card_dim.schema is what gets
# reused when card_rdd (which carries the running index) is turned into a DataFrame.
card_dim=df_card.withColumn("card_type_id",lit(1))
card_dim.show(5)

+------------------+------------+
|         card_type|card_type_id|
+------------------+------------+
|   Dankort - on-us|           1|
|            CIRRUS|           1|
|              VISA|           1|
|Mastercard - on-us|           1|
|           Maestro|           1|
+------------------+------------+
only showing top 5 rows



In [54]:
# Build the card type dimension from card_rdd, reusing card_dim's schema so the appended
# running index lands in the card_type_id column, then put card_type_id first.
# cache(): the ids come from zipWithIndex, and Spark does not guarantee the same row order
# if the underlying RDD is re-evaluated. Caching keeps the ids used in the fact-table join
# consistent with the ids written to the card type dimension file (best-effort; a
# checkpoint would be the stricter option).
_card_columns = ["card_type_id", "card_type"]
df_card_type_dim = spark.createDataFrame(card_rdd, schema=card_dim.schema).select(_card_columns).cache()
df_card_type_dim.show()

+------------+--------------------+
|card_type_id|           card_type|
+------------+--------------------+
|           1|     Dankort - on-us|
|           2|              CIRRUS|
|           3|                VISA|
|           4|  Mastercard - on-us|
|           5|             Maestro|
|           6|Visa Dankort - on-us|
|           7|        Visa Dankort|
|           8|            VisaPlus|
|           9|           HÃ¦vekort|
|          10|          MasterCard|
|          11|             Dankort|
|          12|   HÃ¦vekort - on-us|
+------------+--------------------+



In [55]:
df_card_type_dim.count()

12

In [56]:
########## Fact Table --> FACT_ATM_TRANS  #################
#### for creating fact table we need to merge all the above created dimension table dataframe one by one on the provided schema
#### information. below mentioned the list of columns which we need to use while creating fact_table mapping.
#### 1.location_dim -> location, streetname, street_number,zipcode,lat,lon
#### 2.atm_dim -> atm_id,atm_number
#### 3.date_dim -> year, month, day, hour,weekday
#### 4.card_dim -> card_type

####### Alias Creation- first creating alias of all the above created dataframe.

In [57]:
# Rebind df to an aliased copy so the date-dimension join can select its columns as "df.*".
df=df.alias('df')

In [58]:
# Alias the location dimension so the fact join can select "df_loc_dim.location_id".
df_loc_dim=df_loc_dim.alias("df_loc_dim")

In [59]:
# Alias the ATM dimension so the fact join can select "df_atm_dim.atm_id".
df_atm_dim=df_atm_dim.alias("df_atm_dim")

In [60]:
# Alias the date dimension so the fact join can select "df_date_dim.date_id".
df_date_dim=df_date_dim.alias("df_date_dim")

In [61]:
# Alias the card type dimension so the fact join can select "df_card_type_dim.card_type_id".
df_card_type_dim=df_card_type_dim.alias("df_card_type_dim")

In [None]:
## Join the aliased source DataFrame to each dimension DataFrame in turn, keeping the dimension's id column and
## dropping the columns used as the join key so that they are not present in the fact table.

## First, left-join the schema-mapped source DataFrame (df) to the date_dim DataFrame and store the
## result in df_stg1.

In [62]:
# Stage 1: attach date_id to every source row, then drop the calendar columns it replaces.
_date_keys = ["year", "month", "day", "hour", "weekday"]
_with_date = df.join(df_date_dim, on=_date_keys, how="left")
df_stg1 = _with_date.select("df.*", "df_date_dim.date_id").drop(*_date_keys)

In [63]:
df_stg1.show(5)

+----------+------+----------------+--------------------+--------------+-----------------+-----------+-------+-------+--------+------------------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+
|atm_status|atm_id|atm_manufacturer|        atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency|         card_type|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|   temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|date_id|
+----------+------+----------------+--------------------+--------------+-----------------+-----------+-------+-------+--------+------------------+------------------+----------+------------+------------+-----------+-----------+---------------+--

In [64]:
# Alias stage 1 so the next join can select its columns as "df_stg1.*".
df_stg1=df_stg1.alias("df_stg1")

In [65]:
## Stage 2: left-join stage 1 to the card type dimension on card_type, keep card_type_id
## and drop the card_type text column it replaces.
_with_card = df_stg1.join(df_card_type_dim, on=["card_type"], how="left")
df_stg2 = _with_card.select("df_stg1.*", "df_card_type_dim.card_type_id").drop("card_type")
df_stg2.show(2)

+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+------------+
|atm_status|atm_id|atm_manufacturer|atm_location|atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|currency|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|   temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|date_id|card_type_id|
+----------+------+----------------+------------+--------------+-----------------+-----------+-------+-------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+-------+--------+--------+----------+--

In [66]:
# Alias stage 2 so the next join can select its columns as "df_stg2.*".
df_stg2 = df_stg2.alias("df_stg2")

In [67]:
## Stage 3: rename the atm_* address/coordinate columns to the location dimension's names,
## left-join to df_loc_dim on all six of them, keep location_id and drop the join columns.
_location_renames = [
    ("atm_location", "location"),
    ("atm_lat", "lat"),
    ("atm_lon", "lon"),
    ("atm_streetname", "streetname"),
    ("atm_street_number", "street_number"),
    ("atm_zipcode", "zipcode"),
]
_stg2_renamed = df_stg2
for _old_name, _new_name in _location_renames:
    _stg2_renamed = _stg2_renamed.withColumnRenamed(_old_name, _new_name)

_location_keys = ["location", "streetname", "street_number","zipcode","lat","lon"]
_with_location = _stg2_renamed.join(df_loc_dim, on=_location_keys, how="left")
df_stg3 = _with_location.select("df_stg2.*", "df_loc_dim.location_id").drop(*_location_keys)

In [68]:
df_stg3.show(2)

+----------+------+----------------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+------------+-----------+
|atm_status|atm_id|atm_manufacturer|currency|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|date_id|card_type_id|location_id|
+----------+------+----------------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+------------+-----------+
|  Inactive|    89|             NCR|     DKK|               463|Withdrawal|        null|        n

In [69]:
# Alias stage 3 so the next join can select its columns as "df_stg3.*".
df_stg3=df_stg3.alias("df_stg3")

In [70]:
# Stage 4: expose the source atm_id as atm_number, left-join to the ATM dimension on
# (atm_number, atm_manufacturer, location_id), keep the dimension's atm_id and drop the
# ATM attributes it replaces.
_atm_keys = ['atm_number','atm_manufacturer','location_id']
_stg3_keyed = df_stg3.withColumnRenamed("atm_id","atm_number")
_with_atm = _stg3_keyed.join(df_atm_dim, on=_atm_keys, how="left")
df_stg4 = _with_atm.select("df_stg3.*","df_atm_dim.atm_id").drop('atm_number','atm_manufacturer')

df_stg4.show(2)

+-----------+----------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+------------+------+
|location_id|atm_status|currency|transaction_amount|   service|message_code|message_text|weather_lat|weather_lon|weather_city_id|weather_city_name|  temp|pressure|humidity|wind_speed|wind_deg|rain_3h|clouds_all|weather_id|weather_main|weather_description|date_id|card_type_id|atm_id|
+-----------+----------+--------+------------------+----------+------------+------------+-----------+-----------+---------------+-----------------+------+--------+--------+----------+--------+-------+----------+----------+------------+-------------------+-------+------------+------+
|         11|    Active|     DKK|              2613|Withdrawal|        null|        null|     55.709|      9.536|        2610613|            Vejle|2

In [71]:
df_stg4.count()

2468572

In [72]:
## zipWithIndex() pairs every Row with its 0-based position; each element of fact_rdd is the
## row's values followed by position + 1.
## The (row, index) pair is indexed instead of unpacked in the lambda signature because
## `lambda (row, rowId): ...` is Python 2-only syntax (a SyntaxError on Python 3).
fact_rdd = df_stg4.rdd.zipWithIndex().map(lambda pair: list(pair[0]) + [pair[1] + 1])

In [73]:
## Template frame for the fact table: df_stg4 plus an integer trans_id column filled with
## the constant 1. The constant is a placeholder; fact_atm.schema is what gets reused when
## fact_rdd (which carries the running index) is turned into a DataFrame.
fact_atm=df_stg4.withColumn("trans_id", lit(1))

In [74]:
## Final fact table: fact_rdd is read with fact_atm's schema (so the running index becomes
## trans_id) and projected to the fact columns, keys first.
_fact_columns = [
    "trans_id", "atm_id", "location_id", "date_id", "card_type_id",
    "atm_status", "currency", "service", "transaction_amount",
    "message_code", "message_text",
    "rain_3h", "clouds_all", "weather_id", "weather_main", "weather_description",
]
_fact_indexed = spark.createDataFrame(fact_rdd, schema=fact_atm.schema)
df_fact_atm_trans = _fact_indexed.select(_fact_columns)

df_fact_atm_trans.show()

+--------+------+-----------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|trans_id|atm_id|location_id|date_id|card_type_id|atm_status|currency|   service|transaction_amount|message_code|message_text|rain_3h|clouds_all|weather_id|weather_main| weather_description|
+--------+------+-----------+-------+------------+----------+--------+----------+------------------+------------+------------+-------+----------+----------+------------+--------------------+
|       1|     2|         11|   6975|           1|    Active|     DKK|Withdrawal|              2613|        null|        null|    0.0|        48|       701|        Mist|                mist|
|       2|     2|         11|   6975|           1|    Active|     DKK|Withdrawal|              7129|        null|        null|    0.0|        48|       701|        Mist|                mist|
|       3|     2|         11|   3066|        

In [75]:
# matched count with validation doc and same count with sqoop import. 
df_fact_atm_trans.count()

2468572

In [76]:
####### Write the dimension and fact DataFrames as CSV files to the S3 bucket etl-project-dsc18 ########
## Each DataFrame is written with write.csv() to its own prefix in the bucket.
## coalesce uses existing partitions to minimize the amount of data that's shuffled. repartition creates new partitions
## and does a full shuffle. coalesce results in partitions with different amounts of data (sometimes partitions that have
## much different sizes) and repartition results in roughly equal sized partitions.

In [78]:
# Write the location dimension as CSV with a header row, replacing whatever is already at
# the target prefix. coalesce(1) reduces the frame to a single partition before writing.
# NOTE(review): DataFrameWriter.csv() is not documented to return a value, so loc_dim_file
# is presumably None - confirm before relying on it.
loc_dim_file = df_loc_dim.coalesce(1).write.csv("s3a://etl-project-dsc18/location-dim/", mode="overwrite", header=True)

In [79]:
# Write the ATM dimension as CSV with a header row, replacing whatever is already at the
# target prefix. coalesce(1) reduces the frame to a single partition before writing.
# NOTE(review): DataFrameWriter.csv() is not documented to return a value, so atm_dim_file
# is presumably None - confirm before relying on it.
atm_dim_file = df_atm_dim.coalesce(1).write.csv("s3a://etl-project-dsc18/atm-dim/", mode="overwrite", header=True)

In [80]:
# Write the date dimension as CSV with a header row, replacing whatever is already at the
# target prefix. coalesce(1) reduces the frame to a single partition before writing.
# NOTE(review): DataFrameWriter.csv() is not documented to return a value, so date_dim_file
# is presumably None - confirm before relying on it.
date_dim_file = df_date_dim.coalesce(1).write.csv("s3a://etl-project-dsc18/date-dim/", mode="overwrite", header=True)

In [81]:
# Write the card type dimension as CSV with a header row, replacing whatever is already at
# the target prefix. coalesce(1) reduces the frame to a single partition before writing.
# NOTE(review): DataFrameWriter.csv() is not documented to return a value, so card_type_file
# is presumably None - confirm before relying on it.
card_type_file = df_card_type_dim.coalesce(1).write.csv("s3a://etl-project-dsc18/card-type-dim/", mode="overwrite", header=True)

In [82]:
# Write the fact table as CSV with a header row, replacing whatever is already at the
# target prefix. coalesce(1) reduces the frame to a single partition before writing.
# NOTE(review): DataFrameWriter.csv() is not documented to return a value, so fact_file
# is presumably None - confirm before relying on it.
fact_file = df_fact_atm_trans.coalesce(1).write.csv("s3a://etl-project-dsc18/fact-atm-trans/", mode="overwrite", header=True)

In [None]:
######## Prepared by - Pritamkumar. PGDDS-C18 ETL PROJECT.