In [1]:
import os
from getpass import getpass

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, initcap, lower, concat, lit, regexp_replace, coalesce, lpad

In [2]:
# Build the Spark session used by every cell in this notebook.
session_builder = SparkSession.builder.appName("Credit Card System")
spark = session_builder.getOrCreate()

# Read the raw customer records from the JSON export.
customer_json_path = "C:/PE/CAP350/cdw_sapp_custmer.json"
cust_df = spark.read.json(customer_json_path)

Below, I use Spark's `.withColumn` method to modify the columns of the customer DataFrame.

In [3]:
# Map the customer columns according to the specification.
# CREDIT_CARD_NO, CUST_CITY, CUST_STATE, CUST_COUNTRY, CUST_PHONE, CUST_EMAIL and
# LAST_UPDATED are moved as-is, so they need no withColumn call (an identity
# withColumn only adds another projection to the query plan).
street_and_apt = concat(col("STREET_NAME"), lit(", "), col("APT_NO"))

cust_df = (
    cust_df
    .withColumn("SSN", col("SSN").cast("integer"))
    .withColumn("FIRST_NAME", initcap(col("FIRST_NAME")))
    .withColumn("MIDDLE_NAME", lower(col("MIDDLE_NAME")))
    .withColumn("LAST_NAME", initcap(col("LAST_NAME")))
    # concat() yields NULL as soon as one input is NULL; fall back to the street
    # name alone so a missing APT_NO does not wipe out the whole address.
    .withColumn("FULL_STREET_ADDRESS", coalesce(street_and_apt, col("STREET_NAME")))
    # NOTE(review): an integer ZIP cannot keep a leading zero (the displayed
    # output shows 6109 for a CT address) -- confirm the integer type is intended.
    .withColumn("CUST_ZIP", col("CUST_ZIP").cast("integer"))
)
                

In [4]:
# Print the schema to check the column types after the casts above.
cust_df.printSchema()

root
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: integer (nullable = true)
 |-- STREET_NAME: string (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)



In [5]:
# Preview the transformed customer rows with untruncated cell values.
cust_df.show(truncate=False)

+------+----------------+------------+-------------+----------------------+----------+----------+--------+----------+---------+-----------------------------+-----------+---------+-----------------+----------------------+
|APT_NO|CREDIT_CARD_NO  |CUST_CITY   |CUST_COUNTRY |CUST_EMAIL            |CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|LAST_UPDATED                 |MIDDLE_NAME|SSN      |STREET_NAME      |FULL_STREET_ADDRESS   |
+------+----------------+------------+-------------+----------------------+----------+----------+--------+----------+---------+-----------------------------+-----------+---------+-----------------+----------------------+
|656   |4210653310061055|Natchez     |United States|AHooper@example.com   |1237818   |MS        |39120   |Alec      |Hooper   |2018-04-21T12:49:02.000-04:00|wm         |123456100|Main Street North|Main Street North, 656|
|829   |4210653310102868|Wethersfield|United States|EHolman@example.com   |1238933   |CT        |6109    |Etta      

In [6]:
# Register the customer DataFrame as a SQL view, then keep only the target
# table's columns in the target order (APT_NO and STREET_NAME are dropped).
cust_df.createOrReplaceTempView("CDW_SAPP_CUSTOMER")

customer_select_sql = "SELECT SSN, FIRST_NAME, MIDDLE_NAME, LAST_NAME, CREDIT_CARD_NO, FULL_STREET_ADDRESS,\
                         CUST_CITY, CUST_STATE, CUST_COUNTRY, CUST_ZIP, CUST_PHONE, CUST_EMAIL,\
                         LAST_UPDATED from CDW_SAPP_CUSTOMER"
cust_cleaned = spark.sql(customer_select_sql)

cust_cleaned.show(truncate=False)

+---------+----------+-----------+---------+----------------+----------------------+------------+----------+-------------+--------+----------+----------------------+-----------------------------+
|SSN      |FIRST_NAME|MIDDLE_NAME|LAST_NAME|CREDIT_CARD_NO  |FULL_STREET_ADDRESS   |CUST_CITY   |CUST_STATE|CUST_COUNTRY |CUST_ZIP|CUST_PHONE|CUST_EMAIL            |LAST_UPDATED                 |
+---------+----------+-----------+---------+----------------+----------------------+------------+----------+-------------+--------+----------+----------------------+-----------------------------+
|123456100|Alec      |wm         |Hooper   |4210653310061055|Main Street North, 656|Natchez     |MS        |United States|39120   |1237818   |AHooper@example.com   |2018-04-21T12:49:02.000-04:00|
|123453023|Etta      |brendan    |Holman   |4210653310102868|Redwood Drive, 829    |Wethersfield|CT        |United States|6109    |1238933   |EHolman@example.com   |2018-04-21T12:49:02.000-04:00|
|123454487|Wilber   

In [7]:
# Print the schema of the projected customer DataFrame.
cust_cleaned.printSchema()

root
 |-- SSN: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_ZIP: integer (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)



