In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.optim import lr_scheduler
import re
import pandas as pd
from collections import Counter
from sklearn.model_selection import train_test_split
import random
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import torch.nn.functional as F
from nltk.tokenize import RegexpTokenizer
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score, precision_score, recall_score
import math


# Ensure necessary NLTK data is downloaded: the English stop-word list read by
# clean_text, plus the 'punkt' tokenizer models.
# NOTE(review): in the visible code word_tokenize appears only in a
# commented-out line, so 'punkt' may no longer be needed - confirm.
nltk.download('punkt')
nltk.download('stopwords')

In [None]:
def load_csv_dataset(csv_path):
    """Read a CSV file and return its ``Text`` column as lowercase strings.

    Args:
        csv_path: Path (or anything ``pandas.read_csv`` accepts) of a CSV
            file that has a ``Text`` column.

    Returns:
        A list with one lowercased string per row, in file order.
    """
    frame = pd.read_csv(csv_path)
    # Lowercase every document so downstream vocabulary lookups are case-insensitive.
    return [record["Text"].lower() for _, record in frame.iterrows()]
    
# def get_stopwords():
#     with open('/kaggle/input/stopwords-txt/stopwords.txt') as f:
#         stopwords = f.read().replace('\n',' ').split()
#     return stopwords

# Tokenizer and stop-word set shared by every clean_text call. They are built
# lazily on first use so that importing this cell does not touch NLTK data.
_CLEAN_TEXT_RESOURCES = {}


def clean_text(text):
    """Tokenize ``text`` and drop English stop words.

    The tokenizer pattern is ``\\d|\\w+``: at each position a single digit is
    tried first, otherwise a run of word characters is taken. The text is NOT
    lowercased here; callers that want case-insensitive stop-word removal must
    lowercase it first.

    Args:
        text: The string to tokenize.

    Returns:
        A list of tokens, in order, with NLTK English stop words removed.
    """
    if not _CLEAN_TEXT_RESOURCES:
        # Build both resources before publishing them, so a failure (e.g. the
        # stop-word corpus is not downloaded) does not leave a half-filled cache.
        tokenizer = RegexpTokenizer(r"\d|\w+")
        stop_words = frozenset(stopwords.words('english'))
        _CLEAN_TEXT_RESOURCES.update(tokenizer=tokenizer, stop_words=stop_words)

    tokenizer = _CLEAN_TEXT_RESOURCES["tokenizer"]
    stop_words = _CLEAN_TEXT_RESOURCES["stop_words"]
    return [word for word in tokenizer.tokenize(text) if word not in stop_words]


def build_vocab(texts, vocab_size):
    """Build word/index lookup tables from the most frequent tokens.

    Every text is tokenized with ``clean_text`` and token frequencies are
    counted. The ``vocab_size - 1`` most frequent tokens receive indices
    ``1, 2, ...`` in descending frequency order; index ``0`` is reserved for
    the ``'<UNK>'`` placeholder.

    Args:
        texts: Iterable (with ``len``) of raw text strings.
        vocab_size: Maximum vocabulary size including the ``'<UNK>'`` entry.
            If the corpus has fewer distinct tokens, the vocabulary is smaller.

    Returns:
        A ``(word_to_idx, idx_to_word)`` pair of dictionaries that are inverse
        mappings of each other.
    """
    # Count tokens text by text; each token is counted exactly once per
    # occurrence (a second pass over the tokens would only scale all counts
    # by two and leave the ranking unchanged).
    word_counts = Counter()
    for text in texts:
        word_counts.update(clean_text(text))

    most_common = word_counts.most_common(vocab_size - 1)

    word_to_idx = {word: rank + 1 for rank, (word, _count) in enumerate(most_common)}
    word_to_idx['<UNK>'] = 0

    idx_to_word = {idx: word for word, idx in word_to_idx.items()}
    return word_to_idx, idx_to_word

def generate_cbows(texts,word_to_idx, window_size):
    """Create (context, target) index pairs for CBOW training.

    Each text is tokenized with ``clean_text`` and mapped to indices through
    ``word_to_idx`` (unknown tokens map to 0). Every position that has
    ``window_size`` tokens on both sides becomes a target whose context is
    the ``2 * window_size`` surrounding indices, left to right.

    Args:
        texts: Iterable of raw text strings.
        word_to_idx: Mapping from token to integer index.
        window_size: Number of context tokens taken on each side.

    Returns:
        A list of ``(context_indices, target_index)`` tuples.
    """
    # Relative positions of the context tokens around a center token.
    offsets = [shift for shift in range(-window_size, window_size + 1) if shift != 0]

    pairs = []
    for document in texts:
        token_ids = [word_to_idx.get(token, 0) for token in clean_text(document)]
        # Positions too close to either edge have no full window and are skipped.
        for center in range(window_size, len(token_ids) - window_size):
            context = [token_ids[center + shift] for shift in offsets]
            pairs.append((context, token_ids[center]))
    return pairs

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
# train_model below reads this module-level `device`.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
from sklearn.metrics import accuracy_score, f1_score
import torch.nn as nn
import torch

