<a href="https://colab.research.google.com/github/rishabh139/conFusionRestaurant/blob/master/churn_sle.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Churn Model Training and Evaluation**

## **Setup and Imports**

In [1]:
%pip install shap



In [2]:
import pandas as pd
import numpy as np
import shap
import matplotlib.pyplot as plt
import io
import scipy
import random

from datetime import datetime, timedelta
from scipy import stats
from sklearn.model_selection import RandomizedSearchCV, GridSearchCV, train_test_split
from sklearn.metrics import confusion_matrix, f1_score, roc_auc_score, brier_score_loss, log_loss, accuracy_score, precision_recall_curve
from sklearn.calibration import calibration_curve
from sklearn.linear_model import LogisticRegression
from sklearn.isotonic import IsotonicRegression
from sklearn.model_selection import cross_val_score, KFold
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from xgboost import XGBClassifier

import joblib
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

## **Helper Functions**

### **Statistical Analysis Functions**

In [3]:
def calculate_side_kurtosis(data, side="left"):
    """Calculate the kurtosis of one half of a distribution, split at its median.

    Args:
        data: Numeric array-like (NumPy array, pandas Series, list, ...).
        side: "left" selects the values <= median; any other value selects
            the values > median.

    Returns:
        The result of ``scipy.stats.kurtosis`` (default settings) on the
        selected side as a ``float``, or a human-readable message string when
        the kurtosis cannot be computed (too few distinct values, NaN/infinite
        values, or an error raised by SciPy).
    """
    # Coerce once so plain Python sequences support the boolean masking below.
    values = np.asarray(data, dtype=float)

    # A single NaN turns the median into NaN, which makes every comparison
    # False and both sides empty. Report it up front instead of letting it
    # surface as a misleading "not enough unique values" message.
    if np.isnan(values).any():
        return f"Infinite or NaN values found on {side} side"

    median = np.median(values)
    side_data = values[values <= median] if side == "left" else values[values > median]

    unique_values = np.unique(side_data)
    if len(unique_values) < 2:
        return f"Not enough unique values on {side} side (found {len(unique_values)})"

    # NaN was excluded above; this now only catches +/-inf on the chosen side.
    if np.isinf(side_data).any():
        return f"Infinite or NaN values found on {side} side"

    try:
        kurt = stats.kurtosis(side_data)
        return float(kurt)
    except Exception as e:
        # Deliberately best-effort: the caller prints the metric, so an error
        # description is returned rather than raised.
        return f"Error calculating {side}-side kurtosis: {str(e)}"

def bootstrap_std_error(data, num_bootstrap=1000):
    """Estimate the standard error of the mean by bootstrap resampling.

    Draws ``num_bootstrap`` resamples of ``data`` (same size as ``data``, with
    replacement) from NumPy's global random state, takes the mean of each, and
    returns the standard deviation of those means.
    """
    sample_size = len(data)
    resampled_means = []
    for _ in range(num_bootstrap):
        resample = np.random.choice(data, size=sample_size, replace=True)
        resampled_means.append(np.mean(resample))
    return np.std(np.array(resampled_means))

def find_optimal_threshold(y_true, y_pred_proba):
    """Find the probability threshold that maximizes the F1-score.

    Args:
        y_true: True binary labels.
        y_pred_proba: Predicted scores/probabilities for the positive class.

    Returns:
        The entry of the ``thresholds`` array returned by
        ``precision_recall_curve`` with the highest F1-score (the first such
        entry on ties).
    """
    precision, recall, thresholds = precision_recall_curve(y_true, y_pred_proba)

    # precision/recall carry one trailing point that has no matching threshold;
    # drop it so every F1 value lines up with an entry of `thresholds`.
    precision = precision[:-1]
    recall = recall[:-1]

    # Where precision + recall == 0 the F1-score is defined as 0. Plain division
    # would yield NaN there, and np.argmax picks the first NaN it sees.
    denominator = precision + recall
    f1_scores = np.divide(
        2 * precision * recall,
        denominator,
        out=np.zeros_like(denominator, dtype=float),
        where=denominator > 0,
    )

    optimal_threshold = thresholds[np.argmax(f1_scores)]
    return optimal_threshold

### **Data Preprocessing Functions**

