# Loading the datasets

In [1]:
import os
import json

import pandas as pd
from sklearn.model_selection import train_test_split

from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [2]:
train = pd.read_csv('/content/drive/My Drive/Coleridge/datasets/train.csv')
train_items = train.sample(n=1000, random_state=42)

X_train, X_test = train_test_split(train_items, test_size=0.1, random_state=42)
train_papers = {}
test_papers = {}

for i in range(len(X_train)):
    curr_path = os.path.join(
        os.getcwd(),
        'drive',
        'My Drive',
        'Coleridge',
        'datasets',
        'train',
        X_train.iloc[i]['Id'] + '.json')
    with open(curr_path, 'r') as file:
        curr_json = json.load(file)
        train_papers[X_train.iloc[i]['Id']] = curr_json

for i in range(len(X_test)):
    curr_path = os.path.join(
        os.getcwd(),
        'drive',
        'My Drive',
        'Coleridge',
        'datasets',
        'train',
        X_test.iloc[i]['Id'] + '.json')
    with open(curr_path, 'r') as file:
        curr_json = json.load(file)
        test_papers[X_test.iloc[i]['Id']] = curr_json

In [3]:
# Extract text data from your papers
def extract_text(papers):
    texts = []
    for paper_id, content in papers.items():
        # Assuming each paper JSON has a key 'text' or 'content' for text data
        paper_text = " ".join([section['text'] for section in content])  # Adjust if the structure is different
        texts.append(paper_text)
    return texts

# Extract training and testing data
train_texts = extract_text(train_papers)
test_texts = extract_text(test_papers)

# Trigram Language Model

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
from collections import defaultdict, Counter
import random

In [5]:
# Step 1: Preprocessing to generate unigrams, bigrams, and trigrams
def generate_ngrams(text):
    tokens = text.split()
    unigrams = tokens
    bigrams = [(tokens[i], tokens[i+1]) for i in range(len(tokens) - 1)]
    trigrams = [(tokens[i], tokens[i+1], tokens[i+2]) for i in range(len(tokens) - 2)]
    return unigrams, bigrams, trigrams

# Example corpus
corpus = ''.join(train_texts)

# Generate unigrams, bigrams, and trigrams from the corpus
unigrams, bigrams, trigrams = generate_ngrams(corpus)

In [6]:
# Step 2: Build a vocabulary and map words to indices
vocab = set(corpus.split())
word_to_idx = {word: i for i, word in enumerate(vocab)}
idx_to_word = {i: word for i, word in enumerate(vocab)}

In [7]:
# Step 3: Count frequencies
unigram_counts = Counter(unigrams)
bigram_counts = Counter(bigrams)
trigram_counts = Counter(trigrams)
total_unigrams = len(unigrams)

In [8]:
# Step 4: Create training data for the model
X_train = []
y_train = []

for w1, w2, w3 in trigrams:
    X_train.append((word_to_idx[w1], word_to_idx[w2]))
    y_train.append(word_to_idx[w3])

# Convert training data to tensors
X_train = torch.tensor(X_train, dtype=torch.long)
y_train = torch.tensor(y_train, dtype=torch.long)

In [9]:
# Step 5: Define the model
class TrigramModel(nn.Module):
    def __init__(self, vocab_size, embed_dim):
        super(TrigramModel, self).__init__()
        self.embeddings = nn.Embedding(vocab_size, embed_dim)
        self.linear1 = nn.Linear(embed_dim * 2, 128)
        self.linear2 = nn.Linear(128, vocab_size)

    def forward(self, x):
        x = self.embeddings(x)
        x = x.view((x.shape[0], -1))  # Flatten
        x = torch.relu(self.linear1(x))
        x = self.linear2(x)
        return x

# Initialize the model and train
vocab_size = len(vocab)
embed_dim = 50  # Embedding size
model = TrigramModel(vocab_size, embed_dim)