def _run_epoch(model, dataloader, loss_fn, run_device, optimizer=None):
    """Run one pass over ``dataloader`` and return (mean loss, accuracy, weighted F1).

    When ``optimizer`` is given the model is put in training mode and updated
    after every batch; otherwise the model is put in eval mode and the pass
    runs under ``torch.inference_mode()``.
    """
    is_training = optimizer is not None
    model.train(is_training)

    total_loss = 0.0
    num_batches = 0
    all_preds = []
    all_labels = []

    grad_mode = torch.enable_grad() if is_training else torch.inference_mode()
    with grad_mode:
        for inputs_batch, outputs_batch in dataloader:
            inputs_batch = inputs_batch.to(run_device)
            outputs_batch = outputs_batch.to(run_device)

            logits = model(inputs_batch)
            loss = loss_fn(logits, outputs_batch)

            if is_training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            total_loss += loss.item()
            num_batches += 1

            all_preds.extend(torch.argmax(logits, dim=1).cpu().tolist())
            all_labels.extend(outputs_batch.cpu().tolist())

    if num_batches == 0:
        raise ValueError("dataloader produced no batches; cannot compute epoch metrics")

    # Mean of the per-batch losses (every batch has equal weight).
    average_loss = total_loss / num_batches
    accuracy = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='weighted')  # or 'macro'
    return average_loss, accuracy, f1


def train_model(model, train_dataloader, validation_dataloader, epochs, learning_rate):
    """Train ``model`` with Adam and cross-entropy, validating after each epoch.

    Batches are moved to the device the model's parameters live on, so the
    function does not depend on any module-level ``device`` variable.

    Args:
        model: ``nn.Module`` mapping an input batch to class logits.
        train_dataloader: Iterable of ``(inputs, targets)`` training batches.
        validation_dataloader: Iterable of ``(inputs, targets)`` validation batches.
        epochs: Number of passes over the training data.
        learning_rate: Learning rate handed to ``torch.optim.Adam``.

    Returns:
        ``(model, history)`` where ``history`` is a dict of per-epoch lists
        keyed by ``'train_loss'``, ``'val_loss'``, ``'train_acc'``,
        ``'val_acc'``, ``'train_f1'`` and ``'val_f1'``.

    Raises:
        ValueError: If either dataloader yields no batches.
    """
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

    # Follow the model's own placement; fall back to CPU for parameter-less models.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    history = {
        'train_loss': [],
        'val_loss': [],
        'train_acc': [],
        'val_acc': [],
        'train_f1': [],
        'val_f1': []
    }

    for epoch in range(epochs):
        print(f"\nEpoch: {epoch+1}/{epochs}")

        # ---- TRAINING PHASE ----
        average_train_loss, train_acc, train_f1 = _run_epoch(
            model, train_dataloader, loss_fn, run_device, optimizer=optimizer)

        # ---- VALIDATION PHASE ----
        average_validation_loss, val_acc, val_f1 = _run_epoch(
            model, validation_dataloader, loss_fn, run_device)

        history['train_loss'].append(average_train_loss)
        history['val_loss'].append(average_validation_loss)
        history['train_acc'].append(train_acc)
        history['val_acc'].append(val_acc)
        history['train_f1'].append(train_f1)
        history['val_f1'].append(val_f1)

        print(f"Train Loss: {average_train_loss:.4f} | Val Loss: {average_validation_loss:.4f}")
        print(f"Train Acc: {train_acc:.4f} | Val Acc: {val_acc:.4f}")
        print(f"Train F1: {train_f1:.4f} | Val F1: {val_f1:.4f}")

    return model, history


In [None]:
class CbowWord2Vec(nn.Module):
    """Continuous bag-of-words model: averaged context embeddings -> vocabulary logits."""

    def __init__(self, vocab_size, embedding_dim) -> None:
        """
        Args:
            vocab_size: Number of rows in the embedding table and number of output logits.
            embedding_dim: Size of each word embedding.
        """
        super().__init__()
        self.embeddings = nn.Embedding(vocab_size,embedding_dim)
        self.linear = nn.Linear(embedding_dim,vocab_size)

    def forward(self, X) -> torch.Tensor:
        """Return logits over the vocabulary for each row of context indices.

        Args:
            X: Integer tensor of context word indices; the forward pass averages
               over dimension 1 (the context positions).

        Returns:
            Tensor whose last dimension has size ``vocab_size``.
        """
        # mean(1) already removes the context axis. A further squeeze(1) would
        # also drop the feature axis when embedding_dim == 1 and break the
        # linear layer, so it is intentionally not applied.
        context_mean = self.embeddings(X).mean(1)
        return self.linear(context_mean)

In [None]:
######################################################
# Shared configuration for the CBOW stage.
# batch_size: mini-batch size handed to the DataLoaders.
batch_size=64
# split_ratio: fraction of the CBOW pairs placed in the training split.
split_ratio=0.8
# vocab_size: vocabulary size passed to build_vocab and CbowWord2Vec.
vocab_size=10000
# window_size: context tokens taken on each side of a target word.
window_size=2
######################################################

In [None]:
# cbow_vector_pairs[0]
# cbow_vector_pairs[0][0].sum()
class CustomDataset(Dataset):
    """Tensor-backed dataset over ``(input, output)`` pairs."""

    def __init__(self, data):
        """Split ``data`` into an input tensor and an output tensor.

        Args:
            data: Sequence of items where ``item[0]`` is the input and
                ``item[1]`` is the output.
        """
        inputs, outputs = [], []
        for item in data:
            inputs.append(item[0])
        for item in data:
            outputs.append(item[1])
        self.inputs = torch.tensor(inputs)
        self.outputs = torch.tensor(outputs)

    def __len__(self):
        """Number of samples."""
        return len(self.inputs)

    def __getitem__(self, idx):
        """Return the ``(input, output)`` tensors stored at position ``idx``."""
        return self.inputs[idx], self.outputs[idx]



