# Advanced Methods in Text Analytics
# Exercise 4: Language Models - Part 2
### Daniel Ruffinelli
## FSS 2025

## 1. N-Gram Language Models

In [None]:
# we define a simple whitespace tokenizer that keeps punctuation, turning each
# punctuation symbol into a token of its own
def tokenize(text):
    """
    Split text into whitespace-delimited tokens, keeping punctuation.

    Every character in ``string.punctuation`` is isolated as a token of its
    own; punctuation is separated from words, not discarded.

    Args:
        text: string with text to tokenize

    Returns:
        list of tokens
    """

    import string

    # map each punctuation symbol to itself surrounded by whitespace, so the
    # whitespace split below yields it as a separate token
    spaced_punctuation = str.maketrans(
        {mark: " " + mark + " " for mark in string.punctuation}
    )

    return text.translate(spaced_punctuation).split()

In [None]:
# we test the tokenizer
text = "This is a phrase, with some punctuation, that we want to tokenize!"
print(tokenize(text))
# output should be:
# ['This', 'is', 'a', 'phrase', ',', 'with', 'some', 'punctuation', ',', 'that', 'we', 'want', 'to', 'tokenize', '!']

### Question (a)

In [None]:
# function for counting n-grams
def compute_ngrams(n, tokenized_text):
    """
    Compute n-grams in given list of tokens.

    The text is padded on the left with n - 1 start-of-sentence symbols
    ("<s>"), so every token of tokenized_text appears exactly once as a
    next_word.

    Args:
        n: size of n-grams (must be at least 1)
        tokenized_text: list of tokens

    Returns:
        list of n-grams of the form (context, next_word), where context is a
        tuple of n - 1 previous words (the empty tuple for n = 1).

    Raises:
        ValueError: if n is smaller than 1
    """

    if n < 1:
        raise ValueError("n must be at least 1, got {}".format(n))

    context_size = n - 1

    # we add left-side padding (i.e. start of sentence symbols)
    tokens = context_size * ["<s>"] + list(tokenized_text)

    # slide a window of n tokens over the padded text: the first n - 1 tokens
    # of the window are the context, the last one is the word to predict
    ngrams = [
        (tuple(tokens[start:start + context_size]), tokens[start + context_size])
        for start in range(len(tokens) - context_size)
    ]

    return ngrams

In [None]:
# test your ngram computation
text = "This is a phrase, with some punctuation, that we want to tokenize!"
print(compute_ngrams(3, tokenize(text)))
# output should be:
# [(('<s>', '<s>'), 'This'), (('<s>', 'This'), 'is'), (('This', 'is'), 'a'), (('is', 'a'), 'phrase'), (('a', 'phrase'), ','), (('phrase', ','), 'with'), ((',', 'with'), 'some'), (('with', 'some'), 'punctuation'), (('some', 'punctuation'), ','), (('punctuation', ','), 'that'), ((',', 'that'), 'we'), (('that', 'we'), 'want'), (('we', 'want'), 'to'), (('want', 'to'), 'tokenize'), (('to', 'tokenize'), '!')]

### Question (b)

In [None]:
# define an n-gram model that keeps track of "context to next word" pairs
# and an ngram counter
from collections import defaultdict as ddict 

class NGramModel:
    """
    N-gram language model with add-1 (Laplace) smoothing.

    Keeps track of the vocabulary, of how often every n-gram was observed
    (ngram_counter) and of which distinct words were observed after every
    context (context).
    """

    def __init__(self, n):
        """
        Args:
            n (int): size of the n-grams, i.e. contexts hold n - 1 words
        """

        self._n = n

        # set to store vocabulary
        self.vocabulary = set()

        # dict to store ngram counters, keys are (context, next_word) pairs
        self.ngram_counter = ddict(int)

        # dict to track (context, next_word) pairs: maps a context to the list
        # of distinct words seen after it, in order of first occurrence
        self.context = ddict(list)

    def extend_vocabulary(self, tokenized_text):
        """
        Adds tokens to vocabulary. Useful for smoothing for unseen data.

        Args:
            tokenized_text (list of tokens): tokens to add
        """
        self.vocabulary.update(tokenized_text)

    def update(self, tokenized_text):
        """
        Updates counts of ngram model by computing ngrams in given tokenized
        text and adding them to ngram counter and context tracker. The tokens
        are also added to the vocabulary.

        Args:
            tokenized_text (list of tokens): text to compute ngrams from
        """

        # materialize once, so that generators can be consumed twice below
        tokenized_text = list(tokenized_text)

        # compute ngrams
        ngrams = compute_ngrams(self._n, tokenized_text)

        self.extend_vocabulary(tokenized_text)

        for context, next_word in ngrams:
            ngram = (context, next_word)
            # register next_word for this context only the first time we see
            # the ngram, so the context tracker holds no duplicates
            if self.ngram_counter.get(ngram, 0) == 0:
                self.context[context].append(next_word)
            self.ngram_counter[ngram] += 1

    def next_word_prob(self, context, next_word):
        """
        Returns probability of predicting next_word after given context.
        We use add-1 smoothing to process unseen words:

            P(next_word | context) = (count(context, next_word) + 1)
                                     / (count(context) + |V|)

        Querying does not modify the model, i.e. no entries are created for
        unseen contexts or ngrams.

        Args:
            context (tuple or list of tokens): context words
            next_word (str): next word

        Returns:
            prob (float): probability of next_word following context; 0.0 if
            the model has neither counts nor vocabulary.
        """

        # contexts are stored as tuples, accept lists as well
        context = tuple(context)

        # use .get() so lookups do not insert entries into the defaultdicts
        ngram_count = self.ngram_counter.get((context, next_word), 0)
        context_count = sum(
            self.ngram_counter.get((context, word), 0)
            for word in self.context.get(context, ())
        )

        denominator = context_count + len(self.vocabulary)
        if denominator == 0:
            return 0.

        return (ngram_count + 1) / denominator

