In [None]:
# Install required dependencies 

!pip install -r requirments.txt



In [38]:
# import necessary dependencies
from pyspark.sql import SparkSession
from sqlalchemy import create_engine
from pyspark.sql.functions import monotonically_increasing_id
import pandas as pd
import os

## Data Extraction

In [None]:
# initializing a spark session

spark = SparkSession.builder.appName("BankTransactionETL").getOrCreate()

In [5]:
# spark standalone 
spark

In [6]:
# Extract data 
# inferSchema=True ; helps infer data types of the schema
bank_df = spark.read.csv("/Users/villy/Documents/GitHub/DE/data/raw/bank_transactions.csv", header=True, inferSchema=True)

bank_df.show()

                                                                                

+--------------------+------+----------------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type|    Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------

In [7]:
bank_df.show()

+--------------------+------+----------------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type|    Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------

In [8]:
# investigate the schema
bank_df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

In [9]:
# row count, data count
bank_df.count()

                                                                                

1000000

In [10]:
# number of column 
len(bank_df.columns)

23

In [11]:
bank_df.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [12]:
# checking for null values
for column in bank_df.columns:
    print(column, 'Nulls', bank_df.filter(bank_df[column].isNull()).count())

                                                                                

Transaction_Date Nulls 0


                                                                                

Amount Nulls 0


                                                                                

Transaction_Type Nulls 0


                                                                                

Customer_Name Nulls 100425


                                                                                

Customer_Address Nulls 100087


                                                                                

Customer_City Nulls 100034


                                                                                

Customer_State Nulls 100009


                                                                                

Customer_Country Nulls 100672


                                                                                

Company Nulls 100295


                                                                                

Job_Title Nulls 99924


                                                                                

Email Nulls 100043


                                                                                

Phone_Number Nulls 100524


                                                                                

Credit_Card_Number Nulls 100085


                                                                                

IBAN Nulls 100300


                                                                                

Currency_Code Nulls 99342


                                                                                

Random_Number Nulls 99913


                                                                                

Category Nulls 100332


                                                                                

Group Nulls 100209


                                                                                

Is_Active Nulls 100259


                                                                                

Last_Updated Nulls 100321


                                                                                

Description Nulls 100403


                                                                                

Gender Nulls 99767


[Stage 73:>                                                         (0 + 4) / 4]

Marital_Status Nulls 99904


                                                                                

In [13]:
# Fillna in respect to data type
bank_df_clean = bank_df.fillna({
    'Customer_Name':'Unknown',
    'Customer_Address' : 'Unknown',
    'Customer_City':'Unknown',
    'Customer_State':'Unknown',
    'Customer_Country':'Unknown',
    'Company':'Unknown',
    'Job_Title':'Unknown',
    'Email':'Unknown',
    'Phone_Number':'Unknown',
    'Credit_Card_Number':0,
    'IBAN':'Unknown',
    'Currency_Code':'Unknown',
    'Random_Number':0.0,
    'Category':'Unknown',
    'Group':'Unknown',
    'Is_Active':'Unknown',
    'Description':'Unknown',
    'Gender':'Unknown',
    'Marital_Status':'Unknown'
})

In [14]:
from pyspark.sql.functions import col, count, when

#inspect columns with null
bank_df_clean.select([count(when(col(c).isNull(), c)).alias(c) for c in bank_df_clean.columns]).show()




+----------------+------+----------------+-------------+----------------+-------------+--------------+----------------+-------+---------+-----+------------+------------------+----+-------------+-------------+--------+-----+---------+------------+-----------+------+--------------+
|Transaction_Date|Amount|Transaction_Type|Customer_Name|Customer_Address|Customer_City|Customer_State|Customer_Country|Company|Job_Title|Email|Phone_Number|Credit_Card_Number|IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|Last_Updated|Description|Gender|Marital_Status|
+----------------+------+----------------+-------------+----------------+-------------+--------------+----------------+-------+---------+-----+------------+------------------+----+-------------+-------------+--------+-----+---------+------------+-----------+------+--------------+
|               0|     0|               0|            0|               0|            0|             0|               0|      0|        0|    0|           0| 

                                                                                

In [15]:
#drop 'Last_Updated' column . it is about 10% of the whole data
bank_df_clean = bank_df_clean.dropna(subset=['Last_Updated'])

# check null value to verify changes
bank_df_clean.filter(col("Last_Updated").isNull()).show()



