In [31]:
# Installing PySpark

!pip install pyspark



In [32]:
# Importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, GBTClassifier, LinearSVC
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Start (or reuse) the Spark session used by every cell in this notebook
spark = (
    SparkSession.builder
    .appName("TelecomChurnPrediction")
    .getOrCreate()
)

# Read the CSV (first line is the header, column types are inferred by Spark)
# and immediately discard every row that has a missing value
df = spark.read.csv("telecom_dataset.csv", header=True, inferSchema=True).dropna()

# Preview the cleaned data
df.show()



+----------+------+---+--------------+--------------+------------+-----+
|CustomerID|Gender|Age|      Contract|MonthlyCharges|TotalCharges|Churn|
+----------+------+---+--------------+--------------+------------+-----+
|         1|Female| 25|Month-to-Month|          65.7|       156.5|   No|
|         2|  Male| 37|      One Year|          89.0|      2356.8|   No|
|         3|  Male| 52|      Two Year|         115.5|      5408.6|   No|
|         4|Female| 30|Month-to-Month|          75.9|       129.4|  Yes|
|         5|  Male| 45|      One Year|          98.2|      3142.0|   No|
|         6|Female| 55|      Two Year|          99.9|      6541.5|   No|
|         7|  Male| 32|Month-to-Month|          82.1|       267.7|  Yes|
|         8|Female| 28|Month-to-Month|          61.5|       346.9|   No|
|         9|  Male| 48|      One Year|         101.8|      5149.6|  Yes|
|        10|Female| 60|      Two Year|         108.1|      6742.8|  Yes|
|        11|  Male| 42|Month-to-Month|          78.

In [33]:
# Derived feature "call_duration".
# NOTE(review): despite its name this is computed purely from the billing columns,
# (TotalCharges - MonthlyCharges) / 60 -- the loaded table has no call start/end
# timestamps. Confirm that this proxy is what is intended.
charges_gap = df.TotalCharges - df.MonthlyCharges
df = df.withColumn("call_duration", charges_gap / 60)

# Derived feature "average_monthly_spend": currently a straight copy of MonthlyCharges
df = df.withColumn("average_monthly_spend", df.MonthlyCharges)

# Show the rows, then the schema, now that both columns have been added
df.show()

df.printSchema()

+----------+------+---+--------------+--------------+------------+-----+------------------+---------------------+
|CustomerID|Gender|Age|      Contract|MonthlyCharges|TotalCharges|Churn|     call_duration|average_monthly_spend|
+----------+------+---+--------------+--------------+------------+-----+------------------+---------------------+
|         1|Female| 25|Month-to-Month|          65.7|       156.5|   No|1.5133333333333332|                 65.7|
|         2|  Male| 37|      One Year|          89.0|      2356.8|   No| 37.79666666666667|                 89.0|
|         3|  Male| 52|      Two Year|         115.5|      5408.6|   No| 88.21833333333333|                115.5|
|         4|Female| 30|Month-to-Month|          75.9|       129.4|  Yes|0.8916666666666667|                 75.9|
|         5|  Male| 45|      One Year|          98.2|      3142.0|   No|50.730000000000004|                 98.2|
|         6|Female| 55|      Two Year|          99.9|      6541.5|   No|            107.

In [34]:
# Encoding categorical variables
#
# Every string column listed below gets a numeric companion column named
# "<column>_index". The target column Churn is indexed here as well, which
# produces the Churn_index label column.

categorical_cols = ['Gender', 'Contract', 'Churn']

# One fitted StringIndexer per categorical column
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_index").fit(df)
    for name in categorical_cols
]

# Chain the indexers and apply them in a single pass over df
pipeline = Pipeline(stages=indexers)
indexing_model = pipeline.fit(df)
dataset = indexing_model.transform(df)

In [35]:
# Feature scaling
#
# The numeric inputs are packed into one vector column ("features") and then
# standardised into "scaled_features".
# NOTE(review): Contract_index is not part of the feature list -- confirm this is intentional.
# NOTE(review): the scaler is fitted on the full dataset, i.e. before the
# train/test split performed in a later cell -- confirm this is acceptable.
feature_columns = ['Age', 'average_monthly_spend', 'call_duration', 'Gender_index']

assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

assembled = assembler.transform(dataset)
scaler_model = scaler.fit(assembled)
dataset = scaler_model.transform(assembled)

In [37]:
# Random 80/20 split into training and test sets; the seed is fixed at 42
train_data, test_data = dataset.randomSplit(weights=[0.8, 0.2], seed=42)

In [None]:
# Model training and evaluation
#
# Four binary classifiers are tuned independently with 5-fold cross-validation on
# train_data and then scored on test_data. The tuned model with the highest test
# score is kept in best_model, and its test predictions in best_predictions.

# Candidate estimators (all read the same label and feature columns)
lr = LogisticRegression(labelCol='Churn_index', featuresCol='scaled_features')
rf = RandomForestClassifier(labelCol='Churn_index', featuresCol='scaled_features')
gbt = GBTClassifier(labelCol='Churn_index', featuresCol='scaled_features')
svc = LinearSVC(labelCol='Churn_index', featuresCol='scaled_features')
classifiers = [lr, rf, gbt, svc]

# One parameter grid per classifier, in the same order as `classifiers`.
# The grids are built from the Params of the estimator *instances* (lr.regParam,
# not LogisticRegression.regParam) so that each grid value is matched to the
# estimator that is actually being fitted.
paramGrids = [
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.01])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build(),
    ParamGridBuilder()
    .addGrid(rf.numTrees, [10, 20, 30])
    .addGrid(rf.featureSubsetStrategy, ['auto', 'sqrt'])
    .build(),
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [5, 10])
    .addGrid(gbt.maxIter, [20, 30])
    .build(),
    ParamGridBuilder()
    .addGrid(svc.maxIter, [10, 20])
    .addGrid(svc.regParam, [0.1, 0.01])
    .build(),
]

# No metricName is set, so this evaluator reports its default metric, areaUnderROC.
evaluator = BinaryClassificationEvaluator(labelCol='Churn_index')
best_model = None
best_accuracy = 0.0  # variable name kept from the original cell; the value is areaUnderROC

# Iterating over classifiers and parameter grids
for classifier, paramGrid in zip(classifiers, paramGrids):
    # Wrap the single classifier in a Pipeline so bestModel is a PipelineModel
    # whose first stage is the fitted classifier.
    pipeline = Pipeline(stages=[classifier])
    crossval = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=paramGrid,
        evaluator=evaluator,
        numFolds=5,
        seed=42,  # fixed seed for the fold assignment (same value as the train/test split)
    )
    cv_model = crossval.fit(train_data)

    # Model evaluation on test data
    predictions = cv_model.transform(test_data)
    accuracy = evaluator.evaluate(predictions)
    print(f"Test areaUnderROC for {classifier.__class__.__name__}: {accuracy}")

    # `best_model is None` guarantees a model is selected even if every score is 0.0
    if best_model is None or accuracy > best_accuracy:
        best_model = cv_model.bestModel
        best_accuracy = accuracy

# Getting the best model and its parameters
print("Best Model:")
print(best_model.stages[0])

# Use the best model for predictions
best_predictions = best_model.transform(test_data)

# Performing evaluation on the best model
best_accuracy = evaluator.evaluate(best_predictions)
print("Best Model areaUnderROC:", best_accuracy)

In [None]:
# Score the best model's test predictions with a multiclass evaluator so that
# accuracy, weighted precision, weighted recall and F1 can all be reported.
evaluator = MulticlassClassificationEvaluator(labelCol='Churn_index', predictionCol='prediction')

# Same evaluator and same predictions each time; only the metric name changes
accuracy, precision, recall, f1_score = (
    evaluator.evaluate(best_predictions, {evaluator.metricName: metric})
    for metric in ('accuracy', 'weightedPrecision', 'weightedRecall', 'f1')
)

# Report the four metrics
print("Evaluation Metrics:")
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1-Score: {f1_score}")