In [10]:
from torch.utils.data import DataLoader, TensorDataset

# Prepare dataset for batch processing
dataset = TensorDataset(X_train, y_train)
batch_size = 512  # Adjust based on available memory
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [11]:
# Step 6: Interpolation functions
def unigram_prob(word):
    return unigram_counts[idx_to_word[word]] / total_unigrams

def bigram_prob(w2, w1):
    return bigram_counts[(idx_to_word[w1], idx_to_word[w2])] / unigram_counts[idx_to_word[w1]] if unigram_counts[idx_to_word[w1]] > 0 else 0

def trigram_prob(w3, w1, w2):
    return trigram_counts[(idx_to_word[w1], idx_to_word[w2], idx_to_word[w3])] / bigram_counts[(idx_to_word[w1], idx_to_word[w2])] if bigram_counts[(idx_to_word[w1], idx_to_word[w2])] > 0 else 0

def interpolated_prob(w3, w1, w2, lambda1=0.1, lambda2=0.3, lambda3=0.6):
    p1 = unigram_prob(w3)
    p2 = bigram_prob(w3, w2)
    p3 = trigram_prob(w3, w1, w2)
    return lambda1 * p1 + lambda2 * p2 + lambda3 * p3

# Training the Trigram Model

In [None]:
# TODO: Try to integrate GradScaler
import os
import torch
from tqdm.notebook import tqdm
import torch.nn.functional as F

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Function to save checkpoint
def save_checkpoint(model, optimizer, epoch, loss, checkpoint_dir='checkpoints'):
    if not os.path.exists(checkpoint_dir):
        os.makedirs(checkpoint_dir)

    checkpoint_path = os.path.join(checkpoint_dir, f'model_epoch_{epoch}.pth')

    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss
    }, checkpoint_path)

    print(f"Checkpoint saved: {checkpoint_path}")

# Function to load checkpoint
def load_checkpoint(checkpoint_path, model, optimizer=None):
    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint['model_state_dict'])

    if optimizer:
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    epoch = checkpoint['epoch']
    loss = checkpoint['loss']

    print(f"Checkpoint loaded: Epoch {epoch}, Loss: {loss:.4f}")
    return epoch, loss

# Function to calculate interpolated probability
def interpolated_prob(w3_idx, w1_idx, w2_idx, lambda1=0.1, lambda2=0.3, lambda3=0.6):
    word3 = idx_to_word[w3_idx]
    word1 = idx_to_word[w1_idx]
    word2 = idx_to_word[w2_idx]

    p1 = unigram_prob(w3_idx)  # Unigram probability
    p2 = bigram_prob(w3_idx, w2_idx)  # Bigram probability
    p3 = trigram_prob(w3_idx, w1_idx, w2_idx)  # Trigram probability

    return lambda1 * p1 + lambda2 * p2 + lambda3 * p3

# Function to calculate log-likelihood and perplexity using interpolated probabilities
def calculate_metrics_with_interpolation(model, data_loader):
    model.eval()  # Set model to evaluation mode
    total_log_likelihood = 0.0
    total_words = 0

    with torch.no_grad():  # Disable gradient calculation
        for inputs, targets in tqdm(data_loader, desc="Evaluating"):
            inputs, targets = inputs.to(device), targets.to(device)

            # Calculate log-likelihood using interpolated probabilities
            for i in range(len(inputs)):
                w1_idx, w2_idx = inputs[i][0].item(), inputs[i][1].item()
                w3_idx = targets[i].item()

                # Get interpolated probability
                prob = interpolated_prob(w3_idx, w1_idx, w2_idx)

                # Avoid log(0) by ensuring prob is non-zero
                if prob > 0:
                    log_prob = torch.log(torch.tensor(prob))
                else:
                    log_prob = torch.tensor(-float('inf'))  # Log of zero case

                total_log_likelihood += log_prob.item()
                total_words += 1

    # Average log-likelihood
    average_log_likelihood = total_log_likelihood / total_words

    # Perplexity: exp(-average log-likelihood)
    perplexity = torch.exp(-torch.tensor(average_log_likelihood))

    return total_log_likelihood, perplexity.item()

