In [1]:
import os
import requests
import pandas as pd
import mysql.connector as db

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, lpad, length, regexp_replace, initcap, lower, col, lit

In [3]:
spark = SparkSession\
    .builder\
    .appName("capstone")\
    .config("spark.jars","/opt/homebrew/Cellar/apache-spark/3.3.1/libexec/jars/mysql-connector-j-8.0.32.jar")\
    .getOrCreate()

    #.config("spark.jars", "./mysql-connector-j-8.0.32.jar")\

23/03/01 11:33:28 WARN Utils: Your hostname, Sules-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.17 instead (on interface en0)
23/03/01 11:33:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/03/01 11:33:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/01 11:33:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
# authentication for DB connection
mysql_pwd = os.environ.get("mysql_pwd")
mysql_user = os.environ.get("mysql_user")

### Helper functions

In [5]:
def get_df_size(sparkdf):
    return (sparkdf.count(), len(sparkdf.columns))

### 1. Load data sets

In [6]:
# JSON files
branch = spark.read.json("../Credit Card Dataset/cdw_sapp_branch.json")
credit = spark.read.json("../Credit Card Dataset/cdw_sapp_credit.json")
customer = spark.read.json("../Credit Card Dataset/cdw_sapp_custmer.json")

In [7]:
get_df_size(credit), get_df_size(branch), get_df_size(customer)

((46694, 9), (115, 8), (952, 14))

In [8]:
# Loan application API
# note add try except for accessing the api
url = "https://raw.githubusercontent.com/platformps/LoanDataset/main/loan_data.json"
response = requests.get(url)
print(response)
pandas_df = pd.DataFrame(response.json())
loan = spark.createDataFrame(pandas_df)

<Response [200]>


  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


In [9]:
get_df_size(loan)

(511, 10)


### 2. Transform data according to the specifications at [Mapping document](https://docs.google.com/spreadsheets/d/1t8UxBrUV6dxx0pM1VIIGZpSf4IKbzjdJ/edit#gid=1823293337)

In [10]:
table_names = {'credit_transformed':'CDW_SAPP_CREDIT_CARD',
               'branch_transformed':'CDW_SAPP_BRANCH',
               'customer_transformed':'CDW_SAPP_CUSTOMER',
               'loan_transformed':'CDW_SAPP_loan_application'}
# Mapping dictionaries:
# tuple: (old_col, new_col, type)
column_map = {'credit':[('CREDIT_CARD_NO','CUST_CC_NO','string'),
                     #(['DAY','MONTH','YEAR'],'TIMEID','string'),
                     ('CUST_SSN','CUST_SSN','integer'),
                     ('BRANCH_CODE','BRANCH_CODE','integer'),
                     ('TRANSACTION_TYPE','TRANSACTION_TYPE','string'),
                     ('TRANSACTION_VALUE','TRANSACTION_VALUE','float'),
                     ('TRANSACTION_ID','TRANSACTION_ID','integer')],
           'branch':[('BRANCH_CODE','BRANCH_CODE','integer'),
                     ('BRANCH_NAME','BRANCH_NAME','string'),
                     ('BRANCH_STREET','BRANCH_STREET','string'),
                     ('BRANCH_CITY','BRANCH_CITY','string'),
                     ('BRANCH_STATE','BRANCH_STATE','string'),
                     ('BRANCH_ZIP','BRANCH_ZIP','integer'),
                     ('BRANCH_PHONE','BRANCH_PHONE','string'),
                     ('LAST_UPDATED','LAST_UPDATED','timestamp')],
           'customer':[('SSN','SSN','integer'),
                       ('FIRST_NAME','FIRST_NAME','string'),
                       ('MIDDLE_NAME','MIDDLE_NAME','string'),
                       ('LAST_NAME','LAST_NAME','string'),
                       ('CREDIT_CARD_NO','CREDIT_CARD_NO','string'),
                       #(['STREET_NAME','APT_NO'],'FULL_STREET_ADDRESS','string'),
                       ('CUST_CITY','CUST_CITY','string'),
                       ('CUST_STATE','CUST_STATE','string'),
                       ('CUST_COUNTRY','CUST_COUNTRY','string'),
                       ('CUST_ZIP','CUST_ZIP','integer'),
                       ('CUST_PHONE','CUST_PHONE','string'),
                       ('CUST_EMAIL','CUST_EMAIL','string'),
                       ('LAST_UPDATED','LAST_UPDATED','timestamp')]}

