In [1]:
import pandas as pd
import pyspark as ps
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os
import re
from pyspark.sql.types import StringType
import numpy as np
import requests
from pprint import pp

In [2]:
# Creating Spark Session
spark = SparkSession.builder.appName('SBA 350').getOrCreate()
# Reading /loading the Dataset from JSON file, SPARK by defualt infersSchema for JSON files; only need headers parameter typically for csv files
customer_spark = spark.read.load("cdw_sapp_customer.json", format="json")
branch_spark = spark.read.load("cdw_sapp_branch.json", format= "json")
credit_spark = spark.read.load("cdw_sapp_credit.json", format= "json")

In [None]:
# credit_spark.show()
credit_spark.dtypes
# [('BRANCH_CODE', 'bigint'),
#  ('CREDIT_CARD_NO', 'string'),
#  ('CUST_SSN', 'bigint'),
#  ('DAY', 'bigint'),
#  ('MONTH', 'bigint'),
#  ('TRANSACTION_ID', 'bigint'),
#  ('TRANSACTION_TYPE', 'string'),
#  ('TRANSACTION_VALUE', 'double'),
#  ('YEAR', 'bigint')]

In [None]:
# branch_spark.show()
branch_spark.dtypes
# [('BRANCH_CITY', 'string'),
#  ('BRANCH_CODE', 'bigint'),
#  ('BRANCH_NAME', 'string'),
#  ('BRANCH_PHONE', 'string'),
#  ('BRANCH_STATE', 'string'),
#  ('BRANCH_STREET', 'string'),
#  ('BRANCH_ZIP', 'bigint'),
#  ('LAST_UPDATED', 'string')]

In [25]:

print(customer_spark.dtypes)
#[('APT_NO', 'string'),
#  ('CREDIT_CARD_NO', 'string'),
#  ('CUST_CITY', 'string'),
#  ('CUST_COUNTRY', 'string'),
#  ('CUST_EMAIL', 'string'),
#  ('CUST_PHONE', 'bigint'),
#  ('CUST_STATE', 'string'),
#  ('CUST_ZIP', 'string'),
#  ('FIRST_NAME', 'string'),
#  ('LAST_NAME', 'string'),
#  ('LAST_UPDATED', 'string'),
#  ('MIDDLE_NAME', 'string'),
#  ('SSN', 'bigint'),
#  ('STREET_NAME', 'string')]
print(customer_spark.count())
print(customer_spark.distinct().count())

[('APT_NO', 'string'), ('CREDIT_CARD_NO', 'string'), ('CUST_CITY', 'string'), ('CUST_COUNTRY', 'string'), ('CUST_EMAIL', 'string'), ('CUST_PHONE', 'bigint'), ('CUST_STATE', 'string'), ('CUST_ZIP', 'string'), ('FIRST_NAME', 'string'), ('LAST_NAME', 'string'), ('LAST_UPDATED', 'string'), ('MIDDLE_NAME', 'string'), ('SSN', 'bigint'), ('STREET_NAME', 'string')]
952
952


In [3]:

# tan_customer = customer_spark['FIRST_NAME', 'MIDDLE_NAME', 'LAST_NAME', 'STREET_NAME','APT_NO', 'CUST_PHONE']
# customer_spark.createOrReplaceTempView('customer')


def tran_cust_title_case(df, column_name):
    return df.withColumn(column_name,initcap(col(column_name)))
def tran_cust_lower_case(df, column_name):
    return df.withColumn(column_name,lower(col(column_name)))
def concat_cust_street_apt(df, col1, col2):
    #concat_ws concatenates multiple string columns 
    return df.withColumn('FULL_STREET_ADDRESS', concat_ws(',', col(col1), col(col2)))
