In [None]:
pip install findspark

In [None]:
pip install requests

In [1]:
import getpass
import json
import os

import requests
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

from pyspark.sql.functions import concat, concat_ws, col, initcap, lower, substring, lit

In [2]:
# Start (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('CreditCardData')
    .getOrCreate()
)

In [7]:
from pyspark.sql.types import StructType,StructField, StringType,IntegerType,BooleanType,DoubleType,LongType

# Target (post-cast) column types for the customer data.
schema = StructType([
    StructField('APT_NO', IntegerType(), True),
    StructField('CREDIT_CARD_NO', LongType(), True),
    StructField('CUST_CITY', StringType(), True),
    StructField('CUST_COUNTRY', StringType(), True),
    StructField('CUST_EMAIL', StringType(), True),
    StructField('CUST_PHONE', StringType(), True),
    StructField('CUST_STATE', StringType(), True),
    StructField('CUST_ZIP', LongType(), True),
    StructField('FIRST_NAME', StringType(), True),
    StructField('LAST_NAME', StringType(), True),
    StructField('LAST_UPDATED', StringType(), True),
    StructField('MIDDLE_NAME', StringType(), True),
    StructField('SSN', LongType(), True),
    StructField('STREET_NAME', StringType(), True),
])

# The schema Spark infers for this file reports APT_NO, CREDIT_CARD_NO and CUST_ZIP as
# strings, i.e. those values are quoted in the JSON. The JSON reader does not coerce a
# quoted value into an integer/long field, so using `schema` directly as the read schema
# makes those values fail to parse (null/corrupt in the default PERMISSIVE mode).
# Instead, read every field as a string (a string field accepts any JSON scalar) and
# then cast each column to its target type.
# NOTE(review): casting CUST_ZIP to long drops any leading zeros — confirm that is intended.
raw_schema = StructType([StructField(field.name, StringType(), True) for field in schema.fields])

# Forward slashes avoid the invalid '\c' escape and also work on Windows.
df_with_schema = (
    spark.read.schema(raw_schema)
    .json('Credit Card Dataset/cdw_sapp_custmer.json')
    .select([col(field.name).cast(field.dataType) for field in schema.fields])
)
df_with_schema.printSchema()