In [11]:
def transform(df, data_name):
    map_dict = column_map[data_name]
    print(map_dict)
    #general type casting
    for tup in map_dict:
        df = df.withColumn(tup[1],df[tup[0]].cast(tup[2]))

    # special instructions for dataframe transformations
    if data_name == 'credit':
        df = df.withColumns({'MONTH': lpad(df['MONTH'],2,'0'),
                                    'DAY':lpad(df['DAY'],2,'0')})
        
        df = df.withColumns({'TIMEID':concat("YEAR", "MONTH", "DAY")})
        # drop columns:
        df = df.drop("YEAR", "MONTH", "DAY","CREDIT_CARD_NO")
        df = df.select('CUST_CC_NO', 'TIMEID', 'CUST_SSN', 'BRANCH_CODE', 
                       'TRANSACTION_TYPE', 'TRANSACTION_VALUE', 'TRANSACTION_ID')

    elif data_name == 'branch':
        # check for null zip codes:
        if df.filter(df['BRANCH_ZIP'].isNull()).collect():
            df = df.fillna(value={'BRANCH_ZIP':99999})
        # format phone_numbers:
        df = df.withColumn('BRANCH_PHONE',
                            regexp_replace('BRANCH_PHONE', r'(\d{3})(\d{3})(\d{4})',"($1)$2-$3"))
        df = df.select('BRANCH_CODE','BRANCH_NAME','BRANCH_STREET','BRANCH_CITY',
                       'BRANCH_STATE','BRANCH_ZIP', 'BRANCH_PHONE', 'LAST_UPDATED')
        
    elif data_name == 'customer':
        df = df.withColumns({'FIRST_NAME': initcap('FIRST_NAME'),
                                                'MIDDLE_NAME': lower('MIDDLE_NAME'),
                                                'LAST_NAME': initcap('LAST_NAME')})
        df = df.withColumn('FULL_STREET_ADDRESS', concat(col('STREET_NAME'),lit(', '), col('APT_NO')))
        df = df.withColumn('CUST_PHONE',
                                            regexp_replace('CUST_PHONE', r'(\d{3})(\d{4})',"(111)$1-$2"))
        df = df.drop("STREET_NAME","APT_NO")
        #reorder columns:
        df = df.select('SSN','FIRST_NAME', 'MIDDLE_NAME', 'LAST_NAME', 
                       'CREDIT_CARD_NO', 'FULL_STREET_ADDRESS', 'CUST_CITY',
                        'CUST_STATE', 'CUST_COUNTRY', 'CUST_ZIP', 'CUST_PHONE', 
                        'CUST_EMAIL', 'LAST_UPDATED')

    return df

In [12]:
customer_transformed = transform(customer,'customer')
customer_transformed.printSchema()


