# Dataset Overview

In [1]:
import pandas as pd
#df = pd.read_csv('public.csv')

# Use Pyspark to view dataset 

In [2]:
# These part is  for windows version, if you use ubuntu, remember to edit import pyspark part
# ----
import findspark

findspark.init('/home/austin/spark-2.3.3-bin-hadoop2.7')
findspark.find()
import pyspark
findspark.find()
# ----
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Churn_Modelling").getOrCreate()
df = spark.read.csv('public.csv',header=True,inferSchema=True)
df.printSchema()

root
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



# Do your work here

In [3]:
cols = df.columns
df.groupby("Exited").count().show()
#df.groupby("Exited").mean().show()
df_one = df.filter(df['Exited'] == 1)
for i in range(2):
    df = df.union(df_one)
df.groupby("Exited").count().show()

+------+-----+
|Exited|count|
+------+-----+
|     1| 1644|
|     0| 6356|
+------+-----+

+------+-----+
|Exited|count|
+------+-----+
|     1| 4932|
|     0| 6356|
+------+-----+



In [4]:
from pyspark.ml.feature import Normalizer

# Normalize each Vector using $L^1$ norm.
#normalizer = Normalizer(inputCol="CreditScore", outputCol="normCreditScore", p=2.0)
#l1NormData.show('CreditScore')
# Normalize each Vector using $L^\infty$ norm.
#lInfNormData = normalizer.transform(df, {normalizer.p: float("inf")})
#print(l1NormData)
def normolize(df):
    balance_max = df.select("Balance").describe().collect()[4].asDict()["Balance"]
    credit_max = df.select("CreditScore").describe().collect()[4].asDict()["CreditScore"]
    credit_min = df.select("CreditScore").describe().collect()[3].asDict()["CreditScore"]
    salary_max = df.select("EstimatedSalary").describe().collect()[4].asDict()["EstimatedSalary"]
    salary_min = df.select("EstimatedSalary").describe().collect()[3].asDict()["EstimatedSalary"]
    df = df.withColumn("Nor_Balance",df["Balance"] / balance_max)
    df = df.withColumn("Nor_CreditScore",(df["CreditScore"] - credit_min) / credit_max)
    df = df.withColumn("Nor_EstimatedSalary",(df["EstimatedSalary"] - salary_min) / salary_max)
    return df
df= normolize(df)
#df.select("Nor_Balance","Nor_CreditScore","Nor_EstimatedSalary").show()

In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
categoricalColumns = ["Geography", "Gender"]
stages = []
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    # encoder = OneHotEncoderEstimator(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
    encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(), outputCol=categoricalCol + "classVec", dropLast = False)
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

#m = stages[2].fit(df)
#dft = m.transform(df)
#stages[3].transform(dft)["GenderclassVec","GenderIndex"].show()

In [6]:
numericCols = ["Age","Tenure","Nor_CreditScore","Nor_Balance", "NumOfProducts", "HasCrCard", "IsActiveMember","Nor_EstimatedSalary"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [7]:
from pyspark.ml.classification import LogisticRegression
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(df)
preppedDataDF = pipelineModel.transform(df)

#display(lrModel, preppedDataDF, "ROC")
# Keep relevant columns
selectedcols = ["features"] + cols
dataset = preppedDataDF.select(selectedcols)
display(dataset)

(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())

DataFrame[features: vector, CustomerId: int, Surname: string, CreditScore: int, Geography: string, Gender: string, Age: int, Tenure: int, Balance: double, NumOfProducts: int, HasCrCard: int, IsActiveMember: int, EstimatedSalary: double, Exited: int]

7952
3336


In [8]:
# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="Exited", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)
# View model's predictions and probabilities of each prediction class
# You can select any columns in the above schema to view as well. For example's sake we will choose age & occupation
selected = predictions.select("Exited", "prediction", "probability", "EstimatedSalary", "AGE")
#selected.show(100)

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(labelCol="Exited",rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)
#trainingSummary = lrModel.summary

#testData.groupby("Exited").count().show()
#predictions.groupby("prediction").count().show()

0.7738780186182422

In [9]:
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="Exited", featuresCol="features")

# Train model with Training Data
rfModel = rf.fit(trainingData)
# Make predictions on test data using the Transformer.transform() method.
predictions = rfModel.transform(testData)
#predictions.printSchema()
# View model's predictions and probabilities of each prediction class
#selected = predictions.select("Exited", "prediction", "probability")
#display(selected)
#We will evaluate our Random Forest model with BinaryClassificationEvaluator.

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(labelCol="Exited")
evaluator.evaluate(predictions)

0.8617877415586483

In [10]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations.  This can take about 6 minutes since it is training over 20 trees!
cvModel = cv.fit(trainingData)
# Use test set here so we can measure the accuracy of our model on new data
predictions = cvModel.transform(testData)
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)
# View Best model's predictions and probabilities of each prediction class
#selected = predictions.select("label", "prediction", "probability", "age", "occupation")
#display(selected)

bestModel = cvModel.bestModel
# Generate predictions for entire dataset
finalPredictions = bestModel.transform(dataset)
# Evaluate best model
evaluator.evaluate(finalPredictions)

#trainingSummary = lrModel.summary

0.8813351351827239

In [11]:
#pr = trainingSummary.pr.toPandas()
'''
plt.plot(pr['recall'],pr['precision'])
plt.ylabel('Precision')
plt.xlabel('Recall')
'''
from sklearn import metrics
import numpy as np
data_array =  np.array(dataset.select('Exited').collect())
prediction_array = np.array(finalPredictions.select('prediction').collect())
print(metrics.f1_score(data_array,prediction_array))

data_array =  np.array(testData.select('Exited').collect())
prediction_array = np.array(predictions.select('prediction').collect())
metrics.f1_score(data_array,prediction_array) 

0.7569270889927197


0.7601744186046513

# Evaluation Part

## Load private dataset, the same structure as public dataset

In [12]:
df_private = spark.read.csv('public.csv',header=True,inferSchema=True)  # TA takes public dataset as example

## Do prediction with your PySpark model here

In [13]:
df_private = normolize(df_private)
pipelineModel = partialPipeline.fit(df_private)
preppedData_private = pipelineModel.transform(df_private)
selectedcols = ["features"] + cols
dataset_private = preppedData_private.select(selectedcols)
finalPredictions = bestModel.transform(dataset_private)
finalPredictions = finalPredictions.drop('Exited')
finalPredictions = finalPredictions.withColumnRenamed('prediction','Exited')

## Print Your result as the following type

In [14]:
finalPredictions.select('CustomerId','Exited').show(5)

+----------+------+
|CustomerId|Exited|
+----------+------+
|  15565701|   0.0|
|  15565706|   0.0|
|  15565796|   1.0|
|  15565806|   0.0|
|  15565878|   0.0|
+----------+------+
only showing top 5 rows



## TA will use the following function to get your prediction result (f-1 score)

In [15]:
from sklearn import metrics
import numpy as np
data_array =  np.array(df_private.select('Exited').collect())
prediction_array = np.array(finalPredictions.select('Exited').collect())
metrics.f1_score(data_array,prediction_array)  

0.6462566100751462