In [4]:
def data_transform(data, data_type='train', **kwargs):
    """
    Placeholder hook for project-specific data transformations.

    Currently a pass-through: ``data`` is returned unchanged, and both
    ``data_type`` and any extra keyword arguments are ignored.
    """
    # Insert custom transformation steps here and return their result.
    transformed = data
    return transformed

def identify_column_types(df, target_variable):
    """Split the feature columns of ``df`` into numerical and categorical lists.

    A column counts as numerical only when its dtype is exactly ``int64`` or
    ``float64``; every other column is treated as categorical. The column named
    ``target_variable`` is left out of both lists.

    Returns:
        ``(numerical_cols, categorical_cols)``, each in the frame's column order.
    """
    numeric_dtypes = ['int64', 'float64']
    feature_columns = [name for name in df.columns if name != target_variable]

    numerical_cols = [
        name for name in feature_columns if df[name].dtype in numeric_dtypes
    ]
    categorical_cols = [
        name for name in feature_columns if df[name].dtype not in numeric_dtypes
    ]
    return numerical_cols, categorical_cols

def encode_columns(data, numerical_cols, categorical_cols):
    """Fit a preprocessor that scales numerical and one-hot encodes categorical columns.

    Args:
        data: DataFrame holding the feature columns. It is not modified.
        numerical_cols: Names of the columns to standardize.
        categorical_cols: Names of the columns to one-hot encode; they are
            cast to ``str`` before encoding.

    Returns:
        A tuple ``(transformed_data, feature_names, preprocessor)``:
        the transformed matrix, the output column names (numerical names
        followed by the encoder's generated names), and the fitted
        ``ColumnTransformer``.
    """
    numerical_cols = list(numerical_cols)
    categorical_cols = list(categorical_cols)

    # Work on a copy so the string cast does not leak into the caller's frame.
    data = data.copy()
    for col in categorical_cols:
        data[col] = data[col].astype(str)

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', StandardScaler(), numerical_cols),
            ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_cols)
        ])
    transformed_data = preprocessor.fit_transform(data)

    # With no categorical columns the 'cat' encoder is never fitted, so asking
    # it for feature names would raise; there are simply no encoded names.
    if categorical_cols:
        encoder = preprocessor.named_transformers_['cat']
        encoded_names = list(encoder.get_feature_names_out(categorical_cols))
    else:
        encoded_names = []

    feature_names = numerical_cols + encoded_names

    return transformed_data, feature_names, preprocessor

def data_preprocessing(data, target_variable, is_train=True, preprocessor=None):
    """Split off the target column and encode the features.

    Args:
        data: DataFrame containing the features and the target column.
        target_variable: Name of the target column.
        is_train: When True, fit a new preprocessor via ``encode_columns``.
            When False, apply an already-fitted preprocessor.
        preprocessor: Fitted ``ColumnTransformer`` as returned by
            ``encode_columns`` (transformers named ``'num'`` and ``'cat'``).
            Only used when ``is_train`` is False. If omitted, a module-level
            variable named ``preprocessor`` is used when one exists, so the
            older call style without this argument keeps working.

    Returns:
        ``(X_data, y_data, preprocessor)`` when ``is_train`` is True,
        otherwise ``(X_data, y_data)``. ``X_data`` is a dense DataFrame of the
        encoded features.

    Raises:
        ValueError: If ``is_train`` is False and no fitted preprocessor is
            available.
    """
    y_data = data[target_variable]
    X_data = data.drop(columns=[target_variable])

    if is_train:
        numerical_cols, categorical_cols = identify_column_types(
            X_data, target_variable=None)
        transformed_data, feature_names, preprocessor = encode_columns(
            X_data, numerical_cols, categorical_cols)
    else:
        # `preprocessor` is assigned in the training branch, which makes it a
        # local name for the whole function; it therefore has to arrive as an
        # argument or be looked up explicitly at module level.
        if preprocessor is None:
            preprocessor = globals().get("preprocessor")
        if preprocessor is None:
            raise ValueError(
                "A fitted preprocessor is required when is_train=False; "
                "pass the one returned by the training call.")

        # Reuse the column lists the preprocessor was fitted with, so the
        # generated feature names cannot drift if dtypes differ in this data.
        fitted_columns = {
            name: list(columns)
            for name, _, columns in preprocessor.transformers_
        }
        numerical_cols = fitted_columns.get('num', [])
        categorical_cols = fitted_columns.get('cat', [])

        # Mirror the string cast applied before fitting. X_data is the fresh
        # frame returned by drop(), so the caller's data is untouched.
        for col in categorical_cols:
            if col in X_data.columns:
                X_data[col] = X_data[col].astype(str)

        transformed_data = preprocessor.transform(X_data)

        if categorical_cols:
            encoder = preprocessor.named_transformers_['cat']
            encoded_names = list(encoder.get_feature_names_out(categorical_cols))
        else:
            encoded_names = []
        feature_names = numerical_cols + encoded_names

    # One-hot encoding may produce a sparse matrix; DataFrame needs it dense.
    if scipy.sparse.issparse(transformed_data):
        X_data = pd.DataFrame(transformed_data.toarray(), columns=feature_names)
    else:
        X_data = pd.DataFrame(transformed_data, columns=feature_names)

    return (X_data, y_data, preprocessor) if is_train else (X_data, y_data)

