<a href="https://colab.research.google.com/github/olgagasowska/Machine-Learning-for-Linguists/blob/main/vanilla_custom-built_cells_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers datasets

import numpy as np
import torch
import torch.nn as nn


import numpy as np

class RNNCell:
    """Minimal NumPy recurrent cell computing tanh(x_t.Wx + h_prev.Wh + bh)."""

    def __init__(self, input_size: int, hidden_size: int):
        """
        Create the cell parameters using Xavier (Glorot) uniform weights.
        :param input_size: Number of input features.
        :param hidden_size: Number of units in the hidden layer.
        """
        self.hidden_size = hidden_size

        def xavier_uniform(fan_in: int, fan_out: int) -> np.ndarray:
            # Uniform draw in [-b, b] with b = sqrt(6 / (fan_in + fan_out)).
            bound = np.sqrt(6 / (fan_in + fan_out))
            return np.random.uniform(-bound, bound, size=(fan_in, fan_out))

        # Wx is drawn before Wh so results under a fixed NumPy seed stay stable.
        self.Wx = xavier_uniform(input_size, hidden_size)
        self.Wh = xavier_uniform(hidden_size, hidden_size)

        # Bias starts at zero and broadcasts over the batch dimension.
        self.bh = np.zeros((1, hidden_size))

        self.tanh = np.tanh

    def forward(self, input_t: np.ndarray, h_prev: np.ndarray) -> np.ndarray:
        """
        Advance the hidden state by one time step.
        :param input_t: Input at time step t (batch_size x input_size).
        :param h_prev: Hidden state from the previous time step (batch_size x hidden_size).
        :return: Updated hidden state.
        """
        input_term = np.dot(input_t, self.Wx)
        recurrent_term = np.dot(h_prev, self.Wh)
        return self.tanh(input_term + recurrent_term + self.bh)





class RecurrentLayer(nn.Module):
    """Unroll a single PyTorch recurrent cell over the time axis of a batch."""

    def __init__(self, input_size: int, hidden_size: int, cell_type: str = 'RNN', device='cpu'):
        """
        Initialize a recurrent layer with specified cell type (RNN, GRU, or LSTM).
        :param input_size: Number of input features.
        :param hidden_size: Number of units in the hidden layer.
        :param cell_type: Type of recurrent cell ('RNN', 'GRU', 'LSTM').
        :param device: Device on which the cell parameters are created.
        :raises ValueError: If ``cell_type`` is not one of the supported names.
        """
        super().__init__()
        self.hidden_size = hidden_size
        # Kept for callers that read it; forward() relies on the parameters'
        # actual device instead, because this value is not updated by .to().
        self.device = device

        cell_classes = {'RNN': nn.RNNCell, 'GRU': nn.GRUCell, 'LSTM': nn.LSTMCell}
        if cell_type not in cell_classes:
            raise ValueError("Unsupported cell type. Choose from 'RNN', 'GRU', or 'LSTM'.")
        self.cell = cell_classes[cell_type](input_size, hidden_size, device=device)
        self.cell_type = cell_type

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        """
        Forward pass through the recurrent layer for a sequence of inputs.
        :param inputs: Input sequence (batch size x sequence length x input size).
        :return: Output sequence (batch size x sequence length x hidden size).
                 A zero-length sequence yields an empty (batch size x 0 x hidden size) tensor.
        """
        # Take device and dtype from the cell weights so the layer keeps working
        # after the module has been moved or cast (.to(), .cuda(), .double()).
        reference = next(self.cell.parameters())
        inputs = inputs.to(reference.device)
        batch_size, seq_len, _ = inputs.shape

        if seq_len == 0:
            # torch.stack cannot stack an empty list; return the empty result directly.
            return reference.new_zeros((batch_size, 0, self.hidden_size))

        h_t = reference.new_zeros((batch_size, self.hidden_size))
        is_lstm = self.cell_type == 'LSTM'
        # Only the LSTM cell carries a separate cell state.
        c_t = torch.zeros_like(h_t) if is_lstm else None

        outputs = []
        for input_t in inputs.unbind(dim=1):
            if is_lstm:
                h_t, c_t = self.cell(input_t, (h_t, c_t))
            else:
                h_t = self.cell(input_t, h_t)
            outputs.append(h_t)

        return torch.stack(outputs, dim=1)




