In [1]:
import sys
import os
import findspark
findspark.init()

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable




In [2]:
import spark
import pyspark


In [3]:
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import *



In [4]:
#read the database username and password from secret.txt
secrets_file = os.path.join("files", "secret.txt")
with open(secrets_file, "r") as f:		
    lines = f.readlines()
for line in lines:
    words = line.split("=")
    if (words[0] == "user"):
        user = words[1].strip()
    elif (words[0] == "password"):
        password = words[1].strip()
f.close()

In [5]:
spark = SparkSession.builder.appName('capstone_project').getOrCreate()


In [6]:
cdw_sapp_customer = os.path.join("files", "cdw_sapp_custmer.json")

df_customer = spark.read.json(cdw_sapp_customer)



In [7]:

type(df_customer)
df_customer.show()

+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|   656|4210653310061055|     Natchez|United States| AHooper@example.com|   1237818|        MS|   39120|      Alec|   Hooper|2018-04-21T12:49:...|         Wm|123456100|Main Street North|
|   829|4210653310102868|Wethersfield|United States| EHolman@example.com|   1238933|        CT|   06109|      Etta|   Holman|2018-04-21T12:49:...|    Brendan|123453023|    Redwood Drive|
|   683|4210653310116272|     Huntley|United States| WDunham@exam

In [8]:
df_customer.printSchema()

root
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- STREET_NAME: string (nullable = true)



In [9]:
    
df_customer.createOrReplaceTempView('customer_data')

In [10]:
customer_df=spark.sql("SELECT CAST(SSN AS INT) SSN, \
CONCAT(UCASE(SUBSTRING(FIRST_NAME,1,1)), LCASE(SUBSTRING(FIRST_NAME,2))) FIRST_NAME, \
LOWER(MIDDLE_NAME) MIDDLE_NAME, \
CONCAT(UCASE(SUBSTRING(last_name,1,1)), LCASE(SUBSTRING(last_name,2))) LAST_NAME, \
Credit_card_no, \
CONCAT(APT_NO,' ,',STREET_NAME) FULL_STREET_ADDRESS, \
CUST_CITY, CUST_STATE, CUST_COUNTRY, CUST_ZIP, CONCAT(SUBSTRING(cust_phone, 1,3), '-',SUBSTRING(cust_phone,4)) CUST_PHONE, \
CUST_EMAIL, LAST_UPDATED FROM customer_data")

In [11]:

customer_df.show() 

+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+----------+--------------------+--------------------+
|      SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|  Credit_card_no| FULL_STREET_ADDRESS|   CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|CUST_PHONE|          CUST_EMAIL|        LAST_UPDATED|
+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+----------+--------------------+--------------------+
|123456100|      Alec|         wm|   Hooper|4210653310061055|656 ,Main Street ...|     Natchez|        MS|United States|   39120|  123-7818| AHooper@example.com|2018-04-21T12:49:...|
|123453023|      Etta|    brendan|   Holman|4210653310102868|  829 ,Redwood Drive|Wethersfield|        CT|United States|   06109|  123-8933| EHolman@example.com|2018-04-21T12:49:...|
|123454487|    Wilber|   ezequiel|   Dunham|4210653310116272|683 ,12th Street ...|   

In [12]:
customer_df.printSchema()

root
 |-- SSN: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- Credit_card_no: string (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)



In [13]:
#df_customer.createOrReplaceTempView('customer_df')
customer_df.columns

['SSN',
 'FIRST_NAME',
 'MIDDLE_NAME',
 'LAST_NAME',
 'Credit_card_no',
 'FULL_STREET_ADDRESS',
 'CUST_CITY',
 'CUST_STATE',
 'CUST_COUNTRY',
 'CUST_ZIP',
 'CUST_PHONE',
 'CUST_EMAIL',
 'LAST_UPDATED']

In [14]:
customer_df.createOrReplaceTempView('customer_df')

In [15]:
#sc.sql("SELECT CAST(CUST_ZIP as INT) CUST_ZIP FROM customer_data")
customer_df=spark.sql("SELECT SSN, FIRST_NAME, MIDDLE_NAME, LAST_NAME, Credit_Card_no, FULL_STREET_ADDRESS, CUST_CITY,CUST_STATE, \
CUST_COUNTRY, CAST(CUST_ZIP as INT) CUST_ZIP, CUST_PHONE, CUST_EMAIL, LAST_UPDATED FROM customer_df")

