In [1]:
# Import necessary libraries
import os
import json
from kaggle.api.kaggle_api_extended import KaggleApi
from pyspark.sql import SparkSession

# Path to kaggle.json file
kaggle_json_path = os.path.expanduser("C:/Users/sammy/.kaggle/kaggle.json")  # Adjust path as necessary

# Ensure the Kaggle API can authenticate
if not os.path.exists(kaggle_json_path):
    raise FileNotFoundError("kaggle.json file not found at the specified path.")

# Set Kaggle API credentials using kaggle.json
with open(kaggle_json_path, "r") as f:
    credentials = json.load(f)
os.environ['KAGGLE_USERNAME'] = credentials['username']
os.environ['KAGGLE_KEY'] = credentials['key']

# Authenticate and download the dataset
api = KaggleApi()
api.authenticate()

# Specify the dataset and download path
dataset_id = "blastchar/telco-customer-churn"  # Dataset ID for Telco Customer Churn
cache_dir = os.path.expanduser("C:/Users/sammy/OneDrive/Documents/Portfolio")
os.makedirs(cache_dir, exist_ok=True)

# Download and unzip the dataset
api.dataset_download_files(dataset_id, path=cache_dir, unzip=True)


Dataset URL: https://www.kaggle.com/datasets/blastchar/telco-customer-churn


In [2]:
# Load dataset using Spark
spark = SparkSession.builder.appName("EnhancedChurnPredictionPipeline").getOrCreate()
data_path = os.path.join(cache_dir, "WA_Fn-UseC_-Telco-Customer-Churn.csv")
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Verify data ingestion
df.printSchema()
df.show(5)

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)

+----------+------+-------------+-------+----------+------+------------+---------

In [3]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler

# Add feature engineering step
df = df.withColumn("TotalSpend", df.tenure * df.MonthlyCharges)

# Index categorical columns
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in ["gender", "Partner", "Dependents", "Churn", "Contract", "PaymentMethod"]]

# Assemble feature columns, including new and scaled features
assembler = VectorAssembler(inputCols=["gender_index", "Partner_index", "Dependents_index", "tenure", "MonthlyCharges", "TotalSpend", "Contract_index", "PaymentMethod_index"], outputCol="raw_features")

# Scale features
scaler = StandardScaler(inputCol="raw_features", outputCol="features")


In [4]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Define logistic regression model
lr = LogisticRegression(featuresCol="features", labelCol="Churn_index")

# Define parameter grid for hyperparameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Set up cross-validation
crossval = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(labelCol="Churn_index"), numFolds=3)


In [5]:
from pyspark.ml import Pipeline

# Create pipeline
pipeline = Pipeline(stages=indexers + [assembler, scaler, crossval])


In [6]:
# Split the data into training and test sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Fit pipeline on training data
model = pipeline.fit(train_data)

# Make predictions on test data
predictions = model.transform(test_data)

# Evaluate model using AUC
evaluator = BinaryClassificationEvaluator(labelCol="Churn_index", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions)
print(f"ROC AUC after tuning and scaling: {roc_auc}")


ROC AUC after tuning and scaling: 0.8235969501088418
