In [1]:
import pandas as pd

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
df = pd.read_csv("/content/drive/MyDrive/Corona_NLP_train (1).csv", encoding='latin-1')
df.head()

Unnamed: 0,UserName,ScreenName,Location,TweetAt,OriginalTweet,Sentiment
0,3799,48751,London,16-03-2020,@MeNyrbie @Phil_Gahan @Chrisitv https://t.co/i...,Neutral
1,3800,48752,UK,16-03-2020,advice Talk to your neighbours family to excha...,Positive
2,3801,48753,Vagabonds,16-03-2020,Coronavirus Australia: Woolworths to give elde...,Positive
3,3802,48754,,16-03-2020,My food stock is not the only one which is emp...,Positive
4,3803,48755,,16-03-2020,"Me, ready to go at supermarket during the #COV...",Extremely Negative


In [5]:
df_test = pd.read_csv("/content/drive/MyDrive/Corona_NLP_test.csv", encoding='latin-1')
df_test.head()

Unnamed: 0,UserName,ScreenName,Location,TweetAt,OriginalTweet,Sentiment
0,1,44953,NYC,02-03-2020,TRENDING: New Yorkers encounter empty supermar...,Extremely Negative
1,2,44954,"Seattle, WA",02-03-2020,When I couldn't find hand sanitizer at Fred Me...,Positive
2,3,44955,,02-03-2020,Find out how you can protect yourself and love...,Extremely Positive
3,4,44956,Chicagoland,02-03-2020,#Panic buying hits #NewYork City as anxious sh...,Negative
4,5,44957,"Melbourne, Victoria",03-03-2020,#toiletpaper #dunnypaper #coronavirus #coronav...,Neutral


## Pre-Processing

In [6]:
import re

def preprocess_tweets(df):

    def clean_tweet(text):
        text = text.lower()
        text = re.sub(r"http\S+|www\S+|https\S+", "", text)
        text = re.sub(r"@\w+", "", text)
        text = re.sub(r"#\w+", "", text)
        text = re.sub(r"[^a-z0-9\s]", " ", text)
        text = re.sub(r"\s+", " ", text).strip()
        return text

    def simple_tokenize(text):
        return text.split()

    # Apply cleaning + tokenization
    df['CleanTweet'] = df['OriginalTweet'].apply(clean_tweet)
    df['Tokens'] = df['CleanTweet'].apply(simple_tokenize)

    # Keep only CleanTweet, Tokens, and Sentiment
    df = df[['CleanTweet', 'Tokens', 'Sentiment']]

    return df


In [7]:
df_clean = preprocess_tweets(df)
df_clean.head()

Unnamed: 0,CleanTweet,Tokens,Sentiment
0,and and,"[and, and]",Neutral
1,advice talk to your neighbours family to excha...,"[advice, talk, to, your, neighbours, family, t...",Positive
2,coronavirus australia woolworths to give elder...,"[coronavirus, australia, woolworths, to, give,...",Positive
3,my food stock is not the only one which is emp...,"[my, food, stock, is, not, the, only, one, whi...",Positive
4,me ready to go at supermarket during the outbr...,"[me, ready, to, go, at, supermarket, during, t...",Extremely Negative


In [8]:
df_clean_test = preprocess_tweets(df_test)
df_clean_test.head()

Unnamed: 0,CleanTweet,Tokens,Sentiment
0,trending new yorkers encounter empty supermark...,"[trending, new, yorkers, encounter, empty, sup...",Extremely Negative
1,when i couldn t find hand sanitizer at fred me...,"[when, i, couldn, t, find, hand, sanitizer, at...",Positive
2,find out how you can protect yourself and love...,"[find, out, how, you, can, protect, yourself, ...",Extremely Positive
3,buying hits city as anxious shoppers stock up ...,"[buying, hits, city, as, anxious, shoppers, st...",Negative
4,one week everyone buying baby milk powder the ...,"[one, week, everyone, buying, baby, milk, powd...",Neutral


In [9]:
import numpy as np
from collections import Counter

def prepare_data(df, df_test, max_length=50, vocab_size=10000, val_split_ratio=0.1):
    """
    Prepares data for a neural network using separate train/validation and test sets.

    This function performs the following steps:
    1. Splits the main DataFrame `df` into training and validation sets.
    2. Encodes sentiment labels into integers.
    3. Builds a vocabulary from the training data ONLY.
    4. Processes the external `df_test` using the established vocabulary and label encoding.
    5. Converts all token sequences to integer sequences and pads them.

    Args:
        df (pd.DataFrame): DataFrame for training and validation.
        df_test (pd.DataFrame): Separate DataFrame for testing.
        max_length (int): The fixed length for all sequences.
        vocab_size (int): The maximum number of words in the vocabulary.
        val_split_ratio (float): The proportion of `df` to be used for validation.

    Returns:
        tuple: A tuple containing the processed train, val, and test sets,
               plus the word and label mapping dictionaries.
    """
    # --- 1. Encode Labels on the main DataFrame ---
    unique_labels = df['Sentiment'].unique()
    label_to_idx = {label: i for i, label in enumerate(unique_labels)}
    encoded_labels = df['Sentiment'].map(label_to_idx).to_numpy()
    token_sequences = df['Tokens'].tolist()

    # --- 2. Split main DataFrame into Training and Validation ---
    num_samples = len(token_sequences)
    indices = np.arange(num_samples)
    np.random.seed(42) # for reproducibility
    np.random.shuffle(indices)

    shuffled_sequences = [token_sequences[i] for i in indices]
    shuffled_labels = encoded_labels[indices]

    val_split_index = int((1 - val_split_ratio) * num_samples)

    train_seqs = shuffled_sequences[:val_split_index]
    y_train = shuffled_labels[:val_split_index]

    val_seqs = shuffled_sequences[val_split_index:]
    y_val = shuffled_labels[val_split_index:]

    # --- 3. Build Vocabulary (from training data only) ---
    word_counts = Counter(word for seq in train_seqs for word in seq)
    most_common_words = [word for word, count in word_counts.most_common(vocab_size - 2)]

    word_to_idx = {'<PAD>': 0, '<UNK>': 1}
    for i, word in enumerate(most_common_words):
        word_to_idx[word] = i + 2

    # --- 4. Process the External Test Set ---
    # NOTE: Assumes df_test has already been preprocessed to have 'Tokens' and 'Sentiment'
    test_seqs = df_test['Tokens'].tolist()
    y_test = df_test['Sentiment'].map(label_to_idx).to_numpy()

    # --- 5. Encode and Pad all Data Splits ---
    def encode_and_pad(sequences, word_map, length):
        padded_features = np.zeros((len(sequences), length), dtype=np.int32)
        for i, seq in enumerate(sequences):
            for j, word in enumerate(seq):
                if j >= length: # Truncate if sequence is too long
                    break
                # Default to <UNK> token's index (1) if word is not in vocab
                padded_features[i, j] = word_map.get(word, 1)
        return padded_features

    X_train = encode_and_pad(train_seqs, word_to_idx, max_length)
    X_val = encode_and_pad(val_seqs, word_to_idx, max_length)
    X_test = encode_and_pad(test_seqs, word_to_idx, max_length)

    return (X_train, y_train), (X_val, y_val), (X_test, y_test), word_to_idx, label_to_idx