In [None]:
# Load the training articles, build the vocabulary and create CBOW pairs.
csv_path = "/kaggle/input/bbc-news-article/TrainData.csv"
texts = load_csv_dataset(csv_path)
word_to_idx,idx_to_word=build_vocab(texts,vocab_size)
cbows=generate_cbows(texts,word_to_idx,window_size)

# Shuffle with a private, seeded generator so the train/validation split is
# reproducible across kernel restarts and does not disturb the global
# `random` state.
random.Random(42).shuffle(cbows)

# The first `split_index` pairs are used for training, the rest for validation.
split_index = int(len(cbows) * split_ratio)

In [None]:
# Wrap the shuffled CBOW pairs: the first split_index pairs train the model,
# the remainder is used for validation.
train_dataset = CustomDataset(cbows[:split_index])
# NOTE(review): despite its name, this split feeds validation_dataloader below.
test_dataset = CustomDataset(cbows[split_index:])

# Only the training loader reshuffles; validation order stays fixed.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
validation_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Hyperparameters for CBOW training.
# embedding_dim: size of each learned word vector.
embedding_dim = 20
# epochs / learning_rate: passed to train_model.
epochs=3
learning_rate=0.01
# vocab_size=10000

In [None]:
# Build the CBOW model on the selected device and train it.
model = CbowWord2Vec(vocab_size, embedding_dim).to(device)
# train_model returns the model together with a dict of per-epoch metric lists.
model, model_dict = train_model(model, train_dataloader, validation_dataloader, 
                                                                 epochs, learning_rate)
# Keep the loss curves under the names used by the plotting cell.
train_set_loss_log=model_dict['train_loss']
validation_set_loss_log=model_dict['val_loss']

In [None]:
def cosine_similarity(v1, v2):
    """Cosine similarity between two 1-D vectors.

    Accepts torch tensors as well as anything ``torch.as_tensor`` can wrap
    (for example NumPy arrays), so it works with both kinds of vectors stored
    in ``word_dict`` by this notebook.

    Args:
        v1: First vector.
        v2: Second vector of the same length.

    Returns:
        A 0-dim tensor holding the cosine of the angle between the vectors.
        If either vector has zero norm the similarity is defined as 0 instead
        of NaN.
    """
    a = torch.as_tensor(v1)
    b = torch.as_tensor(v2)

    # Bring both operands to one floating dtype/device so `@` is well defined
    # (e.g. float32 model weights against a float64 NumPy vector).
    common_dtype = torch.promote_types(a.dtype, b.dtype)
    if not common_dtype.is_floating_point:
        common_dtype = torch.get_default_dtype()
    a = a.to(common_dtype)
    b = b.to(device=a.device, dtype=common_dtype)

    denominator = torch.norm(a) * torch.norm(b)
    if denominator == 0:
        # A zero vector has no direction; avoid the 0/0 -> NaN result.
        return torch.zeros((), dtype=common_dtype, device=a.device)
    return (a @ b) / denominator

def most_similar(word, word_dict, top_k=5):
    """Return the ``top_k`` words whose vectors are closest to ``word``.

    Args:
        word: Query word; must be a key of ``word_dict``.
        word_dict: Mapping from word to embedding vector.
        top_k: Number of neighbours to return.

    Returns:
        A list of ``(other_word, similarity)`` tuples sorted by descending
        cosine similarity; the query word itself is excluded.
    """
    query_vector = word_dict[word]

    # Score every other vocabulary entry against the query vector.
    scored = [
        (candidate, cosine_similarity(query_vector, candidate_vector))
        for candidate, candidate_vector in word_dict.items()
        if candidate != word
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_k]

In [None]:
# Plot the per-epoch CBOW training and validation loss curves.
fig, ax = plt.subplots()
ax.plot(train_set_loss_log, color='red', label='train_loss')
ax.plot(validation_set_loss_log, color='blue', label='validation_loss')

ax.set_title("Loss During Training")
ax.set_xlabel("Epoch")
ax.set_ylabel("Cross Entropy")
ax.legend()
plt.show()

In [None]:
params = list(model.parameters())
# Row i of the embedding matrix is the learned vector of vocabulary index i.
word_vectors=model.embeddings.weight.detach().cpu()

# Walk the indices that actually exist in the vocabulary. build_vocab can
# return fewer than `vocab_size` entries when the corpus has fewer distinct
# tokens, and looking up a missing index would raise KeyError.
unique_words=[idx_to_word[i] for i in range(len(idx_to_word))]

# Map every vocabulary word (including '<UNK>' at index 0) to its vector.
word_dict = {word: vector for word, vector in zip(unique_words, word_vectors)}

In [None]:
# Download the pretrained Google word2vec dataset through kagglehub and show
# where it was stored.
# NOTE(review): `path` is reassigned to a hard-coded /kaggle/input location in
# the next cell, so the value printed here is not what gets loaded.
import kagglehub
path = kagglehub.dataset_download("sugataghosh/google-word2vec")
print("Path to dataset files:", path)

In [None]:
from gensim.models import KeyedVectors