# Modify the training loop to calculate metrics with interpolation
def train_and_evaluate_model_with_interpolation(model, train_loader, test_loader, num_epochs=10, learning_rate=0.001, checkpoint_dir='checkpoints'):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Move model to the GPU if available
    model = model.to(device)

    for epoch in tqdm(range(num_epochs), desc="Epochs"):
        model.train()
        running_loss = 0.0

        for i, (inputs, targets) in enumerate(tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", leave=False)):
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, targets)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            if i % 100 == 99:    # Print every 100 batches
                print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{i+1}], Loss: {running_loss / 100:.4f}')
                running_loss = 0.0

        # Save checkpoint after every epoch
        save_checkpoint(model, optimizer, epoch+1, loss.item(), checkpoint_dir)

        # Calculate log-likelihood and perplexity on the test set using interpolated probabilities
        log_likelihood, perplexity = calculate_metrics_with_interpolation(model, test_loader)
        print(f'Epoch [{epoch+1}/{num_epochs}], Log-Likelihood: {log_likelihood:.4f}, Perplexity: {perplexity:.4f}')

# Example: Initialize and train model with evaluation using interpolation
vocab_size = len(vocab)
embed_dim = 50  # Embedding size
model = TrigramModel(vocab_size, embed_dim)

# Assume `train_loader` and `test_loader` are your DataLoaders
# Train and evaluate the model with interpolated probabilities for log-likelihood and perplexity
train_and_evaluate_model_with_interpolation(model, train_loader, test_loader, num_epochs=10)

# You can also run evaluation separately on the test set after training using interpolated probabilities
log_likelihood, perplexity = calculate_metrics_with_interpolation(model, test_loader)
print(f"Final Evaluation - Log-Likelihood: {log_likelihood:.4f}, Perplexity: {perplexity:.4f}")


In [16]:
import os
import torch
from torch.cuda.amp import GradScaler, autocast
from tqdm.notebook import tqdm  # Use tqdm.notebook for Colab and Jupyter

scaler = GradScaler()

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Function to save checkpoint
def save_checkpoint(model, optimizer, epoch, loss, checkpoint_dir='/content/drive/My Drive/Coleridge/datasets/'):
    if not os.path.exists(checkpoint_dir):
        os.makedirs(checkpoint_dir)

    checkpoint_path = os.path.join(checkpoint_dir, f'trigram_model_epoch_{epoch}.pth')

    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss
    }, checkpoint_path)

    print(f"Checkpoint saved: {checkpoint_path}")

# Function to load checkpoint
def load_checkpoint(checkpoint_path, model, optimizer=None):
    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint['model_state_dict'])

    if optimizer:
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    epoch = checkpoint['epoch']
    loss = checkpoint['loss']

    print(f"Checkpoint loaded: Epoch {epoch}, Loss: {loss:.4f}")
    return epoch, loss


def train_model_with_amp(model, train_loader, num_epochs=10, learning_rate=0.001, checkpoint_dir='checkpoints'):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Move model to the GPU if available
    model = model.to(device)

    for epoch in tqdm(range(num_epochs), desc="Epochs"):
        model.train()
        running_loss = 0.0

        for i, (inputs, targets) in enumerate(tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", leave=False)):
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()

            # Use autocast for mixed precision training
            with autocast():
                outputs = model(inputs)
                loss = criterion(outputs, targets)

            # Scale the loss and backward pass
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            optimizer.step()

            running_loss += loss.item()
            if i % 100 == 99:    # Print every 100 batches
                print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{i+1}], Loss: {running_loss / 100:.4f}')
                running_loss = 0.0

        # Save checkpoint after every epoch
        save_checkpoint(model, optimizer, epoch+1, loss.item(), checkpoint_dir)