In [10]:
# Define parameters
MAX_SEQ_LENGTH = 50
VOCAB_SIZE = 10000

# Call the function to get your final, model-ready data
(X_train, y_train), (X_val, y_val), (X_test, y_test), word_to_idx, label_to_idx = prepare_data(
    df_clean,df_clean_test,
    max_length=MAX_SEQ_LENGTH,
    vocab_size=VOCAB_SIZE
)

# You can now check the shapes and content
print("Shape of X_train:", X_train.shape)
print("Shape of y_train:", y_train.shape)
print("-" * 30)
print("Example sequence (integers):", X_train[0])
print("Corresponding label (integer):", y_train[0])
print("-" * 30)
print("Vocabulary size:", len(word_to_idx))
print("Label mapping:", label_to_idx)

Shape of X_train: (37041, 50)
Shape of y_train: (37041,)
------------------------------
Example sequence (integers): [ 300    2   69  137   38   30  144  708 5650   31   11  229  723   62
    2  134  270   18    8  403   23 1564   11  696    3 2021    7   81
 2427    5    2  159  118    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0]
Corresponding label (integer): 0
------------------------------
Vocabulary size: 10000
Label mapping: {'Neutral': 0, 'Positive': 1, 'Extremely Negative': 2, 'Negative': 3, 'Extremely Positive': 4}


## Mini-Batches

In [11]:
import numpy as np

def create_mini_batches(X, y, batch_size=64, shuffle=True):
    """
    Creates a generator of mini-batches from the input data.

    Args:
        X (np.ndarray): Input data of shape (num_samples, seq_length).
        y (np.ndarray): Labels of shape (num_samples,).
        batch_size (int): The size of each mini-batch.
        shuffle (bool): If True, shuffles the data at the start of each epoch.

    Yields:
        tuple: A tuple containing a mini-batch of data and its corresponding labels.
    """
    num_samples = X.shape[0]

    # Create a permutation of indices
    indices = np.arange(num_samples)
    if shuffle:
        np.random.shuffle(indices)

    # Yield batches one by one
    for start_idx in range(0, num_samples, batch_size):
        end_idx = min(start_idx + batch_size, num_samples)
        batch_indices = indices[start_idx:end_idx]

        yield X[batch_indices], y[batch_indices]

In [12]:
# 1. Define your batch size
BATCH_SIZE = 64

# 2. Create the mini-batch generator using your actual training data
train_mini_batch_generator = create_mini_batches(X_train, y_train, batch_size=BATCH_SIZE)

# 3. (Optional) Loop through it to see the batches being created
print(f"Creating mini-batches of size {BATCH_SIZE} from training data...")
print("-" * 30)

for i, (mini_batch_X, mini_batch_y) in enumerate(train_mini_batch_generator):
    # In a real training loop, you would feed this batch into your model
    if i < 3: # Let's just print the first 3 batches to check
      print(f"Mini-batch {i+1}:")
      print(f"  X shape: {mini_batch_X.shape}")
      print(f"  y shape: {mini_batch_y.shape}")

print("\n...")
print("Generator is ready for the training loop.")

Creating mini-batches of size 64 from training data...
------------------------------
Mini-batch 1:
  X shape: (64, 50)
  y shape: (64,)
Mini-batch 2:
  X shape: (64, 50)
  y shape: (64,)
Mini-batch 3:
  X shape: (64, 50)
  y shape: (64,)

...
Generator is ready for the training loop.


## Test

In [13]:
def calculate_metrics(y_true, y_pred, num_classes):
    # --- Calculate Accuracy ---
    accuracy = np.mean(y_true == y_pred)

    # --- Calculate Macro F1-Score ---
    f1_per_class = []
    for c in range(num_classes):
        # True Positives, False Positives, False Negatives
        tp = np.sum((y_pred == c) & (y_true == c))
        fp = np.sum((y_pred == c) & (y_true != c))
        fn = np.sum((y_pred != c) & (y_true == c))

        # Precision and Recall for the current class
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0

        # F1-Score for the current class
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        f1_per_class.append(f1)

    macro_f1 = np.mean(f1_per_class)

    return accuracy, macro_f1

In [14]:
def save_model(model, word_to_idx, label_to_idx, filepath):
    """
    Saves the model parameters and mappings to a compressed .npz file.

    Args:
        model (RNN): The trained RNN model instance.
        word_to_idx (dict): The vocabulary mapping.
        label_to_idx (dict): The class label mapping.
        filepath (str): The path to save the file (e.g., 'my_model.npz').
    """
    # np.savez_compressed can save dictionaries if they are passed as keyword arguments.
    # We use **model.params to unpack the parameters dictionary.
    np.savez_compressed(
        filepath,
        **model.params,
        word_to_idx=word_to_idx,
        label_to_idx=label_to_idx
    )
    print(f"Model saved to {filepath}")

## Models

### RNN

In [15]:
import numpy as np
import sys

