### Business Requirements - ETL
#### 1. Functional Requirements - Load Credit Card Database 


#### Req-1.1
Data Extraction and Transformation with Python and 
PySpark

For “Credit Card System,” create a Python and PySpark SQL program to read/extract the following JSON files according to the specifications found in the mapping document.
 
1. CDW_SAPP_BRANCH.JSON
2. CDW_SAPP_CREDITCARD.JSON
3. CDW_SAPP_CUSTOMER.JSON


In [97]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("Capstone").getOrCreate()

#### READ TABLES AND TRANSFORM

In [98]:
# branch = spark.read.json("./data/cdw_sapp_branch.json")
branch = spark.read.load("./data/cdw_sapp_branch.json", format = 'json')
branch.printSchema()
branch.show()
# type(branch)
# branch.createTempView("branch")


root
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: long (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)

+-----------------+-----------+------------+------------+------------+-------------------+----------+--------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|      BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|
+-----------------+-----------+------------+------------+------------+-------------------+----------+--------------------+
|        Lakeville|          1|Example Bank|  1234565276|          MN|       Bridle Court|     55044|2018-04-18T16:51:...|
|          Huntley|          2|Example Bank|  1234618993|          IL|  Washington Street|     60142|2018-04-18T16:51:...|
|SouthRichmondHill|          3|Exam

In [129]:
branch.na.fill({'BRANCH_ZIP':00000}).show()

+-----------------+-----------+------------+------------+------------+-------------------+----------+--------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|      BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|
+-----------------+-----------+------------+------------+------------+-------------------+----------+--------------------+
|        Lakeville|          1|Example Bank|  1234565276|          MN|       Bridle Court|     55044|2018-04-18T16:51:...|
|          Huntley|          2|Example Bank|  1234618993|          IL|  Washington Street|     60142|2018-04-18T16:51:...|
|SouthRichmondHill|          3|Example Bank|  1234985926|          NY|      Warren Street|     11419|2018-04-18T16:51:...|
|       Middleburg|          4|Example Bank|  1234663064|          FL|   Cleveland Street|     32068|2018-04-18T16:51:...|
|    KingOfPrussia|          5|Example Bank|  1234849701|          PA|        14th Street|     19406|2018-04-18T16:51:...|
|         Paters

In [137]:
from pyspark.sql.functions import concat,lit,col,substring,to_timestamp

In [143]:
tr_branch = branch.select(col("BRANCH_CITY") \
          ,col("BRANCH_CODE")\
          ,col("BRANCH_NAME") \
          ,(concat( lit("(") ,substring(col("BRANCH_PHONE"),1,3),lit(")") ,substring(col("BRANCH_PHONE"),4,3), lit("-") ,substring(col("BRANCH_PHONE"),7,4) )).alias("BRANCH_PHONE") \
          ,col("BRANCH_STATE") \
          ,col("BRANCH_STREET")\
          ,col("BRANCH_ZIP") \
          ,to_timestamp(concat(substring(col("LAST_UPDATED"),1,10),lit(" "),substring(col("LAST_UPDATED"),12,20))).alias("LAST_UPDATED") \
          )
# tr_branch = spark.sql("SELECT BRANCH_CITY \
#           ,BRANCH_CODE \
#           ,BRANCH_NAME \ 
#           ,concat('(' ,substr(BRANCH_PHONE,1,3), ')' ,substr(BRANCH_PHONE,4,3), '-' ,substr(BRANCH_PHONE,7,4)) as BRANCH_PHONE \
#           ,BRANCH_STATE \
#           ,BRANCH_STREET\
#           ,BRANCH_ZIP \
#           ,LAST_UPDATED \
#           from branch")

tr_branch.show()

+-----------------+-----------+------------+-------------+------------+-------------------+----------+-------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME| BRANCH_PHONE|BRANCH_STATE|      BRANCH_STREET|BRANCH_ZIP|       LAST_UPDATED|
+-----------------+-----------+------------+-------------+------------+-------------------+----------+-------------------+
|        Lakeville|          1|Example Bank|(123)456-5276|          MN|       Bridle Court|     55044|2018-04-18 14:51:47|
|          Huntley|          2|Example Bank|(123)461-8993|          IL|  Washington Street|     60142|2018-04-18 14:51:47|
|SouthRichmondHill|          3|Example Bank|(123)498-5926|          NY|      Warren Street|     11419|2018-04-18 14:51:47|
|       Middleburg|          4|Example Bank|(123)466-3064|          FL|   Cleveland Street|     32068|2018-04-18 14:51:47|
|    KingOfPrussia|          5|Example Bank|(123)484-9701|          PA|        14th Street|     19406|2018-04-18 14:51:47|
|         Paters

In [102]:
credit = spark.read.load("./data/cdw_sapp_credit.json", format = 'json')
credit.printSchema()
# credit.show()
# credit.createTempView("credit")
spark.sql("SELECT * from credit").show()

