# Customer Churn Prediction & Analysis

End-to-end notebook covering all project milestones: data intake, EDA, advanced analysis, modeling, MLOps handoff, and deployment-ready artifacts.


## Milestone Roadmap
1. **Data Collection & EDA:** Load Kaggle churn dataset, inspect structure, clean anomalies, visualize.
2. **Advanced Analysis & Feature Engineering:** Statistical tests, RFE-style importance, engineered interaction features.
3. **Modeling & Optimization:** Train/evaluate multiple classifiers with cross-validation + hyperparameter tuning.
4. **MLOps & Deployment:** Persist artifacts, log metrics, expose FastAPI prediction example, outline monitoring.
5. **Documentation & Presentation Ready Assets:** Auto-generate cleaned datasets, metrics tables, and summary report.


In [None]:
# Core
import os
from pathlib import Path

# Data
import pandas as pd
import numpy as np
from scipy import stats

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px

# Modeling / preprocessing
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score, GridSearchCV
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    roc_auc_score,
    RocCurveDisplay,
    precision_recall_fscore_support
)
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.feature_selection import RFECV
from sklearn.utils.class_weight import compute_class_weight

# Persistence / logging
import joblib
import json
from datetime import datetime

# Apply seaborn's "whitegrid" theme once so the figures drawn below share one look.
sns.set_theme(style="whitegrid")

In [None]:
# Locations used throughout the notebook: the input CSV (resolved to an absolute
# path) and the folder that receives every generated artifact.
ARTIFACT_DIR = Path('../artifacts')
DATA_PATH = Path('../customer_churn_dataset-testing-master.csv').resolve()

# Create the artifact folder up front; an already existing folder is fine.
ARTIFACT_DIR.mkdir(exist_ok=True)

print(
    f"Dataset located at: {DATA_PATH}",
    f"Artifacts will be saved to: {ARTIFACT_DIR}",
    sep="\n",
)


In [None]:
# Read the churn CSV into memory, report its dimensions, then preview five rows.
raw_df = pd.read_csv(filepath_or_buffer=DATA_PATH)
n_rows, n_cols = raw_df.shape
print(f"Shape: {(n_rows, n_cols)}")
raw_df.head(n=5)


In [None]:
def dataset_overview(df: pd.DataFrame) -> pd.DataFrame:
    """Print a structural audit of ``df`` and return its summary statistics.

    Three sections are written to stdout, in this order: the column dtypes,
    the number of missing values per column, and the count of duplicated rows.

    Args:
        df: Frame to inspect. It is only read, never modified.

    Returns:
        ``df.describe(include='all')`` transposed, i.e. one row per column of ``df``.
    """
    sections = (
        ("\n--- Schema ---", df.dtypes),
        ("\n--- Missing Values ---", df.isna().sum()),
        ("\n--- Duplicates ---", df.duplicated().sum()),
    )
    for heading, payload in sections:
        print(heading)
        print(payload)

    summary = df.describe(include='all')
    return summary.T

# Audit the raw data and keep the per-column summary; show its first twelve rows.
overview = raw_df.pipe(dataset_overview)
overview.head(n=12)


In [None]:
# --- Target balance: absolute counts (left panel) and relative share (right panel) ---
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(12, 4))
count_ax, share_ax = ax
sns.countplot(data=raw_df, x='Churn', palette='coolwarm', ax=count_ax)
count_ax.set_title('Churn Distribution')
churn_share = raw_df['Churn'].value_counts(normalize=True)
churn_share.plot(kind='pie', ax=share_ax, autopct='%1.1f%%', colors=['#0B5563', '#F26419'])
share_ax.set_ylabel('')
share_ax.set_title('Churn Share')
plt.tight_layout()
plt.show()

# --- Histograms of every numeric column except the target ---
num_cols = raw_df.select_dtypes(include=np.number).columns.drop('Churn', errors='ignore').tolist()
raw_df[num_cols].hist(bins=30, figsize=(14, 10))
plt.tight_layout()
plt.show()

# --- Mean of the churn flag for each level of the categorical columns ---
for col in ('Gender', 'Subscription Type', 'Contract Length'):
    plt.figure(figsize=(6, 4))
    sns.barplot(data=raw_df, x=col, y='Churn', estimator=np.mean)
    plt.title(f'Average Churn by {col}')
    plt.xticks(rotation=30)
    plt.show()