In [None]:
# test your NGram model
bigram_model = NGramModel(2)
print("Size on n-gram:", bigram_model._n)
# output should be: Size on n-gram: 2 

In [None]:
# test your update function
bigram_model.update(tokenize("We now need text! Let's add some text to our bigram model"))
print("Vocabulary:", bigram_model.vocabulary)
print("N-Gram Counter:", bigram_model.ngram_counter)
print("Context Dict:", bigram_model.context)
# output should be:
# Vocabulary: {'s', '!', 'model', 'to', 'Let', 'add', "'", 'bigram', 'need', 'now', 'We', 'text', 'our', 'some'}
# N-Gram Counter: defaultdict(<class 'int'>, {(('<s>',), 'We'): 1, (('We',), 'now'): 1, (('now',), 'need'): 1, (('need',), 'text'): 1, (('text',), '!'): 1, (('!',), 'Let'): 1, (('Let',), "'"): 1, (("'",), 's'): 1, (('s',), 'add'): 1, (('add',), 'some'): 1, (('some',), 'text'): 1, (('text',), 'to'): 1, (('to',), 'our'): 1, (('our',), 'bigram'): 1, (('bigram',), 'model'): 1})
# Context Dict: defaultdict(<class 'list'>, {('<s>',): ['We'], ('We',): ['now'], ('now',): ['need'], ('need',): ['text'], ('text',): ['!', 'to'], ('!',): ['Let'], ('Let',): ["'"], ("'",): ['s'], ('s',): ['add'], ('add',): ['some'], ('some',): ['text'], ('to',): ['our'], ('our',): ['bigram'], ('bigram',): ['model']})

In [None]:
# test your next_word_prob function
print("Probability that word 'to' follows word 'text':", bigram_model.next_word_prob(("text",), "to"))
# output should be:
# Probability that word 'to' follows word 'text': 0.125

### Question (c)

In [None]:
# define function to predict next word given context
def predict_next_word(context, model):
    """
    Uses given model to predict the next word that could follow the given
    context. Each possible next word is chosen based on its probability given
    by the model.

    Args:
        context (tuple of tokens): previous words
        model (NGramModel): model providing the context tracker (context) and
            next_word_prob

    Returns:
        predicted word (str), sampled among the words the model has seen after
        the given context

    Raises:
        KeyError: if the model has not seen any word after the given context
    """

    import random

    context = tuple(context)

    # get all possible next words; .get() avoids creating an entry for unseen
    # contexts, and duplicates are dropped so no word is weighted twice
    next_words = list(dict.fromkeys(model.context.get(context, ())))
    if not next_words:
        raise KeyError(
            "Model has no continuation for context {!r}".format(context)
        )

    next_words_probs = [
        model.next_word_prob(context, word) for word in next_words
    ]

    # select one of possible next word to predict
    predicted_word = random.choices(next_words, weights=next_words_probs, k=1)

    return predicted_word[0]

In [None]:
# test your function predict_next_word
predict_next_word(("text",), bigram_model)
# output should be either '!' or 'to'

### Question (d) 

In [None]:
# load shakespeare's entire works
with open("shakespeare_train.txt") as f:
    corpus = f.read()

# break it into sentences
corpus_sentences = corpus.split(".")

In [None]:
# let's create a trigram model
trigram_model = NGramModel(3)

# feed corpus to model
for sentence in corpus_sentences:
        trigram_model.update(tokenize(sentence))


In [None]:
# check most common ngrams
top = 10
sorted(trigram_model.ngram_counter.items(), key=lambda x:x[1], reverse=True)[:top]

### Question (e)

In [None]:
# we define function to generate n tokens given starting token
def generate_text(num_tokens, context, model):
    """
    Generate up to num_tokens words using given model.

    Starting from the given context, a next word is sampled with
    predict_next_word, appended to the output, and the context is shifted by
    one word. Generation stops early if the model has never seen the current
    context, because there is nothing to sample from in that case.

    Args:
        num_tokens (int): number of tokens to generate
        context (tuple of previous words): starting prompt
        model (NGramModel): ngram model

    Returns:
        string with the generated tokens separated by whitespace (the prompt
        itself is not included)
    """

    context = tuple(context)
    output = []

    for _ in range(num_tokens):
        # .get() so that checking an unseen context does not add it to the
        # model's context tracker
        if not model.context.get(context):
            break

        next_word = predict_next_word(context, model)
        output.append(next_word)

        # slide the context window; unigram models have an empty context
        if context:
            context = context[1:] + (next_word,)

    return " ".join(output)

