In [1]:
import os
from itertools import chain

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sn
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import pyspark.sql.functions as f
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession

In [2]:
# Connection settings for the MySQL database used for both reads and writes.
# Values come from the environment so real credentials never have to be saved
# in the notebook; the fallbacks are the original local-development defaults.
DB_USER = os.environ.get('TELCO_DB_USER', 'root')
DB_PASS = os.environ.get('TELCO_DB_PASS', 'root')
DB_NAME = os.environ.get('TELCO_DB_NAME', 'telco_churn')
# "host:port" — interpolated directly into the JDBC URL.
HOST = os.environ.get('TELCO_DB_HOST', 'localhost:3306')

In [3]:
# Reuse the active SparkSession if there is one, otherwise create it.
# NOTE(review): the JDBC reads below need the MySQL connector jar on Spark's classpath — confirm how it is supplied.
spark = SparkSession.builder.getOrCreate()

In [4]:
# Bare expression: display the session object's notebook representation.
spark

# Import Data

In [5]:
# Load the `status` table from MySQL over JDBC.
status_df = (
    spark.read.format("jdbc")
    .option("url", f"jdbc:mysql://{HOST}/{DB_NAME}")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "status")
    .option("user", DB_USER)
    .option("password", DB_PASS)
    .option("useSSL", False)
    .load()
)


In [6]:
# Print the column names and types of the status table as read over JDBC.
status_df.printSchema()

root
 |-- Customer ID: string (nullable = true)
 |-- Count: integer (nullable = true)
 |-- Quarter: string (nullable = true)
 |-- Satisfaction Score: integer (nullable = true)
 |-- Customer Status: string (nullable = true)
 |-- Churn Label: string (nullable = true)
 |-- Churn Value: integer (nullable = true)
 |-- Churn Score: integer (nullable = true)
 |-- CLTV: integer (nullable = true)
 |-- Churn Category: string (nullable = true)
 |-- Churn Reason: string (nullable = true)



In [7]:
# Load the `demographics` table from MySQL over JDBC.
demographic_df = (
    spark.read.format("jdbc")
    .option("url", f"jdbc:mysql://{HOST}/{DB_NAME}")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "demographics")
    .option("user", DB_USER)
    .option("password", DB_PASS)
    .option("useSSL", False)
    .load()
)


In [8]:
# Print the column names and types of the demographics table as read over JDBC.
demographic_df.printSchema()

root
 |-- Customer ID: string (nullable = true)
 |-- Count: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Under 30: string (nullable = true)
 |-- Senior Citizen: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Number of Dependents: integer (nullable = true)



In [9]:
# Load the `services` table from MySQL over JDBC.
service_df = (
    spark.read.format("jdbc")
    .option("url", f"jdbc:mysql://{HOST}/{DB_NAME}")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "services")
    .option("user", DB_USER)
    .option("password", DB_PASS)
    .option("useSSL", False)
    .load()
)


In [10]:
# Print the column names and types of the services table as read over JDBC.
service_df.printSchema()

root
 |-- Customer ID: string (nullable = true)
 |-- Count: integer (nullable = true)
 |-- Quarter: string (nullable = true)
 |-- Referred a Friend: string (nullable = true)
 |-- Number of Referrals: integer (nullable = true)
 |-- Tenure in Months: integer (nullable = true)
 |-- Offer: string (nullable = true)
 |-- Phone Service: string (nullable = true)
 |-- Avg Monthly Long Distance Charges: double (nullable = true)
 |-- Multiple Lines: string (nullable = true)
 |-- Internet Service: string (nullable = true)
 |-- Internet Type: string (nullable = true)
 |-- Avg Monthly GB Download: integer (nullable = true)
 |-- Online Security: string (nullable = true)
 |-- Online Backup: string (nullable = true)
 |-- Device Protection Plan: string (nullable = true)
 |-- Premium Tech Support: string (nullable = true)
 |-- Streaming TV: string (nullable = true)
 |-- Streaming Movies: string (nullable = true)
 |-- Streaming Music: string (nullable = true)
 |-- Unlimited Data: string (nullable = true)