#the cust_phone doesn't have an area code, need to rectify somehow
def tran_phone_num(df, column_name):
    return df.withColumn(column_name, concat(lit('('), 
                                             substring(col(column_name), 1, 3),
                                             lit(')'),
                                             substring(col(column_name), 4, 3),
                                             lit('-'),
                                             substring(col(column_name), 7, 4))
                                             .cast('string'))
#checks if the branches zip is null and defaults it to 99999, and if it isn't it returns the branches zip
def tran_branch_zip(df):
    return df.withColumn('BRANCH_ZIP', when(col('BRANCH_ZIP').isNull(), lit(99999)).otherwise(col('BRANCH_ZIP')))

#match cust state to branch state and slice branch phone 3-5 (last included?) to append to cust phone after 2nd element
#first left join the cust df and the branch df on their respective states like in sql
#getting duplicate rows are created for each row the join operation matches
custJoinbranch = customer_spark.join(branch_spark, customer_spark['CUST_STATE'] == branch_spark['BRANCH_STATE'], 'left')

# Updating the customer phone by appending a sliced portion of the branch phone
customer_fix = custJoinbranch.withColumn('CUST_PHONE', concat(col('CUST_PHONE').substr(1, 2), col('BRANCH_PHONE').substr(3, 3), col('CUST_PHONE').substr(3, 7)))
unwanted_columns = branch_spark.columns
#asterisk unpacks the columns from branch_spark so drop doesn't receive it all as a single argument and understands they're
# individual column names. Drop duplicates on cust_ssn so different family members living together aren't dropped. W/o the column 
# selection we'd have 3700 distinct entries vs the 952 we started with
customer_fix = customer_fix.drop(*unwanted_columns).dropDuplicates(['SSN'])

# concat the day, month, year columns into a TIMEID (YYYYMMDD)
def tran_to_timeid(df, day, month, year):
    # first concat the columns so you can use the to_date function so its format won't match the 
    # other tables. Need to lpad the month and day values so not to throw error when parsing to
    # to_date. The 2 represents the desired length of the string and the 0 is what we're left-padding 
    # with. Can't even do what the mapping logic wants YYYYMMDD
    date_string = concat(
        col(year),
        lpad(col(month), 2, '0'),
        lpad(col(day), 2, '0'))
    return df.withColumn('TIMEID', to_date(date_string, 'yyyyMMdd'))

In [4]:
print(customer_spark.count())
print(customer_spark.distinct().count())
print(branch_spark.count())
print(branch_spark.distinct().count())
print(custJoinbranch.count())
print(custJoinbranch.distinct().count())
print(customer_fix.count())
print(customer_fix.distinct().count())
# print(customer_fix1.count())
# print(customer_fix1.distinct().count())

952
952
115
115
5665
5665
952
952


In [4]:
#transforming the specified columns, from the mapping document, of the extracted customer df into new df
# tran_cust_spark = customer_spark.transform(tran_cust_title_case, 'FIRST_NAME')\
tran_cust_spark = customer_fix.transform(tran_cust_title_case, 'FIRST_NAME')\
.transform(tran_cust_lower_case, 'MIDDLE_NAME')\
.transform(tran_cust_title_case, 'LAST_NAME')\
.transform(concat_cust_street_apt, 'APT_NO', 'STREET_NAME')\
.drop('STREET_NAME', 'APT_NO')\
.transform(tran_phone_num, 'CUST_PHONE')


customer_spark.show()
tran_cust_spark.show()


+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|   656|4210653310061055|     Natchez|United States| AHooper@example.com|   1237818|        MS|   39120|      Alec|   Hooper|2018-04-21T12:49:...|         Wm|123456100|Main Street North|
|   829|4210653310102868|Wethersfield|United States| EHolman@example.com|   1238933|        CT|   06109|      Etta|   Holman|2018-04-21T12:49:...|    Brendan|123453023|    Redwood Drive|
|   683|4210653310116272|     Huntley|United States| WDunham@exam

In [6]:
print(tran_cust_spark.count())
print(tran_cust_spark.distinct().count())
print(tran_cust_spark.dtypes)

