# BOR: advanced NN architectures (attention-based ones)

In [None]:
#Importing the necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
#Importing to preprocess the data
from sklearn.model_selection import train_test_split 
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.preprocessing import label_binarize

#Importing to build the models
from tensorflow.keras import layers, regularizers, models
#Importing to evaluate the models
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix, classification_report
#Importing to explain the models
import shap

# Fix one seed for the whole notebook so that runs are repeatable.
# The value 42 is arbitrary (a common convention), not a tuned parameter.
SEED = 42
# NOTE(review): per the Keras documentation this call seeds Python's `random`,
# NumPy and TensorFlow together - confirm for the installed version.
tf.keras.utils.set_random_seed(SEED)

In [None]:
# Read the raw dataset: a comma-separated, Latin-1 encoded file.
# The file is not distributed with the notebook; request it from the author.
data = pd.read_csv(
    'dataset_d.csv',
    sep=',',
    encoding='latin-1',
)
# data.head()  # uncomment for a quick look at the first rows

In [None]:
# Target column: "best_response_1l" (multi-class classification problem).
# Only the predictors listed here, plus the target, are kept for modelling.
relevant_columns = [
    'age', 'sex', 'smoking', 'ps_at_diagnosis_ad', 'n#_mets_sites',
    'lung_only_m1', 'pleural', 'pericard', 'lymph_nodes_only_m1',
    'soft_tissue', 'leptomingeal', 'skin', 'peritoneal', 'renal',
    'pancreas', 'brain', 'liver', 'bone', 'adrenal', 'histology',
    'hbbaselineio', 'leucotbaselineio', 'neut_abs...143',
    'linfo_abs...144', 'baso_abs...145', 'mono_abs...147',
    'plaqtbaselineio', 'best_response_1l',
]
# Work on an independent copy so the raw `data` frame is never modified.
df = data[relevant_columns].copy()
df.shape

In [None]:
# Discard every row that has at least one missing value, then show the new size.
df = df.dropna(axis='index', how='any')
df.shape

In [None]:
# Shuffle the rows reproducibly.
df = df.sample(frac=1, random_state=SEED)

# Columns that are cast to integers.
var_int = [
    'ps_at_diagnosis_ad', 'n#_mets_sites', 'lung_only_m1', 'pleural',
    'pericard', 'lymph_nodes_only_m1', 'soft_tissue', 'leptomingeal',
    'skin', 'peritoneal', 'renal', 'pancreas', 'brain', 'liver', 'bone',
    'adrenal',
]
df = df.astype({column: int for column in var_int})

# Lower-case the sex labels before they are one-hot encoded.
df['sex'] = df['sex'].str.lower()

# One-hot encode the categorical predictors. For `sex` the first level is
# dropped; `histology` and `smoking` keep one indicator column per level.
sex_dummies = pd.get_dummies(df['sex'], prefix='sex', drop_first=True)
other_dummies = pd.get_dummies(df[['histology', 'smoking']])

# Swap the original categorical columns for their indicator columns.
df_encoded = pd.concat(
    [
        df.drop(columns=['sex', 'histology', 'smoking']),
        sex_dummies,
        other_dummies,
    ],
    axis=1,
)

# Indicator columns are stored as integers rather than whatever dtype
# `get_dummies` produced.
cols_to_convert = [
    'histology_adenocarcinoma', 'histology_nsclc', 'histology_squamous',
    'histology_adenosquamous', 'sex_male', 'smoking_current',
    'smoking_former', 'smoking_non-smoker',
]
df_encoded[cols_to_convert] = df_encoded[cols_to_convert].astype(int)

In [None]:
# Separate the predictors from the target. The predictor labels come from
# `Index.difference`, i.e. every column except the target.
feature_columns = df_encoded.columns.difference(['best_response_1l'])
X = df_encoded[feature_columns]
y = df_encoded['best_response_1l']

# Map the response labels to integer class ids; `le` is kept so that
# predictions can be translated back to the original labels later on.
le = LabelEncoder()
le.fit(y)
y_encoded = le.transform(y)