In [None]:
# predict next words
context = ("My", "lord")

# test text generation
# this may fail at times because our simple model does not have an entry for the
# given context in its {context: next_word} dict, which in turn happens because
# the generated context (prompt) is a combination the model has not seen
# In short, it's a problem of our implementation that uses a dict
# In practice, it's not often done like this (we'll see in the next tasks)
print(generate_text(40, context, trigram_model))

### Question (f)

In [None]:
# let's try stronger models
fourgram_model = NGramModel(4)

# feed corpus to model
# fourgram_model.update(tokenize(corpus))
for sentence in corpus_sentences:
        fourgram_model.update(tokenize(sentence))

# check most common ngrams
top = 10
sorted(fourgram_model.ngram_counter.items(), key=lambda x:x[1], reverse=True)[:top]

In [None]:
# predict next words
context = ("I", "'", "ll")

# test text generation
print(generate_text(20, context, fourgram_model))

In [None]:
# and a 5-gram model
fivegram_model = NGramModel(5)

# feed corpus to model
# fivegram_model.update(tokenize(corpus))
for sentence in corpus_sentences:
        fivegram_model.update(tokenize(sentence))

# check most common ngrams
top = 10
sorted(fivegram_model.ngram_counter.items(), key=lambda x:x[1], reverse=True)[:top]


In [None]:
# predict next words
context = ("<s>", "<s>", "<s>", "<s>")

# test text generation
print(generate_text(20, context, fivegram_model))

### Question (g)

In [None]:
# compute likelihood of validation data
def compute_likelihood(tokenized_text, model, n):
    """
    Compute log_likelihood of given tokenized text using given model.
    We compute likelihood in log space to prevent numerical underflow.

    The text is left-padded with n - 1 "<s>" symbols, so that every token is
    predicted exactly once from its n - 1 predecessors.

    Args:
        tokenized_text (list of tokens): text to compute likelihood of
        model (NGramModel): model used to compute likelihood
        n (int): size of n-grams of given model

    Returns:
        tuple (log_likelihood, perplexity), where perplexity is
        exp(-log_likelihood / number of predicted tokens)

    Raises:
        ValueError: if tokenized_text is empty (perplexity is undefined), or
            if the model assigns probability zero to some token
    """

    import math

    context_size = n - 1
    tokens = context_size * ["<s>"] + list(tokenized_text)

    num_predictions = len(tokens) - context_size
    if num_predictions <= 0:
        raise ValueError("tokenized_text must contain at least one token")

    log_likelihood = 0.0
    for start in range(num_predictions):
        context = tuple(tokens[start:start + context_size])
        next_word = tokens[start + context_size]
        log_likelihood += math.log(model.next_word_prob(context, next_word))

    return log_likelihood, math.exp(-log_likelihood / num_predictions)

In [None]:
# load shakespeare's validation data
with open("shakespeare_valid.txt") as f:
    corpus_valid = f.read()

# tokenize it
tokenized_validation = tokenize(corpus_valid)

print(tokenized_validation[:100])

In [None]:
# extend trigram model vocabulary for proper smoothing
trigram_model.extend_vocabulary(tokenized_validation)

#  compute validation likelihood
valid_likelihood, perplexity = compute_likelihood(tokenize(corpus_valid), trigram_model, 3)
print("Validation likelihood:", valid_likelihood)
print("Validation perplexity:", perplexity)

## 2. Language Models with Fully-Connected Neural Networks

In [None]:
import torch

# test it
a = torch.rand(3,3)
print(a)

In [None]:
# let's preprocess our text (we work with embeddings now, not just strings)
from collections import defaultdict as ddict

# these are our splits
shakespeare_splits = {
    "train": "shakespeare_train.txt",
    "valid": "shakespeare_valid.txt",
    "test": "shakespeare_test.txt",
}

# we read and tokenize every split only once
tokenized_splits = {}
for split_id, split_file in shakespeare_splits.items():
    with open(split_file) as f:
        tokenized_splits[split_id] = tokenize(f.read())

# we create a vocabulary dict of the form {token: ID}, IDs are assigned in
# order of first occurrence across the splits
shakespeare_vocab = {}
for tokenized_split in tokenized_splits.values():
    for token in tokenized_split:
        if token not in shakespeare_vocab:
            shakespeare_vocab[token] = len(shakespeare_vocab)
# we add the padding symbol to our vocabulary
shakespeare_vocab["<s>"] = len(shakespeare_vocab)
print("Size of vocabulary:", len(shakespeare_vocab))

