In [1]:
import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from datetime import datetime
from dotenv import load_dotenv              # environment variables
import os

In [2]:
# Read the MariaDB credentials: load_dotenv() loads variables from a .env file
# into the process environment, then each one is looked up by name.
# A variable that is not set comes back as None.
load_dotenv()
USER = os.environ.get('MariaDB_Username')
PASSWORD = os.environ.get('MariaDB_Password')

In [3]:
# Spark session shared by every extract/transform/load step below
# (getOrCreate reuses an already-running session in this kernel).
spark = (
    SparkSession.builder
    .appName("Credit Card App")
    .getOrCreate()
)

Extract Function

In [4]:
# extract json file
def extract_json(file):
    """Read JSON data at `file` into a Spark DataFrame via the shared `spark` session.

    Args:
        file: path (or anything `spark.read.json` accepts) to read.

    Returns:
        The DataFrame returned by `spark.read.json`; no schema is supplied, so
        Spark infers one.
    """
    reader = spark.read
    return reader.json(file)

Transformation Functions 
- Customer 
- Branch 
- Credit

In [5]:
# transform customer data
def transform_customer(dataframe):
    """Clean the raw customer DataFrame into the CDW_SAPP_CUSTOMER column layout.

    Steps:
      * FIRST_NAME / LAST_NAME -> title case, MIDDLE_NAME -> lower case.
      * STREET_NAME and APT_NO are merged into FULL_STREET_ADDRESS
        ("<street>, <apt>") and the two source columns are dropped.
      * CUST_PHONE is reformatted as "(781)XXX-XXXX".
      * SSN and CUST_ZIP are cast to int, LAST_UPDATED to timestamp.
      * Columns are returned in the fixed order of the final select.

    Args:
        dataframe: raw customer Spark DataFrame (e.g. from extract_json).

    Returns:
        A new Spark DataFrame containing only the selected columns.
    """
    # name transformation
    dataframe = dataframe.withColumn('FIRST_NAME', initcap(dataframe['FIRST_NAME']))                         # convert to title case
    dataframe = dataframe.withColumn('MIDDLE_NAME', lower(dataframe['MIDDLE_NAME']))                         # convert to lower case
    dataframe = dataframe.withColumn('LAST_NAME', initcap(dataframe['LAST_NAME']))                           # convert to title case

    # address transformation (concat_ws skips nulls, so a missing APT_NO
    # leaves just the street name)
    dataframe = dataframe.withColumn('FULL_STREET_ADDRESS',
                                     concat_ws(', ', dataframe['STREET_NAME'], dataframe['APT_NO']))         # concat street name + apt no
    dataframe = dataframe.drop('APT_NO', 'STREET_NAME')                                                      # drop columns

    # phone number transformation: "(781)" + digits 1-3 + "-" + digits 4-7.
    # Spark's substring is 1-based, so the slices are (1, 3) and (4, 4); the
    # second slice previously started at 3, repeating the third digit and
    # dropping the last one.
    # NOTE(review): assumes CUST_PHONE is a 7-digit local number and that 781
    # is the correct area code for every customer -- TODO confirm.
    phone = dataframe['CUST_PHONE'].cast('string')
    dataframe = dataframe.withColumn('CUST_PHONE', concat(lit('(781)'),
                                                          substring(phone, 1, 3),
                                                          lit('-'),
                                                          substring(phone, 4, 4)))

    # convert data types
    dataframe = dataframe.withColumn('SSN', dataframe['SSN'].cast('int'))
    # NOTE(review): an int ZIP cannot keep leading zeros (02134 -> 2134);
    # confirm the target column is meant to be numeric.
    dataframe = dataframe.withColumn('CUST_ZIP', dataframe['CUST_ZIP'].cast('int'))
    dataframe = dataframe.withColumn('LAST_UPDATED', to_timestamp(dataframe['LAST_UPDATED']))

    # rearrange columns
    rearranged_customer_df = dataframe.select('SSN',
                                              'FIRST_NAME',
                                              'MIDDLE_NAME',
                                              'LAST_NAME',
                                              'CREDIT_CARD_NO',
                                              'FULL_STREET_ADDRESS',
                                              'CUST_CITY',
                                              'CUST_STATE',
                                              'CUST_COUNTRY',
                                              'CUST_ZIP',
                                              'CUST_PHONE',
                                              'CUST_EMAIL',
                                              'LAST_UPDATED')
    return rearranged_customer_df

