# CUSTOMER CHURN PREDICTION

##### Churn prediction is one of the most popular Big Data use cases in business. It consists of detecting customers who are likely to cancel a subscription to a service. Although it originated with large telecom operators, churn prediction concerns businesses of all sizes, including startups.

![DLR](https://raw.githubusercontent.com/Krishnakali/Big-Data-Project/master/Churn.png)

The dataset contains 3,333 observations, one per customer of a telecom service.
We use these attributes to predict whether a customer is likely to stop using the service. The dataset is fairly clean and contains both numeric and categorical variables.

**Attribute Information:**
* **state:** State Name
* **account length:** Account length
* **area code:** Area Code
* **phone number:** Phone number
* **international plan:** Whether the customer has an international plan
* **voice mail plan:** Whether the customer has a voice mail plan
* **number vmail messages:** No. of voice mail messages
* **total day minutes:** numeric.
* **total day calls:** numeric.
* **total day charge:** numeric.
* **total eve minutes:** numeric.
* **total eve calls:** numeric.
* **total eve charge:** numeric.
* **total night minutes:** numeric.
* **total night calls:** numeric.
* **total night charge:** numeric.
* **total intl minutes:** numeric.
* **total intl calls:** numeric.
* **total intl charge:** numeric.
* **number customer service calls:** numeric.

**Target/Label:** Churn: True/False

# Load the data
We download a CSV file from GitHub and load it into a Spark DataFrame. Each row is an observed customer, and each column contains an attribute of that customer.

In [3]:
%sh
mkdir -p telco
curl 'https://raw.githubusercontent.com/Krishnakali/Big-Data-Project/master/bigml_59c28831336c6604c800002a1.csv' > telco/predict_churn.csv
ls /databricks/driver/telco

In [4]:
%fs 
ls file:/databricks/driver/telco

In [5]:
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# `sc` and `spark` are predefined in Databricks notebooks; sqlContext is used in a later cell
sqlContext = SQLContext(sc)

# Explicit schema: with header=True the CSV header row is skipped and these names are used
schema = StructType([
    StructField("state", StringType(), True),
    StructField("account length", DoubleType(), True),
    StructField("area code", StringType(), True),
    StructField("phone number", StringType(), True),
    StructField("international plan", StringType(), True),
    StructField("voice mail plan", StringType(), True),
    StructField("number vmail messages", DoubleType(), True),
    StructField("total day minutes", DoubleType(), True),
    StructField("total day calls", DoubleType(), True),
    StructField("total day charge", DoubleType(), True),
    StructField("total eve minutes", DoubleType(), True),
    StructField("total eve calls", DoubleType(), True),
    StructField("total eve charge", DoubleType(), True),
    StructField("total night minutes", DoubleType(), True),
    StructField("total night calls", DoubleType(), True),
    StructField("total night charge", DoubleType(), True),
    StructField("total intl minutes", DoubleType(), True),
    StructField("total intl calls", DoubleType(), True),
    StructField("total intl charge", DoubleType(), True),
    StructField("number customer service calls", DoubleType(), True),
    StructField("churn", StringType(), True)])

df_data = spark.read.csv(path="file:/databricks/driver/telco/predict_churn.csv",
                         header=True, schema=schema)
display(df_data)

# Basic DataFrame operations
DataFrames let us express SQL-like operations, such as counting rows or selecting distinct values, directly in Python.

In [7]:
df_data.count()

In [8]:
df_data.select('churn').distinct().show()

# Feature Visualization

In [10]:
df_data.printSchema()
cols = df_data.columns

![DLR](https://raw.githubusercontent.com/Krishnakali/Big-Data-Project/master/Capture.PNG)





The type of visualization we use depends on the data type, so let's first define which columns are numeric and which are categorical:

In [12]:
numeric_cols = ["account_length", "number_vmail_messages", "total_day_minutes",
                "total_day_calls", "total_day_charge", "total_eve_minutes",
                "total_eve_calls", "total_eve_charge", "total_night_minutes",
                "total_night_calls", "total_intl_minutes", "total_intl_calls",
                "total_intl_charge"]

categorical_cols = ["state", "international_plan", "voice_mail_plan", "area_code"]
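Instead of typing these lists by hand, they can be derived from the DataFrame's schema. The sketch below is a minimal illustration, assuming the `(column name, type string)` pairs that `df_data.dtypes` returns; the `dtypes` list is a hand-written excerpt and `split_columns` is a hypothetical helper. Note that identifier-like string columns such as `phone number` would also end up in the categorical list and may need to be excluded manually.

```python
# Hypothetical excerpt of what df_data.dtypes returns: (column name, Spark type string) pairs.
dtypes = [
    ("state", "string"),
    ("account length", "double"),
    ("area code", "string"),
    ("international plan", "string"),
    ("total day minutes", "double"),
    ("churn", "string"),
]

LABEL = "churn"  # the target is handled separately from the features
NUMERIC_TYPES = ("double", "float", "int", "bigint")


def split_columns(dtypes, label):
    """Split feature columns into numeric and categorical lists by Spark type string."""
    numeric = [name for name, t in dtypes if t in NUMERIC_TYPES and name != label]
    categorical = [name for name, t in dtypes if t == "string" and name != label]
    return numeric, categorical


numeric, categorical = split_columns(dtypes, LABEL)
print(numeric)
print(categorical)
```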

### Data visualization with SparkR, ggplot2 and GGally

#### Create a DataFrame with SparkR

In [15]:
%r
library(SparkR)
churn <- read.df("file:/databricks/driver/telco/predict_churn.csv",source = "csv", header="true", inferSchema = "true")

#### Collect the data into a local R data.frame

In [17]:
%r
df <- collect(select(churn, churn$state, churn$account_length, churn$area_code, churn$phone_number, churn$international_plan, churn$voice_mail_plan, churn$number_vmail_messages, churn$total_day_minutes, churn$total_day_calls, churn$total_day_charge, churn$total_eve_minutes, churn$total_eve_calls, churn$total_eve_charge, churn$total_night_minutes, churn$total_night_calls, churn$total_night_charge, churn$total_intl_minutes, churn$total_intl_calls, churn$total_intl_charge, churn$customer_service_calls, churn$churn))

#### Display the data.frame

In [19]:
%r
display(df)

#### Create a Histogram
* Histogram of the column **total_night_minutes**
* The tallest bar (the most frequent range of values) lies just above 200 night minutes.

In [21]:
%r
hist(df$total_night_minutes,
xlab = "total_night_minutes",
main = "")

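#### Create a Histogram with a Density Line
* Histogram of the column **total_night_minutes**, scaled to probabilities (`prob = TRUE`) so that a density curve can be overlaid
* The density line traces the smoothed shape of the same distribution, peaking near 200 night minutes.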

In [23]:
%r
hist(df$total_night_minutes, prob = TRUE, main = "")
lines(density(df$total_night_minutes))
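`density()` builds a kernel density estimate: a Gaussian bump is placed on every observation and the bumps are averaged. R's default bandwidth (`bw.nrd0`) is Silverman's rule of thumb, 0.9 · min(sd, IQR/1.34) · n^(-1/5). The following is a minimal pure-Python sketch of that idea on a hypothetical sample (not values from the dataset); it omits the fallbacks R applies when the spread is zero.

```python
import math
import statistics


def bw_nrd0(xs):
    """Silverman's rule-of-thumb bandwidth, following the formula of R's bw.nrd0."""
    sd = statistics.stdev(xs)
    q1, _, q3 = statistics.quantiles(xs, n=4, method="inclusive")
    lo = min(sd, (q3 - q1) / 1.34)
    return 0.9 * lo * len(xs) ** -0.2


def gaussian_kde(xs, bandwidth):
    """Return f(t): the average of Gaussian bumps centred on each observation."""
    norm = 1.0 / (len(xs) * bandwidth * math.sqrt(2 * math.pi))

    def density(t):
        return norm * sum(math.exp(-0.5 * ((t - x) / bandwidth) ** 2) for x in xs)

    return density


# Hypothetical night-minute values, for illustration only.
sample = [180.2, 195.5, 201.0, 199.3, 210.7, 188.4, 220.1, 205.9, 192.6, 214.3]
h = bw_nrd0(sample)
f = gaussian_kde(sample, h)

# A density should integrate to about 1; check with a simple Riemann sum over [100, 300).
step = 0.5
area = sum(f(100 + i * step) for i in range(int(200 / step))) * step
print(round(h, 2), round(area, 3))
```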



#### Create a Histogram with Color

In [25]:
%r
hist(df$total_night_minutes, xlab = "total_night_minutes",
main = "", col = "lightblue", breaks = 15)
lines(density(df$total_night_minutes))

#### Create a Boxplot using Two Variables
* Boxplot of **customer_service_calls** grouped by **churn**
* The median number of customer service calls is 1 for customers who stay
* The median number of customer service calls is 2 for customers who churn

In [27]:
%r
boxplot(customer_service_calls~churn, data = df,  varwidth=TRUE, col = "lightyellow")
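The thick line inside each box marks the group's median, not its mean; the box and whiskers come from quartile-style summaries (R uses Tukey's hinges). A small sketch on hypothetical `(customer_service_calls, churn)` pairs shows why the distinction matters: in a skewed group the mean is pulled above the median.

```python
import statistics
from collections import defaultdict

# Hypothetical (customer_service_calls, churn) pairs standing in for two columns of df.
rows = [(1, "False"), (0, "False"), (2, "False"), (1, "False"), (1, "False"),
        (4, "True"), (2, "True"), (1, "True"), (5, "True"), (2, "True")]

# Group the call counts by churn status, as the formula customer_service_calls ~ churn does.
groups = defaultdict(list)
for calls, churned in rows:
    groups[churned].append(calls)

medians = {label: statistics.median(values) for label, values in groups.items()}
means = {label: statistics.mean(values) for label, values in groups.items()}
print(medians)
print(means)
```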

#### Create a ggpairs plot using GGally
##### Produces a matrix of pairwise plots (scatter plots below the diagonal, 2D density contours above it) for visualizing the relationships between total_intl_minutes, total_intl_calls and total_intl_charge.

In [29]:
%r
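library(GGally)
# International usage columns (columns 17 to 19 of df), selected by name
data <- df[, c("total_intl_minutes", "total_intl_calls", "total_intl_charge")]
ggpairs(data = data,
        title = "International calls data",
        upper = list(continuous = "density"),  # 2D density contours above the diagonal
        lower = list(combo = "facetdensity"))  # only used for continuous/discrete pairs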
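The relationship these panels show is usually summarized by the Pearson correlation coefficient. The sketch below computes it on hypothetical numbers; the per-minute `rate` is an assumed value for illustration only. It shows that a column computed from another at a fixed rate, as a charge billed per minute would be, has correlation 1 with it, which makes one of the two redundant as a model feature.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient between two equal-length sequences."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)


# Hypothetical international minutes and call counts.
minutes = [10.0, 13.7, 12.2, 6.6, 10.1, 6.3, 7.5]
calls = [3, 3, 5, 7, 3, 6, 7]
rate = 0.27  # assumed flat per-minute rate, for illustration only
charge = [m * rate for m in minutes]

print(round(pearson(minutes, charge), 6))  # a fixed-rate charge moves in lockstep with minutes
print(round(pearson(minutes, calls), 3))
```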

#### Create HeatMap

In [31]:
%r
data_matrix <- df[, 5:19]
heat_map <- heatmap(data.matrix(data_matrix), Rowv=NA, Colv=NA, col= cm.colors(256),scale="column", margins=c(25,10))
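With `scale="column"`, `heatmap` standardizes each column (subtracts the column mean, divides by the column standard deviation) before mapping values to colours, so columns on very different scales, such as minutes and call counts, can share one colour scale. A sketch of that transformation on hypothetical rows; a column with zero variance would cause a division by zero and is not handled here.

```python
import statistics


def scale_columns(matrix):
    """Centre each column on its mean and divide by its sample standard deviation."""
    scaled_cols = []
    for col in zip(*matrix):
        mu, sd = statistics.mean(col), statistics.stdev(col)
        scaled_cols.append([(v - mu) / sd for v in col])
    # Transpose back to row-major order.
    return [list(row) for row in zip(*scaled_cols)]


# Hypothetical rows: (total day minutes, customer service calls).
rows = [[265.1, 1], [161.6, 1], [243.4, 0], [299.4, 2], [166.7, 3]]
scaled = scale_columns(rows)
for row in scaled:
    print([round(v, 2) for v in row])
```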

# Process Data
For modeling, we need to convert the categorical variables in the dataset into numeric variables. There are two common ways to do this:

* **Category Indexing.**
Assigns each category a numeric index from {0, 1, 2, ..., numCategories-1}. This introduces an implicit ordering among the categories, so it is best suited to ordinal variables (e.g. Poor: 0, Average: 1, Good: 2).

* **One-Hot Encoding.**
Converts each category into a binary vector with at most one nonzero value. By default Spark drops the last category, which is then encoded as all zeros (e.g. Blue: [1, 0], Green: [0, 1], Red: [0, 0]).
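Both encodings can be sketched in plain Python. This is a minimal illustration, not Spark's code: `fit_string_indexer` and `one_hot` are hypothetical helpers that mimic `StringIndexer`'s default most-frequent-first ordering (ties are broken alphabetically here, as recent Spark versions do) and `OneHotEncoder`'s default of dropping the last category.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each category to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: float(i) for i, v in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a binary list; with drop_last, the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[int(index)] = 1.0
    return vec


colours = ["Blue", "Green", "Red", "Blue", "Green", "Blue"]  # hypothetical column
mapping = fit_string_indexer(colours)
print(mapping)  # Blue is the most frequent value, so it gets index 0.0
for c in ["Blue", "Green", "Red"]:
    print(c, one_hot(mapping[c], len(mapping)))
```

The same ordering rule applies when `churn` is indexed into `label`: the more frequent value receives `0.0`.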

Here, we will use a combination of StringIndexer and OneHotEncoder to convert the categorical variables. The OneHotEncoder will return a SparseVector.

Since we have several stages of feature transformations, we use a Pipeline to tie them together, which keeps the code simple.
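Conceptually, `Pipeline.fit` walks through the stages in order, fits each estimator on the data produced by the stages before it, and returns a model that replays the fitted stages. The sketch below is a toy, pure-Python version of that idea; every class in it is hypothetical, the toy indexer orders categories alphabetically rather than by frequency, and it treats every stage as an estimator (Spark passes stages that are already transformers through without fitting).

```python
class ToyIndexer:
    """Hypothetical estimator: learns a category -> index mapping for one column."""

    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        cats = sorted({r[self.col] for r in rows})
        return ToyIndexerModel(self.col, {c: i for i, c in enumerate(cats)})


class ToyIndexerModel:
    """Fitted transformer: adds a <col>Index field to every row."""

    def __init__(self, col, mapping):
        self.col, self.mapping = col, mapping

    def transform(self, rows):
        return [{**r, self.col + "Index": self.mapping[r[self.col]]} for r in rows]


class ToyPipeline:
    """Fit stages in order, feeding each one the output of the stages before it."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        models = []
        for stage in self.stages:
            model = stage.fit(rows)       # estimator -> fitted transformer
            rows = model.transform(rows)  # the next stage sees the transformed rows
            models.append(model)
        return ToyPipelineModel(models)


class ToyPipelineModel:
    """Apply every fitted stage in the same order."""

    def __init__(self, models):
        self.models = models

    def transform(self, rows):
        for model in self.models:
            rows = model.transform(rows)
        return rows


rows = [{"state": "KS", "plan": "no"}, {"state": "OH", "plan": "yes"}, {"state": "KS", "plan": "yes"}]
model = ToyPipeline([ToyIndexer("state"), ToyIndexer("plan")]).fit(rows)
out = model.transform(rows)
print(out)
```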

In [33]:
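from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalColumns = ["state", "area code", "phone number", "international plan", "voice mail plan"]
# Note: "phone number" is essentially a per-customer identifier, so its one-hot encoding
# adds roughly one feature per customer and is unlikely to help the model generalize.
stages = []
for categoricalCol in categoricalColumns:
    # Map each category string to a numeric index (most frequent category -> 0.0)
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Turn the index into a sparse binary (one-hot) vector
    encoder = OneHotEncoder(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
    stages += [stringIndexer, encoder]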

In [34]:
label_stringIdx = StringIndexer(inputCol = "churn", outputCol = "label")
stages += [label_stringIdx]

In [35]:
numericCols = ["account length", "number vmail messages", "total day calls",
               "total day charge", "total eve calls", "total eve charge",
               "total night calls", "total intl calls", "total intl charge"]
# Concatenate the one-hot vectors and the numeric columns into a single "features" vector.
# A list comprehension is used because map() returns an iterator in Python 3.
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
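`VectorAssembler` concatenates its input columns, in the order given, into one vector. When the inputs are sparse one-hot vectors, each part's indices are shifted by the total size of the parts before it. A sketch of that bookkeeping, using a hypothetical `(size, {index: value})` pair as a stand-in for Spark's `SparseVector`; the sizes and values below are made up for illustration.

```python
def assemble(parts):
    """Concatenate dense lists and sparse (size, {index: value}) parts into one sparse
    (size, {index: value}) vector, shifting each part's indices by the preceding sizes."""
    offset, entries = 0, {}
    for part in parts:
        if isinstance(part, tuple):  # sparse part: (size, {index: value})
            size, values = part
            for i, v in values.items():
                entries[offset + i] = v
        else:  # dense list of numbers; only nonzeros are stored
            size = len(part)
            for i, v in enumerate(part):
                if v != 0:
                    entries[offset + i] = v
        offset += size
    return offset, dict(sorted(entries.items()))


# Hypothetical row: a one-hot state vector (size 50, index 3 set), a one-hot plan vector
# (size 1, all zeros because it is the dropped last category), then two numeric columns.
state_vec = (50, {3: 1.0})
plan_vec = (1, {})
numeric = [128.0, 25.0]
print(assemble([state_vec, plan_vec, numeric]))
```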

# Model Training
We can now assemble the pipeline and use it to transform the data into `label` and `features` columns. Then we split the labeled data into training and test sets and fit models.

In [37]:
pipeline = Pipeline(stages=stages)

my_Model = pipeline.fit(df_data)
df = my_Model.transform(df_data)

selectedcols = ["label", "features"] + cols
df = df.select(selectedcols)
display(df)

In [38]:
# 80/20 train/test split; the fixed seed makes the split reproducible
train_data, test_data = df.randomSplit([0.8, 0.2], seed=24)

print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))
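`randomSplit` normalizes the weights and assigns rows by random sampling, so the two counts printed above are close to, but generally not exactly, 80% and 20% of the data. A minimal pure-Python sketch of that idea; `random_split` is a hypothetical helper that does not reproduce Spark's per-partition sampling, so the same seed will not give the same split as Spark.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to a split with probability proportional to its
    weight, so split sizes are only approximately proportional to the weights."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total  # cumulative normalized weight
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b or i == len(bounds) - 1:  # the last bucket catches rounding
                splits[i].append(row)
                break
    return splits


train, test = random_split(list(range(3333)), [0.8, 0.2], seed=24)
print(len(train), len(test))
```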


# Logistic Regression
The Pipelines API supports elastic-net regularization for logistic regression and other linear methods.
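For reference, the regularized objective can be written as below, following the form given in the Spark MLlib documentation, where `regParam` is $\lambda \ge 0$ and `elasticNetParam` is $\alpha \in [0, 1]$: $\alpha = 0$ gives a pure L2 (ridge) penalty and $\alpha = 1$ a pure L1 (lasso) penalty. It is written here with labels $y_i \in \{-1, +1\}$; the intercept $b$ is not penalized.

$$
\min_{w,\,b}\; \frac{1}{n}\sum_{i=1}^{n} \log\!\left(1 + e^{-y_i\,(w^\top x_i + b)}\right) \;+\; \lambda\left(\alpha\,\lVert w \rVert_1 + \frac{1-\alpha}{2}\,\lVert w \rVert_2^2\right)
$$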

In [40]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(train_data)

Make predictions on test data using the transform() method.

In [42]:
predictions = lrModel.transform(test_data)
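`transform()` adds `rawPrediction`, `probability` and `prediction` columns. For binary logistic regression these all come from the margin $w^\top x + b$; to our understanding, Spark reports `rawPrediction` as `[-margin, margin]` and `probability` as `[1 - p, p]`, and predicts 1 when `p` exceeds the threshold (0.5 by default). A sketch with hypothetical coefficients; `predict_binary` is an illustrative helper, not a Spark API.

```python
import math


def predict_binary(weights, intercept, features, threshold=0.5):
    """Compute the margin, the sigmoid probability of class 1, and the 0/1 prediction."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    p1 = 1.0 / (1.0 + math.exp(-margin))  # logistic (sigmoid) function
    raw_prediction = [-margin, margin]
    probability = [1.0 - p1, p1]
    prediction = 1.0 if p1 > threshold else 0.0
    return raw_prediction, probability, prediction


# Hypothetical coefficients and feature values, for illustration only.
raw, prob, pred = predict_binary([0.8, -0.5], intercept=-0.2, features=[1.0, 0.4])
print(raw, prob, pred)
```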

Print the schema of the predictions table

In [44]:
predictions.printSchema()

We use the BinaryClassificationEvaluator class to evaluate the model. It expects two input columns, `rawPrediction` and `label`, and by default reports the area under the ROC curve (`areaUnderROC`), not accuracy.

In [46]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)
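The area under the ROC curve can be read as a ranking probability: the chance that a randomly chosen positive example is scored higher than a randomly chosen negative one, with ties counting one half. The quadratic-time sketch below computes it that way on hypothetical scores. Spark derives the same quantity from the ROC curve instead (possibly with binning), so values can differ slightly on large data.

```python
def roc_auc(scores, labels):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as one half."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Hypothetical scores (e.g. the class-1 entry of rawPrediction) and true labels.
scores = [0.9, 0.8, 0.35, 0.6, 0.2, 0.1]
labels = [1, 1, 1, 0, 0, 0]
print(roc_auc(scores, labels))
```

Because only the ranking matters, AUC does not depend on the 0.5 decision threshold, which is one reason it is a useful default for comparing classifiers.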

Create ParamGrid for Cross Validation

In [48]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())
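`ParamGridBuilder` produces the Cartesian product of the listed values, and `CrossValidator` trains one model per combination per fold. A plain-Python sketch of that count for the grid above:

```python
from itertools import product

# The same values as the grid above, written as plain lists.
grid = {
    "regParam": [0.01, 0.5, 2.0],
    "elasticNetParam": [0.0, 0.5, 1.0],
    "maxIter": [1, 5, 10],
}
param_maps = [dict(zip(grid, combo)) for combo in product(*grid.values())]
num_folds = 5

print(len(param_maps))              # one candidate model per combination
print(len(param_maps) * num_folds)  # models trained during 5-fold cross-validation
print(param_maps[0])
```

After cross-validation, the best combination is refit once on the whole training set, so the total cost grows multiplicatively with every grid dimension and with the number of folds.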

##### Create a 5-fold CrossValidator and run cross-validation

In [50]:
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvModel = cv.fit(train_data)
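In k-fold cross-validation, the training data is divided into k folds; each fold serves once as the validation set while the other k-1 folds are used for training, and the metric for each parameter combination is averaged over the k runs (`CrossValidatorModel` exposes these averages as `avgMetrics`). The sketch below deals shuffled indices into equal folds; Spark assigns rows to folds by random sampling instead, so its folds are only approximately equal in size. `k_fold_indices` is a hypothetical helper.

```python
import random


def k_fold_indices(n, k, seed):
    """Shuffle row indices and deal them into k folds; yield (train, validation)
    index lists, with each fold used exactly once for validation."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    folds = [idx[i::k] for i in range(k)]
    for i in range(k):
        train = [j for f, fold in enumerate(folds) if f != i for j in fold]
        yield train, folds[i]


for train_idx, val_idx in k_fold_indices(n=10, k=5, seed=0):
    print(len(train_idx), len(val_idx))
```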

Use the test set to measure how well the model performs on new data.

In [52]:
predictions = cvModel.transform(test_data)

Evaluate the best model

In [54]:
evaluator.evaluate(predictions)

We can also inspect the best model's coefficients (feature weights) and intercept.

In [56]:
best_lr = cvModel.bestModel
print("Model Intercept: ", best_lr.intercept)
print("Model Coefficients: ", best_lr.coefficients)

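Report the area under the ROC curve of the predictions. `evaluator.evaluate` returns AUC (the evaluator's default metric), not accuracy, so `1 - AUC` is not a test error.

In [58]:
auc = evaluator.evaluate(predictions)  # areaUnderROC by default
print("Area under ROC = %g" % auc)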

# Random Forest Classifier
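A random forest trains an ensemble of decision trees, each on a random resample of the training data, and combines their predictions, which usually improves accuracy over a single tree.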
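The core idea, bootstrap resampling plus voting, can be sketched in plain Python. This toy version is an assumption-laden illustration: each "tree" is a hypothetical one-split stump on a single feature, predictions are combined by simple majority vote (Spark aggregates per-tree class probabilities), and the random feature selection Spark also performs at each split is omitted.

```python
import random
from collections import Counter


def bootstrap_sample(rows, rng):
    """Draw len(rows) rows with replacement, so each tree sees a different resample."""
    return [rng.choice(rows) for _ in rows]


def fit_stump(rows):
    """Toy 'tree': split feature x at the midpoint between the two class means."""
    xs1 = [x for x, y in rows if y == 1]
    xs0 = [x for x, y in rows if y == 0]
    if not xs1 or not xs0:  # one-class resample: always predict the majority label
        majority = Counter(y for _, y in rows).most_common(1)[0][0]
        return lambda x: majority
    m1, m0 = sum(xs1) / len(xs1), sum(xs0) / len(xs0)
    threshold = (m1 + m0) / 2
    high_label = 1 if m1 > m0 else 0
    return lambda x: high_label if x > threshold else 1 - high_label


def fit_forest(rows, num_trees, seed):
    rng = random.Random(seed)
    return [fit_stump(bootstrap_sample(rows, rng)) for _ in range(num_trees)]


def predict(forest, x):
    """Majority vote over the trees."""
    return Counter(tree(x) for tree in forest).most_common(1)[0][0]


# Hypothetical (customer service calls, churned) pairs.
rows = [(0, 0), (1, 0), (1, 0), (2, 0), (1, 0), (4, 1), (5, 1), (3, 1), (6, 1), (2, 1)]
forest = fit_forest(rows, num_trees=20, seed=7)
print(predict(forest, 0), predict(forest, 6))
```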

In [60]:
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Train model with Training Data
rfModel = rf.fit(train_data)

Make predictions on test data using the Transformer.transform() method.

In [62]:
predictions = rfModel.transform(test_data)

Print the schema of the predictions table

In [64]:
predictions.printSchema()

We evaluate the random forest model with BinaryClassificationEvaluator, which again reports the area under the ROC curve by default.

In [66]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

Create ParamGrid for Cross Validation

In [68]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

Create 5-fold CrossValidator and run cross validation

In [70]:
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvModel = cv.fit(train_data)

Use the test set to measure how well the model performs on new data.

In [72]:
predictions = cvModel.transform(test_data)

Evaluate the best model

In [74]:
evaluator.evaluate(predictions)

# Results
* Area under ROC (test set) using Logistic Regression = 0.78795
* Area under ROC (test set) using Random Forest Classifier = 0.84068

In [76]:
# Confusion-matrix counts for the random forest's test-set predictions
truePositive = predictions.where("label = 1 AND prediction = 1").count()
trueNegative = predictions.where("label = 0 AND prediction = 0").count()
falsePositive = predictions.where("label = 0 AND prediction = 1").count()
falseNegative = predictions.where("label = 1 AND prediction = 0").count()

counts = [['TP', truePositive], ['TN', trueNegative], ['FP', falsePositive], ['FN', falseNegative]]
print(counts)
resultDF = spark.createDataFrame(counts, ['metric', 'value'])
display(resultDF)
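These counts yield the accuracy that the area-under-ROC figures do not, together with precision and recall for the churn class (label 1). Because churners are a minority in this data, recall on churners is often more informative than overall accuracy. A sketch with hypothetical counts; in the notebook, the four values would be `truePositive`, `trueNegative`, `falsePositive` and `falseNegative`.

```python
def classification_metrics(tp, tn, fp, fn):
    """Derive accuracy, precision, recall and F1 from confusion-matrix counts."""
    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}


# Hypothetical counts, for illustration only.
print(classification_metrics(tp=40, tn=540, fp=10, fn=50))
```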

In [77]:
resultDF.createOrReplaceTempView("RFresult")  # counts above come from the random forest model

In [78]:
%r
library(SparkR)
sparkdf <- sql("SELECT * FROM RFresult")
rdf <- collect(sparkdf)
print(rdf)
# Simple pie chart of the confusion-matrix counts
pie(rdf$value, labels = rdf$metric)