In [1]:
# Import necessary dependencies
import os
from getpass import getpass
from urllib.parse import quote_plus

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id
from sqlalchemy import create_engine

In [2]:
# Create the Spark session for this ETL job (getOrCreate reuses one if it already exists)
spark = (
    SparkSession.builder
    .appName('NugaBankETL')
    .getOrCreate()
)

In [3]:
# Display the active SparkSession as this cell's output.
spark

In [4]:
# Data Extraction
# Read the raw transactions CSV. The first line supplies the column names (header=True), and
# column types are inferred from the data (inferSchema=True, which costs an extra pass).
# A forward-slash relative path resolves on every OS Spark runs on; a backslash separator
# only resolves on Windows.
nuga_bank_df = spark.read.csv('dataset/nuga_bank_transactions.csv', header=True, inferSchema=True)

In [5]:
# Preview the first five raw rows; long cell values are truncated in the printout
nuga_bank_df.show(n=5, truncate=True)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [6]:
# Print the schema inferred at read time (inferSchema=True): column names, types, nullability.
nuga_bank_df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

In [9]:
# Data Cleaning and Transformation

# List the raw column names before any cleaning is applied.
nuga_bank_df.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [10]:
# no of rows
# count() runs a full Spark job over the raw CSV.
num_rows = nuga_bank_df.count()

num_rows

1000000

In [11]:
# no of columns: the schema holds exactly one field per column
num_columns = len(nuga_bank_df.schema.fields)

num_columns

23

In [None]:
# Checking for null values
# Count the nulls of every column in ONE aggregation (a single Spark job). The alternative,
# a filter().count() per column, rescans the whole dataset once per column.
# count() ignores nulls, and when() without otherwise() is null unless its condition holds,
# so each expression counts exactly the rows where that column is null.
null_counts = nuga_bank_df.select([
    F.count(F.when(nuga_bank_df[column].isNull(), 1)).alias(column)
    for column in nuga_bank_df.columns
]).first().asDict()

for column in nuga_bank_df.columns:
    print(column, 'Nulls', null_counts[column])

In [13]:
# Impute missing values: text columns get the placeholder 'Unknown',
# the two numeric columns get a zero of their own type.
# Transaction_Date, Amount, Transaction_Type and Last_Updated are deliberately not filled here.
unknown_text_columns = [
    'Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State',
    'Customer_Country', 'Company', 'Job_Title', 'Email', 'Phone_Number',
    'IBAN', 'Currency_Code', 'Category', 'Group', 'Is_Active',
    'Description', 'Gender', 'Marital_Status',
]
fill_values = {column_name: 'Unknown' for column_name in unknown_text_columns}
fill_values.update({'Credit_Card_Number': 0, 'Random_Number': 0.0})

nuga_bank_df_clean = nuga_bank_df.fillna(fill_values)

In [14]:
# Rows without a Last_Updated timestamp are discarded rather than imputed
nuga_bank_df_clean = nuga_bank_df_clean.dropna(subset=['Last_Updated'])

In [None]:
# Checking for null values again
# Same single-pass null count as for the raw data, now on the cleaned frame: one Spark job
# instead of one filter().count() job per column. Every count should now be 0.
null_counts = nuga_bank_df_clean.select([
    F.count(F.when(nuga_bank_df_clean[column].isNull(), 1)).alias(column)
    for column in nuga_bank_df_clean.columns
]).first().asDict()

for column in nuga_bank_df_clean.columns:
    print(column, 'Nulls', null_counts[column])

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0
Last_Updated Nulls 0
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [None]:
# no of rows
# Row count after rows with a null Last_Updated were dropped.
num_rows = nuga_bank_df_clean.count()

num_rows

899679

In [None]:
# Summary statistics (count, mean, stddev, min, max) for the numeric and string columns
nuga_bank_df_clean.describe().show(n=20, truncate=True)

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+-----------------+--------+-------+---------+--------------------+-------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|        Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|    Random_Number|Category|  Group|Is_Active|         Description| Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+-----------------+--------+-------+---------+-------

In [None]:
# Confirm cleaning kept the full column set (fillna / na.drop change rows and values, not columns).
nuga_bank_df_clean.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [15]:
# transaction table: the three transaction attributes of every cleaned row
transaction = nuga_bank_df_clean.select(['Transaction_Date', 'Amount', 'Transaction_Type'])

In [16]:
# Append a surrogate key; monotonically_increasing_id() yields unique but non-consecutive ids
transaction = transaction.select('*', monotonically_increasing_id().alias('transaction_id'))

In [17]:
# Put the surrogate key first, followed by the transaction attributes
transaction = transaction.select(['transaction_id', 'Transaction_Date', 'Amount', 'Transaction_Type'])