class RNN:
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        """Initializes the RNN with Xavier (Glorot) Initialization."""
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim

        # --- 1. Optimized Parameter Initialization (Xavier/Glorot) ---
        # This method helps maintain signal variance and combats vanishing/exploding gradients.
        self.params = {}

        # Embedding Matrix
        limit_E = np.sqrt(6.0 / (vocab_size + embedding_dim))
        self.params['E'] = np.random.uniform(-limit_E, limit_E, (vocab_size, embedding_dim))

        # Input to Hidden Weights
        limit_Wxh = np.sqrt(6.0 / (embedding_dim + hidden_dim))
        self.params['W_xh'] = np.random.uniform(-limit_Wxh, limit_Wxh, (embedding_dim, hidden_dim))

        # Hidden to Hidden Weights
        limit_Whh = np.sqrt(6.0 / (hidden_dim + hidden_dim))
        self.params['W_hh'] = np.random.uniform(-limit_Whh, limit_Whh, (hidden_dim, hidden_dim))

        # Hidden bias (initialized to zeros)
        self.params['b_h'] = np.zeros((1, hidden_dim))

        # Hidden to Output Weights
        limit_Why = np.sqrt(6.0 / (hidden_dim + output_dim))
        self.params['W_hy'] = np.random.uniform(-limit_Why, limit_Why, (hidden_dim, output_dim))

        # Output bias (initialized to zeros)
        self.params['b_y'] = np.zeros((1, output_dim))

        # 2. Initialize Adam Optimizer State
        self.adam_m = {k: np.zeros_like(v) for k, v in self.params.items()}
        self.adam_v = {k: np.zeros_like(v) for k, v in self.params.items()}
        self.adam_t = 0 # Timestep counter

    def _softmax(self, z):
        """Computes softmax probabilities."""
        exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))
        return exp_z / np.sum(exp_z, axis=1, keepdims=True)

    def forward(self, X_batch):
        """Performs the forward pass for a batch of sequences."""
        E, W_xh, W_hh, b_h, W_hy, b_y = (self.params[k] for k in
                                        ('E', 'W_xh', 'W_hh', 'b_h', 'W_hy', 'b_y'))
        batch_size, seq_length = X_batch.shape

        h_prev = np.zeros((batch_size, self.hidden_dim))
        cache = {'h': {0: h_prev}, 'x_embedded': {}, 'X_batch': X_batch}

        for t in range(seq_length):
            word_indices = X_batch[:, t]
            x_t = E[word_indices]
            h_next = np.tanh(x_t @ W_xh + h_prev @ W_hh + b_h)

            cache['x_embedded'][t] = x_t
            cache['h'][t + 1] = h_next
            h_prev = h_next

        logits = h_prev @ W_hy + b_y
        probs = self._softmax(logits)
        cache['probs'] = probs
        return probs, cache

    def compute_loss(self, probs, y_batch):
        """Computes cross-entropy loss."""
        batch_size = y_batch.shape[0]
        log_probs = -np.log(probs[np.arange(batch_size), y_batch] + 1e-9) # add epsilon for stability
        loss = np.sum(log_probs) / batch_size
        return loss

    def backward(self, y_batch, cache):
        """Performs backpropagation through time (BPTT)."""
        E, W_xh, W_hh, W_hy = (self.params[k] for k in ('E', 'W_xh', 'W_hh', 'W_hy'))
        probs, h, x_embedded, X_batch = (cache[k] for k in ('probs', 'h', 'x_embedded', 'X_batch'))
        batch_size, seq_length = X_batch.shape

        grads = {k: np.zeros_like(v) for k, v in self.params.items()}

        d_logits = np.copy(probs)
        d_logits[np.arange(batch_size), y_batch] -= 1
        d_logits /= batch_size

        h_final = h[len(h) - 1]
        grads['W_hy'] = h_final.T @ d_logits
        grads['b_y'] = np.sum(d_logits, axis=0, keepdims=True)
        d_h = d_logits @ W_hy.T

        for t in reversed(range(seq_length)):
            d_tanh = (1 - h[t + 1]**2) * d_h
            grads['b_h'] += np.sum(d_tanh, axis=0, keepdims=True)
            grads['W_hh'] += h[t].T @ d_tanh
            grads['W_xh'] += x_embedded[t].T @ d_tanh
            d_h = d_tanh @ W_hh.T
            d_E_t = d_tanh @ W_xh.T
            np.add.at(grads['E'], X_batch[:, t], d_E_t)

        # Clip gradients to prevent explosion
        for key in grads:
            np.clip(grads[key], -5, 5, out=grads[key])

        return grads

    def update_params_with_adam(self, grads, learning_rate, beta1, beta2, epsilon=1e-8):
        """Updates parameters using the Adam optimizer."""
        self.adam_t += 1 # Increment timestep

        for key in self.params:
            self.adam_m[key] = beta1 * self.adam_m[key] + (1 - beta1) * grads[key]
            self.adam_v[key] = beta2 * self.adam_v[key] + (1 - beta2) * (grads[key] ** 2)
            m_hat = self.adam_m[key] / (1 - beta1 ** self.adam_t)
            v_hat = self.adam_v[key] / (1 - beta2 ** self.adam_t)
            self.params[key] -= learning_rate * m_hat / (np.sqrt(v_hat) + epsilon)

    def predict(self, X):
        """Makes predictions for a given input X."""
        probs, _ = self.forward(X)
        return np.argmax(probs, axis=1)

### LSTM

In [16]:
import numpy as np
import sys

