# Churn Prediction with Spark

This Jupyter notebook predicts customer churn with PySpark. It trains a classification model on the `churn.csv` dataset to estimate each customer's likelihood of churning.

## Contents

1. **Data Loading and Preprocessing:** Loading the `churn.csv` dataset and performing necessary preprocessing steps.
2. **Feature Engineering:** Preparing and transforming dataset features for model training.
3. **Model Training:** Training a Gradient-Boosted Tree (GBT) classifier to predict churn.
4. **Model Evaluation:** Assessing model performance on held-out test data using the area under the ROC curve (AUC).
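
One preprocessing step in the notebook's code is to replace categories that occur fewer than `threshold` times with a single `"Other"` value before indexing them. The idea can be sketched in plain Python, without Spark. The helper name `group_infrequent` and the toy `states` column are hypothetical and used only for illustration; the notebook itself applies the same rule with Spark column expressions and `threshold = 100`.

```python
from collections import Counter

def group_infrequent(values, threshold):
    """Replace values seen fewer than `threshold` times with "Other"."""
    counts = Counter(values)
    return [v if counts[v] >= threshold else "Other" for v in values]

# Hypothetical toy column, not taken from churn.csv.
states = ["NY", "NY", "NY", "CA", "CA", "TX"]
grouped = group_infrequent(states, threshold=2)
print(grouped)  # ['NY', 'NY', 'NY', 'CA', 'CA', 'Other']
```

Grouping rare values this way keeps the number of distinct categories small, which matters for tree models that must allocate one bin per category.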

## Objective

The goal of this notebook is to predict which customers are likely to churn and so support strategic decisions that reduce attrition. The model estimates each customer's likelihood of leaving, which can help businesses strengthen their customer relationship strategies.


In [3]:
#pip install pyspark

In [52]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Initialize Spark session
spark = SparkSession.builder.appName("ChurnPrediction").getOrCreate()

# Load the dataset
file_path = "churn.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Drop rows that contain missing values
df = df.na.drop()

# Handle categorical columns with high cardinality by grouping infrequent categories
threshold = 100  # Set a threshold for frequency
categorical_cols = [col for col, dtype in df.dtypes if dtype == 'string' and col != 'Churn']

def group_infrequent_categories(column, threshold):
    """Return a column expression that keeps values seen at least `threshold` times and maps the rest to "Other"."""
    counts = df.groupBy(column).count()
    frequent_categories = [row[0] for row in counts.where(F.col('count') >= threshold).select(column).collect()]
    return F.when(F.col(column).isin(frequent_categories), F.col(column)).otherwise("Other")

# Apply grouping for high cardinality columns
for col_name in categorical_cols:
    df = df.withColumn(col_name, group_infrequent_categories(col_name, threshold))

# Handle categorical columns using StringIndexer
indexers = [StringIndexer(inputCol=col, outputCol=col+"_Index") for col in categorical_cols]

# Apply the indexers
for indexer in indexers:
    df = indexer.fit(df).transform(df)

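# Assemble all features into a single vector.
# The "_Index" columns are doubles, so exclude them from the numeric scan to avoid adding them twice.
index_cols = [c + "_Index" for c in categorical_cols]
numeric_cols = [c for c, dtype in df.dtypes if dtype in ['int', 'double'] and c != 'Churn' and c not in index_cols]
feature_cols = index_cols + numeric_cols
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)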

# Convert the label (Churn) to a numerical column
label_indexer = StringIndexer(inputCol="Churn", outputCol="label")
df = label_indexer.fit(df).transform(df)

# Split the data into training and testing sets
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

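# Initialize the GBTClassifier. maxBins is raised above the default of 32 because
# it must be at least the number of categories in any categorical feature.
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxBins=1000, maxIter=100)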

# Train the model
gbt_model = gbt.fit(train_df)

# Make predictions
predictions = gbt_model.transform(test_df)

# Evaluate the model using AUC (area under the ROC curve, the evaluator's default metric)
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)

print(f"Test AUC: {auc}")


Test AUC: 0.8931034482758625
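
To help read the AUC value above: AUC can be interpreted as the probability that a randomly chosen churner receives a higher score than a randomly chosen non-churner. The plain-Python sketch below computes it from that pairwise definition. It is an illustration only, not how Spark's `BinaryClassificationEvaluator` computes the metric internally, and the function name `roc_auc` and the toy `labels` and `scores` are hypothetical.

```python
def roc_auc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count as 0.5."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = sum(
        1.0 if p > n else 0.5 if p == n else 0.0
        for p in pos
        for n in neg
    )
    return wins / (len(pos) * len(neg))

# Hypothetical toy data: 1 = churned, 0 = stayed.
labels = [1, 0, 1, 0, 1]
scores = [0.9, 0.3, 0.6, 0.7, 0.8]
auc_value = roc_auc(labels, scores)
print(f"Toy AUC: {auc_value:.4f}")  # 5 of 6 pairs ranked correctly -> 0.8333
```

Under this reading, a value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking of churners above non-churners.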


In [53]:
spark.stop()

## References

- [ChatGPT](https://chatgpt.com)
- [Spark Documentation - GBTClassifier](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.GBTClassifier.html)
