In [31]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

!pip install lifelines --quiet
from lifelines import KaplanMeierFitter, CoxPHFitter
from lifelines.statistics import proportional_hazard_test
from statsmodels.stats.outliers_influence import variance_inflation_factor
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer

In [32]:
# ========== User settings ==========
DATA_PATH = "data/churn_dataset.csv"     # input dataset
DURATION_COL = "tenure"
EVENT_COL = "churned"
KM_TIMES = [3, 6, 12, 24]     # months to report KM probabilities
CORR_THRESHOLD = 0.70
VIF_THRESHOLD = 5.0
PENALIZER = 0.1               # L2 penalization for Cox to stabilize coefficients
OUT_DIR = "deliverables"
FIG_DIR = os.path.join(OUT_DIR, "figures")
os.makedirs(FIG_DIR, exist_ok=True)
os.makedirs(OUT_DIR, exist_ok=True)

In [33]:
# ========== Utilities ==========
def save_text(path, text):
    with open(path, "w") as f:
        f.write(text)


In [34]:
# ========== 1. Load data ==========
df = pd.read_csv(DATA_PATH)
# Ensure required cols
if DURATION_COL not in df.columns or EVENT_COL not in df.columns:
    raise ValueError(f"Dataset must contain '{DURATION_COL}' and '{EVENT_COL}' columns.")

In [35]:
# ========== 2. Basic EDA and write eda_summary.txt ==========
def eda_summary(df):
    n = len(df)
    missing = df.isnull().sum()
    churn_rate = df[EVENT_COL].mean()
    tenure_median = df[DURATION_COL].median()
    tenure_mean = df[DURATION_COL].mean()
    # Basic numerical stats sample
    num_stats = df.select_dtypes(include=[np.number]).describe().T
    # Frequency for categorical
    cat_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()
    eda_lines = []
    eda_lines.append("Exploratory Data Analysis (EDA) — Summary\n")
    eda_lines.append(f"Total records: {n}")
    eda_lines.append("Missing values (per column):")
    for c, m in missing.items():
        eda_lines.append(f" - {c}: {m} ({(m/n):.1%})")
    eda_lines.append(f"\nChurn rate (event proportion): {churn_rate:.3f} ({churn_rate*100:.1f}%)")
    eda_lines.append(f"Median tenure: {tenure_median} months; Mean tenure: {tenure_mean:.2f} months\n")
    eda_lines.append("Numeric summary (selected):")
    eda_lines.append(num_stats[['count','mean','std','50%','min','max']].to_string())
    if cat_cols:
        eda_lines.append("\nCategorical columns value counts (top 5):")
        for c in cat_cols:
            eda_lines.append(f" - {c}:\n{df[c].value_counts(dropna=False).head().to_string()}\n")
    return "\n".join(eda_lines)

eda_text = eda_summary(df)
save_text(os.path.join(OUT_DIR, "eda_summary.txt"), eda_text)
print("EDA written to eda_summary.txt")

EDA written to eda_summary.txt


In [36]:
# ========== 3. Kaplan-Meier (overall and by group) + write KM estimates ==========
kmf = KaplanMeierFitter()
T = df[DURATION_COL]
E = df[EVENT_COL]
kmf.fit(T, event_observed=E, label="All customers")

# Plot overall KM
plt.figure(figsize=(8,5))
kmf.plot_survival_function()
plt.title("Kaplan-Meier Survival Function (Overall)")
plt.xlabel("Time (months)")
plt.ylabel("Survival Probability")
plt.tight_layout()
plt.savefig(os.path.join(FIG_DIR, "km_overall.png"))
plt.close()

# Save KM numeric estimates at specified times
km_lines = ["Kaplan-Meier estimates (text)"]
for t in KM_TIMES:
    prob = float(kmf.predict(t))
    km_lines.append(f"S({t} months) = {prob:.4f}")
# Try grouping by a meaningful column if present (e.g., 'usage_segment', 'segment', 'gender')
group_col = None
for candidate in ["usage_segment", "segment", "gender", "plan", "contract"]:
    if candidate in df.columns:
        group_col = candidate
        break
