In [31]:
# Import necessary libs
import pandas as pd
from pyspark.sql import SparkSession 
from pyspark.sql import functions as sf
from functools import reduce
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import date, timedelta, datetime
import time

from pyspark.conf import SparkConf
from pyspark.context import SparkContext


2. instantiate the spark session:

In [32]:
conf = SparkConf().setAppName("DataExtractionApp").setMaster("local[2]")
sc = SparkSession.builder.config("spark.driver.host", "localhost") \
.config ("spark.sql.execution.arrow.enabled", "true").getOrCreate()

3. ‘EXTRACT’ JSON data  and loaded in spark
    1. CDW_SAPP_BRANCH.JSON
    2. CDW_SAPP_CREDITCARD.JSON
    3. CDW_SAPP_CUSTOMER.JSON


In [33]:
df_cust = sc.read.json('data/cdw_sapp_custmer.json')
df_cust.show()

+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|   656|4210653310061055|     Natchez|United States| AHooper@example.com|   1237818|        MS|   39120|      Alec|   Hooper|2018-04-21T12:49:...|         Wm|123456100|Main Street North|
|   829|4210653310102868|Wethersfield|United States| EHolman@example.com|   1238933|        CT|   06109|      Etta|   Holman|2018-04-21T12:49:...|    Brendan|123453023|    Redwood Drive|
|   683|4210653310116272|     Huntley|United States| WDunham@exam

In [34]:
df_credit = sc.read.json('data/cdw_sapp_credit.json')
df_credit.head(5)

[Row(BRANCH_CODE=114, CREDIT_CARD_NO='4210653349028689', CUST_SSN=123459988, DAY=14, MONTH=2, TRANSACTION_ID=1, TRANSACTION_TYPE='Education', TRANSACTION_VALUE=78.9, YEAR=2018),
 Row(BRANCH_CODE=35, CREDIT_CARD_NO='4210653349028689', CUST_SSN=123459988, DAY=20, MONTH=3, TRANSACTION_ID=2, TRANSACTION_TYPE='Entertainment', TRANSACTION_VALUE=14.24, YEAR=2018),
 Row(BRANCH_CODE=160, CREDIT_CARD_NO='4210653349028689', CUST_SSN=123459988, DAY=8, MONTH=7, TRANSACTION_ID=3, TRANSACTION_TYPE='Grocery', TRANSACTION_VALUE=56.7, YEAR=2018),
 Row(BRANCH_CODE=114, CREDIT_CARD_NO='4210653349028689', CUST_SSN=123459988, DAY=19, MONTH=4, TRANSACTION_ID=4, TRANSACTION_TYPE='Entertainment', TRANSACTION_VALUE=59.73, YEAR=2018),
 Row(BRANCH_CODE=93, CREDIT_CARD_NO='4210653349028689', CUST_SSN=123459988, DAY=10, MONTH=10, TRANSACTION_ID=5, TRANSACTION_TYPE='Gas', TRANSACTION_VALUE=3.59, YEAR=2018)]

In [35]:
df_branch = sc.read.json('data/cdw_sapp_branch.json')
#df_branch.head(5)
#df_branch.show()
df_branch.printSchema()

root
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: long (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)



4. Transform the data based on the requirements found in the Mapping Document.
        
    1. FIRST_NAME - Convert the Name to Title Case
    2. MIDDLE_NAME - Convert the middle name in lower case
    3. LAST_NAME - Convert the Last Name in Title Case
    4. STREET_NAME,APT_NO - Concatenate Apartment no and Street name of customer's Residence with comma as a seperator (Street, Apartment)
                    in new column FULL_STREET_ADDRESS
    5. CUST_PHONE - Change the format to xxx-xxxx


In [36]:
df_cust = df_cust.withColumn('MIDDLE_NAME', lower(sf.col('MIDDLE_NAME')))
df_cust = df_cust.withColumn('FIRST_NAME', initcap(sf.col('FIRST_NAME')))
df_cust = df_cust.withColumn('LAST_NAME', initcap(sf.col('LAST_NAME')))
df_cust.head(5)