[('SSN', 'SSN', 'integer'), ('FIRST_NAME', 'FIRST_NAME', 'string'), ('MIDDLE_NAME', 'MIDDLE_NAME', 'string'), ('LAST_NAME', 'LAST_NAME', 'string'), ('CREDIT_CARD_NO', 'CREDIT_CARD_NO', 'string'), ('CUST_CITY', 'CUST_CITY', 'string'), ('CUST_STATE', 'CUST_STATE', 'string'), ('CUST_COUNTRY', 'CUST_COUNTRY', 'string'), ('CUST_ZIP', 'CUST_ZIP', 'integer'), ('CUST_PHONE', 'CUST_PHONE', 'string'), ('CUST_EMAIL', 'CUST_EMAIL', 'string'), ('LAST_UPDATED', 'LAST_UPDATED', 'timestamp')]
root
 |-- SSN: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_ZIP: integer (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |

In [13]:
credit_transformed = transform(credit,'credit')
credit_transformed.printSchema()


[('CREDIT_CARD_NO', 'CUST_CC_NO', 'string'), ('CUST_SSN', 'CUST_SSN', 'integer'), ('BRANCH_CODE', 'BRANCH_CODE', 'integer'), ('TRANSACTION_TYPE', 'TRANSACTION_TYPE', 'string'), ('TRANSACTION_VALUE', 'TRANSACTION_VALUE', 'float'), ('TRANSACTION_ID', 'TRANSACTION_ID', 'integer')]
root
 |-- CUST_CC_NO: string (nullable = true)
 |-- TIMEID: string (nullable = true)
 |-- CUST_SSN: integer (nullable = true)
 |-- BRANCH_CODE: integer (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: float (nullable = true)
 |-- TRANSACTION_ID: integer (nullable = true)



In [14]:
branch_transformed = transform(branch,'branch')
branch_transformed.printSchema()

[('BRANCH_CODE', 'BRANCH_CODE', 'integer'), ('BRANCH_NAME', 'BRANCH_NAME', 'string'), ('BRANCH_STREET', 'BRANCH_STREET', 'string'), ('BRANCH_CITY', 'BRANCH_CITY', 'string'), ('BRANCH_STATE', 'BRANCH_STATE', 'string'), ('BRANCH_ZIP', 'BRANCH_ZIP', 'integer'), ('BRANCH_PHONE', 'BRANCH_PHONE', 'string'), ('LAST_UPDATED', 'LAST_UPDATED', 'timestamp')]
root
 |-- BRANCH_CODE: integer (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_ZIP: integer (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)



In [15]:
loan_transformed = loan

### 3. Upload data to the mysql database:
*  Create DB through mysql.connector
*  Upload spark dataframes to the tables

In [16]:
# Create a database called: creditcard_capstone

mysql = db.connect(user=f"{mysql_user}", password=f"{mysql_pwd}")
cursor =mysql.cursor()

In [None]:
cursor.execute("CREATE DATABASE creditcard_capstone;")
cursor.execute("USE creditcard_capstone;")


In [19]:
def check_database():
    # Create a database called "creditcard_capstone" if does not exist
    mysql = db.connect(user="root", password=mysql_pwd)
    cursor =mysql.cursor()
    cursor.execute("SHOW DATABASES;")
    dbs = cursor.fetchall()
    dbs_list = [x[0] for x in dbs]
    if 'creditcard_capstone' not in dbs_list:
        cursor.execute("CREATE DATABASE creditcard_capstone;")
        print("Created the database creditcard_capstone.")
    else:
        print("Database exist, the tables will be overwritten.")

In [20]:
check_database()

Database exist, the tables will be overwritten.


In [22]:
column_types_mysql = {
    'customer_transformed':\
        "SSN INT, "\
        "FIRST_NAME VARCHAR(50), "\
        "MIDDLE_NAME VARCHAR(50), "\
        "LAST_NAME VARCHAR(50), "\
        "CREDIT_CARD_NO VARCHAR(16), "\
        "FULL_STREET_ADDRESS VARCHAR(100), "\
        "CUST_CITY VARCHAR(50), "\
        "CUST_STATE VARCHAR(10), "\
        "CUST_COUNTRY VARCHAR(50), "\
        "CUST_ZIP INT, "\
        "CUST_PHONE VARCHAR(50), "\
        "CUST_EMAIL VARCHAR(100), "\
        "LAST_UPDATED TIMESTAMP",\
    'credit_transformed':\
        "CUST_CC_NO VARCHAR(16), "\
        "TIMEID VARCHAR(10), "\
        "CUST_SSN INT, "\
        "BRANCH_CODE INT, "\
        "TRANSACTION_TYPE VARCHAR(50), "\
        "TRANSACTION_VALUE DOUBLE, "\
        "TRANSACTION_ID INT",\
    'branch_transformed':
        "BRANCH_CODE INT, "\
        "BRANCH_NAME VARCHAR(50), "\
        "BRANCH_STREET VARCHAR(100), "\
        "BRANCH_CITY VARCHAR(50), "\
        "BRANCH_STATE VARCHAR(10), "\
        "BRANCH_ZIP INT, "\
        "BRANCH_PHONE VARCHAR(50), "\
        "LAST_UPDATED TIMESTAMP",\
    'loan_transformed':\
        'Application_ID VARCHAR(20),'\
        'Gender VARCHAR(20),'\
        'Married VARCHAR(20),'\
        'Dependents VARCHAR(20),'\
        'Education VARCHAR(20),'\
        'Self_Employed VARCHAR(20),'\
        'Credit_History INT, '\
        'Property_Area VARCHAR(20),'\
        'Income VARCHAR(20),'\
        'Application_Status VARCHAR(20)'\
        }

In [None]:
column_types_mysql

{'customer_transformed': 'SSN INT, FIRST_NAME VARCHAR(50), MIDDLE_NAME VARCHAR(50), LAST_NAME VARCHAR(50), CREDIT_CARD_NO VARCHAR(16), FULL_STREET_ADDRESS VARCHAR(100), CUST_CITY VARCHAR(50), CUST_STATE VARCHAR(10), CUST_COUNTRY VARCHAR(50), CUST_ZIP INT, CUST_PHONE VARCHAR(50), CUST_EMAIL VARCHAR(100), LAST_UPDATED TIMESTAMP',
 'credit_transformed': 'CUST_CC_NO VARCHAR(16), TIMEID VARCHAR(10), CUST_SSN INT, BRANCH_CODE INT, TRANSACTION_TYPE VARCHAR(50), TRANSACTION_VALUE DOUBLE, TRANSACTION_ID INT',
 'branch_transformed': 'BRANCH_CODE INT, BRANCH_NAME VARCHAR(50), BRANCH_STREET VARCHAR(100), BRANCH_CITY VARCHAR(50), BRANCH_STATE VARCHAR(10), BRANCH_ZIP INT, BRANCH_PHONE VARCHAR(50), LAST_UPDATED TIMESTAMP',
 'loan_transformed': 'Application_ID VARCHAR(20),Gender VARCHAR(20),Married VARCHAR(20),Dependents VARCHAR(20),Education VARCHAR(20),Self_Employed VARCHAR(20),Credit_History INT, Property_Area VARCHAR(20),Income VARCHAR(20),Application_Status VARCHAR(20)'}

In [23]:
# write df to tables (table_names dict has the names)
for dfname, tab in table_names.items():
  print(f'Uploaded {dfname} to {tab}')
  eval(dfname).write.format("jdbc") \
    .mode("overwrite") \
    .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
    .option("dbtable", "creditcard_capstone."+tab) \
    .option("user", mysql_user) \
    .option("driver", "com.mysql.cj.jdbc.Driver")\
    .option("createTableColumnTypes", column_types_mysql[dfname])\
    .option("password", mysql_pwd).save()

Uploaded credit_transformed to CDW_SAPP_CREDIT_CARD


                                                                                

Uploaded branch_transformed to CDW_SAPP_BRANCH
Uploaded customer_transformed to CDW_SAPP_CUSTOMER
Uploaded loan_transformed to CDW_SAPP_loan_application


In [None]:
spark.stop()

### This completes the extraction, transformation and loading of the data to the database