# Telecom Customer Churn

## Prerequisite

In [199]:
!pip install pyspark



## 1. Defining the Question

### a) Data Analysis Question

Can we predict whether a customer will leave a telecom operator?

### b) Metric for Success

The model should achieve a minimum accuracy of 0.8

### c) Understanding the context

Customer churn is a significant challenge in the telecom industry. Identifying customers who are likely to churn is crucial for implementing proactive measures to retain them.

In this project, we leverage the distributed computing capabilities of PySpark to develop a machine learning model using PySpark that accurately predicts customer churn in a telecom company. The model should achieve a minimum accuracy of 0.8, enabling the company to proactively identify and retain customers at risk of leaving.

By effectively predicting churn, the company can implement targeted retention strategies, reduce customer attrition, and improve overall business performance.

### d) Experimental Design

1. Data Importation
2. Data Exploration
3. Data Cleaning
4. Data Preparation
5. Data Modeling (Using Decision Trees, Random Forest and Logistic Regression)
6. Model Evaluation
7. Hyparameter Tuning
8. Findings and Recommendations

### e) Data Relevance

The given data set is relevant in answering the research question. The project includes relevant features such as customer demographics, usage patterns, service plans, call details, customer complaints, and churn status.

## 2. Reading the Data

In [200]:
# Importing our libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

from pyspark import SparkFiles

# Disable warnings  when running cells
import sys
import warnings
if not sys.warnoptions:
       warnings.simplefilter("ignore")



In [201]:
# Load the data below

# Create a SparkSession
spark = SparkSession.builder.appName("TelecomChurnPrediction").getOrCreate()

# Dataset url
url = "https://raw.githubusercontent.com/wambasisamuel/DE_Week10_Monday/main/telecom_dataset.csv"

# Download and load the dataset
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("telecom_dataset.csv"), header=True, inferSchema= True)

In [202]:
# Preview the dataset: check first 5 rows of data
df.show(5)

+----------+------+---+--------------+--------------+------------+-----+
|CustomerID|Gender|Age|      Contract|MonthlyCharges|TotalCharges|Churn|
+----------+------+---+--------------+--------------+------------+-----+
|         1|Female| 25|Month-to-Month|          65.7|       156.5|   No|
|         2|  Male| 37|      One Year|          89.0|      2356.8|   No|
|         3|  Male| 52|      Two Year|         115.5|      5408.6|   No|
|         4|Female| 30|Month-to-Month|          75.9|       129.4|  Yes|
|         5|  Male| 45|      One Year|          98.2|      3142.0|   No|
+----------+------+---+--------------+--------------+------------+-----+
only showing top 5 rows



In [203]:
# Check number of rows and columns
# A custom-defined function that returns dataframe shape
import pyspark
def df_shape(dataFrame):
    return (dataFrame.count(), len(dataFrame.columns))

pyspark.sql.dataframe.DataFrame.shape = df_shape

df.shape()

(20, 7)

In [204]:
# Checking datatypes
df.dtypes
# OR df.schema.fields

[('CustomerID', 'int'),
 ('Gender', 'string'),
 ('Age', 'int'),
 ('Contract', 'string'),
 ('MonthlyCharges', 'double'),
 ('TotalCharges', 'double'),
 ('Churn', 'string')]

Observations:

*   The are 20 observations in the dataset.
*   The dataset has 7 features.
*   There are 3 categorical features
*   There are 4 numerical features



## 3. External Data Source Validation

The provided dataset has enough features to help in developing a machine learning model that can predict customer churn from a telecom operator.

## 4. Data Preparation

### Data Standardisation

In [205]:
# Standardise column names
# change column names to lowercase
df = df.toDF(*[col.lower() for col in df.columns])
df.columns

['customerid',
 'gender',
 'age',
 'contract',
 'monthlycharges',
 'totalcharges',
 'churn']

### Data Cleaning

#### Irrelevant Data

In [206]:
# The customerid column has no use in churn prediction
df = df.drop("customerid")

#### Duplicate data

In [207]:
# Find the total duplicate records
"""
groupBy() all the columns and count()
then select the sum of the counts for the rows where the count is greater than 1
"""

import pyspark.sql.functions as f
df.groupBy(df.columns) \
    .count() \
    .where(f.col('count') > 1) \
    .select(f.sum('count')) \
    .show()


+----------+
|sum(count)|
+----------+
|      null|
+----------+



#### Missing Data

In [208]:
# Checking missing entries of all the variables

# Find count for empty, None, Null, Nan with string literals.
from pyspark.sql.functions import col,isnan,when,count
missing_counts = df.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c
                           )).alias(c)
                    for c in df.columns])
missing_counts.show()

+------+---+--------+--------------+------------+-----+
|gender|age|contract|monthlycharges|totalcharges|churn|
+------+---+--------+--------------+------------+-----+
|     0|  0|       0|             0|           0|    0|
+------+---+--------+--------------+------------+-----+



The dataset has no duplicate records nor missing values.

## 5. Modelling

#### Data Preprocessing

Creating new features from the existing dataset

In [209]:
# Calculate new features

