In [1]:
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('demo').master("local").getOrCreate()
spark

In [4]:
sc = spark.sparkContext
sc

In [5]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

In [1]:
# Defining the struct types for each column based schema.

In [6]:
fileSchema = StructType([StructField('year', IntegerType(),False),
                        StructField('month', StringType(),False),
                        StructField('day', IntegerType(),False),
                        StructField('weekday', StringType(),False),
                        StructField('hour', IntegerType(),False),
                        StructField('atm_status', StringType(),False),
                        StructField('atm_id', StringType(),False),
                        StructField('atm_manufacturer', StringType(),False),
                        StructField('atm_location', StringType(),False),
                        StructField('atm_streetname', StringType(),False),
                        StructField('atm_street_number', IntegerType(),False),
                        StructField('atm_zipcode', IntegerType(),False),
                        StructField('atm_lat', DoubleType(),False),
                        StructField('atm_lon', DoubleType(),False),
                        StructField('currency', StringType(),False),
                        StructField('card_type', StringType(),False),
                        StructField('transaction_amount', IntegerType(),False), 
                        StructField('service', StringType(),False),
                        StructField('message_code', StringType(),True),
                        StructField('message_text', StringType(),True),
                        StructField('weather_lat', DoubleType(),False),
                        StructField('weather_lon', DoubleType(),False),
                        StructField('weather_city_id', IntegerType(),False),
                        StructField('weather_city_name', StringType(),False), 
                        StructField('temp', DoubleType(),False),
                        StructField('pressure', IntegerType(),False), 
                        StructField('humidity', IntegerType(),False), 
                        StructField('wind_speed', IntegerType(),False), 
                        StructField('wind_deg', IntegerType(),False), 
                        StructField('rain_3h', DoubleType(),True), 
                        StructField('clouds_all', IntegerType(),False), 
                        StructField('weather_id', IntegerType(),False), 
                        StructField('weather_main', StringType(),False), 
                        StructField('weather_description', StringType(),False), 
                        ])

In [7]:
files = spark.read.csv("hdfs:/user/root/atm_transaction_etl", header = False, schema = fileSchema)

In [8]:
#Checking number of records loaded from HDFS
files.count()

2468572

In [9]:
files.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [10]:
files.columns