# we turn our splits into sequences of token IDs
shakespeare_splits_ids = ddict(list)
for split_id, tokenized_split in tokenized_splits.items():
    shakespeare_splits_ids[split_id].extend(
        shakespeare_vocab[token] for token in tokenized_split
    )


### Question (a)

In [None]:
from torch import nn

class NeuralLM(nn.Module):
    """
    Feed-forward neural language model: the embeddings of a fixed-size window
    of tokens are concatenated, passed through the hidden layers and mapped to
    one logit per vocabulary entry for the next token.
    """

    def __init__(self, 
                 vocabulary_size, 
                 embedding_size,
                 max_input_length,
                 padding_token_id,
                 hidden_layer_sizes,
                 hidden_layer_activation="tanh"):
        """
        Neural language model.

        Args:
            vocabulary_size (int): size of vocabulary
            embedding_size (int): size of input token embeddings
            max_input_length (int): max. number of input tokens
            padding_token_id (int): id of token to use for left padding
            hidden_layer_sizes (list): list of hidden layer sizes, e.g. [10, 5];
                may be empty, in which case the concatenated embeddings feed
                the output layer directly
            hidden_layer_activation (string): activation, e.g. sigmoid, tanh

        Raises:
            ValueError: if hidden_layer_activation is not a callable in torch
        """
        super().__init__()

        # set hyperparameters
        self._max_input_length = max_input_length
        # non-persistent buffer: follows the module across devices without
        # adding an entry to the state dict
        self.register_buffer(
            "_pad_token_id", torch.tensor(padding_token_id), persistent=False
        )
        activation = getattr(torch, hidden_layer_activation, None)
        if not callable(activation):
            raise ValueError("Activation must be a torch-supported function.")
        self._activation = activation

        # create embedding matrix
        self._embeddings = nn.Embedding(vocabulary_size, embedding_size)

        # create hidden layers
        # they must live in a ModuleList (not a plain list), otherwise their
        # weights are not part of model.parameters() and are never trained;
        # as a consequence they are also listed by print(model)
        self._hidden_layers = nn.ModuleList()
        input_size = embedding_size * max_input_length
        for output_size in hidden_layer_sizes:
            self._hidden_layers.append(nn.Linear(input_size, output_size))
            input_size = output_size

        # create output layer (no need for softmax, we'll do that with the loss)
        # input_size is the size of the last hidden layer, or of the
        # concatenated embeddings if there are no hidden layers
        self._output_layer = nn.Linear(input_size, vocabulary_size)

    def forward(self, input_ids):
        """
        Forward pass.

        Args:
            input_ids (tensor): tensor of token IDs of size
            (batch_size, seq_length) with seq_length <= max_input_length;
            shorter inputs are padded on the left with the padding token

        Return:
            out (tensor): logits of size [len(input_ids), vocabulary_size]

        Raises:
            ValueError: if the input is longer than max_input_length
        """

        batch_size, seq_length = input_ids.size()

        # pad to the left
        num_pads = self._max_input_length - seq_length
        if num_pads < 0:
            raise ValueError(
                "Input has {} tokens, but the model supports at most {}.".format(
                    seq_length, self._max_input_length
                )
            )
        if num_pads > 0:
            padding_tensor = self._pad_token_id.to(input_ids).expand(
                batch_size, num_pads
            )
            input_ids = torch.cat([padding_tensor, input_ids], dim=1)

        # (batch_size, max_input_length, embedding_size)
        embedded = self._embeddings(input_ids)

        # concatenate the embeddings of all positions into a single vector
        hidden = embedded.flatten(start_dim=1)

        for layer in self._hidden_layers:
            hidden = self._activation(layer(hidden))

        out = self._output_layer(hidden)

        return out
        

In [None]:
# test your NeuralLM
embedding_size = 16
hidden_layer_size = 32
max_input_length = 64
neural_lm_1 = NeuralLM(
    len(shakespeare_vocab),
    embedding_size=embedding_size,
    max_input_length=max_input_length,
    padding_token_id=shakespeare_vocab["<s>"],
    hidden_layer_sizes=[hidden_layer_size]
)
print(neural_lm_1)

# output should be:
# NeuralLM(
#   (_embeddings): Embedding(29245, 16)
#   (_output_layer): Linear(in_features=32, out_features=29245, bias=True)
# )

### Question (b)

In [None]:
# create torch dataset class
from torch.utils.data import Dataset

class SelfSupervisedTextDataset(Dataset):
    """
    Dataset of fixed-length token ID sequences cut from one long sequence.
    """

    def __init__(self, tokenized_text, example_length):
        """
        Dataset to process text examples constructed with self-supervision.

        Args:
            tokenized_text (list of int): token IDs to construct examples from
            example_length (int): length of the examples, must be positive

        Raises:
            ValueError: if example_length is not positive
        """

        if example_length <= 0:
            raise ValueError("example_length must be a positive integer")

        # we divide tokenized text into subsequences of (equal) example_length
        # we ignore leftover tokens at the end
        num_examples = len(tokenized_text) // example_length
        used_tokens = tokenized_text[:num_examples * example_length]
        examples = torch.as_tensor(used_tokens, dtype=torch.long).reshape(
            num_examples, example_length
        )

        # one 1D tensor of token IDs per example
        self._examples = list(examples)

    def __len__(self):
        return len(self._examples)
    
    def __getitem__(self, idx):
        return self._examples[idx]