In [18]:
# Preview the transaction dimension (default 20 rows, truncated cells)
transaction.show(n=20, truncate=True)

+--------------+--------------------+------+----------------+
|transaction_id|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|
|             3|2024-04-17 15:29:...|407.15|         Deposit|
|             4|2024-02-10 01:51:...|161.31|         Deposit|
|             5|2024-02-10 22:56:...|764.34|        Transfer|
|             6|2024-04-07 00:07:...|734.59|         Deposit|
|             7|2024-03-08 01:51:...|592.43|         Deposit|
|             8|2024-02-01 12:34:...| 927.1|         Deposit|
|             9|2024-03-22 16:46:...| 66.59|        Transfer|
|            10|2024-04-23 13:30:...| 246.3|      Withdrawal|
|            11|2024-01-13 01:22:...|782.32|      Withdrawal|
|            12|2024-02-25 15:16:...|818.42|      Withdrawal|
|       

In [19]:
# Customer table
# One row per distinct combination of customer attributes, keyed by a surrogate customer_id.
customer_columns = ['Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State',
                    'Customer_Country', 'Email', 'Phone_Number']

customer = (
    nuga_bank_df_clean
    .select(customer_columns)
    .distinct()
    # add id column
    .withColumn('customer_id', monotonically_increasing_id())
    # reorder the table: surrogate key first
    .select('customer_id', *customer_columns)
    # monotonically_increasing_id() depends on partition layout and row order, and the
    # distinct() shuffle does not guarantee the same row order on every recomputation.
    # Without materialising, the fact-table join and each write could see different ids.
    # localCheckpoint() computes the ids once (eagerly) and truncates the lineage.
    .localCheckpoint()
)

In [20]:
# Number of distinct customer records in the customer dimension.
customer.count()

899679

In [21]:
# Preview the customer dimension (default 20 rows, truncated cells)
customer.show(n=20, truncate=True)

+-----------+------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|customer_id|     Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|               Email|        Phone_Number|
+-----------+------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|          0|    Miguel Leonard|262 Beck Expressw...|             Unknown| West Virginia|             Eritrea| zweaver@example.net|             Unknown|
|          1|           Unknown|             Unknown|         Evanchester|        Oregon|             Uruguay|             Unknown| (384)778-9942x91236|
|          2|    Michael Murphy|894 Williams Ridg...|       Dominguezview|      New York|              Sweden|kristinstanley@ex...|+1-693-739-2204x8851|
|          3|    Tina Gutierrez|    415 Taylor Knoll|           Donnastad|South Ca

In [22]:
# employee table
# One row per distinct (Company, Job_Title, Gender, Marital_Status), keyed by employee_id.
employee_columns = ['Company', 'Job_Title', 'Gender', 'Marital_Status']

employees = nuga_bank_df_clean.select(employee_columns).distinct()

employee = (
    employees
    # add id column
    .withColumn('employee_id', monotonically_increasing_id())
    # re-order the dataframe: surrogate key first
    .select('employee_id', *employee_columns)
    # The ids are assigned after the distinct() shuffle, whose row order may differ between
    # recomputations. Materialise them once so the fact-table join and the writes agree.
    .localCheckpoint()
)

In [23]:
# Preview the employee dimension (default 20 rows, truncated cells)
employee.show(n=20, truncate=True)