In [8]:
# For CDW_SAPP_BRANCH
branch_df = spark.read.json("C:/PE/CAP350/cdw_sapp_branch.json")

# ZIP: cast to integer; wherever the cast yields NULL, use the default 99999.
branch_zip = coalesce(col("BRANCH_ZIP").cast("integer"), lit(99999))
# Phone: rewrite a value made of exactly ten digits as (XXX)XXX-XXXX.
branch_phone = regexp_replace(col("BRANCH_PHONE"), r"(^\d{3})(\d{3})(\d{4}$)", r"($1)$2-$3")

branch_df = (
    branch_df
    .withColumn("BRANCH_ZIP", branch_zip)
    .withColumn("BRANCH_PHONE", branch_phone)
)



In [9]:
# Preview the transformed branch rows with untruncated cell values.
branch_df.show(truncate=False)

+-----------------+-----------+------------+-------------+------------+-------------------+----------+-----------------------------+
|BRANCH_CITY      |BRANCH_CODE|BRANCH_NAME |BRANCH_PHONE |BRANCH_STATE|BRANCH_STREET      |BRANCH_ZIP|LAST_UPDATED                 |
+-----------------+-----------+------------+-------------+------------+-------------------+----------+-----------------------------+
|Lakeville        |1          |Example Bank|(123)456-5276|MN          |Bridle Court       |55044     |2018-04-18T16:51:47.000-04:00|
|Huntley          |2          |Example Bank|(123)461-8993|IL          |Washington Street  |60142     |2018-04-18T16:51:47.000-04:00|
|SouthRichmondHill|3          |Example Bank|(123)498-5926|NY          |Warren Street      |11419     |2018-04-18T16:51:47.000-04:00|
|Middleburg       |4          |Example Bank|(123)466-3064|FL          |Cleveland Street   |32068     |2018-04-18T16:51:47.000-04:00|
|KingOfPrussia    |5          |Example Bank|(123)484-9701|PA         

In [10]:
# Register the branch DataFrame as a SQL view and select its columns in the
# target table's order.
branch_df.createOrReplaceTempView("CDW_SAPP_BRANCH")

branch_select_sql = "SELECT BRANCH_CODE, BRANCH_NAME, BRANCH_STREET, BRANCH_CITY, BRANCH_STATE, BRANCH_ZIP,\
                           BRANCH_PHONE, LAST_UPDATED from CDW_SAPP_BRANCH"
branch_cleaned = spark.sql(branch_select_sql)

branch_cleaned.show(truncate=False)

+-----------+------------+-------------------+-----------------+------------+----------+-------------+-----------------------------+
|BRANCH_CODE|BRANCH_NAME |BRANCH_STREET      |BRANCH_CITY      |BRANCH_STATE|BRANCH_ZIP|BRANCH_PHONE |LAST_UPDATED                 |
+-----------+------------+-------------------+-----------------+------------+----------+-------------+-----------------------------+
|1          |Example Bank|Bridle Court       |Lakeville        |MN          |55044     |(123)456-5276|2018-04-18T16:51:47.000-04:00|
|2          |Example Bank|Washington Street  |Huntley          |IL          |60142     |(123)461-8993|2018-04-18T16:51:47.000-04:00|
|3          |Example Bank|Warren Street      |SouthRichmondHill|NY          |11419     |(123)498-5926|2018-04-18T16:51:47.000-04:00|
|4          |Example Bank|Cleveland Street   |Middleburg       |FL          |32068     |(123)466-3064|2018-04-18T16:51:47.000-04:00|
|5          |Example Bank|14th Street        |KingOfPrussia    |PA   

In [11]:
# Print the schema of the projected branch DataFrame.
branch_cleaned.printSchema()

root
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_ZIP: integer (nullable = false)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)



Next, TIMEID is created by concatenating the "YEAR", "MONTH", and "DAY" columns. The month and day values
are left-padded with 0 when they have fewer than 2 digits.

In [12]:
# For CDW_SAPP_CREDITCARD
credit_df = spark.read.json("C:/PE/CAP350/cdw_sapp_credit.json")

# TIMEID = YEAR followed by MONTH and DAY, each left-padded with "0" to width 2.
month_padded = lpad(col("MONTH"), 2, "0")
day_padded = lpad(col("DAY"), 2, "0")
credit_df = credit_df.withColumn("TIMEID", concat(col("YEAR"), month_padded, day_padded))

In [26]:
# List the column names to confirm that TIMEID was added.
credit_df.columns

['BRANCH_CODE',
 'CREDIT_CARD_NO',
 'CUST_SSN',
 'DAY',
 'MONTH',
 'TRANSACTION_ID',
 'TRANSACTION_TYPE',
 'TRANSACTION_VALUE',
 'YEAR',
 'TIMEID']

In [14]:
# Register the credit DataFrame as a SQL view and preview all of its columns.
credit_df.createOrReplaceTempView("CDW_SAPP_CREDIT_CARD")

select_all_sql = "SELECT * from CDW_SAPP_CREDIT_CARD"
credit_cleaned = spark.sql(select_all_sql)