In [None]:
# create shakespeare dataset
training_dataset = SelfSupervisedTextDataset(shakespeare_splits_ids["train"], 
                                             max_input_length)
print("Num examples:", len(training_dataset))
print("Example length:", len(training_dataset[0]))
# output should be:
# Num examples: 16824
# Example length: 64

In [None]:
# create data loader
from torch.utils.data import DataLoader

def collate_fn(batch):
    """
    Function to construct labeled example from given batch: the last token of
    every sequence is the target, all previous tokens are the input.

    Args:
        batch (tensor or list of tensors): batch_size sequences of
            sentence_length token IDs each

    Returns:
        tuple (inputs, targets) with inputs of size
        batch_size x (sentence_length - 1) and targets of size batch_size
    """

    # we create two lists for our training examples: inputs and corresponding 
    # targets    
    inputs = []
    targets = []

    for example in batch:
        # a DataLoader hands us a list of examples, accept plain lists as well
        example = torch.as_tensor(example)
        inputs.append(example[:-1])
        targets.append(example[-1])

    return torch.stack(inputs, dim=0), torch.stack(targets, dim=0)

In [None]:
# test your collate_fn
# we create a toy batch of sequence_length = 4
toy_batch = torch.tensor([[1, 2, 3, 4], [5, 6, 7, 8]])
print(collate_fn(toy_batch))
# output should be: 
# (tensor([[1, 2, 3],
#          [5, 6, 7]]), tensor([4, 8]))

In [None]:
# test your dataloader
batch_size = 128
training_dataloader = DataLoader(training_dataset, 
                                 collate_fn=collate_fn,
                                 batch_size=batch_size, 
                                 shuffle=True, 
                                 num_workers=0)
print(training_dataloader)

### Question (c)

In [None]:
# write training loop
import time, math

def train(model, dataloader, num_epochs=10, print_batch_stats=False, rnn=False):
    """
    Training loop

    Trains the model with cross entropy and Adagrad, printing the average loss
    and perplexity after every epoch.

    Args:
        model: some LM implemented in PyTorch
        dataloader: dataloader that returns (inputs, targets) batches
        num_epochs (int): number of epochs to train 
        print_batch_stats (bool): whether to also print the loss of each batch
        rnn (bool): whether the model is an RNN, i.e. it provides
            init_hidden(batch_size) and its forward pass takes and returns a
            hidden state

    Returns:
        list with the average training loss per target token of every epoch
    """

    # set training hyperparameters
    loss_fn = nn.CrossEntropyLoss()
    learning_rate = 0.1
    optimizer = torch.optim.Adagrad(model.parameters(), lr=learning_rate)

    # set model to train mode
    model.train()

    epoch_losses = []
    for epoch in range(1, num_epochs + 1):
        start_time = time.time()
        summed_loss = 0.
        num_targets = 0

        for batch_idx, (inputs, targets) in enumerate(dataloader, start=1):
            optimizer.zero_grad()

            if rnn:
                # every batch holds independent sequences: fresh hidden state
                hidden = model.init_hidden(inputs.size(0))
                logits, _ = model(inputs, hidden)
            else:
                logits = model(inputs)

            # an RNN returns one prediction per time step; if there is only
            # one target per sequence, we score the last time step
            if logits.dim() == 3 and targets.dim() == 1:
                logits = logits[:, -1, :]

            # flatten so that both per-sequence and per-time-step targets work
            loss = loss_fn(
                logits.reshape(-1, logits.size(-1)), targets.reshape(-1)
            )
            loss.backward()
            optimizer.step()

            # loss is a mean over targets, we weight it to average correctly
            # over batches of different sizes
            batch_loss = loss.item()
            summed_loss += batch_loss * targets.numel()
            num_targets += targets.numel()

            if print_batch_stats:
                print("Epoch {} | Batch {}/{} | Loss: {:.4f}".format(
                    epoch, batch_idx, len(dataloader), batch_loss
                ))

        epoch_loss = summed_loss / num_targets if num_targets else float("nan")
        try:
            perplexity = math.exp(epoch_loss)
        except OverflowError:
            perplexity = float("inf")
        epoch_losses.append(epoch_loss)

        print("Epoch {}/{} | Loss: {:.4f} | Perplexity: {:.2f} | Time: {:.1f}s".format(
            epoch, num_epochs, epoch_loss, perplexity, time.time() - start_time
        ))

    return epoch_losses


In [None]:
# test your training loop (loss and ppl should go down)
train(neural_lm_1, training_dataloader, num_epochs=20)

### Question (d)