+----------------+------+----------------+-------------+----------------+-------------+--------------+----------------+-------+---------+-----+------------+------------------+----+-------------+-------------+--------+-----+---------+------------+-----------+------+--------------+
|Transaction_Date|Amount|Transaction_Type|Customer_Name|Customer_Address|Customer_City|Customer_State|Customer_Country|Company|Job_Title|Email|Phone_Number|Credit_Card_Number|IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|Last_Updated|Description|Gender|Marital_Status|
+----------------+------+----------------+-------------+----------------+-------------+--------------+----------------+-------+---------+-----+------------+------------------+----+-------------+-------------+--------+-----+---------+------------+-----------+------+--------------+
+----------------+------+----------------+-------------+----------------+-------------+--------------+----------------+-------+---------+-----+------------+-

                                                                                

In [16]:
#inspect columns with null
bank_df_clean.select([count(when(col(c).isNull(), c)).alias(c) for c in bank_df_clean.columns]).show()




+----------------+------+----------------+-------------+----------------+-------------+--------------+----------------+-------+---------+-----+------------+------------------+----+-------------+-------------+--------+-----+---------+------------+-----------+------+--------------+
|Transaction_Date|Amount|Transaction_Type|Customer_Name|Customer_Address|Customer_City|Customer_State|Customer_Country|Company|Job_Title|Email|Phone_Number|Credit_Card_Number|IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|Last_Updated|Description|Gender|Marital_Status|
+----------------+------+----------------+-------------+----------------+-------------+--------------+----------------+-------+---------+-----+------------+------------------+----+-------------+-------------+--------+-----+---------+------------+-----------+------+--------------+
|               0|     0|               0|            0|               0|            0|             0|               0|      0|        0|    0|           0| 

                                                                                

In [17]:
# row count, data count
bank_df_clean.count()

                                                                                

899679

In [18]:
# explore summary statistics of the data
bank_df_clean.describe().show()

24/06/23 12:49:53 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 89:>                                                         (0 + 1) / 1]

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+-----------------+--------+-------+---------+--------------------+-------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|        Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|    Random_Number|Category|  Group|Is_Active|         Description| Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+-----------------+--------+-------+---------+-------

                                                                                

# Build Data Model


In [19]:
bank_df_clean.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [20]:
bank_df_clean.show()

+--------------------+------+----------------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+-------+--------------+
|    Transaction_Date|Amount|Transaction_Type|    Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|  Group|Is_Active|        Last_Updated|         Description| Gender|Marital_Status|
+--------------------+------+----------------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-

In [21]:
# transaction table 
transaction_dim = bank_df_clean.select('Transaction_Date',
 'Amount',
 'Transaction_Type')

In [22]:
# adding transaction_id column using monotonycally increasing id function
transaction_dim = transaction_dim.withColumn('transaction_id', monotonically_increasing_id())

# re-arrange 
transaction_dim = transaction_dim.select('transaction_id','Amount','Transaction_Type','Transaction_Date')
transaction_dim.show(3)

+--------------+------+----------------+--------------------+
|transaction_id|Amount|Transaction_Type|    Transaction_Date|
+--------------+------+----------------+--------------------+
|             0| 34.76|      Withdrawal|2024-03-23 15:38:...|
|             1|163.92|      Withdrawal|2024-04-22 19:15:...|
|             2|386.32|      Withdrawal|2024-04-12 19:46:...|
+--------------+------+----------------+--------------------+
only showing top 3 rows



In [23]:
# customer table
# adding distinct when pulling the data optimise the table by reducing redundancy

customer_dim = bank_df_clean.select('Customer_Name','Customer_Address','Customer_City','Customer_State','Customer_Country').distinct()
customer_dim = customer_dim.withColumn('customer_id', monotonically_increasing_id())
customer_dim = customer_dim.select('customer_id','Customer_Name','Customer_Address','Customer_City','Customer_State','Customer_Country')
customer_dim.show(3)




+-----------+----------------+--------------------+-------------+--------------+--------------------+
|customer_id|   Customer_Name|    Customer_Address|Customer_City|Customer_State|    Customer_Country|
+-----------+----------------+--------------------+-------------+--------------+--------------------+
|          0|     Jamie Dixon|0146 Veronica Mou...|    Jonesland|      Delaware|        Saint Martin|
|          1|Jennifer Kennedy|     388 Susan Forks|   East Jason|          Ohio|United States of ...|
|          2|     Susan Green|40609 Amber Junct...|     Markfort|      Colorado|       Liechtenstein|
+-----------+----------------+--------------------+-------------+--------------+--------------------+
only showing top 3 rows



                                                                                

In [24]:
# employee table
 
employee_dim = bank_df_clean.select('Company','Job_Title', 'Gender','Marital_Status').distinct()
employee_dim = employee_dim.withColumn('employee_id', monotonically_increasing_id())
employee_dim = employee_dim.select('employee_id','Job_Title','Company','Gender','Marital_Status')
employee_dim.show(3)



