# Dataset Overview

In [1]:
import pandas as pd
df = pd.read_csv('public.csv')
df

Unnamed: 0,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
0,15565701,Ferri,698,Spain,Female,39,9,161993.89,1,0,0,90212.38,0
1,15565706,Akobundu,612,Spain,Male,35,1,0.00,1,1,1,83256.26,1
2,15565796,Docherty,745,Germany,Male,48,10,96048.55,1,1,0,74510.65,0
3,15565806,Toosey,532,France,Male,38,9,0.00,2,0,0,30583.95,0
4,15565878,Bates,631,Spain,Male,29,3,0.00,2,1,1,197963.46,0
5,15565879,Riley,845,France,Female,28,9,0.00,2,1,1,56185.98,0
6,15565996,Arnold,653,France,Male,44,8,0.00,2,1,1,154639.72,0
7,15566030,Tu,497,Germany,Male,41,5,80542.81,1,0,0,88729.22,1
8,15566091,Thomsen,545,Spain,Female,32,4,0.00,1,1,0,94739.20,0
9,15566111,Estes,596,France,Male,39,9,0.00,1,1,0,48963.59,0


# Use Pyspark to view dataset 

In [2]:
import findspark
findspark.init('/home/austin/spark-2.1.0-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Churn_Modelling").getOrCreate()
df = spark.read.csv('public.csv', header=True, inferSchema=True)
df.printSchema()

root
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



# Do your work here

In [3]:
# preprocessing
# drop ID, name
drop_col = ['CustomerId', 'Surname', 'Geography']
df = df.select([col for col in df.columns if col not in drop_col])
df_label_pd = df.groupby('Exited').count().toPandas()
ratio = df_label_pd['count'].iloc[0]/df_label_pd['count'].iloc[1]
print(df_label_pd)
print(ratio)

# add weight balance
from pyspark.sql.functions import when, col
def weight_balance(labels):
    return when(labels == 0, ratio).otherwise(1*(1-ratio))

df = df.withColumn('weights', weight_balance(col('Exited')))

   Exited  count
0       1   1644
1       0   6356
0.25865324103209564


In [4]:
# feature engineering
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline


cat_cols = [item[0] for item in df.dtypes if item[1].startswith('string')]
num_cols = [item[0] for item in df.dtypes if item[1].startswith('int') | item[1].startswith('double')][1:]

indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in cat_cols
]
encoders = [
    OneHotEncoder(
        inputCol=indexer.getOutputCol(),
        outputCol="{0}_encoded".format(indexer.getOutputCol())) 
    for indexer in indexers
]
assemblerInputs = [encoder.getOutputCol() for encoder in encoders] + num_cols
assembler = VectorAssembler(
    inputCols=assemblerInputs,
    outputCol="features"
)


cols = df.columns
pipeline = Pipeline(stages = indexers + encoders + [assembler])
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)

selectedCols = ['features'] + cols
df = df.select(selectedCols)
pd.DataFrame(df.take(5), columns=df.columns)

Unnamed: 0,features,CreditScore,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited,weights
0,"[0.0, 39.0, 9.0, 161993.89, 1.0, 0.0, 0.0, 902...",698,Female,39,9,161993.89,1,0,0,90212.38,0,0.258653
1,"[1.0, 35.0, 1.0, 0.0, 1.0, 1.0, 1.0, 83256.26,...",612,Male,35,1,0.0,1,1,1,83256.26,1,0.741347
2,"[1.0, 48.0, 10.0, 96048.55, 1.0, 1.0, 0.0, 745...",745,Male,48,10,96048.55,1,1,0,74510.65,0,0.258653
3,"[1.0, 38.0, 9.0, 0.0, 2.0, 0.0, 0.0, 30583.95,...",532,Male,38,9,0.0,2,0,0,30583.95,0,0.258653
4,"[1.0, 29.0, 3.0, 0.0, 2.0, 1.0, 1.0, 197963.46...",631,Male,29,3,0.0,2,1,1,197963.46,0,0.258653


In [5]:
# Logistic Regression
train, test = df.randomSplit([0.80, 0.20], seed=42)

# first we check how LogisticRegression perform 
from pyspark.ml.classification import LogisticRegression

LR = LogisticRegression(featuresCol = 'features', labelCol = 'Exited', maxIter=15)
LR_model = LR.fit(train)

from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions_LR = LR_model.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="Exited")
print("Test_SET (Area Under ROC): " + str(evaluator.evaluate(predictions_LR, {evaluator.metricName: "areaUnderROC"})))

Test_SET (Area Under ROC): 1.0


In [6]:
# Gradient Boosting Trees
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

gbt = GBTClassifier(featuresCol = 'features', labelCol = 'Exited', maxIter=15)
GBT_Model = gbt.fit(train)
gbt_predictions = GBT_Model.transform(test)
evaluator = MulticlassClassificationEvaluator(
    labelCol="Exited", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(gbt_predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0


# Evaluation Part

## Load private dataset, the same structure as public dataset

In [7]:
df_private = spark.read.csv('public.csv',header=True,inferSchema=True)  # TA takes public dataset as example

## Do prediction with your PySpark model here

In [8]:
drop_col = ['CustomerId', 'Surname', 'Geography']
df_eval = df_private.select([col for col in df_private.columns if col not in drop_col])
df_label_pd = df_eval.groupby('Exited').count().toPandas()
ratio = df_label_pd['count'].iloc[0]/df_label_pd['count'].iloc[1]

# add weight balance
from pyspark.sql.functions import when, col
def weight_balance(labels):
    return when(labels == 0, ratio).otherwise(1*(1-ratio))

df_eval = df_eval.withColumn('weights', weight_balance(col('Exited')))

cat_cols = [item[0] for item in df_eval.dtypes if item[1].startswith('string')]
num_cols = [item[0] for item in df_eval.dtypes if item[1].startswith('int') | item[1].startswith('double')][1:]

indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in cat_cols
]
encoders = [
    OneHotEncoder(
        inputCol=indexer.getOutputCol(),
        outputCol="{0}_encoded".format(indexer.getOutputCol())) 
    for indexer in indexers
]
assemblerInputs = [encoder.getOutputCol() for encoder in encoders] + num_cols
assembler = VectorAssembler(
    inputCols=assemblerInputs,
    outputCol="features"
)


cols = df_eval.columns
pipeline = Pipeline(stages = indexers + encoders + [assembler])
pipelineModel = pipeline.fit(df_eval)
df_eval = pipelineModel.transform(df_eval)

selectedCols = ['features'] + cols
df_eval = df_eval.select(selectedCols)
gbt_predictions = GBT_Model.transform(df_eval)

In [9]:
df_private = df_private.withColumn('Exited', gbt_predictions['Exited'])

## Print Your result as the following type

In [10]:
df_private.select('CustomerId','Exited').show(5)

+----------+------+
|CustomerId|Exited|
+----------+------+
|  15565701|     0|
|  15565706|     1|
|  15565796|     0|
|  15565806|     0|
|  15565878|     0|
+----------+------+
only showing top 5 rows



## TA will use the following function to get your prediction result (f-1 score)

In [11]:
from sklearn import metrics
import numpy as np
data_array =  np.array(df_private.select('Exited').collect())

metrics.f1_score(data_array,data_array)

1.0