<a href="https://colab.research.google.com/github/vinitanjaya/customer-churn-prediction/blob/main/churn_prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [None]:
# Obtain the notebook's SparkSession through the builder's getOrCreate().
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

In [None]:
# Read the train and test CSV files with a header row and inferred column types.
train_csv_path = "/content/drive/MyDrive/Churn_Prediction/dataset/Telco_Customer_Churn_Train.csv"
test_csv_path = "/content/drive/MyDrive/Churn_Prediction/dataset/Telco_Customer_Churn_Test.csv"

df_train = spark.read.option("inferschema", "true").csv(train_csv_path, header=True)
df_test = spark.read.option("inferschema", "true").csv(test_csv_path, header=True)

# Preview the first three rows of each split.
for loaded_frame in (df_train, df_test):
    loaded_frame.show(3)

# Last expression of the cell: the (column, type) pairs of the training split.
df_train.dtypes

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+----------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|   PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+----------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|          No|No phone service|            DSL|            No|         Yes|              No|         No|         No|    

[('customerID', 'string'),
 ('gender', 'string'),
 ('SeniorCitizen', 'int'),
 ('Partner', 'string'),
 ('Dependents', 'string'),
 ('tenure', 'int'),
 ('PhoneService', 'string'),
 ('MultipleLines', 'string'),
 ('InternetService', 'string'),
 ('OnlineSecurity', 'string'),
 ('OnlineBackup', 'string'),
 ('DeviceProtection', 'string'),
 ('TechSupport', 'string'),
 ('StreamingTV', 'string'),
 ('StreamingMovies', 'string'),
 ('Contract', 'string'),
 ('PaperlessBilling', 'string'),
 ('PaymentMethod', 'string'),
 ('MonthlyCharges', 'double'),
 ('TotalCharges', 'string'),
 ('Churn', 'string')]

In [None]:
# data preprocessing
# Training-set row count prior to removing rows that contain nulls.
print(f'df_train: {df_train.count()}')

# Discard rows containing nulls in both splits.
df_train, df_test = df_train.dropna(), df_test.dropna()

# Training-set row count once those rows are gone.
print(f'df_train: {df_train.count()}')

# Remove the customerID column from both splits.
df_train, df_test = df_train.drop('customerID'), df_test.drop('customerID')

df_train: 4929
df_train: 4929


In [None]:
# Hand-written numeric codes for every categorical column, keyed by column name.
# The repeated code tables are declared once and copied per column, so each
# column still owns an independent dict.
yes_no_codes = {"Yes": 1, "No": 0}
internet_addon_codes = {"Yes": 1, "No": 0, "No internet service": 2}
internet_addon_columns = (
    "OnlineSecurity",
    "OnlineBackup",
    "DeviceProtection",
    "TechSupport",
    "StreamingTV",
    "StreamingMovies",
)

mappings = {
    "gender": {"Male": 1, "Female": 0},
    "Partner": dict(yes_no_codes),
    "Dependents": dict(yes_no_codes),
    "PhoneService": dict(yes_no_codes),
    "MultipleLines": {"Yes": 1, "No": 0, "No phone service": 2},
    "InternetService": {"DSL": 0, "Fiber optic": 1, "No": 2},
    **{name: dict(internet_addon_codes) for name in internet_addon_columns},
    "Contract": {"Month-to-month": 0, "One year": 1, "Two year": 2},
    "PaperlessBilling": dict(yes_no_codes),
    "PaymentMethod": {
        "Electronic check": 0,
        "Mailed check": 1,
        "Bank transfer (automatic)": 2,
        "Credit card (automatic)": 3,
    },
    "Churn": dict(yes_no_codes),
}

from pyspark.sql.functions import col

# convert TotalCharges column to double
# TotalCharges is loaded as a string, so it has to be cast before it can be used
# as a numeric feature. "double" is used instead of "float": a 32-bit float
# visibly distorts the values (44.55 ends up as 44.54999923706055 in the feature
# vector), and MonthlyCharges is already a double.
# NOTE(review): entries that cannot be parsed as a number presumably become null
# here and are removed by the later dropna() cell - confirm for the Spark
# version / ANSI setting in use.
df_train = df_train.withColumn("TotalCharges", col("TotalCharges").cast("double"))
df_test = df_test.withColumn("TotalCharges", col("TotalCharges").cast("double"))

# Confirm the new column type on both splits.
df_train.select("TotalCharges").printSchema()
df_test.select("TotalCharges").printSchema()

def apply_mapping(df, mappings, default=0):
    """Replace categorical values with numeric codes, one column at a time.

    For every column named in ``mappings`` a chained ``when(...).when(...)``
    expression is built from that column's ``{value: code}`` pairs and the
    column is overwritten with the result.

    Args:
        df: DataFrame holding every column named in ``mappings``.
        mappings: dict of ``{column_name: {original_value: code}}``.
        default: code assigned to values that do not appear in a column's
            mapping. Defaults to 0, which matches the previous hard-coded
            behaviour; pass ``None`` to turn unexpected values into nulls so
            they are not mistaken for whichever category is encoded as 0.

    Returns:
        The DataFrame produced by applying ``withColumn`` once per mapped
        column, starting from ``df``.

    Raises:
        ValueError: if the mapping supplied for a column is empty (previously
            this surfaced as an ``AttributeError`` on ``None``).
    """
    new_df = df
    for col_name, mapping in mappings.items():
        if not mapping:
            raise ValueError(
                f"No value mapping provided for column '{col_name}'"
            )
        expr = None
        for k, v in mapping.items():
            # Conditions are written against the input frame's column.
            condition = (df[col_name] == k)
            expr = when(condition, v) if expr is None else expr.when(condition, v)
        # Values missing from the mapping fall through to ``default``.
        new_df = new_df.withColumn(col_name, expr.otherwise(default))
    return new_df

