Capstone Project for Per Scholas, written by Poongodi Velayutham

Data import: load CDW_SAPP_CUSTOMER.JSON

In [1]:
# Importing the Libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import initcap, lower, concat, concat_ws, col, regexp_replace, lit, substring, lpad

# Start (or reuse) the Spark session; getOrCreate returns the active session if one exists
spark = SparkSession.builder.appName('Customer').getOrCreate()

# Explicit schema for CDW_SAPP_CUSTOMER.JSON, listed in source-column order.
# Every field is nullable.
customer_fields = [
    ("FIRST_NAME", StringType()),
    ("MIDDLE_NAME", StringType()),
    ("LAST_NAME", StringType()),
    ("SSN", IntegerType()),
    ("CREDIT_CARD_NO", StringType()),
    ("APT_NO", StringType()),
    ("STREET_NAME", StringType()),
    ("CUST_CITY", StringType()),
    ("CUST_STATE", StringType()),
    ("CUST_COUNTRY", StringType()),
    ("CUST_ZIP", StringType()),
    ("CUST_PHONE", StringType()),
    ("CUST_EMAIL", StringType()),
    ("LAST_UPDATED", TimestampType()),
]
schema = StructType([StructField(field_name, field_type, True) for field_name, field_type in customer_fields])

# Load the customer JSON file with multiline parsing enabled, applying the schema above
customer_reader = spark.read.option("multiline", "true").schema(schema)
cust_df = customer_reader.json("data/cdw_sapp_customer.json")



In [2]:
# Inspect the raw customer data: schema, summary statistics, sample rows, and row count
customer_columns = cust_df.columns
cust_df.printSchema()
customer_summary = cust_df.describe()
customer_summary.show()
cust_df.show()
cust_df.count()

root
 |-- FIRST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- SSN: integer (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- APT_NO: string (nullable = true)
 |-- STREET_NAME: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)

+-------+----------+-----------+---------+--------------------+--------------------+------------------+-----------+---------+----------+-------------+------------------+------------------+--------------------+
|summary|FIRST_NAME|MIDDLE_NAME|LAST_NAME|                 SSN|      CREDIT_CARD_NO|            APT_NO|STREET_NAME|CUST_CITY|CUST_STATE| CUST_COUNTRY|          CUST_ZIP|        CUST_PHONE|         

952

In [3]:
# Standardize name casing: first and last names in Title Case, middle name in lower case
cust_df = (
    cust_df
    .withColumn("FIRST_NAME", initcap(col("FIRST_NAME")))
    .withColumn("MIDDLE_NAME", lower(col("MIDDLE_NAME")))
    .withColumn("LAST_NAME", initcap(col("LAST_NAME")))
)

# Build "FULL_STREET_ADDRESS" as "<APT_NO>, <STREET_NAME>".
# concat_ws skips NULL inputs, so a missing apartment number (or street name) still
# yields a usable address. Plain concat() would turn the whole value into NULL.
cust_df = cust_df.withColumn(
    "FULL_STREET_ADDRESS",
    concat_ws(", ", col("APT_NO"), col("STREET_NAME")),
)

# Keep only the digits. The phone number is only 7 digits, so left-pad it to 10 with
# zeros; this puts "000" in the area-code position.
phone_digits = regexp_replace(col("CUST_PHONE"), "[^0-9]", "")
cust_df = cust_df.withColumn("CUST_PHONE", lpad(phone_digits, 10, "0"))

# Format the 10-digit phone number as (XXX)XXX-XXXX
cust_df = cust_df.withColumn(
    "CUST_PHONE",
    concat(
        lit("("), substring("CUST_PHONE", 1, 3), lit(")"),
        substring("CUST_PHONE", 4, 3), lit("-"),
        substring("CUST_PHONE", 7, 4),
    ),
)


In [4]:



# Spot-check the reformatted phone numbers, then the cleaned customer rows
phone_preview = cust_df.select("CUST_PHONE")
phone_preview.show(5)

cust_df.show(5)


+-------------+
|   CUST_PHONE|
+-------------+
|(000)123-7818|
|(000)123-8933|
|(000)124-3018|
|(000)124-3215|
|(000)124-2074|
+-------------+
only showing top 5 rows

+----------+-----------+---------+---------+----------------+------+-----------------+------------+----------+-------------+--------+-------------+-------------------+-------------------+--------------------+
|FIRST_NAME|MIDDLE_NAME|LAST_NAME|      SSN|  CREDIT_CARD_NO|APT_NO|      STREET_NAME|   CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|   CUST_PHONE|         CUST_EMAIL|       LAST_UPDATED| FULL_STREET_ADDRESS|
+----------+-----------+---------+---------+----------------+------+-----------------+------------+----------+-------------+--------+-------------+-------------------+-------------------+--------------------+
|      Alec|         wm|   Hooper|123456100|4210653310061055|   656|Main Street North|     Natchez|        MS|United States|   39120|(000)123-7818|AHooper@example.com|2018-04-21 11:49:02|656, Main Street 

