# 1. Import libraries : 

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report,confusion_matrix
import tensorflow as tf
from transformers import BertTokenizer,TFBertModel
from tensorflow.keras.layers import Input, Dense, Concatenate,Dropout,BatchNormalization
from tensorflow.keras.optimizers import Adam,SGD
from tensorflow.keras.models import Model
from imblearn.over_sampling import SMOTE

# 2.Importing data : 

In [None]:
# Read the raw CSV, keep the four columns used downstream, drop incomplete rows
# and renumber the index from zero.
# Modified to fit my own workspace -- /kaggle/input/ukraine-war/original_data.csv
original_data = (
    pd.read_csv('original_data.csv')
    [['subreddit', 'title', 'selftext', 'upvote_ratio']]
    .dropna()
    .reset_index(drop=True)
)
original_data

# 3. Classify upvotes class:  

In [None]:
class Label_classification():
    """Attach a textual popularity label to each row from its `upvote_ratio`.

    The wrapped DataFrame is modified in place: a `labels` column is added
    (or overwritten).
    """

    def __init__(self,data):
        # DataFrame that must contain an `upvote_ratio` column.
        self.data = data

    def classify(self):
        """Fill the `labels` column and return the same DataFrame.

        Buckets:
            upvote_ratio <= 0.5        -> 'Less Popularity'
            0.5 < upvote_ratio <= 0.8  -> 'Average popularity'
            upvote_ratio > 0.8         -> 'Most popularity'
        Rows matching none of the buckets (e.g. NaN ratios) keep 'No labels'.
        """
        ratio = self.data['upvote_ratio']
        self.data['labels'] = 'No labels'
        # Vectorised boolean masks replace the per-row iterrows()/.at loop.
        self.data.loc[ratio <= 0.5, 'labels'] = 'Less Popularity'
        self.data.loc[(ratio > 0.5) & (ratio <= 0.8), 'labels'] = 'Average popularity'
        self.data.loc[ratio > 0.8, 'labels'] = 'Most popularity'

        return self.data
            

## labeled classified data object : 

In [None]:
# Label the cleaned frame; `data` ends up bound to the labelled DataFrame.
data = Label_classification(original_data).classify()

# 4. Initialize BERT: 

In [None]:
# Load the tokenizer and encoder that later cells refer to as `model_tokenizer` / `model`.
# NOTE(review): force_download=True presumably re-fetches the files on every run — confirm it is still needed.
# NOTE(review): output_attentions=True is requested, but the visible cells only read
# `pooler_output` and `last_hidden_state` — confirm the attention outputs are needed.
model_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased',force_download=True)
model = TFBertModel.from_pretrained('bert-base-uncased', output_attentions=True,force_download=True)

# model_tokenizer = BertTokenizer.from_pretrained('bert-large-uncased',force_download=True)
# model = TFBertModel.from_pretrained('bert-large-uncased', output_attentions=True,force_download=True)

In [None]:
# AUTHOR: Giacomo D'Andria
# FUNCTIONING: Adding new models to test

from transformers import DistilBertTokenizer, TFDistilBertModel
from transformers import BertTokenizer, TFBertModel
from transformers import RobertaTokenizer, TFRobertaModel
from transformers import AlbertTokenizer, TFAlbertModel

# DistilBERT (Smaller and faster but less accurate than bert-base)
# model_tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased', force_download=True)
# model = TFDistilBertModel.from_pretrained('distilbert-base-uncased', output_attentions=True, force_download=True)

# TinyBERT (Further reduced for speed, but with lower accuracy)
# model_tokenizer = BertTokenizer.from_pretrained('huawei-noah/TinyBERT_General_4L_312D', force_download=True)
# model = TFBertModel.from_pretrained('huawei-noah/TinyBERT_General_4L_312D', output_attentions=True, force_download=True, from_pt=True)

# RoBERTa (BERT-like but trained on larger datasets with different preprocessing)
# model_tokenizer = RobertaTokenizer.from_pretrained('roberta-base', force_download=True)
# model = TFRobertaModel.from_pretrained('roberta-base', output_attentions=True, force_download=True)

# ALBERT (designed to be lighter and faster than BERT)
# model_tokenizer = AlbertTokenizer.from_pretrained('albert-base-v2', force_download=True)
# model = TFAlbertModel.from_pretrained('albert-base-v2', output_attentions=True, force_download=True)

# 5. preprocessing class :