# Encode the categorical columns of both splits with the shared mapping table.
df_train_mapped, df_test_mapped = (
    apply_mapping(split, mappings) for split in (df_train, df_test)
)

# Preview the first five encoded rows of each split.
for encoded_split in (df_train_mapped, df_test_mapped):
    encoded_split.show(5)


root
 |-- TotalCharges: float (nullable = true)

root
 |-- TotalCharges: float (nullable = true)

+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+
|     0|            0|      1|         0|     1|           0|            2|              0|             0|           1|               0|          0| 

In [None]:
# Print the schema of the encoded training frame to check the column types after mapping.
df_train_mapped.printSchema()


root
 |-- gender: integer (nullable = false)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: integer (nullable = false)
 |-- Dependents: integer (nullable = false)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: integer (nullable = false)
 |-- MultipleLines: integer (nullable = false)
 |-- InternetService: integer (nullable = false)
 |-- OnlineSecurity: integer (nullable = false)
 |-- OnlineBackup: integer (nullable = false)
 |-- DeviceProtection: integer (nullable = false)
 |-- TechSupport: integer (nullable = false)
 |-- StreamingTV: integer (nullable = false)
 |-- StreamingMovies: integer (nullable = false)
 |-- Contract: integer (nullable = false)
 |-- PaperlessBilling: integer (nullable = false)
 |-- PaymentMethod: integer (nullable = false)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: float (nullable = true)
 |-- Churn: integer (nullable = false)



In [None]:
# Discard every row that contains a null in either encoded split.
df_train_mapped, df_test_mapped = (
    encoded.na.drop() for encoded in (df_train_mapped, df_test_mapped)
)


In [None]:
# normalization
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Remove vector columns left behind by a previous run of this cell. Without this,
# re-running the cell feeds "Feature"/"Scaled_Feature" back into the assembler
# and fails because the output columns already exist. Dropping a column that is
# not present is a no-op, so the first run is unaffected.
generated_cols = ("Feature", "Scaled_Feature")
df_train_mapped = df_train_mapped.drop(*generated_cols)
df_test_mapped = df_test_mapped.drop(*generated_cols)

# Every remaining column except the label is a model input.
cols = [name for name in df_train_mapped.columns if name != "Churn"]

# Pack the input columns into a single vector column.
vec_assembler = VectorAssembler(inputCols=cols, outputCol="Feature")
df_train_mapped = vec_assembler.transform(df_train_mapped)
df_test_mapped = vec_assembler.transform(df_test_mapped)

# Fit the scaler on the training split only, then apply it to both splits.
scaler = StandardScaler(inputCol="Feature", outputCol="Scaled_Feature")
scaler_model = scaler.fit(df_train_mapped)

df_train_mapped = scaler_model.transform(df_train_mapped)
df_test_mapped = scaler_model.transform(df_test_mapped)

df_train_mapped.show(5, False)

+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+-----+----------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|Churn|Feature                                                                           |Scaled_Feature                                                                                                  

In [None]:
# Configure a logistic-regression estimator on the scaled features, then fit it
# to the training split.
from pyspark.ml.classification import LogisticRegression

lr_estimator = LogisticRegression(
    featuresCol="Scaled_Feature",
    labelCol="Churn",
    maxIter=10,
)
model = lr_estimator.fit(df_train_mapped)

In [None]:
# Score the test split with the fitted model.
prediction = model.transform(df_test_mapped)

# Show the first 30 rows untruncated: unscaled features, true label, predicted label.
preview_columns = ["Feature", "Churn", "prediction"]
prediction.select(*preview_columns).show(30, truncate=False)

+-------------------------------------------------------------------------------------------------+-----+----------+
|Feature                                                                                          |Churn|prediction|
+-------------------------------------------------------------------------------------------------+-----+----------+
|[1.0,1.0,1.0,0.0,63.0,1.0,0.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,84.0,5329.5498046875]      |0    |0.0       |
|(19,[4,5,15,16,17,18],[1.0,1.0,1.0,3.0,44.55,44.54999923706055])                                 |0    |0.0       |
|[1.0,0.0,1.0,0.0,15.0,1.0,1.0,1.0,0.0,0.0,1.0,1.0,1.0,1.0,0.0,1.0,0.0,103.45,1539.800048828125]  |0    |1.0       |
|[1.0,0.0,1.0,1.0,27.0,1.0,0.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,1.0,3.0,80.65,2209.75]             |0    |0.0       |
|[0.0,0.0,0.0,1.0,4.0,0.0,2.0,0.0,1.0,1.0,1.0,0.0,1.0,1.0,0.0,1.0,1.0,57.2,223.75]                |0    |0.0       |
|[0.0,0.0,1.0,0.0,72.0,1.0,1.0,1.0,1.0,1.0,1.0,0.0,1.0,1.0,2.0,1

In [None]:
# model testing and evaluation
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
)

# BinaryClassificationEvaluator measures area under the ROC curve by default,
# not accuracy, so that score is reported under its real name.
evaluator = BinaryClassificationEvaluator(labelCol="Churn", metricName="areaUnderROC")
area_under_roc = evaluator.evaluate(prediction)
print(f"Area under ROC: {area_under_roc}")

# Accuracy (share of rows whose predicted label equals Churn) comes from the
# multiclass evaluator; it is expressed as a percentage.
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="Churn",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = accuracy_evaluator.evaluate(prediction) * 100
print(f"Accuracy: {accuracy}")

Accuracy: 83.53228676321568
