In [60]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
import pandas as pd 
from sqlalchemy import create_engine


In [61]:
spark = SparkSession.builder.appName('NugaBankETL').getOrCreate()

In [62]:
spark

In [63]:
nuga_bank_df = spark.read.csv('./data/raw/nuga_bank_transactions.csv',header=True, inferSchema=True)

                                                                                

In [64]:
nuga_bank_df.show(5)

+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-----

In [65]:
nuga_bank_df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

## DATA TRANSFORMATIOIN

In [66]:
nuga_bank_df.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [67]:
rows = nuga_bank_df.count()

rows

1000000

In [68]:
cols = len(nuga_bank_df.columns)
cols

23

In [69]:
# checking for null values 
for column in nuga_bank_df.columns:
    print(column, 'Nulls', nuga_bank_df.filter(nuga_bank_df[column].isNull()).count())

                                                                                

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 100425
Customer_Address Nulls 100087
Customer_City Nulls 100034
Customer_State Nulls 100009
Customer_Country Nulls 100672
Company Nulls 100295
Job_Title Nulls 99924
Email Nulls 100043
Phone_Number Nulls 100524
Credit_Card_Number Nulls 100085
IBAN Nulls 100300
Currency_Code Nulls 99342
Random_Number Nulls 99913
Category Nulls 100332
Group Nulls 100209
Is_Active Nulls 100259


                                                                                

Last_Updated Nulls 100321
Description Nulls 100403
Gender Nulls 99767
Marital_Status Nulls 99904


In [70]:
# how to fill up missing values
nuga_bank_df_clean = nuga_bank_df.fillna(
    {'Customer_Name': 'Unkown',
    'Customer_Address': 'Unkownn',
    'Customer_City':'Unknown',
    'Customer_State': 'Unknown',
    "Customer_Country": 'Unknown',
    'Company':'Unknown',
    'Job_Title': 'Unknown',
    'Email':'Unknown',
    'Phone_Number': 'Unknown',
    'Credit_Card_Number':0,
    'IBAN':'Unknown',
    'Currency_Code': 'Unknown',
    'Random_Number': 0.0,
    'Category': 'Unknown',
    'Group':'Unknown',
    'Is_Active':'Unknown',
    'Description':'Unkown',
    'Gender': 'Unknown',
    'Marital_Status': 'Unknown',
   
    }
)



In [71]:
# Drop rows where last_updated is null
nuga_bank_df_clean = nuga_bank_df_clean.na.drop(subset=['Last_Updated'])

In [72]:
# checking for null values 
for column in nuga_bank_df.columns:
    print(column, 'Nulls', nuga_bank_df_clean.filter(nuga_bank_df_clean[column].isNull()).count())

                                                                                

Transaction_Date Nulls 0
Amount Nulls 0
Transaction_Type Nulls 0
Customer_Name Nulls 0
Customer_Address Nulls 0
Customer_City Nulls 0
Customer_State Nulls 0
Customer_Country Nulls 0
Company Nulls 0
Job_Title Nulls 0
Email Nulls 0
Phone_Number Nulls 0
Credit_Card_Number Nulls 0
IBAN Nulls 0
Currency_Code Nulls 0
Random_Number Nulls 0
Category Nulls 0
Group Nulls 0
Is_Active Nulls 0


[Stage 401:>                                                      (0 + 11) / 11]

Last_Updated Nulls 0
Description Nulls 0
Gender Nulls 0
Marital_Status Nulls 0


                                                                                

In [73]:
nuga_bank_df_clean.count()

                                                                                

899679

In [74]:
# view the summary statistics of the data

nuga_bank_df_clean.describe().show()

[Stage 413:=====>                                                 (1 + 10) / 11]

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+-----------------+--------+-------+---------+--------------------+-------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|        Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|    Random_Number|Category|  Group|Is_Active|         Description| Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------+-----------------+--------+-------+---------+-------

                                                                                

In [75]:
nuga_bank_df_clean.columns