if group_col:
    km_lines.append(f"\nBy group: {group_col}")
    plt.figure(figsize=(8,6))
    for g in sorted(df[group_col].dropna().unique()):
        ix = df[group_col] == g
        kmf.fit(T[ix], E[ix], label=str(g))
        probs = [float(kmf.predict(t)) for t in KM_TIMES]
        km_lines.append(f" - {g}: " + ", ".join([f"S({t})={p:.4f}" for t,p in zip(KM_TIMES, probs)]))
        kmf.plot_survival_function()
    plt.title(f"Kaplan-Meier by {group_col}")
    plt.xlabel("Time (months)")
    plt.ylabel("Survival Probability")
    plt.legend()
    plt.tight_layout()
    plt.savefig(os.path.join(FIG_DIR, f"km_by_{group_col}.png"))
    plt.close()

save_text(os.path.join(OUT_DIR, "km_estimates.txt"), "\n".join(km_lines))
print("KM estimates written to km_estimates.txt and figures saved to figures/")

KM estimates written to km_estimates.txt and figures saved to figures/


In [37]:
# ========== 4. Correlation and VIF-based feature selection ==========
# Select numeric candidate features (exclude duration & event)
numeric = df.select_dtypes(include=[np.number]).copy()
numeric = numeric.drop(columns=[DURATION_COL, EVENT_COL], errors='ignore')
num_cols = numeric.columns.tolist()

corr_matrix = numeric.corr().abs()
# find pairs with high correlation
high_pairs = []
for i in range(len(num_cols)):
    for j in range(i+1, len(num_cols)):
        a = num_cols[i]; b = num_cols[j]
        r = corr_matrix.loc[a,b]
        if r >= CORR_THRESHOLD:
            high_pairs.append((a,b,r))
# VIF computation: we need no missing values
vif_df = pd.DataFrame(columns=['feature','VIF'])
if len(num_cols) >= 2:
    X = numeric.dropna().values
    # compute VIF using numeric.dropna() and numeric.columns
    Xv = numeric.dropna()
    vif_list = []
    for i in range(Xv.shape[1]):
        try:
            vif_val = variance_inflation_factor(Xv.values, i)
        except Exception:
            vif_val = np.nan
        vif_list.append((Xv.columns[i], vif_val))
    vif_df = pd.DataFrame(vif_list, columns=['feature','VIF']).sort_values('VIF', ascending=False)

# Heuristic removal: for each high-correlation pair drop the one with higher mean correlation to others
to_remove = set()
for a,b,r in high_pairs:
    mean_a = corr_matrix[a].mean()
    mean_b = corr_matrix[b].mean()
    drop = a if mean_a > mean_b else b
    to_remove.add(drop)

# Remove features with VIF > threshold
for _, row in vif_df.iterrows():
    if row['VIF'] > VIF_THRESHOLD:
        to_remove.add(row['feature'])

selected_numeric = [c for c in num_cols if c not in to_remove]

# We'll encode categorical variables (one-hot) but only keep columns that are not perfectly multicollinear
categorical = df.select_dtypes(include=['object','category']).columns.tolist()
# Simple imputation for missing numeric and categorical for model fitting
imputer_num = SimpleImputer(strategy='median')
imputer_cat = SimpleImputer(strategy='most_frequent')

# Impute numeric for VIF-selected columns
if selected_numeric:
    numeric_imputed = pd.DataFrame(imputer_num.fit_transform(df[selected_numeric]), columns=selected_numeric)
else:
    numeric_imputed = pd.DataFrame(index=df.index)

# One-hot encode categoricals
encoded_df = pd.DataFrame(index=df.index)
if categorical:
    # Fill missing
    cat_df = df[categorical].fillna('MISSING').astype(str)
    ohe = OneHotEncoder(drop='first', sparse_output=False)
    ohe_arr = ohe.fit_transform(cat_df)
    ohe_cols = ohe.get_feature_names_out(categorical)
    encoded_df = pd.DataFrame(ohe_arr, columns=ohe_cols, index=df.index)

# Final feature list
final_features = list(numeric_imputed.columns) + list(encoded_df.columns)
if not final_features:
    raise ValueError("No features available for Cox model after selection. Check your dataset and thresholds.")

# Combine into model df
model_df = pd.concat([df[[DURATION_COL, EVENT_COL]].reset_index(drop=True),
                      numeric_imputed.reset_index(drop=True),
                      encoded_df.reset_index(drop=True)], axis=1).dropna()

# Save selection summary
sel_lines = ["Feature selection summary"]
sel_lines.append(f"Initial numeric cols: {num_cols}")
sel_lines.append(f"High-correlation pairs (r>={CORR_THRESHOLD}): {high_pairs}")
sel_lines.append("Removed (heuristic + VIF): " + str(sorted(list(to_remove))))
sel_lines.append("Selected numeric features: " + str(selected_numeric))
sel_lines.append("Categorical encoded columns included: " + str(list(encoded_df.columns)))
save_text(os.path.join(OUT_DIR, "feature_selection.txt"), "\n".join(sel_lines))
print("Feature selection saved to feature_selection.txt")