In [None]:
class preprocessing():
    """Balancing, splitting, tokenising and encoding helpers for the labelled posts.

    Attributes:
        max_length: length every tokenised sequence is padded/truncated to.
        tokenizer: Hugging Face tokenizer, called on a list of texts.
        model_encoder: encoder model called with `input_ids` / `attention_mask`.
        data: DataFrame with at least the `selftext` and `labels` columns.
    """

    def __init__(self,max_length,tokenizer,model_encoder,data):
        self.max_length = max_length
        self.tokenizer = tokenizer
        self.model_encoder = model_encoder
        self.data = data

    def tokenize_and_encode(self,sentence):
        """Tokenise an iterable of texts.

        Returns:
            (input_ids, attention_masks), one row per text, each padded or
            truncated to `self.max_length`.
        """
        # One batched tokenizer call replaces the per-sentence encode_plus loop
        # plus tf.concat; the stacked result has one row per input text.
        encoded = self.tokenizer(
            list(sentence),
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='tf',
        )
        return encoded['input_ids'], encoded['attention_mask']

    def spilit(self):
        """Encode the labels as integers and split into train/test sets (80/20).

        `self.data['labels']` is rewritten in place with the integer ids;
        values that are not one of the three known label strings are left
        untouched, so calling this twice is harmless.

        Returns:
            X_train, X_test, Y_train, Y_test (also stored on the instance).
        """
        replace_df = {'Less Popularity':0,'Average popularity':1,'Most popularity':2}
        # Map only the label column: a frame-wide replace would also rewrite any
        # text cell that happens to equal one of the label strings.
        self.data['labels'] = self.data['labels'].map(
            lambda label: replace_df.get(label, label)
        )
        X = self.data['selftext']
        Y = self.data['labels']
        self.X_train,self.X_test,self.Y_train,self.Y_test = train_test_split(X,Y,random_state=42,test_size=0.2)
        return self.X_train,self.X_test,self.Y_train,self.Y_test

    def convert_to_tensor(self,Y_train,Y_test):
        """Convert both label collections to int32 tensors."""
        Y_train = tf.convert_to_tensor(Y_train,dtype=tf.int32)
        Y_test = tf.convert_to_tensor(Y_test,dtype=tf.int32)
        return Y_train,Y_test

    def create_dataset(self, input_ids, attention_mask, labels, batch_size):
        """Bundle ids, masks and labels into a batched tf.data.Dataset (no shuffling)."""
        dataset = tf.data.Dataset.from_tensor_slices((input_ids, attention_mask, labels))
        dataset = dataset.batch(batch_size)
        return dataset

    def extract_output(self,dataset):
        """Run the encoder over every batch and collect two feature matrices.

        Returns:
            (pooled_outputs, cls_outputs): the encoder's `pooler_output` and the
            first-position slice of `last_hidden_state`, concatenated over batches.

        NOTE(review): this requires the encoder output to expose `pooler_output`;
        confirm that holds for every encoder this class is used with.
        """
        pooled_outputs = []
        cls_outputs = []
        for batch in dataset:
            # Labels travel with the batch but are not needed for encoding.
            input_ids, attention_mask, _ = batch
            outputs = self.model_encoder(input_ids=input_ids, attention_mask=attention_mask)
            pooled_output = outputs.pooler_output
            cls_output = outputs.last_hidden_state[:,0,:]
            pooled_outputs.append(pooled_output)
            cls_outputs.append(cls_output)
        return tf.concat(pooled_outputs, axis=0),tf.concat(cls_outputs, axis=0)

    def _resample_classes(self, target_size):
        """Resample every class to `target_size` rows, shuffle, and store the result."""
        labels = self.data['labels']
        parts = []
        for label in labels.unique():
            class_data = self.data[labels == label]
            if len(class_data) < target_size:
                # Too few rows: draw with replacement up to the target.
                class_data = class_data.sample(target_size, replace=True, random_state=42)
            elif len(class_data) > target_size:
                # Too many rows: keep a random subset.
                class_data = class_data.sample(target_size, random_state=42)
            parts.append(class_data)
        # Concatenate once; an empty frame has nothing to concatenate.
        balanced = pd.concat(parts) if parts else self.data
        # Seeded shuffle so repeated runs produce the same row order.
        self.data = balanced.sample(frac=1, random_state=42).reset_index(drop=True)
        return self.data

    def oversample_minority_classes(self):
        """Grow every class to the size of the largest one (sampling with replacement).

        Replaces `self.data` with the shuffled, re-indexed result and returns it.
        """
        max_class_size = self.data['labels'].value_counts().max()
        return self._resample_classes(max_class_size)

    def undersample_majority_classes(self):
        """Shrink every class to the size of the smallest one.

        Replaces `self.data` with the shuffled, re-indexed result and returns it.
        """
        min_class_size = self.data['labels'].value_counts().min()
        return self._resample_classes(min_class_size)

    def smote_augmentation(self):
        """Balance the classes by running SMOTE on the tokenised texts.

        Replaces `self.data` with a two-column frame (`selftext`, `labels`) in
        which `selftext` holds the space-joined numeric rows returned by SMOTE,
        not the original text.

        NOTE(review): SMOTE synthesises rows from numeric feature vectors; applied
        to token ids the synthetic rows are presumably not valid token sequences —
        confirm this is the intended augmentation.
        """
        # Extract features and target labels
        X = self.data['selftext']
        y = self.data['labels']

        # Tokenize and encode the text data (the attention masks are not used here)
        input_ids, attention_masks = self.tokenize_and_encode(X)

        # Initialize SMOTE
        smote = SMOTE(random_state=42)

        # Perform SMOTE resampling on input_ids and labels
        X_resampled, y_resampled = smote.fit_resample(input_ids.numpy(), y)

        # Flatten tokenized sequences back to strings for DataFrame
        flattened_text = [" ".join(map(str, seq)) for seq in X_resampled]

        # Update self.data with SMOTE resampled data
        self.data = pd.DataFrame({'selftext': flattened_text, 'labels': y_resampled})

        return self.data