class LSTM:
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        """Initializes the LSTM model with Xavier Initialization."""
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim

        self.params = {}
        combined_dim = embedding_dim + hidden_dim

        limit_E = np.sqrt(6.0 / (vocab_size + embedding_dim))
        self.params['E'] = np.random.uniform(-limit_E, limit_E, (vocab_size, embedding_dim))

        limit_gates = np.sqrt(6.0 / (combined_dim + 4 * hidden_dim))
        self.params['W_gates'] = np.random.uniform(-limit_gates, limit_gates, (combined_dim, 4 * hidden_dim))
        self.params['b_gates'] = np.zeros((1, 4 * hidden_dim))

        limit_Why = np.sqrt(6.0 / (hidden_dim + output_dim))
        self.params['W_hy'] = np.random.uniform(-limit_Why, limit_Why, (hidden_dim, output_dim))
        self.params['b_y'] = np.zeros((1, output_dim))

        self.adam_m = {k: np.zeros_like(v) for k, v in self.params.items()}
        self.adam_v = {k: np.zeros_like(v) for k, v in self.params.items()}
        self.adam_t = 0

    def _sigmoid(self, z):
        return 1 / (1 + np.exp(-z))

    def _softmax(self, z):
        exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))
        return exp_z / np.sum(exp_z, axis=1, keepdims=True)

    def forward(self, X_batch):
        """Performs the forward pass for the LSTM."""
        E, W_gates, b_gates, W_hy, b_y = (self.params[k] for k in
                                          ('E', 'W_gates', 'b_gates', 'W_hy', 'b_y'))
        batch_size, seq_length = X_batch.shape
        h_dim = self.hidden_dim

        h_prev = np.zeros((batch_size, h_dim))
        c_prev = np.zeros((batch_size, h_dim))

        cache = {'X_batch': X_batch, 'h': {}, 'c': {}, 'gates': {}}
        cache['h'][-1], cache['c'][-1] = h_prev, c_prev

        for t in range(seq_length):
            word_indices = X_batch[:, t]
            x_t = E[word_indices]

            combined_input = np.hstack((h_prev, x_t))
            gates_raw = combined_input @ W_gates + b_gates

            f_raw, i_raw, c_tilde_raw, o_raw = np.split(gates_raw, 4, axis=1)

            f = self._sigmoid(f_raw)
            i = self._sigmoid(i_raw)
            c_tilde = np.tanh(c_tilde_raw)
            o = self._sigmoid(o_raw)

            c_next = f * c_prev + i * c_tilde
            h_next = o * np.tanh(c_next)

            cache['gates'][t] = {'f': f, 'i': i, 'o': o, 'c_tilde': c_tilde, 'combined': combined_input}
            cache['h'][t], cache['c'][t] = h_next, c_next

            h_prev, c_prev = h_next, c_next

        # --- *** FIX 1: Store the final hidden state *** ---
        h_final = h_prev
        cache['h_final'] = h_final
        # --- End of Fix ---

        logits = h_final @ W_hy + b_y
        probs = self._softmax(logits)
        cache['probs'] = probs

        return probs, cache

    def compute_loss(self, probs, y_batch):
        batch_size = y_batch.shape[0]
        log_probs = -np.log(probs[np.arange(batch_size), y_batch] + 1e-9)
        return np.sum(log_probs) / batch_size

    def backward(self, y_batch, cache):
        """Performs backpropagation for the LSTM."""
        W_gates, W_hy = self.params['W_gates'], self.params['W_hy']
        probs, X_batch = cache['probs'], cache['X_batch']
        batch_size, seq_length = X_batch.shape
        e_dim, h_dim = self.embedding_dim, self.hidden_dim

        grads = {k: np.zeros_like(v) for k, v in self.params.items()}
        dh_next = np.zeros((batch_size, h_dim))
        dc_next = np.zeros((batch_size, h_dim))

        d_logits = np.copy(probs)
        d_logits[np.arange(batch_size), y_batch] -= 1
        d_logits /= batch_size

        # --- *** FIX 2: Retrieve h_final correctly *** ---
        h_final = cache['h_final']
        # --- End of Fix ---

        grads['W_hy'] = h_final.T @ d_logits
        grads['b_y'] = np.sum(d_logits, axis=0, keepdims=True)
        dh_next = d_logits @ W_hy.T

        for t in reversed(range(seq_length)):
            gates = cache['gates'][t]
            c_prev = cache['c'][t-1]
            c_curr = cache['c'][t]
            combined = gates['combined']

            dh = dh_next
            do = dh * np.tanh(c_curr)
            do_raw = do * gates['o'] * (1 - gates['o'])

            dc = dc_next + dh * gates['o'] * (1 - np.tanh(c_curr)**2)

            df = dc * c_prev
            df_raw = df * gates['f'] * (1 - gates['f'])

            di = dc * gates['c_tilde']
            di_raw = di * gates['i'] * (1 - gates['i'])

            dc_tilde = dc * gates['i']
            dc_tilde_raw = dc_tilde * (1 - gates['c_tilde']**2)

            d_gates_raw = np.hstack((df_raw, di_raw, dc_tilde_raw, do_raw))

            grads['W_gates'] += combined.T @ d_gates_raw
            grads['b_gates'] += np.sum(d_gates_raw, axis=0, keepdims=True)

            d_combined = d_gates_raw @ W_gates.T

            dh_next = d_combined[:, :h_dim]
            dc_next = dc * gates['f']

            dx_t = d_combined[:, h_dim:]
            np.add.at(grads['E'], X_batch[:, t], dx_t)

        for key in grads:
            np.clip(grads[key], -5, 5, out=grads[key])

        return grads

    def update_params_with_adam(self, grads, learning_rate, beta1, beta2, epsilon=1e-8):
        self.adam_t += 1
        for key in self.params:
            self.adam_m[key] = beta1 * self.adam_m[key] + (1 - beta1) * grads[key]
            self.adam_v[key] = beta2 * self.adam_v[key] + (1 - beta2) * (grads[key] ** 2)
            m_hat = self.adam_m[key] / (1 - beta1 ** self.adam_t)
            v_hat = self.adam_v[key] / (1 - beta2 ** self.adam_t)
            self.params[key] -= learning_rate * m_hat / (np.sqrt(v_hat) + epsilon)

    def predict(self, X):
        probs, _ = self.forward(X)
        return np.argmax(probs, axis=1)

### Tranformer

In [17]:
import numpy as np
import sys

# ===================================================================
# SECTION 1: HELPER FUNCTIONS
# ===================================================================

def positional_encoding(seq_length, embedding_dim):
    """Generates the positional encoding matrix."""
    pos = np.arange(seq_length)[:, np.newaxis]
    i = np.arange(embedding_dim)[np.newaxis, :]
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(embedding_dim))
    angle_rads = pos * angle_rates
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[np.newaxis, ...]
    return pos_encoding.astype(np.float32)

def create_mini_batches(X, y, batch_size=64, shuffle=True):
    """Creates a generator of mini-batches."""
    num_samples = X.shape[0]
    indices = np.arange(num_samples)
    if shuffle:
        np.random.shuffle(indices)
    for start_idx in range(0, num_samples, batch_size):
        end_idx = min(start_idx + batch_size, num_samples)
        batch_indices = indices[start_idx:end_idx]
        yield X[batch_indices], y[batch_indices]

def calculate_metrics(y_true, y_pred, num_classes):
    """Calculates accuracy and macro F1-score."""
    accuracy = np.mean(y_true == y_pred)
    f1_per_class = []
    for c in range(num_classes):
        tp = np.sum((y_pred == c) & (y_true == c))
        fp = np.sum((y_pred == c) & (y_true != c))
        fn = np.sum((y_pred != c) & (y_true == c))
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        f1_per_class.append(f1)
    macro_f1 = np.mean(f1_per_class)
    return accuracy, macro_f1

# ===================================================================
# SECTION 2: TRANSFORMER ENCODER CLASS
# ===================================================================

