In [1]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext

import os
os.environ['JAVA_HOME'] = 'C:\Program Files\Java\jdk-18.0.2'
os.environ['SPARK_HOME'] = 'C:\Program Files\spark-3.3.0-bin-hadoop3'

<h2>Extract</h2>

In [2]:
#Create a spark session

spark = SparkSession.builder.appName('ETL-Query-Plot').getOrCreate()
#Read JSON and assign to pyspark dataframes

customer_df = spark.read.json('cdw_sapp_custmer.json')
credit_df = spark.read.json('cdw_sapp_credit.json')
branch_df = spark.read.json('cdw_sapp_branch.json')

<h2>Transform</h2>

In [3]:
from pyspark.sql.functions import concat_ws
#Transform customer data
customer_df = customer_df.withColumnRenamed("FIRST_NAME","First_Name")
customer_df = customer_df.withColumnRenamed("MIDDLE_NAME", "Middle_Name")
customer_df = customer_df.withColumnRenamed("LAST_NAME", "Last_Name")

customer_df = customer_df.withColumn('FULL_STREET_ADDRESS', concat_ws(' ', customer_df.STREET_NAME, customer_df.APT_NO))
customer_df = customer_df.drop('APT_NO', 'STREET_NAME')

customer_df.withColumn("CUST_PHONE", concat_ws('-', customer_df.CUST_PHONE[0:3], customer_df.CUST_PHONE[3:7]))

customer_df = customer_df.withColumn("CUST_ZIP",customer_df.CUST_ZIP.cast("int"))
customer_df = customer_df.drop_duplicates()
customer_df.printSchema()

root
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: integer (nullable = true)
 |-- First_Name: string (nullable = true)
 |-- Last_Name: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- Middle_Name: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = false)



In [4]:
#Transform branch data

from pyspark.sql.functions import unix_timestamp, to_timestamp
from pyspark.sql.types import TimestampType

branch_df.na.fill(value=00000,subset=["BRANCH_ZIP"])
branch_df.withColumn("BRANCH_PHONE", concat_ws('-', branch_df.BRANCH_PHONE[0:3], branch_df.BRANCH_PHONE[3:7]))

branch_df = branch_df.withColumn("LAST_UPDATED",to_timestamp("LAST_UPDATED"))
branch_df = branch_df.drop_duplicates()
branch_df.printSchema()


root
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: long (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)



In [5]:
#Format credit data
#Concat YEAR, MONTH, DAY for YYYYMMDD format
from pyspark.sql.functions import date_format, col, to_date, udf, lpad, format_string


format_udf = udf(lambda x: f'{x:02s}')
credit_df = credit_df.withColumn("DAY",credit_df.DAY.cast("string"))
credit_df = credit_df.withColumn('DAY', lpad('DAY', 2, '0'))
credit_df = credit_df.withColumn("MONTH",credit_df.MONTH.cast("string"))
credit_df = credit_df.withColumn('MONTH', lpad('MONTH', 2, '0'))


credit_df = credit_df.withColumn('TIMEID', concat_ws('', credit_df.YEAR, credit_df.MONTH, credit_df.DAY  ))
credit_df = credit_df.drop('DAY', 'MONTH', 'YEAR')
credit_df = credit_df.drop_duplicates()


<h2>Load</h2>

In [6]:
#Write dataframes to dbms
credit_df.write.format("jdbc") \
  .mode("overwrite") \
  .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
  .option("dbtable", "CDW_SAPP_CREDIT") \
  .option("driver", "com.mysql.jdbc.Driver") \
  .option("user", "root") \
  .option("password", "db") \
  .save()

branch_df.write.format("jdbc") \
  .mode("overwrite") \
  .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
  .option("dbtable", "CDW_SAPP_BRANCH") \
  .option("driver", "com.mysql.jdbc.Driver") \
  .option("user", "root") \
  .option("password", "db") \
  .save() 

customer_df.write.format("jdbc") \
  .mode("overwrite") \
  .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
  .option("dbtable", "CDW_SAPP_CUSTOMER ") \
  .option("driver", "com.mysql.jdbc.Driver") \
  .option("user", "root") \
  .option("password", "db") \
  .save() 


<h2>Req-2.1</h2>

<h2> User Input Method, Widgets </h2>

In [7]:
#Decorator and widgets for interaction
def wprint(f):
    def inner(*args, **kwargs):
        r = f(*args, **kwargs)
        r.show()
        return r
    return inner

from ipywidgets import widgets
#Instead of using a terminal repl the user has a live-update widget attached to each funtion.

In [26]:
#1)    Used to display the transactions made by customers living in a given zip code for a given month and year. Order by day in descending order.
credit_df.createOrReplaceTempView("credit_df")
customer_df.createOrReplaceTempView("customer_df")
branch_df.createOrReplaceTempView("branch_df")