In [11]:
# Join the three tables on "Customer ID" (Spark's default join type, i.e. inner).
# Only the join key is de-duplicated: other names shared between tables (Count, Quarter)
# appear once per table, so the joined frame has repeated column names.
df = status_df.join(demographic_df, "Customer ID").join(service_df, "Customer ID")

In [12]:
# Keep only the rows whose 'Under 30' flag is 'Yes'; everything below works on this subset.
df = df.filter(df['Under 30'] == 'Yes')

In [13]:
# Report the width of the joined frame, then list every column with its type.
print(f"Number of columns: {len(df.columns)}")
df.printSchema()

Number of columns: 48
root
 |-- Customer ID: string (nullable = true)
 |-- Count: integer (nullable = true)
 |-- Quarter: string (nullable = true)
 |-- Satisfaction Score: integer (nullable = true)
 |-- Customer Status: string (nullable = true)
 |-- Churn Label: string (nullable = true)
 |-- Churn Value: integer (nullable = true)
 |-- Churn Score: integer (nullable = true)
 |-- CLTV: integer (nullable = true)
 |-- Churn Category: string (nullable = true)
 |-- Churn Reason: string (nullable = true)
 |-- Count: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Under 30: string (nullable = true)
 |-- Senior Citizen: string (nullable = true)
 |-- Married: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- Number of Dependents: integer (nullable = true)
 |-- Count: integer (nullable = true)
 |-- Quarter: string (nullable = true)
 |-- Referred a Friend: string (nullable = true)
 |-- Number of Referrals: integer (null

# Utils

In [14]:
# remove duplicate columns
def remove_duplicate_columns(df):
    """Drop repeated column names from ``df``, keeping the first occurrence of each.

    Every repeated occurrence is first renamed with a ``_duplicated`` marker
    (positional rename via ``toDF``), and then exactly those marked columns are
    dropped. Columns whose genuine name merely contains ``_duplicated`` are
    left untouched, and the marker is extended if it would collide with an
    existing column name.

    Args:
        df: A DataFrame-like object exposing ``columns``, ``toDF(*names)`` and
            ``drop(*names)``.

    Returns:
        The DataFrame with one column per distinct name, in the original order.
        Progress information is printed as a side effect.
    """
    original_names = list(df.columns)
    print(f"Initial Number of Columns: {len(original_names)}")

    # Single pass with a set: positions of every name that was already seen.
    seen = set()
    duplicate_positions = []
    for position, name in enumerate(original_names):
        if name in seen:
            duplicate_positions.append(position)
        else:
            seen.add(name)

    renamed = list(original_names)
    for position in duplicate_positions:
        marked = renamed[position] + '_duplicated'
        # Never reuse the name of a genuine column, or it would be dropped too.
        while marked in seen:
            marked += '_duplicated'
        renamed[position] = marked

    removed = [renamed[position] for position in duplicate_positions]
    if removed:
        df = df.toDF(*renamed)
        # Drop exactly the columns renamed above (names de-duplicated, order kept).
        df = df.drop(*dict.fromkeys(removed))

    print("\nDuplicated columns removed from the dataframe:")
    print(removed)
    print(f"\nFinal Number of Columns: {len(df.columns)}")
    return df

In [15]:
def features_selection(df):
    """Return ``df`` without the columns that are excluded from model training.

    The excluded names are fixed inside the function; names that are absent
    from ``df`` are simply passed to ``df.drop`` along with the rest. The
    column count before and after, and the list of excluded names, are printed.
    """
    starting_width = len(df.columns)
    print(f"Initial Number of Columns: {starting_width}")

    # Columns left out of training (order matters only for the printed list).
    unused_columns = [
        'Customer ID', 'Quarter', 'Customer Status',
        'Churn Value', 'Churn Score', 'Churn Category', 'Churn Reason',
        'Age', 'Number of Dependents',
        'Referred a Friend', 'Number of Referrals',
        'Phone Service', 'Internet Service', 'Streaming Music',
        'Total Charges', 'Total Refunds', 'Total Extra Data Charges',
        'Total Long Distance Charges', 'Total Revenue',
        'Under 30', 'Senior Citizen',
    ]
    trimmed = df.drop(*unused_columns)

    print("\nUnused columns removed from the dataframe:")
    print(unused_columns)
    print(f"\nFinal Number of Columns: {len(trimmed.columns)}")
    return trimmed

In [16]:
def get_columns_by_types(df, label_col='Churn Label'):
    """Split the columns of ``df`` into numeric and categorical feature lists.

    A column is numeric when its ``df.dtypes`` type string is one of
    ``tinyint``, ``smallint``, ``int``, ``bigint``, ``float``, ``double`` or a
    ``decimal(...)``; every other column is treated as categorical. The label
    column is excluded from both lists regardless of its type, so it can never
    end up among the model inputs.

    Args:
        df: Object exposing ``dtypes`` as a list of ``(name, type_string)`` pairs.
        label_col: Name of the target column (defaults to ``'Churn Label'``).

    Returns:
        Tuple ``(label_col, numeric_features, categorical_features)``; both
        lists preserve the column order of ``df``. A summary is printed.
    """
    plain_numeric_types = {'tinyint', 'smallint', 'int', 'bigint', 'float', 'double'}

    numeric_features = []
    categorical_features = []
    for name, dtype in df.dtypes:
        if name == label_col:
            continue  # the target is never a feature
        # Decimal type strings carry precision/scale, e.g. "decimal(10,2)".
        if dtype in plain_numeric_types or dtype.startswith('decimal'):
            numeric_features.append(name)
        else:
            categorical_features.append(name)

    print(f"Target column: {label_col}")
    print(f"\nTotal {len(numeric_features)} of numeric features")
    print(numeric_features)
    print(f"\nTotal {len(categorical_features)} of categorical features")
    print(categorical_features)

    return label_col, numeric_features, categorical_features

In [17]:
def preprocess_data(df, label_col, numeric_features, categorical_features):
    """Build, fit and apply the feature-preparation pipeline.

    Stage order: for every categorical feature a StringIndexer
    (``"<name> Index"``) followed by a OneHotEncoder (``"<name> OHE"``); then a
    StringIndexer turning ``label_col`` into ``label``; finally a
    VectorAssembler that concatenates the encoded categorical columns and the
    numeric features into ``features``.

    Returns:
        Tuple ``(preprocessed_df, pipelineModel)``: the transformed frame and
        the fitted pipeline (fitted on ``df`` itself).
    """
    stages = []
    encoded_columns = []

    for name in categorical_features:
        indexed_name = name + ' Index'
        encoded_name = name + " OHE"
        stages.append(StringIndexer(inputCol=name, outputCol=indexed_name))
        stages.append(OneHotEncoder(inputCols=[indexed_name], outputCols=[encoded_name]))
        encoded_columns.append(encoded_name)

    # Target column -> numeric 'label'.
    stages.append(StringIndexer(inputCol=label_col, outputCol='label'))

    # Encoded categoricals first, raw numerics after.
    stages.append(VectorAssembler(inputCols=encoded_columns + numeric_features, outputCol="features"))

    print(f"Number of stages: {len(stages)}")
    pipelineModel = Pipeline(stages=stages).fit(df)
    return pipelineModel.transform(df), pipelineModel

In [18]:
def split_train_test(df):
    """Randomly split ``df`` 70/30 (seed 2018) and print the size of each part.

    Returns:
        Tuple ``(train, test)``.
    """
    train, test = df.randomSplit([0.7, 0.3], seed=2018)
    for caption, part in (("Training", train), ("Test", test)):
        print(f"{caption} Dataset Count: {part.count()}")
    return train, test

In [19]:
def train_model(train):
    """Fit a logistic regression (at most 10 iterations) on ``train``.

    ``train`` must carry the ``features`` and ``label`` columns; the fitted
    model is returned.
    """
    estimator = LogisticRegression(maxIter=10, labelCol='label', featuresCol='features')
    return estimator.fit(train)

In [20]:
def get_feature_importance_df(df, lrModel):
    """Pair every assembled feature with its fitted coefficient.

    Feature names are read from the ``ml_attr`` metadata of the model's
    features column in ``df`` and matched to ``lrModel.coefficients`` by vector
    index. The ``OHE_`` fragment is stripped from the names. The result is a
    Spark DataFrame with columns ``feature`` and ``importance`` (the raw
    coefficient as a float); it is also shown on screen.
    """
    features_col = lrModel.summary.featuresCol
    attr_groups = df.schema[features_col].metadata["ml_attr"]["attrs"]

    # Flatten all attribute groups and order them by position in the vector.
    indexed_names = sorted(
        (entry["idx"], entry["name"])
        for group in attr_groups.values()
        for entry in group
    )

    rows = [
        [name.replace('OHE_', ''), float(lrModel.coefficients[idx])]
        for idx, name in indexed_names
    ]

    feature_importance_df = spark.createDataFrame(rows, ["feature", "importance"])
    feature_importance_df.show()
    return feature_importance_df

In [21]:
def make_predictions_based_on_modifications(df, pipelineModel, model,
                                            column="Online Security",
                                            from_value="No", to_value="Yes"):
    """Score a what-if scenario and return the predicted class counts.

    Every row whose ``column`` equals ``from_value`` is changed to
    ``to_value``; the modified frame is run through the fitted preprocessing
    pipeline and the trained model, and the predictions are counted per class.
    With the default arguments this is the original scenario: every customer
    without Online Security is given it.

    Args:
        df: Raw (un-preprocessed) feature frame accepted by ``pipelineModel``.
        pipelineModel: Fitted preprocessing pipeline producing ``label`` and
            ``features``.
        model: Fitted classifier adding a ``prediction`` column.
        column: Name of the column to modify.
        from_value: Value to replace.
        to_value: Replacement value.

    Returns:
        DataFrame with columns ``prediction`` ("No Churn" for class 0,
        otherwise "Churn") and ``count``, ordered by descending count. The
        numeric counts are shown on screen before relabelling.
    """
    modified_df = df.withColumn(
        column,
        f.when(df[column] == from_value, to_value).otherwise(df[column]),
    )
    model_input = pipelineModel.transform(modified_df).select('label', 'features')
    predictions = model.transform(model_input)

    # Build the aggregate once and reuse it for both the display and the result.
    prediction_counts = (
        predictions.groupBy('prediction').count().orderBy('count', ascending=False)
    )
    prediction_counts.show()

    # Resolve 'prediction' by name on the aggregated frame itself rather than
    # through the pre-aggregation DataFrame.
    predictions_modified = prediction_counts.withColumn(
        "prediction",
        f.when(f.col("prediction") == 0, "No Churn").otherwise("Churn"),
    )
    return predictions_modified

In [22]:
def insert_to_mysql(feature_importance_df, predictions_modified, host, db_name, db_user, db_pass, driver):
    """Write both result frames to MySQL over JDBC, replacing existing tables.

    ``feature_importance_df`` goes to table ``feature_importance`` and
    ``predictions_modified`` to ``prediction_modified``, in that order, each
    with write mode ``overwrite``.

    Returns:
        ``True`` once both writes have completed.
    """
    jdbc_url = f"jdbc:mysql://{host}/{db_name}"
    exports = (
        (feature_importance_df, 'feature_importance'),
        (predictions_modified, 'prediction_modified'),
    )

    for frame, table in exports:
        writer = frame.write.format('jdbc').options(
            url=jdbc_url,
            driver=driver,
            dbtable=table,
            user=db_user,
            password=db_pass,
            useSSL=False,
        )
        writer.mode('overwrite').save()

    return True

# Data Exploration

In [23]:
# Preview the first five rows, transposed so each column becomes a row (readable for a wide frame).
pd.DataFrame(df.take(5), columns=df.columns).transpose()

21/12/14 10:43:53 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,0,1,2,3,4
Customer ID,0953-LGOVU,1450-SKCVI,3195-TQDZX,7244-KXYZN,0516-UXRMT
Count,1,1,1,1,1
Quarter,Q3,Q3,Q3,Q3,Q3
Satisfaction Score,4,3,5,4,3
Customer Status,Stayed,Churned,Joined,Stayed,Stayed
Churn Label,No,Yes,No,No,No
Churn Value,0,1,0,0,0
Churn Score,75,72,42,20,71
CLTV,2562,5276,4050,4690,5801
Churn Category,,Attitude,,,


In [24]:
# Drop the repeated column names left by the joins, keeping the first occurrence of each.
df = remove_duplicate_columns(df)

Initial Number of Columns: 48

Duplicated columns removed from the dataframe:
['Count_duplicated', 'Count_duplicated', 'Quarter_duplicated']

Final Number of Columns: 45


In [25]:
# Remove the columns that features_selection lists as unused for training.
df = features_selection(df)

Initial Number of Columns: 45

Unused columns removed from the dataframe:
['Customer ID', 'Quarter', 'Customer Status', 'Churn Value', 'Churn Score', 'Churn Category', 'Churn Reason', 'Age', 'Number of Dependents', 'Referred a Friend', 'Number of Referrals', 'Phone Service', 'Internet Service', 'Streaming Music', 'Total Charges', 'Total Refunds', 'Total Extra Data Charges', 'Total Long Distance Charges', 'Total Revenue', 'Under 30', 'Senior Citizen']

Final Number of Columns: 24


In [26]:
# Check missing values: one null count per column.
# The string `c` passed as the value of f.when(...) is a literal, not a column reference,
# so the expression is non-null exactly for rows where the column is null and f.count
# tallies those rows.
data_agg = df.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in df.columns])
# data_agg has a single row; transposed so each column's count is on its own line.
pd.DataFrame(data_agg.take(5), columns=data_agg.columns).transpose()

                                                                                