952
952
[('CREDIT_CARD_NO', 'string'), ('CUST_CITY', 'string'), ('CUST_COUNTRY', 'string'), ('CUST_EMAIL', 'string'), ('CUST_PHONE', 'string'), ('CUST_STATE', 'string'), ('CUST_ZIP', 'string'), ('FIRST_NAME', 'string'), ('LAST_NAME', 'string'), ('MIDDLE_NAME', 'string'), ('SSN', 'bigint'), ('FULL_STREET_ADDRESS', 'string')]


In [5]:
#transforming the specified columns, from the mapping document, of the extracted branch df into new df
#just need to transform the branch_zip if the source value is null load default (99999) value else direct move
#and branch_phone change the format of phone number to (xxx)xxx-xxxx
tran_branch_spark = branch_spark.transform(tran_branch_zip)\
.transform(tran_phone_num, 'BRANCH_PHONE')

branch_spark.show()
tran_branch_spark.show()



+-----------------+-----------+------------+------------+------------+-------------------+----------+--------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|      BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|
+-----------------+-----------+------------+------------+------------+-------------------+----------+--------------------+
|        Lakeville|          1|Example Bank|  1234565276|          MN|       Bridle Court|     55044|2018-04-18T16:51:...|
|          Huntley|          2|Example Bank|  1234618993|          IL|  Washington Street|     60142|2018-04-18T16:51:...|
|SouthRichmondHill|          3|Example Bank|  1234985926|          NY|      Warren Street|     11419|2018-04-18T16:51:...|
|       Middleburg|          4|Example Bank|  1234663064|          FL|   Cleveland Street|     32068|2018-04-18T16:51:...|
|    KingOfPrussia|          5|Example Bank|  1234849701|          PA|        14th Street|     19406|2018-04-18T16:51:...|
|         Paters

In [16]:
#have to set sparks configuration to legacy to stop getting error from parsing the to_date, throws null now if it isn't valid
# spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
tran_branch_spark.select('last_updated').show(truncate=False)

+-----------------------------+
|last_updated                 |
+-----------------------------+
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
|2018-04-18T16:51:47.000-04:00|
+-----------------------------+
only showing top 20 rows



In [22]:
# date_syntax = tran_branch_spark.select('last_updated')
# date_column = to_date(date_syntax)
# date_format_syntax = date_column.cast(StringType()).expr._jc.getDateFormat()

2018-04-18T16:51:47.000-04:00


In [6]:
#transforming the specified columns, from the mapping document, of the extracted credit df into new df
#just need to convert/concat the day, month, year columns into a TIMEID (YYYYMMDD)
#is there a way to use a timestamp and not just concat? is there an easier way?
tran_credit_spark = credit_spark.transform(tran_to_timeid, 'DAY', 'MONTH', 'YEAR')\
.drop('DAY','MONTH','YEAR')

credit_spark.show()
tran_credit_spark.show()


+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             56.7|2018|
|        114|4210653349028689|123459988| 19|    4|             4|   Entertainment|            59.73|2018|
|         93|4210653349028689|123459988| 10|   10|             5|             Gas|             3.59|2018|
|        164|4210653349028689|123459988| 28|    5|             6|       Education|             6.89|2018|
|        119|4210653349028689|123459988| 19|  

In [7]:
#create and populate the requisite tables in SQL db creditcard_capstone
tran_cust_spark.write.format("jdbc").options(driver="com.mysql.cj.jdbc.Driver",
                                     user="root",
                                     password="password",
                                     url="jdbc:mysql://localhost:3306/creditcard_capstone",
                                     dbtable="CDW_SAPP_CUSTOMER", 
                                     ).mode('overwrite').save()

