# Import libraries

In [None]:
import os
import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt

import re
import spacy
from collections import Counter
from tqdm.notebook import tqdm
from datasets import load_dataset

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader

from sklearn.utils import shuffle
from sklearn.metrics import classification_report
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.preprocessing import LabelEncoder

from datasets import load_dataset
from transformers import AutoTokenizer, AutoModel
from huggingface_hub import login

# Tokenization functions

In [None]:
# Ordered (regex source, replacement) rules used by _basic_english_normalize.
# Order matters: the whitespace-collapsing rule must run last.
_normalization_rules = [
    (r"\'", " '  "),
    (r"\"", ""),
    (r"\.", " . "),
    (r"<br \/>", " "),
    (r",", " , "),
    (r"\(", " ( "),
    (r"\)", " ) "),
    (r"\!", " ! "),
    (r"\?", " ? "),
    (r"\;", " "),
    (r"\:", " "),
    (r"\s+", " "),
]

_patterns = [rule[0] for rule in _normalization_rules]

_replacements = [rule[1] for rule in _normalization_rules]

# Despite its name, this is a list of (compiled regex, replacement) pairs.
_patterns_dict = [(re.compile(source), repl) for source, repl in _normalization_rules]

def _basic_english_normalize(line):
    r"""
    Lowercase a line of English text, normalize punctuation, and tokenize it.

    The substitutions in ``_patterns_dict`` are applied one after another:
        - pad '\'' with spaces
        - drop '\"'
        - pad '.', ',', '(', ')', '!' and '?' with spaces
        - turn '<br \/>', ';' and ':' into a single space
        - collapse runs of whitespace into a single space

    Returns the list of tokens obtained by splitting the result on whitespace.
    """
    normalized = line.lower()
    for regex, substitute in _patterns_dict:
        normalized = regex.sub(substitute, normalized)
    return normalized.split()



def get_tokenizer(tokenizer, language='en'):
    r"""
    Generate tokenizer function for a string sentence.

    Arguments:
        tokenizer: the name of tokenizer function. If None, it returns
            str.split, which splits the string sentence on whitespace.
            If basic_english, it returns _basic_english_normalize() function,
            which normalizes the string first and then splits on whitespace.
            If a callable function, it will return the function. If a
            tokenizer library (e.g. spacy, moses, toktok, revtok, subword),
            it returns the corresponding library's tokenize function.
        language: Default en. Only consulted by the "basic_english" and
            "spacy" choices.

    Returns:
        A callable that maps a string to a list of tokens.

    Raises:
        ValueError: if ``tokenizer`` is not a recognised choice, or if
            "basic_english" is requested for a language other than 'en'.
        ImportError: if the library backing the requested tokenizer is not
            installed (re-raised after printing an installation hint).
        AttributeError, OSError: if the spaCy pipeline named by ``language``
            cannot be loaded (re-raised after printing a hint).

    Examples:
        >>> tokenizer = get_tokenizer("basic_english")
        >>> tokens = tokenizer("You can now install TorchText using pip!")
        >>> tokens
        ['you', 'can', 'now', 'install', 'torchtext', 'using', 'pip', '!']

    """
    # Imported locally because the notebook's import cell does not provide it.
    from functools import partial

    # Default tokenizer is a plain whitespace split, as documented above.
    if tokenizer is None:
        return str.split

    if tokenizer == "basic_english":
        if language != 'en':
            raise ValueError("Basic normalization is only available for English(en)")
        return _basic_english_normalize

    # simply return if a function is passed
    if callable(tokenizer):
        return tokenizer

    if tokenizer == "spacy":
        try:
            import spacy
            nlp = spacy.load(language)
        except ImportError:
            print("Please install SpaCy. "
                  "See the docs at https://spacy.io for more information.")
            raise
        except (AttributeError, OSError):
            # spacy.load signals a missing pipeline/model with OSError.
            print("Please install SpaCy and the SpaCy {} tokenizer. "
                  "See the docs at https://spacy.io for more "
                  "information.".format(language))
            raise

        def _spacy_tokenize(text, spacy=nlp):
            # Run only the pipeline's tokenizer and keep the surface strings.
            return [tok.text for tok in spacy.tokenizer(text)]

        return _spacy_tokenize
    elif tokenizer == "moses":
        try:
            from sacremoses import MosesTokenizer
            moses_tokenizer = MosesTokenizer()
            return moses_tokenizer.tokenize
        except ImportError:
            print("Please install SacreMoses. "
                  "See the docs at https://github.com/alvations/sacremoses "
                  "for more information.")
            raise
    elif tokenizer == "toktok":
        try:
            from nltk.tokenize.toktok import ToktokTokenizer
            toktok = ToktokTokenizer()
            return toktok.tokenize
        except ImportError:
            print("Please install NLTK. "
                  "See the docs at https://nltk.org  for more information.")
            raise
    elif tokenizer == 'revtok':
        try:
            import revtok
            return revtok.tokenize
        except ImportError:
            print("Please install revtok.")
            raise
    elif tokenizer == 'subword':
        try:
            import revtok
            return partial(revtok.tokenize, decap=True)
        except ImportError:
            print("Please install revtok.")
            raise
    raise ValueError("Requested tokenizer {}, valid choices are a "
                     "callable that takes a single string as input, "
                     "\"revtok\" for the revtok reversible tokenizer, "
                     "\"subword\" for the revtok caps-aware tokenizer, "
                     "\"spacy\" for the SpaCy English tokenizer, or "
                     "\"moses\" for the NLTK port of the Moses tokenization "
                     "script.".format(tokenizer))