class SimpleRecurrentNetwork:
    """
    NumPy-facing wrapper: a torch recurrent layer followed by a NumPy sigmoid output layer.

    NOTE: ``train`` only evaluates and prints the loss; it performs no
    gradient computation or parameter update.
    """

    def __init__(self, input_size: int, hidden_size: int, output_size: int, cell_type: str = 'RNN', learning_rate: float = 0.001):
        """
        :param input_size: Number of input features per time step.
        :param hidden_size: Number of hidden units in the recurrent layer.
        :param output_size: Number of sigmoid outputs per time step.
        :param cell_type: Recurrent cell type forwarded to ``RecurrentLayer``.
        :param learning_rate: Stored on the instance; not used by any method of this class.
        """
        self.learning_rate = learning_rate
        self.recurrent_layer = RecurrentLayer(input_size, hidden_size, cell_type)
        self.weights_hidden_output = np.random.randn(hidden_size, output_size)
        self.bias_output = np.zeros((1, output_size))
        self.sigmoid = lambda x: 1 / (1 + np.exp(-x))

    def forward(self, inputs: np.ndarray) -> np.ndarray:
        """
        Run the recurrent layer and apply the sigmoid output layer at every time step.
        :param inputs: Array of shape (batch size x sequence length x input size).
        :return: Array of shape (batch size x sequence length x output size).
        """
        # The recurrent layer is a torch module, so the NumPy input must be
        # converted to a tensor first and the result converted back afterwards.
        features = torch.as_tensor(inputs, dtype=torch.get_default_dtype())
        with torch.no_grad():
            hidden_states = self.recurrent_layer(features)
        hidden_states = hidden_states.detach().cpu().numpy()

        # np.dot contracts the last (hidden) axis, so every time step is
        # projected in one call instead of looping over the sequence.
        logits = np.dot(hidden_states, self.weights_hidden_output) + self.bias_output
        return self.sigmoid(logits)

    def compute_loss(self, predictions: np.ndarray, targets: np.ndarray) -> float:
        """
        Mean binary cross-entropy between predictions and targets.
        :param predictions: Probabilities in [0, 1].
        :param targets: Binary targets with a shape broadcastable to ``predictions``.
        :return: Scalar loss.
        """
        # Clip away exact 0 and 1 so the logarithms stay finite.
        predictions_clipped = np.clip(predictions, 1e-10, 1 - 1e-10)
        loss = -np.mean(targets * np.log(predictions_clipped) + (1 - targets) * np.log(1 - predictions_clipped))
        return loss

    def train(self, inputs: np.ndarray, targets: np.ndarray, epochs: int = 1000):
        """
        Evaluate the loss once per epoch and print it every 100 epochs.
        No parameters are updated by this method.
        """
        for epoch in range(epochs):
            predictions = self.forward(inputs)
            loss = self.compute_loss(predictions, targets)
            if epoch % 100 == 0:
                print(f"Epoch {epoch+1}/{epochs}, Loss: {loss:.4f}")

    def predict(self, inputs: np.ndarray) -> np.ndarray:
        """Return the forward-pass output for ``inputs``."""
        return self.forward(inputs)

# Random sample array of shape (2, 5, 3); presumably (batch, sequence, features)
# for trying out the recurrent classes above -- TODO confirm intended use.
inputs = np.random.randn(2, 5, 3)







import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
from transformers import AutoTokenizer
from datasets import load_dataset

# Load the IMDB dataset; its 'train' and 'test' splits are used further below.
dataset = load_dataset("stanfordnlp/imdb")