['Transaction_Date',
 'Amount',
 'Transaction_Type',
 'Customer_Name',
 'Customer_Address',
 'Customer_City',
 'Customer_State',
 'Customer_Country',
 'Company',
 'Job_Title',
 'Email',
 'Phone_Number',
 'Credit_Card_Number',
 'IBAN',
 'Currency_Code',
 'Random_Number',
 'Category',
 'Group',
 'Is_Active',
 'Last_Updated',
 'Description',
 'Gender',
 'Marital_Status']

In [76]:
transaction = nuga_bank_df_clean.select('Transaction_Date','Amount','Transaction_Type').distinct()

In [77]:
# Adding the transaciton_id colujmn 
transaction = transaction.withColumn('transaction_id', monotonically_increasing_id())


In [78]:
# reirderubg tge columns 
transaction = transaction.select('transaction_id','Transaction_Date','Amount','Transaction_Type')

In [79]:
transaction.show()



+--------------+--------------------+------+----------------+
|transaction_id|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-01-10 22:13:...|169.64|         Deposit|
|             1|2024-01-06 12:05:...|444.53|         Deposit|
|             2|2024-01-09 02:32:...|976.36|        Transfer|
|             3|2024-02-18 21:04:...|521.62|         Deposit|
|             4|2024-04-11 13:35:...|416.11|         Deposit|
|             5|2024-03-20 11:34:...|438.03|         Deposit|
|             6|2024-04-29 10:42:...| 28.27|        Transfer|
|             7|2024-02-12 15:48:...|657.39|         Deposit|
|             8|2024-01-16 03:08:...|489.04|      Withdrawal|
|             9|2024-04-27 01:11:...| 32.36|      Withdrawal|
|            10|2024-04-13 04:39:...| 152.8|         Deposit|
|            11|2024-02-07 20:31:...|736.03|      Withdrawal|
|            12|2024-03-09 11:50:...|516.88|        Transfer|
|       

                                                                                

In [80]:
customer = nuga_bank_df_clean.select('Customer_Name','Customer_Address','Customer_City','Customer_State','Customer_Country','Email','Phone_Number').distinct()

# add id column 
customer = customer.withColumn('Customer_id',monotonically_increasing_id())

customer = customer.select('Customer_id','Customer_Name','Customer_Address','Customer_City','Customer_State','Customer_Country','Email','Phone_Number')

customer.show()



+-----------+------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|Customer_id|     Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|               Email|        Phone_Number|
+-----------+------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|          0|    Miguel Leonard|262 Beck Expressw...|             Unknown| West Virginia|             Eritrea| zweaver@example.net|             Unknown|
|          1|    Michael Murphy|894 Williams Ridg...|       Dominguezview|      New York|              Sweden|kristinstanley@ex...|+1-693-739-2204x8851|
|          2|      James Miller|             Unkownn|         South Tracy|       Arizona|           Nicaragua|alexanderbailey@e...|        971-987-0958|
|          3|    Tina Gutierrez|    415 Taylor Knoll|           Donnastad|South Ca

                                                                                

In [81]:
employees = nuga_bank_df_clean.select('Company','Job_Title','Gender','Marital_Status').distinct()

employees = employees.withColumn('employee_id', monotonically_increasing_id())

employees = employees.select('employee_id','Company','Job_Title','Gender','Marital_Status')

employees.show()