# Set random seed

In [None]:
def set_seed(seed=42):
    """Seed Python, NumPy and PyTorch (CPU and CUDA) RNGs with ``seed``.

    Also switches cuDNN to deterministic mode and turns off its benchmark
    auto-tuner.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


# Seed everything once for the whole notebook.
set_seed(42)

# Set device

In [None]:
# Prefer the GPU when one is visible to PyTorch; otherwise run on the CPU.
_device_name = "cuda" if torch.cuda.is_available() else "cpu"
DEVICE = torch.device(_device_name)
print("Using device:", DEVICE)

# Load datasets

In [None]:
# Fetch all splits of the MultiLingualSentiment corpus from the Hugging Face Hub
# and show a summary of what was loaded.
_dataset_id = "clapAI/MultiLingualSentiment"
dataset = load_dataset(_dataset_id)
print(dataset)

In [None]:
# English training sample: keep the English rows of the train split,
# shuffle them with a fixed seed, and retain the first 10,000.
english_train_dataset = dataset["train"].filter(
    lambda row: row["language"] == "en"
)
english_shuffled_dataset = english_train_dataset.shuffle(seed=42)
english_sampled_dataset = english_shuffled_dataset.select(range(10000))
# Preview: english_sampled_dataset.select(range(5)).to_pandas()

In [None]:
# French validation sample: French rows of the validation split,
# shuffled with a fixed seed, first 1,250 kept.
val_fr = dataset["validation"].filter(
    lambda row: row["language"] == "fr"
)
val_fr_shuffled = val_fr.shuffle(seed=42)
val_fr_sampled = val_fr_shuffled.select(range(1250))
# Preview: val_fr_sampled.select(range(50)).to_pandas()

In [None]:
# English validation sample: English rows of the validation split,
# shuffled with a fixed seed, first 1,250 kept.
val_en = dataset["validation"].filter(
    lambda row: row["language"] == "en"
)
val_en_shuffled = val_en.shuffle(seed=42)
val_en_sampled = val_en_shuffled.select(range(1250))
# Preview: val_en_sampled.select(range(5)).to_pandas()

In [None]:
# Chinese validation sample: Chinese rows of the validation split,
# shuffled with a fixed seed, first 1,250 kept.
val_zh = dataset["validation"].filter(
    lambda row: row["language"] == "zh"
)
val_zh_shuffled = val_zh.shuffle(seed=42)
val_zh_sampled = val_zh_shuffled.select(range(1250))
# Preview: val_zh_sampled.select(range(5)).to_pandas()

In [None]:
# French test sample: French rows of the test split,
# shuffled with a fixed seed, first 1,250 kept.
test_fr = dataset["test"].filter(
    lambda row: row["language"] == "fr"
)
test_fr_shuffled = test_fr.shuffle(seed=42)
test_fr_sampled = test_fr_shuffled.select(range(1250))
# Preview: test_fr_sampled.select(range(5)).to_pandas()

In [None]:
# Pull the raw text column and the label column out of every sampled split.

# English training sample
x_train_text, y_train = (
    english_sampled_dataset["text"],
    english_sampled_dataset["label"],
)

# French validation sample
x_val_fr_text, y_val_fr = val_fr_sampled["text"], val_fr_sampled["label"]

# English validation sample
x_val_en_text, y_val_en = val_en_sampled["text"], val_en_sampled["label"]

# Chinese validation sample
x_val_zh_text, y_val_zh = val_zh_sampled["text"], val_zh_sampled["label"]

# French test sample
x_test_text, y_test = test_fr_sampled["text"], test_fr_sampled["label"]

# Load pretrained mBERT embeddings

In [None]:
# Multilingual BERT checkpoint used as a fixed feature extractor
model_name = "bert-base-multilingual-cased"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Load the encoder, place it on the target device, and switch to eval mode
encoder = AutoModel.from_pretrained(model_name).to(DEVICE)
encoder.eval()

# The encoder is never fine-tuned here: disable gradients on every weight
for param in encoder.parameters():
    param.requires_grad = False

# Generate sentence-level embeddings

In [None]:
# Sentence-level embedding helper
def encode_texts(texts, batch_size=32):
    """
    Turn texts into one vector each by running them through the module-level
    ``tokenizer`` and ``encoder`` on ``DEVICE`` in mini-batches.

    Each batch is padded to its longest member and truncated to 128 tokens;
    the hidden state at position 0 (the [CLS] token) is used as the sentence
    embedding.

    Args:
        texts (list of str): Input sentences.
        batch_size (int): Number of sentences encoded per forward pass.

    Returns:
        torch.Tensor: CPU tensor of shape (num_samples, hidden_size).
    """
    cls_vectors = []
    batch_starts = range(0, len(texts), batch_size)

    for start in tqdm(batch_starts):
        chunk = texts[start:start + batch_size]
        encoded = tokenizer(chunk, return_tensors="pt", padding=True, truncation=True, max_length=128)
        encoded = encoded.to(DEVICE)

        with torch.no_grad():
            hidden_states = encoder(**encoded).last_hidden_state

        # Position 0 holds the [CLS] representation; move it off the device
        cls_vectors.append(hidden_states[:, 0, :].cpu())

    return torch.cat(cls_vectors, dim=0)

In [None]:
# Sanity check: show the container type and the first ten labels of each split
for _split_name, _split_labels in (
    ("y_train", y_train),
    ("y_val_fr", y_val_fr),
    ("y_val_en", y_val_en),
    ("y_val_zh", y_val_zh),
    ("y_test", y_test),
):
    print(f"type({_split_name}): {type(_split_labels)}")
    print(f"First 10 {_split_name} labels: {_split_labels[:10]}")

In [None]:
# Map each sentiment label string to its class index (position in the tuple)
label2id = {
    name: index
    for index, name in enumerate(("negative", "neutral", "positive"))
}

# Display the resulting mapping
print("Label to ID mapping:", label2id)

In [None]:
# Replace every string label with its integer class id from label2id
# (an unknown label raises KeyError)
y_train = list(map(label2id.__getitem__, y_train))
y_val_fr = list(map(label2id.__getitem__, y_val_fr))
y_val_en = list(map(label2id.__getitem__, y_val_en))
y_val_zh = list(map(label2id.__getitem__, y_val_zh))
y_test = list(map(label2id.__getitem__, y_test))

In [None]:
# Encode every split into [CLS] sentence embeddings
X_train = encode_texts(texts=x_train_text)
X_val_fr = encode_texts(texts=x_val_fr_text)
X_val_en = encode_texts(texts=x_val_en_text)
X_val_zh = encode_texts(texts=x_val_zh_text)
X_test = encode_texts(texts=x_test_text)

# Turn the integer label lists into tensors
y_train, y_val_fr, y_val_en, y_val_zh, y_test = map(
    torch.tensor, (y_train, y_val_fr, y_val_en, y_val_zh, y_test)
)

In [None]:
# NumPy views of the features and labels for scikit-learn, grouped per split
X_train_np, y_train_np = X_train.numpy(), y_train.numpy()
X_val_fr_np, y_val_fr_np = X_val_fr.numpy(), y_val_fr.numpy()
X_val_en_np, y_val_en_np = X_val_en.numpy(), y_val_en.numpy()
X_val_zh_np, y_val_zh_np = X_val_zh.numpy(), y_val_zh.numpy()
X_test_np, y_test_np = X_test.numpy(), y_test.numpy()

# Combine LR with sentence-level embeddings

In [None]:
# Fit a logistic-regression classifier on the English sentence embeddings
model = LogisticRegression(solver='saga', max_iter=1000)
model.fit(X_train_np, y_train_np)

# Predict on each validation language
y_pred_fr = model.predict(X_val_fr_np)
y_pred_en = model.predict(X_val_en_np)
y_pred_zh = model.predict(X_val_zh_np)

# Report per-class metrics, one language after another
for _heading, _gold, _predicted in (
    ("Validation on French:", y_val_fr_np, y_pred_fr),
    ("Validation on English:", y_val_en_np, y_pred_en),
    ("Validation on Chinese:", y_val_zh_np, y_pred_zh),
):
    print(_heading)
    print(classification_report(_gold, _predicted))

# Generate token-level embeddings

In [None]:
# Token-level embedding helper
def encode_token_embeddings(texts, batch_size=32, max_len=128):
    """
    Encode texts into per-token mBERT embeddings (``last_hidden_state``).

    Every text is padded or truncated to exactly ``max_len`` tokens, so the
    result includes the vectors at padding positions; the attention mask is
    not returned.

    Returns: CPU tensor of shape (num_samples, max_len, hidden_dim)
    """
    collected = []
    batch_starts = range(0, len(texts), batch_size)

    for start in tqdm(batch_starts):
        chunk = texts[start:start + batch_size]
        encoded = tokenizer(
            chunk,
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            max_length=max_len,
        ).to(DEVICE)

        with torch.no_grad():
            hidden_states = encoder(**encoded).last_hidden_state  # [batch, max_len, hidden_dim]

        collected.append(hidden_states.cpu())

    return torch.cat(collected, dim=0)

In [None]:
def make_loader(embeddings, labels, batch_size=64, shuffle=False):
    """Wrap features and labels in a ``DataLoader`` over a ``TensorDataset``.

    Args:
        embeddings: Tensor, NumPy array or nested sequence of features; the
            first dimension indexes samples. Converted to float32.
        labels: Tensor, array or sequence of integer class ids, one per
            sample. Converted to int64.
        batch_size (int): Number of samples per batch.
        shuffle (bool): Whether to reshuffle the samples every epoch.

    Returns:
        DataLoader yielding ``(features, labels)`` batches.
    """
    # as_tensor reuses the input's storage when it is already a tensor of the
    # requested dtype; torch.tensor() would copy the (potentially multi-GB)
    # embeddings and emit a copy-construct UserWarning.
    X_tensor = torch.as_tensor(embeddings, dtype=torch.float32)
    y_tensor = torch.as_tensor(labels, dtype=torch.long)
    dataset = TensorDataset(X_tensor, y_tensor)
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

# Define the RNN classifier

In [None]:
class SentimentRNN_PretrainedEmbedding(nn.Module):
    """Unidirectional LSTM classifier over precomputed embedding sequences.

    There is no embedding layer: the input is expected to already be a
    sequence of embedding vectors. The LSTM output at the last time step is
    passed through dropout and a linear layer to produce class logits.

    Args:
        no_layers (int): Number of stacked LSTM layers.
        embedding_dim (int): Size of each input embedding vector.
        hidden_dim (int): Size of the LSTM hidden state.
        output_dim (int): Number of output classes (logits).
        drop_prob (float): Dropout probability, used between LSTM layers
            (when there is more than one) and before the output layer.
    """

    def __init__(self, no_layers, embedding_dim, hidden_dim, output_dim, drop_prob=0.1):
        super().__init__()

        self.no_layers = no_layers
        self.hidden_dim = hidden_dim
        self.drop_prob = drop_prob

        # Inter-layer dropout only exists for stacked LSTMs; passing a
        # non-zero value with a single layer just triggers a UserWarning.
        self.lstm = nn.LSTM(input_size=embedding_dim,
                            hidden_size=hidden_dim,
                            num_layers=no_layers,
                            batch_first=True,
                            dropout=drop_prob if no_layers > 1 else 0.0,
                            bidirectional=False)

        # Dropout applied once to the features fed to the classifier
        self.dropout = nn.Dropout(drop_prob)

        # Fully connected output layer
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, hidden=None):
        """Compute class logits for a batch of embedding sequences.

        Args:
            x: Tensor of shape [batch_size, seq_len, embedding_dim].
            hidden: Optional ``(h0, c0)`` initial LSTM state; zeros are used
                when omitted.

        Returns:
            Tuple ``(logits, hidden)`` where ``logits`` has shape
            [batch_size, output_dim] and ``hidden`` is the final LSTM state.
        """
        if hidden is None:
            hidden = self.init_hidden(x.size(0))

        lstm_out, hidden = self.lstm(x, hidden)  # [batch, seq_len, hidden_dim]

        # Only the last time step feeds the classifier, so slice first and
        # apply dropout exactly once. Dropping the whole sequence and then the
        # slice again would compound the drop rate to 1 - (1 - p)^2.
        last_step = lstm_out[:, -1, :]
        out = self.fc(self.dropout(last_step))

        return out, hidden

    def init_hidden(self, batch_size, device=None):
        """Return zero-filled ``(h0, c0)`` states for ``batch_size`` sequences.

        The states are created directly on ``device`` (default: the device of
        the model's parameters) with the parameters' dtype.
        """
        reference = next(self.parameters())
        if device is None:
            device = reference.device
        shape = (self.no_layers, batch_size, self.hidden_dim)
        h0 = torch.zeros(shape, dtype=reference.dtype, device=device)
        c0 = torch.zeros(shape, dtype=reference.dtype, device=device)
        return (h0, c0)


In [None]:
# Hyperparameters of the LSTM classifier
embedding_dim = 768  # width of the mBERT token vectors fed to the LSTM
hidden_dim = 64
output_dim = 3       # negative / neutral / positive
no_layers = 2
drop_prob = 0.4

# Build the classifier and place it on the target device
model = SentimentRNN_PretrainedEmbedding(
    no_layers=no_layers,
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
    output_dim=output_dim,
    drop_prob=drop_prob,
)
model.to(DEVICE)

# Cross-entropy objective optimised with Adam
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Combine RNN with token-level embeddings

In [None]:
def train(model, loader):
    """Run one training epoch over ``loader`` and return the mean batch loss.

    Uses the module-level ``optimizer``, ``criterion`` and ``DEVICE``.

    Args:
        model: Module to train. Its forward pass may return either the logits
            or a tuple whose first element is the logits.
        loader: Iterable of ``(inputs, labels)`` batches supporting ``len()``.

    Returns:
        float: Sum of the per-batch losses divided by ``len(loader)``.
    """
    model.train()
    total_loss = 0
    for x, y in loader:
        x, y = x.to(DEVICE), y.to(DEVICE)
        optimizer.zero_grad()
        out = model(x)
        # SentimentRNN_PretrainedEmbedding.forward returns (logits, hidden);
        # the loss must be computed on the logits only.
        if isinstance(out, tuple):
            out = out[0]
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(loader)

In [None]:
def evaluate(model, loader, y_true):
    """Predict a class for every sample in ``loader`` and print a report.

    Uses the module-level ``DEVICE``. Labels yielded by the loader are
    ignored; the report is computed against ``y_true``.

    Args:
        model: Module to evaluate. Its forward pass may return either the
            logits or a tuple whose first element is the logits.
        loader: Iterable of ``(inputs, labels)`` batches, in the same sample
            order as ``y_true``.
        y_true: Ground-truth class ids passed to ``classification_report``.
    """
    model.eval()
    preds = []
    with torch.no_grad():
        for x, _ in loader:
            x = x.to(DEVICE)
            out = model(x)
            # SentimentRNN_PretrainedEmbedding.forward returns
            # (logits, hidden); argmax must run on the logits only.
            if isinstance(out, tuple):
                out = out[0]
            pred = torch.argmax(out, dim=1)
            preds.extend(pred.cpu().tolist())
    print(classification_report(y_true, preds))

In [None]:
# Get mBERT token embeddings for each language
X_train_emb = encode_token_embeddings(x_train_text)      # shape: (N, 128, 768)
X_val_en_emb = encode_token_embeddings(x_val_en_text)
X_val_fr_emb = encode_token_embeddings(x_val_fr_text)
X_val_zh_emb = encode_token_embeddings(x_val_zh_text)
X_test_emb = encode_token_embeddings(x_test_text)

# Labels as int64 tensors. The y_* variables were already converted to tensors
# in the sentence-embedding cell, so use as_tensor: it accepts lists and
# tensors alike and avoids the copy-construct UserWarning that
# torch.tensor(<tensor>) emits.
y_train_tensor = torch.as_tensor(y_train, dtype=torch.long)
y_val_en_tensor = torch.as_tensor(y_val_en, dtype=torch.long)
y_val_fr_tensor = torch.as_tensor(y_val_fr, dtype=torch.long)
y_val_zh_tensor = torch.as_tensor(y_val_zh, dtype=torch.long)
y_test_tensor = torch.as_tensor(y_test, dtype=torch.long)

# Construct dataloaders (only the training loader is shuffled)
train_loader = make_loader(X_train_emb, y_train_tensor, shuffle=True)
val_loader_en = make_loader(X_val_en_emb, y_val_en_tensor)
val_loader_fr = make_loader(X_val_fr_emb, y_val_fr_tensor)
val_loader_zh = make_loader(X_val_zh_emb, y_val_zh_tensor)
test_loader = make_loader(X_test_emb, y_test_tensor)

In [None]:
# Train the LSTM classifier on the English loader for a fixed number of
# epochs. After every epoch, evaluate on the French, English and Chinese
# validation loaders, record the curves, and checkpoint the weights whenever
# the French validation loss does not get worse.
epochs = 10
clip = 5  # Gradient clipping threshold
valid_loss_min = np.inf  # Track minimum validation loss for saving best model

# Initialize lists to store losses and accuracies
epoch_tr_loss, epoch_tr_acc = [], []
epoch_val_loss_fr, epoch_val_loss_en, epoch_val_loss_zh = [], [], []
epoch_val_acc_fr, epoch_val_acc_en, epoch_val_acc_zh = [], [], []

# Dictionary of validation loaders for different languages
valid_sets = {
    "French": val_loader_fr,
    "English": val_loader_en,
    "Chinese": val_loader_zh
}

for epoch in range(epochs):
    # ===== Training =====
    train_losses = []
    train_correct = 0
    train_total = 0
    model.train()  # Set model to training mode

    for inputs, labels in train_loader:
        current_batch_size = inputs.size(0)
        # Initialize hidden state for current batch and move to device
        # (a fresh zero state per batch: no state is carried across batches)
        h = model.init_hidden(current_batch_size, device=inputs.device)
        h = tuple([each.data.to(DEVICE) for each in h])
    
        inputs, labels = inputs.to(DEVICE), labels.to(DEVICE).long()
    
        optimizer.zero_grad()
        # The model returns (logits, final hidden state); h is overwritten
        # but not reused by the next batch.
        output, h = model(inputs, h)
    
        loss = criterion(output, labels)
        loss.backward()
    
        # Clip gradients to prevent exploding gradients
        nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
    
        train_losses.append(loss.item())

        # Calculate number of correct predictions per batch
        # (torch.max over dim 1 returns (values, indices); indices are the predicted classes)
        _, preds = torch.max(output, 1)
        train_correct += (preds == labels).sum().item()
        train_total += labels.size(0)

    # Calculate average training loss and accuracy for the epoch
    # (loss is the unweighted mean of per-batch losses; accuracy is per sample)
    epoch_train_loss = np.mean(train_losses)
    epoch_train_acc = train_correct / train_total

    # Record training loss and accuracy
    epoch_tr_loss.append(epoch_train_loss)
    epoch_tr_acc.append(epoch_train_acc)

    # ===== Validation =====
    model.eval()  # Set model to evaluation mode
    epoch_val_loss = {}  # language name -> mean batch loss for this epoch
    epoch_val_acc = {}   # language name -> accuracy for this epoch

    with torch.no_grad():
        # Evaluate on each validation set (French, English, Chinese)
        for lang, loader in valid_sets.items():
            val_losses = []
            val_correct = 0
            val_total = 0

            for inputs, labels in loader:
                current_batch_size = inputs.size(0)
                # Fresh zero hidden state per validation batch, as in training
                val_h = model.init_hidden(current_batch_size, device=inputs.device)
                val_h = tuple([each.data.to(DEVICE) for each in val_h])

                inputs, labels = inputs.to(DEVICE), labels.to(DEVICE).long()

                output, val_h = model(inputs, val_h)
                val_loss = criterion(output, labels)
                val_losses.append(val_loss.item())

                _, preds = torch.max(output, 1)
                val_correct += (preds == labels).sum().item()
                val_total += labels.size(0)

            # Calculate average validation loss and accuracy for current language
            epoch_val_loss[lang] = np.mean(val_losses)
            epoch_val_acc[lang] = val_correct / val_total

    # Record validation losses and accuracies per language
    epoch_val_loss_fr.append(epoch_val_loss["French"])
    epoch_val_loss_en.append(epoch_val_loss["English"])
    epoch_val_loss_zh.append(epoch_val_loss["Chinese"])

    epoch_val_acc_fr.append(epoch_val_acc["French"])
    epoch_val_acc_en.append(epoch_val_acc["English"])
    epoch_val_acc_zh.append(epoch_val_acc["Chinese"])

    # ===== Logging =====
    print(f"Epoch {epoch + 1}")
    print(f"Train Loss: {epoch_train_loss:.4f} | Train Acc: {epoch_train_acc*100:.2f}%")
    for lang in valid_sets.keys():
        print(f"Val Loss ({lang}): {epoch_val_loss[lang]:.4f} | Val Acc ({lang}): {epoch_val_acc[lang]*100:.2f}%")
    print("="*50)

    # ===== Save best model based on French validation loss =====
    # NOTE: the comparison is <=, so a tie with the best loss so far also
    # overwrites 'state_dict.pt' and prints the "decreased" message.
    if epoch_val_loss["French"] <= valid_loss_min:
        print(f"Validation loss (French) decreased ({valid_loss_min:.6f} --> {epoch_val_loss['French']:.6f}). Saving model ...")
        torch.save(model.state_dict(), 'state_dict.pt')
        valid_loss_min = epoch_val_loss["French"]


# Plot accuracy and loss

In [None]:
fig = plt.figure(figsize=(20, 6))

# One panel per metric: (y-axis label, title, [(series, legend label), ...])
_panels = (
    ("Accuracy", "Accuracy over Epochs", (
        (epoch_tr_acc, 'Train Accuracy'),
        (epoch_val_acc_fr, 'Validation Accuracy (French)'),
        (epoch_val_acc_en, 'Validation Accuracy (English)'),
        (epoch_val_acc_zh, 'Validation Accuracy (Chinese)'),
    )),
    ("Loss", "Loss over Epochs", (
        (epoch_tr_loss, 'Train Loss'),
        (epoch_val_loss_fr, 'Validation Loss (French)'),
        (epoch_val_loss_en, 'Validation Loss (English)'),
        (epoch_val_loss_zh, 'Validation Loss (Chinese)'),
    )),
)

# Accuracy goes in the left subplot, loss in the right one
for _position, (_ylabel, _title, _curves) in enumerate(_panels, start=1):
    plt.subplot(1, 2, _position)
    for _series, _legend_label in _curves:
        plt.plot(_series, label=_legend_label)
    plt.title(_title)
    plt.xlabel("Epoch")
    plt.ylabel(_ylabel)
    plt.legend()
    plt.grid(True)

plt.show()