In [None]:
import findspark

findspark.init("C:\Program Files\spark\spark-3.3.2-bin-hadoop2")

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Bank_ATM_ETL').master("local").getOrCreate()

In [5]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
Schema = StructType([StructField('year', IntegerType(), nullable = True),
                        StructField('month', StringType(), True),
                        StructField('day', IntegerType(), True),
                        StructField('weekday', StringType(), True),
                        StructField('hour', IntegerType(), True),
                        StructField('atm_status', StringType(), True),
                        StructField('atm_id', StringType(), True),
                        StructField('atm_manufacturer', StringType(), True),
                        StructField('atm_location', StringType(), True),
                        StructField('atm_streetname', StringType(), True),
                        StructField('atm_street_number', IntegerType(), True),
                        StructField('atm_zipcode', IntegerType(), True),
                        StructField('atm_lat', DoubleType(), True),
                        StructField('atm_lon', DoubleType(), True),
                        StructField('currency', StringType(), True),
                        StructField('card_type', StringType(), True),
                        StructField('transaction_amount', IntegerType(), True),
                        StructField('service', StringType(), True),
                        StructField('message_code', StringType(), True),
                        StructField('message_text', StringType(), True),
                        StructField('weather_lat', DoubleType(), True),
                        StructField('weather_lon', DoubleType(), True),
                        StructField('weather_city_id', IntegerType(), True),
                        StructField('weather_city_name', StringType(), True),
                        StructField('temp', DoubleType(), True),
                        StructField('pressure', IntegerType(), True),
                        StructField('humidity', IntegerType(), True),
                        StructField('wind_speed', IntegerType(), True),
                        StructField('wind_deg', IntegerType(), True),
                        StructField('rain_3h', DoubleType(), True),
                        StructField('clouds_all', IntegerType(), True),
                        StructField('weather_id', IntegerType(), True),
                        StructField('weather_main', StringType(), True),
                        StructField('weather_description', StringType(), True)])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
df = spark.read.format("csv")\
                .option("inferSchema", "false")\
                .schema(Schema)\
                .option("header", "false")\
                .load("/user/hadoop/ETL/ATMDATA/part-m-00000")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- atm_status: string (nullable = true)
 |-- atm_id: string (nullable = true)
 |-- atm_manufacturer: string (nullable = true)
 |-- atm_location: string (nullable = true)
 |-- atm_streetname: string (nullable = true)
 |-- atm_street_number: integer (nullable = true)
 |-- atm_zipcode: integer (nullable = true)
 |-- atm_lat: double (nullable = true)
 |-- atm_lon: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- message_code: string (nullable = true)
 |-- message_text: string (nullable = true)
 |-- weather_lat: double (nullable = true)
 |-- weather_lon: double (nullable = true)
 |-- weather_city_id: integer (nullable = true)
 |-- weather_city_name: st

In [12]:
df.select('*').count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2468572

### Creating the Dimension and Fact tables
##### Creating a dataframe for Location Dimension according to Target Dimension Model

In [14]:
location = df.select("atm_location", "atm_streetname", "atm_street_number", "atm_zipcode", "atm_lat", "atm_lon").distinct()
location.select('*').count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# creating the primary key column
dim_location = location.select(row_number().over(Window.orderBy(location[1])).alias("location_id"), "*")
dim_location.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+-------------------+-----------------+-----------+-------+-------+-----------+
|    atm_location|     atm_streetname|atm_street_number|atm_zipcode|atm_lat|atm_lon|location_id|
+----------------+-------------------+-----------------+-----------+-------+-------+-----------+
|         Kolding|           Vejlevej|              135|       6000| 55.505|  9.457|          0|
|Intern HolbÃƒÂ¦k|        Slotsvolden|                7|       4300| 55.718| 11.704|          1|
|  Skelagervej 15|        Skelagervej|               15|       9000| 57.023|  9.891|          2|
|          Odense|       FÃƒÂ¦lledvej|                3|       5000| 55.394|  10.37|          3|
|           Ikast|RÃƒÂ¥dhusstrÃƒÂ¦det|               12|       7430| 56.139|  9.154|          4|
+----------------+-------------------+-----------------+-----------+-------+-------+-----------+
only showing top 5 rows

In [24]:
DIM_LOCATION = dim_location.withColumnRenamed('atm_location','location')\
                            .withColumnRenamed('atm_streetname','streetname')\
                            .withColumnRenamed('atm_street_number','street_number')\
                            .withColumnRenamed('atm_zipcode','zipcode')\
                            .withColumnRenamed('atm_lat','lat')\
                            .withColumnRenamed('atm_lon','lon')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
DIM_LOCATION.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+-------------------+-------------+-------+------+------+-----------+
|        location|         streetname|street_number|zipcode|   lat|   lon|location_id|
+----------------+-------------------+-------------+-------+------+------+-----------+
|         Kolding|           Vejlevej|          135|   6000|55.505| 9.457|          0|
|  Skelagervej 15|        Skelagervej|           15|   9000|57.023| 9.891|          1|
|Intern HolbÃƒÂ¦k|        Slotsvolden|            7|   4300|55.718|11.704|          2|
|          Odense|       FÃƒÂ¦lledvej|            3|   5000|55.394| 10.37|          3|
|           Ikast|RÃƒÂ¥dhusstrÃƒÂ¦det|           12|   7430|56.139| 9.154|          4|
+----------------+-------------------+-------------+-------+------+------+-----------+
only showing top 5 rows