# Only the tokenizer of this checkpoint is loaded; the classifier itself is defined in this file.
model_id = "xlm-roberta-base"
tokenizer = AutoTokenizer.from_pretrained(model_id)

def process(examples):
    """Tokenize the "text" field of a batch, padding/truncating every example to 512 tokens."""
    tokenizer_options = dict(padding="max_length", truncation=True, max_length=512)
    texts = examples["text"]
    return tokenizer(texts, **tokenizer_options)

# Apply process() to the dataset in batches (batched=True hands it a batch of examples per call).
tokenized_datasets = dataset.map(process, batched=True)

def convert_to_tensors(dataset):
    """Wrap the "input_ids" and "label" columns of ``dataset`` in a TensorDataset.

    Each item of the result is an ``(input_ids, label)`` pair of tensors.
    """
    column_names = ("input_ids", "label")
    tensors = [torch.tensor(dataset[name]) for name in column_names]
    return TensorDataset(*tensors)

# Keep only token ids and labels; the tokenizer's other outputs are not carried into the tensors.
train_dataset = convert_to_tensors(tokenized_datasets['train'])
test_dataset = convert_to_tensors(tokenized_datasets['test'])

# Shuffle the training batches only; the test order stays fixed.
batch_size = 16
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)








class Attention(nn.Module):
    """Additive attention: score_t = v . tanh(W1 * encoder_output_t + W2 * hidden)."""

    def __init__(self, hidden_size):
        """
        :param hidden_size: Feature size of both the query and the encoder outputs.
        """
        super().__init__()
        self.W1 = nn.Linear(hidden_size, hidden_size)
        self.W2 = nn.Linear(hidden_size, hidden_size)
        self.v = nn.Linear(hidden_size, 1, bias=False)

    def forward(self, hidden, encoder_outputs, mask=None):
        """
        Compute attention weights over the sequence and the resulting context vector.
        :param hidden: Query state (batch size x hidden size).
        :param encoder_outputs: Sequence of states (batch size x sequence length x hidden size).
        :param mask: Optional (batch size x sequence length) tensor; positions whose
                     value is zero/False (e.g. padding) receive zero attention weight.
                     Each row should keep at least one position.
        :return: Tuple ``(context, attention_weights)`` with shapes
                 (batch size x hidden size) and (batch size x sequence length).
        """
        # Project the query once and let broadcasting expand it over the
        # sequence axis, instead of copying it sequence-length times.
        query = self.W2(hidden).unsqueeze(1)
        energy = torch.tanh(self.W1(encoder_outputs) + query)
        scores = self.v(energy).squeeze(2)

        if mask is not None:
            blocked = ~mask.to(device=scores.device, dtype=torch.bool)
            # The most negative finite value drives the softmax weight to zero
            # without producing NaN the way -inf can for a fully masked row.
            scores = scores.masked_fill(blocked, torch.finfo(scores.dtype).min)

        attention_weights = torch.softmax(scores, dim=1)

        # Weighted sum of the encoder outputs: (B, 1, T) x (B, T, H) -> (B, H).
        context = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs).squeeze(1)
        return context, attention_weights