### **Model Training and Evaluation Functions**

In [5]:
def train_model(X_train, y_train, model_params=None, cv_params=None):
    """Train an XGBoost classifier, optionally tuned with a randomized search.

    Args:
        X_train: Training features.
        y_train: Training labels.
        model_params: Keyword arguments for ``XGBClassifier``. Parameters set
            here are kept fixed unless a custom search space overrides them.
        cv_params: Keyword arguments for ``RandomizedSearchCV`` (``scoring``,
            ``cv``, ``n_iter``, ...), merged over the defaults below. May also
            contain ``"param_distributions"`` to supply the search space. An
            integer ``cv`` below 2 disables the search.

    Returns:
        The fitted classifier: the search's ``best_estimator_`` when a search
        ran, otherwise the directly fitted ``XGBClassifier``.
    """
    # Copy instead of using mutable defaults / mutating the caller's dicts.
    model_params = dict(model_params or {})
    default_cv_params = {
        "scoring": "f1",
        "cv": 5,
        "verbose": 1,
        "n_iter": 10,
        "random_state": 42,
        "error_score": 'raise'
    }
    search_params = {**default_cv_params, **(cv_params or {})}

    # The search space is not a RandomizedSearchCV keyword setting like the
    # rest, so pull it out before forwarding the remainder as keyword args.
    param_distributions = search_params.pop("param_distributions", None)

    if param_distributions is None:
        # Default space, restricted to parameters the caller did not pin.
        default_space = {
            "learning_rate": [0.01, 0.05, 0.1, 0.2],
            "max_depth": [3, 4, 5, 6],
            "n_estimators": [100, 200, 300],
            "subsample": [0.7, 0.85, 1.0],
            "colsample_bytree": [0.7, 0.85, 1.0],
        }
        param_distributions = {
            name: candidates
            for name, candidates in default_space.items()
            if name not in model_params
        }

    # `cv` may be an int, None, or a splitter object; only an integer fold
    # count below 2 means "no cross-validation".
    cv = search_params["cv"]
    cv_disabled = isinstance(cv, (int, np.integer)) and cv < 2

    if cv_disabled or not param_distributions:
        # Nothing to cross-validate or nothing left to tune: fit once.
        return XGBClassifier(**model_params).fit(X_train, y_train)

    search = RandomizedSearchCV(
        XGBClassifier(**model_params),
        param_distributions,
        **search_params
    )
    search.fit(X_train, y_train)
    return search.best_estimator_

def evaluate_model(model, X_test, y_test):
    """Score a fitted classifier on ``X_test`` / ``y_test``.

    Obtains the model's hard predictions and its probabilities for the second
    class column, then returns the dictionary built by ``calculate_metrics``.
    """
    predicted_labels = model.predict(X_test)
    class_probabilities = model.predict_proba(X_test)
    positive_proba = class_probabilities[:, 1]
    return calculate_metrics(y_test, predicted_labels, positive_proba)