Unnamed: 0,0
Count,0
Satisfaction Score,0
Churn Label,0
CLTV,0
Gender,0
Married,0
Dependents,0
Tenure in Months,0
Offer,0
Avg Monthly Long Distance Charges,0


In [27]:
# Split the remaining columns into the target, the numeric features and the categorical features.
label_col, numeric_features, categorical_features = get_columns_by_types(df)

Target column: Churn Label

Total 7 of numeric features
['Count', 'Satisfaction Score', 'CLTV', 'Tenure in Months', 'Avg Monthly Long Distance Charges', 'Avg Monthly GB Download', 'Monthly Charge']

Total 16 of categorical features
['Gender', 'Married', 'Dependents', 'Offer', 'Multiple Lines', 'Internet Type', 'Online Security', 'Online Backup', 'Device Protection Plan', 'Premium Tech Support', 'Streaming TV', 'Streaming Movies', 'Unlimited Data', 'Contract', 'Paperless Billing', 'Payment Method']


In [28]:
# Summary statistics (count, mean, stddev, min, max) of the numeric features, one feature per row.
df.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Count,1401,1.0,0.0,1,1
Satisfaction Score,1401,3.346181299072091,1.1738526525779596,1,5
CLTV,1401,4380.441827266239,1151.3834465884077,2003,6499
Tenure in Months,1401,32.371163454675234,24.764505056972883,1,72
Avg Monthly Long Distance Charges,1401,23.24911491791578,15.58062966595056,0.0,49.96
Avg Monthly GB Download,1401,41.92790863668808,27.403541465244256,0,85
Monthly Charge,1401,62.076837972876525,30.526478680602843,18.75,118.6