class TransformerEncoder:
    def __init__(self, vocab_size, max_len, embedding_dim, num_heads, ff_hidden_dim, num_blocks, output_dim):
        self.params = {}
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.num_heads = num_heads
        self.num_blocks = num_blocks
        self.output_dim = output_dim  # <-- *** ADD THIS LINE ***

        self.params['E'] = np.random.randn(vocab_size, embedding_dim) * 0.01
        self.pos_encoding = positional_encoding(max_len, embedding_dim)

        self.params['blocks'] = {}
        for i in range(num_blocks):
            block = {
                'W_qkv': np.random.randn(embedding_dim, 3 * embedding_dim) * 0.01,
                'W_o': np.random.randn(embedding_dim, embedding_dim) * 0.01,
                'ln1_gamma': np.ones((1, 1, embedding_dim)), 'ln1_beta': np.zeros((1, 1, embedding_dim)),
                'ln2_gamma': np.ones((1, 1, embedding_dim)), 'ln2_beta': np.zeros((1, 1, embedding_dim)),
                'W1': np.random.randn(embedding_dim, ff_hidden_dim) * 0.01, 'b1': np.zeros((1, ff_hidden_dim)),
                'W2': np.random.randn(ff_hidden_dim, embedding_dim) * 0.01, 'b2': np.zeros((1, embedding_dim))
            }
            self.params['blocks'][i] = block

        self.params['W_out'] = np.random.randn(embedding_dim, output_dim) * 0.01
        self.params['b_out'] = np.zeros((1, output_dim))
        self._initialize_adam()

        self.params['W_out'] = np.random.randn(embedding_dim, output_dim) * 0.01
        self.params['b_out'] = np.zeros((1, output_dim))
        self._initialize_adam()

        self.params['W_out'] = np.random.randn(embedding_dim, output_dim) * 0.01
        self.params['b_out'] = np.zeros((1, output_dim))
        self._initialize_adam()

    def _initialize_adam(self):
        self.adam_m = {'E': np.zeros_like(self.params['E']), 'W_out': np.zeros_like(self.params['W_out']), 'b_out': np.zeros_like(self.params['b_out']), 'blocks': {}}
        self.adam_v = {'E': np.zeros_like(self.params['E']), 'W_out': np.zeros_like(self.params['W_out']), 'b_out': np.zeros_like(self.params['b_out']), 'blocks': {}}
        for i in range(self.num_blocks):
            self.adam_m['blocks'][i] = {k: np.zeros_like(v) for k, v in self.params['blocks'][i].items()}
            self.adam_v['blocks'][i] = {k: np.zeros_like(v) for k, v in self.params['blocks'][i].items()}
        self.adam_t = 0

    def _softmax(self, z):
        exp_z = np.exp(z - np.max(z, axis=-1, keepdims=True))
        return exp_z / np.sum(exp_z, axis=-1, keepdims=True)

    def _layer_norm_forward(self, x, gamma, beta, epsilon=1e-5):
        mean = np.mean(x, axis=-1, keepdims=True)
        variance = np.var(x, axis=-1, keepdims=True)
        x_normalized = (x - mean) / np.sqrt(variance + epsilon)
        out = gamma * x_normalized + beta
        cache = (x, x_normalized, mean, variance, gamma, epsilon)
        return out, cache

    def _multi_head_attention_forward(self, x, W_qkv, W_o):
        batch_size, seq_len, d_model = x.shape
        d_head = d_model // self.num_heads
        qkv = x @ W_qkv
        q, k, v = np.split(qkv, 3, axis=-1)
        q = q.reshape(batch_size, seq_len, self.num_heads, d_head).transpose(0, 2, 1, 3)
        k = k.reshape(batch_size, seq_len, self.num_heads, d_head).transpose(0, 2, 1, 3)
        v = v.reshape(batch_size, seq_len, self.num_heads, d_head).transpose(0, 2, 1, 3)
        scores = (q @ k.transpose(0, 1, 3, 2)) / np.sqrt(d_head)
        weights = self._softmax(scores)
        attention = weights @ v
        attention = attention.transpose(0, 2, 1, 3).reshape(batch_size, seq_len, d_model)
        out = attention @ W_o
        cache = (x, q, k, v, weights, attention, W_qkv, W_o)
        return out, cache

    def forward(self, X_batch):
        batch_size, seq_len = X_batch.shape
        cache = {'X_batch': X_batch, 'blocks': {}}
        x = self.params['E'][X_batch] + self.pos_encoding[:, :seq_len, :]

        for i in range(self.num_blocks):
            block_params = self.params['blocks'][i]
            ln1_in = x
            mha_out, mha_cache = self._multi_head_attention_forward(ln1_in, block_params['W_qkv'], block_params['W_o'])
            add_norm1_in = ln1_in + mha_out
            add_norm1_out, ln1_cache = self._layer_norm_forward(add_norm1_in, block_params['ln1_gamma'], block_params['ln1_beta'])

            ln2_in = add_norm1_out
            ffn_hidden = np.maximum(0, ln2_in @ block_params['W1'] + block_params['b1'])
            ffn_out = ffn_hidden @ block_params['W2'] + block_params['b2']
            add_norm2_in = ln2_in + ffn_out
            add_norm2_out, ln2_cache = self._layer_norm_forward(add_norm2_in, block_params['ln2_gamma'], block_params['ln2_beta'])

            x = add_norm2_out
            cache['blocks'][i] = {'mha_cache': mha_cache, 'ln1_cache': ln1_cache, 'ln2_cache': ln2_cache, 'ffn_hidden': ffn_hidden, 'ln1_in': ln1_in, 'ln2_in': ln2_in}

        pooled_out = np.mean(x, axis=1)
        logits = pooled_out @ self.params['W_out'] + self.params['b_out']
        probs = self._softmax(logits)
        cache.update({'final_x': x, 'pooled_out': pooled_out, 'probs': probs})
        return probs, cache

    def _layer_norm_backward(self, d_out, cache):
        x, x_norm, mean, var, gamma, eps = cache
        N, D = x.shape[0]*x.shape[1], x.shape[2]
        d_x_norm = d_out * gamma
        d_var = np.sum(d_x_norm * (x - mean) * -0.5 * (var + eps)**(-1.5), axis=-1, keepdims=True)
        d_mean = np.sum(d_x_norm * -1 / np.sqrt(var + eps), axis=-1, keepdims=True) + d_var * np.sum(-2 * (x - mean), axis=-1, keepdims=True) / D
        d_x = d_x_norm / np.sqrt(var + eps) + d_var * 2 * (x - mean) / D + d_mean / D
        d_gamma = np.sum(d_out * x_norm, axis=(0, 1), keepdims=True)
        d_beta = np.sum(d_out, axis=(0, 1), keepdims=True)
        return d_x, d_gamma, d_beta

    def _multi_head_attention_backward(self, d_out, cache):
        x, q, k, v, weights, attention, W_qkv, W_o = cache
        batch_size, seq_len, d_model = x.shape
        d_head = d_model // self.num_heads
        d_attention = d_out @ W_o.T
        d_W_o = attention.reshape(batch_size * seq_len, d_model).T @ d_out.reshape(batch_size * seq_len, d_model)
        d_attention = d_attention.reshape(batch_size, seq_len, self.num_heads, d_head).transpose(0, 2, 1, 3)
        d_weights = d_attention @ v.transpose(0, 1, 3, 2)
        d_v = weights.transpose(0, 1, 3, 2) @ d_attention
        d_scores = weights * (d_weights - np.sum(d_weights * weights, axis=-1, keepdims=True))
        d_q = d_scores @ k
        d_k = d_scores.transpose(0, 1, 3, 2) @ q
        d_q /= np.sqrt(d_head)
        d_k /= np.sqrt(d_head)
        d_q = d_q.transpose(0, 2, 1, 3).reshape(batch_size, seq_len, d_model)
        d_k = d_k.transpose(0, 2, 1, 3).reshape(batch_size, seq_len, d_model)
        d_v = d_v.transpose(0, 2, 1, 3).reshape(batch_size, seq_len, d_model)
        d_qkv = np.concatenate([d_q, d_k, d_v], axis=-1)
        d_W_qkv = x.reshape(batch_size * seq_len, d_model).T @ d_qkv.reshape(batch_size * seq_len, 3 * d_model)
        d_x = d_qkv @ W_qkv.T
        return d_x, d_W_qkv, d_W_o

    def backward(self, y_batch, cache):
        grads = {k: np.zeros_like(v) if isinstance(v, np.ndarray) else {} for k, v in self.params.items()}
        for i in range(self.num_blocks):
             grads['blocks'][i] = {k: np.zeros_like(v) for k, v in self.params['blocks'][i].items()}
        probs, X_batch, final_x, pooled_out = cache['probs'], cache['X_batch'], cache['final_x'], cache['pooled_out']
        batch_size, seq_len = X_batch.shape

        d_logits = (probs - np.eye(self.output_dim)[y_batch]) / batch_size
        grads['W_out'] = pooled_out.T @ d_logits
        grads['b_out'] = np.sum(d_logits, axis=0)

        d_pooled = d_logits @ self.params['W_out'].T
        d_x = np.tile(d_pooled[:, np.newaxis, :], (1, seq_len, 1)) / seq_len

        for i in reversed(range(self.num_blocks)):
            block_params = self.params['blocks'][i]
            block_cache = cache['blocks'][i]

            d_add_norm2_in, d_ln2_gamma, d_ln2_beta = self._layer_norm_backward(d_x, block_cache['ln2_cache'])
            grads['blocks'][i]['ln2_gamma'] += d_ln2_gamma
            grads['blocks'][i]['ln2_beta'] += d_ln2_beta

            d_ln2_in = d_add_norm2_in
            d_ffn_out = d_add_norm2_in

            d_ffn_hidden = d_ffn_out @ block_params['W2'].T
            grads['blocks'][i]['W2'] += block_cache['ffn_hidden'].reshape(-1, block_cache['ffn_hidden'].shape[-1]).T @ d_ffn_out.reshape(-1, d_ffn_out.shape[-1])
            grads['blocks'][i]['b2'] += np.sum(d_ffn_out, axis=(0, 1))

            d_ffn_hidden[block_cache['ffn_hidden'] <= 0] = 0

            d_add_norm1_out = d_ffn_hidden @ block_params['W1'].T
            grads['blocks'][i]['W1'] += block_cache['ln2_in'].reshape(-1, block_cache['ln2_in'].shape[-1]).T @ d_ffn_hidden.reshape(-1, d_ffn_hidden.shape[-1])
            grads['blocks'][i]['b1'] += np.sum(d_ffn_hidden, axis=(0, 1))

            d_x = d_ln2_in + d_add_norm1_out

            d_add_norm1_in, d_ln1_gamma, d_ln1_beta = self._layer_norm_backward(d_x, block_cache['ln1_cache'])
            grads['blocks'][i]['ln1_gamma'] += d_ln1_gamma
            grads['blocks'][i]['ln1_beta'] += d_ln1_beta

            d_ln1_in = d_add_norm1_in
            d_mha_out = d_add_norm1_in

            d_x_mha, d_W_qkv, d_W_o = self._multi_head_attention_backward(d_mha_out, block_cache['mha_cache'])
            grads['blocks'][i]['W_qkv'] += d_W_qkv
            grads['blocks'][i]['W_o'] += d_W_o

            d_x = d_ln1_in + d_x_mha

        d_E = d_x
        np.add.at(grads['E'], X_batch, d_E)
        return grads

    def compute_loss(self, probs, y_batch):
        batch_size = y_batch.shape[0]
        return -np.sum(np.log(probs[np.arange(batch_size), y_batch] + 1e-9)) / batch_size

    def update_params_with_adam(self, grads, learning_rate, beta1, beta2, epsilon=1e-8):
        self.adam_t += 1
        for p_key in ['E', 'W_out', 'b_out']:
            self._adam_update(p_key, grads[p_key], learning_rate, beta1, beta2, epsilon)
        for i in range(self.num_blocks):
            for key in self.params['blocks'][i]:
                self._adam_update(('blocks', i, key), grads['blocks'][i][key], learning_rate, beta1, beta2, epsilon)

    def _adam_update(self, key_tuple, grad, learning_rate, beta1, beta2, epsilon):
        # Helper for nested param update
        if isinstance(key_tuple, str): key_tuple = (key_tuple,)

        param_ref = self.params
        m_ref = self.adam_m
        v_ref = self.adam_v
        for key in key_tuple:
            param_ref = param_ref[key]
            m_ref = m_ref[key]
            v_ref = v_ref[key]

        m_ref[...] = beta1 * m_ref + (1 - beta1) * grad
        v_ref[...] = beta2 * v_ref + (1 - beta2) * (grad**2)
        m_hat = m_ref / (1 - beta1**self.adam_t)
        v_hat = v_ref / (1 - beta2**self.adam_t)
        param_ref[...] -= learning_rate * m_hat / (np.sqrt(v_hat) + epsilon)

    def predict(self, X):
        probs, _ = self.forward(X)
        return np.argmax(probs, axis=1)