In [11]:
# transform branch data
def transform_branch(dataframe):
    """Clean the raw branch DataFrame into the CDW_SAPP_BRANCH column layout.

    Steps:
      * BRANCH_PHONE is reformatted as "(781)XXX-XXXX".
      * BRANCH_CODE and BRANCH_ZIP are cast to int; null ZIPs become 999999.
      * LAST_UPDATED is parsed to a timestamp.
      * Columns are returned in the fixed order of the final select.

    Args:
        dataframe: raw branch Spark DataFrame (e.g. from extract_json).

    Returns:
        A new Spark DataFrame containing only the selected columns.
    """
    # phone number transformation: "(781)" + digits 1-3 + "-" + digits 4-7.
    # Spark's substring is 1-based, so the slices are (1, 3) and (4, 4); the
    # second slice previously started at 3, repeating the third digit and
    # dropping the last one.
    # NOTE(review): if BRANCH_PHONE already carries an area code (10 digits),
    # its first three digits -- not a hard-coded 781 -- probably belong in the
    # parentheses. TODO confirm against the source data.
    phone = dataframe['BRANCH_PHONE'].cast('string')
    dataframe = dataframe.withColumn('BRANCH_PHONE', concat(lit('(781)'),
                                                            substring(phone, 1, 3),
                                                            lit('-'),
                                                            substring(phone, 4, 4)))

    # convert data type
    dataframe = dataframe.withColumn('BRANCH_CODE', dataframe['BRANCH_CODE'].cast('int'))
    dataframe = dataframe.withColumn('BRANCH_ZIP', dataframe['BRANCH_ZIP'].cast('int'))
    dataframe = dataframe.withColumn('LAST_UPDATED', to_timestamp(dataframe['LAST_UPDATED']))

    # zip code default, applied AFTER the int cast: fillna with an int value
    # only touches numeric columns, so filling first would silently do nothing
    # if BRANCH_ZIP had been inferred as a string.
    dataframe = dataframe.fillna(999999, subset=['BRANCH_ZIP'])                                              # replace null values

    # rearrange columns
    rearranged_branch_df = dataframe.select('BRANCH_CODE',
                                            'BRANCH_NAME',
                                            'BRANCH_STREET',
                                            'BRANCH_CITY',
                                            'BRANCH_STATE',
                                            'BRANCH_ZIP',
                                            'BRANCH_PHONE',
                                            'LAST_UPDATED')

    return rearranged_branch_df

In [12]:
# transform credit data
def transform_credit(dataframe):
    """Clean the raw credit-card transaction DataFrame into the CDW_SAPP_CREDIT_CARD layout.

    Steps:
      * YEAR / MONTH / DAY are combined into TIMEID, a 'yyyyMMdd' string; the
        three source columns are dropped. A null part yields a null TIMEID.
      * BRANCH_CODE, CUST_SSN and TRANSACTION_ID are cast to int.
      * CREDIT_CARD_NO is renamed to CUST_CC_NO.
      * Columns are returned in the fixed order of the final select.

    Args:
        dataframe: raw credit Spark DataFrame (e.g. from extract_json).

    Returns:
        A new Spark DataFrame containing only the selected columns.
    """
    # date transformation: build 'Y-M-D', cast to date (Spark's string->date
    # cast accepts one-digit months/days), then render as 'yyyyMMdd'.
    # concat (not concat_ws) so a null YEAR/MONTH/DAY gives a null TIMEID:
    # concat_ws skips nulls, and e.g. '2018-2' still casts to 2018-02-01,
    # silently inventing a day.
    raw_date = concat(dataframe['YEAR'], lit('-'),
                      dataframe['MONTH'], lit('-'),
                      dataframe['DAY']).cast('date')
    dataframe = dataframe.withColumn('TIMEID', date_format(raw_date, 'yyyyMMdd'))
    dataframe = dataframe.drop('YEAR', 'MONTH', 'DAY')

    # convert data type
    dataframe = dataframe.withColumn('BRANCH_CODE', dataframe['BRANCH_CODE'].cast('int'))
    dataframe = dataframe.withColumn('CUST_SSN', dataframe['CUST_SSN'].cast('int'))
    dataframe = dataframe.withColumn('TRANSACTION_ID', dataframe['TRANSACTION_ID'].cast('int'))

    # rename column
    dataframe = dataframe.withColumnRenamed('CREDIT_CARD_NO', 'CUST_CC_NO')

    # rearrange columns
    rearranged_credit_df = dataframe.select('CUST_CC_NO',
                                            'TIMEID',
                                            'CUST_SSN',
                                            'BRANCH_CODE',
                                            'TRANSACTION_TYPE',
                                            'TRANSACTION_VALUE',
                                            'TRANSACTION_ID')

    return rearranged_credit_df