# Load the pretrained vectors from the binary word2vec file.
# NOTE(review): this rebinds `model`, which previously held the trained CBOW
# network; cells that need the CBOW model must run before this one.
path="/kaggle/input/google-word2vec/GoogleNews-vectors-negative300.bin"
model = KeyedVectors.load_word2vec_format(path, binary=True)

# Sanity check with the analogy query: woman + king - man.
print(model.most_similar(positive=['woman', 'king'], negative=['man'], topn=1))



In [None]:
# Rebuild word_dict from the pretrained vectors (replaces the CBOW-based dict).
unique_words = list(model.index_to_key)
word_vectors = [model[word] for word in unique_words]
word_dict = {word: vector for word, vector in zip(unique_words, word_vectors)}

# Placeholder for out-of-vocabulary tokens. Its length follows the loaded
# model instead of a hard-coded 300, and it is float32 because NewsDataset
# stacks these vectors into a float32 array.
word_dict['<UNK>']=np.zeros(model.vector_size, dtype=np.float32)


In [None]:
# Spot check: show the vector stored for the word 'police'.
print(word_dict['police'])

In [None]:
class NewsDataset(Dataset):
    """Dataset of news articles turned into fixed-length embedding sequences.

    Each item is a ``(max_len, embedding_dim)`` float32 tensor built from the
    row's ``Text`` column, paired with the integer id of its ``Category``.
    """

    def __init__(self, csv_file, embedding_dict, max_len=100):
        """
        Args:
            csv_file: CSV path with ``Text`` and ``Category`` columns.
            embedding_dict: Mapping from token to vector; ``'<UNK>'`` supplies
                the vector for unknown tokens and the padding width.
            max_len: Number of token vectors kept per article.
        """
        self.data = pd.read_csv(csv_file)
        self.embedding_dict = embedding_dict
        self.max_len = max_len
        category_names = ("business", "tech", "politics", "sport", "entertainment")
        self.categories = {name: label for label, name in enumerate(category_names)}

    def text_to_embedding(self, text):
        """Convert ``text`` into a ``(max_len, embedding_dim)`` float32 array.

        Tokens come from ``clean_text``; only the first ``max_len`` are used.
        Tokens missing from the dictionary use the ``'<UNK>'`` vector, and
        short texts are padded at the end with zero vectors.
        """
        tokens = clean_text(text)[0:self.max_len]
        vectors = [
            self.embedding_dict.get(token, self.embedding_dict.get('<UNK>'))
            for token in tokens
        ]

        # Right-pad with zero vectors up to max_len.
        for _ in range(self.max_len - len(vectors)):
            vectors.append(np.zeros(len(self.embedding_dict['<UNK>'])))

        # Dictionary values may be torch tensors or NumPy arrays; normalise
        # them all to NumPy before stacking.
        as_numpy = []
        for vector in vectors:
            if isinstance(vector, torch.Tensor):
                as_numpy.append(vector.cpu().numpy())
            else:
                as_numpy.append(vector)
        return np.array(as_numpy, dtype=np.float32)

    def __len__(self):
        """Number of rows in the CSV."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(embedding_tensor, label_tensor)`` for row ``idx``."""
        features = self.text_to_embedding(self.data.iloc[idx]["Text"])
        target = self.categories[self.data.iloc[idx]["Category"]]
        feature_tensor = torch.tensor(features, dtype=torch.float32).cpu()
        target_tensor = torch.tensor(target, dtype=torch.long).cpu()
        return feature_tensor, target_tensor

In [None]:
from torch.utils.data import random_split
# Number of token vectors kept per article.
maxlen=100
full_dataset = NewsDataset(csv_path, word_dict, maxlen)

# 80/20 train/validation split of the labelled articles.
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
# A dedicated, seeded generator makes the split reproducible across kernel
# restarts without changing the global torch RNG state.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size], generator=split_generator)


# These loaders replace the CBOW loaders defined earlier in the notebook.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Spot check: last time step of the first training sample's embedding matrix
# (a zero vector whenever the article was shorter than maxlen tokens).
train_dataset[0][0][-1]

In [None]:
def evaluate_model(model, dataloader, device, average='weighted'):
    """Evaluate a classifier, print a summary and return the metrics.

    Args:
        model: ``nn.Module`` producing class logits of shape (batch, classes).
        dataloader: Iterable of ``(inputs, labels)`` batches.
        device: Device the batches are moved to before the forward pass.
        average: Averaging mode forwarded to the scikit-learn F1, precision
            and recall functions.

    Returns:
        Dict with keys ``'accuracy'``, ``'f1_score'``, ``'precision'``,
        ``'recall'``, ``'correct'`` and ``'incorrect'``.

    Raises:
        ValueError: If the dataloader yields no samples.
    """
    model.eval()
    total_correct = 0
    total_samples = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            preds = outputs.argmax(dim=1)

            total_correct += (preds == labels).sum().item()
            total_samples += labels.size(0)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    if total_samples == 0:
        # Fail with a clear message instead of a ZeroDivisionError below.
        raise ValueError("dataloader yielded no samples; nothing to evaluate")

    accuracy = total_correct / total_samples
    f1 = f1_score(all_labels, all_preds, average=average)
    precision = precision_score(all_labels, all_preds, average=average)
    recall = recall_score(all_labels, all_preds, average=average)

    results = {
        'accuracy': accuracy,
        'f1_score': f1,
        'precision': precision,
        'recall': recall,
        'correct': total_correct,
        'incorrect': total_samples - total_correct
    }

    print(f"Test Accuracy : {results['accuracy']:.4f}")
    print(f"F1 Score      : {results['f1_score']:.4f}")
    print(f"Precision     : {results['precision']:.4f}")
    print(f"Recall        : {results['recall']:.4f}")
    print(f"Correct       : {results['correct']}")
    print(f"Incorrect     : {results['incorrect']}")

    # Hand the metrics back so callers can log or compare runs.
    return results



