### Using PySpark to build a logistic regression model that predicts whether a customer will churn

By: Matt Purvis

In [0]:
from pyspark.sql import SparkSession

In [0]:
spark = SparkSession.builder.appName('Churn').getOrCreate()

In [0]:
# Load in the data using Spark SQL
df = spark.sql('select * from customer_churn_csv')

In [0]:
# Print the schema to get column names and datatypes
df.printSchema()

In [0]:
# Preview the dataset
df.show()

In [0]:
# Get list of column names
df.columns

In [0]:
# Select only the columns we are interested in
my_cols = df.select([
 'Age',
 'Total_Purchase',
 'Years',
 'Num_Sites',
 'Churn'])

In [0]:
# Drop rows with missing values. There are several strategies for handling missing data; dropping is the simplest, so we use it for now.
my_final_data = my_cols.na.drop()
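Dropping rows is only one option. The other common choice is to impute, for example replacing a missing `Age` with the column mean; Spark ships this as `pyspark.ml.feature.Imputer` (default `strategy="mean"`). The plain-Python sketch below contrasts the two strategies on hypothetical toy rows. It illustrates the idea and is not Spark's implementation.

```python
from statistics import mean


def drop_missing(rows):
    """Keep only rows with no None values, like na.drop() with its default how='any'."""
    return [r for r in rows if all(v is not None for v in r.values())]


def impute_mean(rows, cols):
    """Fill None in `cols` with that column's mean over the non-missing values."""
    means = {c: mean(r[c] for r in rows if r[c] is not None) for c in cols}
    return [
        {k: (means[k] if v is None and k in means else v) for k, v in r.items()}
        for r in rows
    ]


# Hypothetical toy rows; only their shape matters here.
rows = [
    {"Age": 30.0, "Churn": 0},
    {"Age": None, "Churn": 1},
    {"Age": 50.0, "Churn": 1},
]
print(len(drop_missing(rows)))        # 2
print(impute_mean(rows, ["Age"])[1])  # {'Age': 40.0, 'Churn': 1}
```

Dropping loses the whole row, including its `Churn` label. Imputing keeps the row but invents a plausible value, which can bias the model if values are not missing at random.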

In [0]:
# Import VectorAssembler
from pyspark.ml.feature import VectorAssembler

In [0]:
# Create an assembler that combines the feature columns into a single vector column (used in the pipeline later)
assembler = VectorAssembler(inputCols = ['Age','Total_Purchase','Years','Num_Sites'],
                           outputCol='features')
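Spark ML estimators expect all features in one vector column, which is what `VectorAssembler` builds: for each row it concatenates the `inputCols` values, in order, into `features`. By default (`handleInvalid='error'`) it fails on null values, which is one reason the missing rows were dropped first. The sketch below mimics that per-row behaviour in plain Python. The row values are hypothetical, and Spark may store the result as a sparse vector rather than a list.

```python
INPUT_COLS = ["Age", "Total_Purchase", "Years", "Num_Sites"]


def assemble(row, input_cols=INPUT_COLS):
    """Pack the named columns into one feature vector, in input_cols order."""
    vector = []
    for col in input_cols:
        if row[col] is None:
            # Spark's default handleInvalid='error' also raises on nulls
            raise ValueError(f"null value in column {col!r}")
        vector.append(float(row[col]))
    return vector


# Hypothetical customer row; 'Churn' is the label, so it is not assembled.
row = {"Age": 40, "Total_Purchase": 10000.0, "Years": 5.0, "Num_Sites": 8, "Churn": 1}
print(assemble(row))  # [40.0, 10000.0, 5.0, 8.0]
```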

In [0]:
# Import LogisticRegression class
from pyspark.ml.classification import LogisticRegression

In [0]:
# Import pipeline class
from pyspark.ml import Pipeline

In [0]:
# Create logistic regression model
log_reg_churn = LogisticRegression(featuresCol = 'features', labelCol = 'Churn')
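For a binary label like `Churn`, logistic regression learns one weight per feature plus an intercept. It turns the weighted sum (the margin) into a probability with the sigmoid function. Spark's `rawPrediction` column is derived from that margin, `probability` holds the sigmoid output, and `prediction` is 1.0 when the churn probability exceeds `threshold` (0.5 by default). The sketch below shows that arithmetic for one row. The weights and intercept are hypothetical, not values learned from this data.

```python
import math


def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def predict(features, weights, intercept, threshold=0.5):
    """Return (P(Churn = 1), predicted label) for one feature vector."""
    margin = intercept + sum(w * x for w, x in zip(weights, features))
    p = sigmoid(margin)
    return p, (1.0 if p > threshold else 0.0)


# Hypothetical all-zero model: the margin is 0, so P(churn) is exactly 0.5.
print(predict([40.0, 10000.0, 5.0, 8.0], [0.0, 0.0, 0.0, 0.0], 0.0))  # (0.5, 0.0)
```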

In [0]:
# Create a pipeline that assembles the feature vector and then fits the logistic regression model
pipeline = Pipeline(stages=[assembler, log_reg_churn])
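Conceptually, `Pipeline.fit` walks its stages in order. Transformers (like the assembler) just transform the data, while estimators (like `LogisticRegression`) are fit and replaced by the model they produce. The resulting `PipelineModel` then replays every stage on new data. That is why `fit_model.transform` can be applied unchanged to both the test set and new customers. Below is a simplified plain-Python sketch of that idea, not Spark's implementation. All class names and the toy threshold "model" are hypothetical stand-ins.

```python
class Assembler:
    """Transformer stand-in: packs the named columns into a 'features' list."""
    def __init__(self, input_cols):
        self.input_cols = input_cols

    def transform(self, rows):
        return [{**r, "features": [r[c] for c in self.input_cols]} for r in rows]


class ThresholdModel:
    """Fitted-model stand-in: predicts 1.0 when the first feature exceeds a learned cutoff."""
    def __init__(self, cutoff):
        self.cutoff = cutoff

    def transform(self, rows):
        return [{**r, "prediction": 1.0 if r["features"][0] > self.cutoff else 0.0}
                for r in rows]


class ThresholdEstimator:
    """Estimator stand-in: 'learns' the mean of the first feature as its cutoff."""
    def fit(self, rows):
        values = [r["features"][0] for r in rows]
        return ThresholdModel(sum(values) / len(values))


class MiniPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):      # estimator: fit it and keep the resulting model
                stage = stage.fit(rows)
            rows = stage.transform(rows)   # feed this stage's output to the next stage
            fitted.append(stage)
        return MiniPipelineModel(fitted)


class MiniPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:          # replay every fitted stage in order
            rows = stage.transform(rows)
        return rows


train = [{"Age": 30.0, "Years": 1.0}, {"Age": 50.0, "Years": 5.0}]
model = MiniPipeline([Assembler(["Age", "Years"]), ThresholdEstimator()]).fit(train)
test = [{"Age": 45.0, "Years": 2.0}, {"Age": 35.0, "Years": 3.0}]
print([r["prediction"] for r in model.transform(test)])  # [1.0, 0.0]
```

The cutoff is learned only from `train`. The test rows reuse it, just as the real pipeline reuses the coefficients fit on `train_data`.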

In [0]:
# Split the data roughly 70/30 into train and test sets (a separate validation set could also be carved out)
# A fixed seed makes the split reproducible between runs
train_data, test_data = my_final_data.randomSplit([0.7, 0.3], seed=42)
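`randomSplit` does not cut the data at exactly 70/30. It normalizes the weights and assigns each row to a split by random sampling, so split sizes vary slightly and change between runs unless a `seed` is given. The plain-Python sketch below mimics that per-row assignment. It illustrates the idea rather than reproducing Spark's partition-level sampling, and `random_split` is a hypothetical helper.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split with probability proportional to its weight."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)              # cumulative upper bound of each split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()                # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)      # guards against float rounding in the last bound
    return splits


train, test = random_split(list(range(1000)), [0.7, 0.3], seed=7)
print(len(train), len(test))  # roughly 700 and 300, not exactly
```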

In [0]:
# Fit the pipeline to the training data
fit_model = pipeline.fit(train_data)

In [0]:
# Apply the fitted pipeline to the test data: it rebuilds the feature vector and adds the prediction columns
results = fit_model.transform(test_data)

In [0]:
# Import BinaryClassificationEvaluator to compute the area under the ROC curve (AUC)
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [0]:
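# Create the evaluator. Score the continuous rawPrediction column (the default) rather than the 0/1 prediction column, which would reduce the ROC curve to a single threshold
my_eval = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='Churn')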

In [0]:
# Preview the transformed test data
results.select('Churn', 'prediction').show()

In [0]:
# Compare means and std devs of Churn and Predictions
results.select('Churn', 'prediction').describe().show()
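Comparing means only shows whether the model predicts churn at about the right overall rate. It does not show which customers it gets wrong. A 2x2 confusion matrix does. In Spark the counts come from `results.groupBy('Churn', 'prediction').count().show()`. The plain-Python sketch below derives accuracy, precision, and recall from such counts, treating `Churn = 1` as the positive class. The `(Churn, prediction)` pairs are hypothetical.

```python
from collections import Counter


def confusion(pairs):
    """Count outcomes for (label, prediction) pairs, treating 1 (churn) as positive."""
    counts = Counter()
    for label, pred in pairs:
        if label == 1:
            counts["tp" if pred == 1 else "fn"] += 1
        else:
            counts["fp" if pred == 1 else "tn"] += 1
    return counts


def summarize(counts):
    """Accuracy, precision, and recall from confusion counts (0.0 when undefined)."""
    total = sum(counts.values())
    predicted_pos = counts["tp"] + counts["fp"]
    actual_pos = counts["tp"] + counts["fn"]
    return {
        "accuracy": (counts["tp"] + counts["tn"]) / total,
        "precision": counts["tp"] / predicted_pos if predicted_pos else 0.0,
        "recall": counts["tp"] / actual_pos if actual_pos else 0.0,
    }


# Hypothetical (Churn, prediction) pairs.
pairs = [(1, 1), (1, 0), (0, 0), (0, 0), (0, 1), (1, 1)]
print(summarize(confusion(pairs)))
```

For churn, recall (the share of real churners the model catches) is usually the number to watch, since a missed churner is a lost customer.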

In [0]:
# Compute the AUC (area under the ROC curve) on the test set
AUC = my_eval.evaluate(results)

In [0]:
# Display the AUC
AUC

A perfect model would have an AUC of 1.0, while a model that guesses at random would score about 0.5. This model is not terrible but could be better!
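AUC can be read as the probability that a randomly chosen churner receives a higher score than a randomly chosen non-churner, with ties counting half. So 1.0 means a perfect ranking and about 0.5 means no better than chance. The O(n²) plain-Python sketch below computes it that way for small lists. Spark's `BinaryClassificationEvaluator` (metric `areaUnderROC` by default) computes essentially the same quantity from the ROC curve at scale. The hypothetical toy data also shows why the score column matters: evaluating thresholded 0/1 predictions instead of continuous scores throws away ranking information and reports a lower AUC for the same model.

```python
def auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative label")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Hypothetical labels and churn scores.
labels = [0, 0, 1, 1]
scores = [0.1, 0.3, 0.4, 0.8]                 # every churner outranks every non-churner
hard = [1 if s > 0.5 else 0 for s in scores]  # thresholded 0/1 predictions
print(auc(labels, scores))  # 1.0
print(auc(labels, hard))    # 0.75
```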

### Predict on new data

In [0]:
# Read in unseen data
new_customers = spark.sql('select * from new_customers_csv')

In [0]:
# Preview the data
new_customers.show()

In [0]:
# Apply the same fitted pipeline to the new customers
test_new_customers = fit_model.transform(new_customers)

In [0]:
# Look at predictions
test_new_customers.select(['Names','Age','Total_Purchase','Years','Num_Sites','Company','prediction']).show()

Four of these customers are predicted to churn. We would want to assign account managers to these customers and give them extra care and attention in order to prevent them from churning.
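To hand these customers to account managers, the predicted churners can be pulled out as a list. In Spark this is a filter such as `test_new_customers.filter(test_new_customers['prediction'] == 1.0).select('Names', 'Company')`. The plain-Python sketch below shows the same selection on a few hypothetical rows. The names and companies are placeholders, not data from this notebook, and `churn_watchlist` is a hypothetical helper.

```python
def churn_watchlist(rows):
    """Return sorted (Names, Company) pairs for customers predicted to churn."""
    return sorted(
        (r["Names"], r["Company"]) for r in rows if r["prediction"] == 1.0
    )


# Hypothetical rows shaped like the columns selected above.
rows = [
    {"Names": "Customer A", "Company": "Company X", "prediction": 1.0},
    {"Names": "Customer B", "Company": "Company Y", "prediction": 0.0},
    {"Names": "Customer C", "Company": "Company Z", "prediction": 1.0},
]
print(churn_watchlist(rows))  # [('Customer A', 'Company X'), ('Customer C', 'Company Z')]
```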