['year',
 'month',
 'day',
 'weekday',
 'hour',
 'atm_status',
 'atm_id',
 'atm_manufacturer',
 'atm_location',
 'atm_streetname',
 'atm_street_number',
 'atm_zipcode',
 'atm_lat',
 'atm_lon',
 'currency',
 'card_type',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'weather_lat',
 'weather_lon',
 'weather_city_id',
 'weather_city_name',
 'temp',
 'pressure',
 'humidity',
 'wind_speed',
 'wind_deg',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description']

In [11]:
from pyspark.sql.functions import *
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window

##### ADDING INDEX COLUMNS TO FILE THAT ARE USED AS PRIMARY KEYS FOR EACH DIM TABLES

In [12]:
from pyspark.sql import functions as sf

##### Summary so far - 

###### 1. We have checked schema after assigning Struct types.
###### 2. We have validated the no. of rows loaded.
###### 3. We have also validated the column names. 

##### Since, all of them are looking as expected so we'll go ahead and create dimension tables as per our requirement. 

# Creating Location Dimension  

In [13]:
# Identifying LOCATION DIM From Source File.

temp_dim1_loc= files.select('atm_location','atm_streetname','atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon','atm_id')

In [14]:
#De-Duplicating Data based on Location parameters

DIM_LOCS = temp_dim1_loc.dropDuplicates((['atm_location','atm_streetname','atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon']))

In [15]:
temp_dim1_loc.show()

+------------------+--------------------+-----------------+-----------+-------+-------+------+
|      atm_location|      atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|atm_id|
+------------------+--------------------+-----------------+-----------+-------+-------+------+
|        NÃƒÂ¦stved|         Farimagsvej|                8|       4700| 55.233| 11.763|     1|
|          Vejgaard|          Hadsundvej|               20|       9000| 57.043|   9.95|     2|
|          Vejgaard|          Hadsundvej|               20|       9000| 57.043|   9.95|     2|
|             Ikast| RÃƒÂ¥dhusstrÃƒÂ¦det|               12|       7430| 56.139|  9.154|     3|
|        Svogerslev|        BrÃƒÂ¸nsager|                1|       4000| 55.634| 12.018|     4|
|              Nibe|              Torvet|                1|       9240| 56.983|  9.639|     5|
|        Fredericia|    SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|     6|
|         Hjallerup|   Hjallerup Centret|         

In [16]:
DIM_LOCS.count()


109

##### Count of LOC DIM is as expected.

In [17]:
DIM_LOC = DIM_LOCS.withColumn(
    "location_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)

In [18]:
#Order DIM: Location by Primary Index
DIM_LOC.orderBy('location_id', ascending=True)

DataFrame[atm_location: string, atm_streetname: string, atm_street_number: int, atm_zipcode: int, atm_lat: double, atm_lon: double, atm_id: string, location_id: int]

In [19]:
DIM_LOC.show(5)

+-----------------+----------------+-----------------+-----------+-------+-------+------+-----------+
|     atm_location|  atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|atm_id|location_id|
+-----------------+----------------+-----------------+-----------+-------+-------+------+-----------+
|            Vadum|  Ellehammersvej|               43|       9430| 57.118|  9.861|    33|          1|
|         Slagelse| Mariendals Alle|               29|       4200| 55.398| 11.342|    31|          2|
|       Fredericia|SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|     6|          3|
|HÃƒÂ¸rning Hallen|        Toftevej|               53|       8362| 56.091| 10.033|   108|          4|
|          Kolding|        Vejlevej|              135|       6000| 55.505|  9.457|    19|          5|
+-----------------+----------------+-----------------+-----------+-------+-------+------+-----------+
only showing top 5 rows



In [20]:
DIM_LOC= DIM_LOC.select('location_id','atm_location','atm_streetname','atm_street_number','atm_zipcode','atm_lat',
                       'atm_lon')

In [21]:
DIM_LOC.show(5)

+-----------+-----------------+----------------+-----------------+-----------+-------+-------+
|location_id|     atm_location|  atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+-----------+-----------------+----------------+-----------------+-----------+-------+-------+
|          1|            Vadum|  Ellehammersvej|               43|       9430| 57.118|  9.861|
|          2|         Slagelse| Mariendals Alle|               29|       4200| 55.398| 11.342|
|          3|       Fredericia|SjÃƒÂ¦llandsgade|               33|       7000| 55.564|  9.757|
|          4|          Kolding|        Vejlevej|              135|       6000| 55.505|  9.457|
|          5|HÃƒÂ¸rning Hallen|        Toftevej|               53|       8362| 56.091| 10.033|
+-----------+-----------------+----------------+-----------------+-----------+-------+-------+
only showing top 5 rows



# Creating ATM Dimension

In [22]:
# Identifying ATM DIM From Source File.
# file_indexes = files.withColumnRenamed('atm_id','atm_number')
temp_dim2_atm= files.select('atm_lat','atm_lon','atm_id','atm_manufacturer')

#De-Duplicating Data based on ATM parameters
DIM_ATM = temp_dim2_atm.dropDuplicates((['atm_id']))

In [23]:
temp_dim2_atm.registerTempTable("atm")
DIM_LOC.registerTempTable("loc")

In [24]:
## Creating a temp table to join with location table to copy the atm location.

DIM_ATMt= spark.sql("select atm.atm_id,atm.atm_manufacturer,loc.*from atm left join loc on atm.atm_lat==loc.atm_lat and atm.atm_lon==loc.atm_lon").distinct()

In [25]:
DIM_ATMt.count()

156

In [26]:
# Creating primary key for the ATM DIM.

DIM_ATMtt =  DIM_ATMt.withColumn(
    "atm_prim_id",row_number().over(Window.orderBy(monotonically_increasing_id()))
)

In [27]:
#Order DIM: ATM by Primary Index

DIM_ATMtt.orderBy('atm_prim_id', ascending=True)

DataFrame[atm_id: string, atm_manufacturer: string, location_id: int, atm_location: string, atm_streetname: string, atm_street_number: int, atm_zipcode: int, atm_lat: double, atm_lon: double, atm_prim_id: int]

In [28]:
DIM_ATMtt.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- atm_prim_id: integer (nullable = true)



In [29]:
DIM_ATM = DIM_ATMtt.select('atm_prim_id','atm.atm_id','atm_manufacturer','location_id')

In [30]:
DIM_ATM.count()

156

In [31]:
DIM_ATM.show()

+-----------+------+----------------+-----------+
|atm_prim_id|atm_id|atm_manufacturer|location_id|
+-----------+------+----------------+-----------+
|          1|    35|             NCR|         15|
|          2|    51|             NCR|         65|
|          3|     9| Diebold Nixdorf|         76|
|          4|    62| Diebold Nixdorf|         30|
|          5|    34|             NCR|         43|
|          6|    76|             NCR|         41|
|          7|    49|             NCR|         80|
|          8|    88|             NCR|         66|
|          9|    59| Diebold Nixdorf|         90|
|         10|    93|             NCR|         55|
|         11|    42|             NCR|         31|
|         12|     8|             NCR|         21|
|         13|    30|             NCR|         90|
|         14|    89|             NCR|         88|
|         15|   113| Diebold Nixdorf|         49|
|         16|    79|             NCR|         29|
|         17|   104|             NCR|         68|


##### DIM_ATM count & schema is as expected.  

# Creating Date Dimension 

In [32]:
# Identifying DATE DIM From Source File.

temp_dim3_date= files.select('year','month','day','hour','weekday')

In [33]:
### Convert int String Month to Numeric

date_conv = temp_dim3_date.withColumn("month",from_unixtime(unix_timestamp(col("Month"),'MMM'),'MM'))

In [34]:
date_conv.show()

+----+-----+---+----+-------+
|year|month|day|hour|weekday|
+----+-----+---+----+-------+
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
|2017|   01|  1|   0| Sunday|
+----+-----+---+----+-------+
only showing top 20 rows



#### Merging day month and year to a full date and time format

In [35]:
from pyspark.sql import functions as sf
date_conv = date_conv.withColumn('full_date_time',sf.concat(sf.col('year'),sf.lit('/'),sf.col('month'),sf.lit('/'),sf.col('day'),sf.lit(' '),sf.col('hour'),sf.lit(':'),sf.lit('00:00')))

In [36]:
date_conv.show()

+----+-----+---+----+-------+-----------------+
|year|month|day|hour|weekday|   full_date_time|
+----+-----+---+----+-------+-----------------+
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0:00:00|
|2017|   01|  1|   0| Sunday|2017/01/1 0

##### Converting full date time created above into time stamp format

In [37]:
date_conv2 = date_conv.withColumn('full_date_time', unix_timestamp(date_conv['full_date_time'], 'yyyy/MM/dd HH:mm:ss').cast('timestamp'))

In [38]:
date_conv2.show()

+----+-----+---+----+-------+-------------------+
|year|month|day|hour|weekday|     full_date_time|
+----+-----+---+----+-------+-------------------+
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|
|2017|   01|  1|   0| Sunday|2017-01-01 00:00:00|


In [39]:
date_conv2.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- full_date_time: timestamp (nullable = true)



In [40]:
#De-Duplicating Data based on DATE parameters

DIM_DATE = date_conv2.dropDuplicates((['full_date_time']))

#Order DIM: ATM by Primary Index

In [41]:
DIM_DATE.count()

#Count looks as expected

8685

In [42]:
DIM_DATE.show()

+----+-----+---+----+---------+-------------------+
|year|month|day|hour|  weekday|     full_date_time|
+----+-----+---+----+---------+-------------------+
|2017|   01|  5|   1| Thursday|2017-01-05 01:00:00|
|2017|   01| 11|   8|Wednesday|2017-01-11 08:00:00|
|2017|   01| 21|  22| Saturday|2017-01-21 22:00:00|
|2017|   02|  7|  11|  Tuesday|2017-02-07 11:00:00|
|2017|   02|  9|   7| Thursday|2017-02-09 07:00:00|
|2017|   02|  9|  22| Thursday|2017-02-09 22:00:00|
|2017|   02| 16|   5| Thursday|2017-02-16 05:00:00|
|2017|   02| 23|  18| Thursday|2017-02-23 18:00:00|
|2017|   02| 25|  11| Saturday|2017-02-25 11:00:00|
|2017|   02| 26|  13|   Sunday|2017-02-26 13:00:00|
|2017|   03| 12|  13|   Sunday|2017-03-12 13:00:00|
|2017|   04|  1|   8| Saturday|2017-04-01 08:00:00|
|2017|   04|  6|  14| Thursday|2017-04-06 14:00:00|
|2017|   04| 21|  17|   Friday|2017-04-21 17:00:00|
|2017|   05|  6|  15| Saturday|2017-05-06 15:00:00|
|2017|   05|  9|  14|  Tuesday|2017-05-09 14:00:00|
|2017|   05|

In [43]:
#Adding primary key

DIM_DATE = DIM_DATE.withColumn(
    "date_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)

In [44]:
DIM_DATE.count()

8685

In [45]:
DIM_DATE.show()

+----+-----+---+----+---------+-------------------+-------+
|year|month|day|hour|  weekday|     full_date_time|date_id|
+----+-----+---+----+---------+-------------------+-------+
|2017|   01|  5|   1| Thursday|2017-01-05 01:00:00|      1|
|2017|   01| 11|   8|Wednesday|2017-01-11 08:00:00|      2|
|2017|   01| 21|  22| Saturday|2017-01-21 22:00:00|      3|
|2017|   02|  7|  11|  Tuesday|2017-02-07 11:00:00|      4|
|2017|   02|  9|   7| Thursday|2017-02-09 07:00:00|      5|
|2017|   02|  9|  22| Thursday|2017-02-09 22:00:00|      6|
|2017|   02| 16|   5| Thursday|2017-02-16 05:00:00|      7|
|2017|   02| 23|  18| Thursday|2017-02-23 18:00:00|      8|
|2017|   02| 25|  11| Saturday|2017-02-25 11:00:00|      9|
|2017|   02| 26|  13|   Sunday|2017-02-26 13:00:00|     10|
|2017|   03| 12|  13|   Sunday|2017-03-12 13:00:00|     11|
|2017|   04|  1|   8| Saturday|2017-04-01 08:00:00|     12|
|2017|   04|  6|  14| Thursday|2017-04-06 14:00:00|     13|
|2017|   04| 21|  17|   Friday|2017-04-2

# Creating Card Dimension

In [46]:
# Identifying CARD TYPE DIM From Source File.

temp_dim4_card= files.select('card_type')

#De-Duplicating Data based on DATE parameters

DIM_CARD = temp_dim4_card.dropDuplicates((['card_type']))

In [47]:
DIM_CARD = DIM_CARD.withColumn(
    "card_type_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)

In [48]:
DIM_CARD.orderBy('card_type_id', ascending=True)

DataFrame[card_type: string, card_type_id: int]

In [49]:
DIM_CARD.count()

12

In [50]:
DIM_CARD.show()

+--------------------+------------+
|           card_type|card_type_id|
+--------------------+------------+
|     Dankort - on-us|           1|
|              CIRRUS|           2|
|         HÃƒÂ¦vekort|           3|
|                VISA|           4|
|  Mastercard - on-us|           5|
|             Maestro|           6|
|Visa Dankort - on-us|           7|
|        Visa Dankort|           8|
|            VisaPlus|           9|
|          MasterCard|          10|
|             Dankort|          11|
| HÃƒÂ¦vekort - on-us|          12|
+--------------------+------------+



In [2]:
#Reverse Sync PK's to source file by using joins
#This is used while doing analytics and thus keeping all primary keys created above intact with original source file.

In [51]:
ff = files.join(DIM_ATMt,on=['atm_id','atm_manufacturer','atm_location','atm_streetname','atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon'],how='left')

In [52]:
ff.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [53]:
ff.count()

2468572

In [54]:
ff.filter(ff.location_id.isNotNull()).count()

2468572

In [55]:
## Renaming as there is clash on location id where cols from ATM location also gets copied during join. 
## These renamed cols will be retained and reverted to previous values while joined table cols will be deleted. 
## This process is purely as for convenience.

In [56]:
ff = ff.withColumnRenamed('location_id','tlocation_id')

In [57]:
ff9 = ff.join(DIM_ATMtt,on=['atm_id','atm_manufacturer','atm_location','atm_streetname','atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon'],how='left')

In [58]:
ff9.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [59]:
ff9.count()

2468572

In [60]:
ff9.filter(ff9.atm_prim_id.isNotNull()).count()

2468572

In [61]:
ff9 = ff9.drop('tlocation_id')
ff9.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

#####  code to reverse sync of card table PK

In [62]:
ff1 = ff9.withColumnRenamed('card_type','card_types')
files_index = ff1.join(DIM_CARD, ff1.card_types == DIM_CARD.card_type,how='LEFT')  

In [63]:
files_index.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_types: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: s

In [64]:
files_index.select('year', 'month', 'card_type', 'card_type_id','card_types').show(5)

+----+-------+---------------+------------+---------------+
|year|  month|      card_type|card_type_id|     card_types|
+----+-------+---------------+------------+---------------+
|2017|January|Dankort - on-us|           1|Dankort - on-us|
|2017|January|Dankort - on-us|           1|Dankort - on-us|
|2017|January|Dankort - on-us|           1|Dankort - on-us|
|2017|January|Dankort - on-us|           1|Dankort - on-us|
|2017|January|Dankort - on-us|           1|Dankort - on-us|
+----+-------+---------------+------------+---------------+
only showing top 5 rows



In [65]:
files_index.count()

2468572

In [66]:
files_index = files_index.drop('card_type')

In [67]:
files_index= files_index.withColumnRenamed('card_types','card_type')

In [68]:
files_index.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [69]:
## Checking point to make sure no previous data is disturbed.

In [70]:
DIM_LOC.printSchema()

root
 |-- location_id: integer (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)



In [71]:
DIM_LOC.count()

109

In [3]:
# So far everything is looking as expected. 

In [73]:
files_index2 = files_index.withColumn("month",from_unixtime(unix_timestamp(col("Month"),'MMM'),'MM'))

In [74]:
files_index2 = files_index2.withColumn('full_date_time',sf.concat(sf.col('year'),sf.lit('/'),sf.col('month'),sf.lit('/'),sf.col('day'),sf.lit(' '),sf.col('hour'),sf.lit(':'),sf.lit('00:00')))

In [75]:
files_index2 = files_index2.withColumn('full_date_time', unix_timestamp(files_index2['full_date_time'], 'yyyy/MM/dd HH:mm:ss').cast('timestamp'))

In [76]:
files_index2.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [4]:
#Renaming only for convenience. 

In [78]:
files_index2 = files_index2.withColumnRenamed('year','ryear')
files_index2 = files_index2.withColumnRenamed('month','rmonth')
files_index2 = files_index2.withColumnRenamed('day','rday')
files_index2 = files_index2.withColumnRenamed('weekday','rweekday')
files_index2 = files_index2.withColumnRenamed('hour','rhour')
files_index2 = files_index2.withColumnRenamed('full_date_time','rfull_date_time')

In [79]:
files_index2.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- ryear: integer (nullable = true)
 |-- rmonth: string (nullable = true)
 |-- rday: integer (nullable = true)
 |-- rweekday: string (nullable = true)
 |-- rhour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_nam

In [80]:
files_index2 = files_index2.join(DIM_DATE, files_index2.rfull_date_time == DIM_DATE.full_date_time,how='LEFT') 

In [81]:
files_index2.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- ryear: integer (nullable = true)
 |-- rmonth: string (nullable = true)
 |-- rday: integer (nullable = true)
 |-- rweekday: string (nullable = true)
 |-- rhour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_nam

In [82]:
files_index2.count()

2468572

In [83]:
files_index2 = files_index2.drop('year','month','day','weekday','hour','full_date_time')

In [84]:
files_index2 = files_index2.withColumnRenamed('ryear','year')
files_index2 = files_index2.withColumnRenamed('rmonth','month')
files_index2 = files_index2.withColumnRenamed('rday','day')
files_index2 = files_index2.withColumnRenamed('rweekday','weekday')
files_index2 = files_index2.withColumnRenamed('rhour','hour')
files_index2 = files_index2.withColumnRenamed('rfull_date_time','full_date_time')

In [85]:
files_index2.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [86]:
files_index2 = files_index2.drop('full_date_time')

In [87]:
files_index2.printSchema()

root
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

# Creating Fact Table

In [88]:
files_index2 = files_index2.withColumn(
    "trans_id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)

In [89]:
FACT_ATM_TRANS = files_index2.select('trans_id','atm_prim_id','location_id','date_id','card_type_id','atm_status','currency','service','transaction_amount','message_code','message_text',
                                     'rain_3h','clouds_all','weather_id','weather_main','weather_description')

In [90]:
FACT_ATM_TRANS.printSchema()

root
 |-- trans_id: integer (nullable = true)
 |-- atm_prim_id: integer (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)



##### All rows loaded as expected and count is same.

In [92]:
FACT_ATM_TRANS.count()

2468572

In [93]:
FACT_ATM_TRANS.select('trans_id','atm_prim_id','location_id','date_id','card_type_id','atm_status').show(10)

+--------+-----------+-----------+-------+------------+----------+
|trans_id|atm_prim_id|location_id|date_id|card_type_id|atm_status|
+--------+-----------+-----------+-------+------------+----------+
|       1|        150|         98|      1|          10|    Active|
|       2|        113|         16|      1|          10|    Active|
|       3|         31|         55|      2|           1|    Active|
|       4|          8|         66|      2|           1|  Inactive|
|       5|         62|          6|      2|           1|    Active|
|       6|         36|          9|      2|           1|    Active|
|       7|         36|          9|      2|           1|    Active|
|       8|        135|         75|      2|           1|    Active|
|       9|        127|         32|      2|           3|    Active|
|      10|         57|         27|      2|           4|    Active|
+--------+-----------+-----------+-------+------------+----------+
only showing top 10 rows



In [94]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.atm_prim_id.isNotNull()).count()

2468572

In [95]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.location_id.isNotNull()).count()

2468572

In [96]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.date_id.isNotNull()).count()

2468572

In [100]:
FACT_ATM_TRANS.filter(FACT_ATM_TRANS.card_type_id.isNotNull()).count()

2468572

## COPYING  FACT & DIMENSION TABLE to S3 BUCKET "artyagi-s3-bucket-etl-project"

In [101]:
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key","AKIAZ6F5NFKWU4FGBHXG")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key","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")

In [102]:
DIM_LOC.write.save("s3a://artyagi-s3-bucket-etl-project/DIM_LOC",format='csv',header='true')

In [103]:
DIM_ATM.write.save("s3a://artyagi-s3-bucket-etl-project/DIM_ATM",format='csv',header='true')

In [104]:
DIM_DATE.write.save("s3a://artyagi-s3-bucket-etl-project/DIM_DATE",format='csv',header='true')

In [105]:
DIM_CARD.write.save("s3a://artyagi-s3-bucket-etl-project/DIM_CARD",format='csv',header='true')

In [106]:
FACT_ATM_TRANS.write.save("s3a://artyagi-s3-bucket-etl-project/FACT_ATM_TRANS",format='csv',header='true')

##### After this we will create the Redshift Cluster in AWS & then we will create the schema and load the data from S3 to Bucket into the Redshift.