Data import: load CDW_SAPP_BRANCH.JSON

In [5]:
# Importing the Libraries
import pyspark
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import initcap, lower, concat, col, regexp_replace, lit, substring, lpad, when, length

# Start (or reuse) the Spark session; getOrCreate returns the active session if one exists
spark = SparkSession.builder.appName('Branch').getOrCreate()

# Explicit schema for CDW_SAPP_BRANCH.JSON, listed in source-column order.
# Every field is nullable.
branch_fields = [
    ("BRANCH_CODE", IntegerType()),
    ("BRANCH_NAME", StringType()),
    ("BRANCH_STREET", StringType()),
    ("BRANCH_CITY", StringType()),
    ("BRANCH_STATE", StringType()),
    ("BRANCH_ZIP", StringType()),    # string, not int: ZIP codes can have leading zeros
    ("BRANCH_PHONE", StringType()),  # string: phone numbers can come in different formats
    ("LAST_UPDATED", TimestampType()),
]
schema = StructType([StructField(field_name, field_type, True) for field_name, field_type in branch_fields])

# Load the branch JSON file with multiline parsing enabled, applying the schema above
branch_reader = spark.read.option("multiline", "true").schema(schema)
branch_df = branch_reader.json("data/cdw_sapp_branch.json")



In [6]:
# Inspect the raw branch data: schema, summary statistics, sample rows, and row count
branch_columns = branch_df.columns
branch_df.printSchema()
branch_summary = branch_df.describe()
branch_summary.show()
branch_df.show()
branch_df.count()

root
 |-- BRANCH_CODE: integer (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_ZIP: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)

+-------+-----------------+------------+-------------+-----------+------------+------------------+--------------------+
|summary|      BRANCH_CODE| BRANCH_NAME|BRANCH_STREET|BRANCH_CITY|BRANCH_STATE|        BRANCH_ZIP|        BRANCH_PHONE|
+-------+-----------------+------------+-------------+-----------+------------+------------------+--------------------+
|  count|              115|         115|          115|        115|         115|               115|                 115|
|   mean|76.67826086956522|        NULL|         NULL|       NULL|        NULL|  38975.2347826087|1.2345499259478261E9|
| stddev|52.94113709535237|        NULL|  

115

In [7]:

# Reformat BRANCH_PHONE as (XXX)XXX-XXXX.
# Non-digits are stripped first, so the cell is idempotent: re-running it on an
# already formatted value such as "(123)456-5276" gives the same string back instead
# of garbling it. This also matches how CUST_PHONE is cleaned.
branch_phone_digits = regexp_replace(col("BRANCH_PHONE"), "[^0-9]", "")
branch_df = branch_df.withColumn(
    "BRANCH_PHONE",
    concat(
        lit("("), substring(branch_phone_digits, 1, 3), lit(")"),
        substring(branch_phone_digits, 4, 3), lit("-"),
        substring(branch_phone_digits, 7, 4),
    ),
)

# Spot-check the reformatted phone numbers, then the updated branch rows
branch_df.select("BRANCH_PHONE").show(5)

branch_df.show(5)

+-------------+
| BRANCH_PHONE|
+-------------+
|(123)456-5276|
|(123)461-8993|
|(123)498-5926|
|(123)466-3064|
|(123)484-9701|
+-------------+
only showing top 5 rows

+-----------+------------+-----------------+-----------------+------------+----------+-------------+-------------------+
|BRANCH_CODE| BRANCH_NAME|    BRANCH_STREET|      BRANCH_CITY|BRANCH_STATE|BRANCH_ZIP| BRANCH_PHONE|       LAST_UPDATED|
+-----------+------------+-----------------+-----------------+------------+----------+-------------+-------------------+
|          1|Example Bank|     Bridle Court|        Lakeville|          MN|     55044|(123)456-5276|2018-04-18 15:51:47|
|          2|Example Bank|Washington Street|          Huntley|          IL|     60142|(123)461-8993|2018-04-18 15:51:47|
|          3|Example Bank|    Warren Street|SouthRichmondHill|          NY|     11419|(123)498-5926|2018-04-18 15:51:47|
|          4|Example Bank| Cleveland Street|       Middleburg|          FL|     32068|(123)466-3064|2018-

In [8]:
# List every BRANCH_ZIP value (sorted, untruncated) to spot malformed ZIP codes
branch_row_count = branch_df.count()
zip_values = branch_df.select("BRANCH_ZIP").orderBy("BRANCH_ZIP")
zip_values.show(branch_row_count, truncate=False)



