In [1]:
!apt-get -y install openjdk-8-jre-headless
!pip install pyspark

Reading package lists... Done
Building dependency tree       
Reading state information... Done
openjdk-8-jre-headless is already the newest version (8u292-b10-0ubuntu1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 39 not upgraded.


# Do your work here

In [2]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Churn_Modeling").getOrCreate()
df = spark.read.csv('public.csv',header=True,inferSchema=True)
df.printSchema()
### make features/label
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
categoricalColumns = ['Geography', 'Gender','NumOfProducts', 'HasCrCard','IsActiveMember']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol = 'Exited', outputCol = 'label').setHandleInvalid("keep")
stages += [label_stringIdx]
numericCols = ['CreditScore', 'Age', 'Tenure', 'Balance','EstimatedSalary']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features").setHandleInvalid("keep")
stages += [assembler]

### pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df2 = pipelineModel.transform(df)
selectedCols = ['label', 'features','CustomerID','Exited'] + categoricalColumns + numericCols
df2 = df2.select(selectedCols)
df2.printSchema()

### split training & test dataset
train, test = df2.randomSplit([0.7, 0.3], seed = 2021)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

### Gradient-Boosted Tree Classifier
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(maxIter=64)
model = gbt.fit(train)

### Make prediction
predictions = model.transform(test)
predictions.printSchema()
predictions.select('CustomerId','Exited','prediction').show(10)

### Test
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

### F1-score
from sklearn import metrics
import numpy as np
data_array =  np.array(test.select('Exited').collect())
data_pred = np.array(predictions.select('prediction').collect())
f1_score = metrics.f1_score(data_array,data_pred, average='micro') 
print('F1 score:',f1_score)

root
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Exited: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Age: integer (nullable = t

# Evaluation Part

## Load private dataset, the same structure as public dataset

In [3]:
df_private = spark.read.csv('private.csv',header=True,inferSchema=True)  # TA takes public dataset as example
df_private.printSchema()

root
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



## Do prediction with your PySpark model here

In [4]:
### pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df_private)
df_private2 = pipelineModel.transform(df_private)
selectedCols = ['label', 'features','CustomerID','Exited'] + categoricalColumns + numericCols
df_private2 = df_private2.select(selectedCols)
df_private2.printSchema()


### Make prediction
predictions = model.transform(df_private2)
predictions.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Exited: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- EstimatedSalary: double (nullable = true)

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Exited: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- CreditScore: integer (nullable 

## Print Your result as the following type

In [5]:
predictions.select('CustomerId','prediction').show(5)

+----------+----------+
|CustomerId|prediction|
+----------+----------+
|  15565714|       0.0|
|  15565779|       0.0|
|  15565891|       0.0|
|  15566156|       0.0|
|  15566211|       0.0|
+----------+----------+
only showing top 5 rows



## TA will use the following function to get your prediction result (f-1 score)

In [6]:
from sklearn import metrics
import numpy as np
data_array =  np.array(df_private.select('Exited').collect())
data_pred = np.array(predictions.select('prediction').collect())
metrics.f1_score(data_array,data_pred, average='micro')  

0.8575