In [1]:
from pyspark.sql import SparkSession

# Session handle used by every later cell; getOrCreate() returns the
# already-running session when there is one.
spark = (
    SparkSession.builder
    .appName("customer_churn")
    .getOrCreate()
)

In [2]:
# Load the labelled churn data: first row is the header, column types are
# inferred by Spark rather than declared.
data = spark.read.csv(
    '/FileStore/tables/Customer_Churn_Prediction/customer_churn.csv',
    header=True,
    inferSchema=True,
)
data.printSchema()

In [3]:
# Preview the first five raw rows.
data.show(n=5)

In [4]:
# Add 'days_since_Onboard': days between Onboard_date and the current date.
# The value therefore depends on the day the notebook is run.
from pyspark.sql.functions import datediff, current_date

# withColumn names the resulting column itself, so the expression needs no alias.
days_onboard = datediff(current_date(), data['Onboard_date'])
data = data.withColumn("days_since_Onboard", days_onboard)
data.printSchema()

In [5]:
# Preview again, now including the derived days_since_Onboard column.
data.show(n=5)

In [6]:
# Display the (column name, type string) pair of every column; the next cell
# filters on the type string.
data.dtypes

In [7]:
# Keep only the columns whose Spark type string is 'double' or 'int'.
cols = [name for name, dtype in data.dtypes if dtype in ('double', 'int')]

In [8]:
# Display the column names picked in the previous cell (those typed 'double' or 'int').
cols

In [9]:
# Working frame for modelling: just the numeric columns collected in cols.
data_model = data.select(*cols)
data_model.show(n=5)

In [10]:
# Row counts: everyone, then the rows where Account_Manager equals 0 and 1.
manager_flag = data_model['Account_Manager']
print('Total customers:', data_model.count())
print('Customers with no account manager:', data_model.where(manager_flag == 0).count())
print('Customers with account manager:', data_model.where(manager_flag == 1).count())

In [11]:
from pyspark.sql.functions import corr

# Correlation between the Churn label and the Account_Manager flag.
churn_vs_manager = corr("Churn", "Account_Manager")
data.select(churn_vs_manager).show()

In [12]:
# Restrict the modelling set to rows where Account_Manager == 0.
# NOTE: the next cell drops Account_Manager, so this cell cannot be re-run
# afterwards without first re-running the cell that builds data_model.
no_manager = data_model['Account_Manager'] == 0
data_model = data_model.where(no_manager)
print('Customer with no account manager to use in the model:', data_model.count())

data_model.show(n=5)

In [13]:
# After the filter every remaining row has Account_Manager == 0, so the
# column carries no information; remove it.
data_model = data_model.drop('Account_Manager')
data_model.show(n=5)

In [14]:
# Feature columns = every remaining column except the Churn label.
# .index raises ValueError if 'Churn' is absent, just as list.remove would.
vecCols = list(data_model.columns)
vecCols.pop(vecCols.index('Churn'))
vecCols

In [15]:
from pyspark.ml.feature import VectorAssembler

# Pack the feature columns into a single 'features' vector column and keep
# only the label plus that vector.
assembler = VectorAssembler(outputCol='features', inputCols=vecCols)
assembled = assembler.transform(data_model)
data_feed = assembled.select('Churn', 'features')
data_feed.show(n=5)

In [16]:
# 70/30 train/test split. Without a seed every run draws different rows, so
# the metrics reported further down would change on each execution; a fixed
# seed makes the split repeatable for the same input data.
SPLIT_SEED = 42
train_data, test_data = data_feed.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [17]:
from pyspark.ml.classification import LogisticRegression

# Estimator reading the assembled vector as input and Churn as the label;
# fit on the training split only.
logReg = LogisticRegression(labelCol='Churn', featuresCol='features')
logReg_trained = logReg.fit(train_data)

In [18]:
# Peek at the predictions stored on the fitted model's training summary.
logReg_trained.summary.predictions.show(n=5)


In [19]:
# Score the held-out test split with the model fitted on train_data.
test_results = logReg_trained.transform(test_data)


In [20]:
# Preview the scored test rows.
test_results.show(n=5)