In [None]:
# First hold out 20% of the rows as the test set, then hold out 20% of the
# remaining rows as the validation set. Both splits use the notebook seed.
X_train_val, X_test, y_train_val, y_test = train_test_split(
    X, y_encoded, test_size=0.2, random_state=SEED
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train_val, y_train_val, test_size=0.2, random_state=SEED
)

# Report the size of each partition.
for split_name, split_frame in (
    ('X_train_val', X_train_val),
    ('X_train', X_train),
    ('X_val', X_val),
    ('X_test', X_test),
):
    print(f"Shape of {split_name}:", split_frame.shape)

In [None]:
# 0/1 indicator columns: these are left unscaled.
binary_features = ['lung_only_m1', 'pleural', 'pericard', 'lymph_nodes_only_m1', 'soft_tissue', 'leptomingeal','skin','peritoneal','renal',
                   'pancreas', 'brain', 'liver', 'bone', 'adrenal','histology_adenocarcinoma', 'histology_nsclc', 
                   'histology_squamous', 'sex_male','smoking_current', 'smoking_former', 'smoking_non-smoker', 'histology_adenosquamous',]
# Continuous / ordinal columns: these are standardised.
numeric_features = ['neut_abs...143','linfo_abs...144', 'plaqtbaselineio', 'age', 'ps_at_diagnosis_ad', 'n#_mets_sites', 'leucotbaselineio',
                    'hbbaselineio','baso_abs...145', 'mono_abs...147'] #'duration_l1', 'time_to_l1_start'

# Scale copies so the unscaled frames stay available.
X_scaled = X.copy()
X_train_scaled = X_train.copy()
X_val_scaled = X_val.copy() 
X_test_scaled = X_test.copy()
X_train_val_scaled = X_train_val.copy()

# Main scaler: its statistics come from the training rows ONLY, and it must be
# fitted before any `transform` call (calling `transform` on an unfitted
# StandardScaler raises NotFittedError). The validation set, the test set and
# the full data set are then transformed with those same training statistics.
scaler = StandardScaler()
X_train_scaled[numeric_features] = scaler.fit_transform(X_train_scaled[numeric_features])
X_val_scaled[numeric_features] = scaler.transform(X_val_scaled[numeric_features])
X_test_scaled[numeric_features] = scaler.transform(X_test_scaled[numeric_features])
X_scaled[numeric_features] = scaler.transform(X_scaled[numeric_features])

# The combined train+validation frame is standardised with its own statistics,
# using a dedicated scaler so that `scaler` above is not silently re-fitted.
train_val_scaler = StandardScaler()
X_train_val_scaled[numeric_features] = train_val_scaler.fit_transform(X_train_val_scaled[numeric_features])

In [None]:
# Show which integer id the label encoder assigned to each response class.
for class_id, class_label in enumerate(le.classes_):
    print(class_id, class_label, sep=": ")

In [None]:
# Wrap each scaled set in a DataFrame that carries the column labels of the
# matching unscaled set.
X_train_scaled_df, X_train_val_scaled_df, X_test_scaled_df = (
    pd.DataFrame(scaled_values, columns=unscaled_frame.columns)
    for scaled_values, unscaled_frame in (
        (X_train_scaled, X_train),
        (X_train_val_scaled, X_train_val),
        (X_test_scaled, X_test),
    )
)

