Load Credit Card Database (SQL)

1.1 Data Extraction and Transformation with Python and PySpark

For the “Credit Card System,” create a Python and PySpark SQL program to read/extract the following JSON files according to the specifications found in the mapping document:

1. CDW_SAPP_BRANCH.JSON
2. CDW_SAPP_CREDITCARD.JSON
3. CDW_SAPP_CUSTOMER.JSON

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Capstone_demo').getOrCreate()

#Read json file which holds branch information into dataframe
df_branch = spark.read.json("cdw_sapp_branch.json")
df_branch.show(5)

+-----------------+-----------+------------+------------+------------+-----------------+----------+--------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|    BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|
+-----------------+-----------+------------+------------+------------+-----------------+----------+--------------------+
|        Lakeville|          1|Example Bank|  1234565276|          MN|     Bridle Court|     55044|2018-04-18T16:51:...|
|          Huntley|          2|Example Bank|  1234618993|          IL|Washington Street|     60142|2018-04-18T16:51:...|
|SouthRichmondHill|          3|Example Bank|  1234985926|          NY|    Warren Street|     11419|2018-04-18T16:51:...|
|       Middleburg|          4|Example Bank|  1234663064|          FL| Cleveland Street|     32068|2018-04-18T16:51:...|
|    KingOfPrussia|          5|Example Bank|  1234849701|          PA|      14th Street|     19406|2018-04-18T16:51:...|
+-----------------+-----------+-

In [2]:
# Read the JSON file that holds credit card information into a DataFrame
# (header and inferSchema are CSV reader options and are not needed for JSON)
df_creditCard = spark.read.load("cdw_sapp_credit.json", format="json")
df_creditCard.show(5)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             56.7|2018|
|        114|4210653349028689|123459988| 19|    4|             4|   Entertainment|            59.73|2018|
|         93|4210653349028689|123459988| 10|   10|             5|             Gas|             3.59|2018|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
only showing top 5 rows



In [3]:
# Read the JSON file that holds customer information into a DataFrame
# (header and inferSchema are CSV reader options and are not needed for JSON)
df_customer = spark.read.load("cdw_sapp_custmer.json", format="json")
df_customer.show(5)

+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|         CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|   656|4210653310061055|     Natchez|United States|AHooper@example.com|   1237818|        MS|   39120|      Alec|   Hooper|2018-04-21T12:49:...|         Wm|123456100|Main Street North|
|   829|4210653310102868|Wethersfield|United States|EHolman@example.com|   1238933|        CT|   06109|      Etta|   Holman|2018-04-21T12:49:...|    Brendan|123453023|    Redwood Drive|
|   683|4210653310116272|     Huntley|United States|WDunham@example.co

Extract the columns from each JSON file in the order given by the mapping document.

In [4]:
df_branch_new = df_branch.select("BRANCH_CODE", "BRANCH_NAME", "BRANCH_STREET", "BRANCH_CITY", "BRANCH_STATE", "BRANCH_ZIP", "BRANCH_PHONE", "LAST_UPDATED")
# display the schema in tree format
df_branch_new.printSchema()
#show the first five rows
df_branch_new.show(5)


root
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_ZIP: long (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)

+-----------+------------+-----------------+-----------------+------------+----------+------------+--------------------+
|BRANCH_CODE| BRANCH_NAME|    BRANCH_STREET|      BRANCH_CITY|BRANCH_STATE|BRANCH_ZIP|BRANCH_PHONE|        LAST_UPDATED|
+-----------+------------+-----------------+-----------------+------------+----------+------------+--------------------+
|          1|Example Bank|     Bridle Court|        Lakeville|          MN|     55044|  1234565276|2018-04-18T16:51:...|
|          2|Example Bank|Washington Street|          Huntley|          IL|     60142|  1234618993|2018-04-18T16:51:...|
|          3|Example Bank|    Warren Street|S

In [5]:
df_creditCard_new = df_creditCard.select("CREDIT_CARD_NO","DAY", "MONTH", "YEAR", "CUST_SSN", "BRANCH_CODE", "TRANSACTION_TYPE", "TRANSACTION_VALUE", "TRANSACTION_ID")
# display the schema in tree format
df_creditCard_new.printSchema()
#show the first five rows
df_creditCard_new.show(5)


root
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- DAY: long (nullable = true)
 |-- MONTH: long (nullable = true)
 |-- YEAR: long (nullable = true)
 |-- CUST_SSN: long (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- TRANSACTION_ID: long (nullable = true)

+----------------+---+-----+----+---------+-----------+----------------+-----------------+--------------+
|  CREDIT_CARD_NO|DAY|MONTH|YEAR| CUST_SSN|BRANCH_CODE|TRANSACTION_TYPE|TRANSACTION_VALUE|TRANSACTION_ID|
+----------------+---+-----+----+---------+-----------+----------------+-----------------+--------------+
|4210653349028689| 14|    2|2018|123459988|        114|       Education|             78.9|             1|
|4210653349028689| 20|    3|2018|123459988|         35|   Entertainment|            14.24|             2|
|4210653349028689|  8|    7|2018|123459988|        160|         Grocery|             56.7|      

In [6]:
df_customer_new=df_customer.select("SSN", "FIRST_NAME", "MIDDLE_NAME", "LAST_NAME", "CREDIT_CARD_NO", "STREET_NAME", "APT_NO", "CUST_CITY",
"CUST_STATE", "CUST_COUNTRY", "CUST_ZIP", "CUST_PHONE", "CUST_EMAIL", "LAST_UPDATED")
# display the schema in tree format
df_customer_new.printSchema()
#show the first five rows
df_customer_new.show(5)

root
 |-- SSN: long (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- STREET_NAME: string (nullable = true)
 |-- APT_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)

+---------+----------+-----------+---------+----------------+-----------------+------+------------+----------+-------------+--------+----------+-------------------+--------------------+
|      SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|  CREDIT_CARD_NO|      STREET_NAME|APT_NO|   CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|CUST_PHONE|         CUST_EMAIL|        LAST_UPDATED|
+---------+----------+-----------+---------

Change the data types of the branch data to the following, based on the mapping:
 [('BRANCH_CODE', 'int'),
  ('BRANCH_NAME', 'varchar'),
  ('BRANCH_STREET', 'varchar'),
  ('BRANCH_CITY', 'varchar'),
  ('BRANCH_STATE', 'varchar'),
  ('BRANCH_ZIP', 'int'),
  ('BRANCH_PHONE', 'varchar'),
  ('LAST_UPDATED', 'TIMESTAMP')]

In [7]:
from pyspark.sql.types import IntegerType, StringType, TimestampType
df_branch_new = df_branch_new\
.withColumn('BRANCH_CODE', df_branch_new.BRANCH_CODE.cast(IntegerType()))\
.withColumn('BRANCH_NAME', df_branch_new.BRANCH_NAME.cast(StringType()))\
.withColumn('BRANCH_STREET', df_branch_new.BRANCH_STREET.cast(StringType()))\
.withColumn('BRANCH_CITY', df_branch_new.BRANCH_CITY.cast(StringType()))\
.withColumn('BRANCH_STATE', df_branch_new.BRANCH_STATE.cast(StringType()))\
.withColumn('BRANCH_ZIP', df_branch_new.BRANCH_ZIP.cast(IntegerType()))\
.withColumn('BRANCH_PHONE', df_branch_new.BRANCH_PHONE.cast(StringType()))\
.withColumn('LAST_UPDATED', df_branch_new.LAST_UPDATED.cast(TimestampType()))
df_branch_new.dtypes 

[('BRANCH_CODE', 'int'),
 ('BRANCH_NAME', 'string'),
 ('BRANCH_STREET', 'string'),
 ('BRANCH_CITY', 'string'),
 ('BRANCH_STATE', 'string'),
 ('BRANCH_ZIP', 'int'),
 ('BRANCH_PHONE', 'string'),
 ('LAST_UPDATED', 'timestamp')]
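
The mapping lists SQL-style type names (`varchar`, `TIMESTAMP`), while `dtypes` reports Spark's own names (`string`, `timestamp`). The following is a minimal plain-Python sketch for checking one against the other. The names `SQL_TO_SPARK`, `expected_dtypes`, and `mismatches` are hypothetical helpers, not part of PySpark, and the translation table covers only the types that appear in this notebook.

```python
# Hypothetical lookup: mapping-document type name -> name reported by DataFrame.dtypes.
SQL_TO_SPARK = {
    "int": "int",
    "varchar": "string",
    "double": "double",
    "timestamp": "timestamp",
}

# The branch mapping, copied from the list above.
BRANCH_MAPPING = [
    ("BRANCH_CODE", "int"),
    ("BRANCH_NAME", "varchar"),
    ("BRANCH_STREET", "varchar"),
    ("BRANCH_CITY", "varchar"),
    ("BRANCH_STATE", "varchar"),
    ("BRANCH_ZIP", "int"),
    ("BRANCH_PHONE", "varchar"),
    ("LAST_UPDATED", "TIMESTAMP"),
]


def expected_dtypes(mapping):
    """Translate (column, sql_type) pairs into (column, spark_type) pairs."""
    return [(name, SQL_TO_SPARK[sql_type.lower()]) for name, sql_type in mapping]


def mismatches(mapping, actual_dtypes):
    """Return (column, expected, actual) for every column whose dtype differs."""
    actual = dict(actual_dtypes)
    return [
        (name, want, actual.get(name))
        for name, want in expected_dtypes(mapping)
        if actual.get(name) != want
    ]
```

Calling `mismatches(BRANCH_MAPPING, df_branch_new.dtypes)` after the casts should return an empty list if every column has the mapped type.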

If the source value of BRANCH_ZIP is null, load the default value (99999); otherwise move the value directly.


In [9]:
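# DataFrames are immutable, so assign the result to keep the filled values
df_branch_new = df_branch_new.na.fill(value=99999, subset=["BRANCH_ZIP"])
df_branch_new.show(5)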

+-----------+------------+-----------------+-----------------+------------+----------+------------+-------------------+
|BRANCH_CODE| BRANCH_NAME|    BRANCH_STREET|      BRANCH_CITY|BRANCH_STATE|BRANCH_ZIP|BRANCH_PHONE|       LAST_UPDATED|
+-----------+------------+-----------------+-----------------+------------+----------+------------+-------------------+
|          1|Example Bank|     Bridle Court|        Lakeville|          MN|     55044|  1234565276|2018-04-18 15:51:47|
|          2|Example Bank|Washington Street|          Huntley|          IL|     60142|  1234618993|2018-04-18 15:51:47|
|          3|Example Bank|    Warren Street|SouthRichmondHill|          NY|     11419|  1234985926|2018-04-18 15:51:47|
|          4|Example Bank| Cleveland Street|       Middleburg|          FL|     32068|  1234663064|2018-04-18 15:51:47|
|          5|Example Bank|      14th Street|    KingOfPrussia|          PA|     19406|  1234849701|2018-04-18 15:51:47|
+-----------+------------+--------------

Change the format of phone number to (XXX)XXX-XXXX


In [10]:
from pyspark.sql.functions import concat, lit, col

df_branch_new =df_branch_new.withColumn("BRANCH_PHONE", concat(lit("("),col("BRANCH_PHONE").substr(1, 3), lit(")"),\
                                                        col("BRANCH_PHONE").substr(4, 3), lit("-"), \
                                                        col("BRANCH_PHONE").substr(7, 4)))

df_branch_new.show(5)

+-----------+------------+-----------------+-----------------+------------+----------+-------------+-------------------+
|BRANCH_CODE| BRANCH_NAME|    BRANCH_STREET|      BRANCH_CITY|BRANCH_STATE|BRANCH_ZIP| BRANCH_PHONE|       LAST_UPDATED|
+-----------+------------+-----------------+-----------------+------------+----------+-------------+-------------------+
|          1|Example Bank|     Bridle Court|        Lakeville|          MN|     55044|(123)456-5276|2018-04-18 15:51:47|
|          2|Example Bank|Washington Street|          Huntley|          IL|     60142|(123)461-8993|2018-04-18 15:51:47|
|          3|Example Bank|    Warren Street|SouthRichmondHill|          NY|     11419|(123)498-5926|2018-04-18 15:51:47|
|          4|Example Bank| Cleveland Street|       Middleburg|          FL|     32068|(123)466-3064|2018-04-18 15:51:47|
|          5|Example Bank|      14th Street|    KingOfPrussia|          PA|     19406|(123)484-9701|2018-04-18 15:51:47|
+-----------+------------+------

Change the data types of the credit card data to the following:
[('CREDIT_CARD_NO', 'varchar'),
 ('DAY', 'varchar'),
 ('MONTH', 'varchar'),
 ('YEAR', 'varchar'),
 ('CUST_SSN', 'int'),
 ('BRANCH_CODE', 'int'),
 ('TRANSACTION_TYPE', 'varchar'),
 ('TRANSACTION_VALUE', 'double'),
 ('TRANSACTION_ID', 'int')]

In [11]:
from pyspark.sql.types import IntegerType, StringType, VarcharType, DoubleType
df_creditCard_new = df_creditCard_new\
.withColumn('CREDIT_CARD_NO', df_creditCard_new.CREDIT_CARD_NO.cast(VarcharType(30)))\
.withColumn('DAY', df_creditCard_new.DAY.cast(StringType()))\
.withColumn('MONTH', df_creditCard_new.MONTH.cast(StringType()))\
.withColumn('YEAR', df_creditCard_new.YEAR.cast(StringType()))\
.withColumn('CUST_SSN', df_creditCard_new.CUST_SSN.cast(IntegerType()))\
.withColumn('BRANCH_CODE', df_creditCard_new.BRANCH_CODE.cast(IntegerType()))\
.withColumn('TRANSACTION_TYPE', df_creditCard_new.TRANSACTION_TYPE.cast(StringType()))\
.withColumn('TRANSACTION_VALUE', df_creditCard_new.TRANSACTION_VALUE.cast(DoubleType()))\
.withColumn('TRANSACTION_ID', df_creditCard_new.TRANSACTION_ID.cast(IntegerType()))

df_creditCard_new.dtypes 

[('CREDIT_CARD_NO', 'string'),
 ('DAY', 'string'),
 ('MONTH', 'string'),
 ('YEAR', 'string'),
 ('CUST_SSN', 'int'),
 ('BRANCH_CODE', 'int'),
 ('TRANSACTION_TYPE', 'string'),
 ('TRANSACTION_VALUE', 'double'),
 ('TRANSACTION_ID', 'int')]

Convert DAY, MONTH, and YEAR into a TIMEID (YYYYMMDD)


In [22]:
from pyspark.sql.functions import concat, lit, col

df_creditCard_new =df_creditCard_new.withColumn("TIMEID",concat(col("YEAR"),
                                                        col("MONTH"), 
                                                        col("DAY")))

df_creditCard_new.show(5)

+----------------+---+-----+----+---------+-----------+----------------+-----------------+--------------+--------+
|  CREDIT_CARD_NO|DAY|MONTH|YEAR| CUST_SSN|BRANCH_CODE|TRANSACTION_TYPE|TRANSACTION_VALUE|TRANSACTION_ID|  TIMEID|
+----------------+---+-----+----+---------+-----------+----------------+-----------------+--------------+--------+
|4210653349028689| 14|    2|2018|123459988|        114|       Education|             78.9|             1| 2018214|
|4210653349028689| 20|    3|2018|123459988|         35|   Entertainment|            14.24|             2| 2018320|
|4210653349028689|  8|    7|2018|123459988|        160|         Grocery|             56.7|             3|  201878|
|4210653349028689| 19|    4|2018|123459988|        114|   Entertainment|            59.73|             4| 2018419|
|4210653349028689| 10|   10|2018|123459988|         93|             Gas|             3.59|             5|20181010|
+----------------+---+-----+----+---------+-----------+----------------+--------

In [23]:
df_creditCard_new = df_creditCard_new.select("CREDIT_CARD_NO","TIMEID", "CUST_SSN", "BRANCH_CODE", "TRANSACTION_TYPE", "TRANSACTION_VALUE", "TRANSACTION_ID")
df_creditCard_new.show(5)

+----------------+--------+---------+-----------+----------------+-----------------+--------------+
|  CREDIT_CARD_NO|  TIMEID| CUST_SSN|BRANCH_CODE|TRANSACTION_TYPE|TRANSACTION_VALUE|TRANSACTION_ID|
+----------------+--------+---------+-----------+----------------+-----------------+--------------+
|4210653349028689| 2018214|123459988|        114|       Education|             78.9|             1|
|4210653349028689| 2018320|123459988|         35|   Entertainment|            14.24|             2|
|4210653349028689|  201878|123459988|        160|         Grocery|             56.7|             3|
|4210653349028689| 2018419|123459988|        114|   Entertainment|            59.73|             4|
|4210653349028689|20181010|123459988|         93|             Gas|             3.59|             5|
+----------------+--------+---------+-----------+----------------+-----------------+--------------+
only showing top 5 rows
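
The TIMEID values in the output above vary in length (for example `201878` for 8 July 2018) because the unpadded DAY and MONTH strings are concatenated as they are, so the result is not always YYYYMMDD. The following is a minimal plain-Python sketch of the zero-padding rule, useful for working out the expected values; `make_timeid` is a hypothetical helper name and this is not the Spark code used in the notebook.

```python
def make_timeid(year, month, day):
    """Build a YYYYMMDD string, zero-padding month and day to two digits."""
    return f"{int(year):04d}{int(month):02d}{int(day):02d}"


# (YEAR, MONTH, DAY) of the five sample transactions shown above.
rows = [(2018, 2, 14), (2018, 3, 20), (2018, 7, 8), (2018, 4, 19), (2018, 10, 10)]
timeids = [make_timeid(y, m, d) for y, m, d in rows]
print(timeids)
# ['20180214', '20180320', '20180708', '20180419', '20181010']
```

In PySpark, `pyspark.sql.functions.lpad` can apply the same two-digit padding to the MONTH and DAY columns before they are concatenated.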



Convert the first name to title case.
Convert the middle name to lower case.
Convert the last name to title case.
Concatenate the street name and apartment number of the customer's residence, with a comma as the separator (Street, Apartment).

Change the format of the phone number to (XXX)XXX-XXXX.
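
To make the name and address rules concrete, here is a plain-Python sketch applied to one record from the sample data. It is not the Spark implementation: `initcap_words` and `transform_customer` are hypothetical helper names, and `initcap_words` assumes words are separated by single spaces.

```python
def initcap_words(text):
    """Upper-case the first letter of each space-separated word, lower-case the rest."""
    return " ".join(word.capitalize() for word in text.split(" "))


def transform_customer(record):
    """Apply the name and address rules to one customer record (a dict)."""
    return {
        "FIRST_NAME": initcap_words(record["FIRST_NAME"]),
        "MIDDLE_NAME": record["MIDDLE_NAME"].lower(),
        "LAST_NAME": initcap_words(record["LAST_NAME"]),
        # Street first, then apartment number, separated by a comma.
        "FULL_STREET_ADDRESS": f'{record["STREET_NAME"]}, {record["APT_NO"]}',
    }


sample = {
    "FIRST_NAME": "Etta",
    "MIDDLE_NAME": "Brendan",
    "LAST_NAME": "Holman",
    "STREET_NAME": "Redwood Drive",
    "APT_NO": "829",
}
print(transform_customer(sample))
```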

In [14]:
from pyspark.sql.functions import initcap, lower, concat, lit, col
df_customer_new = df_customer_new\
    .withColumn("FIRST_NAME", initcap(df_customer_new["FIRST_NAME"]))\
    .withColumn("MIDDLE_NAME", lower(df_customer_new["MIDDLE_NAME"]))\
    .withColumn("LAST_NAME", initcap(df_customer_new["LAST_NAME"]))\
    .withColumn("FULL_STREET_ADDRESS", concat(df_customer_new["STREET_NAME"], lit(", "), df_customer_new["APT_NO"]))

df_customer_new =df_customer_new.withColumn("CUST_PHONE", concat(lit("("),col("CUST_PHONE").substr(1, 3), lit(")"),\
                                                        col("CUST_PHONE").substr(4, 3), lit("-"), \
                                                        col("CUST_PHONE").substr(7, 4)))

df_customer_new.show(5)

+---------+----------+-----------+---------+----------------+-----------------+------+------------+----------+-------------+--------+-------------+-------------------+--------------------+--------------------+
|      SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|  CREDIT_CARD_NO|      STREET_NAME|APT_NO|   CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|   CUST_PHONE|         CUST_EMAIL|        LAST_UPDATED| FULL_STREET_ADDRESS|
+---------+----------+-----------+---------+----------------+-----------------+------+------------+----------+-------------+--------+-------------+-------------------+--------------------+--------------------+
|123456100|      Alec|         wm|   Hooper|4210653310061055|Main Street North|   656|     Natchez|        MS|United States|   39120|((12)3)7-81-8|AHooper@example.com|2018-04-21T12:49:...|Main Street North...|
|123453023|      Etta|    brendan|   Holman|4210653310102868|    Redwood Drive|   829|Wethersfield|        CT|United States|   06109|((12)3)8-93-3|EHolman@examp
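
The CUST_PHONE values in the output above (for example `((12)3)7-81-8`) are not in (XXX)XXX-XXXX form. They are consistent with the substring formatting having been applied twice to a 7-digit source value such as `1237818`, which can happen when the cell is re-run. The following plain-Python sketch reproduces that effect and shows one possible guard; `spark_style_format` and `format_phone` are hypothetical helper names, and leaving non-10-digit values unchanged is an assumption, since the mapping shown here does not say how to handle them.

```python
import re


def spark_style_format(text):
    """Mirror the substr(1, 3), substr(4, 3), substr(7, 4) slicing used in the cell."""
    return f"({text[0:3]}){text[3:6]}-{text[6:10]}"


def format_phone(value):
    """Format clean 10-digit numbers only; return anything else unchanged."""
    text = str(value)
    if re.fullmatch(r"\d{10}", text):
        return f"({text[0:3]}){text[3:6]}-{text[6:10]}"
    return text


once = spark_style_format("1237818")   # 7-digit customer phone from the sample data
twice = spark_style_format(once)       # what a second run of the cell produces
print(once, twice)

print(format_phone("1234565276"))      # 10-digit branch phone from the sample data
print(format_phone(format_phone("1234565276")))  # a second pass changes nothing
```

Because `format_phone` only touches strings of exactly ten digits, it is safe to apply more than once, and 7-digit values stay visible as they are instead of being reshaped.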

In [15]:
df_customer_new = df_customer_new.withColumn("FULL_STREET_ADDRESS", df_customer_new.FULL_STREET_ADDRESS.cast(VarcharType(50)))
df_customer_new = df_customer_new.select("SSN", "FIRST_NAME", "MIDDLE_NAME", "LAST_NAME", "CREDIT_CARD_NO", "FULL_STREET_ADDRESS", "CUST_CITY",
"CUST_STATE", "CUST_COUNTRY", "CUST_ZIP", "CUST_PHONE", "CUST_EMAIL", "LAST_UPDATED")
df_customer_new.show(5)

+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+-------------+-------------------+--------------------+
|      SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|  CREDIT_CARD_NO| FULL_STREET_ADDRESS|   CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|   CUST_PHONE|         CUST_EMAIL|        LAST_UPDATED|
+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+-------------+-------------------+--------------------+
|123456100|      Alec|         wm|   Hooper|4210653310061055|Main Street North...|     Natchez|        MS|United States|   39120|((12)3)7-81-8|AHooper@example.com|2018-04-21T12:49:...|
|123453023|      Etta|    brendan|   Holman|4210653310102868|  Redwood Drive, 829|Wethersfield|        CT|United States|   06109|((12)3)8-93-3|EHolman@example.com|2018-04-21T12:49:...|
|123454487|    Wilber|   ezequiel|   Dunham|4210653310116272|12th Street Ea

In [16]:
df_customer_new.dtypes

[('SSN', 'bigint'),
 ('FIRST_NAME', 'string'),
 ('MIDDLE_NAME', 'string'),
 ('LAST_NAME', 'string'),
 ('CREDIT_CARD_NO', 'string'),
 ('FULL_STREET_ADDRESS', 'string'),
 ('CUST_CITY', 'string'),
 ('CUST_STATE', 'string'),
 ('CUST_COUNTRY', 'string'),
 ('CUST_ZIP', 'string'),
 ('CUST_PHONE', 'string'),
 ('CUST_EMAIL', 'string'),
 ('LAST_UPDATED', 'string')]

Req-1.2 Data Loading into Database
Once PySpark has read the data from the JSON files, use Python, PySpark, and Python modules to load the data into an RDBMS (SQL) as follows:

Create a database in SQL (MySQL) named “creditcard_capstone”.
Create a Python and PySpark program to load/write the “Credit Card System Data” into the RDBMS (creditcard_capstone).
Tables should be created with the following names in the RDBMS:
CDW_SAPP_BRANCH
CDW_SAPP_CREDIT_CARD
CDW_SAPP_CUSTOMER

Create a Database in SQL (MySQL), named “creditcard_capstone”


The database was created in MySQL Workbench.
The statement used to create the “creditcard_capstone” database:

CREATE DATABASE `creditcard_capstone` /*!40100 DEFAULT CHARACTER SET latin1 */ /*!80016 DEFAULT ENCRYPTION='N' */;

In [19]:
# Write the branch DataFrame to the 'CDW_SAPP_BRANCH' table in the creditcard_capstone database
# over a JDBC connection to MySQL

df_branch_new.write.format("jdbc") \
  .mode("append") \
  .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
  .option("dbtable", "creditcard_capstone.CDW_SAPP_BRANCH") \
  .option("user", "root") \
  .option("password", "password") \
  .save()

In [24]:
# Write the credit card DataFrame to the 'CDW_SAPP_CREDIT_CARD' table in the creditcard_capstone database over a JDBC connection to MySQL
df_creditCard_new.write.format("jdbc") \
  .mode("append") \
  .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
  .option("dbtable", "creditcard_capstone.CDW_SAPP_CREDIT_CARD") \
  .option("user", "root") \
  .option("password", "password") \
  .save()

In [25]:
# Write the customer DataFrame to the 'CDW_SAPP_CUSTOMER' table in the creditcard_capstone database over a JDBC connection to MySQL
df_customer_new.write.format("jdbc") \
  .mode("append") \
  .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
  .option("dbtable", "creditcard_capstone.CDW_SAPP_CUSTOMER") \
  .option("user", "root") \
  .option("password", "password") \
  .save()
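
The three writes above repeat the same connection settings and keep the password in the notebook. One possible way to centralise them is sketched here in plain Python. The helper `jdbc_options` and the environment variable names `MYSQL_URL`, `MYSQL_USER`, and `MYSQL_PASSWORD` are assumptions made for illustration, not something the project defines.

```python
import os


def jdbc_options(table, env=os.environ):
    """Build the option dict shared by the JDBC reads and writes.

    `env` is any mapping holding the (assumed) MYSQL_* settings;
    it defaults to the process environment.
    """
    return {
        "driver": "com.mysql.cj.jdbc.Driver",
        "url": env.get("MYSQL_URL", "jdbc:mysql://localhost:3306/creditcard_capstone"),
        "dbtable": f"creditcard_capstone.{table}",
        "user": env["MYSQL_USER"],
        "password": env["MYSQL_PASSWORD"],
    }


# Example with an explicit mapping instead of real environment variables.
demo_env = {"MYSQL_USER": "root", "MYSQL_PASSWORD": "password"}
opts = jdbc_options("CDW_SAPP_BRANCH", demo_env)
print(opts["dbtable"])
```

With such a helper, a write could be expressed as `df_branch_new.write.format("jdbc").mode("append").options(**jdbc_options("CDW_SAPP_BRANCH")).save()`, and the same dict could be passed to `spark.read.format("jdbc").options(...)` when reading the table back.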

In [26]:
df_branch_new=spark.read.format("jdbc").options(driver="com.mysql.cj.jdbc.Driver",\
 user="root",\
 password="password",\
 url="jdbc:mysql://localhost:3306/creditcard_capstone",\
 dbtable="creditcard_capstone.CDW_SAPP_BRANCH").load()
df_branch_new.show(5)

+-----------+------------+-----------------+-----------------+------------+----------+-------------+-------------------+
|BRANCH_CODE| BRANCH_NAME|    BRANCH_STREET|      BRANCH_CITY|BRANCH_STATE|BRANCH_ZIP| BRANCH_PHONE|       LAST_UPDATED|
+-----------+------------+-----------------+-----------------+------------+----------+-------------+-------------------+
|          1|Example Bank|     Bridle Court|        Lakeville|          MN|     55044|(123)456-5276|2018-04-18 15:51:47|
|          2|Example Bank|Washington Street|          Huntley|          IL|     60142|(123)461-8993|2018-04-18 15:51:47|
|          3|Example Bank|    Warren Street|SouthRichmondHill|          NY|     11419|(123)498-5926|2018-04-18 15:51:47|
|          4|Example Bank| Cleveland Street|       Middleburg|          FL|     32068|(123)466-3064|2018-04-18 15:51:47|
|          5|Example Bank|      14th Street|    KingOfPrussia|          PA|     19406|(123)484-9701|2018-04-18 15:51:47|
+-----------+------------+------

In [27]:
df_creditCard_new=spark.read.format("jdbc").options(driver="com.mysql.cj.jdbc.Driver",\
 user="root",\
 password="password",\
 url="jdbc:mysql://localhost:3306/creditcard_capstone",\
 dbtable="creditcard_capstone.CDW_SAPP_CREDIT_CARD").load()
df_creditCard_new.show(5)

+----------------+-------+---------+-----------+----------------+-----------------+--------------+
|  CREDIT_CARD_NO| TIMEID| CUST_SSN|BRANCH_CODE|TRANSACTION_TYPE|TRANSACTION_VALUE|TRANSACTION_ID|
+----------------+-------+---------+-----------+----------------+-----------------+--------------+
|4210653312478046|2018813|123455692|        156|         Grocery|            91.08|         22562|
|4210653349028689|2018214|123459988|        114|       Education|             78.9|             1|
|4210653342242023|2018315|123451310|        180|           Bills|            77.79|         45069|
|4210653349028689|2018320|123459988|         35|   Entertainment|            14.24|             2|
|4210653312478046|2018626|123455692|        114|           Bills|             22.2|         22563|
+----------------+-------+---------+-----------+----------------+-----------------+--------------+
only showing top 5 rows



In [28]:
df_customer_new=spark.read.format("jdbc").options(driver="com.mysql.cj.jdbc.Driver",\
 user="root",\
 password="password",\
 url="jdbc:mysql://localhost:3306/creditcard_capstone",\
 dbtable="creditcard_capstone.CDW_SAPP_CUSTOMER").load()
df_customer_new.show(5)

+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+-------------+-------------------+--------------------+
|      SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|  CREDIT_CARD_NO| FULL_STREET_ADDRESS|   CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|   CUST_PHONE|         CUST_EMAIL|        LAST_UPDATED|
+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+-------------+-------------------+--------------------+
|123456100|      Alec|         wm|   Hooper|4210653310061055|Main Street North...|     Natchez|        MS|United States|   39120|((12)3)7-81-8|AHooper@example.com|2018-04-21T12:49:...|
|123453023|      Etta|    brendan|   Holman|4210653310102868|  Redwood Drive, 829|Wethersfield|        CT|United States|   06109|((12)3)8-93-3|EHolman@example.com|2018-04-21T12:49:...|
|123454487|    Wilber|   ezequiel|   Dunham|4210653310116272|12th Street Ea