In [None]:
class CLSTM_B(nn.Module):
    """Parallel CNN + LSTM text classifier.

    Three 1-D convolutions with different kernel sizes run over the embedded
    sequence and are mean-pooled over time; in parallel a recurrent branch
    summarises the same sequence. Both summaries are concatenated and passed
    through two linear layers to produce class logits.
    """

    def __init__(self, embed_dim, hidden_dim, num_classes, kernel_sizes=(3, 5, 7), num_filters=100, dropout=0.1,max_len=100,use_self_attention=True):
        """
        Args:
            embed_dim: Size of each input token vector.
            hidden_dim: Hidden size of the recurrent branch.
            num_classes: Number of output classes.
            kernel_sizes: Sequence whose first three entries are the kernel
                sizes of the convolution branches. Odd sizes keep the sequence
                length unchanged; all three branches must yield equal lengths.
            num_filters: Output channels of each convolution.
            dropout: Accepted for interface compatibility; currently unused.
            max_len: Accepted for interface compatibility; currently unused.
            use_self_attention: If True, use the step-wise LSTMCell with
                attention over earlier hidden states; otherwise use ``nn.LSTM``.
        """
        super().__init__()

        # padding = k // 2 gives "same"-length outputs for odd kernels, so the
        # branches can be concatenated along the feature axis for any odd sizes
        # (for the default 3/5/7 this is the original 1/2/3).
        self.conv1=nn.Conv1d(in_channels=embed_dim, out_channels=num_filters, kernel_size=kernel_sizes[0], padding=kernel_sizes[0] // 2)
        self.conv2=nn.Conv1d(in_channels=embed_dim, out_channels=num_filters, kernel_size=kernel_sizes[1], padding=kernel_sizes[1] // 2)
        self.conv3=nn.Conv1d(in_channels=embed_dim, out_channels=num_filters, kernel_size=kernel_sizes[2], padding=kernel_sizes[2] // 2)
        self.lstm_hidden_dim=hidden_dim

        self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True)

        self.fc1 = nn.Linear(num_filters*3+hidden_dim,num_classes*3)
        self.fc2 = nn.Linear(num_classes*3,num_classes)
        self.use_self_attention=use_self_attention
        # The cell consumes raw token vectors, so its input size is the
        # constructor's embed_dim (not a module-level variable).
        self.lstm_cell = nn.LSTMCell(input_size=embed_dim, hidden_size=hidden_dim)

    def forward(self, x):
        """Compute class logits.

        Args:
            x: Float tensor of shape (batch_size, seq_length, embed_dim).

        Returns:
            Tensor of shape (batch_size, num_classes).
        """
        x_cnn = x.permute(0, 2, 1)  # (batch_size, embed_dim, seq_length)
        branch_outputs = [
            F.relu(conv(x_cnn)).permute(0, 2, 1)  # (batch_size, seq_length, num_filters)
            for conv in (self.conv1, self.conv2, self.conv3)
        ]
        # Concatenate along features, then mean-pool over time.
        x_final_cnn = torch.cat(branch_outputs, dim=2).mean(dim=1)

        if self.use_self_attention:
            batch_size,seq_len,_ = x.size()
            h_t = torch.zeros(batch_size, self.lstm_hidden_dim, device=x.device)
            c_t = torch.zeros(batch_size, self.lstm_hidden_dim, device=x.device)

            hidden_states = []

            for t in range(seq_len):
                current_input = x[:, t, :]  # (batch, embed_dim)
                h_t, c_t = self.lstm_cell(current_input, (h_t, c_t))
                if t > 0:
                    prev_h = torch.stack(hidden_states, dim=1)  # (batch, t, hidden)
                    # Dot-product score of the new state against each earlier state.
                    attn_scores = torch.bmm(prev_h, h_t.unsqueeze(2)).squeeze(2)  # (batch, t)
                    attn_weights = F.softmax(attn_scores, dim=1)  # (batch, t)
                    attn_vector = torch.sum(prev_h * attn_weights.unsqueeze(2), dim=1)

                    # Residual combination; the attended state is also what
                    # the next step receives as its previous hidden state.
                    h_t = h_t + attn_vector
                hidden_states.append(h_t)

            h_lstm = hidden_states[-1]
        else:
            _, (h_lstm, _) = self.lstm(x)  # h_lstm: (1, batch_size, hidden_dim)
            h_lstm=h_lstm.squeeze(0)

        x_final=torch.cat((x_final_cnn,h_lstm),dim=1)

        # NOTE(review): there is no non-linearity between fc1 and fc2, so the
        # two layers compose to a single affine map. Left as is because adding
        # one would change trained behaviour.
        x_final=self.fc1(x_final)
        x_final=self.fc2(x_final)
        return x_final
        



In [None]:
# Hyperparameters for the CLSTM_B classifier trained in this cell.
# vocab_size = 20000      
# embedding_dim must match the width of the vectors produced by NewsDataset.
embedding_dim = 300 #20   
num_filters = 50      
lstm_hidden_dim = 64
num_classes = 5        
# NOTE(review): max_len is assigned here but is not passed to CLSTM_B below.
max_len = 100          
# dropout_rate = 0.2
use_self_attention=True
learning_rate=0.001
num_epochs = 10
kernel_sizes=[3,5,7]
model=CLSTM_B(embedding_dim,lstm_hidden_dim, num_classes,kernel_sizes, num_filters,use_self_attention=use_self_attention)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.RMSprop(model.parameters(), lr=learning_rate)
# optimizer = torch.optim.SGD(params=model.parameters(), lr=learning_rate, momentum=0.9)

# Per-epoch mean losses, consumed by the plotting cell that follows.
train_loss_log=[]
val_loss_log=[]
for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    train_loss, train_correct, train_total = 0.0, 0, 0
    
    for inputs, labels in train_dataloader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        # Weight the batch-mean loss by the batch size so that dividing by
        # train_total yields a per-sample mean.
        train_loss += loss.item() * inputs.size(0)
        train_correct += (outputs.argmax(1) == labels).sum().item()
        train_total += labels.size(0)
    train_loss_log.append(train_loss/train_total)
    # ---- validation pass (no gradient tracking) ----
    val_loss, val_correct, val_total = 0.0, 0, 0
    model.eval()
    with torch.no_grad():
        for inputs, labels in val_dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item() * inputs.size(0)
            val_correct += (outputs.argmax(1) == labels).sum().item()
            val_total += labels.size(0)
    val_loss_log.append(val_loss/val_total)
    
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss/train_total:.4f}, Train Acc: {train_correct/train_total:.2f}, Val Loss: {val_loss/val_total:.4f},Val Acc: {val_correct/val_total:.2f}")


In [None]:
# Plot the per-epoch training and validation loss of the model trained above.
fig, ax = plt.subplots()
ax.plot(train_loss_log, color='red', label='train_loss')
ax.plot(val_loss_log, color='blue', label='validation_loss')

ax.set_title("Loss During Training")
ax.set_xlabel("Epoch")
ax.set_ylabel("Cross Entropy")
ax.legend()
plt.show()

In [None]:
# Evaluate the current `model` on the labelled test CSV.
test_csv_path="/kaggle/input/bbc-news-test-final/TestLabels.csv"
# Reuses the embedding dictionary and sequence length of the training data.
test_dataset = NewsDataset(test_csv_path, word_dict, maxlen)
batch_size=64
# Fixed order: shuffling is unnecessary for evaluation.
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
evaluate_model(model,test_dataloader,device)

In [None]:
class CLSTM_A(nn.Module):
    """Sequential CNN -> LSTM text classifier.

    A 1-D convolution extracts n-gram features from the embedded sequence;
    the resulting feature sequence is summarised by a recurrent layer (either
    ``nn.LSTM`` or a step-wise ``nn.LSTMCell`` with attention over earlier
    hidden states) and classified by a linear layer.
    """

    def __init__(self, embedding_dim, num_filters, filter_size, 
                 lstm_hidden_dim, num_classes, use_self_attention=False,dropout_rate=0.2):
        """
        Args:
            embedding_dim: Size of each input token vector.
            num_filters: Output channels of the convolution.
            filter_size: Kernel width of the convolution (no padding is applied).
            lstm_hidden_dim: Hidden size of the recurrent layer.
            num_classes: Number of output classes.
            use_self_attention: Selects the attentive LSTMCell recurrence.
            dropout_rate: Dropout probability used on the input and on the
                final feature vector.
        """
        super().__init__()
        self.conv = nn.Conv1d(embedding_dim, num_filters, filter_size)
        self.dropout = nn.Dropout(dropout_rate)
        self.lstm = nn.LSTM(num_filters, lstm_hidden_dim, batch_first=True)
        self.lstm_cell = nn.LSTMCell(num_filters, lstm_hidden_dim)
        self.fc = nn.Linear(lstm_hidden_dim, num_classes)
        self.lstm_hidden_dim = lstm_hidden_dim
        self.use_self_attention = use_self_attention

    def _attentive_recurrence(self, features):
        """Run the LSTMCell over ``features`` with attention on past states.

        Args:
            features: Tensor of shape (batch, steps, num_filters).

        Returns:
            The hidden state after the last step, shape (batch, lstm_hidden_dim).
        """
        n_samples, n_steps, _ = features.size()
        hidden = torch.zeros(n_samples, self.lstm_hidden_dim, device=features.device)
        cell = torch.zeros(n_samples, self.lstm_hidden_dim, device=features.device)

        history = []
        for step in range(n_steps):
            hidden, cell = self.lstm_cell(features[:, step, :], (hidden, cell))
            if history:
                past = torch.stack(history, dim=1)  # (batch, step, hidden)
                # Dot-product score of the new state against each past state.
                scores = torch.bmm(past, hidden.unsqueeze(2)).squeeze(2)  # (batch, step)
                weights = F.softmax(scores, dim=1)
                context = (past * weights.unsqueeze(2)).sum(dim=1)
                # Residual combination; the attended state feeds the next step.
                hidden = hidden + context
            history.append(hidden)
        return hidden

    def forward(self, x):
        """Compute class logits.

        Args:
            x: Float tensor of shape (batch, max_len, embedding_dim).

        Returns:
            Tensor of shape (batch, num_classes).
        """
        # Conv1d expects channels first: (batch, embedding_dim, max_len).
        channels_first = self.dropout(x).transpose(1, 2)
        # Back to (batch, L_out, num_filters) for the recurrent layer.
        features = F.relu(self.conv(channels_first)).transpose(1, 2)

        if self.use_self_attention:
            summary = self._attentive_recurrence(features)
        else:
            _, (last_hidden, _) = self.lstm(features)
            summary = last_hidden.squeeze(0)  # (batch, lstm_hidden_dim)

        return self.fc(self.dropout(summary))

In [None]:
# Example hyperparameters
# vocab_size = 20000      
# NOTE(review): embedding_dim is not assigned in this cell (the line below is
# commented out), so the value from an earlier cell is used.
# embedding_dim = 20    
num_filters = 50       
filter_size = 10         
lstm_hidden_dim = 64
num_classes = 5         
# NOTE(review): max_len is assigned here but is not passed to CLSTM_A below.
max_len = 100            
dropout_rate = 0.2
use_self_attention=True
learning_rate=0.0001
num_epochs = 30 
model=CLSTM_A(embedding_dim, num_filters, filter_size, 
                 lstm_hidden_dim, num_classes, use_self_attention,dropout_rate=dropout_rate)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

criterion = nn.CrossEntropyLoss()
# optimizer = optim.RMSprop(model.parameters(), lr=learning_rate)
# optimizer = torch.optim.SGD(params=model.parameters(), lr=learning_rate, momentum=0.9)
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

# Per-epoch mean losses, consumed by the plotting cell that follows.
train_loss_log=[]
val_loss_log=[]
for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    train_loss, train_correct, train_total = 0.0, 0, 0
    
    for inputs, labels in train_dataloader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        # Weight the batch-mean loss by the batch size so that dividing by
        # train_total yields a per-sample mean.
        train_loss += loss.item() * inputs.size(0)
        train_correct += (outputs.argmax(1) == labels).sum().item()
        train_total += labels.size(0)

    train_loss_log.append(train_loss/train_total)
    
    # ---- validation pass (no gradient tracking) ----
    val_loss, val_correct, val_total = 0.0, 0, 0
    model.eval()
    with torch.no_grad():
        for inputs, labels in val_dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item() * inputs.size(0)
            val_correct += (outputs.argmax(1) == labels).sum().item()
            val_total += labels.size(0)

    val_loss_log.append(val_loss/val_total)
    
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss/train_total:.4f}, Train Acc: {train_correct/train_total:.2f}, Val Loss: {val_loss/val_total:.4f},Val Acc: {val_correct/val_total:.2f}")