Loading Functions (MariaDB via JDBC, and CSV for the dashboard)

In [6]:
# load/write data to MariaDB
def load_to_db(dataframe, db_name, table_name, user, password,
               host='localhost', port=3306, mode='append'):
    """Write a Spark DataFrame to a MariaDB/MySQL table over JDBC.

    Args:
        dataframe: Spark DataFrame to write.
        db_name: database (schema) name used in the JDBC URL.
        table_name: target table (JDBC "dbtable" option).
        user: database user name.
        password: database password.
        host: database host (default "localhost").
        port: database port (default 3306).
        mode: Spark save mode (default "append", i.e. rows are added to the table).

    Raises:
        ValueError: if user or password is None (e.g. the environment variable
            was never set), instead of an opaque failure inside the JDBC write.
    """
    if user is None or password is None:
        raise ValueError('load_to_db: user and password must not be None')

    url = f"jdbc:mysql://{host}:{port}/{db_name}"
    (dataframe.write
        .format("jdbc")
        .mode(mode)
        .option("url", url)
        .option("dbtable", table_name)
        .option("user", user)
        .option("password", password)
        .save())

In [23]:
# load/write data to csv file to use for interactive dashboard
def load_to_csv(dataframe, file, mode='a'):
    """Write a Spark DataFrame to a local CSV file through pandas.

    The whole DataFrame is collected with toPandas() before writing.

    Args:
        dataframe: Spark DataFrame to write.
        file: CSV path; missing parent directories are created.
        mode: file mode passed to pandas (default 'a' = append, as before;
            use 'w' to overwrite).

    The header row is written only when the file is new or empty (or the mode
    is not append), so repeated appends do not insert extra header rows into
    the middle of the data.
    """
    pandas_df = dataframe.toPandas()

    directory = os.path.dirname(file)
    if directory:
        os.makedirs(directory, exist_ok=True)

    write_header = mode != 'a' or not os.path.exists(file) or os.path.getsize(file) == 0
    pandas_df.to_csv(file, mode=mode, header=write_header, index=False)   # don't include the index in the csv file

Logging Function

In [10]:
# logging
def log(message, logfile='cc_logfile.txt'):
    """Append one timestamped line, "<timestamp>,<message>", to the ETL log.

    The timestamp looks like 2023-Jan-05-14:03:27
    (year, abbreviated month name, day, hour:minute:second).

    Args:
        message: text to record.
        logfile: log file path (default 'cc_logfile.txt', opened in append mode).
    """
    # %b is Python's documented code for the abbreviated month name; the
    # previous %h is not in Python's format-code table and is only a
    # platform C-library alias for it.
    timestamp = datetime.now().strftime('%Y-%b-%d-%H:%M:%S')

    with open(logfile, 'a', encoding='utf-8') as f:
        f.write(f'{timestamp},{message}\n')

ETL Pipelines
- Customer
- Branch
- Credit