def calculate_metrics(y_true, y_pred, y_pred_proba):
    """Build a dictionary of distribution statistics and classification scores.

    Args:
        y_true: True labels.
        y_pred: Predicted labels.
        y_pred_proba: Predicted probabilities for the positive class.

    Returns:
        A dict keyed by metric name. Skewness and both standard errors are
        computed on ``y_pred``; the side kurtoses, log loss, ROC-AUC and
        optimal threshold are computed on ``y_pred_proba``.
    """
    metrics = {}

    # Shape of the prediction distributions.
    metrics['Skewness'] = stats.skew(y_pred)
    metrics['Left-Side Kurtosis'] = calculate_side_kurtosis(y_pred_proba, "left")
    metrics['Right-Side Kurtosis'] = calculate_side_kurtosis(y_pred_proba, "right")

    # Standard error of the mean predicted label: analytic and bootstrapped.
    n_predictions = len(y_pred)
    metrics['Standard Error (Direct)'] = np.std(y_pred) / np.sqrt(n_predictions)
    metrics['Standard Error (Bootstrapped)'] = bootstrap_std_error(y_pred)

    # Classification quality.
    metrics['Log Loss'] = log_loss(y_true, y_pred_proba)
    metrics['F1 Score'] = f1_score(y_true, y_pred)
    metrics['ROC-AUC'] = roc_auc_score(y_true, y_pred_proba)
    metrics['Accuracy'] = accuracy_score(y_true, y_pred)
    metrics['Optimal Threshold'] = find_optimal_threshold(y_true, y_pred_proba)

    return metrics

## **Data Loading and Initial Transformation**

In [None]:
# Load your data.
# NOTE: the two read_csv lines below are commented-out placeholders. `train_data`
# and `eval_data` must be defined (for example by uncommenting these lines and
# pointing them at real files) before the transformation calls below can run.
# train_data = pd.read_csv("your_train_data.csv")  # Replace with your training data path
# eval_data = pd.read_csv("your_eval_data.csv")    # Replace with your evaluation data path
target_variable = "your_target_variable"          # Replace with your target column name

# Apply transformations to copies, so the loaded frames themselves are not
# handed to data_transform.
transformed_train_data = data_transform(train_data.copy(), data_type='train')
transformed_eval_data = data_transform(eval_data.copy(), data_type='exec')

## **Data Preprocessing and Train-Test Split**

In [None]:
# Preprocess training data: returns the encoded feature frame, the target
# column, and the fitted preprocessor.
X_train, y_train, preprocessor = data_preprocessing(
    transformed_train_data, target_variable, is_train=True)

# Split into training and testing sets: 20% is held out, with a fixed seed so
# the split is reproducible. X_train / y_train are rebound to the training
# portion only.
X_train, X_test, y_train, y_test = train_test_split(
    X_train, y_train, test_size=0.2, random_state=42)

## **Model Training**

In [None]:
# Hyperparameters forwarded to the XGBoost classifier
xgb_params = dict(
    learning_rate=0.1,
    n_estimators=100,
    max_depth=3,
    tree_method='hist',
    enable_categorical=True,
)

# Settings forwarded to the cross-validation step
cv_params = dict(cv=5)

# Fit the model on the training split
trained_model = train_model(
    X_train,
    y_train,
    model_params=xgb_params,
    cv_params=cv_params,
)

## **Model Evaluation on Test Set**

In [None]:
# Score the trained model on the held-out test split and print one metric per line
test_metrics = evaluate_model(trained_model, X_test, y_test)
print("Test set metrics:")
for metric_name in test_metrics:
    metric_value = test_metrics[metric_name]
    print(f"{metric_name}: {metric_value}")

## **Real-world Evaluation**

In [None]:
# Encode the evaluation data
X_eval, y_eval = data_preprocessing(
    transformed_eval_data, target_variable, is_train=False)

# Align the evaluation columns with the columns the model was trained on
train_cols = X_train.columns
train_col_set = set(train_cols)
eval_col_set = set(X_eval.columns)
missing_cols_eval = train_col_set - eval_col_set
extra_cols_eval = eval_col_set - train_col_set

# Columns seen in training but absent here are added as all-zero columns
for col in missing_cols_eval:
    X_eval[col] = 0

# Columns unseen in training are dropped, then the training order is restored
if extra_cols_eval:
    X_eval = X_eval.drop(columns=list(extra_cols_eval))
X_eval = X_eval[train_cols]

# Score the model on the aligned real-world data and print one metric per line
real_world_metrics = evaluate_model(trained_model, X_eval, y_eval)
print("\nReal-world evaluation metrics:")
for metric_name in real_world_metrics:
    metric_value = real_world_metrics[metric_name]
    print(f"{metric_name}: {metric_value}")