## Train

### RNN

In [18]:
# --- Recommended Hyperparameters ---
EMBEDDING_DIM = 100
HIDDEN_DIM = 16      # CRITICAL: Increase this back to 128 or even 256
VOCAB_SIZE = 15000    # Use a larger vocabulary
BATCH_SIZE = 256      # 64 is a good starting point
NUM_EPOCHS = 20       # CRITICAL: Train for more epochs since the model was still learning
LEARNING_RATE = 0.001

# --- Adam Hyperparameters ---
BETA1 = 0.9
BETA2 = 0.999
EPSILON = 1e-8

# Define the output dimension based on the number of unique labels
OUTPUT_DIM = len(label_to_idx)

# --- 2. Initialize the Model ---
model = RNN(
    vocab_size=VOCAB_SIZE,
    embedding_dim=EMBEDDING_DIM,
    hidden_dim=HIDDEN_DIM,
    output_dim=OUTPUT_DIM
)

for epoch in range(NUM_EPOCHS):
    epoch_loss = 0
    train_batches = create_mini_batches(X_train, y_train, BATCH_SIZE)

    for i, (X_batch, y_batch) in enumerate(train_batches):
        probs, cache = model.forward(X_batch)
        loss = model.compute_loss(probs, y_batch)
        epoch_loss += loss

        grads = model.backward(y_batch, cache)
        model.update_params_with_adam(grads, LEARNING_RATE, BETA1, BETA2, EPSILON)

        if i % 50 == 0:
            sys.stdout.write(f"\rEpoch {epoch+1}/{NUM_EPOCHS} | Batch {i} | Loss: {loss:.4f}")

    avg_epoch_loss = epoch_loss / (i + 1)
    val_preds = model.predict(X_val[:2000])  # Increase validation size
    val_accuracy = np.mean(val_preds == y_val[:2000])

    print(f"\nEnd of Epoch {epoch+1} | Avg Loss: {avg_epoch_loss:.4f} | Val Accuracy: {val_accuracy:.4f}")

