In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pandas as pd
import tensorflow as tf
import numpy as np
from imblearn.combine import SMOTEENN
from imblearn.combine import SMOTETomek
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.layers import Add
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, matthews_corrcoef, roc_auc_score
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder
from tensorflow.keras.layers import Conv1D, Activation, Multiply

In [None]:
df_raw = pd.read_csv("IBM Telco Dataset (7043).csv")

In [None]:
df_raw.loc[df_raw["MultipleLines"] == "No phone service", "MultipleLines"] = 'No'
df_raw.loc[df_raw["OnlineSecurity"] == "No internet service", "OnlineSecurity"] = 'No'
df_raw.loc[df_raw["OnlineBackup"] == "No internet service", "OnlineBackup"] = 'No'
df_raw.loc[df_raw["DeviceProtection"] == "No internet service", "DeviceProtection"] = 'No'
df_raw.loc[df_raw["TechSupport"] == "No internet service", "TechSupport"] = 'No'
df_raw.loc[df_raw["StreamingTV"] == "No internet service", "StreamingTV"] = 'No'
df_raw.loc[df_raw["StreamingMovies"] == "No internet service", "StreamingMovies"] = 'No'
df_raw.loc[df_raw["DeviceProtection"] == "No internet service", "DeviceProtection"] = 'No'
df_raw.loc[df_raw["TotalCharges"] == " ", "TotalCharges"] = '0'
df_raw['TotalCharges'] = pd.to_numeric(df_raw['TotalCharges'])

In [None]:
# Encoder = "Label Encoder"
Encoder = "Label Encoder"
# OverSamplingTecnique = ""
OverSamplingTecnique = "SMOTE-Enn"
# OverSamplingTecnique = "SMOTE-Tomek"
# OverSamplingTecnique = "SMOTE-Enn"
filter_size=5
number_of_filter=128
flatten_layer_exist=True
Model_Name="SE Block"

**Label Encoding**

In [None]:
if Encoder == "Label Encoder":
  print("Applying Label Encoder")
  df_final = df_raw.copy()
  le = LabelEncoder()

  text_data_features = ['gender', 'SeniorCitizen', 'Partner', 'Dependents', 'PhoneService', 'MultipleLines',
              'InternetService','OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport',
              'StreamingTV', 'StreamingMovies', 'Contract','PaperlessBilling', 'PaymentMethod', 'Churn']

  print('Label Encoder Transformation')
  for i in text_data_features :
      df_final[i] = le.fit_transform(df_final[i])
      print(i,' : ',df_final[i].unique(),' = ',le.inverse_transform(df_final[i].unique()))



  X = df_final.drop(['Churn','customerID'], axis=1).copy()
  Y = df_final['Churn'].copy().astype(int)

Applying Label Encoder
Label Encoder Transformation
gender  :  [0 1]  =  ['Female' 'Male']
SeniorCitizen  :  [0 1]  =  [0 1]
Partner  :  [1 0]  =  ['Yes' 'No']
Dependents  :  [0 1]  =  ['No' 'Yes']
PhoneService  :  [0 1]  =  ['No' 'Yes']
MultipleLines  :  [0 1]  =  ['No' 'Yes']
InternetService  :  [0 1 2]  =  ['DSL' 'Fiber optic' 'No']
OnlineSecurity  :  [0 1]  =  ['No' 'Yes']
OnlineBackup  :  [1 0]  =  ['Yes' 'No']
DeviceProtection  :  [0 1]  =  ['No' 'Yes']
TechSupport  :  [0 1]  =  ['No' 'Yes']
StreamingTV  :  [0 1]  =  ['No' 'Yes']
StreamingMovies  :  [0 1]  =  ['No' 'Yes']
Contract  :  [0 1 2]  =  ['Month-to-month' 'One year' 'Two year']
PaperlessBilling  :  [1 0]  =  ['Yes' 'No']
PaymentMethod  :  [2 3 0 1]  =  ['Electronic check' 'Mailed check' 'Bank transfer (automatic)'
 'Credit card (automatic)']
Churn  :  [0 1]  =  ['No' 'Yes']


**One-hot Encoding**

In [None]:
if Encoder == "One-hot Encoder":
  print("Applying One-hot Encoder")

  # One-hot encode categorical columns
  categorical_columns = ['gender', 'SeniorCitizen', 'Partner', 'Dependents', 'PhoneService', 'MultipleLines',
              'InternetService','OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport',
              'StreamingTV', 'StreamingMovies', 'Contract','PaperlessBilling', 'PaymentMethod']

  encoder = OneHotEncoder()
  encoded_features = encoder.fit_transform(df_raw[categorical_columns]).toarray()

  # Combine one-hot encoded features with numerical features
  numerical_features = df_raw.drop(categorical_columns + ['Churn', 'customerID'], axis=1)
  X = np.hstack((encoded_features, numerical_features))

  # Manually encode 'Churn' column
  df_raw['Churn'] = df_raw['Churn'].apply(lambda x: 1 if x == 'Yes' else 0)
  # Extract the target variable Y
  Y = df_raw['Churn'].values


  # Ensure all data is in float format
  X = X.astype(float)
  Y = Y.astype(float)