import pyspark.sql.functions as f

# customer tenure column
#df = df.withColumn("tenure_months", (col("totalcharges") / col("monthlycharges")).cast("double"))
df = df.withColumn("tenure_months", (f.round(col("totalcharges") / col("monthlycharges"), scale=2)).cast("double"))

Encoding and feature scaling

In [210]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col
from pyspark.ml import Pipeline

# Encode categorical features into numerical form
categorical_cols = ["gender", "contract"]

# Convert string columns to numerical categories
indexers = [
    StringIndexer(inputCol=col, outputCol=col+"_index", handleInvalid="keep")
    for col in ["gender", "contract", "churn"]
]

indexer_pipeline = Pipeline(stages=indexers)
transformed_df = indexer_pipeline.fit(df).transform(df)

# Cast the "Target" column to IntegerType
#df = df.withColumn("churn", col("churn").cast(IntegerType()))

# Set the feature columns
featureCols = ["gender_index", "contract_index", "monthlycharges", "totalcharges", "tenure_months"]

# Convert feature columns to numeric types
for featureCol in featureCols:
    transformed_df = transformed_df.withColumn(featureCol, col(featureCol).cast("double"))

# Vectorize the feature columns
vecAssembler = VectorAssembler(inputCols=featureCols, outputCol="features")
feature_vector = vecAssembler.transform(transformed_df)

feature_vector.show(5)