In [None]:
# Attention Layer definition
class FeatureAttention(tf.keras.layers.Layer):
    """Feature-wise gating layer.

    The input is passed through a tanh dense layer and then a softmax dense
    layer, both as wide as the input's last axis. The layer returns the input
    multiplied element-wise by the resulting softmax weights.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def build(self, input_shape):
        # Both sub-layers keep the width of the incoming feature axis, so they
        # can only be created once the input shape is known.
        n_features = input_shape[-1]
        self.attention_dense = layers.Dense(
            n_features, activation='tanh', name='attention_dense'
        )
        self.attention_output = layers.Dense(
            n_features, activation='softmax', name='attention_output'
        )
        super().build(input_shape)

    def call(self, inputs):
        hidden = self.attention_dense(inputs)
        weights = self.attention_output(hidden)
        # Re-weight every feature by its softmax weight.
        return inputs * weights


# 1. Attention-block at input level
def model_attention_input_only(input_shape, num_classes=4):
    """Build an MLP whose raw inputs are re-weighted by a FeatureAttention block.

    Architecture: FeatureAttention -> three 512-unit ReLU dense layers with L2
    regularisation (0.01) and dropout (0.4), with layer normalisation after
    the first dense layer -> softmax output.

    Args:
        input_shape: Shape of one sample without the batch axis,
            e.g. ``(n_features,)``.
        num_classes: Width of the softmax output layer. Defaults to 4, the
            value that used to be hard-coded.

    Returns:
        An uncompiled ``Sequential`` model.
    """
    return models.Sequential([
        # Declare the input explicitly instead of passing the deprecated
        # `input_shape=` keyword to the first layer.
        layers.Input(shape=input_shape),
        FeatureAttention(),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.LayerNormalization(),
        layers.Dropout(0.4),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.Dropout(0.4),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.Dropout(0.4),
        layers.Dense(num_classes, activation='softmax'),
    ])

# 2. Attention-block at intermediate level
def model_attention_intermediate(input_shape, num_classes=4):
    """Build an MLP with a FeatureAttention block after the first hidden layer.

    Architecture: 512-unit ReLU dense layer -> layer normalisation -> dropout
    -> FeatureAttention -> two more 512-unit ReLU dense layers, each followed
    by dropout (0.4) -> softmax output. All hidden dense layers use L2
    regularisation (0.01).

    Args:
        input_shape: Shape of one sample without the batch axis,
            e.g. ``(n_features,)``.
        num_classes: Width of the softmax output layer. Defaults to 4, the
            value that used to be hard-coded.

    Returns:
        An uncompiled ``Sequential`` model.
    """
    return models.Sequential([
        # Declare the input explicitly instead of passing the deprecated
        # `input_shape=` keyword to the first layer.
        layers.Input(shape=input_shape),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.LayerNormalization(),
        layers.Dropout(0.4),
        FeatureAttention(),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.Dropout(0.4),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.Dropout(0.4),
        layers.Dense(num_classes, activation='softmax'),
    ])

# 3. Multiple Attention-blocks
def model_multiple_attention(input_shape, num_classes=4):
    """Build an MLP with two FeatureAttention blocks.

    Architecture: FeatureAttention on the raw inputs -> 512-unit ReLU dense
    layer -> layer normalisation -> dropout -> second FeatureAttention -> two
    more 512-unit ReLU dense layers, each followed by dropout (0.4) -> softmax
    output. All hidden dense layers use L2 regularisation (0.01).

    Args:
        input_shape: Shape of one sample without the batch axis,
            e.g. ``(n_features,)``.
        num_classes: Width of the softmax output layer. Defaults to 4, the
            value that used to be hard-coded.

    Returns:
        An uncompiled ``Sequential`` model.
    """
    return models.Sequential([
        # Declare the input explicitly instead of passing the deprecated
        # `input_shape=` keyword to the first layer.
        layers.Input(shape=input_shape),
        FeatureAttention(),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.LayerNormalization(),
        layers.Dropout(0.4),
        FeatureAttention(),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.Dropout(0.4),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.Dropout(0.4),
        layers.Dense(num_classes, activation='softmax'),
    ])

# 4. Attention-block right before the output
def model_attention_pre_output(input_shape, num_classes=4):
    """Build an MLP with a FeatureAttention block just before the output layer.

    Architecture: three 512-unit ReLU dense layers with L2 regularisation
    (0.01) and dropout (0.4), with layer normalisation after the first dense
    layer -> FeatureAttention -> softmax output.

    Args:
        input_shape: Shape of one sample without the batch axis,
            e.g. ``(n_features,)``.
        num_classes: Width of the softmax output layer. Defaults to 4, the
            value that used to be hard-coded.

    Returns:
        An uncompiled ``Sequential`` model.
    """
    return models.Sequential([
        # Declare the input explicitly instead of passing the deprecated
        # `input_shape=` keyword to the first layer.
        layers.Input(shape=input_shape),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.LayerNormalization(),
        layers.Dropout(0.4),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.Dropout(0.4),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.Dropout(0.4),
        FeatureAttention(),
        layers.Dense(num_classes, activation='softmax'),
    ])

# 5. Residual attention
class ResidualAttention(tf.keras.layers.Layer):
    """FeatureAttention wrapped in a skip connection.

    The layer returns ``inputs + FeatureAttention(inputs)``.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def build(self, input_shape):
        # The wrapped attention block is created when this layer is built.
        self.attention = FeatureAttention()
        super().build(input_shape)

    def call(self, inputs):
        # Skip connection: add the re-weighted features back onto the input.
        return inputs + self.attention(inputs)