# Train model with GPU and mixed precision training
train_model_with_amp(model, train_loader, num_epochs=10)


Using device: cuda


  scaler = GradScaler()


Epochs:   0%|          | 0/10 [00:00<?, ?it/s]

Epoch 1/10:   0%|          | 0/13798 [00:00<?, ?it/s]

  with autocast():


Epoch [1/10], Batch [100], Loss: 7.8113
Epoch [1/10], Batch [200], Loss: 7.8038
Epoch [1/10], Batch [300], Loss: 7.7839
Epoch [1/10], Batch [400], Loss: 7.7719
Epoch [1/10], Batch [500], Loss: 7.7665
Epoch [1/10], Batch [600], Loss: 7.7586
Epoch [1/10], Batch [700], Loss: 7.7410


KeyboardInterrupt: 

In [19]:
test_corpus = ''.join(test_texts)
test_unigrams, test_bigrams, test_trigrams = generate_ngrams(test_corpus)

test_vocab = set(test_corpus.split())
test_word_to_idx = {word: i for i, word in enumerate(test_vocab)}
test_idx_to_word = {i: word for i, word in enumerate(test_vocab)}

X_test = []
y_test = []

for w1, w2, w3 in test_trigrams:
    X_test.append((test_word_to_idx[w1], test_word_to_idx[w2]))
    y_test.append(test_word_to_idx[w3])


X_test = torch.tensor(X_test, dtype=torch.long)
y_test = torch.tensor(y_test, dtype=torch.long)

# Evaluation of Trigram Model

In [None]:
# Function to calculate log-likelihood and perplexity
def calculate_metrics(model, data_loader):
    model.eval()  # Set model to evaluation mode
    total_log_likelihood = 0.0
    total_words = 0

    with torch.no_grad():  # Disable gradient calculation
        for inputs, targets in tqdm(data_loader, desc="Evaluating"):
            inputs, targets = inputs.to(device), targets.to(device)

            outputs = model(inputs)

            # Calculate log probabilities (log softmax)
            log_probs = F.log_softmax(outputs, dim=1)

            # Gather the log likelihoods for the true targets
            log_likelihoods = log_probs[range(len(targets)), targets]

            # Sum log likelihoods
            total_log_likelihood += log_likelihoods.sum().item()
            total_words += len(targets)

    # Average negative log likelihood
    average_log_likelihood = total_log_likelihood / total_words

    # Perplexity: exp(-average log-likelihood)
    perplexity = torch.exp(-torch.tensor(average_log_likelihood))

    return total_log_likelihood, perplexity.item()

test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)
metrics = calculate_metrics(model, test_loader)

# Transformer Decoder-Only Model

In [None]:
import math
import re
import random

import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.utils.rnn as rnn_utils

from torch.nn.utils.rnn import pad_sequence
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader

from sklearn.model_selection import train_test_split

from collections import defaultdict

# Preprocessing the data

In [None]:
# Extract text data from your papers
def extract_text(papers):
    texts = []
    for paper_id, content in papers.items():
        # Assuming each paper JSON has a key 'text' or 'content' for text data
        paper_text = " ".join([section['text'] for section in content])  # Adjust if the structure is different
        texts.append(paper_text)
    return texts

# Extract training and testing data
train_texts = extract_text(train_papers)
test_texts = extract_text(test_papers)

# Tokenizing the Data

In [None]:
import torch
from transformers import BertTokenizer

# Initialize tokenizer (or use any other tokenizer that fits your dataset)
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Tokenize input and output (target) sequences
def tokenize_texts(texts):
    return tokenizer(texts, truncation=True, padding=True, max_length=512, return_tensors='pt')

# Prepare data (assuming you have train_texts and test_texts)
train_input_ids = tokenize_texts(train_texts)['input_ids']
train_output_ids = tokenize_texts(test_texts)['input_ids']  # For decoder targets

