In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [3]:
# 1. Spark Session
spark = SparkSession.builder.appName("ChurnPrediction").getOrCreate()

In [5]:
# 2. Load Data
df = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/telco_customer_churn.csv", header=True, inferSchema=True)

In [6]:
# 3. Data Cleaning
df = df.dropna()
df = df.drop("customerID")

In [7]:
# 4. Encode Categorical Columns
categorical_cols = [field for (field, dtype) in df.dtypes if dtype == "string" and field != "Churn"]
indexers = [StringIndexer(inputCol=col, outputCol=col+"_Index", handleInvalid="keep") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=col+"_Index", outputCol=col+"_Vec") for col in categorical_cols]

In [8]:
# 5. Label Indexing
label_indexer = StringIndexer(inputCol="Churn", outputCol="label")

In [9]:
# 6. Assemble Features
numeric_cols = [field for (field, dtype) in df.dtypes if dtype in ["double", "int"] and field != "Churn"]
assembler_inputs = [col + "_Vec" for col in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="assembled_features")
scaler = StandardScaler(inputCol="assembled_features", outputCol="features")

In [10]:
# 7. Model
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

In [11]:

# 8. Pipeline
stages = indexers + encoders + [label_indexer, assembler, scaler, rf]
pipeline = Pipeline(stages=stages)

In [12]:
# 9. Train/Test Split
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

In [13]:
# 10. Cross-Validation
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [20, 50]).build()
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)

In [14]:
# 11. Fit Model
cv_model = cv.fit(train_data)

In [15]:
# 12. Evaluate
predictions = cv_model.transform(test_data)
auc = evaluator.evaluate(predictions)
print(f"AUC on test data: {auc:.4f}")

AUC on test data: 0.8235


In [16]:
# 13. Save Model (optional)
cv_model.write().overwrite().save("churn_model")