[Row(APT_NO='656', CREDIT_CARD_NO='4210653310061055', CUST_CITY='Natchez', CUST_COUNTRY='United States', CUST_EMAIL='AHooper@example.com', CUST_PHONE=1237818, CUST_STATE='MS', CUST_ZIP='39120', FIRST_NAME='Alec', LAST_NAME='Hooper', LAST_UPDATED='2018-04-21T12:49:02.000-04:00', MIDDLE_NAME='wm', SSN=123456100, STREET_NAME='Main Street North'),
 Row(APT_NO='829', CREDIT_CARD_NO='4210653310102868', CUST_CITY='Wethersfield', CUST_COUNTRY='United States', CUST_EMAIL='EHolman@example.com', CUST_PHONE=1238933, CUST_STATE='CT', CUST_ZIP='06109', FIRST_NAME='Etta', LAST_NAME='Holman', LAST_UPDATED='2018-04-21T12:49:02.000-04:00', MIDDLE_NAME='brendan', SSN=123453023, STREET_NAME='Redwood Drive'),
 Row(APT_NO='683', CREDIT_CARD_NO='4210653310116272', CUST_CITY='Huntley', CUST_COUNTRY='United States', CUST_EMAIL='WDunham@example.com', CUST_PHONE=1243018, CUST_STATE='IL', CUST_ZIP='60142', FIRST_NAME='Wilber', LAST_NAME='Dunham', LAST_UPDATED='2018-04-21T12:49:02.000-04:00', MIDDLE_NAME='ezequiel

In [37]:
df_cust = df_cust.withColumn('FULL_STREET_ADDRESS', 
                    sf.concat(sf.col('APT_NO'),sf.lit(', '), sf.col('STREET_NAME')))
df_cust.take(1)

[Row(APT_NO='656', CREDIT_CARD_NO='4210653310061055', CUST_CITY='Natchez', CUST_COUNTRY='United States', CUST_EMAIL='AHooper@example.com', CUST_PHONE=1237818, CUST_STATE='MS', CUST_ZIP='39120', FIRST_NAME='Alec', LAST_NAME='Hooper', LAST_UPDATED='2018-04-21T12:49:02.000-04:00', MIDDLE_NAME='wm', SSN=123456100, STREET_NAME='Main Street North', FULL_STREET_ADDRESS='656, Main Street North')]

In [38]:
df_cust = df_cust.withColumn('Ph',  sf.concat(substring('CUST_PHONE',1,3), sf.lit('-'), substring('CUST_PHONE',4,7) ))
df_cust.take(1)

[Row(APT_NO='656', CREDIT_CARD_NO='4210653310061055', CUST_CITY='Natchez', CUST_COUNTRY='United States', CUST_EMAIL='AHooper@example.com', CUST_PHONE=1237818, CUST_STATE='MS', CUST_ZIP='39120', FIRST_NAME='Alec', LAST_NAME='Hooper', LAST_UPDATED='2018-04-21T12:49:02.000-04:00', MIDDLE_NAME='wm', SSN=123456100, STREET_NAME='Main Street North', FULL_STREET_ADDRESS='656, Main Street North', Ph='123-7818')]

In [39]:
df_cust.describe('CUST_ZIP').show()

+-------+------------------+
|summary|          CUST_ZIP|
+-------+------------------+
|  count|               952|
|   mean|36312.616596638654|
| stddev| 23945.49431677531|
|    min|             01810|
|    max|             98908|
+-------+------------------+



6. Transfer data from source table 'CDW_SAPP_D_CUSTOMER' to target table 'CDW_SAPP_CUSTOMER'

In [40]:
CDW_SAPP_CUSTOMER = df_cust.selectExpr('SSN', 'FIRST_NAME', 'MIDDLE_NAME', 'LAST_NAME', 'Credit_card_no', 'FULL_STREET_ADDRESS', 'CUST_CITY', 'CUST_STATE','CUST_COUNTRY', 'CUST_ZIP', 'Ph as CUST_PHONE', 'CUST_EMAIL', 'LAST_UPDATED')
CDW_SAPP_CUSTOMER.show()

+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+----------+--------------------+--------------------+
|      SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|  Credit_card_no| FULL_STREET_ADDRESS|   CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|CUST_PHONE|          CUST_EMAIL|        LAST_UPDATED|
+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+----------+--------------------+--------------------+
|123456100|      Alec|         wm|   Hooper|4210653310061055|656, Main Street ...|     Natchez|        MS|United States|   39120|  123-7818| AHooper@example.com|2018-04-21T12:49:...|
|123453023|      Etta|    brendan|   Holman|4210653310102868|  829, Redwood Drive|Wethersfield|        CT|United States|   06109|  123-8933| EHolman@example.com|2018-04-21T12:49:...|
|123454487|    Wilber|   ezequiel|   Dunham|4210653310116272|683, 12th Street ...|   

1. BRANCH_ZIP - If the source value is null load default (00000) value else Direct move
2. BRANCH_PHONE - Change the format of phone number to (XXX)XXX-XXXX

In [41]:
df_branch = df_branch.na.fill(00000, subset = ['BRANCH_ZIP'])

df_branch = df_branch.withColumn('ph',  sf.concat(sf.lit('('), substring('BRANCH_PHONE',1,3), sf.lit(')'), substring('BRANCH_PHONE',4,3),sf.lit('-'), substring('BRANCH_PHONE',7,4) ))
df_branch.show()

+-----------------+-----------+------------+------------+------------+-------------------+----------+--------------------+-------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|      BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|           ph|
+-----------------+-----------+------------+------------+------------+-------------------+----------+--------------------+-------------+
|        Lakeville|          1|Example Bank|  1234565276|          MN|       Bridle Court|     55044|2018-04-18T16:51:...|(123)456-5276|
|          Huntley|          2|Example Bank|  1234618993|          IL|  Washington Street|     60142|2018-04-18T16:51:...|(123)461-8993|
|SouthRichmondHill|          3|Example Bank|  1234985926|          NY|      Warren Street|     11419|2018-04-18T16:51:...|(123)498-5926|
|       Middleburg|          4|Example Bank|  1234663064|          FL|   Cleveland Street|     32068|2018-04-18T16:51:...|(123)466-3064|
|    KingOfPrussia|          5|Example Ba

 Transfer data from source table 'CDW_SAPP_D_BRANCH' to target table 'CDW_SAPP_BRANCH'

In [42]:
CDW_SAPP_BRANCH= df_branch.selectExpr('BRANCH_CODE', 'BRANCH_NAME', 'BRANCH_STREET', 'BRANCH_CITY', 'BRANCH_STATE','BRANCH_ZIP', 'ph as BRANCH_PHONE', 'LAST_UPDATED')
CDW_SAPP_BRANCH.show()

+-----------+------------+-------------------+-----------------+------------+----------+-------------+--------------------+
|BRANCH_CODE| BRANCH_NAME|      BRANCH_STREET|      BRANCH_CITY|BRANCH_STATE|BRANCH_ZIP| BRANCH_PHONE|        LAST_UPDATED|
+-----------+------------+-------------------+-----------------+------------+----------+-------------+--------------------+
|          1|Example Bank|       Bridle Court|        Lakeville|          MN|     55044|(123)456-5276|2018-04-18T16:51:...|
|          2|Example Bank|  Washington Street|          Huntley|          IL|     60142|(123)461-8993|2018-04-18T16:51:...|
|          3|Example Bank|      Warren Street|SouthRichmondHill|          NY|     11419|(123)498-5926|2018-04-18T16:51:...|
|          4|Example Bank|   Cleveland Street|       Middleburg|          FL|     32068|(123)466-3064|2018-04-18T16:51:...|
|          5|Example Bank|        14th Street|    KingOfPrussia|          PA|     19406|(123)484-9701|2018-04-18T16:51:...|
|       

In [43]:
#df_credit.withColumn("TimeID",to_timestamp(df_credit['DAY'])).show(3,False)
df_credit1 = df_credit.withColumn('TIMECol',  sf.concat(df_credit['YEAR'], lit('-'), df_credit['MONTH'], lit('-'), df_credit['DAY']).cast('date'))
df_credit1 = df_credit1.withColumn("TIMEID", df_credit1["TIMECol"].cast('string'))
df_credit1 = df_credit1.withColumn("TIMEID", regexp_replace('TIMEID','-',''))
df_credit1.show()

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+----------+--------+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|   TIMECol|  TIMEID|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+----------+--------+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|2018-02-14|20180214|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|2018-03-20|20180320|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             56.7|2018|2018-07-08|20180708|
|        114|4210653349028689|123459988| 19|    4|             4|   Entertainment|            59.73|2018|2018-04-19|20180419|
|         93|4210653349028689|123459988| 10|   10|             5|             Gas|             3.59|2018|2018-10-10|20

 Transfer data from source table 'CDW_SAPP_D_CREDIT_CARD' to target table 'CDW_SAPP_CREDIT_CARD'

In [44]:
CDW_SAPP_CREDIT_CARD = df_credit1.selectExpr('CREDIT_CARD_NO as CUST_CC_NO', 'TIMEID', 'CUST_SSN', 'BRANCH_CODE', 'TRANSACTION_TYPE', 'TRANSACTION_VALUE', 'TRANSACTION_ID')
CDW_SAPP_CREDIT_CARD.show()

+----------------+--------+---------+-----------+----------------+-----------------+--------------+
|      CUST_CC_NO|  TIMEID| CUST_SSN|BRANCH_CODE|TRANSACTION_TYPE|TRANSACTION_VALUE|TRANSACTION_ID|
+----------------+--------+---------+-----------+----------------+-----------------+--------------+
|4210653349028689|20180214|123459988|        114|       Education|             78.9|             1|
|4210653349028689|20180320|123459988|         35|   Entertainment|            14.24|             2|
|4210653349028689|20180708|123459988|        160|         Grocery|             56.7|             3|
|4210653349028689|20180419|123459988|        114|   Entertainment|            59.73|             4|
|4210653349028689|20181010|123459988|         93|             Gas|             3.59|             5|
|4210653349028689|20180528|123459988|        164|       Education|             6.89|             6|
|4210653349028689|20180519|123459988|        119|   Entertainment|            43.39|             7|


2. Data Loading from database

In [51]:
CDW_SAPP_CUSTOMER.write.format("jdbc") \
.mode("overwrite") \
.option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
.option("dbtable", "creditcard_capstone.CDW_SAPP_CUSTOMER") \
.option("user", "root") \
.option("password", "lakshmi") \
.option("primaryKeyFields","SSN")\
.save()

In [47]:
CDW_SAPP_BRANCH.write.format("jdbc") \
.mode("overwrite") \
.option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
.option("dbtable", "creditcard_capstone.CDW_SAPP_BRANCH") \
.option("user", "root") \
.option("password", "lakshmi") \
.option("primaryKeyFields", "BRANCH_CODE")\
.save()

In [49]:
CDW_SAPP_CREDIT_CARD.show()
CDW_SAPP_CREDIT_CARD.write.format("jdbc") \
.mode("overwrite") \
.option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
.option("dbtable", "creditcard_capstone.CDW_SAPP_CREDIT_CARD") \
.option("user", "root") \
.option("password", "lakshmi") \
.option("primaryKeyFields", "TRANSACTION_ID")\
.save()

+----------------+--------+---------+-----------+----------------+-----------------+--------------+
|      CUST_CC_NO|  TIMEID| CUST_SSN|BRANCH_CODE|TRANSACTION_TYPE|TRANSACTION_VALUE|TRANSACTION_ID|
+----------------+--------+---------+-----------+----------------+-----------------+--------------+
|4210653349028689|20180214|123459988|        114|       Education|             78.9|             1|
|4210653349028689|20180320|123459988|         35|   Entertainment|            14.24|             2|
|4210653349028689|20180708|123459988|        160|         Grocery|             56.7|             3|
|4210653349028689|20180419|123459988|        114|   Entertainment|            59.73|             4|
|4210653349028689|20181010|123459988|         93|             Gas|             3.59|             5|
|4210653349028689|20180528|123459988|        164|       Education|             6.89|             6|
|4210653349028689|20180519|123459988|        119|   Entertainment|            43.39|             7|