In [None]:
# One-hot encode the text columns (first level dropped) so that every column
# can take part in the correlation matrix.
cat_cols = raw_df.select_dtypes(include='object').columns
encoded_for_corr = pd.get_dummies(raw_df.copy(), columns=cat_cols, drop_first=True)

plt.figure(figsize=(12, 10))
corr_matrix = encoded_for_corr.corr()
sns.heatmap(corr_matrix, cmap='coolwarm', center=0)
plt.title('Feature Correlation Heatmap')
plt.show()


In [None]:
def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` without duplicated rows or rows with missing values.

    Args:
        df: Raw frame. It is not modified; both filters return new objects.

    Returns:
        The filtered frame with a fresh, contiguous ``0..n-1`` index.
    """
    cleaned = df.drop_duplicates().dropna()
    # Rebuild the index only after *both* filters have run; resetting it before
    # `dropna` would leave gaps wherever an incomplete row was removed.
    return cleaned.reset_index(drop=True)

# Clean the raw frame and report how many rows the filters discarded.
clean_df = raw_df.pipe(clean_data)
print(f"Rows removed: {len(raw_df) - len(clean_df)}")
clean_df.head(n=5)


In [None]:
# Write the cleaned frame (without its index column) into the artifact folder.
clean_path = ARTIFACT_DIR.joinpath('cleaned_customer_churn.csv')
clean_df.to_csv(path_or_buf=clean_path, index=False)
print(f"Saved cleaned dataset to {clean_path}")


In [None]:
# --- Milestone 2: Statistical Tests & Feature Signals ---
cat_columns = ['Gender', 'Subscription Type', 'Contract Length']
num_columns = ['Age', 'Tenure', 'Usage Frequency', 'Support Calls', 'Payment Delay', 'Total Spend', 'Last Interaction']

stat_results = []

# Categorical features: chi-square test on the feature-vs-churn contingency table.
for col in cat_columns:
    chi2, p, *_ = stats.chi2_contingency(pd.crosstab(clean_df[col], clean_df['Churn']))
    stat_results.append(dict(feature=col, test='chi2', p_value=p, chi2=chi2))

# Numeric features: t-test without the equal-variance assumption, comparing
# rows with Churn == 1 against rows with Churn == 0.
churn_groups = {label: clean_df.loc[clean_df['Churn'] == label] for label in (1, 0)}
for col in num_columns:
    stat, p = stats.ttest_ind(churn_groups[1][col], churn_groups[0][col], equal_var=False)
    stat_results.append(dict(feature=col, test='t-test', p_value=p, statistic=stat))

# One row per tested feature, most significant first.
stat_df = pd.DataFrame(stat_results)
stat_df.sort_values(by='p_value').head(n=12)


In [None]:
# Engineered features are added to a copy so `clean_df` stays untouched.
feature_df = clean_df.copy()

# Tenure buckets (the labels suggest tenure is measured in months — TODO confirm).
# `include_lowest=True` keeps a tenure of exactly 0 in the first bucket and the
# open upper edge keeps tenures above 100 in the last one; with the default
# right-closed bins and a hard upper bound those rows would get a NaN bucket.
feature_df['TenureBucket'] = pd.cut(
    feature_df['Tenure'],
    bins=[0, 12, 24, 48, 60, np.inf],
    labels=['<1y', '1-2y', '2-4y', '4-5y', '5y+'],
    include_lowest=True,
)

# Binary flags relative to the dataset-wide median of the respective column.
feature_df['HighSupport'] = (feature_df['Support Calls'] > feature_df['Support Calls'].median()).astype(int)
feature_df['RecentInteraction'] = (feature_df['Last Interaction'] <= feature_df['Last Interaction'].median()).astype(int)

# Usage per unit of tenure. A zero tenure is masked to NaN to avoid dividing by
# zero, and the resulting gaps are filled with the median ratio.
feature_df['UsagePerTenure'] = feature_df['Usage Frequency'] / feature_df['Tenure'].replace(0, np.nan)
feature_df['UsagePerTenure'] = feature_df['UsagePerTenure'].fillna(feature_df['UsagePerTenure'].median())

# Spend divided by the contract duration in months. Contract labels missing from
# the mapping yield NaN and are filled with the median. (The mapped values are
# never 0, so no zero-masking is needed here.)
contract_months = feature_df['Contract Length'].map({'Monthly': 1, 'Quarterly': 3, 'Annual': 12})
feature_df['SpendPerMonth'] = feature_df['Total Spend'] / contract_months
feature_df['SpendPerMonth'] = feature_df['SpendPerMonth'].fillna(feature_df['SpendPerMonth'].median())
feature_df.head()


In [None]:
# Drop the identifier column, then separate the target from the feature matrix.
feature_df = feature_df.drop(columns='CustomerID')
y = target = feature_df['Churn']
X = feature_df.drop(columns='Churn')

# Column groups by dtype: numeric columns are standardised, text/categorical
# columns are one-hot encoded (unseen categories are ignored at predict time).
numeric_features = list(X.select_dtypes(include=[np.number]).columns)
categorical_features = list(X.select_dtypes(include=['object', 'category']).columns)

scaler_step = ('num', StandardScaler(), numeric_features)
encoder_step = ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)
preprocessor = ColumnTransformer(transformers=[scaler_step, encoder_step])

# Stratified 80/20 hold-out split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

train_rate, test_rate = y_train.mean(), y_test.mean()
print(f"Train shape: {X_train.shape}, Test shape: {X_test.shape}")
print(f"Churn rate train: {train_rate:.2%} | test: {test_rate:.2%}")


In [None]:
# Baseline logistic-regression pipeline; only its classifier step is handed to RFECV.
log_reg = Pipeline(steps=[
    ('prep', preprocessor),
    ('clf', LogisticRegression(max_iter=1000, class_weight='balanced'))
])

# Recursive feature elimination, one feature per step, scored by 5-fold stratified CV F1.
rfecv = RFECV(
    estimator=log_reg['clf'],
    step=1,
    cv=StratifiedKFold(5),
    scoring='f1',
    min_features_to_select=5
)

# RFECV receives the already transformed matrix, so preprocessing is applied up front.
X_prepared = preprocessor.fit_transform(X_train)
rfecv.fit(X_prepared, y_train)

# Map the boolean support mask back to names: numeric columns first, then the
# one-hot columns, matching the transformer order in `preprocessor`.
selected_mask = rfecv.support_
feature_names = np.concatenate([
    numeric_features,
    preprocessor.named_transformers_['cat'].get_feature_names_out(categorical_features)
])
selected_features = feature_names[selected_mask]
print(f"Selected top {len(selected_features)} features via RFECV")

# `grid_scores_` was removed in scikit-learn 1.2; the per-subset mean CV score
# lives in `cv_results_`. Fall back to the old attribute on older releases.
if hasattr(rfecv, 'cv_results_'):
    mean_cv_f1 = rfecv.cv_results_['mean_test_score']
else:
    mean_cv_f1 = rfecv.grid_scores_
# Scores are ordered from the smallest subset upwards, so with step=1 entry i
# corresponds to `min_features_to_select + i` features (not to i).
feature_counts = range(
    rfecv.min_features_to_select,
    rfecv.min_features_to_select + len(mean_cv_f1)
)
pd.Series(mean_cv_f1, index=feature_counts).plot(title='RFECV F1 by feature count')
plt.xlabel('Number of features')
plt.ylabel('Mean CV F1')
plt.show()
pd.Series(selected_features).head(15)


In [None]:
def train_and_evaluate(model_name: str, estimator: Pipeline) -> dict:
    """Fit ``estimator`` on the training split and report hold-out performance.

    Reads the notebook-level ``X_train``, ``y_train``, ``X_test`` and ``y_test``.
    Prints the classification report and draws the confusion matrix and ROC
    curve as side effects.

    Args:
        model_name: Label used in the printed header, plot titles and result row.
        estimator: Unfitted pipeline; it is fitted in place.

    Returns:
        Dict with keys ``model``, ``precision``, ``recall``, ``f1`` (all for the
        positive class), ``roc_auc`` and ``estimator`` (the fitted pipeline).
    """
    estimator.fit(X_train, y_train)
    preds = estimator.predict(X_test)
    probs = estimator.predict_proba(X_test)[:, 1]

    # Positive-class metrics via average='binary' instead of report['1']: the
    # report dictionary is keyed by the stringified label, so the lookup breaks
    # as soon as the target is stored as float ('1.0') or bool ('True').
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_test, preds, average='binary'
    )
    roc = roc_auc_score(y_test, probs)
    cm = confusion_matrix(y_test, preds)

    print(f"\n===== {model_name} =====")
    print(classification_report(y_test, preds))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(f'{model_name} Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.show()
    RocCurveDisplay.from_predictions(y_test, probs)
    plt.title(f'{model_name} ROC Curve')
    plt.show()
    return {
        'model': model_name,
        'precision': float(precision),
        'recall': float(recall),
        'f1': float(f1),
        'roc_auc': roc,
        'estimator': estimator
    }

# Candidate pipelines: identical preprocessing, different classifier.
# The tree ensembles are seeded with the same value as the train/test split so
# that the leaderboard (and the later grid search) is reproducible between runs.
models = {
    'Logistic Regression': Pipeline([
        ('prep', preprocessor),
        ('clf', LogisticRegression(max_iter=1000, class_weight='balanced'))
    ]),
    'Random Forest': Pipeline([
        ('prep', preprocessor),
        ('clf', RandomForestClassifier(
            n_estimators=300, max_depth=None, class_weight='balanced', random_state=42
        ))
    ]),
    'Gradient Boosting': Pipeline([
        ('prep', preprocessor),
        ('clf', GradientBoostingClassifier(random_state=42))
    ])
}

# Fit and evaluate every candidate; the leaderboard omits the fitted estimator objects.
results = [train_and_evaluate(name, pipe) for name, pipe in models.items()]
results_df = pd.DataFrame([{k: v for k, v in res.items() if k != 'estimator'} for res in results])
results_df


In [None]:
# Best untuned model by hold-out F1 (informational only; every model is tuned below).
top_model_name = results_df.sort_values('f1', ascending=False).iloc[0]['model']
print(f"Top baseline model: {top_model_name}")

# Hyperparameter grids, keyed by the same names as `models`.
search_spaces = {
    'Random Forest': {
        'clf__n_estimators': [200, 400, 600],
        'clf__max_depth': [None, 10, 20, 30],
        'clf__min_samples_split': [2, 5, 10]
    },
    'Gradient Boosting': {
        'clf__learning_rate': [0.05, 0.1, 0.2],
        'clf__n_estimators': [200, 400],
        'clf__max_depth': [3, 4]
    },
    'Logistic Regression': {
        'clf__C': [0.1, 1, 10],
        'clf__penalty': ['l2'],
        'clf__solver': ['lbfgs']
    }
}

# Tune every candidate with 3-fold CV on the training split only.
best_estimators = {}
cv_f1_scores = {}
for model_name, pipe in models.items():
    param_grid = search_spaces[model_name]
    grid = GridSearchCV(pipe, param_grid, cv=3, scoring='f1', n_jobs=-1)
    grid.fit(X_train, y_train)
    best_estimators[model_name] = grid.best_estimator_
    cv_f1_scores[model_name] = grid.best_score_
    print(f"{model_name} best params: {grid.best_params_} | best F1: {grid.best_score_:.3f}")

# Pick the winner by cross-validated F1, not by test F1: choosing on the hold-out
# set would leak it into model selection and make the reported test metrics
# optimistic. The test split is scored once, for the selected model only.
# On a tie the first model in `models` order wins.
best_model_name = max(cv_f1_scores, key=cv_f1_scores.get)
best_model = best_estimators[best_model_name]
best_f1 = precision_recall_fscore_support(
    y_test, best_model.predict(X_test), average='binary'
)[2]

print(f"Selected best model: {best_model_name} | Test F1: {best_f1:.3f}")


In [None]:
from datetime import timezone  # local import: needed only for the timestamp below

# Serialize the selected pipeline; the file name is derived from the model name.
model_path = ARTIFACT_DIR / f"best_model_{best_model_name.replace(' ', '_').lower()}.joblib"
joblib.dump(best_model, model_path)
print(f"Persisted best model to {model_path}")

# Score the hold-out set once and reuse the result for both precision and recall.
test_precision, test_recall, _, _ = precision_recall_fscore_support(
    y_test, best_model.predict(X_test), average='binary'
)

# Save metadata for deployment
metadata = {
    'model_name': best_model_name,
    # The FastAPI example reads meta['model_path']; store it as an absolute path
    # so the artifact can be located from any working directory.
    'model_path': str(model_path.resolve()),
    'test_f1': float(best_f1),
    'test_precision': float(test_precision),
    'test_recall': float(test_recall),
    'roc_auc': float(roc_auc_score(y_test, best_model.predict_proba(X_test)[:, 1])),
    # Timezone-aware UTC timestamp (`datetime.utcnow()` is naive and deprecated).
    'generated_at': datetime.now(timezone.utc).isoformat()
}
meta_path = ARTIFACT_DIR / 'model_report.json'
with open(meta_path, 'w') as fp:
    json.dump(metadata, fp, indent=2)
print(f"Saved metadata to {meta_path}")


## Milestone 4: Deployment Blueprint
The snippet below shows a minimal FastAPI service that:
- loads the persisted `joblib` pipeline
- exposes `/predict` for real-time churn scoring
- performs simple logging for monitoring hooks

> Save the snippet as `api/app.py`, then run it from the project root via `uvicorn api.app:app --reload` once the model artifact exists.


In [None]:
# Source of a minimal FastAPI scoring service, kept as a string so the notebook
# can print it. Fixes relative to a naive version:
#   * the model path falls back to the file name derived from `model_name` when
#     the report has no `model_path` entry;
#   * request fields (valid Python identifiers) are renamed to the training
#     column names, which contain spaces, before calling the pipeline;
#   * the response holds built-in float/bool values, because numpy scalars are
#     not JSON serializable;
#   * each prediction is logged.
fastapi_example = """
import json
import logging
from pathlib import Path

import joblib
import pandas as pd
from fastapi import FastAPI
from pydantic import BaseModel

logger = logging.getLogger('churn_api')

ARTIFACT_DIR = Path('artifacts')
with open(ARTIFACT_DIR / 'model_report.json', 'r') as fh:
    meta = json.load(fh)
# Reports without a 'model_path' entry: rebuild the file name from the model name.
model_path = meta.get('model_path') or ARTIFACT_DIR / (
    'best_model_' + meta['model_name'].replace(' ', '_').lower() + '.joblib'
)
model = joblib.load(model_path)
app = FastAPI(title='Churn Predictor')

# Request field name -> column name the pipeline was trained on.
TRAINING_COLUMNS = {
    'Usage_Frequency': 'Usage Frequency',
    'Support_Calls': 'Support Calls',
    'Payment_Delay': 'Payment Delay',
    'Subscription_Type': 'Subscription Type',
    'Contract_Length': 'Contract Length',
    'Total_Spend': 'Total Spend',
    'Last_Interaction': 'Last Interaction',
}

class Customer(BaseModel):
    Age: int
    Gender: str
    Tenure: int
    Usage_Frequency: int
    Support_Calls: int
    Payment_Delay: int
    Subscription_Type: str
    Contract_Length: str
    Total_Spend: float
    Last_Interaction: int
    TenureBucket: str
    HighSupport: int
    RecentInteraction: int
    UsagePerTenure: float
    SpendPerMonth: float

@app.post('/predict')
def predict(customer: Customer):
    df = pd.DataFrame([customer.dict()]).rename(columns=TRAINING_COLUMNS)
    # Cast to a built-in float: numpy scalars are not JSON serializable.
    prob = float(model.predict_proba(df)[0][1])
    logger.info('churn_probability=%.4f', prob)
    return {'churn_probability': prob, 'churn_risk': prob > 0.5}
"""
print(fastapi_example)


### Monitoring & Retraining Checklist
- **Data Drift:** schedule weekly Kolmogorov–Smirnov tests comparing live features vs training distribution.
- **Performance Alerts:** recompute precision/recall from labeled feedback; alert if F1 drops >5% absolute.
- **Logging:** persist every API call payload + prediction + eventual true label for audit trail.
- **Retraining Trigger:** retrain after gathering at least 5k new labeled observations, or as soon as a drift or performance threshold is breached.
- **Versioning:** track models + metrics via MLflow/DVC, store experiment IDs in `model_report.json`.


## Milestone 5: Executive-Ready Summary
- Clean dataset saved to `artifacts/cleaned_customer_churn.csv`.
- Feature-engineered dataset resides in notebook memory; export if needed for AutoML.
- Model leaderboard stored in `results_df`; selected best model + metrics saved to `artifacts/model_report.json`.
- Deployment playbook + FastAPI snippet included for quick operationalization.
- Use the companion `reports/final_project_report.md` for stakeholder narrative + slide-ready bullets.