In [None]:
# Plot the per-epoch training and validation loss of the model trained above.
fig, ax = plt.subplots()
ax.plot(train_loss_log, color='red', label='train_loss')
ax.plot(val_loss_log, color='blue', label='validation_loss')

ax.set_title("Loss During Training")
ax.set_xlabel("Epoch")
ax.set_ylabel("Cross Entropy")
ax.legend()
plt.show()

In [None]:
# Location of the labelled test CSV used for the final evaluation.
test_csv_path="/kaggle/input/bbc-news-test-final/TestLabels.csv"

In [None]:
# Evaluate the current `model` on the test CSV, using the same embedding
# dictionary and sequence length as the training data.
test_dataset = NewsDataset(test_csv_path, word_dict, maxlen)
batch_size=64
# Fixed order: shuffling is unnecessary for evaluation.
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
evaluate_model(model,test_dataloader,device)

In [None]:
class TransformerEncoderBlock(nn.Module):
    """One post-norm Transformer encoder layer for batch-first input.

    Multi-head self-attention and a two-layer feed-forward network, each
    followed by dropout, a residual connection and layer normalisation.
    """

    def __init__(self, embed_dim, num_heads, ff_hidden_dim, dropout=0.1):
        """
        Args:
            embed_dim: Model width; must be divisible by ``num_heads``.
            num_heads: Number of attention heads.
            ff_hidden_dim: Hidden width of the feed-forward network.
            dropout: Dropout probability for attention weights and both
                residual branches.
        """
        super().__init__()
        # Inputs in this notebook are (batch, seq, embed). Without
        # batch_first=True, MultiheadAttention would read the batch axis as
        # the sequence axis and attend across samples instead of across tokens.
        self.self_attn = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout, batch_first=True)
        self.fc1 = nn.Linear(embed_dim, ff_hidden_dim)
        self.fc2 = nn.Linear(ff_hidden_dim, embed_dim)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Apply the encoder layer.

        Args:
            x: Tensor of shape (batch, seq_len, embed_dim).
            mask: Optional attention mask forwarded as ``attn_mask``.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # Self-attention sub-layer with residual connection.
        attn_output, _ = self.self_attn(x, x, x, attn_mask=mask)
        x = self.norm1(x + self.dropout1(attn_output))

        # Position-wise feed-forward sub-layer with residual connection.
        ff_output = self.fc2(F.relu(self.fc1(x)))
        x = self.norm2(x + self.dropout2(ff_output))
        return x