In [29]:
# Value counts (most frequent first) for every categorical feature, then for the target column.
for column_name in [*categorical_features, label_col]:
    value_counts = df.groupBy(column_name).count().orderBy('count', ascending=False)
    print(value_counts.toPandas())
    print("\n")

   Gender  count
0    Male    703
1  Female    698


  Married  count
0      No    703
1     Yes    698




                                                                                

  Dependents  count
0         No   1028
1        Yes    373


     Offer  count
0     None    752
1  Offer B    162
2  Offer E    160
3  Offer D    121
4  Offer A    120
5  Offer C     86


  Multiple Lines  count
0             No    859
1            Yes    542


  Internet Type  count
0   Fiber Optic    515
1           DSL    358
2          None    345
3         Cable    183


  Online Security  count
0              No    959
1             Yes    442


  Online Backup  count
0            No    911
1           Yes    490


  Device Protection Plan  count
0                     No    914
1                    Yes    487


  Premium Tech Support  count
0                   No    967
1                  Yes    434


  Streaming TV  count
0           No    885
1          Yes    516


  Streaming Movies  count
0               No    871
1              Yes    530


  Unlimited Data  count
0            Yes    902
1             No    499


         Contract  count
0  Month-to-Month    717
1        

# Data Preprocessing

In [30]:
# Fit the preprocessing pipeline on the whole frame (before the train/test split) and
# add the 'label' and 'features' columns; the fitted pipeline is kept for the what-if scoring later.
preprocessed_df, pipelineModel = preprocess_data(df, label_col, numeric_features, categorical_features)