Feature selection saved to feature_selection.txt


In [38]:
# ========== 5. Fit penalized Cox proportional hazards model ==========
cph = CoxPHFitter(penalizer=PENALIZER)
cph.fit(model_df, duration_col=DURATION_COL, event_col=EVENT_COL)
# Save the summary (table)
save_text(os.path.join(OUT_DIR, "cox_summary.txt"), cph.summary.to_string())
print("Cox model summary saved to cox_summary.txt")

# Save a coefficient plot
plt.figure(figsize=(8,6))
cph.plot()
plt.title("Cox coefficients")
plt.tight_layout()
plt.savefig(os.path.join(FIG_DIR, "cox_coefficients.png"))
plt.close()

Cox model summary saved to cox_summary.txt


In [39]:
# ========== 6. PH assumption test (Schoenfeld residuals) ==========
ph_test = proportional_hazard_test(cph, model_df, time_transform='rank')
# Compose interpretation text
def compose_cox_interpretation(cph, ph_test):
    lines = []
    lines.append("Cox Proportional Hazards Model — Written Interpretation\n")
    lines.append("Variables and hazard ratios (exp(coef)):")
    for idx, row in cph.summary.iterrows():
        hr = np.exp(row['coef'])
        lower = np.exp(row['coef lower 95%'])
        upper = np.exp(row['coef upper 95%'])
        p = row['p']
        lines.append(f" - {idx}: HR = {hr:.3f}, 95% CI = ({lower:.3f}, {upper:.3f}), p = {p:.4f}")
        if p < 0.05:
            if hr > 1:
                lines.append(f"    Interpretation: {idx} is significantly associated with increased hazard (faster churn).")
            else:
                lines.append(f"    Interpretation: {idx} is significantly associated with decreased hazard (slower churn).")
        else:
            lines.append(f"    Interpretation: {idx} is not statistically significant at alpha=0.05.")

    lines.append("\nProportional Hazards test (Schoenfeld residuals - per covariate):")
    ph_violation = False
    per_df = ph_test.summary
    if not per_df.empty:
        lines.append("\nPer-variable PH test p-values:")
        for cov, row in per_df.iterrows():
            lines.append(f" - {cov}: p = {row['p']:.4f}")
            if row['p'] <= 0.05: # Using alpha = 0.05 as a threshold
                ph_violation = True
    else:
        lines.append(" - No covariates found for PH test.")

    lines.append("\nOverall Proportional Hazards Assumption Conclusion:")
    if ph_violation:
        lines.append(" - Conclusion: Evidence of PH violation for at least one covariate (p <= 0.05). Consider time interactions or stratification for the violating covariates.")
    else:
        lines.append(" - Conclusion: No evidence of PH violation for any covariate (all p > 0.05).")

    lines.append(f"\nConcordance index (C-index): {cph.concordance_index_:.3f}")
    return "\n".join(lines)

cox_interp_text = compose_cox_interpretation(cph, ph_test)
save_text(os.path.join(OUT_DIR, "cox_interpretation.txt"), cox_interp_text)
print("Cox interpretation written to cox_interpretation.txt")

Cox interpretation written to cox_interpretation.txt


In [40]:
# ========== 7. Predictions: risk score and survival at e.g., 6,12 months ==========
# Predict partial hazard (risk score)
pred_df = model_df.copy()
pred_df['risk_score'] = cph.predict_partial_hazard(pred_df)
# Predict survival function for first 48 months and extract numeric survival at specific times
surv_funcs = cph.predict_survival_function(pred_df, times=np.arange(0, 61))  # 0..60 months
# Extract survival at KM_TIMES and append average per customer
for t in KM_TIMES:
    pred_df[f"survival_{t}m"] = surv_funcs.loc[t].values
# Save predictions
pred_df.to_csv(os.path.join(OUT_DIR, "predictions.csv"), index=False)
print("Predictions saved to predictions.csv")


Predictions saved to predictions.csv