root
 |-- APT_NO: integer (nullable = true)
 |-- CREDIT_CARD_NO: long (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: long (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- STREET_NAME: string (nullable = true)



In [3]:
# Load the raw customer JSON (schema inferred by Spark). Forward slashes avoid the
# invalid '\c' escape of a Windows-style path and also work on Windows.
cust_df = spark.read.json('Credit Card Dataset/cdw_sapp_custmer.json')
cust_df.printSchema()  # print the inferred schema of the raw customer DataFrame

cust_df.createOrReplaceTempView('CDW_SAPP_CUSTOMER')  # temp view of the customer table

# Apply every customer transformation in ONE projection so each reformatted column
# actually lands in updated_cust_df:
#   * MIDDLE_NAME        -> lower-cased
#   * STREET_NAME_APT_NO -> "<street>,<apt>"; CONCAT_WS skips a NULL apartment number
#                           instead of turning the whole address into NULL
#   * CUST_PHONE         -> "(XXX)XXX-XXXX" built from digits 1-3, 4-6 and 7-10
# CUST_PHONE is inferred as a long, so it is cast to STRING before SUBSTRING.
# NOTE(review): the phone format assumes 10-digit numbers — TODO confirm against the data.
updated_cust_df = spark.sql('''
    SELECT SSN,
           FIRST_NAME,
           LOWER(MIDDLE_NAME) AS MIDDLE_NAME,
           LAST_NAME,
           CREDIT_CARD_NO,
           CONCAT_WS(",", STREET_NAME, APT_NO) AS STREET_NAME_APT_NO,
           CUST_CITY,
           CUST_STATE,
           CUST_COUNTRY,
           CUST_ZIP,
           CONCAT("(", SUBSTRING(CAST(CUST_PHONE AS STRING), 1, 3), ")",
                  SUBSTRING(CAST(CUST_PHONE AS STRING), 4, 3), "-",
                  SUBSTRING(CAST(CUST_PHONE AS STRING), 7, 4)) AS CUST_PHONE,
           CUST_EMAIL,
           LAST_UPDATED
    FROM CDW_SAPP_CUSTOMER
''')
updated_cust_df.show()

root
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- STREET_NAME: string (nullable = true)

+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+----------+--------------------+--------------------+
|      SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|  CREDIT_CARD_NO|  STREET_NAME_APT_NO|   CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|CUST_PHONE|          CUST_EMAIL|        LAST_UPDATED|
+---------+----------+-----------+---------+-----

In [None]:
# Print the schema Spark inferred for the raw customer JSON (cust_df was read without an explicit schema).
cust_df.printSchema()

In [None]:
# #mid_name = df_cdw_cust.select('*', lower(col('MIDDLE_NAME'))).show()
# df_cust_apt = df_cdw_cust.withColumn('STREET_NAME,APT_NO',concat_ws(',',df_cdw_cust['STREET_NAME'],df_cdw_cust['APT_NO'])).alias("STREET_NAME,APT_NO")
# df_cust_apt.show(truncate=False)


In [None]:
# Show the customer data with MIDDLE_NAME lower-cased.
# withColumn replaces the existing column, so the result keeps a single MIDDLE_NAME
# (`SELECT *, LOWER(MIDDLE_NAME) AS MIDDLE_NAME` would yield two same-named columns).
# Works on cust_df directly so the cell does not depend on a separately registered SQL view.
cust_df.withColumn('MIDDLE_NAME', lower(col('MIDDLE_NAME'))).show()

In [None]:
# Render CUST_PHONE as "(XXX)XXX-XXXX" from its digits 1-3, 4-6 and 7-10.
# CUST_PHONE is inferred as a long, so cast it to string before taking substrings.
# Works on cust_df directly so the cell does not depend on a separately registered SQL view.
# NOTE(review): assumes every number has 10 digits — TODO confirm against the data.
phone_str = col('CUST_PHONE').cast('string')
df_phone_number = cust_df.withColumn(
    'CUST_PHONE',
    concat(
        lit('('), substring(phone_str, 1, 3), lit(')'),
        substring(phone_str, 4, 3), lit('-'),
        substring(phone_str, 7, 4),
    ),
)
df_phone_number.show()

In [None]:
# new_df_cust = df_cust_apt.select(concat(df_cust_apt['STREET_NAME,APT_NO']),'SSN','FIRST_NAME','MIDDLE_NAME','LAST_NAME','CREDIT_CARD_NO','STREET_NAME,APT_NO','CUST_CITY','CUST_STATE','CUST_COUNTRY','CUST_ZIP','CUST_PHONE','CUST_EMAIL','LAST_UPDATED').show()
# new_df_cust = df_cust_apt.select('SSN','FIRST_NAME','MIDDLE_NAME','LAST_NAME','CREDIT_CARD_NO','STREET_NAME,APT_NO','CUST_CITY','CUST_STATE','CUST_COUNTRY','CUST_ZIP','CUST_PHONE','CUST_EMAIL','LAST_UPDATED')
# new_df_cust.show()

In [4]:
# Load the credit-card transaction JSON (schema inferred by Spark). Forward slashes avoid
# the invalid '\c' escape of a Windows-style path and also work on Windows.
cc_df = spark.read.json('Credit Card Dataset/cdw_sapp_credit.json')
cc_df.printSchema()

# Draft transformation (not yet enabled): builds a DAY-MONTH-YEAR string column.
# cc_df.createOrReplaceTempView('CDW_SAPP_CREDIT_CARD')
# date = spark.sql('SELECT *, CONCAT(DAY,"-",MONTH,"-",YEAR) AS DAY_MONTH_YEAR FROM CDW_SAPP_CREDIT_CARD')
# updated_cc_df = date.select('CREDIT_CARD_NO','DAY_MONTH_YEAR','CUST_SSN','BRANCH_CODE','TRANSACTION_TYPE','TRANSACTION_VALUE','TRANSACTION_ID')
# #cc_phone= df_cdw_cc.select(concat_ws('-', df_cdw_cc['DAY'],df_cdw_cc['MONTH'],df_cdw_cc['YEAR']).alias('DAY,MONTH,YEAR'),'CREDIT_CARD_NO','CUST_SSN','BRANCH_CODE','TRANSACTION_TYPE','TRANSACTION_VALUE','TRANSACTION_ID')
# #cc_phone.show()
# updated_cc_df.printSchema()
# updated_cc_df.show()

root
 |-- BRANCH_CODE: long (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_SSN: long (nullable = true)
 |-- DAY: long (nullable = true)
 |-- MONTH: long (nullable = true)
 |-- TRANSACTION_ID: long (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- YEAR: long (nullable = true)



In [None]:

# newf_df_cc= df_cdw_cc.select(concat_ws('-', df_cdw_cc['DAY'],df_cdw_cc['MONTH'],df_cdw_cc['YEAR']).alias('DAY,MONTH,YEAR'),'CREDIT_CARD_NO','CUST_SSN','BRANCH_CODE','TRANSACTION_TYPE','TRANSACTION_VALUE','TRANSACTION_ID').show()

#new_df_cc = df_cdw_cc.select(concat_ws('-',df_cdw_cc['DAY'],df_cdw_cc['MONTH'],df_cdw_cc['YEAR']).alias('DAY,MONTH,YEAR'), df_cdw_cc).show(truncate=False)

#newf_df_cc = df_cdw_cc.withColumn('DAY,MONTH,YEAR', col('DAY'),col('MONTH'),col('YEAR')).show()

In [None]:
#date = df_cdw_cc.select(concat_ws('-', df_cdw_cc['DAY'],df_cdw_cc['MONTH'],df_cdw_cc['YEAR']).alias('DAY,MONTH,YEAR')).show()
# newf_df_cc = df_cdw_cc.withColumn('DAY,MONTH,YEAR', col(date)).show()

In [5]:

# Load the branch JSON (schema inferred by Spark). Forward slashes avoid the invalid
# '\c' escape of a Windows-style path and also work on Windows.
branch_df = spark.read.json('Credit Card Dataset/cdw_sapp_branch.json')
branch_df.printSchema()

# Draft transformation (not yet enabled).
# NOTE(review): as written, `branch_phone` would hold the None returned by .show(), and
# `new_branch_df` is only defined on a doubly-commented line, so the last statements
# would fail if simply uncommented.
# branch_df.createOrReplaceTempView('CDW_SAPP_BRANCH')

# branch_phone = spark.sql('SELECT CONCAT("(", SUBSTRING(BRANCH_PHONE, 1, 3), ")", SUBSTRING(BRANCH_PHONE, 4, 3), "-", SUBSTRING(BRANCH_PHONE, 7, 4)) AS BRANCH_PHONE FROM  CDW_SAPP_BRANCH').show() #reformats branch number to (XXX)XXX-XXXX
# branch_zip = spark.sql('SELECT COALESCE(BRANCH_ZIP,99999) AS BRANCH_ZIP FROM CDW_SAPP_BRANCH') #replaces NULL BRANCH_ZIP values with 99999
# # new_branch_df = branch_zip.select('BRANCH_CODE','BRANCH_NAME','BRANCH_STREET','BRANCH_CITY','BRANCH_STATE','BRANCH_ZIP','BRANCH_PHONE','LAST_UPDATED')
# # new_branch_df.show()
# updated_branch_df = new_branch_df.select('BRANCH_CODE','BRANCH_NAME','BRANCH_STREET','BRANCH_CITY','BRANCH_STATE','BRANCH_ZIP','BRANCH_PHONE','LAST_UPDATED')
# updated_branch_df.printSchema()
# updated_branch_df.show()

root
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: long (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)



In [None]:
# df_branch_number = df_cdw_branch.withColumn('BRANCH_PHONE', concat(lit('('),substring(df_cdw_branch['BRANCH_PHONE'],1,3),lit(')'),substring(df_cdw_branch['BRANCH_PHONE'],4,3),lit('-'), substring(df_cdw_branch['BRANCH_PHONE'],7,4)))

# df_branch_number.show(200)

In [None]:
# Show BRANCH_ZIP with NULLs replaced by the sentinel 99999.
# Works on branch_df directly so the cell does not depend on a separately registered SQL view.
branch_df.na.fill(99999, subset=['BRANCH_ZIP']).select('BRANCH_ZIP').show()

In [34]:
spark.stop()

In [None]:
cust_df.write.format("jdbc") \
.mode("append") \
.option("url", "jdbc:mysql://localhost:3306/creditcad_capstone") \
.option("dbtable", "TableName") \
.option("user", "root") \
.option("password", "password") \
.save()