+------+---+--------------+--------------+------------+-----+-------------+------------+--------------+-----------+--------------------+
|gender|age|      contract|monthlycharges|totalcharges|churn|tenure_months|gender_index|contract_index|churn_index|            features|
+------+---+--------------+--------------+------------+-----+-------------+------------+--------------+-----------+--------------------+
|Female| 25|Month-to-Month|          65.7|       156.5|   No|         2.38|         1.0|           0.0|        0.0|[1.0,0.0,65.7,156...|
|  Male| 37|      One Year|          89.0|      2356.8|   No|        26.48|         0.0|           1.0|        0.0|[0.0,1.0,89.0,235...|
|  Male| 52|      Two Year|         115.5|      5408.6|   No|        46.83|         0.0|           2.0|        0.0|[0.0,2.0,115.5,54...|
|Female| 30|Month-to-Month|          75.9|       129.4|  Yes|          1.7|         1.0|           0.0|        1.0|[1.0,0.0,75.9,129...|
|  Male| 45|      One Year|          98.2

In [211]:
# Encode categorical features
from pyspark.ml.feature import OneHotEncoder

categorical_cols = ["gender", "contract"]
#numeric_cols = ["age", "monthlycharges", "totalcharges","tenure_months"]
numeric_cols = [col for col, dtype in df.dtypes if dtype != 'string']

stringIndexer = StringIndexer(inputCols=[cols for cols in categorical_cols],
                              outputCols=[cols + "_index" for cols in categorical_cols])

encoder = OneHotEncoder(inputCols=[cols + "_index" for cols in categorical_cols],
                                 outputCols=[cols + "_classVec" for cols in categorical_cols])

stages = []

stages += [stringIndexer, encoder]

label_string_id = StringIndexer(inputCol="churn", outputCol="label")
stages += [label_string_id]

# Feature scaling
assembler_inputs = [cols + "_classVec" for cols in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="unscaled_features")
stages += [assembler]

# Standard scaling
scaler = StandardScaler(inputCol="unscaled_features", outputCol="features")
stages += [scaler]

# Label the target variable (churn): Map 'Y'  to 1 and 'N' to 0
#df = df.withColumn("label", (df["churn"] == "Yes").cast("integer"))


# Run Data Through Pipeline
pipeline = Pipeline().setStages(stages)
pipeline_model = pipeline.fit(df)
pipelined_df = pipeline_model.transform(df)

pipelined_df.dtypes
pipelined_df.show(5)
#df.show(5)

+------+---+--------------+--------------+------------+-----+-------------+------------+--------------+---------------+-----------------+-----+--------------------+--------------------+
|gender|age|      contract|monthlycharges|totalcharges|churn|tenure_months|gender_index|contract_index|gender_classVec|contract_classVec|label|   unscaled_features|            features|
+------+---+--------------+--------------+------------+-----+-------------+------------+--------------+---------------+-----------------+-----+--------------------+--------------------+
|Female| 25|Month-to-Month|          65.7|       156.5|   No|         2.38|         1.0|           0.0|      (1,[],[])|    (2,[0],[1.0])|  0.0|[0.0,1.0,0.0,25.0...|[0.0,1.9895560643...|
|  Male| 37|      One Year|          89.0|      2356.8|   No|        26.48|         0.0|           1.0|  (1,[0],[1.0])|    (2,[1],[1.0])|  0.0|[1.0,0.0,1.0,37.0...|[1.95917937881752...|
|  Male| 52|      Two Year|         115.5|      5408.6|   No|        4

#### Model Selection

#### Splitting the dataset

In [212]:
(train_data, test_data) = feature_vector.randomSplit([0.75, 0.25], seed=1337)
# 25% - testing, 75% - training.

#### Logistic Regression

In [213]:
#from pyspark.ml.regression import LogisticRegression

# Create Logistic Regression model
#lr_regressor = LogisticRegression(labelCol="churn_index", featuresCol="features", predictionCol="predicted_churn")
lr_regressor = LogisticRegression(labelCol="churn_index", featuresCol="features", predictionCol="predicted_churn", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Train the model
model = lr_regressor.fit(train_data)

# Test the model
lr_predictions = model.transform(test_data)

# Evaluate the model
lr_predictions.show(5)

+------+---+--------+--------------+------------+-----+-------------+------------+--------------+-----------+--------------------+--------------------+--------------------+---------------+
|gender|age|contract|monthlycharges|totalcharges|churn|tenure_months|gender_index|contract_index|churn_index|            features|       rawPrediction|         probability|predicted_churn|
+------+---+--------+--------------+------------+-----+-------------+------------+--------------+-----------+--------------------+--------------------+--------------------+---------------+
|Female| 55|Two Year|          99.9|      6541.5|   No|        65.48|         1.0|           2.0|        0.0|[1.0,2.0,99.9,654...|[1.68662779498050...|[0.58698954980392...|            0.0|
|  Male| 41|Two Year|          96.5|      4188.1|   No|         43.4|         0.0|           2.0|        0.0|[0.0,2.0,96.5,418...|[1.68662779498050...|[0.58698954980392...|            0.0|
|  Male| 48|One Year|         101.8|      5149.6|  Yes|

#### Random Forest Classification

In [214]:
from pyspark.ml.classification import RandomForestClassifier

# Create a Random Forest Classifier
rf_classifier = RandomForestClassifier(labelCol="churn_index", featuresCol="features",numTrees=4, predictionCol="predicted_churn")

# Create a Pipeline
pipeline = Pipeline(stages=[rf_classifier])

# Fit the Model
model = pipeline.fit(feature_vector)

# Make predictions
rf_predictions = model.transform(test_data)
rf_predictions.show()

+------+---+--------+--------------+------------+-----+-------------+------------+--------------+-----------+--------------------+-------------+---------------+---------------+
|gender|age|contract|monthlycharges|totalcharges|churn|tenure_months|gender_index|contract_index|churn_index|            features|rawPrediction|    probability|predicted_churn|
+------+---+--------+--------------+------------+-----+-------------+------------+--------------+-----------+--------------------+-------------+---------------+---------------+
|Female| 55|Two Year|          99.9|      6541.5|   No|        65.48|         1.0|           2.0|        0.0|[1.0,2.0,99.9,654...|[2.0,2.0,0.0]|  [0.5,0.5,0.0]|            0.0|
|  Male| 41|Two Year|          96.5|      4188.1|   No|         43.4|         0.0|           2.0|        0.0|[0.0,2.0,96.5,418...|[3.0,1.0,0.0]|[0.75,0.25,0.0]|            0.0|
|  Male| 48|One Year|         101.8|      5149.6|  Yes|        50.59|         0.0|           1.0|        1.0|[0.0,1

#### Model Evaluation

In [215]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluation metrics
evaluator = MulticlassClassificationEvaluator(labelCol="predicted_churn",predictionCol="predicted_churn")


accuracy_lr = evaluator.evaluate(lr_predictions, {evaluator.metricName: "accuracy"})
precision_lr = evaluator.evaluate(lr_predictions, {evaluator.metricName: "weightedPrecision"})
recall_lr = evaluator.evaluate(lr_predictions, {evaluator.metricName: "weightedRecall"})
f1_lr = evaluator.evaluate(lr_predictions, {evaluator.metricName: "f1"})


accuracy_rf = evaluator.evaluate(rf_predictions, {evaluator.metricName: "accuracy"})
precision_rf = evaluator.evaluate(rf_predictions, {evaluator.metricName: "weightedPrecision"})
recall_rf = evaluator.evaluate(rf_predictions, {evaluator.metricName: "weightedRecall"})
f1_rf = evaluator.evaluate(rf_predictions, {evaluator.metricName: "f1"})

print("Logistic Regression accuracy:", accuracy_lr)
print("Logistic Regression Precision:", precision_lr)
print("Logistic Regression recall:", recall_lr)
print("Logistic Regression F1-score:", f1_lr)

print("\nRandom Forest accuracy:", accuracy_rf)
print("Random Forest Precision:", accuracy_rf)
print("Random Forest recall:", accuracy_rf)
print("Random Forest F1-score:", accuracy_rf)

Logistic Regression accuracy: 1.0
Logistic Regression Precision: 1.0
Logistic Regression recall: 1.0
Logistic Regression F1-score: 1.0

Random Forest accuracy: 1.0
Random Forest Precision: 1.0
Random Forest recall: 1.0
Random Forest F1-score: 1.0


## 6. Conclusion

#### Challenges

- The dataset provided limited information, which required careful feature engineering and selection to improve the model's performance.

- churned customers were significantly fewer than the non-churned customers, leading to imbalanced classes.

#### Findings

* The two selected models are good in predicting customer churn