credit_cleaned.show(truncate=False)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+--------+
|BRANCH_CODE|CREDIT_CARD_NO  |CUST_SSN |DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|TIMEID  |
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+--------+
|114        |4210653349028689|123459988|14 |2    |1             |Education       |78.9             |2018|20180214|
|35         |4210653349028689|123459988|20 |3    |2             |Entertainment   |14.24            |2018|20180320|
|160        |4210653349028689|123459988|8  |7    |3             |Grocery         |56.7             |2018|20180708|
|114        |4210653349028689|123459988|19 |4    |4             |Entertainment   |59.73            |2018|20180419|
|93         |4210653349028689|123459988|10 |10   |5             |Gas             |3.59             |2018|20181010|
|164        |4210653349028689|123459988|28 |5    |6             |Education      

In [17]:
# Print the schema of the credit DataFrame selected above.
credit_cleaned.printSchema()

root
 |-- BRANCH_CODE: long (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_SSN: long (nullable = true)
 |-- DAY: long (nullable = true)
 |-- MONTH: long (nullable = true)
 |-- TRANSACTION_ID: long (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- YEAR: long (nullable = true)
 |-- TIMEID: string (nullable = true)



In [18]:
# Register the credit DataFrame as a SQL view and build the final projection:
# CREDIT_CARD_NO is exposed as CUST_CC_NO, and DAY/MONTH/YEAR are left out.
credit_df.createOrReplaceTempView("CDW_SAPP_CREDIT_CARD")

credit_select_sql = "SELECT CREDIT_CARD_NO as CUST_CC_NO, TIMEID, CUST_SSN, BRANCH_CODE, TRANSACTION_TYPE,\
    TRANSACTION_VALUE, TRANSACTION_ID from CDW_SAPP_CREDIT_CARD"
credit_cleaned = spark.sql(credit_select_sql)

credit_cleaned.show(truncate=False)

+----------------+--------+---------+-----------+----------------+-----------------+--------------+
|CUST_CC_NO      |TIMEID  |CUST_SSN |BRANCH_CODE|TRANSACTION_TYPE|TRANSACTION_VALUE|TRANSACTION_ID|
+----------------+--------+---------+-----------+----------------+-----------------+--------------+
|4210653349028689|20180214|123459988|114        |Education       |78.9             |1             |
|4210653349028689|20180320|123459988|35         |Entertainment   |14.24            |2             |
|4210653349028689|20180708|123459988|160        |Grocery         |56.7             |3             |
|4210653349028689|20180419|123459988|114        |Entertainment   |59.73            |4             |
|4210653349028689|20181010|123459988|93         |Gas             |3.59             |5             |
|4210653349028689|20180528|123459988|164        |Education       |6.89             |6             |
|4210653349028689|20180519|123459988|119        |Entertainment   |43.39            |7             |


In [37]:

# Connection settings for the target MySQL database.
driver = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/creditcard_capstone"

# Credentials are read from the environment so they are not stored in the
# notebook source; if MYSQL_PASSWORD is unset, the password is prompted for.
properties = {
    "driver": driver,
    "user": os.environ.get("MYSQL_USER", "root"),
    "password": os.environ.get("MYSQL_PASSWORD") or getpass("MySQL password: "),
}

# Write each cleaned DataFrame to its table using the "overwrite" save mode.
tables_to_write = {
    "CDW_SAPP_CUSTOMER": cust_cleaned,
    "CDW_SAPP_BRANCH": branch_cleaned,
    "CDW_SAPP_CREDIT_CARD": credit_cleaned,
}
for table_name, frame in tables_to_write.items():
    frame.write.jdbc(url=url, table=table_name, mode="overwrite", properties=properties)


In [16]:
# Register the credit DataFrame as a SQL view and total the transaction values
# for the 'Bills' transaction type.
credit_df.createOrReplaceTempView("CDW_SAPP_CREDIT_CARD")

bills_total_sql = "SELECT TRANSACTION_TYPE, sum(TRANSACTION_VALUE) from CDW_SAPP_CREDIT_CARD\
                        group by TRANSACTION_TYPE\
                        having TRANSACTION_TYPE='Bills'"
credit_type = spark.sql(bills_total_sql)

credit_type.show(truncate=False)

+----------------+----------------------+
|TRANSACTION_TYPE|sum(TRANSACTION_VALUE)|
+----------------+----------------------+
|Bills           |351405.2800000001     |
+----------------+----------------------+



In [15]:
# List the distinct transaction types.
# The query reads from the view registered on the line above, so this cell no
# longer depends on another cell having registered CDW_SAPP_CREDIT_CARD first.
credit_df.createOrReplaceTempView("TransType")
trans_cleaned = spark.sql("SELECT DISTINCT TRANSACTION_TYPE from TransType")
trans_cleaned.show(truncate=False)

+----------------+
|TRANSACTION_TYPE|
+----------------+
|Education       |
|Entertainment   |
|Healthcare      |
|Grocery         |
|Test            |
|Gas             |
|Bills           |
+----------------+

