In [1]:
# Import necessary libraries
import os

import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql import DataFrameWriter
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import count, when

In [None]:
# Point the JVM lookup at the local Java 8 install before Spark is started.
# Raw string: '\j' is not a valid escape sequence, so the plain literal emits an
# invalid-escape warning on current Pythons; the resulting value is unchanged.
os.environ['JAVA_HOME'] = r'C:\java8'

In [None]:
# Create (or reuse) the Spark session for this ETL run.
# "spark.jars" registers the PostgreSQL JDBC driver jar with the session; the jar
# path is relative — presumably resolved against the notebook's working directory.
spark = (
    SparkSession.builder
    .appName("Nuga Bank ETL")
    .config("spark.jars", "postgresql-42.7.3.jar")
    .getOrCreate()
)

In [None]:
# Bare expression: the notebook shows the session object's representation as the cell output
spark

In [None]:
# Extract the historical transactions CSV into a Spark DataFrame.
# The relative path is assembled with os.path.join so the separator matches the
# host OS (on Windows this yields the same backslash-separated string as before).
# header=True takes column names from the first row; inferSchema=True lets Spark
# derive column types from the data.
df = spark.read.csv(
    os.path.join('dataset', 'rawdata', 'nuga_bank_transactions.csv'),
    header=True,
    inferSchema=True,
)

In [None]:
# Print the first rows of the raw DataFrame for a quick visual check
df.show()

In [None]:
# Print the column names and the types derived by inferSchema when the CSV was read
df.printSchema()

In [None]:
# Data cleaning and transformation
# Count the nulls of every column in ONE aggregation instead of running a separate
# filter/count Spark job per column. when(...) without otherwise() yields null for
# non-matching rows, and count() only counts non-null values, so each aggregate is
# the number of null entries in that column.
null_counts = df.select(
    [count(when(df[column].isNull(), 1)).alias(column) for column in df.columns]
).first()
# The result row holds the counts in the same order as df.columns.
for column, nulls in zip(df.columns, null_counts):
    print(column, 'Nulls:', nulls)

In [None]:
# Print summary statistics for the raw DataFrame
df.describe().show()

In [None]:
# Replace nulls with per-column placeholders: 'Unknown' for the descriptive
# columns, zero for the two numeric ones. Rows themselves are left in place.
# NOTE(review): the placeholder has to be compatible with the column's inferred
# type — if Is_Active (or another column here) was not inferred as a string,
# 'Unknown' may not actually fill its nulls. Confirm with the null count below.
df_clean = df.na.fill({
    **dict.fromkeys(
        [
            'Customer_Name',
            'Customer_Address',
            'Customer_City',
            'Customer_State',
            'Customer_Country',
            'Company',
            'Job_Title',
            'Email',
            'Phone_Number',
            'IBAN',
            'Currency_Code',
            'Category',
            'Group',
            'Is_Active',
            'Description',
            'Gender',
            'Marital_Status',
        ],
        'Unknown',
    ),
    'Credit_Card_Number': 0,
    'Random_Number': 0.0,
})

In [None]:
# Discard every row whose Last_Updated value is null (no placeholder is used for it)
df_clean = df_clean.dropna(subset=['Last_Updated'])

In [None]:
# Re-check nulls after cleaning. All columns are counted in ONE aggregation rather
# than one filter/count Spark job per column: when(...) without otherwise() yields
# null for non-matching rows and count() skips nulls, so each aggregate is that
# column's null count.
clean_null_counts = df_clean.select(
    [count(when(df_clean[column].isNull(), 1)).alias(column) for column in df_clean.columns]
).first()
# The result row holds the counts in the same order as df_clean.columns.
for column, nulls in zip(df_clean.columns, clean_null_counts):
    print(column, 'Nulls:', nulls)