Number of stages: 34


[Stage 364:>                                                        (0 + 1) / 1]                                                                                

In [31]:
# Preview the first five preprocessed rows, transposed so each column becomes a row.
pd.DataFrame(preprocessed_df.take(5), columns=preprocessed_df.columns).transpose()

Unnamed: 0,0,1,2,3,4
Count,1,1,1,1,1
Satisfaction Score,4,3,5,4,3
Churn Label,No,Yes,No,No,No
CLTV,2562,5276,4050,4690,5801
Gender,Male,Female,Male,Female,Female
Married,Yes,No,No,No,No
Dependents,Yes,No,No,No,No
Tenure in Months,12,56,3,24,62
Offer,Offer D,,,,
Avg Monthly Long Distance Charges,0.0,35.59,1.57,5.98,23.88


In [32]:
# 70/30 random split with a fixed seed (see split_train_test).
train, test = split_train_test(preprocessed_df)

Training Dataset Count: 993
Test Dataset Count: 408


# Data Modelling

In [33]:
# Fit the logistic-regression model on the training split only.
model = train_model(train)

In [34]:
# Pair each assembled feature with its fitted coefficient; feature names come from the
# metadata of the 'features' column of the preprocessed frame.
feature_importance_df = get_feature_importance_df(preprocessed_df, model)

