# Business Requirements - ETL  

### 1. Functional Requirements - Load Credit Card Database (SQL)

Req-1.1 Data Extraction and Transformation with Python and PySpark

For “Credit Card System,” create a Python and PySpark SQL program to read/extract the following JSON files    
        according to the specifications found in the mapping document.

        1. CDW_SAPP_BRANCH.JSON
        2. CDW_SAPP_CREDITCARD.JSON
        3. CDW_SAPP_CUSTOMER.JSON       


In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from dotenv import load_dotenv
import os

In [44]:
#create spark session
spark = SparkSession.builder.master("local[*]").appName("Capstone").getOrCreate()

In [45]:
#load/read cdw_sapp_custmer.json file into spark dataframe 
customer_df=spark.read.load("creditcard_system_data/cdw_sapp_custmer.json",format="json")

In [46]:
#show first five records from the customer dataframe
customer_df.show(5,truncate=False)

+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+-----------------------------+-----------+---------+-----------------+
|APT_NO|CREDIT_CARD_NO  |CUST_CITY   |CUST_COUNTRY |CUST_EMAIL         |CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|LAST_UPDATED                 |MIDDLE_NAME|SSN      |STREET_NAME      |
+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+-----------------------------+-----------+---------+-----------------+
|656   |4210653310061055|Natchez     |United States|AHooper@example.com|1237818   |MS        |39120   |Alec      |Hooper   |2018-04-21T12:49:02.000-04:00|Wm         |123456100|Main Street North|
|829   |4210653310102868|Wethersfield|United States|EHolman@example.com|1238933   |CT        |06109   |Etta      |Holman   |2018-04-21T12:49:02.000-04:00|Brendan    |123453023|Redwood Drive    |
|683   |4210653310116272|

In [47]:
#print schema of the dataset
customer_df.printSchema()

root
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- STREET_NAME: string (nullable = true)



In [48]:
#temporary table view of the custmer dataframe
customer_df.createOrReplaceTempView("customer")

In [51]:
#trnasform data as per mapping document
transform_customer_df=spark.sql("SELECT CAST(SSN AS INT) AS SSN,\
                            CONCAT(UPPER(SUBSTR(FIRST_NAME,1,1)),LOWER(substr(FIRST_NAME,2))) AS FIRST_NAME, \
                            CONCAT(LOWER(MIDDLE_NAME)) MIDDLE_NAME, \
                            CONCAT(UPPER(substr(LAST_NAME,1,1)),LOWER(substr(LAST_NAME,2))) AS LAST_NAME, \
                            CREDIT_CARD_NO,\
                            CONCAT(STREET_NAME, ',' ,APT_NO) AS FULL_STREET_ADDRESS, \
                            CUST_CITY, \
                            CUST_STATE, \
                            CUST_COUNTRY,\
                            CAST(CUST_ZIP AS INT) AS CUST_ZIP, \
                            CONCAT('(',LEFT(RAND()*100+201,3),')',SUBSTR(CUST_PHONE,1,3),'-',SUBSTR(CUST_PHONE,4)) AS CUST_PHONE, \
                            CUST_EMAIL,\
                            CAST(LAST_UPDATED AS TIMESTAMP) AS LAST_UPDATED\
                            FROM customer")


In [52]:
#show first 5 records from the transform customer dataframe
transform_customer_df.show(5,truncate=False)

+---------+----------+-----------+---------+----------------+---------------------+------------+----------+-------------+--------+-------------+-------------------+-------------------+
|SSN      |FIRST_NAME|MIDDLE_NAME|LAST_NAME|CREDIT_CARD_NO  |FULL_STREET_ADDRESS  |CUST_CITY   |CUST_STATE|CUST_COUNTRY |CUST_ZIP|CUST_PHONE   |CUST_EMAIL         |LAST_UPDATED       |
+---------+----------+-----------+---------+----------------+---------------------+------------+----------+-------------+--------+-------------+-------------------+-------------------+
|123456100|Alec      |wm         |Hooper   |4210653310061055|Main Street North,656|Natchez     |MS        |United States|39120   |(208)123-7818|AHooper@example.com|2018-04-21 12:49:02|
|123453023|Etta      |brendan    |Holman   |4210653310102868|Redwood Drive,829    |Wethersfield|CT        |United States|6109    |(243)123-8933|EHolman@example.com|2018-04-21 12:49:02|
|123454487|Wilber    |ezequiel   |Dunham   |4210653310116272|12th Street Ea