In [None]:
# evaluate 
def evaluate(model, dataloader, print_batch_stats=False, rnn=False):
    """
    Evaluate model on given dataset.

    Prints and returns the average cross entropy per target token and the
    corresponding perplexity. The model is left in eval mode.

    Args:
        model: some LM implemented in PyTorch
        dataloader: dataloader that returns (inputs, targets) batches
        print_batch_stats (bool): whether to also print the loss of each batch
        rnn (bool): whether the model is an RNN, i.e. it provides
            init_hidden(batch_size) and its forward pass takes and returns a
            hidden state

    Returns:
        tuple (average loss, perplexity)

    Raises:
        ValueError: if the dataloader yields no targets
    """

    # we use cross entropy so we can compute perplexity from this
    # we sum loss up, to then divide by number of examples
    loss_fn = nn.CrossEntropyLoss(reduction="sum")

    # set model to eval mode (turns off dropout, etc.)
    model.eval()

    num_batches = len(dataloader)
    num_examples = 0
    total_loss = 0.

    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(dataloader, start=1):
            if rnn:
                hidden = model.init_hidden(inputs.size(0))
                logits, _ = model(inputs, hidden)
            else:
                logits = model(inputs)

            # an RNN returns one prediction per time step; with one target per
            # sequence we score the prediction of the last time step, which
            # makes RNNs comparable to models that predict a single token
            if logits.dim() == 3 and targets.dim() == 1:
                logits = logits[:, -1, :]

            batch_loss = loss_fn(
                logits.reshape(-1, logits.size(-1)), targets.reshape(-1)
            ).item()
            total_loss += batch_loss
            num_examples += targets.numel()

            if print_batch_stats:
                print("Batch {}/{} | Loss: {:.4f}".format(
                    batch_idx, num_batches, batch_loss / targets.numel()
                ))

    if num_examples == 0:
        raise ValueError("Cannot evaluate on an empty dataloader.")

    mean_loss = total_loss / num_examples
    try:
        perplexity = math.exp(mean_loss)
    except OverflowError:
        perplexity = float("inf")

    print("Loss: {:.4f} | Perplexity: {:.2f}".format(mean_loss, perplexity))

    return mean_loss, perplexity


In [None]:
# create validation dataset
validation_dataset = SelfSupervisedTextDataset(shakespeare_splits_ids["valid"], 
                                               max_input_length)
print(validation_dataset)

In [None]:
# create data loader for validation
batch_size = 128 
validation_dataloader = DataLoader(validation_dataset, 
                                 collate_fn=collate_fn,
                                 batch_size=batch_size, 
                                 shuffle=True, 
                                 num_workers=0)

In [None]:
# evaluate FNN
evaluate(neural_lm_1, validation_dataloader, rnn=False)

## 3. Language Models with RNNs

### Question (a)

In [None]:
# now an RNN-based language model
# the embedding and output layers can share their weights (tie_weights), see
# this: https://arxiv.org/pdf/1608.05859.pdf

class RNNLM(nn.Module):
    """
    RNN-based language model: token embeddings are fed to an (Elman) RNN and
    every hidden state is mapped to one logit per vocabulary entry.
    """

    def __init__(self,
                 vocabulary_size, 
                 embedding_size,
                 hidden_layer_size,
                 tie_weights=True,
                 num_hidden_layers=1,
                 hidden_layer_activation="tanh",
                 dropout_rate=0.0):
        """
        RNN-based language model.

        Args:
            vocabulary_size (int): size of vocabulary
            embedding_size (int): size of input token embeddings
            hidden_layer_size (int): size of the RNN hidden states
            tie_weights (bool): whether the embedding matrix and the weights
                of the output layer are the same parameter; requires
                embedding_size == hidden_layer_size
            num_hidden_layers (int): number of stacked RNN layers
            hidden_layer_activation (string): nonlinearity passed to nn.RNN
            dropout_rate (float): dropout passed to nn.RNN

        Raises:
            ValueError: if tie_weights is set but embedding_size and
                hidden_layer_size differ
        """

        super().__init__()

        # both matrices have vocabulary_size rows, but they can only be shared
        # if they also have the same number of columns
        if tie_weights and embedding_size != hidden_layer_size:
            raise ValueError(
                "tie_weights requires embedding_size == hidden_layer_size, "
                "got {} and {}.".format(embedding_size, hidden_layer_size)
            )

        self._num_layers = num_hidden_layers
        self._hidden_layer_size = hidden_layer_size
        self._embedding_size = embedding_size

        self._embeddings = nn.Embedding(vocabulary_size, embedding_size)
        self._rnn = nn.RNN(embedding_size, 
                           hidden_layer_size,
                           num_layers=num_hidden_layers, 
                           dropout=dropout_rate, 
                           nonlinearity=hidden_layer_activation,
                           batch_first=True)
        self.output_layer = nn.Linear(hidden_layer_size, vocabulary_size)

        if tie_weights:
            self._embeddings.weight = self.output_layer.weight

    def init_hidden(self, batch_size):
        """
        Initialize hidden states.

        Args:
            batch_size (int): batch size

        Returns:
            tensor of zeros of size [num layers, batch size, hidden size],
            created on the same device as the model parameters
        """

        hidden = torch.zeros(self._num_layers, 
                             batch_size, 
                             self._hidden_layer_size,
                             device=self.output_layer.weight.device)

        return hidden

    def forward(self, input_ids, hidden):
        """
        Forward pass.

        Args:
            input_ids (tensor): token IDs of size [batch size, seq length]
            hidden (tensor): tensor with initial hidden states of size
                [num layers, batch size, hidden size], see init_hidden

        Return:
            out (tensor): logits of size [batch size, seq length, vocab_size]
            hidden_state (tensor): hidden states after the last time step, of
                size [num layers, batch size, hidden size]
        """

        # [batch size, seq length, embedding size]
        embedded = self._embeddings(input_ids)

        # rnn_states holds the top-layer hidden state of every time step
        rnn_states, hidden_state = self._rnn(embedded, hidden)

        # one prediction per time step
        out = self.output_layer(rnn_states)

        return out, hidden_state