def model_residual_attention(input_shape, num_classes=4):
    """Build an MLP with a ResidualAttention block after the first hidden layer.

    Architecture: 512-unit ReLU dense layer -> layer normalisation -> dropout
    -> ResidualAttention -> two more 512-unit ReLU dense layers, each followed
    by dropout (0.4) -> softmax output. All hidden dense layers use L2
    regularisation (0.01).

    Args:
        input_shape: Shape of one sample without the batch axis,
            e.g. ``(n_features,)``.
        num_classes: Width of the softmax output layer. Defaults to 4, the
            value that used to be hard-coded.

    Returns:
        An uncompiled ``Sequential`` model.
    """
    return models.Sequential([
        # Declare the input explicitly instead of passing the deprecated
        # `input_shape=` keyword to the first layer.
        layers.Input(shape=input_shape),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.LayerNormalization(),
        layers.Dropout(0.4),
        ResidualAttention(),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.Dropout(0.4),
        layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.Dropout(0.4),
        layers.Dense(num_classes, activation='softmax'),
    ])


def _plot_training_curves(history):
    """Show the per-epoch training/validation loss and accuracy side by side."""
    epochs = range(1, len(history.history['loss']) + 1)
    # (training key, validation key, axis label, panel title)
    panels = (
        ('loss', 'val_loss', 'Loss', 'Training and Validation Loss'),
        ('accuracy', 'val_accuracy', 'Accuracy', 'Training and Validation accuracy'),
    )

    plt.figure(figsize=(12, 5))
    for position, (train_key, val_key, axis_label, title) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(epochs, history.history[train_key], 'b', label=f'Training {axis_label}')
        plt.plot(epochs, history.history[val_key], 'r', label=f'Validation {axis_label}')
        plt.xlabel('Epochs')
        plt.ylabel(axis_label)
        plt.title(title)
        plt.legend()
    plt.show()


def _evaluate_on_test_set(model):
    """Print and return the test-set metrics of a fitted model.

    Reads the module-level `X_test_scaled` and `y_test`.
    """
    prob_predictions = model.predict(X_test_scaled)
    class_predictions = np.argmax(prob_predictions, axis=1)

    metrics = {
        'accuracy': accuracy_score(y_test, class_predictions),
        'precision': precision_score(y_test, class_predictions, average='weighted'),
        'recall': recall_score(y_test, class_predictions, average='weighted'),
        'f1': f1_score(y_test, class_predictions, average='weighted'),
    }

    # ROC AUC macro (one-vs-rest). The class ids are derived from the width of
    # the model output rather than hard-coded, so the indicator matrix always
    # lines up with the probability columns.
    class_ids = np.arange(prob_predictions.shape[1])
    y_test_bin = label_binarize(y_test, classes=class_ids)
    metrics['roc_auc'] = roc_auc_score(
        y_test_bin, prob_predictions, multi_class='ovr', average='macro'
    )

    print(f"Accuracy: {metrics['accuracy']:.4f}")
    print(f"Precision: {metrics['precision']:.4f}")
    print(f"Recall: {metrics['recall']:.4f}")
    print(f"F1-score: {metrics['f1']:.4f}")
    print(f"AUC-ROC: {metrics['roc_auc']:.4f}")

    print(classification_report(y_test, class_predictions))
    print(confusion_matrix(y_test, class_predictions))

    return metrics