+----------+
|BRANCH_ZIP|
+----------+
|10954     |
|11001     |
|11419     |
|11510     |
|11530     |
|11756     |
|11791     |
|11803     |
|12601     |
|14534     |
|15101     |
|15317     |
|17013     |
|17050     |
|17201     |
|17325     |
|17331     |
|17543     |
|19380     |
|19406     |
|19438     |
|20772     |
|20814     |
|20901     |
|21206     |
|21222     |
|2155      |
|2169      |
|23112     |
|23223     |
|27103     |
|27284     |
|27834     |
|28173     |
|29550     |
|29576     |
|29680     |
|30012     |
|30052     |
|30101     |
|30117     |
|30236     |
|30741     |
|32068     |
|32703     |
|32708     |
|32765     |
|33414     |
|33442     |
|33594     |
|33904     |
|34711     |
|34990     |
|36330     |
|38655     |
|39120     |
|39759     |
|41051     |
|42001     |
|43512     |
|44070     |
|44224     |
|44512     |
|44663     |
|44805     |
|45601     |
|46530     |
|47274     |
|48047     |
|48071     |
|48124     |
|48178     |
|48239     |
|48430     |

In [9]:
# Normalize BRANCH_ZIP to a 5-character string:
#   * NULL or empty string -> "99999" placeholder for an unknown ZIP
#   * shorter than 5 chars -> left-pad with "0" (e.g. "2155" -> "02155"); presumably the
#     leading zeros were lost upstream, so this also covers 3-digit values
#   * otherwise            -> keep the value unchanged
branch_zip = col("BRANCH_ZIP")
branch_df = branch_df.withColumn(
    "BRANCH_ZIP",
    when(branch_zip.isNull() | (length(branch_zip) == 0), "99999")
    .when(length(branch_zip) < 5, lpad(branch_zip, 5, "0"))
    .otherwise(branch_zip),
)

print("Branch Data Successfully Cleaned")

Branch Data Successfully Cleaned


In [10]:
# Re-list the sorted BRANCH_ZIP values to confirm every ZIP now has 5 characters
cleaned_row_count = branch_df.count()
cleaned_zips = branch_df.select("BRANCH_ZIP").orderBy("BRANCH_ZIP")
cleaned_zips.show(cleaned_row_count, truncate=False)


+----------+
|BRANCH_ZIP|
+----------+
|02155     |
|02169     |
|06109     |
|06511     |
|07111     |
|07501     |
|07740     |
|07866     |
|08844     |
|10954     |
|11001     |
|11419     |
|11510     |
|11530     |
|11756     |
|11791     |
|11803     |
|12601     |
|14534     |
|15101     |
|15317     |
|17013     |
|17050     |
|17201     |
|17325     |
|17331     |
|17543     |
|19380     |
|19406     |
|19438     |
|20772     |
|20814     |
|20901     |
|21206     |
|21222     |
|23112     |
|23223     |
|27103     |
|27284     |
|27834     |
|28173     |
|29550     |
|29576     |
|29680     |
|30012     |
|30052     |
|30101     |
|30117     |
|30236     |
|30741     |
|32068     |
|32703     |
|32708     |
|32765     |
|33414     |
|33442     |
|33594     |
|33904     |
|34711     |
|34990     |
|36330     |
|38655     |
|39120     |
|39759     |
|41051     |
|42001     |
|43512     |
|44070     |
|44224     |
|44512     |
|44663     |
|44805     |
|45601     |
|46530     |

In [11]:
# Importing the Libraries
import pyspark
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import concat, col, lpad

# Start (or reuse) the Spark session; getOrCreate returns the active session if one exists
spark = SparkSession.builder.appName('Credit Card Transaction').getOrCreate()

# Explicit schema for the credit card transaction JSON, listed in source-column order.
# Every field is nullable.
credit_fields = [
    ("TRANSACTION_ID", IntegerType()),
    ("DAY", IntegerType()),
    ("MONTH", IntegerType()),
    ("YEAR", IntegerType()),
    ("TIMEID", StringType()),
    ("CREDIT_CARD_NO", StringType()),
    ("CUST_SSN", IntegerType()),
    ("BRANCH_CODE", IntegerType()),
    ("TRANSACTION_TYPE", StringType()),
    ("TRANSACTION_VALUE", DoubleType()),
]
schema = StructType([StructField(field_name, field_type, True) for field_name, field_type in credit_fields])

# Load the credit card JSON file with multiline parsing enabled, applying the schema above
credit_reader = spark.read.option("multiline", "true").schema(schema)
credit_df = credit_reader.json("data/cdw_sapp_credit.json")


In [12]:
# Inspect the raw transaction data: schema, sample rows, and row count
credit_columns = credit_df.columns
credit_df.printSchema()
credit_df.show()
credit_df.count()

root
 |-- TRANSACTION_ID: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- TIMEID: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_SSN: integer (nullable = true)
 |-- BRANCH_CODE: integer (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)