tran_branch_spark.write.format("jdbc").options(driver="com.mysql.cj.jdbc.Driver",
                                     user="root",
                                     password="password",
                                     url="jdbc:mysql://localhost:3306/creditcard_capstone",
                                     dbtable="CDW_SAPP_BRANCH", 
                                     ).mode('overwrite').save()

tran_credit_spark.write.format("jdbc").options(driver="com.mysql.cj.jdbc.Driver",
                                     user="root",
                                     password="password",
                                     url="jdbc:mysql://localhost:3306/creditcard_capstone",
                                     dbtable="CDW_SAPP_CREDIT_CARD", 
                                     ).mode('overwrite').save()

In [8]:
# Read the data from the MySQL table
cust_sql = spark.read.format("jdbc").options(driver="com.mysql.cj.jdbc.Driver",
                                             user="root",
                                             password="password",
                                             url="jdbc:mysql://localhost:3306/creditcard_capstone",
                                             dbtable="CDW_SAPP_CUSTOMER"
                                             ).load()
branch_sql = spark.read.format("jdbc").options(driver="com.mysql.cj.jdbc.Driver",
                                             user="root",
                                             password="password",
                                             url="jdbc:mysql://localhost:3306/creditcard_capstone",
                                             dbtable="CDW_SAPP_BRANCH"
                                             ).load()
credit_sql = spark.read.format("jdbc").options(driver="com.mysql.cj.jdbc.Driver",
                                             user="root",
                                             password="password",
                                             url="jdbc:mysql://localhost:3306/creditcard_capstone",
                                             dbtable="CDW_SAPP_CREDIT_CARD"
                                             ).load()

# Display the contents of the DataFrame
# cap_spark.show()


Terminal Application

Let's define a simple terminal app that we can make in this notebook. Greeter will:

Always display a title bar at the top of the screen, showing the name of the app that is running.
Offer you three choices:
    Enter a name.
        If the program knows that name, it will print a message saying something like "Hello, old friend."
        If the program does not know that name, it will print a message saying something like, "It's nice to meet you. I will remember you."
            The next time the program hears this name, it will greet the person as an old friend.
    See a list of everyone that is known.
    Quit.
The program will remember people, even after it closes.
The program may list the number of known people in the title bar.


Now we will go over a few things we need to know in order to build this app, and then we will build it.

In [None]:
# FUNCTIONS

def display_title_bar():
    # Clears the terminal screen, and displays a title bar.
    os.system('cls')
              
    print("\t**********************************************")
    print("\t***  Welcome to Chad's Final Project!  ***")
    print("\t**********************************************")

def get_user_choice():
    # Menu options...
    print("[1] Display transactions by customers given zip code and date?.")
    print("[2] Display the number and total values of tranactions of a given TYPE.")
    print("[3] Display the number and total values of transactions for branches of a given STATE.")
    print("[4] Check account details of an existing customer.")
    print("[5] Modify the details of an existing customer's account.")
    print("[6] Generate a monthly bill for a credit card number, given the month and year.")
    print("[7] Display the transactions of a customer between two given dates.")
    print("[8] Quit.")
    
    return input("What would you like to do? ")

# Removed the visualization and plotting functions

# check_case function remains the same

# MAIN PROGRAM