In [None]:
scaler = StandardScaler()
X_resampled_scaled = scaler.fit_transform(X)
X=X_resampled_scaled
Y=Y

**SMOTE**

In [None]:
if OverSamplingTecnique == "SMOTE":
  print("Applying SMOTE")
  smote = SMOTE()

  X_resampled, y_resampled = smote.fit_resample(X, Y)
  scaler = StandardScaler()
  X_resampled_scaled = scaler.fit_transform(X_resampled)
  X=X_resampled_scaled
  Y=y_resampled

**SMOTETomek**

In [None]:
if OverSamplingTecnique == "SMOTE-Tomek":
  print("Applying SMOTE-Tomek")

  smote_tomek = SMOTETomek()
  X_resampled, y_resampled = smote_tomek.fit_resample(X, Y)
  scaler = StandardScaler()
  X_resampled_scaled = scaler.fit_transform(X_resampled)
  X=X_resampled_scaled
  Y=y_resampled

**SMOTEENN**

In [None]:
if OverSamplingTecnique == "SMOTE-Enn":
  print("Applying SMOTE-Enn")

  smote_enn = SMOTEENN()
  X_resampled, y_resampled = smote_enn.fit_resample(X, Y)
  scaler = StandardScaler()
  X_resampled_scaled = scaler.fit_transform(X_resampled)
  X=X_resampled_scaled
  Y=y_resampled

Applying SMOTE-Enn


**Squeeze-and-Excitation**

