# ETL Case Study

<b>Problem Statement</b>


<u>Extract</u> ATM transaction data from RDS and perform ETL operations to <u>load</u> it into an Amazon Redshift warehouse, using a Sqoop pipeline, HDFS and S3 storage, and PySpark for processing and <u>transformation</u>.

## Contents:
* [Extract Data](#extract) 
* [Transform Data](#transform)
    * [Dimension tables](#dim)
        * [Location](#loc)
        * [ATM](#atm)
        * [Date](#date)
        * [Card](#card)
    * [Fact table](#fact)
* [Load Data into S3](#load)

In [1]:
# Setting environment variables

import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
# Import SparkSession
from pyspark.sql import SparkSession

In [3]:
# Create the Spark session and get its Spark context
spark = SparkSession.builder.appName('ATM_Transactions').master("local").getOrCreate()
sc = spark.sparkContext
sc

## Extract Data <a id="extract"></a>

In [4]:
#Importing StructType
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, FloatType, DecimalType

The data types of `atm_lat`, `atm_lon`, `weather_lat`, `weather_lon`, `temp` and `rain_3h` are declared as String
instead of float, double or decimal because PySpark could not read these columns as numeric when the parquet file
was loaded with a custom schema. Specifying any numeric type for them threw the error shown below.

<u> java.lang.UnsupportedOperationException: parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary</u>

One workaround is to read these columns as strings and then cast them to a numeric type.
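The read-as-string-then-convert idea can be illustrated outside Spark. The following is a minimal plain-Python sketch, not the notebook's own code: the helper name `to_decimal_3` and the sample row are hypothetical, and half-up rounding to three decimal places is an assumption about how a decimal cast with scale 3 behaves.

```python
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP

def to_decimal_3(text):
    """Parse a string into a Decimal with three fractional digits.

    Returns None for missing or non-numeric input, similar in spirit to
    a cast that yields null instead of raising an error.
    """
    if text is None:
        return None
    try:
        value = Decimal(text.strip())
    except InvalidOperation:
        return None
    if not value.is_finite():  # reject "NaN" and "Infinity"
        return None
    return value.quantize(Decimal("0.001"), rounding=ROUND_HALF_UP)

# Made-up sample row: values arrive as strings and are converted after reading
raw_row = {"atm_lat": "56.9971", "atm_lon": "9.993", "rain_3h": "n/a"}
clean_row = {name: to_decimal_3(value) for name, value in raw_row.items()}
```

Keeping the conversion as a separate step means a value that cannot be parsed becomes a missing value rather than failing the whole read.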

In [5]:
#Schema for the parquet file

fileSchema = StructType([StructField('year', IntegerType(),True),
                        StructField('month', StringType(),True),
                        StructField('day', IntegerType(),True),
                        StructField('weekday', StringType(),True),
                        StructField('hour', IntegerType(),True),
                        StructField('atm_status', StringType(),True),
                        StructField('atm_id', StringType(),True),
                        StructField('atm_manufacturer', StringType(),True),
                        StructField('atm_location', StringType(),True),
                        StructField('atm_streetname', StringType(),True),
                        StructField('atm_street_number', IntegerType(),True),
                        StructField('atm_zipcode', IntegerType(),True),
                        StructField('atm_lat', StringType(),True),
                        StructField('atm_lon', StringType(),True),
                        StructField('currency', StringType(),True),
                        StructField('card_type', StringType(),True),
                        StructField('transaction_amount', IntegerType(),True),
                        StructField('service', StringType(),True),
                        StructField('message_code', StringType(),True),
                        StructField('message_text', StringType(),True),
                        StructField('weather_lat', StringType(),True),
                        StructField('weather_lon', StringType(),True),
                        StructField('weather_city_id', IntegerType(),True),
                        StructField('weather_city_name', StringType(),True),
                        StructField('temp', StringType(),True),
                        StructField('pressure', IntegerType(),True),
                        StructField('humidity', IntegerType(),True),
                        StructField('wind_speed', IntegerType(),True),
                        StructField('wind_deg', IntegerType(),True),
                        StructField('rain_3h', StringType(),True),
                        StructField('clouds_all', IntegerType(),True),
                        StructField('weather_id', IntegerType(),True),
                        StructField('weather_main', StringType(),True),
                        StructField('weather_description', StringType(),True)])

In [6]:
#Reading the parquet file from HDFS
master_df = spark.read.schema(fileSchema).load("atm_trans_data/99cabc80-c250-4b0e-97f2-1c6766ab412e.parquet")

In [7]:
# Check the schema
master_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: string (nullable = true)
 |-- atm_lon: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: string (nullable = true)
 |-- weather_lon: string (nullable = true)
 |-- weather_city_id: integer (nullable = true)
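 |-- weather_city_name: string (nullable = true)
 |-- temp: string (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- rain_3h: string (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)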

## Transform Data <a id="transform"></a>

The `weather_main` column contains a few numeric values; here they are replaced with null.

In [8]:
#Replacing the numeric values with null
from pyspark.sql.functions import col, when
master_df = master_df.withColumn("weather_main", when(col("weather_main").cast("int").isNotNull(), None)
                                     .otherwise(col("weather_main")))
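The rule applied above can be read as: if the value parses as an integer, drop it, otherwise keep it. A minimal plain-Python sketch of that rule follows. `null_if_numeric` is a hypothetical helper, the sample values are made up, and Python's `int()` may not accept exactly the same strings as Spark's `cast("int")`, so treat this as an illustration only.

```python
def null_if_numeric(value):
    """Return None when value looks like an integer, else return it unchanged."""
    if value is None:
        return None
    try:
        int(value.strip())
    except ValueError:
        return value  # not an integer, so keep the text
    return None       # integer-like text is treated as bad data

# Made-up sample values for the weather_main column
samples = ["Rain", "800", "Clouds", None, " 12 "]
cleaned = [null_if_numeric(v) for v in samples]
```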


In [9]:
#Change the datatype of "atm_lat","atm_lon","weather_lat","weather_lon","temp","rain_3h" to decimal(10,3).


master_df = master_df.withColumn('atm_lat', col("atm_lat").cast("Decimal(10,3)"))\
.withColumn('atm_lon', col("atm_lon").cast("Decimal(10,3)"))\
.withColumn('weather_lat', col("weather_lat").cast("Decimal(10,3)"))\
.withColumn('weather_lon', col("weather_lon").cast("Decimal(10,3)"))\
.withColumn('temp', col("temp").cast("Decimal(10,3)"))\
.withColumn('rain_3h', col("rain_3h").cast("Decimal(10,3)"))

master_df.printSchema()


root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: decimal(10,3) (nullable = true)
 |-- atm_lon: decimal(10,3) (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: decimal(10,3) (nullable = true)
 |-- weather_lon: decimal(10,3) (nullable = true)
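 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: string (nullable = true)
 |-- temp: decimal(10,3) (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- rain_3h: decimal(10,3) (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)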

In [10]:
# Count of total number of records read from HDFS
master_df.count()

2468572

## Dimension tables (DataFrames) <a id="dim"></a>



The data has been extracted and now needs to be transformed into fact and dimension tables, so that it can be loaded into the data warehouse and used for analytical queries.
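As a rough illustration of what that split means, the sketch below builds one small dimension in plain Python and swaps the descriptive value in each transaction for its surrogate key. The function name, the sample rows, and the choice to number keys in sorted order are all assumptions made for the example; the notebook itself does this with Spark DataFrames.

```python
def build_dimension(rows, key_fields, id_name):
    """Return (dimension_rows, lookup) for the distinct key_fields combinations."""
    distinct = sorted({tuple(row[f] for f in key_fields) for row in rows})
    # Surrogate keys start at 1, one per distinct combination
    lookup = {combo: i for i, combo in enumerate(distinct, start=1)}
    dimension = []
    for combo, key in lookup.items():
        record = {id_name: key}
        record.update(zip(key_fields, combo))
        dimension.append(record)
    return dimension, lookup

# Made-up sample transactions
transactions = [
    {"card_type": "VISA", "transaction_amount": 500},
    {"card_type": "Dankort", "transaction_amount": 200},
    {"card_type": "VISA", "transaction_amount": 100},
]

dim_card, card_lookup = build_dimension(transactions, ["card_type"], "card_type_id")

# The fact rows keep the measures and reference the dimension by key
fact = [
    {"card_type_id": card_lookup[(t["card_type"],)],
     "transaction_amount": t["transaction_amount"]}
    for t in transactions
]
```

The fact table keeps one row per transaction, while each dimension holds one row per distinct combination of its descriptive columns.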

### Location<a id="loc"></a>

- DIM_LOCATION

In [11]:
# Creating location dimension table
DIM_LOCATION = master_df.select('atm_location', 'atm_streetname', 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon').distinct()
DIM_LOCATION.show(5)

+-------------+-----------------+-----------------+-----------+-------+-------+
| atm_location|   atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|
+-------------+-----------------+-----------------+-----------+-------+-------+
|      Gistrup|       Hadsundvej|              346|       9260| 56.997|  9.993|
|Intern Skagen|Sct. Laurentiivej|               36|       9990| 57.723| 10.590|
|      KÃƒÂ¸ge|   SÃƒÂ¸ndre Alle|                1|       4600| 55.454| 12.181|
|    Svenstrup|   GodthÃƒÂ¥bsvej|               14|       9230| 56.973|  9.851|
|  Aalborg Syd|         Hobrovej|              440|       9200| 57.005|  9.881|
+-------------+-----------------+-----------------+-----------+-------+-------+
only showing top 5 rows



In [12]:
# Adding primary key
from pyspark.sql.window import Window
import pyspark.sql.functions as func

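# Create the window specification
# (rows that share a zipcode are tied, so their relative order, and hence their ids, may differ between runs)
w = Window.orderBy("atm_zipcode")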

# Use row number with the window specification
DIM_LOCATION = DIM_LOCATION.withColumn("location_id", func.row_number().over(w))

DIM_LOCATION.show(10)

+--------------------+----------------+-----------------+-----------+-------+-------+-----------+
|        atm_location|  atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|location_id|
+--------------------+----------------+-----------------+-----------+-------+-------+-----------+
|        KÃƒÂ¸benhavn|  Regnbuepladsen|                5|       1550| 55.676| 12.571|          1|
|Intern  KÃƒÂ¸benhavn|RÃƒÂ¥dhuspladsen|               75|       1550| 55.676| 12.571|          2|
|       Frederiksberg| Gammel Kongevej|              157|       1850| 55.677| 12.537|          3|
|              Lyngby|     Jernbanevej|                6|       2800| 55.772| 12.500|          4|
|        HelsingÃƒÂ¸r|  Sct. Olai Gade|               39|       3000| 56.036| 12.612|          5|
|         HillerÃƒÂ¸d|KÃƒÂ¸benhavnsvej|               31|       3400| 55.933| 12.314|          6|
|HillerÃƒÂ¸d IdrÃƒ...|      Milnersvej|               39|       3400| 55.921| 12.299|          7|
|          Svogersle

In [13]:
# Renaming columns according to target schema
DIM_LOCATION = DIM_LOCATION.withColumnRenamed("atm_location","location") \
.withColumnRenamed("atm_streetname","streetname")\
.withColumnRenamed("atm_street_number","street_number")\
.withColumnRenamed("atm_zipcode","zipcode")\
.withColumnRenamed("atm_lat","lat")\
.withColumnRenamed("atm_lon","lon")

DIM_LOCATION.schema.names

['location',
 'streetname',
 'street_number',
 'zipcode',
 'lat',
 'lon',
 'location_id']

In [14]:
# Rearranging columns according to target schema

DIM_LOCATION = DIM_LOCATION.select('location_id', 'location', 'streetname', 'street_number', 'zipcode', 'lat', 'lon')

DIM_LOCATION.printSchema()

root
 |-- location_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- streetname: string (nullable = true)
 |-- street_number: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: decimal(10,3) (nullable = true)
 |-- lon: decimal(10,3) (nullable = true)



In [15]:
# Verifying the count
DIM_LOCATION.count()

109

### ATM <a id="atm"></a>

- DIM_ATM

In [16]:
# Get ATM dimension data
ATM = master_df.select('atm_id', 'atm_manufacturer', 'atm_lat', 'atm_lon')
ATM.show(5)

+------+----------------+-------+-------+
|atm_id|atm_manufacturer|atm_lat|atm_lon|
+------+----------------+-------+-------+
|     1|             NCR| 55.233| 11.763|
|     2|             NCR| 57.043|  9.950|
|     2|             NCR| 57.043|  9.950|
|     3|             NCR| 56.139|  9.154|
|     4|             NCR| 55.634| 12.018|
+------+----------------+-------+-------+
only showing top 5 rows



In [17]:
# Renaming columns to facilitate merging with location data
ATM = ATM.withColumnRenamed("atm_id","atm_number") \
.withColumnRenamed("atm_lat","lat")\
.withColumnRenamed("atm_lon","lon")

#Validate
ATM.schema.names


# Merge with location data
DIM_ATM = ATM.join(DIM_LOCATION, on=["lat", "lon"], how='inner')

In [18]:
# Validate data merge
DIM_ATM.show(5)

+------+------+----------+----------------+-----------+------------+--------------+-------------+-------+
|   lat|   lon|atm_number|atm_manufacturer|location_id|    location|    streetname|street_number|zipcode|
+------+------+----------+----------------+-----------+------------+--------------+-------------+-------+
|55.676|12.571|        85| Diebold Nixdorf|          1|KÃƒÂ¸benhavn|Regnbuepladsen|            5|   1550|
|55.676|12.571|        85| Diebold Nixdorf|          1|KÃƒÂ¸benhavn|Regnbuepladsen|            5|   1550|
|55.676|12.571|        85| Diebold Nixdorf|          1|KÃƒÂ¸benhavn|Regnbuepladsen|            5|   1550|
|55.676|12.571|        85| Diebold Nixdorf|          1|KÃƒÂ¸benhavn|Regnbuepladsen|            5|   1550|
|55.676|12.571|        85| Diebold Nixdorf|          1|KÃƒÂ¸benhavn|Regnbuepladsen|            5|   1550|
+------+------+----------+----------------+-----------+------------+--------------+-------------+-------+
only showing top 5 rows



In [19]:
#Drop unwanted columns
columns_to_drop = ['location', 'streetname', 'street_number', "zipcode","lat", "lon"]
DIM_ATM = DIM_ATM.drop(*columns_to_drop)

In [20]:
#Validate column drop
DIM_ATM.schema.names

['atm_number', 'atm_manufacturer', 'location_id']

In [21]:
# Removing duplicates

DIM_ATM = DIM_ATM.select("atm_number","location_id","atm_manufacturer").distinct()

In [22]:
# Renaming columns according to target schema
DIM_ATM = DIM_ATM.withColumnRenamed("location_id","atm_location_id")

DIM_ATM.schema.names

['atm_number', 'atm_location_id', 'atm_manufacturer']

In [23]:
# Adding primary key

# Create the window specification
w = Window.orderBy("atm_location_id")

# Use row number with the window specification
DIM_ATM = DIM_ATM.withColumn("atm_id", func.row_number().over(w))

DIM_ATM.show(10)

+----------+---------------+----------------+------+
|atm_number|atm_location_id|atm_manufacturer|atm_id|
+----------+---------------+----------------+------+
|        95|              1|             NCR|     1|
|        85|              1| Diebold Nixdorf|     2|
|        85|              2| Diebold Nixdorf|     3|
|        95|              2|             NCR|     4|
|        47|              3|             NCR|     5|
|        44|              4|             NCR|     6|
|        46|              5| Diebold Nixdorf|     7|
|        57|              6|             NCR|     8|
|        86|              7|             NCR|     9|
|         4|              8|             NCR|    10|
+----------+---------------+----------------+------+
only showing top 10 rows



In [24]:
# Rearranging columns according to target schema

DIM_ATM = DIM_ATM.select('atm_id', 'atm_number', 'atm_manufacturer', 'atm_location_id')

DIM_ATM.printSchema()

root
 |-- atm_id: integer (nullable = true)
 |-- atm_number: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location_id: integer (nullable = true)



In [25]:
# Validating ATM dimension count
DIM_ATM.count()


156


### Date <a id="date"></a>

- DIM_DATE

In [26]:
#Get date dimension from master dataframe
DIM_DATE = master_df.select('year', 'month', 'day', 'weekday', 'hour').distinct()
DIM_DATE.count()

8685

In [27]:
from pyspark.sql.functions import concat, lit

# Create full_date_time column
DIM_DATE = DIM_DATE.withColumn("full_date_time", concat(col('year'), lit("/"),col("month"), lit("/"),
                                                        col('day'), lit(" "),col('hour')))
DIM_DATE.show(5)

+----+-------+---+--------+----+------------------+
|year|  month|day| weekday|hour|    full_date_time|
+----+-------+---+--------+----+------------------+
|2017|January|  1|  Sunday|   9|  2017/January/1 9|
|2017|January|  3| Tuesday|   5|  2017/January/3 5|
|2017|January|  8|  Sunday|  19| 2017/January/8 19|
|2017|January| 21|Saturday|   3| 2017/January/21 3|
|2017|January| 23|  Monday|  21|2017/January/23 21|
+----+-------+---+--------+----+------------------+
only showing top 5 rows



In [28]:
from pyspark.sql.functions import unix_timestamp

# Type casting full_date_time to timestamp
# 'MMMM' matches the full month name (e.g. January); 'H' is the hour of day (0-23)
date_Time_pattern = 'yyyy/MMMM/dd H'
DIM_DATE = DIM_DATE.withColumn('full_date_time', unix_timestamp(DIM_DATE['full_date_time'], date_Time_pattern).cast('timestamp'))
DIM_DATE.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- full_date_time: timestamp (nullable = true)
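
For readers less familiar with date patterns, the same conversion can be sketched with the Python standard library. This is an illustrative equivalent rather than the Spark call: `to_hour_timestamp` is a hypothetical name, and `%B` assumes English month names under the default locale.

```python
from datetime import datetime

def to_hour_timestamp(year, month_name, day, hour):
    """Build a datetime from a year, full month name, day and hour."""
    text = "{}/{}/{} {}".format(year, month_name, day, hour)
    # %Y = four-digit year, %B = full month name, %d = day, %H = hour (0-23)
    return datetime.strptime(text, "%Y/%B/%d %H")

stamp = to_hour_timestamp(2017, "January", 8, 19)
```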



In [29]:
# Adding primary key

# Create the window specification
w = Window.orderBy("full_date_time")

# Use row number with the window specification
DIM_DATE = DIM_DATE.withColumn("date_id", func.row_number().over(w))

DIM_DATE.show(5)

+----+-------+---+-------+----+-------------------+-------+
|year|  month|day|weekday|hour|     full_date_time|date_id|
+----+-------+---+-------+----+-------------------+-------+
|2017|January|  1| Sunday|   0|2017-01-01 00:00:00|      1|
|2017|January|  1| Sunday|   1|2017-01-01 01:00:00|      2|
|2017|January|  1| Sunday|   2|2017-01-01 02:00:00|      3|
|2017|January|  1| Sunday|   3|2017-01-01 03:00:00|      4|
|2017|January|  1| Sunday|   4|2017-01-01 04:00:00|      5|
+----+-------+---+-------+----+-------------------+-------+
only showing top 5 rows



In [30]:
# Rearranging columns according to target schema

DIM_DATE = DIM_DATE.select('date_id', 'full_date_time', 'year', 'month', 'day', 'hour', 'weekday')

DIM_DATE.printSchema()

root
 |-- date_id: integer (nullable = true)
 |-- full_date_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekday: string (nullable = true)



In [31]:
#Verify count
DIM_DATE.count()

8685

### Card Type <a id="card"></a>

- DIM_CARD_TYPE

In [32]:
#Create card type dimension
DIM_CARD_TYPE = master_df.select('card_type').distinct()
DIM_CARD_TYPE.show()

+--------------------+
|           card_type|
+--------------------+
|     Dankort - on-us|
|              CIRRUS|
|         HÃƒÂ¦vekort|
|                VISA|
|  Mastercard - on-us|
|             Maestro|
|Visa Dankort - on-us|
|        Visa Dankort|
|            VisaPlus|
|          MasterCard|
|             Dankort|
| HÃƒÂ¦vekort - on-us|
+--------------------+



In [33]:
#Add primary key (the table has only a few rows, so it is numbered in pandas and converted back)
pd_cardType = DIM_CARD_TYPE.toPandas()
pd_cardType["card_type_id"] = range(1, pd_cardType.shape[0]+1)
DIM_CARD_TYPE = spark.createDataFrame(pd_cardType)
DIM_CARD_TYPE.show()

+--------------------+------------+
|           card_type|card_type_id|
+--------------------+------------+
|     Dankort - on-us|           1|
|              CIRRUS|           2|
|         HÃƒÂ¦vekort|           3|
|                VISA|           4|
|  Mastercard - on-us|           5|
|             Maestro|           6|
|Visa Dankort - on-us|           7|
|        Visa Dankort|           8|
|            VisaPlus|           9|
|          MasterCard|          10|
|             Dankort|          11|
| HÃƒÂ¦vekort - on-us|          12|
+--------------------+------------+



In [34]:
# Rearranging columns according to target schema

DIM_CARD_TYPE = DIM_CARD_TYPE.select('card_type_id', 'card_type')

DIM_CARD_TYPE.printSchema()

root
 |-- card_type_id: long (nullable = true)
 |-- card_type: string (nullable = true)



In [35]:
#Verify count
DIM_CARD_TYPE.count()

12

## Fact Table<a id="fact"></a>

- FACT_ATM_TRANS

In [36]:
# Join Location dimension
FACT_ATM_TRANS = master_df.join(DIM_LOCATION,(master_df["atm_location"] == DIM_LOCATION["location"]) & 
                                (master_df["atm_streetname"] == DIM_LOCATION["streetname"]) &
                                (master_df["atm_street_number"] == DIM_LOCATION["street_number"]) &
                                (master_df["atm_zipcode"] == DIM_LOCATION["zipcode"]) &
                                (master_df["atm_lat"] == DIM_LOCATION["lat"]) &
                                (master_df["atm_lon"] == DIM_LOCATION["lon"]), how='inner')                          

In [37]:
# Renaming to facilitate the merge
FACT_ATM_TRANS = FACT_ATM_TRANS.withColumnRenamed("atm_id","atm_number")

In [38]:
# Join ATM dimension
FACT_ATM_TRANS = FACT_ATM_TRANS.join(DIM_ATM,(FACT_ATM_TRANS["atm_number"] == DIM_ATM["atm_number"]) & 
                                (FACT_ATM_TRANS["atm_manufacturer"] == DIM_ATM["atm_manufacturer"]) &
                                (FACT_ATM_TRANS["location_id"] == DIM_ATM["atm_location_id"]), how='inner')

In [39]:
# Join CARD dimension
FACT_ATM_TRANS = FACT_ATM_TRANS.join(DIM_CARD_TYPE, on = ["card_type"], how='inner') 

In [40]:
# Join Date dimension
FACT_ATM_TRANS = FACT_ATM_TRANS.join(DIM_DATE,on = ['year', 'month', 'day', 'hour', 'weekday'], how='inner') 

In [41]:
# Verify count
FACT_ATM_TRANS.count()

2468572

In [42]:
#Drop unwanted columns
columns_to_drop = ['year', 'month', 'day', 'hour', 'weekday', 'atm_number', 'atm_manufacturer', 'atm_location', 'atm_streetname',
 'atm_street_number', 'atm_zipcode', 'atm_lat', 'atm_lon', 'weather_lat', 'weather_lon', 'weather_city_id', 'weather_city_name',
 'temp', 'pressure', 'humidity', 'wind_speed', 'wind_deg', 'card_type', 'location', 'streetname', 'street_number', 'zipcode',
 'lat', 'lon', 'atm_location_id', 'full_date_time']
FACT_ATM_TRANS = FACT_ATM_TRANS.drop(*columns_to_drop)

In [43]:
FACT_ATM_TRANS.schema.names

['atm_status',
 'currency',
 'transaction_amount',
 'service',
 'message_code',
 'message_text',
 'rain_3h',
 'clouds_all',
 'weather_id',
 'weather_main',
 'weather_description',
 'location_id',
 'atm_id',
 'card_type_id',
 'date_id']

In [44]:
# Renaming columns according to target schema
FACT_ATM_TRANS = FACT_ATM_TRANS.withColumnRenamed("location_id","weather_loc_id")

In [45]:
# Adding primary key

# Create the window specification
# (rows that share a transaction_amount are tied, so the ids assigned within a tie are arbitrary)
w = Window.orderBy("transaction_amount")

# Use row number with the window specification
FACT_ATM_TRANS = FACT_ATM_TRANS.withColumn("trans_id", func.row_number().over(w))

FACT_ATM_TRANS.select("weather_loc_id","atm_id","card_type_id","date_id","trans_id").show(5)

+--------------+------+------------+-------+--------+
|weather_loc_id|atm_id|card_type_id|date_id|trans_id|
+--------------+------+------------+-------+--------+
|            33|    50|           1|   6774|       1|
|           109|   156|           1|    777|       2|
|            11|    16|           1|   7570|       3|
|            44|    66|           1|   5539|       4|
|            64|    92|           1|   4278|       5|
+--------------+------+------------+-------+--------+
only showing top 5 rows



In [46]:
# Rearranging columns according to target schema

FACT_ATM_TRANS = FACT_ATM_TRANS.select("trans_id","atm_id","weather_loc_id","date_id","card_type_id","atm_status",
"currency","service","transaction_amount","message_code","message_text","rain_3h","clouds_all","weather_id","weather_main",
"weather_description")

FACT_ATM_TRANS.printSchema()

root
 |-- trans_id: integer (nullable = true)
 |-- atm_id: integer (nullable = true)
 |-- weather_loc_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- card_type_id: long (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- service: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- rain_3h: decimal(10,3) (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)



## Load Data into S3 <a id="load"></a>

 - <u>S3 Bucket</u>: atmtransetl
 - <u>Folder</u>: atmtransetl/atm_trans
 - <u>Location dimension file path</u>: s3a://atmtransetl/atm_trans/location
 - <u>ATM dimension file path</u>: s3a://atmtransetl/atm_trans/atm
 - <u>Card dimension file path</u>: s3a://atmtransetl/atm_trans/card
 - <u>Date dimension file path</u>: s3a://atmtransetl/atm_trans/date
 - <u>Transactions fact file path</u>: s3a://atmtransetl/atm_trans/trans
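
Since the five target paths listed above differ only in their last segment, they could be derived from the bucket and folder names instead of being typed out for each write. The sketch below is one possible way to do that; the names `BUCKET`, `FOLDER`, `TABLE_DIRS` and `s3_path` are hypothetical and not part of the notebook.

```python
BUCKET = "atmtransetl"
FOLDER = "atm_trans"

# Last path segment for each table, taken from the list above
TABLE_DIRS = {
    "DIM_LOCATION": "location",
    "DIM_ATM": "atm",
    "DIM_CARD_TYPE": "card",
    "DIM_DATE": "date",
    "FACT_ATM_TRANS": "trans",
}

def s3_path(table_name):
    """Return the s3a:// target path for a table name."""
    return "s3a://{}/{}/{}".format(BUCKET, FOLDER, TABLE_DIRS[table_name])

all_paths = {name: s3_path(name) for name in TABLE_DIRS}
```

With a mapping like this, the write step could loop over the tables rather than repeat the call for each one.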

In [47]:
#Writing Location dimension table into S3 bucket
DIM_LOCATION.write.csv("s3a://atmtransetl/atm_trans/location", mode="overwrite", header = True)

In [48]:
#Writing ATM dimension table into S3 bucket
DIM_ATM.write.csv("s3a://atmtransetl/atm_trans/atm", mode="overwrite", header = True)

In [49]:
#Writing card type dimension table into S3 bucket
DIM_CARD_TYPE.write.csv("s3a://atmtransetl/atm_trans/card", mode="overwrite", header = True)

In [50]:
#Writing date dimension table into S3 bucket
DIM_DATE.write.csv("s3a://atmtransetl/atm_trans/date", mode="overwrite", header = True)

In [51]:
#Writing Transaction fact table into S3 bucket
FACT_ATM_TRANS.write.csv("s3a://atmtransetl/atm_trans/trans", mode="overwrite", header = True)