def _explain_with_shap(model, X_background):
    """Save one SHAP summary plot per class and show them together in a grid.

    Reads the module-level `X_test_scaled_df` and `le`.
    """
    # Summarise the background data with 10 k-means centroids.
    background = shap.kmeans(X_background, 10)
    explainer = shap.KernelExplainer(lambda x: model.predict(x), background)
    shap_values = explainer.shap_values(X_test_scaled_df, nsamples=100, silent=True)

    class_names = le.classes_
    # NOTE: the file names do not include the model name, so each model
    # overwrites the PNG files written for the previous one.
    for i, class_name in enumerate(class_names):
        shap.summary_plot(
            shap_values[:, :, i],
            features=X_test_scaled_df,
            feature_names=X_test_scaled_df.columns,
            plot_type='dot',
            max_display=len(X_test_scaled_df.columns),
            show=False,
        )
        plt.title(f"Class {class_name}")
        plt.savefig(f"shap_plot_class_{class_name}.png")
        plt.close()

    # Two plots per row; four classes give the original 2x2, 16x12 layout.
    n_rows = max(1, (len(class_names) + 1) // 2)
    fig, axs = plt.subplots(n_rows, 2, figsize=(16, 6 * n_rows))
    axs = axs.flatten()

    for ax, class_name in zip(axs, class_names):
        img = plt.imread(f"shap_plot_class_{class_name}.png")
        ax.imshow(img)
        ax.axis('off')
        ax.set_title(f"Class {class_name}")
    # Hide any panel left over when the number of classes is odd.
    for ax in axs[len(class_names):]:
        ax.axis('off')

    plt.tight_layout()
    plt.show()


def compare_models(X_train, y_train, X_val, y_val):
    """Train, evaluate and explain each attention-based architecture.

    For every model the function fits for 50 epochs (Adam, sparse categorical
    cross-entropy, batch size 16), plots the learning curves, prints the
    test-set metrics, stores predictions for the full data set in
    ``df['Predicted']`` and draws per-class SHAP summary plots.

    Besides its arguments the function reads these module-level names:
    `X_test_scaled`, `y_test`, `X_scaled`, `X_test_scaled_df`, `le`, and it
    writes to `df`.

    Args:
        X_train: Training features; also used as SHAP background data.
        y_train: Integer-encoded training labels.
        X_val: Validation features.
        y_val: Integer-encoded validation labels.

    Returns:
        A dict mapping each model name to a dict with the keys ``accuracy``,
        ``precision``, ``recall``, ``f1``, ``roc_auc`` (test-set metrics) and
        ``history`` (the Keras per-epoch training history).
    """
    input_shape = (X_train.shape[1],)

    # All models are instantiated up front, in this order.
    models_to_test = {
        'attention_input': model_attention_input_only(input_shape),
        'attention_intermediate': model_attention_intermediate(input_shape),
        'attention_pre_output': model_attention_pre_output(input_shape),
        'multiple_attention': model_multiple_attention(input_shape),
        'residual_attention': model_residual_attention(input_shape)
    }

    results = {}

    for name, model in models_to_test.items():
        print(f"\nTraining {name}...")
        model.compile(optimizer='adam',
                      loss='sparse_categorical_crossentropy',
                      metrics=['accuracy'])

        history = model.fit(X_train, y_train,
                            validation_data=(X_val, y_val),
                            epochs=50,
                            batch_size=16,
                            verbose=0)

        _plot_training_curves(history)
        metrics = _evaluate_on_test_set(model)

        # Predict on the full data set and attach the decoded labels to `df`.
        prob_predictions = model.predict(X_scaled)
        class_predictions = np.argmax(prob_predictions, axis=1)
        df['Predicted'] = le.inverse_transform(class_predictions)
        print(df.tail(10))

        # Use the training data that was passed in as the SHAP background.
        _explain_with_shap(model, X_train)

        # Record the outcome so the caller actually receives the comparison.
        results[name] = {**metrics, 'history': history.history}

    return results

In [None]:
# Run the full comparison on the scaled training and validation sets.
results = compare_models(
    X_train=X_train_scaled,
    y_train=y_train,
    X_val=X_val_scaled,
    y_val=y_val,
)