In [None]:
# test your RNN
embedding_size = 16
hidden_layer_size = 16
rnn_lm_1 = RNNLM(
    len(shakespeare_vocab),
    embedding_size=embedding_size,
    hidden_layer_size=hidden_layer_size,
)
print(rnn_lm_1)

# output should be:
# RNNLM(
#   (_embeddings): Embedding(29245, 16)
#   (_rnn): RNN(16, 16, batch_first=True)
#   (output_layer): Linear(in_features=16, out_features=29245, bias=True)
# )

### Question (b)

In [None]:
# we need a slightly different collate function for the RNN, if we want to train 
# with teacher forcing
def rnn_collate_fn(batch):
    """
    Function to construct labeled example from given batch for training with
    teacher forcing: the target of every input token is the token that follows
    it, i.e. targets are the inputs shifted by one position.

    Args:
        batch (tensor or list of tensors): batch_size sequences of
            sentence_length token IDs each

    Returns:
        tuple (inputs, targets), both of size
        batch_size x (sentence_length - 1)
    """

    # we create two lists for our training examples: inputs and corresponding 
    # targets    
    inputs = []
    targets = []

    for example in batch:
        # a DataLoader hands us a list of examples, accept plain lists as well
        example = torch.as_tensor(example)
        inputs.append(example[:-1])
        targets.append(example[1:])

    return torch.stack(inputs, dim=0), torch.stack(targets, dim=0)

In [None]:
# test your collate_fn
# we create a toy batch of sequence_length = 4
toy_batch = torch.tensor([[1, 2, 3, 4], [5, 6, 7, 8]])
print(rnn_collate_fn(toy_batch))

# output should be:
# (tensor([[1, 2, 3],
#          [5, 6, 7]]), 
#  tensor([[2, 3, 4],
#          [6, 7, 8]]))

In [None]:
# create a new dataloader with this new collate function
rnn_training_dataloader = DataLoader(training_dataset, 
                                 collate_fn=rnn_collate_fn,
                                 batch_size=batch_size, 
                                 shuffle=True, 
                                 num_workers=0)
print(rnn_training_dataloader)

In [None]:
# train your RNN
train(rnn_lm_1, rnn_training_dataloader, num_epochs=1, rnn=True)

### Question (c) 

In [None]:
# evaluate RNN
# we use the same dataloader as with our feed-forward model, so performance is
# comparable, but the evaluate function must be able to handle both models
evaluate(rnn_lm_1, validation_dataloader, rnn=True)

### Question (d)

In [None]:
# invert the vocabulary ({token: ID} -> {ID: token}) so that the token IDs
# predicted by the RNN can be decoded back into text
reverse_shapeskeare_vocab = {
    token_id: token for token, token_id in shakespeare_vocab.items()
}

In [None]:
# generate text with RNN
def generate_text_with_rnn(num_tokens, 
                           context, 
                           model, 
                           temperature, 
                           decoding_dict):
    """
    Generate num_tokens given context and model.

    The context is fed to the model first; then, one token at a time, the next
    token is sampled from the softmax over the logits of the last time step
    (divided by temperature) and fed back to the model together with the
    hidden state.

    Args:
        num_tokens (int): number of tokens to be generated
        context (tensor): 1D tensor with the n token IDs of the prompt
        model (RNNLM): RNN model
        temperature (float): softmax temperature, must be positive
        decoding_dict (dict): dict of the form {token_id: token}

    Returns:
        string with the generated tokens separated by whitespace (the prompt
        itself is not included)

    Raises:
        ValueError: if temperature is not positive or the context is empty
    """

    if temperature <= 0:
        raise ValueError("temperature must be positive")
    if context.numel() == 0:
        raise ValueError("context must contain at least one token ID")

    predictions = []
    # add batch dimension: [n] -> [1, n]
    context = torch.unsqueeze(context, 0)
    with torch.no_grad():
        hidden = model.init_hidden(1)
        step_input = context
        for _ in range(num_tokens):
            logits, hidden = model(step_input, hidden)

            # distribution over the vocabulary for the token that follows the
            # last input position
            probs = torch.softmax(logits[0, -1] / temperature, dim=-1)
            next_id = torch.multinomial(probs, num_samples=1)
            predictions.append(next_id.item())

            # the hidden state summarizes everything seen so far, so we only
            # feed the newly sampled token in the next step
            step_input = next_id.view(1, 1)

    # decode predictions
    output_tokens = []
    for pred in predictions:
        output_tokens.append(decoding_dict[pred])

    return " ".join(output_tokens)