In [53]:
#printschema of transform cutomer dataframe
transform_customer_df.printSchema()

root
 |-- SSN: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_ZIP: integer (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)



In [54]:
#read/load cdw_sapp_branch.json file into spark branch dataframe
branch_df=spark.read.load("creditcard_system_data/cdw_sapp_branch.json",format="json")

In [55]:
#show first five record from branch dataframe
branch_df.show(4,truncate=False)

+-----------------+-----------+------------+------------+------------+-----------------+----------+-----------------------------+
|BRANCH_CITY      |BRANCH_CODE|BRANCH_NAME |BRANCH_PHONE|BRANCH_STATE|BRANCH_STREET    |BRANCH_ZIP|LAST_UPDATED                 |
+-----------------+-----------+------------+------------+------------+-----------------+----------+-----------------------------+
|Lakeville        |1          |Example Bank|1234565276  |MN          |Bridle Court     |55044     |2018-04-18T16:51:47.000-04:00|
|Huntley          |2          |Example Bank|1234618993  |IL          |Washington Street|60142     |2018-04-18T16:51:47.000-04:00|
|SouthRichmondHill|3          |Example Bank|1234985926  |NY          |Warren Street    |11419     |2018-04-18T16:51:47.000-04:00|
|Middleburg       |4          |Example Bank|1234663064  |FL          |Cleveland Street |32068     |2018-04-18T16:51:47.000-04:00|
+-----------------+-----------+------------+------------+------------+-----------------+--

In [56]:
#print schema of brach dataframe
branch_df.printSchema()

root
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: long (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)



In [73]:
#transformed branch dataframe according to the mapping document
transform_branch_df=branch_df.select(col("BRANCH_CODE").astype('int').alias('BRANCH_CODE'),\
                 col("BRANCH_NAME"),\
                 col("BRANCH_STREET"), \
                 col("BRANCH_CITY"),\
                 col("BRANCH_STATE"), \
                 when(col("BRANCH_ZIP").isNull(),'99999').otherwise(col("BRANCH_ZIP")).astype('int').alias('BRANCH_ZIP'),\
                 (concat( lit("(") ,substring(col("BRANCH_PHONE"),1,3),lit(")") ,substring(col("BRANCH_PHONE"),4,3), \
                          lit("-") ,substring(col("BRANCH_PHONE"),7,4) )).alias("BRANCH_PHONE"),\
                 to_timestamp(col("LAST_UPDATED")).alias("LAST_UPDATED")
                )

In [74]:
#print schema of transformed branch dataframe
transform_branch_df.printSchema()