In [None]:
# Build each balanced variant from the ORIGINAL labelled frame.
# Each strategy gets its own preprocessing object because every balancing method
# replaces the object's `data`; chaining them on one object would feed the
# oversampled frame into the undersampling and SMOTE steps.
# `model_tokenizer` / `model` are the tokenizer and encoder loaded in section 4.
balanced_data_oversample = preprocessing(
    max_length=128, tokenizer=model_tokenizer, model_encoder=model, data=data
).oversample_minority_classes()

balanced_data_undersample = preprocessing(
    max_length=128, tokenizer=model_tokenizer, model_encoder=model, data=data
).undersample_majority_classes()

# NOTE(review): SMOTE now sees the imbalanced data; it presumably needs several
# rows in the smallest class to find neighbours — confirm against the class counts.
preprocessor = preprocessing(max_length=128, tokenizer=model_tokenizer, model_encoder=model, data=data)
balanced_data_smote = preprocessor.smote_augmentation()


In [None]:
# AUTHOR: Giacomo D'Andria
# FUNCTIONING: Computes data and saves pickle models

# 5.1---split data :
# NOTE: this rebinds `original_data` (the raw DataFrame loaded in section 2) to a preprocessing object.
original_data = preprocessing(128,model_tokenizer,model,data)  # CHANGE MODEL HERE
X_train,X_test,Y_train,Y_test = original_data.spilit()

#5.2---tokenize and encode data :
X_train_id,X_train_mask = original_data.tokenize_and_encode(X_train)
X_test_id,X_test_mask = original_data.tokenize_and_encode(X_test)

#5.3---convert label to tensor :
Y_train,Y_test = original_data.convert_to_tensor(Y_train,Y_test)


#5.4---create train and test dataset (the last argument, 128, is the batch size) :
train_dataset = original_data.create_dataset(X_train_id,X_train_mask,Y_train,128)
test_dataset = original_data.create_dataset(X_test_id,X_test_mask,Y_test,128)

#5.5---extract pooler output layer and cls output layer :
# Each call runs the encoder over every batch of the dataset.
pooler_output_train,cls_output_train = original_data.extract_output(train_dataset)
pooler_output_test,cls_output_test = original_data.extract_output(test_dataset)


import os
import pickle
import tensorflow as tf

# Convert Tensors to Numpy arrays if needed
def convert_tensors_to_numpy(data):
    """Swap every tf.Tensor value of `data` for its NumPy array, in place.

    Values that are not tf.Tensor instances are left as they are; the same
    dictionary object is returned.
    """
    tensor_keys = [name for name, item in data.items() if isinstance(item, tf.Tensor)]
    for name in tensor_keys:
        data[name] = data[name].numpy()
    return data


# Create a dictionary to store all relevant components and convert tensors to numpy.
# The keys used here are the ones the loading cell later reads back with `outputs.get(...)`.
model_data = {
    "X_train_id": X_train_id,
    "X_train_mask": X_train_mask,
    "X_test_id": X_test_id,
    "X_test_mask": X_test_mask,
    "Y_train": Y_train,
    "Y_test": Y_test,
    "pooler_output_train": pooler_output_train,
    "cls_output_train": cls_output_train,
    "pooler_output_test": pooler_output_test,
    "cls_output_test": cls_output_test,
}

# tf.Tensor values are replaced by NumPy arrays before pickling.
model_data = convert_tensors_to_numpy(model_data)

# The file is written as "<save_path>/<model_name>_all_data.pkl".
model_name = 'bert_base' # CHANGE MODEL NAME HERE
save_path = '.' # SET SAVE PATH HERE
file_path = os.path.join(save_path, f"{model_name}_all_data.pkl")

with open(file_path, 'wb') as f:
    pickle.dump(model_data, f)