+-----------+--------------------+--------------------+-------+--------------+
|employee_id|             Company|           Job_Title| Gender|Marital_Status|
+-----------+--------------------+--------------------+-------+--------------+
|          0|         Price Group|             Unknown|   Male|        Single|
|          1|Rhodes, King and ...| Trade mark attorney|   Male|       Unknown|
|          2|Schmidt, Morgan a...|     Engineer, water| Female|        Single|
|          3|       Johnson Group|  Forensic scientist|   Male|       Unknown|
|          4|     Phillips-Prince|Production assist...|Unknown|        Single|
|          5|      Henry and Sons|Engineer, civil (...| Female|       Married|
|          6|Thompson, Johnson...|Exercise physiolo...|  Other|       Unknown|
|          7|Hernandez, Johnso...|Forensic psycholo...|Unknown|      Divorced|
|          8|Carrillo, Schwart...| Solicitor, Scotland| Female|        Single|
|          9|         Olson-Lucas| Magazine journali

In [24]:
# fact_table
# One fact row per cleaned source row, carrying surrogate keys into the three dimensions
# plus the remaining transaction-level attributes.
#
# transaction_id is assigned directly on nuga_bank_df_clean. This is the same narrow
# (shuffle-free) lineage that `transaction` uses when it adds monotonically_increasing_id(),
# so each row gets the same id. We do not join `transaction` back on (Transaction_Date,
# Amount, Transaction_Type): those columns are not guaranteed unique, and such a join can
# duplicate fact rows.
# NOTE(review): the ids match only while both plans read the source with the same partitioning
# (same file, same Spark config) — true within one session.
customer_keys = ['Customer_Name', 'Customer_Address', 'Customer_City', 'Customer_State',
                 'Customer_Country', 'Email', 'Phone_Number']
employee_keys = ['Company', 'Job_Title', 'Gender', 'Marital_Status']

fact_table = (
    nuga_bank_df_clean
    # must be assigned before any join/shuffle so ids follow the source partition layout
    .withColumn('transaction_id', monotonically_increasing_id())
    .join(customer, customer_keys, 'left')
    .join(employee, employee_keys, 'left')
    .select('transaction_id', 'customer_id', 'employee_id', 'Credit_Card_Number', 'IBAN',
            'Currency_Code', 'Random_Number', 'Category', 'Group', 'Is_Active',
            'Last_Updated', 'Description')
)

In [25]:
# Preview the fact table (default 20 rows, truncated cells)
fact_table.show(n=20, truncate=True)

+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|transaction_id|customer_id|employee_id|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|  Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|   17179869198| 8590097714|     143861|   180067592769732|GB92JVMY004197871...|          EGP|       7198.0|       A|      Z|      Yes|2023-10-12 22:25:...|Before story prof...|
|            18|25769812854|      41531|   213112163828334|GB50TJFN039979307...|          SVC|       7382.0|       B|      Z|      Yes|2020-01-19 18:19:...|Great evening so ...|
|   25769803779|25769976749|     196153|                 0|GB32LGFL895760023...|          PAB|       8898.0|  

In [None]:
# Persist each table as Parquet, replacing any previous output directory
parquet_targets = [
    (transaction, r'dataset/transaction'),
    (customer, r'dataset/customer'),
    (employee, r'dataset/employee'),
    (fact_table, r'dataset/fact_table'),
]
for table_df, output_dir in parquet_targets:
    table_df.write.mode('overwrite').parquet(output_dir)

In [None]:
# Persist each table as CSV with a header row; repartition(1) yields one part file per table
csv_targets = [
    (transaction, r'dataset/transformeddata/csv/transaction'),
    (customer, r'dataset/transformeddata/csv/customer'),
    (employee, r'dataset/transformeddata/csv/employee'),
    (fact_table, r'dataset/transformeddata/csv/fact_table'),
]
for table_df, output_dir in csv_targets:
    (
        table_df.repartition(1)
        .write.mode('overwrite')
        .option('header', 'true')
        .csv(output_dir)
    )

In [28]:
# Collect every Spark table onto the driver as a pandas DataFrame (evaluated in this order)
transaction_pd_df, customer_pd_df, employee_pd_df, fact_table_pd_df = (
    spark_df.toPandas() for spark_df in (transaction, customer, employee, fact_table)
)

In [32]:
# Loading the dataset into a Postgresql DB

# define database connection parameters
# Values come from the standard libpq environment variables so that no credential is stored
# in the notebook. Non-secret settings fall back to local defaults; if PGPASSWORD is unset,
# the password is prompted for interactively.
db_params = {
    'username': os.environ.get('PGUSER', 'postgres'),
    'password': os.environ.get('PGPASSWORD') or getpass('PostgreSQL password: '),
    'host': os.environ.get('PGHOST', 'localhost'),
    'port': os.environ.get('PGPORT', '5432'),
    'database': os.environ.get('PGDATABASE', 'nuga_bank'),
}

# define the database connection url with db parameters
# Credentials are percent-encoded so characters such as '@', ':' or '/' cannot corrupt the URL.
db_url = (
    f"postgresql://{quote_plus(db_params['username'])}:{quote_plus(db_params['password'])}"
    f"@{db_params['host']}:{db_params['port']}/{db_params['database']}"
)

# Create the database engine with the db url
engine = create_engine(db_url)

try:
    # engine.begin() wraps all four loads in one transaction. It commits explicitly on success
    # and rolls back on error, instead of relying on implicit/autocommit behaviour that differs
    # between SQLAlchemy and pandas versions.
    with engine.begin() as connection:
        transaction_pd_df.to_sql('transaction', connection, index=False, if_exists='replace')
        customer_pd_df.to_sql('customer', connection, index=False, if_exists='replace')
        employee_pd_df.to_sql('employee', connection, index=False, if_exists='replace')
        fact_table_pd_df.to_sql('fact_table', connection, index=False, if_exists='replace')
finally:
    # Release pooled connections; the engine remains usable and will open a new pool if needed.
    engine.dispose()

print('Database, tables and data loaded successfully ')

Database, tables and data loaded successfully 