root
 |-- BRANCH_CODE: integer (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_ZIP: integer (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)



In [75]:
#show first five records from transformed dataframe
transform_branch_df.show(5,truncate=True)

+-----------+------------+-----------------+-----------------+------------+----------+-------------+-------------------+
|BRANCH_CODE| BRANCH_NAME|    BRANCH_STREET|      BRANCH_CITY|BRANCH_STATE|BRANCH_ZIP| BRANCH_PHONE|       LAST_UPDATED|
+-----------+------------+-----------------+-----------------+------------+----------+-------------+-------------------+
|          1|Example Bank|     Bridle Court|        Lakeville|          MN|     55044|(123)456-5276|2018-04-18 16:51:47|
|          2|Example Bank|Washington Street|          Huntley|          IL|     60142|(123)461-8993|2018-04-18 16:51:47|
|          3|Example Bank|    Warren Street|SouthRichmondHill|          NY|     11419|(123)498-5926|2018-04-18 16:51:47|
|          4|Example Bank| Cleveland Street|       Middleburg|          FL|     32068|(123)466-3064|2018-04-18 16:51:47|
|          5|Example Bank|      14th Street|    KingOfPrussia|          PA|     19406|(123)484-9701|2018-04-18 16:51:47|
+-----------+------------+------

In [76]:
#read/load cdw_sapp_credit.json file into pandas credit dataframe
credit_df=spark.read.load("creditcard_system_data/cdw_sapp_credit.json",format="json").toPandas()

In [77]:
#first five records from credit dataframe
credit_df.head(5)

Unnamed: 0,BRANCH_CODE,CREDIT_CARD_NO,CUST_SSN,DAY,MONTH,TRANSACTION_ID,TRANSACTION_TYPE,TRANSACTION_VALUE,YEAR
0,114,4210653349028689,123459988,14,2,1,Education,78.9,2018
1,35,4210653349028689,123459988,20,3,2,Entertainment,14.24,2018
2,160,4210653349028689,123459988,8,7,3,Grocery,56.7,2018
3,114,4210653349028689,123459988,19,4,4,Entertainment,59.73,2018
4,93,4210653349028689,123459988,10,10,5,Gas,3.59,2018


In [78]:
#datatypes of credit dataset
credit_df.dtypes

BRANCH_CODE            int64
CREDIT_CARD_NO        object
CUST_SSN               int64
DAY                    int64
MONTH                  int64
TRANSACTION_ID         int64
TRANSACTION_TYPE      object
TRANSACTION_VALUE    float64
YEAR                   int64
dtype: object

In [79]:
#transform credit dataframe according to mapping document
credit_df['MONTH']=credit_df['MONTH'].astype(str).str.zfill(2)
credit_df['DAY']=credit_df['DAY'].astype(str).str.zfill(2)
credit_df["TIMEID"]=credit_df['YEAR'].astype(str)+ \
                    credit_df['MONTH'] + credit_df['DAY']
credit_df.rename(columns={'CREDIT_CARD_NO':'CUST_CC_NO'},inplace=True)

In [80]:
#first 10 records from the transformed dataframe
credit_df.head(10)

Unnamed: 0,BRANCH_CODE,CUST_CC_NO,CUST_SSN,DAY,MONTH,TRANSACTION_ID,TRANSACTION_TYPE,TRANSACTION_VALUE,YEAR,TIMEID
0,114,4210653349028689,123459988,14,2,1,Education,78.9,2018,20180214
1,35,4210653349028689,123459988,20,3,2,Entertainment,14.24,2018,20180320
2,160,4210653349028689,123459988,8,7,3,Grocery,56.7,2018,20180708
3,114,4210653349028689,123459988,19,4,4,Entertainment,59.73,2018,20180419
4,93,4210653349028689,123459988,10,10,5,Gas,3.59,2018,20181010
5,164,4210653349028689,123459988,28,5,6,Education,6.89,2018,20180528
6,119,4210653349028689,123459988,19,5,7,Entertainment,43.39,2018,20180519
7,23,4210653349028689,123459988,8,8,8,Gas,95.39,2018,20180808
8,166,4210653349028689,123459988,18,3,9,Entertainment,93.26,2018,20180318
9,83,4210653349028689,123459988,3,9,10,Bills,100.38,2018,20180903


In [81]:
#selected columns to be added into transformed dataframe
trans_credit_df=credit_df[['CUST_CC_NO','TIMEID','CUST_SSN','BRANCH_CODE','TRANSACTION_TYPE','TRANSACTION_VALUE','TRANSACTION_ID']]

In [82]:
#first 10 records from transformed customer dataframe
trans_credit_df.head(10)

Unnamed: 0,CUST_CC_NO,TIMEID,CUST_SSN,BRANCH_CODE,TRANSACTION_TYPE,TRANSACTION_VALUE,TRANSACTION_ID
0,4210653349028689,20180214,123459988,114,Education,78.9,1
1,4210653349028689,20180320,123459988,35,Entertainment,14.24,2
2,4210653349028689,20180708,123459988,160,Grocery,56.7,3
3,4210653349028689,20180419,123459988,114,Entertainment,59.73,4
4,4210653349028689,20181010,123459988,93,Gas,3.59,5
5,4210653349028689,20180528,123459988,164,Education,6.89,6
6,4210653349028689,20180519,123459988,119,Entertainment,43.39,7
7,4210653349028689,20180808,123459988,23,Gas,95.39,8
8,4210653349028689,20180318,123459988,166,Entertainment,93.26,9
9,4210653349028689,20180903,123459988,83,Bills,100.38,10


In [84]:
#creting spark dataframe from transformed pandas customer dataframe
transform_credit_df=spark.createDataFrame(trans_credit_df)

In [85]:
#show first five records from customer spark dataframe
transform_credit_df.show(5,truncate=True)

+----------------+--------+---------+-----------+----------------+-----------------+--------------+
|      CUST_CC_NO|  TIMEID| CUST_SSN|BRANCH_CODE|TRANSACTION_TYPE|TRANSACTION_VALUE|TRANSACTION_ID|
+----------------+--------+---------+-----------+----------------+-----------------+--------------+
|4210653349028689|20180214|123459988|        114|       Education|             78.9|             1|
|4210653349028689|20180320|123459988|         35|   Entertainment|            14.24|             2|
|4210653349028689|20180708|123459988|        160|         Grocery|             56.7|             3|
|4210653349028689|20180419|123459988|        114|   Entertainment|            59.73|             4|
|4210653349028689|20181010|123459988|         93|             Gas|             3.59|             5|
+----------------+--------+---------+-----------+----------------+-----------------+--------------+
only showing top 5 rows



In [86]:
load_dotenv()
USER=os.environ.get('user')
PASSWORD=os.environ.get('password')

### Req-1.2   
Data loading into Database


Create a Python and Pyspark Program to load/write the “Credit Card System Data” into RDBMS(creditcard_capstone).     
Tables should be created by the following names in RDBMS:     

      CDW_SAPP_BRANCH    
      CDW_SAPP_CREDIT_CARD    
      CDW_SAPP_CUSTOMER   


In [87]:
#load branch data from spark dataframe into reditcard_capstone.CDW_SAPP_BRANCH table in RDBMS
transform_branch_df.write \
                    .format("jdbc") \
                    .mode("overwrite") \
                    .option("truncate","true") \
                    .option("url", "jdbc:mysql://localhost:3308/") \
                    .option("createTableColumnTypes", "BRANCH_CODE INT,BRANCH_NAME VARCHAR(25),BRANCH_STREET VARCHAR(50),\
                                                       BRANCH_CITY VARCHAR(25), BRANCH_STATE VARCHAR(10),BRANCH_ZIP INT,\
                                                       BRANCH_PHONE VARCHAR(25),LAST_UPDATED TIMESTAMP")\
                    .option("dbtable", "creditcard_capstone.CDW_SAPP_BRANCH") \
                    .option("truncate","true") \
                    .option("user", USER) \
                    .option("password", PASSWORD) \
                    .option("characterEncoding","UTF-8") \
                    .option("useUnicode", "true") \
                    .save()

In [89]:
#load creditcard data from spark dataframe into reditcard_capstone.CDW_SAPP_CREDIT_CARD table in RDBMS
transform_credit_df.write \
                    .format("jdbc") \
                    .mode("overwrite") \
                    .option("truncate","true") \
                    .option("url", "jdbc:mysql://localhost:3308/") \
                    .option("createTableColumnTypes", "CUST_CC_NO VARCHAR(25),TIMEID VARCHAR(15),CUST_SSN INT,\
                                                       BRANCH_CODE INT, TRANSACTION_TYPE VARCHAR(25),TRANSACTION_VALUE DOUBLE,\
                                                       TRANSACTION_ID INT")\
                    .option("dbtable", "creditcard_capstone.CDW_SAPP_CREDIT_CARD") \
                    .option("truncate","true") \
                    .option("user", USER) \
                    .option("password", PASSWORD) \
                    .option("characterEncoding","UTF-8") \
                    .option("useUnicode", "true") \
                    .save()

In [92]:
#load cUSTOMER data from spark dataframe into reditcard_capstone.CDW_SAPP_CREDIT_CARD table in RDBMS
transform_customer_df.write \
                    .format("jdbc") \
                    .mode("overwrite") \
                    .option("truncate","true") \
                    .option("url", "jdbc:mysql://localhost:3308/") \
                    .option("createTableColumnTypes", "SSN INT,FIRST_NAME VARCHAR(25),MIDDLE_NAME VARCHAR(25),LAST_NAME VARCHAR(25),\
                                                       CREDIT_CARD_NO VARCHAR(25),FULL_STREET_ADDRESS VARCHAR(50),CUST_CITY VARCHAR(25),\
                                                       CUST_STATE VARCHAR(25),CUST_COUNTRY VARCHAR(25),CUST_ZIP INT,CUST_PHONE VARCHAR(15),\
                                                       CUST_EMAIL VARCHAR(50),LAST_UPDATED TIMESTAMP")\
                    .option("dbtable", "creditcard_capstone.CDW_SAPP_CUSTOMER") \
                    .option("truncate","true") \
                    .option("user", USER) \
                    .option("password", PASSWORD) \
                    .option("characterEncoding","UTF-8") \
                    .option("useUnicode", "true") \
                    .save()