+--------------+---+-----+----+------+----------------+---------+-----------+----------------+-----------------+
|TRANSACTION_ID|DAY|MONTH|YEAR|TIMEID|  CREDIT_CARD_NO| CUST_SSN|BRANCH_CODE|TRANSACTION_TYPE|TRANSACTION_VALUE|
+--------------+---+-----+----+------+----------------+---------+-----------+----------------+-----------------+
|             1| 14|    2|2018|  NULL|4210653349028689|123459988|        114|       Education|             78.9|
|             2| 20|    3|2018|  NULL|4210653349028689|123459988|         35|   Entertainment|            14.24|
|    

46694

In [13]:
# Rename the card-number column to CUST_CC_NO
credit_df = credit_df.withColumnRenamed("CREDIT_CARD_NO", "CUST_CC_NO")

# Rebuild TIMEID as a YYYYMMDD string: YEAR, then MONTH and DAY zero-padded to two digits
year_part = col("YEAR").cast("string")
month_part = lpad(col("MONTH").cast("string"), 2, "0")
day_part = lpad(col("DAY").cast("string"), 2, "0")
credit_df = credit_df.withColumn("TIMEID", concat(year_part, month_part, day_part))

print("Credit Card Transactions, cleaned successfully!")

credit_df.show(5)

Credit Card Transactions, cleaned successfully!
+--------------+---+-----+----+--------+----------------+---------+-----------+----------------+-----------------+
|TRANSACTION_ID|DAY|MONTH|YEAR|  TIMEID|      CUST_CC_NO| CUST_SSN|BRANCH_CODE|TRANSACTION_TYPE|TRANSACTION_VALUE|
+--------------+---+-----+----+--------+----------------+---------+-----------+----------------+-----------------+
|             1| 14|    2|2018|20180214|4210653349028689|123459988|        114|       Education|             78.9|
|             2| 20|    3|2018|20180320|4210653349028689|123459988|         35|   Entertainment|            14.24|
|             3|  8|    7|2018|20180708|4210653349028689|123459988|        160|         Grocery|             56.7|
|             4| 19|    4|2018|20180419|4210653349028689|123459988|        114|   Entertainment|            59.73|
|             5| 10|   10|2018|20181010|4210653349028689|123459988|         93|             Gas|             3.59|
+--------------+---+-----+----+-

Load the three cleaned DataFrames (customer, branch, and credit card) into the MySQL database `creditcard_capstone`

In [23]:

import getpass
import os

# Connection settings for the target MySQL database.
# Credentials come from the environment (or an interactive prompt) rather than being
# hard-coded, so they are never saved into the notebook file.
MYSQL_URL = "jdbc:mysql://localhost:3306/creditcard_capstone"
MYSQL_USER = os.environ.get("MYSQL_USER", "root")
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD") or getpass.getpass("MySQL password: ")


def write_mysql_table(frame, table_name):
    """Save a Spark DataFrame to a table in the creditcard_capstone MySQL database.

    Uses the JDBC writer in "overwrite" mode, so any existing contents of the table
    are replaced.

    Args:
        frame: Spark DataFrame to save.
        table_name: Name of the target table.
    """
    (
        frame.write.format("jdbc")
        .mode("overwrite")
        .option("url", MYSQL_URL)
        .option("dbtable", table_name)
        .option("user", MYSQL_USER)
        .option("password", MYSQL_PASSWORD)
        .save()
    )


# cust_df is already a DataFrame, so use it directly. Rebuilding it via
# createDataFrame(cust_df.rdd, ...) would only ship every row through Python and back.
df = cust_df
df.printSchema()

# Drop the raw address parts; FULL_STREET_ADDRESS replaces them in the database table
df = df.drop("APT_NO", "STREET_NAME")
df.printSchema()

# Reorder the columns for the CDW_SAPP_CUSTOMER table
column_order = ["FIRST_NAME", "MIDDLE_NAME", "LAST_NAME", "SSN", "CREDIT_CARD_NO",
                "FULL_STREET_ADDRESS", "CUST_CITY", "CUST_STATE", "CUST_COUNTRY",
                "CUST_ZIP", "CUST_PHONE", "CUST_EMAIL", "LAST_UPDATED"]
df = df.select(*column_order)
df.printSchema()

# Save the three cleaned DataFrames to their MySQL tables
write_mysql_table(df, "CDW_SAPP_CUSTOMER")
write_mysql_table(branch_df, "CDW_SAPP_BRANCH")
write_mysql_table(credit_df, "CDW_SAPP_CREDIT_CARD")

root
 |-- FIRST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- SSN: integer (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- APT_NO: string (nullable = true)
 |-- STREET_NAME: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)

root
 |-- FIRST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- SSN: integer (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_ZIP: s