class SimpleRecurrentNetworkWithAttention(nn.Module):
    """Sequence classifier: optional embedding -> recurrent layer -> attention -> linear head."""

    def __init__(self, input_size, hidden_size, output_size, cell_type='RNN', use_embedding=True, device='cpu'):
        """
        :param input_size: Embedding vocabulary size when ``use_embedding`` is True,
                           otherwise the per-step feature size fed to the recurrent layer.
        :param hidden_size: Embedding width and recurrent hidden size.
        :param output_size: Number of output logits.
        :param cell_type: Recurrent cell type passed to ``RecurrentLayer``.
        :param use_embedding: Whether inputs are token ids that need an embedding lookup.
        :param device: Device on which the sub-modules are placed.
        """
        super().__init__()

        self.hidden_size = hidden_size
        self.device = device
        self.use_embedding = use_embedding

        if use_embedding:
            self.embedding = nn.Embedding(input_size, hidden_size).to(device)
            recurrent_input_size = hidden_size
        else:
            recurrent_input_size = input_size

        self.recurrent_layer = RecurrentLayer(
            recurrent_input_size,
            hidden_size,
            cell_type,
            device=self.device,
        )
        self.attention = Attention(hidden_size).to(device)
        self.fc = nn.Linear(hidden_size, output_size).to(device)

    def forward(self, inputs):
        """
        Classify a batch of sequences.
        :param inputs: Token ids (with embedding) or feature vectors (without).
        :return: Tuple ``(output, attention_weights)``: the logits from the linear
                 head and the attention weights over the sequence positions.
        """
        inputs = inputs.to(self.device)
        features = self.embedding(inputs) if self.use_embedding else inputs

        states = self.recurrent_layer(features)

        # The hidden state at the final position serves as the attention query.
        last_state = states[:, -1, :]
        context, attention_weights = self.attention(last_state, states)

        return self.fc(context), attention_weights



# The tokenizer vocabulary size is passed as input_size, which the model uses
# as the number of rows in its embedding table.
vocab_size = tokenizer.vocab_size
hidden_size = 256
output_size = 2

# Prefer the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = SimpleRecurrentNetworkWithAttention(
    input_size=vocab_size,
    hidden_size=hidden_size,
    output_size=output_size,
    cell_type='RNN',
    device=device
)

# StepLR multiplies the learning rate by gamma after every step_size calls to scheduler.step().
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-5)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

model.to(device)

def train(model, dataloader, criterion, optimizer, scheduler, device, print_every=100):
    """
    Train ``model`` for one pass over ``dataloader``.

    :param model: Module whose call returns a tuple with the logits first.
    :param dataloader: Sized iterable yielding ``(inputs, labels)`` batches.
    :param criterion: Loss function applied to ``(logits, labels)``.
    :param optimizer: Optimizer stepped once per batch.
    :param scheduler: Learning-rate scheduler stepped once at the end of the pass.
    :param device: Device the batches are moved to.
    :param print_every: Print the batch loss every this many batches;
                        0 or None disables the per-batch output.
    :return: Mean batch loss over the pass.
    """
    model.train()
    running_loss = 0.0
    total_batches = len(dataloader)

    for batch_number, (inputs, labels) in enumerate(dataloader, start=1):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs, _ = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()

        # Clip the global gradient norm before the update to limit exploding gradients.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=5)
        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss

        # Honour the caller-supplied interval rather than a fixed one.
        if print_every and batch_number % print_every == 0:
            print(f"Batch [{batch_number}/{total_batches}], Loss: {batch_loss:.4f}")

    scheduler.step()

    epoch_loss = running_loss / total_batches
    print(f"Epoch completed. Average Loss: {epoch_loss:.4f}")

    return epoch_loss

def decode_input(inputs, tokenizer):
    """Decode token ids to text without special tokens and return its whitespace-separated words."""
    decoded_text = tokenizer.decode(inputs, skip_special_tokens=True)
    return decoded_text.split()

