In [1]:
%cd ..

d:\nlp\assignment1


  self.shell.db['dhist'] = compress_dhist(dhist)[-100:]


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import matplotlib.pyplot as plt
import json
from task1 import WordPieceTokenizer
from task2 import Word2VecModel
import pickle


# Neural LM Dataset class
class NeuralLMDataset(Dataset):
    def __init__(self, tokenizer, word2vec, context_size=3):
        self.pad_token = '[PAD]'
        self.tokenizer = tokenizer
        self.word2vec = word2vec
        self.context_size = context_size
        self.data = self.preprocess_data()

    def tokenize_txt_file(self, input_file: str, output_file: str) -> None:
        """Tokenize sentences from input TXT file and save results as JSON"""
        # Reading the input text file
        with open(input_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        results = {}
        for idx, line in enumerate(lines):
            sentence = line.strip()  # Remove leading/trailing whitespaces or newlines
            tokens = self.tokenizer.tokenize(sentence)  # Tokenize the sentence using your tokenization method
            results[str(idx)] = tokens  # Store tokens with index as the key (starting from 0)
        # Saving the results to an output JSON file
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)

    def preprocess_data(self):
        # self.tokenizer.construct_vocabulary("corpus.txt", vocab_size=100)
        # self.tokenize_txt_file("corpus.txt", "tokenized_corpus.json")

        corpus = None
        with open('task2-files/tokenized_corpus.json', 'r') as f:
            # Load the JSON data
            tokenized_corpus = json.load(f)
            
            # Convert the dictionary into a list of sentences (list of tokenized words)
            corpus = [tokens for tokens in tokenized_corpus.values()]
            
        
        self.tokenized_sentences = corpus
        # updates the word to index mapping
        self.word2idx = {word: idx for idx, word in enumerate(self.tokenizer.vocab)}
        # updates the reverse index to word mapping
        self.idx2word = {idx: word for word, idx in self.word2idx.items()}

        # indexed_sentences = [[self.word2vec.embeddings(torch.tensor(self.word2idx[token], dtype=torch.long)).detach().numpy() for token in sent] for sent in self.tokenized_sentences]
        data = []

        for sent in self.tokenized_sentences:
            if len(sent) > self.context_size:
                for i in range(len(sent) - self.context_size):
                    # context = sent[i:i+self.context_size]
                    context = [self.word2vec.embeddings(torch.tensor(self.word2idx[token], dtype=torch.long)).detach().numpy() for token in sent[i:i+self.context_size]]
                    target = sent[i+self.context_size]
                    if target in self.word2idx:
                        target_idx = self.word2idx[target]
                    else:
                        target_idx = self.word2idx['[UNK]']
                    data.append((context, target_idx))
        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        context, target = self.data[idx]
        context_tensor = torch.tensor(np.concatenate(context).flatten(), dtype=torch.float32)
        target_tensor = torch.tensor(target, dtype=torch.long)
        return context_tensor, target_tensor

# Define Neural Language Model Variations
class NeuralLM1(nn.Module):
    def __init__(self, input_dim, hidden_dim, vocab_size):
        super(NeuralLM1, self).__init__()
        self.architecture = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, vocab_size)
        )
    
    def forward(self, x):
        return self.architecture(x)

class NeuralLM2(nn.Module):
    def __init__(self, input_dim, hidden_dim, vocab_size):
        super(NeuralLM2, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, vocab_size)
        self.projection = nn.Linear(input_dim, vocab_size)
    
    def forward(self, x):
        identity = self.projection(x)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        out = x + identity
        return out

class NeuralLM3(nn.Module):
    def __init__(self, input_dim, hidden_dim, vocab_size):
        super(NeuralLM3, self).__init__()
        # self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.batch_norm = nn.BatchNorm1d(hidden_dim)
        self.dropout = nn.Dropout(0.3)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, vocab_size)
    
    def forward(self, x):
        # x = self.embedding(x).view(x.shape[0], -1)
        x = self.relu(self.batch_norm(self.fc1(x)))
        x = self.dropout(x)
        return self.fc2(x)