# --- 5. Final Evaluation on Test Set ---
print("\nTraining finished. Evaluating on test set...")
test_preds = model.predict(X_test)

# Calculate both accuracy and macro F1-score
test_accuracy, test_macro_f1 = calculate_metrics(y_test, test_preds, model.output_dim)

print(f"Final Test Accuracy: {test_accuracy:.4f}")
print(f"Final Test Macro F1-Score: {test_macro_f1:.4f}")

Epoch 1/20 | Batch 100 | Loss: 1.6020
End of Epoch 1 | Avg Loss: 1.5748 | Val Accuracy: 0.2985
Epoch 2/20 | Batch 100 | Loss: 1.4195
End of Epoch 2 | Avg Loss: 1.4595 | Val Accuracy: 0.3535
Epoch 3/20 | Batch 100 | Loss: 1.3209
End of Epoch 3 | Avg Loss: 1.2747 | Val Accuracy: 0.4250
Epoch 4/20 | Batch 100 | Loss: 1.1264
End of Epoch 4 | Avg Loss: 1.1504 | Val Accuracy: 0.4050
Epoch 5/20 | Batch 100 | Loss: 1.1125
End of Epoch 5 | Avg Loss: 1.0682 | Val Accuracy: 0.4300
Epoch 6/20 | Batch 100 | Loss: 1.0442
End of Epoch 6 | Avg Loss: 1.0014 | Val Accuracy: 0.4175
Epoch 7/20 | Batch 100 | Loss: 1.0513
End of Epoch 7 | Avg Loss: 0.9445 | Val Accuracy: 0.4190
Epoch 8/20 | Batch 100 | Loss: 0.8426
End of Epoch 8 | Avg Loss: 0.9087 | Val Accuracy: 0.4175
Epoch 9/20 | Batch 100 | Loss: 0.7640
End of Epoch 9 | Avg Loss: 0.8584 | Val Accuracy: 0.4065
Epoch 10/20 | Batch 100 | Loss: 0.7682
End of Epoch 10 | Avg Loss: 0.8342 | Val Accuracy: 0.4030
Epoch 11/20 | Batch 100 | Loss: 0.8194
End of Ep

In [19]:
save_model(model, word_to_idx, label_to_idx, '/content/sample_data/model_RNN.npz')

Model saved to /content/sample_data/model_RNN.npz


### LSTM


In [20]:
# --- 1. Model & Training Hyperparameters ---
EMBEDDING_DIM = 100
HIDDEN_DIM = 16
OUTPUT_DIM = len(label_to_idx)
LEARNING_RATE = 0.001
NUM_EPOCHS = 10
BATCH_SIZE = 64

# --- Adam Hyperparameters ---
BETA1 = 0.9
BETA2 = 0.999

# --- 2. Initialize the Model ---
# The only change is here: instantiate LSTM instead of RNN
model = LSTM(
    vocab_size=VOCAB_SIZE,
    embedding_dim=EMBEDDING_DIM,
    hidden_dim=HIDDEN_DIM,
    output_dim=OUTPUT_DIM
)

# --- 3. The Training Loop (Identical to before) ---
for epoch in range(NUM_EPOCHS):
    epoch_loss = 0
    train_batches = create_mini_batches(X_train, y_train, BATCH_SIZE)

    for i, (X_batch, y_batch) in enumerate(train_batches):
        probs, cache = model.forward(X_batch)
        loss = model.compute_loss(probs, y_batch)
        epoch_loss += loss

        grads = model.backward(y_batch, cache)
        model.update_params_with_adam(grads, LEARNING_RATE, BETA1, BETA2)

        if i % 100 == 0:
            sys.stdout.write(f"\rEpoch {epoch+1}/{NUM_EPOCHS} | Batch {i} | Loss: {loss:.4f}")

    avg_epoch_loss = epoch_loss / (i + 1)
    val_preds = model.predict(X_val[:2000])
    val_accuracy = np.mean(val_preds == y_val[:2000])

    print(f"\nEnd of Epoch {epoch+1} | Avg Loss: {avg_epoch_loss:.4f} | Val Accuracy: {val_accuracy:.4f}")


print("\nTraining finished. Evaluating on test set...")
test_preds = model.predict(X_test)

# Calculate both accuracy and macro F1-score
test_accuracy, test_macro_f1 = calculate_metrics(y_test, test_preds, model.output_dim)

print(f"Final Test Accuracy: {test_accuracy:.4f}")
print(f"Final Test Macro F1-Score: {test_macro_f1:.4f}")