NameError: name 'get_feature_importance' is not defined

# Model Evaluation

In [None]:
# Score the held-out test split and show the first ten rows of the model output columns.
predictions = model.transform(test)
predictions.select(['label', 'features', 'rawPrediction', 'probability', 'prediction']).show(10)

In [None]:
# Collect labels and predictions in ONE pass so each label stays paired with its own
# prediction; two separate toPandas() calls run two Spark jobs whose row order is not
# guaranteed to match.
scored_pdf = predictions.select('label', 'prediction').toPandas()
actual = scored_pdf[['label']]
predicted = scored_pdf[['prediction']]

print('Accuracy score of predicted data :',accuracy_score(actual, predicted))

# precision score
print('Precision Score of predicted data :',precision_score(actual, predicted))

# recall score
print('Recall Score of predicted data :',recall_score(actual, predicted))

# F1 score
print('F1 Score of predicted data :',f1_score(actual, predicted))

# Confusion matrix: rows are true labels, columns are predicted labels.
cf_matrix = confusion_matrix(actual, predicted)
sn.heatmap(cf_matrix, annot=True, fmt="d")
plt.ylabel('True label')
plt.xlabel('Predicted label')

# Proposed Ways: What-If Scenario (Online Security Enabled for All Customers)

In [None]:
# Re-score every customer in `df` after the what-if change applied inside
# make_predictions_based_on_modifications, and keep the predicted class counts.
predictions_modified = make_predictions_based_on_modifications(df, pipelineModel, model)

# Insertion to DB

In [None]:
# Persist the feature-importance and what-if prediction tables to MySQL.
# Both target tables are written with mode 'overwrite' (see insert_to_mysql).
driver = 'com.mysql.jdbc.Driver'
insert_to_mysql(feature_importance_df, predictions_modified, HOST, DB_NAME, DB_USER, DB_PASS, driver)