choice = ''
display_title_bar()
while choice != '8':    
    choice = get_user_choice()
    
    if choice == '1':
        zip = input("Enter the 5 digit desired zipcode: ")
        zip = check_case(zip, zip_pattern)
        y = input("Enter in year (yyyy): ")
        y = check_case(y, year_pattern)
        m = input("Enter in month (MM): ")
        m = check_case(m, month_pattern)
        disp_tran_by_cust_zip(zip, y, m)
    elif choice == '2':
        t_type = input("Enter desired transaction type(Education, Entertainment, Healthcare, Grocery, Test, Gas, Bills): ")
        t_type = check_case(t_type, transaction_type_pattern)
        disp_tran_total_by_type(t_type)
    elif choice == '3':
        state = input("Enter desired state of query: ")
        state = check_case(state, state_pattern)
        disp_tran_total_by_branch_state(state)
    elif choice == '4':
        cust_ssn = input("Enter in customer's SSN: ")
        cust_ssn = check_case(cust_ssn, ssn_pattern)
        check_cust_exist(cust_ssn)
    elif choice == '5':
        cust_ssn = input("Enter customer's SSN for the acconut account you wish to modify: ")
        cust_ssn = check_case(cust_ssn, ssn_pattern)
        modify_cust_account(cust_ssn)
    elif choice == '6':
        card = input("Enter in credit card number: ")
        card = check_case(card, credit_card_pattern)
        y = input("Enter in year (yyyy): ")
        y = check_case(y, year_pattern)
        m = input("Enter in month (MM): ")
        m = check_case(m, month_pattern)
        gen_monthly_bill_by_card_number(card, y, m)
    elif choice == '7':
        cust_ssn = input("Enter in customer's SSN number: ")
        cust_ssn = check_case(cust_ssn, ssn_pattern)
        sd = input("Enter in start date (YYYY-MM-DD): ")
        sd = check_case(sd, date_pattern)
        ed = input("Enter in end date (YYYY-MM-DD): ")
        ed = check_case(ed, date_pattern)
        disp_tran_by_cust_between_given_dates(cust_ssn, sd, ed)
    elif choice == '8':
        print("\nExiting program.")
    else:
        print("\nI didn't understand that choice.\n")


In [9]:
#Loan Application Data API
#Banks want to automate the loan eligibility process (in realtime) based on customer details provided
# while filling out the online application form. These details are Gender, Marital Status, Education,
# Number of Dependents, Income, Loan Amount, Credit History, and others.

# char_keys = {}
# for application in loan_data:
#     for key, value in application.items():
#         char_keys[key] = value
# pp(char_keys)


#4. Functional Requirements - LOAN Application Dataset

#Create a Python program to GET (consume) data from the above API endpoint for the loan
#  application dataset.
bank_url = 'https://raw.githubusercontent.com/platformps/LoanDataset/main/loan_data.json'
response = requests.get(bank_url)
loan_data = response.json()

#Find the status code of the above API endpoint.
# Hint: status code could be 200, 400, 404, 401.
print(response.status_code)

#Once Python reads data from the API, utilize PySpark to load data into RDBMS (SQL).
# The table name should be CDW-SAPP_loan_application in the database.
# Note: Use the “creditcard_capstone” database.
loan_df = spark.createDataFrame(loan_data)
loan_df.write.format("jdbc").options(driver="com.mysql.cj.jdbc.Driver",
                                     user="root",
                                     password="password",
                                     url="jdbc:mysql://localhost:3306/creditcard_capstone",
                                     dbtable="CDW_SAPP_loan_application", 
                                     ).mode('overwrite').save()




200


In [9]:
import mysql.connector

def modify_cust_account(cust_ssn, cust_sql):
    con = mysql.connector.connect(user='root', password='password', host='localhost', database='creditcard_capstone')
    cursor = con.cursor()
    for column in cust_sql.columns:
        user_input = input(f"Do you want to modify customer's {column} (y/n): ")
        if user_input.lower() == 'y':
            new_value = input(f'Enter the new value for {column}: ')
            update_q = f"UPDATE CDW_SAPP_CUSTOMER SET {column} = %s WHERE SSN = %s"
            cursor.execute(update_q, (new_value, cust_ssn))
    con.commit()
    cursor.close()
    con.close()
modify_cust_account(123451007, cust_sql)

In [33]:
spark.sql("DROP TABLE IF EXISTS CDW_SAPP_CUSTOMER_MODIFIED")

DataFrame[]

In [14]:
spark.catalog.listDatabases()

[Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/C:/Users/Learner_9ZH3Z126/2023_data_engineering/SBA-350-Final_Project/spark-warehouse')]