In [13]:
# Customer ETL Pipeline
# Each stage is bracketed by Started/Ended log entries. If any stage raises, a
# "Failed" entry is logged before the exception propagates, so the log never
# shows a job that started and then silently stopped.
log('Customer ETL Job Started')
try:
    #-----------------------------------------------------------
    log('Customer Extraction Started')
    # NOTE(review): 'custmer' (sic) kept as-is; presumably it matches the
    # source file's actual name -- confirm before correcting the spelling.
    customer_df = extract_json('cdw_files/cdw_sapp_custmer.json')
    log('Customer Extraction Ended')
    #-----------------------------------------------------------
    log('Customer Transformation Started')
    # cache(): the result is written twice below (MariaDB, then CSV); without
    # it Spark recomputes the whole lineage, re-reading the JSON, per write.
    transformed_customer_df = transform_customer(customer_df).cache()
    log('Customer Transformation Ended')
    #-----------------------------------------------------------
    log('Customer Loading Started')
    load_to_db(transformed_customer_df,     # dataframe
               'creditcard_capstone',       # db_name
               'CDW_SAPP_CUSTOMER',         # table_name
               USER,                        # user_name
               PASSWORD)                    # password
    load_to_csv(transformed_customer_df, 'cleaned_files/cleaned_customer.csv')                               # for interactive dashboard
    log('Customer Loading Ended')
except Exception as exc:
    log(f'Customer ETL Job Failed ({type(exc).__name__})')
    raise
#-----------------------------------------------------------
log('Customer ETL Job Ended')

In [15]:
# Branch ETL Pipeline
# Each stage is bracketed by Started/Ended log entries. If any stage raises, a
# "Failed" entry is logged before the exception propagates, so the log never
# shows a job that started and then silently stopped.
log('Branch ETL Job Started')
try:
    #-----------------------------------------------------------
    log('Branch Extraction Started')
    branch_df = extract_json('cdw_files/cdw_sapp_branch.json')
    log('Branch Extraction Ended')
    #-----------------------------------------------------------
    log('Branch Transformation Started')
    # cache(): the result is written twice below (MariaDB, then CSV); without
    # it Spark recomputes the whole lineage, re-reading the JSON, per write.
    transformed_branch_df = transform_branch(branch_df).cache()
    log('Branch Transformation Ended')
    #-----------------------------------------------------------
    log('Branch Loading Started')
    load_to_db(transformed_branch_df,       # dataframe
               'creditcard_capstone',       # db_name
               'CDW_SAPP_BRANCH',           # table_name
               USER,                        # user_name
               PASSWORD)                    # password
    load_to_csv(transformed_branch_df, 'cleaned_files/cleaned_branch.csv')                                   # for interactive dashboard
    log('Branch Loading Ended')
except Exception as exc:
    log(f'Branch ETL Job Failed ({type(exc).__name__})')
    raise
#-----------------------------------------------------------
log('Branch ETL Job Ended')

In [19]:
# Credit ETL Pipeline
# Each stage is bracketed by Started/Ended log entries. If any stage raises, a
# "Failed" entry is logged before the exception propagates, so the log never
# shows a job that started and then silently stopped.
log('Credit ETL Job Started')
try:
    #-----------------------------------------------------------
    log('Credit Extraction Started')
    credit_df = extract_json('cdw_files/cdw_sapp_credit.json')
    log('Credit Extraction Ended')
    #-----------------------------------------------------------
    log('Credit Transformation Started')
    # cache(): the result is written twice below (MariaDB, then CSV); without
    # it Spark recomputes the whole lineage, re-reading the JSON, per write.
    transformed_credit_df = transform_credit(credit_df).cache()
    log('Credit Transformation Ended')
    #-----------------------------------------------------------
    log('Credit Loading Started')
    load_to_db(transformed_credit_df,       # dataframe
               'creditcard_capstone',       # db_name
               'CDW_SAPP_CREDIT_CARD',      # table_name
               USER,                        # user_name
               PASSWORD)                    # password
    load_to_csv(transformed_credit_df, 'cleaned_files/cleaned_credit.csv')                                   # for interactive dashboard
    log('Credit Loading Ended')
except Exception as exc:
    log(f'Credit ETL Job Failed ({type(exc).__name__})')
    raise
#-----------------------------------------------------------
log('Credit ETL Job Ended')