In [0]:
# PROJECT: predict customer churn
from pyspark.sql import SparkSession

# Reuse the active Spark session if one exists, otherwise start one for this notebook.
spark = (
    SparkSession.builder
    .appName('logreg_consult')
    .getOrCreate()
)

# Load the churn data; the first row is the header and column types are inferred.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("dbfs:/FileStore/shared_uploads/sejal@ibm.com/customer_churn.csv")
)

In [0]:
# Inspect the column names and the types Spark inferred from the CSV.
df.printSchema()

In [0]:
# Preview the first rows of the raw data (show() prints 20 rows by default).
df.show()

In [0]:
# Summary statistics (count, mean, stddev, min, max) per column, as a sanity check.
df.describe().show()

In [0]:
from pyspark.ml.feature import VectorAssembler

# Input columns packed into the single vector column the model reads.
feature_cols = ['Age', 'Total_Purchase', 'Account_Manager', 'Years', 'Num_Sites']
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

# Append the 'features' column, then keep only the model input and the label.
output = assembler.transform(df)
final_data = output.select(['features', 'churn'])
final_data.show()

In [0]:
# 70/30 train/test split. The seed is fixed so that a Restart & Run All
# reproduces the same split, and therefore the same fitted model and metrics.
train_churn, test_churn = final_data.randomSplit([0.7, 0.3], seed=42)

In [0]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression reading the assembled 'features' vector, with 'churn' as the label.
lr_churn = LogisticRegression(featuresCol='features', labelCol='churn')
fitted_churn_model = lr_churn.fit(train_churn)

# The training summary carries the model's predictions on the training split.
training_sum = fitted_churn_model.summary
train_predictions = training_sum.predictions
train_predictions.describe().show()

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Score the held-out split with the model fitted on the training split;
# evaluate() returns a summary object whose .predictions is the scored DataFrame.
pred_and_labels = fitted_churn_model.evaluate(test_churn)
pred_and_labels.predictions.show()

In [0]:
# Area under the ROC curve on the held-out split.
# The evaluator must rank rows by the model's continuous scores ('rawPrediction');
# feeding it the hard 0/1 'prediction' column collapses the ROC curve to a single
# operating point and gives a misleading AUC.
churn_eval = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',
                                           labelCol='churn',
                                           metricName='areaUnderROC')
auc = churn_eval.evaluate(pred_and_labels.predictions)
auc

In [0]:
# Predict on unseen / new data.
# Refit on ALL labelled data (train + test): the new-customers file loaded below
# now plays the role of the held-out set, so no labelled rows need to be reserved.
final_lr_model = lr_churn.fit(final_data)

In [0]:
# Load the new customers to score; header row present, column types inferred.
new_customers = spark.read.csv("dbfs:/FileStore/shared_uploads/sejal@ibm.com/new_customers.csv", inferSchema=True, header=True)

In [0]:
# Check that the new data exposes the columns the assembler expects.
new_customers.printSchema()

In [0]:
# Reuse the same assembler as for the training data so the 'features' vector
# is built from the same input columns in the same order.
test_new_customers = assembler.transform(new_customers)

In [0]:
# Confirm the transformed data has a 'features' column.
test_new_customers.printSchema()

In [0]:
# Score the new customers with the model refit on all labelled data,
# then show each company alongside its predicted class.
final_results = final_lr_model.transform(test_new_customers)
final_results.select('Company', 'prediction').show()

In [0]:
# Summary statistics for the new-customer data, for comparison with the
# describe() output of the training data earlier in the notebook.
test_new_customers.describe().show()