In [21]:
# evaluate() scores the same test split; the returned object exposes a
# .predictions frame (and metrics, used in later cells).
test_results_2 = logReg_trained.evaluate(test_data)
test_results_2.predictions.show(n=5)


In [22]:
# Compare what each route returns: training summary, transform(), evaluate().
for result_obj in (logReg_trained.summary, test_results, test_results_2):
    print(type(result_obj))

In [23]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
print("Area under ROC:")
# Area under the ROC curve on the test predictions, computed from the
# rawPrediction column against the Churn label.
bi_eval = BinaryClassificationEvaluator(
    labelCol='Churn',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
)
bi_eval.evaluate(test_results_2.predictions)

In [24]:
# The object returned by evaluate() exposes the metric directly as an
# attribute, so no separate evaluator is needed for it.
print("Easier way to get Area under ROC:")
test_results_2.areaUnderROC

In [25]:
print("Accuracy:")
# Accuracy of the 'prediction' column against the Churn label on the test
# predictions.
multi_eval = MulticlassClassificationEvaluator(
    labelCol='Churn',
    predictionCol="prediction",
    metricName='accuracy',
)
multi_eval.evaluate(test_results_2.predictions)

In [26]:
# Refit the same estimator on the whole labelled set (data_feed, i.e. the
# rows of both splits); this is the model applied to new customers later.
logReg_trained_new = logReg.fit(data_feed)


In [27]:
# Peek at the training-summary predictions of the model refit on all data.
logReg_trained_new.summary.predictions.show(n=5)


In [28]:
# Prepare new customer data: read it with the same options as the training
# file (header row, inferred column types).
new_cus = spark.read.csv(
    '/FileStore/tables/Customer_Churn_Prediction/new_customers.csv',
    header=True,
    inferSchema=True,
)
new_cus.printSchema()


In [29]:
# Preview the first five new-customer rows.
new_cus.show(n=5)


In [30]:
# Derive days_since_Onboard for the new customers the same way as for the
# training data (days between Onboard_date and the current date).
# withColumn names the resulting column itself, so the expression needs no alias.
new_cus_days_onboard = datediff(current_date(), new_cus['Onboard_date'])
new_cus = new_cus.withColumn("days_since_Onboard", new_cus_days_onboard)

In [31]:
# Preview again, now including the derived days_since_Onboard column.
new_cus.show(n=5)


In [32]:
# Start the new-customer column list from an independent copy of cols, so
# the removals in the next cell do not alter cols itself.
# NOTE(review): cols holds plain column-name strings, so a shallow copy
# (list(cols)) would behave the same as deepcopy here.
from copy import deepcopy
new_cus_cols = deepcopy(cols)
new_cus_cols

In [33]:
# Drop the label and the account-manager flag so only model inputs remain.
# Filtering instead of list.remove keeps the cell safe to re-run: remove()
# raises ValueError on a second execution because the names are already gone.
new_cus_cols = [name for name in new_cus_cols if name not in ('Churn', 'Account_Manager')]


In [34]:
# Display the columns left after removing 'Churn' and 'Account_Manager'.
new_cus_cols

In [35]:
# Keep the 'Names' column next to the feature columns so each prediction can
# be traced back to a company.
new_cus_data = new_cus.select('Names', *new_cus_cols)
new_cus_data.show(n=5)

In [36]:
# Pack the new-customer feature columns into a 'features' vector and keep
# Names alongside it.
new_cus_assembler = VectorAssembler(outputCol='features', inputCols=new_cus_cols)
new_cus_vectors = new_cus_assembler.transform(new_cus_data)
new_cus_feed = new_cus_vectors.select('Names', 'features')
new_cus_feed.show()

In [37]:
# Make predictions: score the new customers with the model refit on the full
# labelled set.
# NOTE(review): that model was fitted only on rows with Account_Manager == 0
# (see the earlier filter on data_model), while new_cus is not filtered on
# that column - confirm this is intended.
new_cus_pred = logReg_trained_new.transform(new_cus_feed)
new_cus_pred.show()

In [38]:
# Final answer: each company name with its predicted class.
new_cus_pred.select(['Names', 'prediction']).show()