print(f"All data for {model_name} saved to {file_path}")

## bert base model object : 

In [None]:
# 5.1---split data :
# NOTE(review): `bert_base_tokenizer` / `bert_base_model` are not defined in any visible cell
# (section 4 defines `model_tokenizer` / `model`) — confirm where they come from before running.
# NOTE: this rebinds `original_data` to a preprocessing object.

original_data = preprocessing(128,bert_base_tokenizer,bert_base_model,data)
X_train,X_test,Y_train,Y_test = original_data.spilit()

#5.2---tokenize and encode data :

X_train_id,X_train_mask = original_data.tokenize_and_encode(X_train)
X_test_id,X_test_mask = original_data.tokenize_and_encode(X_test)

#5.3---convert label to tensor :    

Y_train,Y_test = original_data.convert_to_tensor(Y_train,Y_test)


#5.4---create train and test dataset (the last argument, 128, is the batch size) : 

train_dataset = original_data.create_dataset(X_train_id,X_train_mask,Y_train,128)
test_dataset = original_data.create_dataset(X_test_id,X_test_mask,Y_test,128)


#5.5---extract pooler output layer and cls output layer :
# Results are stored under `_base`-suffixed names.

pooler_output_train_base,cls_output_train_base = original_data.extract_output(train_dataset)
pooler_output_test_base,cls_output_test_base = original_data.extract_output(test_dataset)

## bert large model object :

In [None]:
# 5.1---split data :
# NOTE(review): `bert_larg_tokenizer` / `bert_larg_model` are not defined in any visible cell
# (the bert-large loading lines in section 4 are commented out) — confirm before running.
# NOTE: this rebinds `original_data` to a preprocessing object.

original_data = preprocessing(128,bert_larg_tokenizer,bert_larg_model,data)
X_train,X_test,Y_train,Y_test = original_data.spilit()

#5.2---tokenize and encode data :

X_train_id,X_train_mask = original_data.tokenize_and_encode(X_train)
X_test_id,X_test_mask = original_data.tokenize_and_encode(X_test)

#5.3---convert label to tensor :    

Y_train,Y_test = original_data.convert_to_tensor(Y_train,Y_test)


#5.4---create train and test dataset (the last argument, 128, is the batch size) : 

train_dataset = original_data.create_dataset(X_train_id,X_train_mask,Y_train,128)
test_dataset = original_data.create_dataset(X_test_id,X_test_mask,Y_test,128)


#5.5---extract pooler output layer and cls output layer :
# Results are stored under `_larg`-suffixed names.

pooler_output_train_larg,cls_output_train_larg = original_data.extract_output(train_dataset)
pooler_output_test_larg,cls_output_test_larg = original_data.extract_output(test_dataset)

# 6. model structure class : 

In [None]:
# AUTHOR: Giacomo D'Andria
# FUNCTIONING: If the BERT models are not used directly, precomputed outputs can be loaded

import pickle
import os

def load_model_output(model_name, load_path='.'):
    """
    Load the pickled outputs saved for a specific model.

    The file read is "<load_path>/<model_name>_all_data.pkl".

    Args:
        model_name: prefix of the pickle file name.
        load_path: directory that contains the pickle file.

    Returns:
        The unpickled object, or None (after printing a message) when the
        file does not exist.
    """
    file_path = os.path.join(load_path, f"{model_name}_all_data.pkl")

    # Open directly and handle the missing-file case, instead of checking for
    # existence first and opening afterwards.
    try:
        with open(file_path, 'rb') as f:
            # SECURITY: unpickling can execute arbitrary code — only load files
            # this notebook produced itself.
            model_outputs = pickle.load(f)
    except (FileNotFoundError, NotADirectoryError):
        print(f"File not found: {file_path}")
        return None

    print(f"Loaded outputs for {model_name} from {file_path}")
    return model_outputs

model_name = "bert_base" # CHANGE MODEL HERE
load_path = '.' # SET PATH TO MODEL HERE

# Load the model's outputs (None when the pickle file is missing)
outputs = load_model_output(model_name, load_path)

# Access outputs for processing
# NOTE: when `outputs` is None or empty nothing below is assigned, so the names keep
# whatever value an earlier cell gave them (or stay undefined).
# `.get` yields None for any key absent from the loaded dictionary.
if outputs:
    X_train_id = outputs.get("X_train_id")
    X_train_mask = outputs.get("X_train_mask")
    X_test_id = outputs.get("X_test_id")
    X_test_mask = outputs.get("X_test_mask")
    Y_train = outputs.get("Y_train")
    Y_test = outputs.get("Y_test")
    pooler_output_train = outputs.get("pooler_output_train")
    cls_output_train = outputs.get("cls_output_train")
    pooler_output_test = outputs.get("pooler_output_test")
    cls_output_test = outputs.get("cls_output_test")

