In [289]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from dotenv import load_dotenv
from pyspark.sql.functions import*
from pyspark.sql.types import StringType, IntegerType, BooleanType, DoubleType
import pandas as pd
import os
import re


## Loading the environment variables

In [290]:
# Pull the key=value pairs from a local .env file into os.environ.
# NOTE(review): by default python-dotenv does not replace variables that are
# already set in the process environment — confirm this is acceptable.
load_dotenv()

# MySQL credentials for the JDBC reads/writes below (None when a key is absent).
# NOTE(review): the keys are lowercase "user"/"pwd"; on Windows environment
# lookups are case-insensitive, so a shell-provided PWD (e.g. Git Bash) could
# shadow the .env value — confirm.
user, pwd = (os.environ.get(key) for key in ("user", "pwd"))

## Creating the Spark Session

In [291]:
# Notebook-wide SparkSession named 'Capstone'; the extract/transform/read helpers
# below reach it through this global `spark` name. getOrCreate() returns an
# already-active session instead of building a second one.
spark = (
    SparkSession.builder
    .appName('Capstone')
    .getOrCreate()
)

### Fetching the customer, branch and the creditcard data from the creditcard_capstone database

In [292]:
# Functions to fetch the creditcard, customer and branch tables from the creditcard_capstone database
def _read_capstone_table(session, table, user, pwd, url):
    """Read ``creditcard_capstone.<table>`` over JDBC and return the loaded DataFrame.

    Parameters
    ----------
    session : SparkSession (or any object exposing ``.read`` with the same API)
    table : str
        Unqualified table name, e.g. ``"cdw_sapp_customer"``.
    user, pwd : str
        MySQL credentials.
    url : str
        JDBC connection URL.
    """
    return (
        session.read.format("jdbc")
        .options(driver="com.mysql.cj.jdbc.Driver",
                 user=user,
                 password=pwd,
                 url=url,
                 # The table stays schema-qualified with creditcard_capstone
                 # regardless of `url`.
                 dbtable="creditcard_capstone." + table)
        .load()
    )


def get_creditcard_info(user, pwd, session=None,
                        url="jdbc:mysql://localhost:3306/creditcard_capstone"):
    """Fetch the transaction, customer and branch tables from creditcard_capstone.

    Parameters
    ----------
    user, pwd : str
        MySQL credentials.
    session : SparkSession, optional
        Session used for the reads; defaults to the notebook-global ``spark``.
    url : str, optional
        JDBC URL; defaults to the local MySQL instance used throughout the notebook.

    Returns
    -------
    tuple
        ``(df_transactions, df_customers, df_branches)``, read from
        ``cdw_sapp_credit_card``, ``cdw_sapp_customer`` and ``cdw_sapp_branch``.
    """
    if session is None:
        session = spark  # the notebook-global SparkSession
    tables = ("cdw_sapp_credit_card", "cdw_sapp_customer", "cdw_sapp_branch")
    df_transactions, df_customers, df_branches = (
        _read_capstone_table(session, table, user, pwd, url) for table in tables
    )
    return df_transactions, df_customers, df_branches

## Extracting data from the files

In [293]:
# Function to extract the customer, branch and the creditcard data from the files
def extract(data_dir="json_files"):
    """Read the raw customer, branch and credit-card JSON inputs.

    Parameters
    ----------
    data_dir : str, default "json_files"
        Directory holding the three input files. The default reproduces the
        original relative paths exactly.

    Returns
    -------
    tuple
        ``(df_customer, df_branch, df_creditcard)``. The first two are Spark
        DataFrames from ``spark.read.json``; the third is a pandas DataFrame
        read with ``lines=True`` (one JSON record per line).
    """
    # File names are kept verbatim (including the "custmer" spelling);
    # presumably they match the files on disk — do not "fix" them.
    df_customer = spark.read.json(f"{data_dir}/cdw_sapp_custmer.json")
    df_branch = spark.read.json(f"{data_dir}/cdw_sapp_branch.json")

    # The credit-card file is read with pandas because transform() builds
    # TIMEID with pandas string operations before converting it to Spark.
    df_creditcard = pd.read_json(f"{data_dir}/cdw_sapp_credit.json", lines=True)

    return df_customer, df_branch, df_creditcard


## Transforming the data as per the mapping logic

In [294]:
# Functions to transform the customer, branch and the creditcard data as per the mapping logic
def _transform_customer(df_customer):
    """Apply the customer mapping: type casts, name casing, address and phone formatting."""
    return df_customer.select(
        col("SSN").cast("int"),
        initcap(col("FIRST_NAME")).alias("FIRST_NAME"),
        lower(col("MIDDLE_NAME")).alias("MIDDLE_NAME"),
        initcap(col("LAST_NAME")).alias("LAST_NAME"),
        col("CREDIT_CARD_NO"),
        # "<APT_NO>,<STREET_NAME>"
        concat_ws(',', col("APT_NO"), col("STREET_NAME")).alias("FULL_STREET_ADDRESS"),
        col("CUST_CITY"),
        col("CUST_STATE"),
        col("CUST_COUNTRY"),
        col("CUST_ZIP").cast("int"),
        # Exactly-7-digit numbers become "(214)XXX-XXXX"; other values don't match the regex.
        # NOTE(review): area code 214 is hard-coded for every customer — confirm.
        regexp_replace(col("CUST_PHONE"), r'^(\d{3})(\d{4})$', '(214)$1-$2').alias('CUST_PHONE'),
        col("CUST_EMAIL"),
        col("LAST_UPDATED").cast("timestamp"),
    )