def evaluate(model, dataloader, criterion, device, tokenizer):
    """
    Evaluate ``model`` on ``dataloader`` and print an attention example.

    For the first example of the first batch, the token ids, the decoded
    sentence and the attention weight of every non-special token are printed.

    :param model: Module whose call returns ``(logits, attention_weights)``.
    :param dataloader: Sized iterable yielding ``(inputs, labels)`` batches.
    :param criterion: Loss function applied to ``(logits, labels)``.
    :param device: Device the batches are moved to.
    :param tokenizer: Tokenizer used to turn token ids back into text/tokens.
    :return: Tuple ``(mean batch loss, accuracy)``.
    """
    model.eval()
    correct = 0
    total = 0
    total_loss = 0.0
    example_printed = False
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs, attention_weights = model(inputs)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            if not example_printed:
                example_printed = True
                print(f"Input sentence (token IDs): {inputs[0].cpu().numpy()}")

                words = decode_input(inputs[0], tokenizer)
                print(f"Decoded Input Sentence: {' '.join(words)}")

                # Attention weights are per token position, so they are paired
                # with the tokens themselves. Pairing them with whitespace-split
                # words would shift every weight, because the sequence starts
                # with a special token and words may span several subword tokens.
                token_ids = inputs[0].tolist()
                tokens = tokenizer.convert_ids_to_tokens(token_ids)
                weights = attention_weights[0].cpu().tolist()
                special_ids = set(tokenizer.all_special_ids)

                print("Word attention distribution:")
                for token_id, token, attn in zip(token_ids, tokens, weights):
                    if token_id in special_ids:
                        continue
                    print(f"{token}: {attn:.4f}")

    accuracy = correct / total
    return total_loss / len(dataloader), accuracy


print(f"train loader size: {len(train_loader)}")
epochs = 5
for epoch in range(epochs):
    print(f"Epoch {epoch+1}/{epochs}")
    # One full pass over the training data; train() also steps the LR scheduler once.
    train_loss = train(model, train_loader, criterion, optimizer, scheduler, device, print_every=100)

    # Evaluate on the test split after every epoch.
    test_loss, test_accuracy = evaluate(model, test_loader, criterion, device, tokenizer)

    print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")
    print('-' * 50)

