In [1]:
# Cell 1: Imports and Setup
import os
import glob
import pandas as pd
import numpy as np
import torch
import optuna
from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.metrics import roc_auc_score, f1_score, classification_report
from imblearn.over_sampling import RandomOverSampler
from imblearn.over_sampling import SMOTE
from models.kan import KANClassifier

RANDOM_STATE = 42
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
N_TRIALS = 20
DATA_DIR = os.path.join(os.getcwd(), 'data')

print(f"Running on device: {DEVICE}")

  from .autonotebook import tqdm as notebook_tqdm


Running on device: cuda


In [2]:
# Cell 2: Load Dataset
if not os.path.exists(DATA_DIR):
    csv_path = 'customer_churn_telecom_services.csv'
else:
    csv_files = glob.glob(os.path.join(DATA_DIR, '*.csv'))
    csv_path = csv_files[0] if csv_files else 'customer_churn_telecom_services.csv'

print(f"Loading dataset: {csv_path}")
try:
    df = pd.read_csv(csv_path)
except FileNotFoundError:
    print("CSV not found! Place it in 'data' or project root.")
    raise

print('Shape:', df.shape)

Loading dataset: /home/pcgr/Code/churn-predict/data/customer_churn_telecom_services.csv
Shape: (7043, 20)


In [3]:
# Cell 3: Cleaning & Preprocessing
target_col = "Churn"

def to_binary(series):
    if series.dtype == 'O':
        return series.str.lower().map({
            'yes':1,'sim':1,'true':1,'y':1,'1':1,
            'no':0,'nao':0,'false':0,'n':0,'0':0
        }).fillna(series)
    return series

df[target_col] = to_binary(df[target_col])

# Clean TotalCharges
df['TotalCharges'] = pd.to_numeric(df['TotalCharges'], errors='coerce')
df = df.dropna(subset=['TotalCharges']).reset_index(drop=True)

# Separate X and y
X = df.drop(columns=[target_col])
y = df[target_col].astype(int)

# Identify columns
categorical_cols = [c for c in X.columns if X[c].dtype == 'O']
numeric_cols = [c for c in X.columns if c not in categorical_cols]

# Preprocessor
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numeric_cols),
        ('cat', OneHotEncoder(handle_unknown='ignore', sparse_output=False), categorical_cols)
    ]
)

# Process all data before split
print("Processing data...")
X_processed = preprocessor.fit_transform(X)

Processing data...


In [None]:
# Cell 4: Split 50/25/25 and SMOTE on Train

# Split 1: 25% test
X_temp, X_test, y_temp, y_test = train_test_split(
    X_processed, y, test_size=0.25, random_state=RANDOM_STATE, stratify=y
)
# Split 2: 25% of original for val => 0.3333 of temp
X_train, X_val, y_train, y_val = train_test_split(
    X_temp, y_temp, test_size=0.3333, random_state=RANDOM_STATE, stratify=y_temp
)

print(f"Shapes: Train={X_train.shape}, Val={X_val.shape}, Test={X_test.shape}")

# SMOTE on Train only
print("Applying SMOTE on Train...")
try:
    smote = SMOTE(random_state=RANDOM_STATE)
    X_train_bal, y_train_bal = smote.fit_resample(X_train, y_train)
    print(f"Before: {np.bincount(y_train)} | After: {np.bincount(y_train_bal)}")
except Exception as e:
    print(f"SMOTE failed: {e}. Using original train.")
    X_train_bal, y_train_bal = X_train, y_train


Shapes: Train=(3516, 45), Val=(1758, 45), Test=(1758, 45)
Applying SMOTE on Train...
Before: [2581  935] | After: [2581 2581]


In [5]:
# Cell 5: Optuna Optimization for KAN
def objective(trial):
    n_layers = trial.suggest_int('n_layers', 1, 3)
    hidden_size = trial.suggest_categorical('hidden_size', [16, 32, 64])
    hidden_sizes = tuple([hidden_size] * n_layers)

    grid_size = trial.suggest_int('grid_size', 3, 8)
    spline_order = trial.suggest_int('spline_order', 2, 3)

    lr = trial.suggest_float('lr', 1e-4, 1e-2, log=True)
    batch_size = trial.suggest_categorical('batch_size', [32, 64])
    weight_decay = trial.suggest_float('weight_decay', 1e-5, 1e-3, log=True)

    model = KANClassifier(
        hidden_sizes=hidden_sizes,
        grid_size=grid_size,
        spline_order=spline_order,
        learning_rate=lr,
        weight_decay=weight_decay,
        batch_size=batch_size,
        max_epochs=20,
        device=DEVICE
    )

    model.fit(X_train_bal, y_train_bal)

    probs_val = model.predict_proba(X_val)[:, 1]
    try:
        return roc_auc_score(y_val, probs_val)
    except ValueError:
        return 0.5

print("\nStarting Optuna search...")
optuna.logging.set_verbosity(optuna.logging.WARNING)
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=N_TRIALS)

print("\nBest params:")
print(study.best_params)


Starting Optuna search...

Best params:
{'n_layers': 1, 'hidden_size': 64, 'grid_size': 4, 'spline_order': 3, 'lr': 0.00019942200061167132, 'batch_size': 32, 'weight_decay': 0.00010054378956296349}


In [6]:
# Cell 6: Final Training and Test Evaluation
best_params = study.best_params
best_hidden_sizes = tuple([best_params['hidden_size']] * best_params['n_layers'])

final_model = KANClassifier(
    hidden_sizes=best_hidden_sizes,
    grid_size=best_params['grid_size'],
    spline_order=best_params['spline_order'],
    learning_rate=best_params['lr'],
    weight_decay=best_params['weight_decay'],
    batch_size=best_params['batch_size'],
    max_epochs=50,
    device=DEVICE
)

final_model.fit(X_train_bal, y_train_bal)

print("\nFinal Test Results:")
probs_test = final_model.predict_proba(X_test)[:, 1]
preds_test = (probs_test >= 0.5).astype(int)

print(classification_report(y_test, preds_test))
print(f"AUROC Final: {roc_auc_score(y_test, probs_test):.4f}")


Final Test Results:
              precision    recall  f1-score   support

           0       0.87      0.83      0.85      1291
           1       0.58      0.65      0.61       467

    accuracy                           0.78      1758
   macro avg       0.72      0.74      0.73      1758
weighted avg       0.79      0.78      0.78      1758

AUROC Final: 0.8388