In [None]:
# Define the channel attention layer
class ChannelAttention(tf.keras.layers.Layer):
    def __init__(self, reduction_ratio=8):
        super(ChannelAttention, self).__init__()
        self.reduction_ratio = reduction_ratio

    def build(self, input_shape):
        channels = input_shape[-1]
        self.fc = tf.keras.layers.Dense(channels // self.reduction_ratio, activation='relu')
        self.attention = tf.keras.layers.Dense(channels, activation='sigmoid')

    def call(self, inputs):
        x = tf.reduce_mean(inputs, axis=[1])  # Global average pooling across time dimension
        x = self.fc(x)
        x = self.attention(x)
        x = tf.expand_dims(x, axis=1)  # Add a new dimension for broadcasting
        return inputs * x

# Define the spatial attention layer
class SpatialAttention(tf.keras.layers.Layer):
    def __init__(self):
        super(SpatialAttention, self).__init__()
        self.max_pool = tf.keras.layers.MaxPooling1D(pool_size=3, strides=1, padding='same')
        self.avg_pool = tf.keras.layers.AveragePooling1D(pool_size=3, strides=1, padding='same')
        self.concat = tf.keras.layers.Concatenate(axis=-1)
        self.conv1d = tf.keras.layers.Conv1D(filters=1, kernel_size=3, padding='same', activation='sigmoid')

    def call(self, inputs):
        max_pool_out = self.max_pool(inputs)
        avg_pool_out = self.avg_pool(inputs)
        concat_out = self.concat([max_pool_out, avg_pool_out])
        attention_weights = self.conv1d(concat_out)
        return inputs * attention_weights


# Define the residual block
def residual_block(x, filters, kernel_size):
    # Save the input tensor
    x_shortcut = x

    # First convolutional layer
    x = tf.keras.layers.Conv1D(filters, kernel_size, activation='relu', padding='same')(x)

    # Second convolutional layer
    x = tf.keras.layers.Conv1D(filters, kernel_size, activation='relu', padding='same')(x)

    # Add the shortcut connection
    x = Add()([x, x_shortcut])

    # Apply ReLU activation
    x = tf.keras.layers.Activation('relu')(x)

    return x

**Basic Channel Attention**

In [None]:
# Define the channel attention layer for 1D data
class Basic_ChannelAttention(tf.keras.layers.Layer):
    def __init__(self, ratio=8):
        super(Basic_ChannelAttention, self).__init__()
        self.ratio = ratio

    def build(self, input_shape):
        _, channels = input_shape[1:]
        self.shared_layer1 = Conv1D(channels // self.ratio, kernel_size=1, activation='relu', padding='same')
        self.shared_layer2 = Conv1D(channels, kernel_size=1, padding='same')

    def call(self, inputs):
        x1 = tf.reduce_mean(inputs, axis=1, keepdims=True)
        x1 = self.shared_layer1(x1)
        x1 = self.shared_layer2(x1)

        x2 = tf.reduce_max(inputs, axis=1, keepdims=True)
        x2 = self.shared_layer1(x2)
        x2 = self.shared_layer2(x2)

        attention = tf.add(x1, x2)
        attention = Activation("sigmoid")(attention)
        output = Multiply()([inputs, attention])

        return output

In [None]:
def kfold(filter_size, number_of_filter, flatten_layer_exist, Model_Name):
  print("Applying K-fold")
  print(f"Applying {number_of_filter} filters of size {filter_size}")


  # Assuming X and Y are your input and target data
  # Define the number of folds
  num_folds = 10

  # Initialize lists to store the evaluation results
  accuracy_scores = []
  precision_scores = []
  recall_scores = []
  f1_scores = []
  mcc_scores = []
  auc_roc_scores = []



  # Perform stratified k-fold cross-validation
  fold_number = 1  # Initialize the fold number
  skf = StratifiedKFold(n_splits=num_folds, shuffle=True)
  for train_index, test_index in skf.split(X, Y):
      print(f"Fold {fold_number}/{num_folds}:")
      # Split the data into training and test sets for the current fold
      X_train, X_test = X[train_index], X[test_index]
      # X_train, X_test = X[train_index], X[test_index]
      Y_train, Y_test = Y[train_index], Y[test_index]

      # Create the model with attention mechanisms and residual blocks
      inputs = tf.keras.Input(shape=(X_train.shape[1], 1))
      x = tf.keras.layers.Conv1D(filters=number_of_filter, kernel_size=filter_size, activation='relu')(inputs)
      x_res = residual_block(x, number_of_filter, filter_size)  # Apply the first residual block
      if Model_Name == "SE Block":
        print("Applying SE Block")
        x = ChannelAttention()(x_res)  # Apply channel attention
      else:
        print("Applying Basic Channel Attenntion")
        x = Basic_ChannelAttention()(x_res)  # Apply channel attention
      x = SpatialAttention()(x)  # Apply spatial attention
      x = tf.keras.layers.Conv1D(filters=number_of_filter, kernel_size=filter_size, activation='relu', padding='same')(x)
      x_res = residual_block(x, number_of_filter, filter_size)  # Apply the second residual block
      if Model_Name == "SE Block":
        print("Applying SE Block")
        x = ChannelAttention()(x_res)  # Apply channel attention
      else:
        print("Applying Basic Channel Attenntion")
        x = Basic_ChannelAttention()(x_res)  # Apply channel attention
      x = SpatialAttention()(x)  # Apply spatial attention
      if flatten_layer_exist==True:
        print("Applying flatten layer")
        x = tf.keras.layers.Flatten()(x)  # Flatten the output before dense layers
      else:
        print("Applying Globale Max Pooling 1D layer")
        x = tf.keras.layers.GlobalMaxPooling1D()(x)
      x = tf.keras.layers.Dropout(0.5)(x)
      x = tf.keras.layers.Dense(number_of_filter, activation='relu')(x)
      outputs = tf.keras.layers.Dense(1, activation='sigmoid')(x)

      model = tf.keras.Model(inputs=inputs, outputs=outputs)
      # Compile and train the model
      optimizer = tf.keras.optimizers.Adam(learning_rate=0.00001)
      model.compile(optimizer='RMSprop', loss='binary_crossentropy', metrics=['accuracy'])
      model.fit(X_train, Y_train, epochs=30, batch_size=32, verbose="1",validation_split=0.2)
      # Evaluate the model on the test set
      Y_pred = model.predict(X_test)
      Y_pred_binary = np.round(Y_pred).flatten()

      # Calculate evaluation metrics
      accuracy = accuracy_score(Y_test, Y_pred_binary)
      precision = precision_score(Y_test, Y_pred_binary)
      recall = recall_score(Y_test, Y_pred_binary)
      f1 = f1_score(Y_test, Y_pred_binary)
      mcc = matthews_corrcoef(Y_test, Y_pred_binary)
      auc_roc = roc_auc_score(Y_test, Y_pred)

      # Append the scores to the respective lists
      accuracy_scores.append(accuracy)
      precision_scores.append(precision)
      recall_scores.append(recall)
      f1_scores.append(f1)
      mcc_scores.append(mcc)
      auc_roc_scores.append(auc_roc)
      fold_number += 1

  # Calculate the average scores
  avg_accuracy = np.mean(accuracy_scores)
  avg_precision = np.mean(precision_scores)
  avg_recall = np.mean(recall_scores)
  avg_f1 = np.mean(f1_scores)
  avg_mcc = np.mean(mcc_scores)
  avg_auc_roc = np.mean(auc_roc_scores)

  # Print the average scores
  print("Average Test Accuracy:", avg_accuracy)
  print("Average Precision:", avg_precision)
  print("Average Recall:", avg_recall)
  print("Average F1 Score:", avg_f1)
  print("Average MCC:", avg_mcc)
  print("Average AUC-ROC:", avg_auc_roc)

In [None]:
kfold(filter_size, number_of_filter, flatten_layer_exist, Model_Name)

Applying K-fold
Applying 128 filters of size 5
Fold 1/10:
Applying SE Block
Applying SE Block
Applying flatten layer
Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30
Fold 2/10:
Applying SE Block
Applying SE Block
Applying flatten layer
Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30
Fold 3/10:
Applying SE Block
Applying SE Block
Applying flatten layer
Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/