@wprint
def customer_by_zip_and_date(zip, month, year):
    string=year+month
     
    return spark.sql('WITH t AS \
( \
	SELECT EXTRACT(MONTH FROM to_date(TIMEID, "yyyyMMdd")) AS M, EXTRACT(YEAR FROM to_date(TIMEID, "yyyyMMdd")) AS Y \
	FROM credit_df \
) \
SELECT DISTINCT TRANSACTION_ID, t.M, t.Y \
FROM credit_df as c, customer_df as cu, t \
WHERE c.CUST_SSN = cu.SSN AND "%s"= cu.CUST_ZIP AND "%s"= t.M AND "%s"= t.Y' % (zip, month, year))

widgets.interactive(customer_by_zip_and_date, zip='91740', month='01', year='2018')




interactive(children=(Text(value='91740', description='zip'), Text(value='01', description='month'), Text(valu…

In [9]:
#2)    Used to display the number and total values of transactions for a given type.
@wprint
def count_sum_by_type(t):
    return spark.sql(f'SELECT COUNT(TRANSACTION_ID), SUM(TRANSACTION_VALUE) FROM credit_df WHERE "%s"=TRANSACTION_TYPE' % (t))

widgets.interactive(count_sum_by_type, t=[('Bills', 'Bills'), ('Education', 'Education'), ('Entertainment', 'Entertainment'), ('Grocery','Grocery'), ('Gas','Gas'), ('Healthcare','Healthcare'), ('Test','Test')] )


interactive(children=(Dropdown(description='t', options=(('Bills', 'Bills'), ('Education', 'Education'), ('Ent…

In [10]:
states = ["AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DC", "DE", "FL", "GA", 
          "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", 
          "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", 
          "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", 
          "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"]
st = list(zip(states,states))

In [11]:
#3)    Used to display the number and total values of transactions for branches in a given state.
@wprint
def count_sum_by_state(state):
    return spark.sql(f'SELECT DISTINCT COUNT(TRANSACTION_ID), SUM(TRANSACTION_VALUE) FROM credit_df as c, branch_df as b WHERE c.BRANCH_CODE=b.BRANCH_CODE AND "%s"=b.BRANCH_STATE' % (state))
widgets.interactive(count_sum_by_state, {'manual': False}, state=st )



interactive(children=(Dropdown(description='state', options=(('AL', 'AL'), ('AK', 'AK'), ('AZ', 'AZ'), ('AR', …

<h2> Req 2.2 </h2>

In [12]:
#1) Used to check the existing account details of a customer.
@wprint
def customer(ssn):
    return spark.sql(f'SELECT * FROM customer_df WHERE "%s"=customer_df.SSN' % (ssn))
widgets.interactive(customer, ssn='123453723')

interactive(children=(Text(value='123453723', description='ssn'), Output()), _dom_classes=('widget-interact',)…

In [13]:
#2) Used to modify the existing account details of a customer.
#Input column and value to update
#Update is not supported in Spark unless it's a Delta Lake so this is a Spark Dataframe operation

from pyspark.sql.functions import when
@wprint
def update_customer(column, value, ssn):
    return customer_df.withColumn(column, when(customer_df.SSN == ssn, value).otherwise(col(column)))

    
widgets.interactive(update_customer, {'manual': True}, column='CUST_CITY', value='Hell', ssn='123453723')

interactive(children=(Text(value='CUST_CITY', description='column'), Text(value='Hell', description='value'), …

In [14]:
#3) Used to generate a monthly bill for a credit card number for a given month and year.
@wprint
def generate_monthly_bill(cc, month, year):
    string= year+month
    return spark.sql('SELECT SUM(TRANSACTION_VALUE) FROM credit_df as c WHERE c.CREDIT_CARD_NO = "%s" \
        AND EXTRACT(MONTH FROM to_date(TIMEID, "yyyyMMdd"))="%s" AND EXTRACT(YEAR FROM to_date(TIMEID, "yyyyMMdd"))="%s"' % (cc, month, year) )
widgets.interactive(generate_monthly_bill, cc='4210653321125592', month='01', year='2018')        
    

interactive(children=(Text(value='4210653321125592', description='cc'), Text(value='01', description='month'),…

In [15]:
#4) Used to display the transactions made by a customer between two dates. Order by year, month, and day in descending order.
@wprint
def transactions_between(ssn, date1, date2):
    return spark.sql('SELECT TRANSACTION_ID FROM credit_df as c WHERE "%s"=c.CUST_SSN AND to_date(c.TIMEID, "yyyyMMdd") > to_date("%s", "yyyyMMdd") AND to_date(c.TIMEID, "yyyyMMdd") < to_date("%s", "yyyyMMdd")' % (ssn, date1, date2))
widgets.interactive(transactions_between, ssn='123459988', date1='20180213', date2='20180321')


interactive(children=(Text(value='123459988', description='ssn'), Text(value='20180213', description='date1'),…

In [16]:
#Converting Spark to Pandas to use Plotly
cdf = credit_df.toPandas()

bdf = branch_df.toPandas()

cudf = customer_df.toPandas()

In [17]:
import plotly.express as px

<h1> Functional Requirements and Visualization </h1>

In [18]:
colors= [
    "#fd90aa",
    "#fee9cf",
    "#f5ff97",
    "#94ff9b",
    "#94e8fd",
    "#e191fc"
]

In [19]:
#3.1Find and plot which transaction type has a high rate of transactions.
grouped = cdf.groupby('TRANSACTION_TYPE')['TRANSACTION_ID'].count().reset_index()
grouped
#Do they sell testing products or is 'Test' a testing category? I guess we'll never know.

fig = px.bar(grouped, x='TRANSACTION_TYPE', y='TRANSACTION_ID', title='Transactions per Category', color="TRANSACTION_TYPE", color_discrete_sequence=colors[1:])
fig.update_layout(
    font_family="Arial Black",
    font_color='white',
    font_size=18,
    paper_bgcolor=colors[0],
    plot_bgcolor=colors[0]
)
fig.show()

In [28]:
#3.2Find and plot which state has a high number of customers.
grouped = cudf.groupby('CUST_STATE')['SSN'].count().reset_index()
grouped

fig = px.bar(grouped, x='CUST_STATE', y='SSN', color='CUST_STATE', color_discrete_sequence=colors[1:], title='Customers by State')
fig.update_layout(
    font_family="Arial Black",
    font_color='white',
    font_size=18,
    paper_bgcolor=colors[0],
    plot_bgcolor=colors[0]
)

fig.show()

In [21]:
#3.3Find and plot the sum of all transactions for each customer, and which customer has the highest transaction amount. hint(use CUST_SSN).
#3.3.2 Edited to include 'first 20'
grouped2 = cdf.groupby('CUST_SSN')['TRANSACTION_VALUE'].sum().reset_index()[0:19]

fig = px.scatter(grouped2, x='CUST_SSN', y='TRANSACTION_VALUE', size="TRANSACTION_VALUE", color_continuous_scale=colors[1:], color='TRANSACTION_VALUE', title='Top Customers') #What kind of company publishes SSNs in a chart, this one (•_•)  ( •_•)>⌐■-■  (⌐■_■)

fig.update_layout(
    font_family="Arial Black",
    font_color='white',
    font_size=18,
    paper_bgcolor=colors[0],
    plot_bgcolor=colors[0]
)
fig.update_xaxes(ticks="", showticklabels=False)
fig.show()


In [22]:
#3.4.2 Moved from Part 2. Find and plot the top three months with the largest transaction data.
import calendar
dates = cdf.loc[:,['TIMEID', 'TRANSACTION_ID']]
dates['TIMEID'] = pd.to_datetime(dates['TIMEID'], format='%Y%m%d').dt.month
dates

tgroup = dates.groupby('TIMEID')['TRANSACTION_ID'].count().reset_index()
tgroup = tgroup.nlargest(3, 'TRANSACTION_ID')
tgroup['MONTH'] = tgroup['TIMEID'].map(lambda x: calendar.month_name[x])

fig = px.bar(tgroup, x='MONTH', y='TRANSACTION_ID', color='MONTH', color_discrete_sequence=colors[1:], title='Top 3 Months')
fig.update_layout(
    font_family="Arial Black",
    font_color='white',
    font_size=18,
    paper_bgcolor=colors[0],
    plot_bgcolor=colors[0]
)
fig.show()


In [23]:
#3.5.2Find and plot which branch processed the highest total dollar value of healthcare transactions.

a = np.where(cdf['TRANSACTION_TYPE']=='Healthcare')
health = cdf.iloc[a]
grouped = health.groupby('BRANCH_CODE')['TRANSACTION_VALUE'].sum().reset_index()

grouped = grouped.sort_values(by='TRANSACTION_VALUE', ascending=False)
grouped
colors=['#3e2653', '#a64777', '#ff4589', '#ebe5ce']
fig = px.scatter(grouped, x='BRANCH_CODE', y='TRANSACTION_VALUE', color='TRANSACTION_VALUE', color_continuous_scale=colors, size='TRANSACTION_VALUE', title='Healthcare sales by Branch')
fig.update_layout(
    font_family="Arial Black",
    font_size=18,
    paper_bgcolor='snow',
    plot_bgcolor='snow'
)
fig.show()