def _transform_branch(df_branch):
    """Apply the branch mapping with Spark SQL over the temp view "branchtable"."""
    # Registers (or replaces) a session-wide temp view as a side effect.
    df_branch.createOrReplaceTempView("branchtable")
    query = (
        "SELECT CAST(BRANCH_CODE AS INT), BRANCH_NAME, BRANCH_STREET, BRANCH_CITY, "
        "BRANCH_STATE, "
        # Missing zip codes default to 99999.
        "CAST(IF(BRANCH_ZIP IS NULL, '99999', BRANCH_ZIP) AS INT) AS BRANCH_ZIP, "
        # 10-character phone -> (XXX)XXX-XXXX
        "CONCAT('(', SUBSTR(BRANCH_PHONE, 1, 3), ')', SUBSTR(BRANCH_PHONE, 4, 3), '-', "
        "SUBSTR(BRANCH_PHONE, 7, 4)) AS BRANCH_PHONE, "
        "CAST(LAST_UPDATED AS TIMESTAMP) "
        "FROM BRANCHTABLE"
    )
    return spark.sql(query)


def _transform_creditcard(df_creditcard):
    """Build TIMEID (YYYYMMDD), rename the card column and convert pandas -> Spark.

    The input pandas frame is not modified; all pandas steps return new frames.
    """
    pdf = (
        df_creditcard
        .astype({"DAY": 'str', "MONTH": 'str', "YEAR": 'str', "CREDIT_CARD_NO": 'str'})
        .rename(columns={"CREDIT_CARD_NO": "CUST_CC_NO"})
    )
    # Zero-pad month and day so e.g. YEAR=2018, MONTH=2, DAY=5 -> "20180205".
    pdf = pdf.assign(TIMEID=pdf["YEAR"] + pdf["MONTH"].str.zfill(2) + pdf["DAY"].str.zfill(2))
    pdf = pdf.drop(columns=["DAY", "MONTH", "YEAR"])

    sdf = spark.createDataFrame(pdf)
    # Cast the identifier columns to the integer data type.
    for name in ("BRANCH_CODE", "CUST_SSN", "TRANSACTION_ID"):
        sdf = sdf.withColumn(name, sdf[name].cast("int"))
    return sdf


def transform(df_customer, df_branch, df_creditcard):
    """Transform the raw inputs returned by extract() per the mapping logic.

    Parameters
    ----------
    df_customer, df_branch : Spark DataFrames
    df_creditcard : pandas DataFrame with DAY, MONTH, YEAR and CREDIT_CARD_NO columns

    Returns
    -------
    tuple
        ``(df_customer, df_branch, df_creditcard)``, all Spark DataFrames.
    """
    return (
        _transform_customer(df_customer),
        _transform_branch(df_branch),
        _transform_creditcard(df_creditcard),
    )

## Loading the data into the database

In [296]:
# Functions to load the customer, branch and the creditcard data to the database creditcard_capstone
def _write_capstone_table(df, table, column_types, db_user, db_pwd):
    """Overwrite ``creditcard_capstone.<table>`` with ``df`` over JDBC.

    ``column_types`` is passed as ``createTableColumnTypes``.
    NOTE(review): with ``truncate=true`` Spark empties an existing table rather
    than recreating it, so these column types presumably only apply when the
    table has to be created — confirm against the Spark JDBC docs.
    """
    (
        df.write.format("jdbc")
        .mode("overwrite")
        .option("truncate", "true")
        .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone")
        .option("createTableColumnTypes", column_types)
        .option("dbtable", "creditcard_capstone." + table)
        .option("user", db_user)
        .option("password", db_pwd)
        .save()
    )


