In [None]:
import torch
from torch import nn
import numpy as np
import random
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import seaborn as sns
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder, LabelEncoder

In [None]:
def set_seed(seed):
    """Seed every random number generator this notebook relies on.

    The same seed is handed to Python's ``random`` module, NumPy's global
    generator and PyTorch; when CUDA is available, all CUDA generators are
    seeded explicitly as well.

    Args:
        seed: Seed value forwarded unchanged to each library.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)


# Seed once, up front, so every later cell starts from a known RNG state.
set_seed(0)

In [None]:
# Load the churn CSV; the path is relative, so the file must sit in the
# notebook's working directory.
df = pd.read_csv("WA_Fn-UseC_-Telco-Customer-Churn.csv")
# Summary statistics of the freshly loaded frame (rendered as the cell output).
df.describe()

In [None]:
# Missing-value count per column.
# NOTE(review): only real NaN/None values are counted here. TotalCharges is
# coerced to numeric in a later cell, which suggests it may hold non-numeric
# strings that this check would not flag — confirm.
df.isnull().sum()

In [None]:
# Preview the first rows only instead of rendering the entire frame.
df.head()

In [None]:
# Parse TotalCharges as numbers (errors='coerce' turns unparseable entries
# into NaN), then discard the rows that ended up without a value.
df = (
    df.assign(TotalCharges=pd.to_numeric(df['TotalCharges'], errors='coerce'))
      .dropna(subset=['TotalCharges'])
)

In [None]:
# Raw (un-normalized) number of rows per PaperlessBilling value.
df['PaperlessBilling'].value_counts()

In [None]:
# Raw number of rows per gender value.
# NOTE(review): the earlier margin note here read "0 x"; presumably the
# column's position in the feature frame plus a marker — confirm its meaning.
df['gender'].value_counts()

In [None]:
# Raw number of rows per InternetService value.
# NOTE(review): the earlier margin note "7" presumably recorded this column's
# position in the feature frame — confirm.
df['InternetService'].value_counts()

In [None]:
# Raw number of rows per MultipleLines value.
# NOTE(review): the earlier margin note "6" presumably recorded this column's
# position in the feature frame — confirm.
df['MultipleLines'].value_counts()

In [None]:
# Raw number of rows per Contract value.
# NOTE(review): the earlier margin note "14" presumably recorded this column's
# position in the feature frame — confirm.
df['Contract'].value_counts()

In [None]:
# Raw number of rows per PaymentMethod value.
# NOTE(review): the earlier margin note "16" presumably recorded this column's
# position in the feature frame — confirm.
df['PaymentMethod'].value_counts()

In [None]:
# Sanity check: number of NaNs still present in TotalCharges. The dropna in
# the cleaning cell above removes those rows, so this is expected to be 0.
df['TotalCharges'].isnull().sum()

In [None]:
# Features: every column except the first and the last (purely positional).
# NOTE(review): presumably column 0 is a row identifier and the last column is
# the churn label — confirm against the CSV header.
X = df.iloc[:, 1:-1]
# Target: the last column, pulled out of the frame via .values.
y = df.iloc[:, -1].values

In [None]:
# Turn the target labels into integer codes. The fitted encoder is kept in
# `le` so the label <-> code mapping stays available.
le = LabelEncoder()
y = le.fit_transform(y)

# Splitting and Encoding the Data

In [None]:
# Hold out 20% of the rows for testing; random_state pins the shuffle so the
# split is the same on every run.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 0)

In [None]:
# Column groups, one per preprocessing strategy.
# One-hot encoded columns:
categorical_data = ['MultipleLines', 'InternetService', 'Contract', 'PaymentMethod']
# Ordinal-encoded columns (each category mapped to an integer code):
label_encoding_cols = ['gender', 'Partner', 'Dependents', 'PhoneService',
                'OnlineSecurity', 'OnlineBackup', 'DeviceProtection',
                'TechSupport', 'StreamingTV', 'StreamingMovies', 'PaperlessBilling']
# Standardized numeric columns:
numeric_cols = ['tenure', 'MonthlyCharges', 'TotalCharges']

# Columns not listed above are passed through unchanged (remainder="passthrough").
# sparse_threshold=0 forces a dense ndarray: OneHotEncoder emits sparse output,
# and ColumnTransformer would otherwise return a SciPy sparse matrix whenever
# the stacked result is sparse enough, which the later tensor conversion
# cannot consume.
ct = ColumnTransformer(
    transformers=[
        ('encoder', OneHotEncoder(), categorical_data),
        ('ordinal', OrdinalEncoder(), label_encoding_cols),
        ('num', StandardScaler(), numeric_cols),
    ],
    remainder="passthrough",
    sparse_threshold=0,
)

# Fit on the training split only and reuse the fitted transformer on the test
# split, so no test-set statistics leak into the preprocessing.
X_train = ct.fit_transform(X_train)
X_test = ct.transform(X_test)

In [None]:
# Convert features and targets to float32 tensors. Targets are shaped (N, 1)
# so they line up with the single-unit output layer of the models.
# as_tensor + reshape(-1, 1) keeps this cell idempotent: re-running it neither
# re-wraps an existing tensor nor appends another trailing dimension, which
# repeated unsqueeze(1) calls would do.
y_train = torch.as_tensor(y_train, dtype=torch.float32).reshape(-1, 1)
y_test = torch.as_tensor(y_test, dtype=torch.float32).reshape(-1, 1)
X_train = torch.as_tensor(X_train, dtype=torch.float32)
X_test = torch.as_tensor(X_test, dtype=torch.float32)

In [None]:
# Inspect the training tensor's shape. The models in this notebook are
# instantiated with a first Linear layer expecting 28 input features, so the
# second dimension shown here has to be 28.
X_train.shape

# Models


In [None]:
class ChurnModel(nn.Module):
  """Fully connected network with two hidden ReLU layers and one raw-logit output.

  ``forward`` applies no sigmoid, so the output is a logit per row: pair the
  model with a logit-based loss (e.g. ``nn.BCEWithLogitsLoss``) and apply
  ``torch.sigmoid`` to obtain probabilities.

  Args:
    in_features: Width of each input row. Defaults to 28, the value that was
      previously hard-coded.
    hidden_units: Width of both hidden layers. Defaults to 128.
  """

  def __init__(self, in_features=28, hidden_units=128):
    super().__init__()
    # Attribute names and creation order are kept so state_dict keys and
    # seeded parameter initialisation stay the same as before.
    self.layer_1 = nn.Linear(in_features, hidden_units)
    self.relu1 = nn.ReLU()

    self.layer_2 = nn.Linear(hidden_units, hidden_units)
    self.relu2 = nn.ReLU()

    self.layer_3 = nn.Linear(hidden_units, 1)

  def forward(self, x):
    """Map a ``(batch, in_features)`` tensor to ``(batch, 1)`` logits."""
    x = self.relu1(self.layer_1(x))
    x = self.relu2(self.layer_2(x))
    x = self.layer_3(x)
    return x

In [None]:
class ChurnModelV2(nn.Module):
  """``ChurnModel`` variant with batch-norm and dropout; outputs raw logits.

  Each hidden block is Linear -> BatchNorm1d -> ReLU -> Dropout. No sigmoid is
  applied in ``forward``, so use a logit-based loss such as
  ``nn.BCEWithLogitsLoss``.

  Args:
    in_features: Width of each input row. Defaults to 28, the value that was
      previously hard-coded.
    hidden_units: Width of both hidden layers. Defaults to 128.
    dropout: Drop probability used by both dropout layers. Defaults to 0.05.
  """

  def __init__(self, in_features=28, hidden_units=128, dropout=0.05):
    super().__init__()
    # Attribute names and creation order are kept so state_dict keys and
    # seeded parameter initialisation stay the same as before.
    self.layer_1 = nn.Linear(in_features, hidden_units)
    self.batchnorm1 = nn.BatchNorm1d(num_features=hidden_units)
    self.relu1 = nn.ReLU()
    self.dropout1 = nn.Dropout(p=dropout)

    self.layer_2 = nn.Linear(hidden_units, hidden_units)
    self.batchnorm2 = nn.BatchNorm1d(num_features=hidden_units)
    self.relu2 = nn.ReLU()
    self.dropout2 = nn.Dropout(p=dropout)

    self.layer_3 = nn.Linear(hidden_units, 1)

  def forward(self, x):
    """Map a ``(batch, in_features)`` tensor to ``(batch, 1)`` logits."""
    x = self.batchnorm1(self.layer_1(x))
    x = self.relu1(x)
    x = self.dropout1(x)

    x = self.batchnorm2(self.layer_2(x))
    x = self.relu2(x)
    x = self.dropout2(x)

    x = self.layer_3(x)
    return x

In [None]:
class ChurnModelV3(nn.Module):
  """Two-hidden-layer network whose output already passed through a sigmoid.

  Because ``forward`` ends with ``nn.Sigmoid``, outputs lie in [0, 1]; pair the
  model with a probability-based loss such as ``nn.BCELoss`` rather than
  ``nn.BCEWithLogitsLoss``.

  Args:
    in_features: Width of each input row. Defaults to 28, the value that was
      previously hard-coded.
    hidden_units: Width of both hidden layers. Defaults to 128.
  """

  def __init__(self, in_features=28, hidden_units=128):
    super().__init__()
    # Attribute names and creation order are kept so state_dict keys and
    # seeded parameter initialisation stay the same as before.
    self.layer_1 = nn.Linear(in_features, hidden_units)
    self.relu1 = nn.ReLU()

    self.layer_2 = nn.Linear(hidden_units, hidden_units)
    self.relu2 = nn.ReLU()

    self.layer_3 = nn.Linear(hidden_units, 1)
    self.sigmoid = nn.Sigmoid()

  def forward(self, x):
    """Map a ``(batch, in_features)`` tensor to ``(batch, 1)`` sigmoid outputs."""
    x = self.relu1(self.layer_1(x))
    x = self.relu2(self.layer_2(x))
    x = self.layer_3(x)
    x = self.sigmoid(x)
    return x

In [None]:
class ChurnModelV4(nn.Module):
  """Batch-norm + dropout network whose output already passed through a sigmoid.

  Each hidden block is Linear -> BatchNorm1d -> ReLU -> Dropout, followed by a
  single-unit Linear layer and ``nn.Sigmoid``. Outputs lie in [0, 1], so pair
  the model with a probability-based loss such as ``nn.BCELoss``.

  Args:
    in_features: Width of each input row. Defaults to 28, the value that was
      previously hard-coded.
    hidden_units: Width of both hidden layers. Defaults to 128.
    dropout: Drop probability used by both dropout layers. Defaults to 0.1.
  """

  def __init__(self, in_features=28, hidden_units=128, dropout=0.1):
    super().__init__()
    # Attribute names and creation order are kept so state_dict keys and
    # seeded parameter initialisation stay the same as before.
    self.layer_1 = nn.Linear(in_features, hidden_units)
    self.batchnorm1 = nn.BatchNorm1d(num_features=hidden_units)
    self.relu1 = nn.ReLU()
    self.dropout1 = nn.Dropout(p=dropout)

    self.layer_2 = nn.Linear(hidden_units, hidden_units)
    self.batchnorm2 = nn.BatchNorm1d(num_features=hidden_units)
    self.relu2 = nn.ReLU()
    self.dropout2 = nn.Dropout(p=dropout)

    self.layer_3 = nn.Linear(hidden_units, 1)
    self.sigmoid = nn.Sigmoid()

  def forward(self, x):
    """Map a ``(batch, in_features)`` tensor to ``(batch, 1)`` sigmoid outputs."""
    x = self.batchnorm1(self.layer_1(x))
    x = self.relu1(x)
    x = self.dropout1(x)

    x = self.batchnorm2(self.layer_2(x))
    x = self.relu2(x)
    x = self.dropout2(x)

    x = self.layer_3(x)
    x = self.sigmoid(x)
    return x

In [None]:
# Build the three models trained in this notebook. The constructors run left
# to right, i.e. in the same order as separate assignments would, so parameter
# initialisation draws from the seeded RNG in the same sequence.
# ChurnModelV4 is not instantiated in this cell.
model_1, model_2, model_3 = ChurnModel(), ChurnModelV2(), ChurnModelV3()

In [None]:
# Baseline: score the still-untrained model_1 on the training set.
# eval() changes nothing for a model without dropout/batch-norm layers, but it
# keeps this check correct if the architecture ever gains them.
model_1.eval()
with torch.inference_mode():
  y_pred = model_1(X_train)              # raw logits
  y_pred_probs = torch.sigmoid(y_pred)   # logits -> probabilities
  y_lables = torch.round(y_pred_probs)   # probabilities -> 0/1 labels

# scikit-learn metrics take (y_true, y_pred). With the arguments in that order
# the confusion matrix has true labels on the rows and predictions on the
# columns; passing them reversed yields the transposed matrix.
print(f"Initial model accuracy : {accuracy_score(y_train, y_lables) * 100:.4f} %\n")
print(f"Initial Confusion matrix :\n {confusion_matrix(y_train, y_lables)}")

In [None]:
# Baseline: score the still-untrained model_2 on the training set.
# model_2 contains dropout and batch-norm layers. Without eval() they would run
# in training mode here: units would be dropped at random and the batch-norm
# running statistics would be updated by what is meant to be a read-only check.
model_2.eval()
with torch.inference_mode():
  y_pred = model_2(X_train)              # raw logits
  y_pred_probs = torch.sigmoid(y_pred)   # logits -> probabilities
  y_lables = torch.round(y_pred_probs)   # probabilities -> 0/1 labels

# scikit-learn metrics take (y_true, y_pred). With the arguments in that order
# the confusion matrix has true labels on the rows and predictions on the
# columns; passing them reversed yields the transposed matrix.
print(f"Initial model accuracy : {accuracy_score(y_train, y_lables) * 100:.4f} %\n")
print(f"Initial Confusion matrix :\n {confusion_matrix(y_train, y_lables)}")

In [None]:
# Baseline: score the still-untrained model_3 on the training set.
# model_3 applies the sigmoid itself, so its output is rounded directly.
# eval() changes nothing for a model without dropout/batch-norm layers, but it
# keeps this check correct if the architecture ever gains them.
model_3.eval()
with torch.inference_mode():
  y_pred = model_3(X_train)      # probabilities
  y_preds = torch.round(y_pred)  # probabilities -> 0/1 labels

# scikit-learn metrics take (y_true, y_pred). With the arguments in that order
# the confusion matrix has true labels on the rows and predictions on the
# columns; passing them reversed yields the transposed matrix.
print(f"Initial model accuracy : {accuracy_score(y_train, y_preds) * 100:.4f} %\n")
print(f"Initial Confusion matrix :\n {confusion_matrix(y_train, y_preds)}")

In [None]:
# Loss functions, matched to what each model outputs.
loss_fn_1 = nn.BCEWithLogitsLoss()  # takes raw logits (model_1, model_2)
loss_fn_2 = nn.BCELoss()            # takes probabilities (model_3 ends in a sigmoid)

# NOTE(review): every optimizer below is bound to the parameters of the same
# three model objects. Running the optimizer sections one after another
# therefore keeps training the same weights; re-create the model (and its
# optimizer) before a run if each optimizer should start from scratch.

# Plain SGD.
SGD_optim_1 = torch.optim.SGD(params = model_1.parameters(), lr = 0.01)
SGD_optim_2 = torch.optim.SGD(params = model_2.parameters(), lr = 0.01)
SGD_optim_3 = torch.optim.SGD(params = model_3.parameters(), lr = 0.01)

# Adam.
Adam_optim_1 = torch.optim.Adam(params = model_1.parameters(), lr = 0.001)
Adam_optim_2 = torch.optim.Adam(params = model_2.parameters(), lr = 0.001)
Adam_optim_3 = torch.optim.Adam(params = model_3.parameters(), lr = 0.001)

# RMSprop.
RMSprop_optim_1 = torch.optim.RMSprop(params = model_1.parameters(), lr = 0.01)
RMSprop_optim_2 = torch.optim.RMSprop(params = model_2.parameters(), lr = 0.01)
RMSprop_optim_3 = torch.optim.RMSprop(params = model_3.parameters(), lr = 0.01)

# SGD with momentum. The model_3 entry used to be built with AdamW, which did
# not match its name, its two siblings, or the "SGD with Momentum" section it
# is used in; all three now share the same configuration.
SGDW_optim_1 = torch.optim.SGD(params = model_1.parameters(), lr = 0.01, momentum = 0.9)
SGDW_optim_2 = torch.optim.SGD(params = model_2.parameters(), lr = 0.01, momentum = 0.9)
SGDW_optim_3 = torch.optim.SGD(params = model_3.parameters(), lr = 0.01, momentum = 0.9)



In [None]:
def model_loopV1(optimizer, loss_fn, model, epochs=150, limit=20,
                 checkpoint_path="best_model.pth", verbose=False):
    """Train a logit-output model full-batch, with early stopping on test accuracy.

    Each epoch runs one optimizer step on the whole of the notebook-level
    ``X_train`` / ``y_train`` tensors, then scores the model on ``X_test`` /
    ``y_test``. Whenever test accuracy improves, the model's ``state_dict`` is
    written to ``checkpoint_path``; the checkpoint is not loaded back, so the
    model keeps its last-epoch weights when the function returns. Afterwards
    the best accuracy is printed and the per-epoch test accuracy is plotted.

    NOTE(review): the stopping criterion and the reported "best accuracy" both
    come from the test split, so that figure is an optimistic estimate.

    Args:
        optimizer: Optimizer already bound to ``model``'s parameters.
        loss_fn: Loss taking ``(logits, targets)``, e.g. ``nn.BCEWithLogitsLoss``.
        model: Module returning raw logits (a sigmoid is applied here before
            rounding to labels).
        epochs: Maximum number of epochs.
        limit: Patience, i.e. consecutive epochs without a test-accuracy
            improvement after which training stops.
        checkpoint_path: Where the best ``state_dict`` is saved. Defaults to
            the previously hard-coded "best_model.pth"; pass a distinct path
            per run to avoid overwriting earlier checkpoints.
        verbose: When True, print train/test loss and accuracy every 10 epochs.

    Returns:
        None.
    """
    best_accuracy = 0
    patience_counter = 0
    test_accuracies = []

    for epoch in range(epochs):
        # One full-batch optimisation step.
        model.train()
        y_logits = model(X_train)
        loss = loss_fn(y_logits, y_train)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Score on the held-out split.
        model.eval()
        with torch.inference_mode():
            test_logits = model(X_test)
            test_labels = torch.round(torch.sigmoid(test_logits))

            # scikit-learn convention: (y_true, y_pred).
            test_accuracy = accuracy_score(y_test, test_labels)
            test_accuracies.append(test_accuracy)

            if verbose and epoch % 10 == 0:
                test_loss = loss_fn(test_logits, y_test)
                print(f"Epoch {epoch} | Train Loss: {loss.item():.4f} | Test Loss: {test_loss.item():.4f} | Accuracy: {test_accuracy:.4f}")

            # Early stopping: reset patience on improvement, else count up.
            if test_accuracy > best_accuracy:
                best_accuracy = test_accuracy
                patience_counter = 0
                torch.save(model.state_dict(), checkpoint_path)  # Save best model
            else:
                patience_counter += 1

            if patience_counter >= limit:
                print(f"\n Early stopping at epoch {epoch} | no improvement in last {limit} epochs.")
                break

    print(f"\n Best accuracy: {best_accuracy:.4f}")

    plt.figure(figsize=(10, 5))
    plt.plot(test_accuracies, label="Test Accuracy", color="green")
    plt.title(f"Test Accuracy over Epochs (Best: {best_accuracy * 100:.2f}) %")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.grid(True, alpha = 0.6)
    plt.show()

In [None]:
def model_loopV2(optimizer, loss_fn, model=model_3, epochs=150, limit=50,
                 checkpoint_path="best_model.pth", verbose=False):
    """Train a probability-output model full-batch, with early stopping on test accuracy.

    Counterpart of the logit-based loop for models whose ``forward`` already
    ends in a sigmoid: outputs are passed to ``loss_fn`` and rounded to labels
    as they are. Each epoch runs one optimizer step on the whole of the
    notebook-level ``X_train`` / ``y_train`` tensors, then scores the model on
    ``X_test`` / ``y_test``. Whenever test accuracy improves, the model's
    ``state_dict`` is written to ``checkpoint_path``; the checkpoint is not
    loaded back, so the model keeps its last-epoch weights on return.
    Afterwards the best accuracy is printed and the per-epoch test accuracy is
    plotted.

    NOTE(review): the stopping criterion and the reported "best accuracy" both
    come from the test split, so that figure is an optimistic estimate.

    Args:
        optimizer: Optimizer already bound to ``model``'s parameters.
        loss_fn: Loss taking ``(probabilities, targets)``, e.g. ``nn.BCELoss``.
        model: Module returning probabilities in [0, 1]. The default is the
            ``model_3`` object that exists when this function is defined;
            rebinding ``model_3`` later does not change the default.
        epochs: Maximum number of epochs.
        limit: Patience, i.e. consecutive epochs without a test-accuracy
            improvement after which training stops.
        checkpoint_path: Where the best ``state_dict`` is saved. Defaults to
            the previously hard-coded "best_model.pth"; pass a distinct path
            per run to avoid overwriting earlier checkpoints.
        verbose: When True, print train/test loss and accuracy every 10 epochs.

    Returns:
        None.
    """
    best_accuracy = 0
    patience_counter = 0
    test_accuracies = []

    for epoch in range(epochs):
        # One full-batch optimisation step.
        model.train()
        y_preds = model(X_train)
        loss = loss_fn(y_preds, y_train)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Score on the held-out split.
        model.eval()
        with torch.inference_mode():
            test_pred_probs = model(X_test)
            test_labels = torch.round(test_pred_probs)

            # scikit-learn convention: (y_true, y_pred).
            test_accuracy = accuracy_score(y_test, test_labels)
            test_accuracies.append(test_accuracy)

            if verbose and epoch % 10 == 0:
                test_loss = loss_fn(test_pred_probs, y_test)
                print(f"Epoch {epoch} | Train Loss: {loss.item():.4f} | Test Loss: {test_loss.item():.4f} | Accuracy: {test_accuracy:.4f}")

            # Early stopping: reset patience on improvement, else count up.
            if test_accuracy > best_accuracy:
                best_accuracy = test_accuracy
                patience_counter = 0
                torch.save(model.state_dict(), checkpoint_path)  # Save best model
            else:
                patience_counter += 1

            if patience_counter >= limit:
                print(f"\n Early stopping at epoch {epoch} | no improvement in last {limit} epochs.")
                break

    print(f"\n Best accuracy: {best_accuracy:.4f}")

    plt.figure(figsize=(10, 5))
    plt.plot(test_accuracies, label="Test Accuracy", color="green")
    plt.title(f"Test Accuracy over Epochs (Best: {best_accuracy * 100:.2f}) %")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.grid(True, alpha = 0.6)
    plt.legend()
    plt.show()



# Stochastic Gradient Descent

In [None]:
# Plain SGD on model_1 (logit output, BCEWithLogitsLoss); patience of 50 epochs.
model_loopV1(optimizer=SGD_optim_1, loss_fn=loss_fn_1, model=model_1, limit=50)

In [None]:
# Plain SGD on model_2 (batch-norm + dropout, logit output); patience of 50 epochs.
model_loopV1(optimizer=SGD_optim_2, loss_fn=loss_fn_1, model=model_2, limit=50)

In [None]:
# Plain SGD on model_3 (sigmoid output, BCELoss); default patience.
model_loopV2(optimizer=SGD_optim_3, loss_fn=loss_fn_2, model=model_3)

# Adaptive Moment Estimation (Adam)

In [None]:
# Adam on model_1; default patience.
# NOTE(review): model_1 is the same object already trained in the SGD section
# above, so this run continues from those weights instead of a fresh
# initialisation.
model_loopV1(optimizer=Adam_optim_1, loss_fn=loss_fn_1, model=model_1)

In [None]:
# Adam on model_2; default patience.
# NOTE(review): model_2 is the same object already trained in the SGD section
# above, so this run continues from those weights instead of a fresh
# initialisation.
model_loopV1(optimizer=Adam_optim_2, loss_fn=loss_fn_1, model=model_2)

In [None]:
# Adam on model_3 (sigmoid output, BCELoss); default patience.
# NOTE(review): model_3 is the same object already trained in the SGD section
# above, so this run continues from those weights instead of a fresh
# initialisation.
model_loopV2(optimizer=Adam_optim_3, loss_fn=loss_fn_2, model=model_3)

# RMSprop

In [None]:
# RMSprop on model_1; patience of 50 epochs.
# NOTE(review): model_1 has already been trained by the earlier optimizer
# sections, so this run continues from those weights.
model_loopV1(optimizer=RMSprop_optim_1, loss_fn=loss_fn_1, model=model_1, limit=50)

In [None]:
# RMSprop on model_2; patience of 50 epochs.
# NOTE(review): model_2 has already been trained by the earlier optimizer
# sections, so this run continues from those weights.
model_loopV1(optimizer=RMSprop_optim_2, loss_fn=loss_fn_1, model=model_2, limit=50)

In [None]:
# RMSprop on model_3 (sigmoid output, BCELoss); default patience.
# NOTE(review): model_3 has already been trained by the earlier optimizer
# sections, so this run continues from those weights.
model_loopV2(optimizer=RMSprop_optim_3, loss_fn=loss_fn_2, model=model_3)

# SGD with Momentum


In [None]:
# SGDW_optim_1 on model_1; patience of 50 epochs.
# NOTE(review): model_1 has already been trained by the earlier optimizer
# sections, so this run continues from those weights.
model_loopV1(optimizer=SGDW_optim_1, loss_fn=loss_fn_1, model=model_1, limit=50)

In [None]:
# SGDW_optim_2 on model_2; patience of 50 epochs.
# NOTE(review): model_2 has already been trained by the earlier optimizer
# sections, so this run continues from those weights.
model_loopV1(optimizer=SGDW_optim_2, loss_fn=loss_fn_1, model=model_2, limit=50)

In [None]:
# SGDW_optim_3 on model_3 (sigmoid output, BCELoss).
# NOTE(review): limit=200 is larger than the loop's default of 150 epochs, so
# early stopping cannot trigger in this run. model_3 has also been trained by
# the earlier optimizer sections, so training continues from those weights.
model_loopV2(optimizer=SGDW_optim_3, loss_fn=loss_fn_2, model=model_3, limit=200)