In [1]:
# Import Necessar Libraries
from pyspark.sql import SparkSession
from pyspark.sql import DataFrameWriter
from pyspark.sql.functions import monotonically_increasing_id
import os
import psycopg2


In [2]:
# Set Java Home
os.environ['JAVA_HOME'] = 'C:\java8'

In [3]:
# Initialized my Spark Session
spark = SparkSession.builder \
        .appName("Nuga Bank ETL") \
        .config("spark.jars", "postgresql-42.7.3.jar") \
        .getOrCreate()

In [4]:
spark

In [5]:
# Extract this Historical data into a spark dataframe
df = spark.read.csv(r'dataset\rawdata\nuga_bank_transactions.csv', header=True, inferSchema=True)

In [6]:
df.show(5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [7]:
df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

In [8]:
# Data Cleaning and Transformation
for column in df.columns:
    print(column, 'Nulls: ', df.filter(df[column].isNull()).count())

Transaction_Date Nulls:  0
Amount Nulls:  0
Transaction_Type Nulls:  0
Customer_Name Nulls:  100425
Customer_Address Nulls:  100087
Customer_City Nulls:  100034
Customer_State Nulls:  100009
Customer_Country Nulls:  100672
Company Nulls:  100295
Job_Title Nulls:  99924
Email Nulls:  100043
Phone_Number Nulls:  100524
Credit_Card_Number Nulls:  100085
IBAN Nulls:  100300
Currency_Code Nulls:  99342
Random_Number Nulls:  99913
Category Nulls:  100332
Group Nulls:  100209
Is_Active Nulls:  100259
Last_Updated Nulls:  100321
Description Nulls:  100403
Gender Nulls:  99767
Marital_Status Nulls:  99904


In [9]:
df.describe().show()

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+------------------+--------+------+---------+--------------------+------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|        Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|     Random_Number|Category| Group|Is_Active|         Description|Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+------------------+--------+------+---------+---------

In [10]:
df_clean = df.fillna({
    'Customer_Name': 'Unknown',
    'Customer_Address': 'Unknown',
    'Customer_City' : 'Unknown',
    'Customer_State' : 'Unknown',
    'Customer_Country' : 'Unknown',
    'Company' : 'Unknown',
    'Job_Title' : 'Unknown',
    'Email' : 'Unknown',
    'Phone_Number' : 'Unknown',
    'Credit_Card_Number' : 0,
    'IBAN' : 'Unknown',
    'Currency_Code' : 'Unknown',
    'Random_Number' : 0.0,
    'Category' : 'Unknown',
    'Group' : 'Unknown',
    'Is_Active' : 'Unknown',
    'Description' : 'Unknown',
    'Gender' : 'Unknown',
    'Marital_Status' : 'Unknown'
})

In [11]:
# drop the missing values in the Last_Updated column
df_clean = df_clean.na.drop(subset=['Last_Updated'])

In [12]:
# Data cleaning and transformation\n",
for column in df_clean.columns:
    print(column, 'Nulls: ', df_clean.filter(df_clean[column].isNull()).count())

Transaction_Date Nulls:  0
Amount Nulls:  0
Transaction_Type Nulls:  0
Customer_Name Nulls:  0
Customer_Address Nulls:  0
Customer_City Nulls:  0
Customer_State Nulls:  0
Customer_Country Nulls:  0
Company Nulls:  0
Job_Title Nulls:  0
Email Nulls:  0
Phone_Number Nulls:  0
Credit_Card_Number Nulls:  0
IBAN Nulls:  0
Currency_Code Nulls:  0
Random_Number Nulls:  0
Category Nulls:  0
Group Nulls:  0
Is_Active Nulls:  0
Last_Updated Nulls:  0
Description Nulls:  0
Gender Nulls:  0
Marital_Status Nulls:  0


In [14]:
# Data Transformation to 2NF
# transaction table
transaction = df_clean.select('Transaction_Date', 'Amount', 'Transaction_Type') \
                    .withColumn('transaction_id', monotonically_increasing_id()) \
                    .select('transaction_id', 'Transaction_Date', 'Amount', 'Transaction_Type')

In [15]:
transaction.show(5)

+--------------+--------------------+------+----------------+
|transaction_id|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|
|             3|2024-04-17 15:29:...|407.15|         Deposit|
|             4|2024-02-10 01:51:...|161.31|         Deposit|
+--------------+--------------------+------+----------------+
only showing top 5 rows



In [16]:
# customer table
customer = df_clean.select('Customer_Name', 'Customer_Address', 'Customer_City', \
                        'Customer_State', 'Customer_Country') \
                .withColumn('customer_id', monotonically_increasing_id()) \
                .select('customer_id', 'Customer_Name', 'Customer_Address', 'Customer_City', \
                        'Customer_State', 'Customer_Country')

In [17]:
customer.show(5)

+-----------+--------------+--------------------+------------------+--------------+--------------------+
|customer_id| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|
+-----------+--------------+--------------------+------------------+--------------+--------------------+
|          0|    James Neal|54912 Holmes Lodg...| West Keithborough|       Florida|                Togo|
|          1|   Thomas Long| 1133 Collin Passage|        Joshuabury|   Connecticut|Lao People's Demo...|
|          2|Ashley Shelton|5297 Johnson Port...|       North Maria|    New Jersey|              Bhutan|
|          3| James Rosario|56955 Moore Glens...|North Michellefurt|    New Mexico|             Iceland|
|          4|Miguel Leonard|262 Beck Expressw...|           Unknown| West Virginia|             Eritrea|
+-----------+--------------+--------------------+------------------+--------------+--------------------+
only showing top 5 rows



In [18]:
# employee table
employee = df_clean.select('Company', 'Job_Title', 'Email', 'Phone_Number', \
                           'Gender', 'Marital_Status') \
                        .withColumn('employee_id', monotonically_increasing_id()) \
                        .select('employee_id', 'Company', 'Job_Title', 'Email', 'Phone_Number', \
                            'Gender', 'Marital_Status')

In [19]:
employee.show()

+-----------+--------------------+--------------------+--------------------+-------------------+-------+--------------+
|employee_id|             Company|           Job_Title|               Email|       Phone_Number| Gender|Marital_Status|
+-----------+--------------------+--------------------+--------------------+-------------------+-------+--------------+
|          0|Benson, Johnson a...|             Unknown|             Unknown|  493.720.6609x7545|  Other|      Divorced|
|          1|             Unknown|   Food technologist|michellelynch@exa...|      (497)554-3317| Female|       Married|
|          2|       Jones-Mueller|Database administ...| ljordan@example.org|      (534)769-3072|  Other|       Unknown|
|          3|       Vargas-Harris|Horticultural the...|parkerjames@examp...|+1-447-900-1320x257|Unknown|       Unknown|
|          4|Richardson, Gonza...|   Minerals surveyor| zweaver@example.net|            Unknown| Female|       Married|
|          5|           Smith Ltd| Seism

In [20]:
# fact table
fact_table = df_clean.join(transaction, ['Transaction_Date', 'Amount', 'Transaction_Type'], 'inner') \
                    .join(customer, ['Customer_Name', 'Customer_Address', 'Customer_City', \
                                     'Customer_State', 'Customer_Country'], 'inner') \
                    .join(employee, ['Company', 'Job_Title', 'Email', 'Phone_Number', \
                                     'Gender', 'Marital_Status'], 'inner') \
                    .select('transaction_id', 'customer_id', 'employee_id', 'Credit_Card_Number',\
                                     'IBAN', 'Currency_Code', 'Random_Number','Category', 'Group', \
                                     'Is_Active', 'Last_Updated', 'Description')

In [21]:
fact_table.show(5)

+--------------+-----------+-----------+-------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|transaction_id|customer_id|employee_id| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|  Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+-------------------+--------------------+-------------+-------------+--------+-------+---------+--------------------+--------------------+
|   17179987341|17179987341|17179987341|     30233184463165|GB82ZFSS908131813...|          PAB|       6515.0|       B|Unknown|      Yes|2021-12-12 16:34:...|Without shake mom...|
|    8589986604| 8589986604| 8589986604|4131053293826613966|GB78CKOS832138470...|          GNF|       7475.0|       B|      Z|      Yes|2022-02-06 23:35:...|Health sure story...|
|         29992|      29992|      29992|   2452924189738024|GB50ZGVC717046003...|          TRY|       915

In [30]:
# Develop Functions to Get Database Connection\n",
def get_db_connection():
    connection = psycopg2.connect(
        host='localhost',
        database='nuga_bank_etl',
        user='postgres',
        password='Melacholy201)'
    )
    return connection

# Connect to SQL Database
conn = get_db_connection()

In [31]:
# Create a function to create tables
def create_table():
    conn = get_db_connection()
    cursor = conn.cursor()
    create_table_query = '''
                            DROP TABLE IF EXISTS customer;
                            DROP TABLE IF EXISTS transaction;
                            DROP TABLE IF EXISTS employee;
                            DROP TABLE IF EXISTS fact_table;

                            CREATE TABLE customer (
                                customer_id BIGINT,
                                Customer_Name VARCHAR(10000),
                                Customer_Address VARCHAR(10000),
                                Customer_City VARCHAR(10000),
                                Customer_State VARCHAR(10000),
                                Customer_Country VARCHAR(10000)
                            );

                            CREATE TABLE transaction (
                                transaction_id BIGINT,
                                Transaction_Date DATE,
                                Amount FLOAT,
                                Transaction_Type VARCHAR(10000)
                            );

                            CREATE TABLE employee (
                                employee_id BIGINT,
                                Company VARCHAR(10000),
                                Job_Title VARCHAR(10000),
                                Email VARCHAR(10000),
                                Phone_Number VARCHAR(10000),
                                Gender VARCHAR(10000),
                                Marital_Status VARCHAR(10000)
                            );

                            CREATE TABLE fact_table (
                                transaction_id BIGINT,
                                customer_id BIGINT,
                                employee_id BIGINT,
                                Credit_Card_Number BIGINT,
                                IBAN VARCHAR(10000),
                                Currency_Code VARCHAR(10000),
                                Random_Number FLOAT,
                                Category VARCHAR(10000),
                                "Group" VARCHAR(10000),
                                Is_Active VARCHAR(10000),
                                Last_Updated DATE,
                                Description VARCHAR(10000)
                            );
                            '''

    cursor.execute(create_table_query)
    conn.commit()
    cursor.close()
    conn.close()

In [32]:
create_table()

In [33]:
url = "jdbc:postgresql://localhost:5432/nuga_bank_etl"
properties = {
    "user" : "postgres",
    "password" : "Melacholy201)",
    "driver" : "org.postgresql.Driver"
}

customer.write.jdbc(url=url, table="customer", mode="append", properties=properties)

In [34]:
employee.write.jdbc(url=url, table="employee", mode="append", properties=properties)
transaction.write.jdbc(url=url, table="transaction", mode="append", properties=properties)
fact_table.write.jdbc(url=url, table="fact_table", mode="append", properties=properties)