def load(df_customer_data, df_branch_data, df_creditcard_data, db_user=None, db_pwd=None):
    """Write the transformed customer, branch and credit-card frames to MySQL.

    Each frame is written with mode "overwrite" and truncate=true to
    creditcard_capstone.CDW_SAPP_CUSTOMER, CDW_SAPP_BRANCH and
    CDW_SAPP_CREDIT_CARD respectively.

    Parameters
    ----------
    df_customer_data, df_branch_data, df_creditcard_data : Spark DataFrames
        Output of transform().
    db_user, db_pwd : str, optional
        MySQL credentials; when omitted, the notebook-level ``user`` / ``pwd``
        loaded from the environment are used.

    NOTE(review): table names are upper-case here while get_creditcard_info
    reads lower-case names; on MySQL servers with case-sensitive table names
    these refer to different tables — confirm.
    """
    db_user = user if db_user is None else db_user
    db_pwd = pwd if db_pwd is None else db_pwd

    customer_types = ", ".join([
        "FIRST_NAME VARCHAR(30)", "MIDDLE_NAME VARCHAR(30)", "LAST_NAME VARCHAR(30)",
        "CREDIT_CARD_NO VARCHAR(20)", "FULL_STREET_ADDRESS VARCHAR(50)",
        "CUST_CITY VARCHAR(30)", "CUST_STATE VARCHAR(5)", "CUST_COUNTRY VARCHAR(30)",
        "CUST_PHONE VARCHAR(20)", "CUST_EMAIL VARCHAR(30)",
    ])
    branch_types = ", ".join([
        "BRANCH_NAME VARCHAR(30)", "BRANCH_STREET VARCHAR(50)", "BRANCH_CITY VARCHAR(30)",
        "BRANCH_STATE VARCHAR(5)", "BRANCH_PHONE VARCHAR(20)",
    ])
    creditcard_types = ", ".join([
        "CUST_CC_NO VARCHAR(20)", "TIMEID VARCHAR(10)", "TRANSACTION_TYPE VARCHAR(30)",
    ])

    _write_capstone_table(df_customer_data, "CDW_SAPP_CUSTOMER", customer_types, db_user, db_pwd)
    _write_capstone_table(df_branch_data, "CDW_SAPP_BRANCH", branch_types, db_user, db_pwd)
    _write_capstone_table(df_creditcard_data, "CDW_SAPP_CREDIT_CARD", creditcard_types, db_user, db_pwd)


## Displaying Schema

In [297]:
# Helper: show a DataFrame's schema as an indented tree
def print_schema(df):
    """Print ``df``'s schema tree via ``printSchema()`` and return ``""``.

    Returning an empty string means ``print(print_schema(df))`` emits a blank
    line rather than ``None``.
    """
    schema_printer = df.printSchema
    schema_printer()
    return str()
   

## Displaying data types

In [298]:
# Helper: show the column data types of a DataFrame
def print_data_types(df):
    """Print ``df.dtypes`` and return ``""``.

    For a PySpark DataFrame ``dtypes`` is a list of ``(column, type)`` pairs.
    """
    column_types = df.dtypes
    print(column_types)
    return str()
      

## Displaying the customer, branch and creditcard data

In [299]:
# Function to preview the first rows of a DataFrame
def display_data(df_type, n=5):
    """Show the first ``n`` rows of a DataFrame and return ``""``.

    Parameters
    ----------
    df_type : DataFrame
        Any object with a ``show(n)`` method (e.g. a PySpark DataFrame).
    n : int, default 5
        Number of rows to print; the default matches the previous fixed value.

    Returns
    -------
    str
        Always ``""``, so ``print(display_data(df))`` prints a blank line, not ``None``.
    """
    df_type.show(n)
    return ""


## Extract, Transform and Load (ETL)

In [302]:
def extract_transform_load():
    """Run the full pipeline: extract the JSON inputs, transform them, preview, then load to MySQL.

    Prints progress markers, the schema tree of each transformed DataFrame and
    its first rows, then writes all three to the creditcard_capstone database
    via load() (which overwrites the target tables).
    """
    # Extract
    print("Extraction started")
    df_customer, df_branch, df_creditcard = extract()
    print("\nExtraction completed")

    # Transform
    print("\nTransform started")
    df_customer_data, df_branch_data, df_creditcard_data = transform(
        df_customer, df_branch, df_creditcard)
    print("\nTransform completed")

    # Label each transformed frame once so the schema and preview sections
    # below treat all three identically.
    transformed = [
        ("Customer", df_customer_data),
        ("Branch", df_branch_data),
        ("Creditcard", df_creditcard_data),
    ]

    print("\nDisplaying the schema after transformation")
    for label, df in transformed:
        print(f"\n{label} schema after transformation:")
        print_schema(df)

    # Preview the data before it is written to the database.
    print("\nDisplaying the data before loading to the database")
    for label, df in transformed:
        print(f"\n{label} data:")
        display_data(df)

    # Load
    print("\nLoading started")
    load(df_customer_data, df_branch_data, df_creditcard_data)
    print("\nLoading completed")

In [303]:
# Run the complete extract -> transform -> load pipeline for the customer,
# branch and credit card data.
# NOTE: no table-existence check is performed; load() writes with mode
# "overwrite" and truncate=true, replacing the table contents on every run.
extract_transform_load()

Extraction started

Extraction completed

Transform started


  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():



Transform completed

Displaying the schema after transformation

Customer schema after transformation:
root
 |-- SSN: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = false)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_ZIP: integer (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)


Branch schema after transformation:
root
 |-- BRANCH_CODE: integer (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_ZIP: integer (nullable = true)
 |-- BRANCH_PHONE: string (