+-----------+-------------------+--------------------+------+--------------+
|employee_id|          Job_Title|             Company|Gender|Marital_Status|
+-----------+-------------------+--------------------+------+--------------+
|          0|            Unknown|         Price Group|  Male|        Single|
|          1|Trade mark attorney|Rhodes, King and ...|  Male|       Unknown|
|          2|    Engineer, water|Schmidt, Morgan a...|Female|        Single|
+-----------+-------------------+--------------------+------+--------------+
only showing top 3 rows



                                                                                

In [25]:
# bank_df_fact table 

bank_df_fact = bank_df_clean.join(customer_dim, ['Customer_Name','Customer_Address','Customer_City','Customer_State','Customer_Country'], 'left') \
                            .join(transaction_dim, ['Amount','Transaction_Type','Transaction_Date'], 'left') \
                            .join(employee_dim, ['Job_Title', 'Gender','Marital_Status'], 'left')\
                            .select('transaction_id','customer_id','employee_id','Credit_Card_Number','IBAN','Currency_Code','Random_Number','Category','Group','Is_Active','Last_Updated','Description')

bank_df_fact.show(5)


[Stage 119:>                                                        (0 + 1) / 1]

+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|transaction_id|customer_id|employee_id|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|             4|25769849583|        740|   213156729655186|GB94EWRN587847592...|          SOS|       9179.0|       C|    Y|       No|2022-01-22 19:08:...|Husband find ok w...|
|             4|25769849583|      10374|   213156729655186|GB94EWRN587847592...|          SOS|       9179.0|       C|    Y|       No|2022-01-22 19:08:...|Husband find ok w...|
|             4|25769849583|      14518|   213156729655186|GB94EWRN587847592...|          SOS|       9179.0|       C|   

                                                                                

In [26]:
bank_df_fact.show()



+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|transaction_id|customer_id|employee_id|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|   25769803786|17180087600|        536|  3553538220664415|GB17CKDS252816135...|          SZL|       8710.0|       D|    Y|      Yes|2023-07-06 05:44:...|Exist that movie ...|
|   25769803786|17180087600|       2606|  3553538220664415|GB17CKDS252816135...|          SZL|       8710.0|       D|    Y|      Yes|2023-07-06 05:44:...|Exist that movie ...|
|   25769803786|17180087600|       5859|  3553538220664415|GB17CKDS252816135...|          SZL|       8710.0|       D|   

                                                                                

# Loading Data

In [None]:
# outputing cleaned and transformed data to parquet
# partitioning will optimise query 
bank_df_fact.write.mode('overwrite').partitionBy('transaction_id').parquet(r'data/processed/bank_df_fact')
transaction_dim.write.mode('overwrite').parquet(r'data/processed/transaction_dim')
employee_dim.write.mode('overwrite').parquet(r'data/processed/employee_dim')
customer_dim.write.mode('overwrite').parquet(r'data/processed/customer_dim')

In [None]:
# outputing to a csv file 
# repartitioning 

bank_df_fact.repartition(3)write.mode('overwrite').option('header', 'true').csv(r'data/processed/bank_df_fact')
transaction_dim.repartition(3)write.mode('overwrite').option('header', 'true').csv(r'data/processed/transaction_dim')
employee_dim.repartition(3)write.mode('overwrite').option('header', 'true').csv(r'data/processed/employee_dim')
customer_dim.repartition(3)write.mode('overwrite').option('header', 'true').csv(r'data/processed/customer_dim')

In [32]:
import os
print(os.getcwd())

/Users/villy/Documents/GitHub/Bank-Data-Processing-with-PySpark


In [None]:
# converting spark DataFrame to Pandas DataFrame
bank_df_fact_df = bank_df_fact.toPandas()
transaction_dim_df = transaction_dim.toPandas()
employee_dim_df = employee_dim.toPandas()
customer_dim_df = customer_dim.toPandas()

In [None]:
# Loading to database

# Retrieve Azure SQL Database credentials from environment variables
sql_user = os.getenv("az_user")
sql_password = os.getenv("az_password")

# Azure SQL Database connection properties
sql_url = "jdbc:sqlserver://ridwanclouds.database.windows.net:1433;database=bankdb"
sql_properties = {
    "user": sql_user,
    "password": sql_password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Write the DataFrame to Azure SQL Database
bank_df_fact_df.write.mode("overwrite").jdbc(url=sql_url, table="dbo.bank_df_fact", properties=sql_properties)
transaction_dim.write.mode("overwrite").jdbc(url=sql_url, table="dbo.transaction_dim", properties=sql_properties)
customer_dim.write.mode("overwrite").jdbc(url=sql_url, table="dbo.customer_dim", properties=sql_properties)
employee_dim.write.mode("overwrite").jdbc(url=sql_url, table="dbo.employee_dim", properties=sql_properties)