# Use a special token for padding and start/end of sequences if needed


# Creating a Custom Dataset and DataLoader

In [None]:
from torch.utils.data import Dataset, DataLoader

class TextDataset(Dataset):
    def __init__(self, input_ids):
        self.input_ids = input_ids  # The tokenized input sequences

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        # Input sequence (all tokens except the last one)
        src = self.input_ids[idx][:-1]
        # Target sequence (all tokens except the first one)
        tgt = self.input_ids[idx][1:]

        return torch.tensor(src), torch.tensor(tgt)


# Create the dataset using the tokenized input data (train_input_ids)
train_dataset = TextDataset(train_input_ids)

# Use DataLoader to handle batching
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=None)

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


In [None]:
import torch.nn as nn
import torch

class DecoderOnlyTransformer(nn.Module):
    def __init__(self, vocab_size, d_model, nhead, num_decoder_layers, dim_feedforward, max_len=512, dropout=0.1):
        super(DecoderOnlyTransformer, self).__init__()

        # Embedding layer for input tokens
        self.embedding = nn.Embedding(vocab_size, d_model)

        # Positional Encoding
        self.positional_encoding = nn.Embedding(max_len, d_model)

        # Transformer Decoder
        self.decoder_layer = nn.TransformerDecoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward, dropout=dropout, batch_first=True)
        self.transformer_decoder = nn.TransformerDecoder(self.decoder_layer, num_layers=num_decoder_layers)

        # Output layer to convert decoder output to token logits
        self.fc_out = nn.Linear(d_model, vocab_size)

        self.dropout = nn.Dropout(dropout)
        self.max_len = max_len

    def forward(self, tgt):
        batch_size, seq_len = tgt.size()

        # Add position information to the target sequence
        positions = torch.arange(0, seq_len).unsqueeze(0).expand(batch_size, seq_len).to(tgt.device)

        # Embedding + Positional Encoding
        tgt_embedded = self.embedding(tgt) + self.positional_encoding(positions)
        tgt_embedded = self.dropout(tgt_embedded)

        # Create a square mask of size [seq_len, seq_len]
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(seq_len).to(tgt.device)

        # Since this is a decoder-only model, pass tgt_embedded as both tgt and memory
        decoder_output = self.transformer_decoder(tgt_embedded, memory=tgt_embedded, tgt_mask=tgt_mask)

        # Output layer to map decoder output to logits
        logits = self.fc_out(decoder_output)

        return logits




# Hyperparameters
input_dim = tokenizer.vocab_size
output_dim = tokenizer.vocab_size
d_model = 512
nhead = 8
num_encoder_layers = 6
num_decoder_layers = 6
dim_feedforward = 2048

model = DecoderOnlyTransformer(tokenizer.vocab_size, d_model, nhead, num_decoder_layers, dim_feedforward)


# Training the Model

In [None]:
import torch
import os

def save_checkpoint(model, optimizer, epoch, loss, filename='transformer_checkpoint.pth'):
    # Save a dictionary with the model's state, optimizer's state, and other relevant information
    checkpoint = {
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'epoch': epoch,
        'loss': loss
    }
    checkpoint_dir = '/content/drive/My Drive/Coleridge/datasets/'
    torch.save(checkpoint, checkpoint_dir + filename)
    print(f"Checkpoint saved at epoch {epoch}")


def load_checkpoint(model, optimizer, filename='transformer_checkpoint.pth'):
    checkpoint_dir = '/content/drive/My Drive/Coleridge/datasets/'
    if os.path.isfile(checkpoint_dir + filename):
        checkpoint = torch.load(checkpoint_dir + filename)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        epoch = checkpoint['epoch']
        loss = checkpoint['loss']
        print(f"Checkpoint loaded from epoch {epoch}, loss: {loss:.4f}")
        return epoch, loss
    else:
        print("No checkpoint found at", checkpoint_dir + filename)
        return None, None