In [None]:
class neural_network:
    """Baseline dense classifier with three output classes.

    Holds the train/test features and labels; `build_model` builds AND trains
    the network, `predict` scores the test features, `evaluate` reports on the
    stored predictions.
    """

    def __init__(self, X_train, Y_train, X_test, Y_test,input_shape):
        self.X_train, self.Y_train = X_train, Y_train
        self.X_test, self.Y_test = X_test, Y_test
        # Number of features per sample; used as the width of the input layer.
        self.input_shape = input_shape

    def build_model(self):
        """Build, compile and fit the network; the fitted model is kept in `self.model`."""
        features = Input(shape=(self.input_shape,), dtype='float32', name='input_layer')

        # Three Dense -> BatchNormalization -> Dropout(0.5) stages of shrinking width.
        x = features
        for width in (1024, 512, 256):
            x = Dense(width, activation='relu')(x)
            x = BatchNormalization()(x)
            x = Dropout(0.5)(x)

        class_probabilities = Dense(3, activation='softmax')(x)

        self.model = Model(inputs=features, outputs=class_probabilities)
        self.model.compile(
            optimizer=SGD(learning_rate=0.0001, momentum=0.8),
            loss=tf.keras.losses.sparse_categorical_crossentropy,
            metrics=['accuracy'],
        )

        # Trained on the full training set: no validation split, no early stopping.
        self.model.fit(self.X_train, self.Y_train, epochs=100, batch_size=16)

    def predict(self):
        """Predict a class index for every test sample; stored in `self.Y_prediction`."""
        class_scores = self.model.predict(self.X_test)
        self.Y_prediction = np.argmax(class_scores, axis=1)
        return self.Y_prediction

    def evaluate(self):
        """Print the classification report and return the confusion matrix.

        Uses `self.Y_prediction`, so `predict` has to be called first.
        """
        report_text = classification_report(self.Y_test, self.Y_prediction)
        matrix = confusion_matrix(self.Y_test, self.Y_prediction)
        print(report_text)
        return matrix

## Residual neural network

In [None]:
# AUTHOR: Giacomo D'Andria
# FUNCTIONING: Builds a dense network with residual connections for improved information flow and stability in training.

from tensorflow.keras.layers import Add, Dense, Input, BatchNormalization, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np

class ResidualDenseNetwork(neural_network):
    """Dense classifier whose second 1024-unit layer is wrapped in a skip connection."""

    def __init__(self, X_train, Y_train, X_test, Y_test, input_shape):
        super().__init__(X_train, Y_train, X_test, Y_test, input_shape)

    def build_model(self):
        """Build, compile and fit the network; the fitted model is kept in `self.model`."""
        inputs = Input(shape=(self.input_shape,), dtype='float32', name='input_layer')

        # Stem: Dense -> BatchNormalization -> Dropout
        trunk = Dense(1024, activation='relu')(inputs)
        trunk = BatchNormalization()(trunk)
        trunk = Dropout(0.3)(trunk)

        # Skip connection: the stem output is added to a second dense layer's output
        branch = Dense(1024, activation='relu')(trunk)
        merged = Add()([trunk, branch])
        merged = BatchNormalization()(merged)
        merged = Dropout(0.5)(merged)

        # Narrower head
        head = Dense(256, activation='relu')(merged)
        head = BatchNormalization()(head)
        head = Dropout(0.5)(head)

        probabilities = Dense(3, activation='softmax')(head)

        self.model = Model(inputs=inputs, outputs=probabilities)
        self.model.compile(
            optimizer=Adam(learning_rate=0.001),
            loss=tf.keras.losses.sparse_categorical_crossentropy,
            metrics=['accuracy'],
        )

        # 20% of the training data is held out for validation; no early stopping.
        fit_options = dict(epochs=100, batch_size=32, validation_split=0.2)
        self.model.fit(self.X_train, self.Y_train, **fit_options)


## Progressive Neural Network

In [None]:
# AUTHOR: Giacomo D'Andria
# FUNCTIONING: Constructs a progressively smaller dense network for classification, reducing layer size at each stage.

from tensorflow.keras.layers import Add, Dense, Input, BatchNormalization, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np