In [41]:
# ========== 8. Save key figures and list deliverables ==========
deliverables_list = [
    "eda_summary.txt",
    "km_estimates.txt",
    "feature_selection.txt",
    "cox_summary.txt",
    "cox_interpretation.txt",
    "predictions.csv",
    "figures/ (km_overall.png, km_by_group.png if group present, cox_coefficients.png)",
]
save_text(os.path.join(OUT_DIR, "deliverables_list.txt"), "\n".join(deliverables_list))
print("All deliverables saved under folder:", OUT_DIR)

# End of script

All deliverables saved under folder: deliverables


In [42]:
import pandas as pd
import numpy as np
!pip install lifelines --quiet
from lifelines import KaplanMeierFitter, CoxPHFitter
from lifelines.statistics import proportional_hazard_test
import matplotlib.pyplot as plt
import os

In [43]:
# ============================================================
#  1. Create Folder Structure
# ============================================================
os.makedirs("data", exist_ok=True)
os.makedirs("outputs", exist_ok=True)

In [44]:
# ============================================================
#  2. Generate Dataset (Simulated Realistic Telco Churn Data)
# ============================================================
np.random.seed(42)
n = 500

df = pd.DataFrame({
    "tenure": np.random.exponential(scale=365, size=n).astype(int),
    "churned": np.random.binomial(1, 0.35, n),
    "age": np.random.randint(18, 70, n),
    "monthly_charges": np.random.uniform(20, 120, n),
    "contract_type": np.random.choice(["Month-to-month", "One year", "Two year"], n),
    "gender": np.random.choice(["Male", "Female"], n),
    "payment_method": np.random.choice(["Card", "UPI", "Bank Transfer"], n)
})
df.to_csv("data/churn_dataset.csv", index=False)

In [45]:
# ============================================================
#  3. EDA SUMMARY
# ============================================================
eda_text = []
eda_text.append("### EDA SUMMARY\n")
eda_text.append(f"Total Records: {len(df)}")
eda_text.append(f"Churn Rate: {df['churned'].mean():.2f}")
eda_text.append(f"Average Tenure: {df['tenure'].mean():.2f}")
eda_text.append("\nContract Type Distribution:\n")
eda_text.append(str(df['contract_type'].value_counts()))
eda_text.append("\n\nMonthly Charges Summary:\n")
eda_text.append(str(df['monthly_charges'].describe()))

with open("outputs/eda_summary.txt", "w") as f:
    f.write("\n".join(eda_text))

In [46]:
# ============================================================
#  4. Kaplan-Meier Estimator
# ============================================================
km = KaplanMeierFitter()
km.fit(durations=df["tenure"], event_observed=df["churned"])

km_text = []
km_text.append("### Kaplan-Meier Survival Output (First 20 Rows)\n")
km_text.append(str(km.survival_function_.head(20)))

with open("outputs/km_output.txt", "w") as f:
    f.write("\n".join(km_text))

# Plot KM Curve
km.plot_survival_function()
plt.title("Kaplan-Meier Survival Curve")
plt.xlabel("Time (Days)")
plt.ylabel("Survival Probability")
plt.savefig("outputs/km_plot.png")
plt.close()

In [47]:
# ============================================================
#  5. COX PROPORTIONAL HAZARDS MODEL
# ============================================================
# Encode categorical variables
df_encoded = pd.get_dummies(df, drop_first=True)

cph = CoxPHFitter()
cph.fit(df_encoded, duration_col="tenure", event_col="churned")

# Save summary
cph.summary.to_csv("outputs/cox_summary.csv")

# Interpretation Text
cox_text = []
cox_text.append("### Cox Proportional Hazards Model Interpretation\n")

for index, row in cph.summary.iterrows():
    hr = row["exp(coef)"]
    p = row["p"]
    effect = "significant" if p < 0.05 else "not significant"

    cox_text.append(f"{index}: HR={hr:.3f}, p={p:.4f} → {effect}")

with open("outputs/cox_interpretation.txt", "w") as f:
    f.write("\n".join(cox_text))

In [48]:
# ============================================================
#  6. PROPORTIONAL HAZARD ASSUMPTION TEST
# ============================================================
ph_results = proportional_hazard_test(cph, df_encoded, time_transform='rank')

ph_text = []
ph_text.append("### Schoenfeld Residual Test Results (PH Assumption)\n")
ph_text.append(str(ph_results.summary))

with open("outputs/ph_assumption_report.txt", "w") as f:
    f.write("\n".join(ph_text))



In [49]:
#============================================================
#  DONE
# ============================================================
print("All tasks completed successfully.")
print("Check the 'outputs' folder for all required deliverables.")

All tasks completed successfully.
Check the 'outputs' folder for all required deliverables.