In [16]:
# customer_df = sc.sql("SELECT SSN,  FIRST_NAME, \
#  MIDDLE_NAME, LAST_NAME, Credit_card_no, FULL_STREET_ADDRESS, CUST_CITY, CUST_STATE, \
# CUST_COUNTRY,CAST(CUST_ZIP as INT) CUST_ZIP,CUST_PHONE , CUST_EMAIL,LAST_UPDATED \
# FROM customer_df")




In [17]:
customer_df.printSchema()

root
 |-- SSN: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- Credit_Card_no: string (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_ZIP: integer (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)



In [18]:
customer_df.show()

+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+----------+--------------------+--------------------+
|      SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|  Credit_Card_no| FULL_STREET_ADDRESS|   CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|CUST_PHONE|          CUST_EMAIL|        LAST_UPDATED|
+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+----------+--------------------+--------------------+
|123456100|      Alec|         wm|   Hooper|4210653310061055|656 ,Main Street ...|     Natchez|        MS|United States|   39120|  123-7818| AHooper@example.com|2018-04-21T12:49:...|
|123453023|      Etta|    brendan|   Holman|4210653310102868|  829 ,Redwood Drive|Wethersfield|        CT|United States|    6109|  123-8933| EHolman@example.com|2018-04-21T12:49:...|
|123454487|    Wilber|   ezequiel|   Dunham|4210653310116272|683 ,12th Street ...|   

In [19]:
df_customer = customer_df.withColumn('LAST_UPDATED', F.to_timestamp('LAST_UPDATED'))

In [20]:
df_customer.printSchema()

root
 |-- SSN: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- Credit_Card_no: string (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_ZIP: integer (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)



In [21]:
df_customer.show()

+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+----------+--------------------+-------------------+
|      SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|  Credit_Card_no| FULL_STREET_ADDRESS|   CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|CUST_PHONE|          CUST_EMAIL|       LAST_UPDATED|
+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+----------+--------------------+-------------------+
|123456100|      Alec|         wm|   Hooper|4210653310061055|656 ,Main Street ...|     Natchez|        MS|United States|   39120|  123-7818| AHooper@example.com|2018-04-21 11:49:02|
|123453023|      Etta|    brendan|   Holman|4210653310102868|  829 ,Redwood Drive|Wethersfield|        CT|United States|    6109|  123-8933| EHolman@example.com|2018-04-21 11:49:02|
|123454487|    Wilber|   ezequiel|   Dunham|4210653310116272|683 ,12th Street ...|     Hun

In [22]:

df_customer.write.format("jdbc") \
.mode("append") \
.option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
.option("dbtable", "creditcard_capstone.cdw_sapp_customer") \
.option("user", user) \
.option("password", password) \
.save()


In [23]:
spark.stop()

In [24]:
# ALTER TABLE `creditcard_capstone`.`cdw_sapp_customer` 
# CHANGE COLUMN `FIRST_NAME` `FIRST_NAME` VARCHAR(45) NULL DEFAULT NULL ,
# CHANGE COLUMN `MIDDLE_NAME` `MIDDLE_NAME` VARCHAR(45) NULL DEFAULT NULL ,
# CHANGE COLUMN `LAST_NAME` `LAST_NAME` VARCHAR(45) NULL DEFAULT NULL ,
# CHANGE COLUMN `Credit_Card_no` `Credit_Card_no` VARCHAR(45) NULL DEFAULT NULL ,
# CHANGE COLUMN `FULL_STREET_ADDRESS` `FULL_STREET_ADDRESS` VARCHAR(45) NULL DEFAULT NULL ,
# CHANGE COLUMN `CUST_CITY` `CUST_CITY` VARCHAR(45) NULL DEFAULT NULL ,
# CHANGE COLUMN `CUST_STATE` `CUST_STATE` VARCHAR(45) NULL DEFAULT NULL ,
# CHANGE COLUMN `CUST_COUNTRY` `CUST_COUNTRY` VARCHAR(45) NULL DEFAULT NULL ,
# CHANGE COLUMN `CUST_PHONE` `CUST_PHONE` VARCHAR(45) NULL DEFAULT NULL ,
# CHANGE COLUMN `CUST_EMAIL` `CUST_EMAIL` VARCHAR(45) NULL DEFAULT NULL ;