class ProgressiveDenseNetwork(neural_network):
    """Dense classifier whose hidden layers halve in width: 512, 256, 128, 64."""

    def __init__(self, X_train, Y_train, X_test, Y_test, input_shape):
        super().__init__(X_train, Y_train, X_test, Y_test, input_shape)

    def build_model(self):
        """Build, compile and fit the network; the fitted model is kept in `self.model`."""

        def dense_stage(tensor, units):
            # One Dense -> BatchNormalization -> Dropout(0.3) stage.
            tensor = Dense(units, activation='relu')(tensor)
            tensor = BatchNormalization()(tensor)
            return Dropout(0.3)(tensor)

        inputs = Input(shape=(self.input_shape,), dtype='float32', name='input_layer')

        hidden = dense_stage(inputs, 512)
        hidden = dense_stage(hidden, 256)
        hidden = dense_stage(hidden, 128)
        hidden = dense_stage(hidden, 64)

        probabilities = Dense(3, activation='softmax')(hidden)

        self.model = Model(inputs=inputs, outputs=probabilities)
        self.model.compile(
            optimizer=Adam(learning_rate=0.001),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy'],
        )

        # 20% of the training data is held out for validation; no early stopping.
        fit_options = dict(epochs=100, batch_size=32, validation_split=0.2)
        self.model.fit(self.X_train, self.Y_train, **fit_options)

## Convolutional Neural Network

In [None]:
# AUTHOR: Giacomo D'Andria
# FUNCTIONING: Implements a convolutional and dense network for classification with early stopping to prevent overfitting.

from tensorflow.keras.layers import Add, Dense, Input, BatchNormalization, Dropout, Conv1D, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
import tensorflow as tf
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np

class ConvolutionalDenseNetwork(neural_network):
    """One Conv1D stage followed by a dense head, trained with early stopping."""

    def __init__(self, X_train, Y_train, X_test, Y_test, input_shape):
        super().__init__(X_train, Y_train, X_test, Y_test, input_shape)

    def build_model(self):
        """Build, compile and fit the network; the fitted model is kept in `self.model`.

        The input layer is declared as (input_shape, 1) while the stored training
        arrays are handed to `fit` unchanged.
        NOTE(review): this presumably relies on Keras adding the trailing axis — confirm.
        """
        sequence_input = Input(shape=(self.input_shape, 1), dtype='float32', name='input_layer')

        # Convolutional stage
        conv = Conv1D(filters=64, kernel_size=3, activation='relu')(sequence_input)
        conv = BatchNormalization()(conv)
        conv = Dropout(0.3)(conv)
        flat = Flatten()(conv)

        # Dense head
        head = Dense(256, activation='relu')(flat)
        head = BatchNormalization()(head)
        head = Dropout(0.5)(head)

        probabilities = Dense(3, activation='softmax')(head)

        self.model = Model(inputs=sequence_input, outputs=probabilities)
        self.model.compile(
            optimizer=Adam(learning_rate=0.001),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy'],
        )

        # Stop after 5 epochs without a better validation loss and restore the best weights.
        stop_on_plateau = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
        fit_options = dict(
            epochs=100,
            batch_size=32,
            validation_split=0.2,
            callbacks=[stop_on_plateau],
        )
        self.model.fit(self.X_train, self.Y_train, **fit_options)


## Autoencoder Neural Network

In [None]:
# AUTHOR: Giacomo D'Andria
# FUNCTIONING: Combines an autoencoder for feature extraction with a classifier for multi-class classification, using early stopping.

from tensorflow.keras.layers import Add, Dense, Input, BatchNormalization, Dropout, Conv1D, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
import tensorflow as tf
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np