Epoch 1/10 | Batch 500 | Loss: 1.4292
End of Epoch 1 | Avg Loss: 1.5048 | Val Accuracy: 0.4125
Epoch 2/10 | Batch 500 | Loss: 1.0756
End of Epoch 2 | Avg Loss: 1.2261 | Val Accuracy: 0.5190
Epoch 3/10 | Batch 500 | Loss: 0.8758
End of Epoch 3 | Avg Loss: 1.0565 | Val Accuracy: 0.5690
Epoch 4/10 | Batch 500 | Loss: 0.8928
End of Epoch 4 | Avg Loss: 0.8762 | Val Accuracy: 0.6500
Epoch 5/10 | Batch 500 | Loss: 0.8422
End of Epoch 5 | Avg Loss: 0.6981 | Val Accuracy: 0.7000
Epoch 6/10 | Batch 500 | Loss: 0.4923
End of Epoch 6 | Avg Loss: 0.5600 | Val Accuracy: 0.7360
Epoch 7/10 | Batch 500 | Loss: 0.4530
End of Epoch 7 | Avg Loss: 0.4669 | Val Accuracy: 0.7405
Epoch 8/10 | Batch 500 | Loss: 0.4696
End of Epoch 8 | Avg Loss: 0.4009 | Val Accuracy: 0.7695
Epoch 9/10 | Batch 500 | Loss: 0.3841
End of Epoch 9 | Avg Loss: 0.3491 | Val Accuracy: 0.7740
Epoch 10/10 | Batch 500 | Loss: 0.3913
End of Epoch 10 | Avg Loss: 0.3252 | Val Accuracy: 0.7670

Training finished. Evaluating on test set...
Fi

In [23]:
save_model(model, word_to_idx, label_to_idx, '/content/sample_data/model_LSTM.npz')

Model saved to /content/sample_data/model_LSTM.npz


### Transformer

In [26]:
import copy
import sys

# --- Hyperparameters ---
VOCAB_SIZE = 15000
MAX_LEN = 50
EMBEDDING_DIM = 100
NUM_HEADS = 1
FF_HIDDEN_DIM = 128
NUM_BLOCKS = 1
OUTPUT_DIM = 5
LEARNING_RATE = 0.001
NUM_EPOCHS = 10
BATCH_SIZE = 64
BETA1 = 0.9
BETA2 = 0.999
PATIENCE = 3   # NEW: Stop training if val_loss doesn't improve after 3 epochs

# --- Initialize the Model ---
model = TransformerEncoder(
    vocab_size=VOCAB_SIZE,
    max_len=MAX_LEN,
    embedding_dim=EMBEDDING_DIM,
    num_heads=NUM_HEADS,
    ff_hidden_dim=FF_HIDDEN_DIM,
    num_blocks=NUM_BLOCKS,
    output_dim=OUTPUT_DIM
)
print("Transformer model initialized.")

# --- Early Stopping Setup ---
best_val_loss = float('inf')
patience_counter = 0
best_model_params = None

# --- Training Loop ---
for epoch in range(NUM_EPOCHS):
    # Training Phase
    epoch_loss = 0
    train_batches = create_mini_batches(X_train, y_train, BATCH_SIZE)
    for i, (X_batch, y_batch) in enumerate(train_batches):
        probs, cache = model.forward(X_batch)
        loss = model.compute_loss(probs, y_batch)
        epoch_loss += loss
        grads = model.backward(y_batch, cache)
        model.update_params_with_adam(grads, LEARNING_RATE, BETA1, BETA2)

        if i % 10 == 0:
            sys.stdout.write(f"\rEpoch {epoch+1}/{NUM_EPOCHS} | Batch {i} | Loss: {loss:.4f}")

    avg_train_loss = epoch_loss / (i + 1)

    # Validation Phase
    val_loss = 0
    val_batches = create_mini_batches(X_val, y_val, BATCH_SIZE, shuffle=False)
    for i, (X_batch_val, y_batch_val) in enumerate(val_batches):
        probs, _ = model.forward(X_batch_val)
        val_loss += model.compute_loss(probs, y_batch_val)
    avg_val_loss = val_loss / (i + 1)

    val_preds = model.predict(X_val)
    val_accuracy, _ = calculate_metrics(y_val, val_preds, model.output_dim)

    print(f"\nEnd of Epoch {epoch+1} | Train Loss: {avg_train_loss:.4f} | "
          f"Val Loss: {avg_val_loss:.4f} | Val Accuracy: {val_accuracy:.4f}")

    # --- Early Stopping ---
    if avg_val_loss < best_val_loss:
        print(f" Validation loss improved from {best_val_loss:.4f} → {avg_val_loss:.4f}. Saving model...")
        best_val_loss = avg_val_loss
        best_model_params = copy.deepcopy(model.params)
        patience_counter = 0
    else:
        patience_counter += 1
        print(f" Validation loss did not improve. Patience: {patience_counter}/{PATIENCE}")

    if patience_counter >= PATIENCE:
        print(" Early stopping triggered.")
        break

# --- Final Evaluation ---
print("\nTraining finished. Loading best model and evaluating on test set...")
if best_model_params:
    model.params = best_model_params  # restore best params

test_preds = model.predict(X_test)
test_accuracy, test_macro_f1 = calculate_metrics(y_test, test_preds, model.output_dim)
print(f"Final Test Accuracy: {test_accuracy:.4f}")
print(f"Final Test Macro F1-Score: {test_macro_f1:.4f}")


Transformer model initialized.
Epoch 1/10 | Batch 570 | Loss: 0.9092
End of Epoch 1 | Train Loss: 1.2425 | Val Loss: 0.8979 | Val Accuracy: 0.6207
 Validation loss improved from inf → 0.8979. Saving model...
Epoch 2/10 | Batch 570 | Loss: 0.6665
End of Epoch 2 | Train Loss: 0.6453 | Val Loss: 0.6134 | Val Accuracy: 0.7823
 Validation loss improved from 0.8979 → 0.6134. Saving model...
Epoch 3/10 | Batch 570 | Loss: 0.4600
End of Epoch 3 | Train Loss: 0.4965 | Val Loss: 0.6287 | Val Accuracy: 0.7794
 Validation loss did not improve. Patience: 1/3
Epoch 4/10 | Batch 570 | Loss: 0.5258
End of Epoch 4 | Train Loss: 0.4320 | Val Loss: 0.6150 | Val Accuracy: 0.7898
 Validation loss did not improve. Patience: 2/3
Epoch 5/10 | Batch 570 | Loss: 0.4315
End of Epoch 5 | Train Loss: 0.3998 | Val Loss: 0.6269 | Val Accuracy: 0.7935
 Validation loss did not improve. Patience: 3/3
 Early stopping triggered.

Training finished. Loading best model and evaluating on test set...
Final Test Accuracy: 0.7

In [27]:
save_model(model, word_to_idx, label_to_idx, '/content/sample_data/model_Transformer_128.npz')

Model saved to /content/sample_data/model_Transformer_128.npz


In [28]:
test_macro_f1

np.float64(0.7618576316756924)