# Client Retention Demo Using PySpark

In this demo, we show PySpark accessing enterprise data stored in VSAM and DB2. The VSAM data consists of 6,001 rows of customer information, and the DB2 data consists of 20,000 rows of transaction data. The data is transformed and joined in a Spark dataframe, which is then used for predictive analysis: a logistic regression model evaluates customer activity level vs. churn.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

# Silence library warnings to keep the notebook output readable
import warnings
warnings.filterwarnings("ignore")

# Placeholder connection settings: replace with your z/OS host address,
# MDSS port, and z/OS user ID and password
zOS_IP = "123.456.78.901"
MDSS_PORT = "1200"
zOS_USERNAME = "SPK????"
zOS_PASSWORD = "????????"

SparkSession is the entry point for using the Spark APIs on z/OS, which are provided through IBM Open Data Analytics for z/OS (IzODA).

In [None]:
# Connect to the Spark master on z/OS (7077 is Spark's default standalone master port)
spark = SparkSession.builder.master("spark://"+zOS_IP+":7077").appName("pyspark_demo").getOrCreate()

***Client Data***

Load client data into a Spark dataframe.

In [None]:
# Read the VSAM_CLIENT table through the MDSS JDBC connection
jdbc_connection = "jdbc:rs:dv://"+zOS_IP+":"+MDSS_PORT+"; DBTY=DVS; SUBSYS=NONE; UID="+zOS_USERNAME+"; PWD="+zOS_PASSWORD+";"
raw_clientInfo_df = spark.read \
        .format("jdbc") \
        .option("url", jdbc_connection) \
        .option("dbtable", "VSAM_CLIENT") \
        .load()

# toDF renames the columns by position; select then drops the trailing "rid" column
clientInfo_df = raw_clientInfo_df. \
    toDF("customer_id","gender","age_years","highest_edu","annual_investment_rev","annual_income","activity_level","churn", "rid"). \
    select("customer_id","gender","age_years","highest_edu","annual_investment_rev","annual_income","activity_level","churn")

clientInfo_df.show(10)

***Credit card transactions***

Load credit card transactions into a Spark dataframe.

In [None]:
# DB2 subsystem ID; the transactions are read from table sparkdb.sppaytb1
DB2_SSID = "DBBG"
jdbc_connection = "jdbc:rs:dv://"+zOS_IP+":"+MDSS_PORT+"; DBTY=DB2; SUBSYS="+DB2_SSID+"; UID="+zOS_USERNAME+"; PWD="+zOS_PASSWORD+";"
clientTrans_df = spark.read \
        .format("jdbc") \
        .option("url", jdbc_connection) \
        .option("dbtable", "sparkdb.sppaytb1") \
        .load()
clientTrans_df.show(10)
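Both cells above build the JDBC URL by string concatenation, differing only in the `DBTY` and `SUBSYS` values. A small helper keeps the two URLs consistent. `build_dv_jdbc_url` is a hypothetical name, not part of PySpark or the IBM driver, and it only reproduces the `KEY=value;` layout the cells already use; the host and credentials below are placeholders.

```python
def build_dv_jdbc_url(host, port, dbty, subsys, uid, pwd):
    """Hypothetical helper (not part of PySpark or the IBM JDBC driver) that
    reproduces the "jdbc:rs:dv://host:port; KEY=value; ...;" layout used above."""
    props = {"DBTY": dbty, "SUBSYS": subsys, "UID": uid, "PWD": pwd}
    # Each property becomes "; KEY=value", matching the notebook's spacing
    suffix = "".join(f"; {key}={value}" for key, value in props.items())
    return f"jdbc:rs:dv://{host}:{port}{suffix};"

# Placeholder host and credentials
vsam_url = build_dv_jdbc_url("zos.example.com", "1200", "DVS", "NONE", "USER", "PASS")
db2_url = build_dv_jdbc_url("zos.example.com", "1200", "DB2", "DBBG", "USER", "PASS")
```

Passing `"DVS", "NONE"` yields the same shape of URL as the VSAM cell, and `"DB2", DB2_SSID` the same shape as the DB2 cell.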

## Aggregate statistics
Calculate a few per-customer aggregate statistics from the credit card transactions (total amount, average number of transactions per day, number of transactions, and average amount) and join the results to the client dataframe. We also cast these fields to integer and double types, which the following calculations use.

In [None]:
import pyspark.sql.functions as func

TXN_AMOUNT = "ACAUREQ_AUREQ_TX_DT_TTLAMT"

# Per-customer statistics; avg_daily_txns divides by 365, so it assumes one year of data.
# Each aggregate is named with alias() and cast to the type used in later calculations.
calcTrans_df = clientTrans_df.groupBy("CONT_ID").agg(
    func.sum(TXN_AMOUNT).cast("double").alias("total_txn_amount"),
    (func.count(TXN_AMOUNT) / 365).cast("double").alias("avg_daily_txns"),
    func.count(TXN_AMOUNT).cast("int").alias("total_txns"),
    (func.sum(TXN_AMOUNT) / func.count(TXN_AMOUNT)).cast("double").alias("avg_txn_amount"),
)
calcTrans_df = calcTrans_df.withColumn("CONT_ID", func.col("CONT_ID").cast("int"))

calcTrans_df.show(10)
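To make the `groupBy`/`agg` cell above concrete, here is a pure-Python sketch of the same four per-customer statistics computed over hypothetical `(CONT_ID, amount)` pairs. Like the cell, it divides the transaction count by 365, so `avg_daily_txns` only means "per day" if the data spans one year. It also assumes no null amounts (Spark's `sum` and `count` skip nulls).

```python
from collections import defaultdict

def aggregate_transactions(transactions, days=365):
    """Per-customer statistics mirroring the demo's groupBy("CONT_ID").agg(...).

    transactions: iterable of (cont_id, amount) pairs with non-null amounts
    (hypothetical input shape). days: divisor for avg_daily_txns, 365 as in the demo.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for cont_id, amount in transactions:
        totals[cont_id] += amount
        counts[cont_id] += 1
    return {
        cont_id: {
            "total_txn_amount": totals[cont_id],                  # sum(amount)
            "avg_daily_txns": counts[cont_id] / days,             # count(amount) / 365
            "total_txns": counts[cont_id],                        # count(amount)
            "avg_txn_amount": totals[cont_id] / counts[cont_id],  # sum / count
        }
        for cont_id in counts
    }

# Hypothetical transactions: customer 101 has two, customer 202 has one
stats = aggregate_transactions([(101, 10.0), (101, 30.0), (202, 5.0)])
```

Here customer 101 gets a total of 40.0 over 2 transactions, an average amount of 20.0, and 2/365 transactions per day.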

In [None]:
# Inner join: keep only clients that have at least one transaction
client_df = clientInfo_df.join(calcTrans_df, calcTrans_df.CONT_ID == clientInfo_df.customer_id, "inner")
client_df.show(10)
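The `"inner"` join keeps only customers that appear in both dataframes, so a client with no rows in the DB2 transaction table is dropped from `client_df`, as is a `CONT_ID` with no matching client. A pure-Python sketch of that behavior, using hypothetical dict-based records and assuming each key is unique on both sides (true for `CONT_ID` after the `groupBy`; assumed for `customer_id`):

```python
def inner_join(clients, txn_stats):
    """Mimic clientInfo_df.join(calcTrans_df, CONT_ID == customer_id, "inner").

    clients: dict customer_id -> client record (dict with a "customer_id" key)
    txn_stats: dict CONT_ID -> per-customer statistics (dict)
    Both inputs are hypothetical stand-ins for the Spark dataframes.
    """
    joined = []
    for customer_id, client in clients.items():
        stats = txn_stats.get(customer_id)
        if stats is None:
            continue  # no transactions: an inner join drops this client
        row = dict(client)
        row["CONT_ID"] = customer_id  # the Spark join keeps both key columns
        row.update(stats)
        joined.append(row)
    return joined  # CONT_IDs with no matching client never appear

clients = {
    1: {"customer_id": 1, "churn": 0},
    2: {"customer_id": 2, "churn": 1},  # has no transactions below
    3: {"customer_id": 3, "churn": 0},
}
txn_stats = {1: {"total_txns": 2}, 3: {"total_txns": 5}, 4: {"total_txns": 1}}
client_rows = inner_join(clients, txn_stats)
```

Passing `"left"` instead of `"inner"` to `join` would keep every client, with nulls in the transaction columns for clients without transactions.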

## Predictive Analyses

We now run a predictive analysis on the data to evaluate customer activity level vs. churn. We train the model with a supervised learning algorithm, logistic regression: a common, fast, highly scalable classification model that needs little tuning and is easy to regularize. Besides a predicted class label, the model outputs class probabilities, which can be more useful than the labels alone. Here, we use the PySpark machine learning library (`pyspark.ml`) to create the model.

In [None]:
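from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import RFormula

input_df = client_df

# RFormula turns "churn ~ activity_level" into a "label" column (from churn)
# and a "features" vector column (from activity_level)
formula = RFormula().setFormula("churn ~ activity_level").setFeaturesCol("features").setLabelCol("label")
train_df = formula.fit(input_df).transform(input_df).select("label", "features")

# Predict label 1 when P(label=1) exceeds 0.5 (the default threshold, set explicitly here)
model = LogisticRegression().setThreshold(0.5)
result_df = model.fit(train_df).transform(train_df)

# transform() adds the rawPrediction, probability and prediction columns
result_df.show()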
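With one feature, the fitted model predicts P(label=1) = σ(b0 + b1·x) for activity level x and assigns label 1 when that probability exceeds the threshold (0.5 here), so the predicted label flips at x = -b0/b1. Spark minimizes the log-loss (with `regParam` defaulting to 0.0) using its own quasi-Newton optimizer; the pure-Python sketch below fits the same unregularized objective with plain batch gradient descent instead, on hypothetical toy data where label 1 means churned. The function names and data are illustrative only.

```python
import math

def sigmoid(z):
    # Numerically stable logistic function
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)

def fit_logistic_1d(xs, ys, lr=0.1, epochs=20000):
    """Batch gradient descent on the mean log-loss for p = sigmoid(b0 + b1*x)."""
    b0, b1 = 0.0, 0.0
    n = len(xs)
    for _ in range(epochs):
        g0 = g1 = 0.0
        for x, y in zip(xs, ys):
            err = sigmoid(b0 + b1 * x) - y  # gradient of the log-loss w.r.t. the logit
            g0 += err
            g1 += err * x
        b0 -= lr * g0 / n
        b1 -= lr * g1 / n
    return b0, b1

def predict(b0, b1, x, threshold=0.5):
    """Return (P(label=1), predicted label) for a single activity level."""
    p = sigmoid(b0 + b1 * x)
    return p, int(p > threshold)

# Hypothetical toy data: activity levels 0-5, churn label 1 = churned
levels = [0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
churned = [1, 1, 1, 1, 1, 0, 1, 0, 0, 0, 0, 0]
b0, b1 = fit_logistic_1d(levels, churned)
boundary = -b0 / b1  # activity level at which P(churn) = 0.5
```

On this toy data b1 comes out negative (more activity, less churn) and the boundary lands at 2.5, since the data is symmetric around that level. Gradient descent is used only because it fits in a few lines; it converges slowly on unscaled features, which is one reason Spark standardizes features internally by default.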

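Now we plot the S-curve of customer activity level vs. churn. Because the model has a single feature, each customer's predicted churn probability lies on the fitted logistic curve, so plotting probability against activity level traces that curve. Matplotlib and Seaborn are two common Python plotting libraries for building custom visualizations that help gain insights from the data.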

In [None]:
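%matplotlib inline

import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style="whitegrid")

# Collect (activity level, P(churn)) pairs on the driver; fine for ~6,000 rows.
# probability is [P(label=0), P(label=1)], so index 1 equals 1 - probability[0].
plot_pairs = sorted(
    (float(row["features"][0]), float(row["probability"][1]))
    for row in result_df.select("features", "probability").collect()
)
X = [x for x, _ in plot_pairs]
Y = [y for _, y in plot_pairs]

plt.figure()
plt.xlabel("Activity Level", fontsize=16)
plt.ylabel("Probability of Churn", fontsize=16)
# The predicted probabilities already lie on the fitted logistic curve,
# so plot them directly instead of fitting a quadratic (order=2) regression
plt.plot(X, Y, marker="o")
plt.show()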
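With a single feature, every predicted probability lies on σ(b0 + b1·x), so the S-curve can also be drawn directly from the model's two parameters. In PySpark these are the fitted `LogisticRegressionModel`'s `intercept` and `coefficients[0]`; the cell above does not keep the fitted model, so the sketch below takes hypothetical coefficient values instead. It returns evenly spaced `(activity_level, probability)` points that could be passed to `plt.plot`, and the curve crosses 0.5 at x = -intercept/coef.

```python
import math

def logistic(z):
    # Numerically stable sigmoid
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)

def churn_curve(intercept, coef, lo, hi, steps=50):
    """Evenly spaced (activity_level, P(label=1)) points on the fitted S-curve."""
    width = (hi - lo) / (steps - 1)
    return [(lo + i * width, logistic(intercept + coef * (lo + i * width)))
            for i in range(steps)]

# Hypothetical coefficients; in the notebook they would come from the fitted
# LogisticRegressionModel's intercept and coefficients[0]
curve = churn_curve(intercept=4.0, coef=-1.5, lo=0.0, hi=5.0)
```

With a negative coefficient the curve falls monotonically, so higher activity levels map to lower churn probabilities.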