In [None]:
# test text generation

# construct context
context_ids = [10, 11]
context = torch.tensor(context_ids)
# print context
print("PROMPT:")
# (loop variable is not called "id", which would shadow the builtin)
for token_id in context_ids:
    print(reverse_shapeskeare_vocab[token_id])
print()

# generate!
# play around with temperature: higher values make the distribution more
# uniform, lower values put more mass on already probable events
num_tokens = 10
temperature = 1.0
print("GENERATED TEXT:")
generate_text_with_rnn(num_tokens, 
                       context, 
                       rnn_lm_1, 
                       temperature, 
                       reverse_shapeskeare_vocab)


### Question (e)

In [None]:
# here's a modified version of the generate_text_with_rnn function. It now
# takes as input a sampling function and a value for k.
def generate_text_with_rnn(num_tokens, 
                           context, 
                           model, 
                           temperature, 
                           decoding_dict,
                           sampling_fn,
                           k):
    """
    Generate num_tokens given context and model.

    The context is fed to the model first; then, one token at a time, the next
    token is chosen by sampling_fn from the logits of the last time step and
    fed back to the model together with the hidden state.

    Args:
        num_tokens (int): number of tokens to be generated
        context (tensor): 1D tensor with the n token IDs of the prompt
        model (RNNLM): RNN model
        temperature (float): softmax temperature
        decoding_dict (dict): dict of the form {token_id: token}
        sampling_fn (callable): function to produce single sample given 
                                distribution, called as
                                sampling_fn(logits, k, temperature) with the
                                1D logits over the vocabulary; must return a
                                token ID
        k (int): number of top-k elements in distribution to sample from

    Returns:
        string with the generated tokens separated by whitespace (the prompt
        itself is not included)

    Raises:
        ValueError: if the context is empty
    """

    if context.numel() == 0:
        raise ValueError("context must contain at least one token ID")

    predictions = []
    # add batch dimension: [n] -> [1, n]
    context = torch.unsqueeze(context, 0)
    with torch.no_grad():
        hidden = model.init_hidden(1)
        step_input = context
        for _ in range(num_tokens):
            logits, hidden = model(step_input, hidden)

            # int() accepts both python ints and single-element tensors
            next_id = int(sampling_fn(logits[0, -1], k, temperature))
            predictions.append(next_id)

            # the hidden state summarizes everything seen so far, so we only
            # feed the newly sampled token in the next step
            step_input = torch.tensor(
                [[next_id]], dtype=context.dtype, device=context.device
            )

    # decode predictions
    output_tokens = []
    for pred in predictions:
        output_tokens.append(decoding_dict[pred])

    return " ".join(output_tokens)

In [None]:
# top-k sampling (gives us greedy with k = 1 and random with k = |V|)
def topk_sampling(logits, k, temperature):
    """
    Top-k sampling, we get greedy sampling with k = 1 and random sampling with 
    k = |V|.

    Only the k largest logits are kept; they are divided by the temperature,
    normalized with a softmax and a single token ID is sampled from the
    resulting distribution.

    Args:
        logits (tensor): 1D tensor of unnormalized probabilities, one entry
            per token of the vocabulary
        k (int): number of top-k elements in distribution to sample from;
            values larger than the vocabulary are treated as |V|
        temperature (float): softmax temperature, must be positive

    Returns:
        sampled token ID (int)

    Raises:
        ValueError: if logits is not 1D, k is smaller than 1 or temperature
            is not positive
    """

    if logits.dim() != 1:
        raise ValueError("logits must be a 1D tensor")
    if k < 1:
        raise ValueError("k must be at least 1")
    if temperature <= 0:
        raise ValueError("temperature must be positive")

    k = min(int(k), logits.size(0))

    # top_ids maps positions in the truncated distribution back to token IDs
    top_logits, top_ids = torch.topk(logits, k)
    probs = torch.softmax(top_logits / temperature, dim=-1)
    choice = torch.multinomial(probs, num_samples=1)

    return top_ids[choice].item()


In [None]:
# test text generation now with different sampling approaches

# construct context
context_ids = [10, 11]
context = torch.tensor(context_ids)
# print context
print("PROMPT:")
# (loop variable is not called "id", which would shadow the builtin)
for token_id in context_ids:
    print(reverse_shapeskeare_vocab[token_id])
print()

# generate!
# in addition to temperature, play around with different values of k
num_tokens = 10
temperature = 1.0
k = 10
print("GENERATED TEXT:")
generate_text_with_rnn(num_tokens, 
                       context, 
                       rnn_lm_1, 
                       temperature, 
                       reverse_shapeskeare_vocab,
                       topk_sampling,
                       k
                       )