class AutoencoderClassifierNetwork(neural_network):
    """Autoencoder feature compressor followed by a small dense classifier.

    Call order: `build_autoencoder()` -> `build_model()` -> `predict()` -> `evaluate()`.
    """

    def __init__(self, X_train, Y_train, X_test, Y_test, input_shape):
        super().__init__(X_train, Y_train, X_test, Y_test, input_shape)
        # Filled by build_autoencoder(); None means it has not been run yet.
        self.X_train_encoded = None
        self.X_test_encoded = None

    def build_autoencoder(self):
        """Train the autoencoder and store the encoded train/test features."""
        # Define autoencoder structure: input -> 128 -> 64 (code) -> 128 -> input
        input_layer = Input(shape=(self.input_shape,))
        encoded = Dense(128, activation='relu')(input_layer)
        encoded = Dense(64, activation='relu')(encoded)

        decoded = Dense(128, activation='relu')(encoded)
        # Linear reconstruction layer: a sigmoid can only emit values in (0, 1) and
        # therefore cannot reproduce the negative components of the encoder
        # features this class is trained on.
        decoded = Dense(self.input_shape, activation='linear')(decoded)

        autoencoder = Model(inputs=input_layer, outputs=decoded)
        autoencoder.compile(optimizer=Adam(learning_rate=0.001), loss='mse')

        # Define early stopping for autoencoder
        early_stopping_autoencoder = EarlyStopping(
            monitor='val_loss',
            patience=5,
            restore_best_weights=True
        )

        # Train autoencoder with early stopping (the input is its own target)
        autoencoder.fit(
            self.X_train, self.X_train,
            epochs=50,
            batch_size=32,
            validation_split=0.2,
            callbacks=[early_stopping_autoencoder]
        )

        # Encode the input data with the trained encoder half
        encoder = Model(inputs=input_layer, outputs=encoded)
        self.X_train_encoded = encoder.predict(self.X_train)
        self.X_test_encoded = encoder.predict(self.X_test)

    def build_model(self):
        """Build, compile and fit the classifier on the encoded training features.

        Raises:
            AttributeError: if `build_autoencoder` has not been called.
        """
        # Ensure autoencoder has been built
        if self.X_train_encoded is None or self.X_test_encoded is None:
            raise AttributeError("Autoencoder has not been built. Call `build_autoencoder` before `build_model`.")

        # Take the classifier input width from the encoded data so it cannot
        # drift from the encoder's bottleneck size.
        encoded_width = self.X_train_encoded.shape[1]
        input_layer = Input(shape=(encoded_width,), dtype='float32', name='input_layer')
        x = Dense(256, activation='relu')(input_layer)
        x = BatchNormalization()(x)
        x = Dropout(0.5)(x)

        output = Dense(3, activation='softmax')(x)
        self.model = Model(inputs=input_layer, outputs=output)
        self.model.compile(optimizer=Adam(learning_rate=0.001), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

        # Early stopping for main model
        early_stopping_model = EarlyStopping(
            monitor='val_loss',
            patience=5,
            restore_best_weights=True
        )

        # Train the main model with early stopping
        self.model.fit(
            self.X_train_encoded, self.Y_train,
            epochs=100,
            batch_size=32,
            validation_split=0.2,
            callbacks=[early_stopping_model]
        )

    def predict(self):
        """Predict a class index for every encoded test sample.

        Raises:
            AttributeError: if `build_autoencoder` has not been called.
        """
        # Ensure autoencoder was built before prediction
        if self.X_test_encoded is None:
            raise AttributeError("Autoencoder has not been built. Call `build_autoencoder` before making predictions.")

        Y_prediction = self.model.predict(self.X_test_encoded)
        self.Y_prediction = np.argmax(Y_prediction, axis=1)

        return self.Y_prediction

    def evaluate(self):
        """Print the classification report and return the confusion matrix.

        Raises:
            AttributeError: if `predict` has not been called.
        """
        # Ensure prediction has been made before evaluation
        if not hasattr(self, 'Y_prediction'):
            raise AttributeError("No predictions made. Call `predict` before `evaluate`.")

        classification_reports = classification_report(self.Y_test, self.Y_prediction)
        cm_result = confusion_matrix(self.Y_test, self.Y_prediction)
        print(classification_reports)

        return cm_result

## Attention-based neural network

In [None]:
# AUTHOR: Giacomo D'Andria
# FUNCTIONING: Implements an attention-based neural network for multi-class classification with early stopping.

from tensorflow.keras.layers import Input, Dense, Reshape, Attention, Flatten, BatchNormalization, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np


class AttentionBasedNetwork(neural_network):
    """Dense classifier with a dot-product attention stage over pseudo-tokens.

    `predict` and `evaluate` are inherited unchanged from `neural_network`.
    """

    def __init__(self, X_train, Y_train, X_test, Y_test, input_shape):
        super().__init__(X_train, Y_train, X_test, Y_test, input_shape)

    def build_model(self):
        """Build, compile and fit the network; the fitted model is kept in `self.model`."""
        # Each 256-wide projection is viewed as 16 pseudo-tokens of 16 features.
        # With a single token (the previous (1, 256) reshape) the attention
        # weights are a softmax over one score, i.e. always 1, so the layer just
        # returned the value tensor and the query branch never received a gradient.
        num_tokens, token_dim = 16, 16

        # Input layer
        input_layer = Input(shape=(self.input_shape,))

        # Dense projections used as query and value for the attention layer
        query_projection = Dense(num_tokens * token_dim, activation='relu')(input_layer)
        value_projection = Dense(num_tokens * token_dim, activation='relu')(input_layer)

        # Reshape so attention runs across the pseudo-token axis
        query_tokens = Reshape((num_tokens, token_dim))(query_projection)
        value_tokens = Reshape((num_tokens, token_dim))(value_projection)

        # Attention layer; flattening restores a 256-wide feature vector
        attention = Attention()([query_tokens, value_tokens])
        attention = Flatten()(attention)

        # Additional dense layers
        x = Dense(128, activation='relu')(attention)
        x = BatchNormalization()(x)
        x = Dropout(0.5)(x)

        # Output layer
        output = Dense(3, activation='softmax')(x)
        self.model = Model(inputs=input_layer, outputs=output)
        self.model.compile(optimizer=Adam(learning_rate=0.001), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

        # Early stopping callback
        early_stopping = EarlyStopping(
            monitor='val_loss',
            patience=5,
            restore_best_weights=True
        )

        # Train model with early stopping
        self.model.fit(
            self.X_train, self.Y_train,
            epochs=100,
            batch_size=32,
            validation_split=0.2,
            callbacks=[early_stopping]
        )

## output object (general code)

In [None]:
def get_input_shape(data):
    """Return the size of the second axis of `data` (the per-sample feature count)."""
    feature_axis = 1
    return data.shape[feature_axis]

# Train and evaluate every architecture on both encoder feature sets.
# The order matches the original cell: architectures in the listed order,
# pooler features before CLS features for each of them.

# Get the actual shape from the training data (kept for cells that read `shape`)
shape = get_input_shape(pooler_output_train)

feature_sets = [
    ('Pooler', pooler_output_train, pooler_output_test),
    ('CLS', cls_output_train, cls_output_test),
]

architectures = [
    ('Original Neural Network', neural_network),
    ('Residual Neural Network', ResidualDenseNetwork),
    ('Progressive Neural Network', ProgressiveDenseNetwork),
    ('Convolutional Neural Network', ConvolutionalDenseNetwork),
    ('Autoencoder Neural Network', AutoencoderClassifierNetwork),
    ('Attention-based', AttentionBasedNetwork),
]

# Every confusion matrix is kept, keyed by (architecture name, feature-set name),
# instead of each run overwriting the previous result.
confusion_matrices = {}

for architecture_name, network_class in architectures:
    for feature_name, features_train, features_test in feature_sets:
        print(f"=== {architecture_name} / {feature_name} ===")
        # Size the input layer from the feature set actually being used.
        pooled_layer_model = network_class(
            features_train, Y_train, features_test, Y_test,
            get_input_shape(features_train),
        )
        # The autoencoder variant has to produce its encoded features first.
        if hasattr(pooled_layer_model, 'build_autoencoder'):
            pooled_layer_model.build_autoencoder()
        pooled_layer_model.build_model()
        Y_prediction = pooled_layer_model.predict()
        confusion_matrix_result = pooled_layer_model.evaluate()
        confusion_matrices[(architecture_name, feature_name)] = confusion_matrix_result

# As before, `pooled_layer_model`, `Y_prediction` and `confusion_matrix_result`
# end up holding the results of the last run (Attention-based / CLS).

# 7.visualization class : 

In [None]:
class visualization():
    """Heatmap plot of one confusion matrix under a given title."""

    def __init__(self,confusion_matrix,model_name):
        # Matrix to draw and the title shown above it.
        self.confusion_matrix = confusion_matrix
        self.model_name = model_name

    def plot(self):
        """Draw the stored confusion matrix as an annotated heatmap and show it."""
        plt.figure(figsize=(6, 4))
        plt.title(self.model_name)
        # Plot the matrix handed to the constructor, not the module-level
        # `confusion_matrix_result`, so each instance shows its own data.
        sns.heatmap(self.confusion_matrix, annot=True, cmap='Reds')
        plt.xlabel('Prediction')
        plt.ylabel('Actual')
        plt.show()

## pooled output model object(bert base) : 

In [None]:
# Plot the matrix currently bound to `confusion_matrix_result` under the BERT-base pooled-output title.
# NOTE(review): `confusion_matrix_result` is whatever the most recently run evaluate() call
# assigned — confirm it belongs to the model named in the title.
pooled_output = visualization(confusion_matrix_result,'Pooled Output Model (BERT-base)')
pooled_output.plot()

## pooled output model object(bert large) :

In [None]:
# Plot the matrix currently bound to `confusion_matrix_result` under the BERT-large pooled-output title.
# NOTE(review): this is the same variable the other plotting cells pass — confirm it was
# refreshed with the BERT-large pooled-output results before running this cell.
pooled_output = visualization(confusion_matrix_result,'Pooled Output Model (BERT-larg)')
pooled_output.plot()

## cls output model object(bert base) :

In [None]:
# Plot the matrix currently bound to `confusion_matrix_result` under the BERT-base CLS-output title.
# NOTE(review): this is the same variable the other plotting cells pass — confirm it was
# refreshed with the BERT-base CLS-output results before running this cell.
pooled_output = visualization(confusion_matrix_result,'CLS Output Model (BERT-base)')
pooled_output.plot()

## cls output model object(bert large) :

In [None]:
# Plot the matrix currently bound to `confusion_matrix_result` under the BERT-large CLS-output title.
# NOTE(review): this is the same variable the other plotting cells pass — confirm it was
# refreshed with the BERT-large CLS-output results before running this cell.
pooled_output = visualization(confusion_matrix_result,'CLS Output Model (BERT-larg)')
pooled_output.plot()