# Training function with loss tracking
def train(model, train_dataloader, val_dataloader, epochs=100, lr=0.01):
    model.to("cpu")
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    train_losses, val_losses = [], []
    train_acc, val_acc = [], []

    for epoch in range(epochs):
        #training phase
        model.train()
        train_loss, correct, total = 0, 0, 0
        for context, target in train_dataloader:
            optimizer.zero_grad() #zero the gradients
            #forward pass
            output = model(context)
            loss = criterion(output, target)
            train_loss += loss.item()
            #backward pass and optimize
            loss.backward()
            optimizer.step()
            #compute accuracy
            _, predicted = torch.max(output, 1)
            correct += (predicted == target).sum().item()
            total += target.size(0)
        #calculate average training loss
        train_losses.append(train_loss / len(train_dataloader))
        train_acc.append(100 * correct / total)

        #validation phase
        model.eval()
        val_loss, correct, total = 0, 0, 0
        with torch.no_grad():
            for context, target in val_dataloader:
                output = model(context)
                loss = criterion(output, target)
                val_loss += loss.item()
                #compute accuracy
                _, predicted = torch.max(output, 1)
                correct += (predicted == target).sum().item()
                total += target.size(0)
        #calculate average validation loss
        val_losses.append(val_loss / len(val_dataloader))
        val_acc.append(100 * correct / total)

        #print losses for each epoch
        print(f"----- Epoch {epoch + 1}/{epochs} -----")
        # print(f"Train Loss: {train_losses[-1]:.4f}, Val Loss: {val_losses[-1]:.4f}")
        print(f"Train Loss: {train_losses[-1]:.4f}, Train Accuracy: {train_acc[-1]:.2f}%")
        print(f"Val Loss: {val_losses[-1]:.4f}, Val Accuracy: {val_acc[-1]:.2f}%")
    print(type(train_losses[-1]))
    print(type(output))
    print(type(target))
    # return train_losses, train_acc, val_losses, val_acc #return losses

# Plot loss function
def plot_losses(train_losses, val_losses):
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Loss vs Epochs')
    plt.legend()
    plt.show()

# Accuracy and Perplexity computation
def compute_accuracy(model, dataloader):
    correct, total = 0, 0
    with torch.no_grad():
        for context, target in dataloader:
            output = model(context)
            predictions = torch.argmax(output, dim=1)
            correct += (predictions == target).sum().item()
            total += target.size(0)
    return correct / total

def compute_perplexity(loss):
    return np.exp(loss)

In [3]:
def load_model(model_class):
    model_path = 'task2-files/final_model/final_model.pt'
    checkpoint = torch.load(model_path)
    
    model = model_class(vocab_size=checkpoint['vocab_size'], embedding_dim=checkpoint['embedding_dim'])
    model.load_state_dict(checkpoint['model_state_dict'])
    
    val_loss = checkpoint['val_loss']
    val_accuracy = checkpoint['val_accuracy']
    
    return model, val_loss, val_accuracy

# Example usage:
word2vec, val_loss, val_accuracy = load_model(Word2VecModel)
tokenizer = pickle.load(open('task1-files/tokenizer.pkl', 'rb'))
dataset = NeuralLMDataset(tokenizer, word2vec)

In [4]:
WINDOW_SIZE = 2
BATCH_SIZE = 1024
NUM_EPOCHS = 1
LEARNING_RATE = 0.02
TRAIN_SPLIT = 0.8

# Split dataset into training and validation
train_dataset, val_dataset = torch.utils.data.random_split(dataset=dataset, lengths=[TRAIN_SPLIT, 1-TRAIN_SPLIT], generator=torch.Generator().manual_seed(42))

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [7]:
print(type(compute_perplexity(0.12)))

<class 'numpy.float64'>


In [5]:
model1 = NeuralLM1(input_dim=30, hidden_dim=256, vocab_size=8500)
train(model1, train_loader, val_loader, epochs=NUM_EPOCHS, lr=LEARNING_RATE)

----- Epoch 1/1 -----
Train Loss: 4.9545, Train Accuracy: 31.20%
Val Loss: 4.5821, Val Accuracy: 32.49%
<class 'float'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>


In [None]:
model2 = NeuralLM2(input_dim=30, hidden_dim=256, vocab_size=8500)
train(model2, train_loader, val_loader, epochs=NUM_EPOCHS, lr=LEARNING_RATE)

In [None]:
model3 = NeuralLM3(input_dim=30, hidden_dim=256, vocab_size=8500)
train(model3, train_loader, val_loader, epochs=NUM_EPOCHS, lr=LEARNING_RATE)

In [None]:
len(tokenizer.vocab)

In [20]:
def predict_tokens(sentence: str, num_tokens: int, context_size: int, dataset: NeuralLMDataset, model: nn.Module):
    model.eval() #evaluation mode for predicing tokens

    tokens = dataset.tokenizer.tokenize(sentence) #tokenize the input sentence
    if len(tokens) < context_size:
        tokens = [dataset.pad_token] * (context_size - len(tokens)) + tokens #add padding tokens

    sentence_embeds = [dataset.word2vec.embeddings(torch.tensor(dataset.word2idx[token], dtype=torch.long)).detach().numpy() for token in tokens] #create embeddings for tokenized sentence

    predicted_tokens = []
    for _ in range(num_tokens):
        context = sentence_embeds[-context_size:]
        context_tensor = torch.tensor(np.concatenate(context).flatten(), dtype=torch.float32)
        output = model(context_tensor)
        predicted_idx = torch.argmax(output).item()
        predicted_token = dataset.idx2word[predicted_idx]
        predicted_tokens.append(predicted_token)
        sentence_embeds.append(dataset.word2vec.embeddings(torch.tensor(predicted_idx, dtype=torch.long)).detach().numpy())
    
    return predicted_tokens