root
 |-- BRANCH_CODE: long (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_SSN: long (nullable = true)
 |-- DAY: long (nullable = true)
 |-- MONTH: long (nullable = true)
 |-- TRANSACTION_ID: long (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- YEAR: long (nullable = true)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             5

In [103]:
# tr_credit =  credit.select(col("BRANCH_CODE") \
#           ,col("CREDIT_CARD_NO")\
#           ,col("CUST_SSN") \
#           ,concat( lit("(") ,substring(col("BRANCH_PHONE"),1,3),lit(")") ,substring(col("BRANCH_PHONE"),4,3), lit("-") ,substring(col("BRANCH_PHONE"),7,4) )  \
#           ,col("BRANCH_STATE") \
#           ,col("BRANCH_STREET")\
#           ,col("BRANCH_ZIP") \
#           ,col("LAST_UPDATED") \
#           )
# tr_branch.show()
tr_credit = spark.sql("SELECT BRANCH_CODE\
                    , CREDIT_CARD_NO as CUST_CC_NO\
                    , CUST_SSN \
                    , CASE WHEN MONTH<10 and DAY<10 THEN concat(YEAR,'0',MONTH,'0',DAY) \
                      WHEN MONTH<10 THEN concat(YEAR,'0',MONTH,DAY)\
                      WHEN DAY<10 THEN concat(YEAR,MONTH,'0',DAY) \
                      ELSE concat(YEAR,MONTH,DAY) END as TIMEID \
                    ,TRANSACTION_ID\
                    ,TRANSACTION_TYPE\
                    ,TRANSACTION_VALUE\
                    from credit")
tr_credit.show()

+-----------+----------------+---------+--------+--------------+----------------+-----------------+
|BRANCH_CODE|      CUST_CC_NO| CUST_SSN|  TIMEID|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|
+-----------+----------------+---------+--------+--------------+----------------+-----------------+
|        114|4210653349028689|123459988|20180214|             1|       Education|             78.9|
|         35|4210653349028689|123459988|20180320|             2|   Entertainment|            14.24|
|        160|4210653349028689|123459988|20180708|             3|         Grocery|             56.7|
|        114|4210653349028689|123459988|20180419|             4|   Entertainment|            59.73|
|         93|4210653349028689|123459988|20181010|             5|             Gas|             3.59|
|        164|4210653349028689|123459988|20180528|             6|       Education|             6.89|
|        119|4210653349028689|123459988|20180519|             7|   Entertainment|            43.39|


In [105]:
customer = spark.read.load("./data/cdw_sapp_customer.json", format = 'json')
customer.printSchema()
# customer.show()
customer.createTempView("customer")


root
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- STREET_NAME: string (nullable = true)



In [117]:
tr_customer = spark.sql("SELECT SSN\
                ,CREDIT_CARD_NO\
                ,CUST_CITY\
                ,CUST_STATE\
                ,CUST_COUNTRY\
                ,CUST_EMAIL\
                ,CUST_ZIP\
                ,concat( substr(CUST_PHONE,1,3) , '-', substr(CUST_PHONE,4,4)) as CUST_PHONE \
                ,concat(STREET_NAME, ',' ,APT_NO) as FULL_STREET_ADDRESS \
                ,concat(UPPER(substr(FIRST_NAME,1,1)),LOWER(substr(FIRST_NAME,2))) as FIRST_NAME\
                ,concat(UPPER(substr(MIDDLE_NAME,1,1)),LOWER(substr(MIDDLE_NAME,2))) as MIDDLE_NAME\
                ,concat(UPPER(substr(LAST_NAME,1,1)),LOWER(substr(LAST_NAME,2))) as LAST_NAME\
                ,timestamp(concat(substr(LAST_UPDATED,1,10),' ',substr(LAST_UPDATED,12))) as LAST_UPDATED\
                from customer")
tr_customer.show()


+---------+----------------+------------+----------+-------------+--------------------+--------+----------+--------------------+----------+-----------+---------+-------------------+
|      SSN|  CREDIT_CARD_NO|   CUST_CITY|CUST_STATE| CUST_COUNTRY|          CUST_EMAIL|CUST_ZIP|CUST_PHONE| FULL_STREET_ADDRESS|FIRST_NAME|MIDDLE_NAME|LAST_NAME|       LAST_UPDATED|
+---------+----------------+------------+----------+-------------+--------------------+--------+----------+--------------------+----------+-----------+---------+-------------------+
|123456100|4210653310061055|     Natchez|        MS|United States| AHooper@example.com|   39120|  123-7818|Main Street North...|      Alec|         Wm|   Hooper|2018-04-21 10:49:02|
|123453023|4210653310102868|Wethersfield|        CT|United States| EHolman@example.com|   06109|  123-8933|   Redwood Drive,829|      Etta|    Brendan|   Holman|2018-04-21 10:49:02|
|123454487|4210653310116272|     Huntley|        IL|United States| WDunham@example.com|   

####  Req-1.2 Data loading into Database



In [118]:
tr_customer.write.format("jdbc") \
          .mode("append") \
          .options(driver="com.mysql.cj.jdbc.Driver",\
                user="root",\
                password="root",\
                url="jdbc:mysql://localhost:3306/creditcard_capstone",\
                dbtable="creditcard_capstone.CDW_SAPP_CUSTOMER").save()

In [144]:
tr_branch.write.format("jdbc") \
          .mode("append") \
          .options(driver="com.mysql.cj.jdbc.Driver",\
                user="root",\
                password="root",\
                url="jdbc:mysql://localhost:3306/creditcard_capstone",\
                dbtable="creditcard_capstone.CDW_SAPP_BRANCH").save()


In [145]:
tr_credit.write.format("jdbc") \
          .mode("append") \
          .options(driver="com.mysql.cj.jdbc.Driver",\
                user="root",\
                password="root",\
                url="jdbc:mysql://localhost:3306/creditcard_capstone",\
                dbtable="creditcard_capstone.CDW_SAPP_CREDIT").save()


### 2. Functional Requirements - Application Front-End


#### Req-2.1 Transaction Details Module