class TransformerTextEncoder(nn.Module):
    """Stack of Transformer encoder blocks with mean pooling and a linear classifier."""

    def __init__(self, embed_dim, num_heads, ff_hidden_dim, num_layers, num_classes, use_positional_encoding=True, dropout=0.1, max_len=100):
        """
        Args:
            embed_dim: Width of the input token vectors.
            num_heads: Attention heads per encoder block.
            ff_hidden_dim: Feed-forward hidden width per encoder block.
            num_layers: Number of encoder blocks.
            num_classes: Number of output classes.
            use_positional_encoding: Add sinusoidal position encodings to the input.
            dropout: Dropout probability inside each encoder block.
            max_len: Longest sequence the positional encoding table covers.
        """
        super().__init__()
        # Sinusoidal positional encoding table of shape (1, max_len, embed_dim).
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, embed_dim, 2).float() * (-math.log(10000.0) / embed_dim)
        )
        pe = torch.zeros(max_len, embed_dim)
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd embed_dim there is one fewer cosine column than sine columns.
        pe[:, 1::2] = torch.cos(position * div_term[: embed_dim // 2])
        # Non-persistent buffer: it follows .to(device) with the module but
        # stays out of the state_dict, so existing checkpoints keep loading.
        self.register_buffer("pe", pe.unsqueeze(0), persistent=False)

        self.layers=[]
        for i in range(num_layers):
            self.layers.append(TransformerEncoderBlock(embed_dim, num_heads, ff_hidden_dim, dropout))
        # The Sequential container is what registers the blocks as sub-modules.
        self.transformer_layers=nn.Sequential(*self.layers)
        self.norm = nn.LayerNorm(embed_dim)
        self.fc = nn.Linear(embed_dim, num_classes)
        self.use_positional_encoding=use_positional_encoding

    def forward(self, x, mask=None):
        """Classify a batch of embedded sequences.

        Args:
            x: Tensor of shape (batch, seq_len, embed_dim).
            mask: Optional attention mask forwarded to every encoder block.

        Returns:
            Logits of shape (batch, num_classes).

        Raises:
            ValueError: If positional encoding is enabled and ``seq_len``
                exceeds the ``max_len`` given at construction.
        """
        if self.use_positional_encoding:
            seq_len = x.size(1)
            if seq_len > self.pe.size(1):
                raise ValueError(
                    f"sequence length {seq_len} exceeds positional encoding length {self.pe.size(1)}"
                )
            x = x + self.pe[:, :seq_len, :].to(x.device)
        # Iterate explicitly because each block also receives the mask.
        for layer in self.transformer_layers:
            x = layer(x, mask)
        # Mean-pool over the sequence, normalise, then classify.
        x = self.norm(x.mean(dim=1))
        return self.fc(x)


In [None]:
# Hyperparameters for the Transformer experiments.
num_epochs = 20
learning_rate = 0.0001
num_of_heads = 5
layers = [2,4,6]
# ff_dim relies on embedding_dim defined in an earlier cell.
ff_dim = 2*embedding_dim
use_positional_encoding_list = [False,True]
dropout=0.2


test_csv_path = "/kaggle/input/bbc-news-test-final/TestLabels.csv"
test_dataset = NewsDataset(test_csv_path, word_dict, maxlen)
batch_size = 64
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Grid over (positional encoding on/off) x (number of encoder layers). Each
# configuration is trained from scratch and then evaluated on the test set.
for use_positional_encoding in use_positional_encoding_list:
    for num_of_layers in layers:

        print('use_positional_encoding',use_positional_encoding,';  num_of_layers',num_of_layers)

        model = TransformerTextEncoder(
            embed_dim=embedding_dim,
            num_heads=num_of_heads,
            ff_hidden_dim=ff_dim,
            num_layers=num_of_layers,
            num_classes=5,
            # Use the configured value; previously a literal 0.1 was passed
            # and the `dropout` setting above was silently ignored.
            dropout=dropout,
            use_positional_encoding=use_positional_encoding
        ).to(device)

        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

        for epoch in range(num_epochs):
            # ---- training pass ----
            model.train()
            train_loss, train_correct, train_total = 0.0, 0, 0
            all_train_preds, all_train_labels = [], []

            for inputs, labels in train_dataloader:
                inputs, labels = inputs.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                # Weight the batch-mean loss by batch size for a per-sample mean.
                train_loss += loss.item() * inputs.size(0)
                preds = outputs.argmax(dim=1)
                train_correct += (preds == labels).sum().item()
                train_total += labels.size(0)

                all_train_preds.extend(preds.cpu().numpy())
                all_train_labels.extend(labels.cpu().numpy())

            train_f1 = f1_score(all_train_labels, all_train_preds, average='weighted')

            # ---- validation pass (no gradient tracking) ----
            model.eval()
            val_loss, val_correct, val_total = 0.0, 0, 0
            all_val_preds, all_val_labels = [], []

            with torch.no_grad():
                for inputs, labels in val_dataloader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    val_loss += loss.item() * inputs.size(0)
                    preds = outputs.argmax(dim=1)
                    val_correct += (preds == labels).sum().item()
                    val_total += labels.size(0)

                    all_val_preds.extend(preds.cpu().numpy())
                    all_val_labels.extend(labels.cpu().numpy())

            val_f1 = f1_score(all_val_labels, all_val_preds, average='weighted')

            # Report the metrics computed above so each run can be monitored.
            print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss/train_total:.4f}, Train Acc: {train_correct/train_total:.2f}, Train F1: {train_f1:.4f}, Val Loss: {val_loss/val_total:.4f}, Val Acc: {val_correct/val_total:.2f}, Val F1: {val_f1:.4f}")

        evaluate_model(model,test_dataloader,device)
        