In [None]:
from tqdm.notebook import tqdm  # Use tqdm.notebook for Colab and Jupyter
# Optimizer and loss function
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

# Training loop with batches
num_epochs = 20
model.to(device)
vocab_size = tokenizer.vocab_size

save_every_n_epochs = 1  # Save a checkpoint every 2 epochs
best_loss = float('inf')  # For saving the best model

for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    # Use TQDM to wrap the train_loader for progress bar
    with tqdm(train_loader, unit="batch") as tepoch:
        tepoch.set_description(f"Epoch {epoch + 1}/{num_epochs}")

        for batch in tepoch:
            src, tgt = batch
            src, tgt = src.to(device), tgt.to(device)
            print(f"Shape of src: {src.shape}, Shape of tgt: {tgt.shape}")


            optimizer.zero_grad()

            outputs = model(src)
            loss = criterion(outputs.view(-1, vocab_size), tgt.view(-1))

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

            # Update the progress bar with current loss
            tepoch.set_postfix(loss=loss.item())

    avg_loss = total_loss / len(train_loader)
    print(f"Epoch [{epoch + 1}/{num_epochs}], Avg Loss: {avg_loss:.4f}")

    # Save checkpoint every few epochs
    if (epoch + 1) % save_every_n_epochs == 0:
        save_checkpoint(model, optimizer, epoch + 1, avg_loss, filename=f'checkpoint_epoch_{epoch+1}.pth')

    # Save the best model if current epoch loss is better
    if avg_loss < best_loss:
        best_loss = avg_loss
        save_checkpoint(model, optimizer, epoch + 1, avg_loss, filename='best_model.pth')


# Custom loading from checkpoint!

Uncomment only if we need to read the checkpoints.

In [None]:
# # Initialize model and optimizer
# model = DecoderOnlyTransformer(vocab_size, d_model, nhead, num_decoder_layers, dim_feedforward)
# optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

# # Load checkpoint if available
# start_epoch, _ = load_checkpoint(model, optimizer, filename='checkpoint_epoch_2.pth')

# if start_epoch is None:
#     start_epoch = 0  # Start from scratch if no checkpoint is found

# # Continue training from the checkpoint
# for epoch in range(start_epoch, num_epochs):
#     model.train()
#     total_loss = 0
#     for batch in train_loader:
#         src, tgt = batch
#         src, tgt = src.to(device), tgt.to(device)

#         optimizer.zero_grad()

#         outputs = model(src)
#         loss = criterion(outputs.view(-1, vocab_size), tgt.view(-1))

#         loss.backward()
#         optimizer.step()

#         total_loss += loss.item()

#     avg_loss = total_loss / len(train_loader)
#     print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_loss:.4f}")

#     # Save checkpoints as before


# Test Eval

In [None]:
def evaluate(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    num_correct = 0
    num_total = 0

    with torch.no_grad():  # No gradient calculation during evaluation
        for batch in dataloader:
            src, tgt = batch  # Unpack batch
            src, tgt = src.to(device), tgt.to(device)
            print(f"Shape of src: {src.shape}, Shape of tgt: {tgt.shape}")

            # Forward pass through the model
            outputs = model(src)
            loss = criterion(outputs.view(-1, vocab_size), tgt.view(-1))

            total_loss += loss.item()

            # Accuracy calculation
            predicted_tokens = outputs.argmax(-1)  # Get the token with the highest score
            num_correct += (predicted_tokens == tgt).sum().item()
            num_total += tgt.numel()

    avg_loss = total_loss / len(dataloader)
    accuracy = num_correct / num_total if num_total > 0 else 0

    return avg_loss, accuracy

# Evaluation after training
test_dataset = TextDataset(train_output_ids)  # Prepare test dataset
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

test_loss, test_accuracy = evaluate(model, test_loader, criterion, device)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")

# Sample training on smaller dataset