## Import Necessary Dependencies

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id
from sqlalchemy import create_engine

In [2]:
# Create the Spark session for this ETL run, or reuse one that is already active.
spark = (
    SparkSession.builder
    .appName('NugaBankETL')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/18 17:14:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark  # display the session object to check the Spark engine and its information

## Data Extraction

In [4]:
# Load the raw transactions CSV: the first line supplies the column names and
# the column types are inferred from the data.
nuga_bank_df = spark.read.csv(
    r'dataset/nuga_bank_transactions.csv',
    header=True,
    inferSchema=True,
)

                                                                                

In [5]:
# Preview the first five rows of the raw data.
# NOTE(review): the frame carries customer-identifying columns (Email,
# Phone_Number, Credit_Card_Number, IBAN) - consider clearing this output
# before sharing the notebook.
nuga_bank_df.show(5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [6]:
# Print each column's name and the type inferred while reading the CSV.
nuga_bank_df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

## Data Transformation with Pyspark

In [7]:
# List the column names of the raw DataFrame.
nuga_bank_df.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [8]:
# Number of rows in the raw DataFrame (before any cleaning).
num_rows = nuga_bank_df.count()
num_rows

                                                                                

1000000

In [9]:
# Number of columns in the raw DataFrame.
num_columns = len(nuga_bank_df.columns)
num_columns

23

In [10]:
# Count the null values in every column of the raw data.
# All columns are aggregated in a single pass instead of launching one
# filter-and-count job per column. count() only counts non-null values and
# when() without otherwise() yields null for non-matching rows, so each
# aggregate is exactly that column's number of nulls.
null_counts = nuga_bank_df.select(
    [F.count(F.when(nuga_bank_df[column].isNull(), 1)).alias(column)
     for column in nuga_bank_df.columns]
).first()

for column in nuga_bank_df.columns:
    print(column, 'Nulls', null_counts[column])


                                                                                

Transaction_Date Nulls 0


                                                                                

Amount Nulls 0


                                                                                

Transaction_Type Nulls 0


                                                                                

Customer_Name Nulls 100425


                                                                                

Customer_Address Nulls 100087


                                                                                

Customer_City Nulls 100034


                                                                                

Customer_State Nulls 100009


                                                                                

Customer_Country Nulls 100672


                                                                                

Company Nulls 100295


                                                                                

Job_Title Nulls 99924


                                                                                

Email Nulls 100043


                                                                                

Phone_Number Nulls 100524


                                                                                

Credit_Card_Number Nulls 100085


                                                                                

IBAN Nulls 100300


                                                                                

Currency_Code Nulls 99342


                                                                                

Random_Number Nulls 99913


                                                                                

Category Nulls 100332


                                                                                

Group Nulls 100209


                                                                                

Is_Active Nulls 100259


                                                                                

Last_Updated Nulls 100321


                                                                                

Description Nulls 100403


                                                                                

Gender Nulls 99767


[Stage 72:>                                                         (0 + 4) / 4]

Marital_Status Nulls 99904


                                                                                

In [11]:
# Fill in missing values.
# The rows with nulls could simply be dropped, but they are kept here and the
# gaps are filled with placeholders instead: 'Unknown' for the text columns and
# a zero for the numeric ones (Credit_Card_Number is long, Random_Number is double).
#
# Last_Updated is deliberately not listed: it is a timestamp column, so a string
# placeholder cannot fill it (with a 'Last_Updated': 'Unknown' entry the null
# re-check below still reported all of its nulls). Rows with a null
# Last_Updated are dropped in a later cell instead.
nuga_bank_df_clean = nuga_bank_df.fillna({
    'Customer_Name' : 'Unknown',
    'Customer_Address' : 'Unknown',
    'Customer_City' : 'Unknown',
    'Customer_State' : 'Unknown',
    'Customer_Country' : 'Unknown',
    'Company' : 'Unknown',
    'Job_Title' : 'Unknown',
    'Email' : 'Unknown',
    'Phone_Number' : 'Unknown',
    'Credit_Card_Number' : 0,
    'IBAN' : 'Unknown',
    'Currency_Code' : 'Unknown',
    'Random_Number' : 0.0,
    'Category': 'Unknown',
    'Group' : 'Unknown',
    'Is_Active' : 'Unknown',
    'Description' : 'Unknown',
    'Gender': 'Unknown',
    'Marital_Status' : 'Unknown'
})

In [12]:
# Re-check the null counts after filling.
# All columns are aggregated in a single pass instead of launching one
# filter-and-count job per column. count() only counts non-null values and
# when() without otherwise() yields null for non-matching rows, so each
# aggregate is exactly that column's number of nulls.
null_counts = nuga_bank_df_clean.select(
    [F.count(F.when(nuga_bank_df_clean[column].isNull(), 1)).alias(column)
     for column in nuga_bank_df_clean.columns]
).first()

for column in nuga_bank_df_clean.columns:
    print(column, 'Nulls', null_counts[column])

                                                                                

Transaction_Date Nulls 0


                                                                                

Amount Nulls 0


                                                                                

Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0


                                                                                

Last_Updated Nulls 100321
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


In [13]:
# Drop the rows where Last_Updated is null.
# Those nulls were not filled earlier, so the affected rows are removed here.
nuga_bank_df_clean = nuga_bank_df_clean.dropna(subset=['Last_Updated'])


In [14]:
# Confirm that no nulls remain after the fill and drop steps.
# All columns are aggregated in a single pass instead of launching one
# filter-and-count job per column. count() only counts non-null values and
# when() without otherwise() yields null for non-matching rows, so each
# aggregate is exactly that column's number of nulls.
null_counts = nuga_bank_df_clean.select(
    [F.count(F.when(nuga_bank_df_clean[column].isNull(), 1)).alias(column)
     for column in nuga_bank_df_clean.columns]
).first()

for column in nuga_bank_df_clean.columns:
    print(column, 'Nulls', null_counts[column])

                                                                                

Transaction_Date Nulls 0


                                                                                

Amount Nulls 0


                                                                                

Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0


[Stage 166:>                                                        (0 + 4) / 4]

Last_Updated Nulls 0
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


                                                                                

In [15]:
# Number of rows left after cleaning.
# Note: this overwrites num_rows, which previously held the raw row count.

num_rows = nuga_bank_df_clean.count()
num_rows

                                                                                

899679

In [16]:
# View summary statistics of the cleaned data.
nuga_bank_df_clean.describe().show()

24/07/18 17:17:04 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 180:>                                                        (0 + 1) / 1]

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+-----------------+--------+-------+---------+--------------------+-------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|        Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|    Random_Number|Category|  Group|Is_Active|         Description| Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+-----------------+--------+-------+---------+-------

                                                                                

## Data Transformation with Pyspark (Data Modelling)

In [17]:
# List the columns of the cleaned DataFrame.
nuga_bank_df_clean.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

#### At this point I moved to draw.io to design the architecture (essentially modelling the data) in order to reduce redundancy. The diagram is in the repository. Note also that I kept the work under GitHub version control at every step.

#### Creating tables

In [18]:
# Transactions table: the distinct combinations of transaction date, amount
# and transaction type found in the cleaned data.
transaction = (
    nuga_bank_df_clean
    .select('Transaction_Date', 'Amount', 'Transaction_Type')
    .distinct()
)

In [20]:
# Preview the transaction table.
transaction.show()

[Stage 181:>                                                        (0 + 4) / 4]

+--------------------+------+----------------+
|    Transaction_Date|Amount|Transaction_Type|
+--------------------+------+----------------+
|2024-01-10 22:13:...|169.64|         Deposit|
|2024-01-06 12:05:...|444.53|         Deposit|
|2024-01-09 02:32:...|976.36|        Transfer|
|2024-02-18 21:04:...|521.62|         Deposit|
|2024-04-11 13:35:...|416.11|         Deposit|
|2024-03-20 11:34:...|438.03|         Deposit|
|2024-04-29 10:42:...| 28.27|        Transfer|
|2024-02-12 15:48:...|657.39|         Deposit|
|2024-01-16 03:08:...|489.04|      Withdrawal|
|2024-04-27 01:11:...| 32.36|      Withdrawal|
|2024-04-13 04:39:...| 152.8|         Deposit|
|2024-02-07 20:31:...|736.03|      Withdrawal|
|2024-03-09 11:50:...|516.88|        Transfer|
|2024-01-22 16:12:...|615.23|         Deposit|
|2024-02-18 19:51:...|119.83|        Transfer|
|2024-04-15 10:58:...|630.29|         Deposit|
|2024-02-22 06:42:...|923.79|         Deposit|
|2024-04-22 06:27:...|832.71|      Withdrawal|
|2024-02-14 1

                                                                                