In [None]:
predict_tokens('', 3, 3, dataset, model1)

In [10]:
def prediciton_pipeline(input_file, num_tokens, context_size, dataset, model):
    with open(input_file, 'r') as f:
        sentences = f.readlines()
    for sentence in sentences:
        sentence = sentence.strip()
        predicted_tokens = predict_tokens(sentence, num_tokens, context_size, dataset, model)
        print(f"Input: {sentence}")
        print(f"Predicted Tokens: {' '.join(predicted_tokens)}")
        print()

In [None]:
prediciton_pipeline('task3-files/sample_test.txt', 3, 3, dataset, model1)

In [None]:
predicted_words = []
for _ in range(3):
    with torch.no_grad():
        context = sentence_embeds[-3:]
        context = torch.tensor(np.concatenate(context).flatten(), dtype=torch.float32)#.unsqueeze(0)  # Add batch dim
        output = model(context)  # Predict next word
        next_word_idx = torch.argmax(output).item()  # Get most probable word
        # print(next_word_idx)
        next_word = dataset.idx2word.get(next_word_idx, "<UNK>")  # Convert index to word
        # print(next_word)
        predicted_words.append(next_word)

        tokens.append(next_word)
        sentence_embeds.append(dataset.word2vec.embeddings(torch.tensor(next_word_idx, dtype=torch.long)).detach().numpy())


print(" ".join(predicted_words))

In [None]:
def predict_next_words(model, sentence, tokenizer, word2vec, vocab, num_words=3):
    model.eval()
    words = tokenizer.tokenize(sentence)
    
    # Convert to embeddings
    context = [word2vec[word] for word in words if word in word2vec][-3:]  # Use last 3 tokens
    context = torch.tensor(context, dtype=torch.float32).unsqueeze(0)  # Add batch dim

    predicted_words = []
    for _ in range(num_words):
        with torch.no_grad():
            output = model(context)  # Predict next word
            next_word_idx = torch.argmax(output, dim=1).item()  # Get most probable word
            next_word = vocab[next_word_idx]  # Convert index to word
            predicted_words.append(next_word)

            # Update context by adding new word embedding
            if next_word in word2vec:
                next_embedding = word2vec[next_word]
                context = torch.cat([context[:, 1:, :], torch.tensor(next_embedding).unsqueeze(0).unsqueeze(1)], dim=1)

    return " ".join(predicted_words)

In [None]:
sentence = "The weather is"
predicted = predict_next_words(model, sentence, tokenizer, word2vec, vocab)
print(f"Predicted words: {predicted}")

In [None]:
# Prepare data for training
dataset = NeuralLMDataset(corpus, text_processor.word2idx)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True)

# Train models
embedding_dim = 10
hidden_dim = 20

models = [NeuralLM1, NeuralLM2, NeuralLM3]
losses = {}
accuracies = {}
perplexities = {}

for i, model_class in enumerate(models):
    model = model_class(len(text_processor.vocab), embedding_dim, hidden_dim)
    loss = train(model, dataloader)
    losses[f'NeuralLM{i+1}'] = loss
    acc = compute_accuracy(model, dataloader)
    accuracies[f'NeuralLM{i+1}'] = acc
    perplexities[f'NeuralLM{i+1}'] = compute_perplexity(loss[-1])
    plot_losses(loss, f'NeuralLM{i+1} Training Loss')
    torch.save(model.state_dict(), f"neural_lm{i+1}.pth")
    print(f"NeuralLM{i+1} - Accuracy: {acc}, Perplexity: {perplexities[f'NeuralLM{i+1}']}")

# Predict next 3 tokens from test.txt
def predict_next_tokens(model, sentence, n=3):
    words = sentence.split()
    context = torch.tensor([text_processor.word2idx[word] for word in words[-2:]], dtype=torch.long)
    predictions = []
    for _ in range(n):
        output = model(context.unsqueeze(0))
        next_word_idx = torch.argmax(output, dim=1).item()
        next_word = text_processor.idx2word[next_word_idx]
        predictions.append(next_word)
        context = torch.cat((context[1:], torch.tensor([next_word_idx])))
    return predictions

print("Model checkpoints saved!")