+-----------+--------------------+--------------------+-------+--------------+
|employee_id|             Company|           Job_Title| Gender|Marital_Status|
+-----------+--------------------+--------------------+-------+--------------+
|          0|         Price Group|             Unknown|   Male|        Single|
|          1|Rhodes, King and ...| Trade mark attorney|   Male|       Unknown|
|          2|Schmidt, Morgan a...|     Engineer, water| Female|        Single|
|          3|       Johnson Group|  Forensic scientist|   Male|       Unknown|
|          4|     Phillips-Prince|Production assist...|Unknown|        Single|
|          5|      Henry and Sons|Engineer, civil (...| Female|       Married|
|          6|Thompson, Johnson...|Exercise physiolo...|  Other|       Unknown|
|          7|Hernandez, Johnso...|Forensic psycholo...|Unknown|      Divorced|
|          8|Carrillo, Schwart...| Solicitor, Scotland| Female|        Single|
|          9|         Olson-Lucas| Magazine journali

                                                                                

In [93]:
# fact-table 


fact_table = nuga_bank_df_clean.join(customer,['Customer_Name','Customer_Address','Customer_City','Customer_State','Customer_Country','Email','Phone_Number'], 'left') \
            .join(transaction,['Transaction_Date','Amount','Transaction_Type'],'left') \
            .join(employees,['Company','Job_Title','Gender','Marital_Status'], 'left') \
           

In [94]:
# fact_table = fact_table.select('transaction_id','Customer_id','employee_id', 'Credit_Card_Number','IBAN','Currency_Code','Random_Number','Category','Group','Is_Active','Last_Updated','Description')

fact_table = fact_table.select('transaction_id','employee_id','Customer_id','Credit_Card_Number','IBAN','Currency_Code','Random_Number','Category','Group','Is_Active','Last_Updated','Description')

fact_table.show()



+--------------+-----------+-----------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|transaction_id|employee_id|Customer_id| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|   17179881322|       8447|       7735|4622935083555889735|GB38NHWI142007006...|          EUR|       2210.0|       D|    Z|      Yes|2021-10-22 07:48:...|White might elect...|
|   77309420823|       7297|77309423858|   6580934179272674|GB56IWIH341432913...|          TMT|       6120.0|       C|    X|       No|2023-03-18 15:44:...|World little ment...|
|   85899417722|      54358|77309472004|    374741864551707|GB15NKUF545398325...|          MRO|       3522.0|      

                                                                                

In [None]:
# output the cleaned data to parquet
# uses columnar package whcih is smaller and easier.
import os 



# transaction.write.mode('overwrite').parquet(os.path.join('./data/processed','transaction'))
# customer.write.mode('overwrite').parquet(os.path.join('./data/processed','customer'))
# employees.write.mode('overwrite').parquet(os.path.join('./data/processed','employee'))
# fact_table.write.mode('overwrite').parquet(os.path.join('./data/processed','fact_table'))

25/07/03 10:22:07 WARN MemoryManager: Total allocation exceeds 95.00% (986,683,789 bytes) of heap memory
Scaling row group sizes to 91.89% for 8 writers
25/07/03 10:22:07 WARN MemoryManager: Total allocation exceeds 95.00% (986,683,789 bytes) of heap memory
Scaling row group sizes to 81.68% for 9 writers
25/07/03 10:22:07 WARN MemoryManager: Total allocation exceeds 95.00% (986,683,789 bytes) of heap memory
Scaling row group sizes to 73.51% for 10 writers
25/07/03 10:22:07 WARN MemoryManager: Total allocation exceeds 95.00% (986,683,789 bytes) of heap memory
Scaling row group sizes to 66.83% for 11 writers
25/07/03 10:22:07 WARN MemoryManager: Total allocation exceeds 95.00% (986,683,789 bytes) of heap memory
Scaling row group sizes to 73.51% for 10 writers
25/07/03 10:22:07 WARN MemoryManager: Total allocation exceeds 95.00% (986,683,789 bytes) of heap memory
Scaling row group sizes to 81.68% for 9 writers
25/07/03 10:22:07 WARN MemoryManager: Total allocation exceeds 95.00% (986,683,

In [97]:
# to csv

transaction.repartition(3).write.mode('overwrite').option('header','True').csv(os.path.join('./data/csv','transaction'))
customer.repartition(3).write.mode('overwrite').option('header','True').csv(os.path.join('./data/csv','customer'))
employees.repartition(3).write.mode('overwrite').option('header','True').csv(os.path.join('./data/csv','employee'))
fact_table.repartition(3).write.mode('overwrite').option('header','True').csv(os.path.join('./data/csv','fact_table'))

25/07/03 10:28:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/03 10:28:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/03 10:28:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/03 10:28:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/03 10:28:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/03 10:28:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/03 10:28:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/03 10:28:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/07/03 10:28:44 WARN RowBasedKeyValueBatch: Calling spill() on