Collecting datasets
  Downloading datasets-3.1.0-py3-none-any.whl.metadata (20 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.9.0,>=2023.1.0 (from fsspec[http]<=2024.9.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.1.0-py3-none-any.whl (480 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m480.6/480.6 kB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2024.9.0-py3-none-any.whl (1

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/7.81k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/21.0M [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/20.5M [00:00<?, ?B/s]

unsupervised-00000-of-00001.parquet:   0%|          | 0.00/42.0M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating unsupervised split:   0%|          | 0/50000 [00:00<?, ? examples/s]

tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/615 [00:00<?, ?B/s]

sentencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/9.10M [00:00<?, ?B/s]



Map:   0%|          | 0/25000 [00:00<?, ? examples/s]

Map:   0%|          | 0/25000 [00:00<?, ? examples/s]

Map:   0%|          | 0/50000 [00:00<?, ? examples/s]

train loader size: 1563
Epoch 1/5
Batch [50/1563], Loss: 0.6973
Batch [100/1563], Loss: 0.7193
Batch [150/1563], Loss: 0.6919
Batch [200/1563], Loss: 0.6931
Batch [250/1563], Loss: 0.6937
Batch [300/1563], Loss: 0.7004
Batch [350/1563], Loss: 0.6887
Batch [400/1563], Loss: 0.7003
Batch [450/1563], Loss: 0.6950
Batch [500/1563], Loss: 0.6781
Batch [550/1563], Loss: 0.6492
Batch [600/1563], Loss: 0.6847
Batch [650/1563], Loss: 0.6827
Batch [700/1563], Loss: 0.6940
Batch [750/1563], Loss: 0.6867
Batch [800/1563], Loss: 0.6933
Batch [850/1563], Loss: 0.6956
Batch [900/1563], Loss: 0.7012
Batch [950/1563], Loss: 0.6880
Batch [1000/1563], Loss: 0.6878
Batch [1050/1563], Loss: 0.6952
Batch [1100/1563], Loss: 0.6826
Batch [1150/1563], Loss: 0.6939
Batch [1200/1563], Loss: 0.6864
Batch [1250/1563], Loss: 0.6927
Batch [1300/1563], Loss: 0.6880
Batch [1350/1563], Loss: 0.6712
Batch [1400/1563], Loss: 0.6808
Batch [1450/1563], Loss: 0.6725
Batch [1500/1563], Loss: 0.6668
Batch [1550/1563], Loss: 0

In [None]:

# Show the module tree of the classifier.
print(model)

SimpleRecurrentNetworkWithAttention(
  (embedding): Embedding(250002, 256)
  (recurrent_layer): RecurrentLayer(
    (cell): RNNCell(256, 256)
  )
  (attention): Attention(
    (W1): Linear(in_features=256, out_features=256, bias=True)
    (W2): Linear(in_features=256, out_features=256, bias=True)
    (v): Linear(in_features=256, out_features=1, bias=False)
  )
  (fc): Linear(in_features=256, out_features=2, bias=True)
)


In [None]:
# Count every element of every parameter tensor registered on the model.
total_params = sum(p.numel() for p in model.parameters())
print(f"Total number of parameters: {total_params}")

# Parameters with requires_grad=True are the ones that receive gradients.
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
non_trainable_params = total_params - trainable_params

print(f"Trainable parameters: {trainable_params}")
print(f"Non-trainable parameters: {non_trainable_params}")



Total number of parameters: 64264450
Trainable parameters: 64264450
Non-trainable parameters: 0


In [None]:
# Hand-written example sentences passed to classify() below.
sentences = [
    "i hate this movie",
    "this is the best film I've ever seen",
    "the plot was boring and slow",
    "i love the acting in this movie"
]

def classify(sentences, model, tokenizer, device):
    """
    Print the predicted sentiment and per-token attention weights for each sentence.

    :param sentences: Iterable of raw text strings.
    :param model: Module whose call returns ``(logits, attention_weights)``.
    :param tokenizer: Tokenizer used to encode the sentences and name the tokens.
    :param device: Device the encoded inputs are moved to.
    """
    model.eval()
    special_ids = set(tokenizer.all_special_ids)

    for sentence in sentences:
        print(f"\nProcessing sentence: '{sentence}'")

        inputs = tokenizer(sentence, return_tensors="pt", padding="max_length", truncation=True, max_length=512)

        input_ids = inputs['input_ids'].to(device)

        with torch.no_grad():
            outputs, attention_weights = model(input_ids)

        _, predicted_label = torch.max(outputs, 1)
        predicted_label = predicted_label.item()
        # Label 1 is reported as positive, anything else as negative.
        sentiment = "Positive" if predicted_label == 1 else "Negative"
        print(f"Model Prediction: {sentiment}")

        # Attention weights are per token position, so they are paired with the
        # tokens themselves. Pairing them with whitespace-split words would shift
        # every weight, because the sequence starts with a special token and
        # words may span several subword tokens.
        token_ids = input_ids[0].tolist()
        tokens = tokenizer.convert_ids_to_tokens(token_ids)
        weights = attention_weights[0].cpu().tolist()

        print("Word attention distribution:")
        for token_id, token, attn_weight in zip(token_ids, tokens, weights):
            if token_id in special_ids:
                continue
            print(f"{token}: {attn_weight:.4f}")

# Run the model on the example sentences and print predictions with attention weights.
classify(sentences, model, tokenizer, device)


Processing sentence: 'i hate this movie'
Model Prediction: Positive
Word attention distribution:
i: 0.0467
hate: 0.0579
this: 0.0380
movie: 0.0440

Processing sentence: 'this is the best film I've ever seen'
Model Prediction: Positive
Word attention distribution:
this: 0.0379
is: 0.0365
the: 0.0906
best: 0.0466
film: 0.0207
I've: 0.0057
ever: 0.0199
seen: 0.0095

Processing sentence: 'the plot was boring and slow'
Model Prediction: Negative
Word attention distribution:
the: 0.0166
plot: 0.0111
was: 0.0376
boring: 0.5671
and: 0.0266
slow: 0.0129

Processing sentence: 'i love the acting in this movie'
Model Prediction: Positive
Word attention distribution:
i: 0.0427
love: 0.0529
the: 0.0227
acting: 0.0158
in: 0.0348
this: 0.0128
movie: 0.0454
