# Probabilistic Language Modeling

In this assignment, you will implement a probabilistic language model based on n-grams.

## Probability

In probability theory, the likelihood that an event (A) would occur is quantified by the formula $P(A)$ (the probability of A). If the event A is a coin flip resulting in *heads*, and we have data showing 3 coin flips have resulted in 1 heads and 2 tails, then we can calculate

$P(A) = \frac{\# heads}{\# coin flips} = \frac{1}{3}$.

In NLP, we use probability theory to reason about the likelihood of a word (or token) occurring in a sentence. So, if we have a corpus consisting of a single sentence "I like avocados", we can say the probability of the word "avocados" in that corpus is $\frac{1}{3}$ because it appears once out of three words.

### Helpful formulas

#### Conditional Probability

When reasoning about the likelihood of an event occurring, it makes sense that our reasoning might change if we have more context. For instance, if we know that some event has *already* occurred, we can look at our data and calculate the probability of another event occurring *given that we know* that first event occurred.

This is the idea behind conditional probability. $P(A \mid B)$ is the probability of event A given that event B has already occurred. In NLP, for example, if our data consists solely of the two sentences "I like avocados" and "I hate avocados", the probability of the word "like" occurring immediately after the word "I" is 1/2, or 0.5.

$P(A \mid B)=\frac{P(A \cap B)}{P(B)}$

So, how do we reason about the conditional probability $P(like|I)$ in NLP? That is, the probability that the word "like" follows the word "I"? In ngram language modeling, we'll use word counts in our corpus:

$P(like|I) = \frac{P(like \cap I)}{P(I)} = \frac{count(\text{"I like"})}{count(\text{"I"})} = \frac{1}{2}$

In other words, there are two words that can follow "I" in the corpus: "like" and "hate". Since there is only one "like" in the corpus, the probability of "like" occurring after "I" is 1/2.
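
The count-based estimate above can be checked with a few lines of Python. This is a minimal sketch on the two-sentence avocado example; the names `toy_corpus`, `unigram_counts`, `bigram_counts`, and `p_like_given_i` are illustrative and not part of the assignment code.

```python
from collections import Counter

# Toy corpus from the text above (illustrative, not the assignment corpus).
toy_corpus = ["I like avocados", "I hate avocados"]

unigram_counts = Counter()
bigram_counts = Counter()
for sentence in toy_corpus:
    words = sentence.split()
    unigram_counts.update(words)
    # Pair each word with its successor to form bigrams.
    bigram_counts.update(zip(words, words[1:]))

# P(like | I) = count("I like") / count("I")
p_like_given_i = bigram_counts[("I", "like")] / unigram_counts["I"]
print(p_like_given_i)  # 0.5
```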

#### Joint Probability

To predict the probability of an *entire sentence* we would want to calculate the probability $P(w_1 w_2 w_3 \ldots w_n)$ for a sentence made up of $n$ words labeled $w_i$. This requires a slight modification to our previous equation as this probability would be akin to $P(X_1 \cap X_2 \cap X_3 \cap \ldots \cap X_n)$. This is called a joint probability, or the likelihood of some number of events co-occurring. In the simple case of two events, we can derive from our conditional probability formula:

$P(A \cap B)=P(A \mid B) * P(B)$

#### Chain Rule of Probability

However, in the case of a longer sequence the appearance of each word is conditioned on the appearance of all prior words in the sentence. Thus, to find the probability of the entire sequence, we can use the chain rule of probability:

$\begin{aligned}
P\left(X_1 \ldots X_n\right) & =P\left(X_1\right) P\left(X_2 \mid X_1\right) P\left(X_3 \mid X_{1: 2}\right) \ldots P\left(X_n \mid X_{1: n-1}\right) \\
& =\prod_{k=1}^n P\left(X_k \mid X_{1: k-1}\right)
\end{aligned}$

Putting this equation in the context of our sentences:

$\begin{aligned}
P(w_{1:n}) & = P(w_1)P(w_2 \mid w_1)P(w_3 \mid w_{1:2}) \ldots P(w_n \mid w_{1:n-1}) \\
& = \prod_{k=1}^n P(w_k \mid w_{1:k-1})
\end{aligned}$

For a clearer idea of why we use these formulas for ngram language modeling, review page 4, chapter 3 of Jurafsky and Martin - https://web.stanford.edu/~jurafsky/slp3/3.pdf. Now, let's start putting some of these formulas into practice.
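
As a quick numerical check of the chain rule, here is a rough sketch on the two-sentence avocado example. It assumes each factor $P(w_k \mid w_{1:k-1})$ is estimated from counts of sentence prefixes; `prefix_count` and `chain_rule_probability` are hypothetical helpers, not functions used later in the assignment.

```python
toy_corpus = [s.split() for s in ["I like avocados", "I hate avocados"]]

def prefix_count(prefix, corpus):
    """Number of sentences in `corpus` that begin with `prefix`."""
    return sum(1 for sent in corpus if sent[:len(prefix)] == prefix)

def chain_rule_probability(sentence, corpus):
    """Multiply P(w_k | w_1..w_{k-1}) for k = 1..n, estimating each
    factor from sentence-prefix counts (an assumption of this sketch)."""
    probability = 1.0
    for k in range(1, len(sentence) + 1):
        history = prefix_count(sentence[:k - 1], corpus)
        if history == 0:
            return 0.0  # unseen history: no estimate without smoothing
        probability *= prefix_count(sentence[:k], corpus) / history
    return probability

# P(I) * P(like | I) * P(avocados | I like) = 1 * 1/2 * 1
print(chain_rule_probability("I like avocados".split(), toy_corpus))  # 0.5
```

Conditioning on the full history needs counts for every possible prefix, which quickly become sparse on real data. N-gram models sidestep this by conditioning on only the last one or two words.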

### Unigram Language Modeling

Let's start by implementing the functions we need for a unigram language model. 

In [None]:
from collections import Counter, defaultdict
from typing import List, Union, Tuple, Dict, Optional

In [None]:
# sample corpus ###################################################################
corpus = ['<s> Man was I wrong. </s>',
          '<s> This place is terrible. </s>',
          '<s> The room was very dirty and there were dead bugs everywhere. </s>',
          '<s> What was I thinking? </s>',
          '<s> This place was cheap and I thought I was clever. </s>',
          '<s> Man was I wrong. </s>']
###################################################################################

### Preprocessing

Working with text data in the `str` format gets very inefficient as we scale up the size of the dataset. Although our sample corpus is small, let's just mitigate this problem right off the bat by tokenizing our corpus and converting all `str`s to `int`s. We'll call each `int` that corresponds to a unique token the `token_id` or just `id`. We'll keep track of our vocabulary and token IDs so that we can easily convert and re-convert sequences. This is a common way to preprocess data in NLP.

In [None]:
def convert_strs_to_ids(strings: List[str])-> Tuple[List[List[int]], defaultdict]:
    """Convert a corpus of strings to a tokenized corpus
    of integer token_ids"""
    token_to_id = defaultdict(lambda: len(token_to_id))  # Dictionary to map tokens to unique IDs
    
    tokenized_strings = []  # List to hold tokenized strs as lists of IDs
    
    for string in strings:
        token_ids = []  # List to hold IDs for each token in the current string
        tokens = string.split()  # Tokenize the string by splitting on spaces
        for token in tokens:
            token_ids.append(token_to_id[token])  # Automatically assign ID if token is new
        tokenized_strings.append(token_ids)
    
    return tokenized_strings, token_to_id

In [None]:
tokenized_corpus, token_to_id = convert_strs_to_ids(corpus) # Tokenize and convert our corpus

In [None]:
# Create a reverse mapping from ID to token
id_to_token = {id_: token for token, id_ in token_to_id.items()}
print(tokenized_corpus)
print([id_to_token[token_id] for token_id in tokenized_corpus[0]]) # Reconverting back to check that it works...

Looks like our tokenizing and converting works as intended. We see our special start token `<s>` has `token_id = 0`, the token `Man` has `token_id = 1`, etc. Using these special start-of-sentence and end-of-sentence tokens `<s>` and `</s>` is a common practice in NLP, and the end token in particular will help us generate fluent-ish sentences in this assignment.
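
One side effect of the `defaultdict` pattern is worth knowing before we start looking up words: merely reading an unseen token assigns it a fresh ID. The sketch below shows this on a toy mapping; `demo_token_to_id` is an illustrative name, separate from the `token_to_id` used in the assignment.

```python
from collections import defaultdict

# Same auto-incrementing pattern as `convert_strs_to_ids`, on a toy example.
demo_token_to_id = defaultdict(lambda: len(demo_token_to_id))
for token in "<s> I like avocados </s>".split():
    demo_token_to_id[token]  # a plain lookup is enough to assign an ID

size_before = len(demo_token_to_id)
oov_id = demo_token_to_id["cat"]  # unseen token: silently gets the next free ID
size_after = len(demo_token_to_id)
print(size_before, oov_id, size_after)  # 5 5 6

# Use `in` (or `.get`) to test membership without adding an entry.
print("dog" in demo_token_to_id)  # False
```

This is why an out-of-vocabulary lookup does not raise a `KeyError`: the word simply receives an ID that never appears in the tokenized corpus.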

Now, let's calculate probabilities based on our tokens.

In [None]:
def unigram_probability(word: int, corpus: List[List[int]]) -> float:
    """
    Return the probability of `word` in `corpus`.
    P(w_1) = count(w_1) / N
    """
    
    # Below is a list of token IDs for all sentences
    # in the corpus, and a dictionary of counts for each unique word.
    tokens = [token for tokenized_sentence in corpus for token in tokenized_sentence]
    counts = Counter(tokens)
    
    # Find the probability of word given these counts
    probability = counts[word] / len(tokens)
    
    return probability

In [None]:
# There are 49 tokens in our sample corpus, and "was"
# appears 6 times, so our probability should equal 6/49...
print('P("was") =', unigram_probability(token_to_id["was"], tokenized_corpus))
print('6/49 =', 6/49)
if unigram_probability(token_to_id["was"], tokenized_corpus) == 6/49:
    print("Woohoo, it works!")

### Bigrams and trigrams

Now, let's implement bigram and trigram probabilities so that we can create our bigram and trigram language models. First, write a function to get a list of bigrams, and a function to get a list of trigrams:

In [None]:
def get_bigrams(corpus: List[List[int]]) -> List[Tuple[int, int]]:
    """Return a list of bigrams in `corpus`."""
    
    # your code here
    raise NotImplementedError
    
    return bigrams

In [None]:
sample_bigrams = get_bigrams(tokenized_corpus)
all_bigrams = [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (0, 6), (6, 7), (7, 8), (8, 9), (9, 5), (0, 10), (10, 11), (11, 2), (2, 12), (12, 13), (13, 14), (14, 15), (15, 16), (16, 17), (17, 18), (18, 19), (19, 5), (0, 20), (20, 2), (2, 3), (3, 21), (21, 5), (0, 6), (6, 7), (7, 2), (2, 22), (22, 14), (14, 3), (3, 23), (23, 3), (3, 2), (2, 24), (24, 5), (0, 1), (1, 2), (2, 3), (3, 4), (4, 5)]
for bigram in all_bigrams:
    assert bigram in sample_bigrams
print('All test cases have passed!')

Now that we have all of the bigrams, we can compute the probability of any word occurring after another as follows:

$P(\text{word2}|\text{word1}) = \frac{count(\text{word1}, \text{word2})}{count(\text{word1})}$, where $count(\text{word1}, \text{word2})$ is the number of times $\text{word1}$ and $\text{word2}$ occur together and $count(\text{word1})$ is the number of times $\text{word1}$ occurs. (This uses the same logic as the conditional probability we reviewed earlier.)

Write a function to compute the conditional probability of a word given another word. The function should take a corpus and two words as input and return the conditional probability of the second word given the first word. For example, the probability of the word "quick" occurring given that the word "the" has already occurred is P(quick|the).

For now, use the raw counts. We will add Laplace (add-one) smoothing in the next step.

In [None]:
def conditional_prob(w_2, w_1, corpus: List[List[int]]) -> float:
    """Return the conditional probability of `w_2` given `w_1` in `corpus`.
    P(w_2 | w_1) = count(w_1, w_2) / count(w_1)
    """
    
    bigrams = get_bigrams(corpus)
    tokens = [token for tokenized_sentence in corpus for token in tokenized_sentence]
    unigram_counts = Counter(tokens)
    bigram_counts = Counter(bigrams)
    
    # your code here
    raise NotImplementedError
    
    return conditional_probability

In [None]:
# There are 6 instances of the unigram "<s>" and
# 2 of those instances coincide with the bigram "<s> Man"
# So P("Man" | "<s>") should be 2/6 = 1/3...
assert conditional_prob(token_to_id["Man"], token_to_id["<s>"], tokenized_corpus) == 0.3333333333333333
print("Woohoo, it works!")

In [None]:
# But what if we want the probability of an out-of-vocabulary token?
print(conditional_prob(token_to_id["cat"], token_to_id["the"], tokenized_corpus))

#### Laplace Smoothing
This is looking good but we're getting a division by zero error! Let's use *Laplace Smoothing* to fix it. 

Laplace smoothing is a technique for dealing with word sequences that do not occur in the corpus. It adjusts the estimated probability of a word given the preceding word by adding 1 to the numerator and the number of unique words in the corpus to the denominator. This ensures that the estimated probability is never 0 and that we never divide by 0. For a clearer idea of why this formula works, refer to page 6, chapter 3 of Jurafsky and Martin - https://web.stanford.edu/~jurafsky/slp3/3.pdf

To implement Laplace Smoothing, we need to modify our conditional probability function as follows:

$P\left(\text{word2}|\text{word1} \right)_{Laplace} = \frac{count(\text{word1},\text{word2})+1}{count(\text{word1})+V}$

where $count(\text{word1},\text{word2})$ is the number of times $\text{word1}$ and $\text{word2}$ occur together, $count(\text{word1})$ is the number of times $\text{word1}$ occurs, and $V$ is the number of unique words in the corpus.
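
To see the arithmetic on its own, here is a small sketch that applies the formula directly to counts. `laplace_bigram_probability` is a hypothetical helper that takes the counts as plain numbers; it is not the `conditional_prob` function you are asked to write.

```python
def laplace_bigram_probability(bigram_count: int, context_count: int, vocab_size: int) -> float:
    """Add-one estimate: (count(w1, w2) + 1) / (count(w1) + V)."""
    return (bigram_count + 1) / (context_count + vocab_size)

# Counts from the sample corpus above: "<s>" occurs 6 times,
# "<s> Man" occurs twice, and there are 25 unique tokens.
seen = laplace_bigram_probability(bigram_count=2, context_count=6, vocab_size=25)

# A bigram whose words never occur in the corpus no longer divides by zero.
unseen = laplace_bigram_probability(bigram_count=0, context_count=0, vocab_size=25)

print(seen)    # 3/31, lower than the unsmoothed 2/6
print(unseen)  # 0.04
```

Smoothing moves some probability mass from seen bigrams to unseen ones, which is why the estimate for "<s> Man" drops below 1/3.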

In [None]:
# Here's a handy helper function for calculating vocabulary size!
# You can use this for Laplace (add-one) smoothing.
def vocabulary_size(tokens: List[int]) -> int:
    """Return the vocabulary size of `tokens`. (Unique tokens = types)"""
    return len(set(tokens))

In [None]:
def conditional_prob(w_2, w_1, corpus: List[List[int]]) -> float:
    """Return the conditional probability of `w_2` given `w_1` in `corpus`.
    P(w_2 | w_1)_Laplace = (count(w_1, w_2) + 1) / (count(w_1) + V)
    Uses Laplace (add-one) smoothing.
    """
    
    bigrams = get_bigrams(corpus)
    tokens = [token for tokenized_sentence in corpus for token in tokenized_sentence]
    unigram_counts = Counter(tokens)
    bigram_counts = Counter(bigrams)
    
    # your code here
    raise NotImplementedError
    
    return conditional_probability

In [None]:
# Now let's try that OOV word...
assert conditional_prob(token_to_id["cat"], token_to_id["the"], tokenized_corpus) == 0.04
print('Woohoo it works!')

Now, let's try and predict the next word in a sentence. This is essentially what an advanced chatbot like ChatGPT also tries to do, although ChatGPT uses a much bigger corpus and a more sophisticated algorithm. We will use the same corpus as before, but this time we will use the conditional probability formula to predict the next word in a sentence. We will also use the start and end tokens to help us compute the probabilities.

<b>Write a function to predict the next word in a sentence. The function should take a corpus and a sentence as input and return the most likely next word in the sentence. For example, if the sentence is "the quick brown", the function should return "fox".</b>

<i>Hint: You can use the conditional probability function you wrote earlier to compute the probability of each word in the vocabulary occurring after the last word in the sentence. The word with the highest probability is the most likely next word. In other words, check the probability of each possible word in the corpus with respect to the given previous word and pick the one with the maximum probability.</i>
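
The "pick the maximum" step in the hint can be sketched on its own. The probabilities below are made up for illustration, and `candidate_probabilities` is a hypothetical stand-in for the values your conditional probability function would produce.

```python
# Hypothetical smoothed probabilities P(candidate | "brown"); the numbers are made up.
candidate_probabilities = {"fox": 0.40, "dog": 0.25, "bear": 0.20, "</s>": 0.15}

# `max` with a key function returns the candidate with the highest probability.
most_likely = max(candidate_probabilities, key=candidate_probabilities.get)
print(most_likely)  # fox
```

When two candidates tie, `max` returns the first one it encounters, so the order in which you enumerate the vocabulary decides the winner.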

In [None]:
def predict_next_word(w_1, corpus) -> int:
    """Return the most likely next word given `w_1` in `corpus`."""
    
    # your code here
    raise NotImplementedError
    
    return most_likely_next_word

In [None]:
assert predict_next_word(token_to_id['was'], tokenized_corpus) == 3

Let's try and predict an entire sentence now.

Write a function that takes an initial string and a word limit as input and returns a sentence no more than `limit` tokens longer than the original sequence. The initial string is the starting point for the sentence. For example, if the initial string is "the quick brown" and the word limit is 5, the function should return "the quick brown fox jumped over the lazy". The function should use the `predict_next_word` function you wrote earlier to predict the next word in the sentence. 

The function should automatically stop when it encounters a `</s>` (end of sequence) token, even if it encounters that token before the limit is reached.

<i>Hint: You can use a for loop to predict the next word in the sentence. The initial string is the starting point for the sentence. At each iteration, you can add the predicted word to the sentence and use the predicted word as the new initial string. You can stop the loop when the word limit is reached. </i>
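
The loop structure described in the hint might look like the sketch below. It uses string tokens and a hypothetical lookup table, `toy_next_word`, in place of a trained model, and it assumes the stop token itself is not appended to the output.

```python
from typing import Dict, List

# Hypothetical "most likely next word" table standing in for a trained model.
toy_next_word: Dict[str, str] = {
    "the": "quick", "quick": "brown", "brown": "fox", "fox": "</s>",
}

def generate(initial: List[str], limit: int = 5, stop: str = "</s>") -> List[str]:
    """Greedily extend `initial` by at most `limit` tokens, stopping at `stop`."""
    sequence = list(initial)  # copy so the caller's list is not mutated
    for _ in range(limit):
        next_word = toy_next_word.get(sequence[-1], stop)
        if next_word == stop:
            break  # end of sentence: do not append the stop token
        sequence.append(next_word)
    return sequence

print(generate(["the"]))           # ['the', 'quick', 'brown', 'fox']
print(generate(["the"], limit=2))  # ['the', 'quick', 'brown']
```

Because each step always takes the single most likely word, this greedy loop is deterministic: the same starting sequence always produces the same sentence.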

In [None]:
stop_token_id = token_to_id['</s>']
def predict_sentence_bigrams(initial_sequence: List[int], corpus: List[List[int]], limit: int = 5) -> List[int]:
    """Return a sentence of at most `len(initial_sequence)`+`limit` words
    predicted from `initial_sequence` in `corpus`."""
    # your code here
    raise NotImplementedError
    return predicted_sequence

In [None]:
id_to_token = {id_: token for token, id_ in token_to_id.items()}
initial_sequence = [token_to_id[word] for word in "This is".split()]
predicted_sequence = predict_sentence_bigrams(initial_sequence, tokenized_corpus, limit = 5)
reconstructed_sentence = ' '.join([id_to_token[id_] for id_ in predicted_sequence])
assert reconstructed_sentence == 'This is terrible.'

## Perplexity
Perplexity is a measurement in NLP used to evaluate language models. It's a measure of how well a probability model predicts a sample. A lower perplexity score indicates better performance of the model.

The perplexity of a sentence is calculated as the inverse probability of the sentence, normalized by the number of words. In other words, it's the geometric mean of the inverse conditional probability of each word given the previous word in the sentence.

$\text{Perplexity}(W)=\sqrt[N]{\frac{1}{P(w_1, w_2, \ldots, w_N)}}=\sqrt[N]{\prod_{i=1}^N \frac{1}{P(w_i \mid w_1, \ldots, w_{i-1})}}$

where $N$ is the number of words in the sentence and $P(w_i \mid w_1, \ldots, w_{i-1})$ is the conditional probability of the i'th word given the previous words in the sentence.

In our case, since we only have bigrams, the formula becomes:
$\text{Perplexity}(W)=\sqrt[N]{\frac{1}{P(w_1, w_2, \ldots, w_N)}}=\sqrt[N]{\prod_{i=1}^N \frac{1}{P(w_i \mid w_{i-1})}}$

Let's try and calculate the perplexity. We will use the same corpus as before, but this time we will use the perplexity formula to calculate the perplexity of each sentence. We will also use the start and end tokens to help us compute the perplexity.

NOTE: We have a problem when finding the conditional probability of $w_1$ here as there are no preceding tokens. To make life simple, we can use the standard probability of that word occurring over the *entire corpus*. We can use the `unigram_probability` function to find the probability of $w_1$.
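
To build intuition for the formula, here is a sketch that computes perplexity from a list of per-word probabilities. It sums logarithms instead of keeping a running product, which is mathematically equivalent but avoids underflow on long sequences; `perplexity_from_probabilities` is a hypothetical helper, not the `perplexity_bg` function below.

```python
import math
from typing import List

def perplexity_from_probabilities(probabilities: List[float]) -> float:
    """N-th root of the product of inverse probabilities, computed in log space."""
    n = len(probabilities)
    log_sum = sum(math.log(p) for p in probabilities)
    return math.exp(-log_sum / n)

# If every word has probability 1/4, the model is as uncertain as a fair
# four-way choice at each step, so the perplexity is (approximately) 4.
uniform = perplexity_from_probabilities([0.25, 0.25, 0.25, 0.25])

# sqrt((1 / 0.5) * (1 / 0.125)) = sqrt(16), which is also (approximately) 4.
mixed = perplexity_from_probabilities([0.5, 0.125])
print(uniform, mixed)
```

A perplexity of $k$ can be read as the model being about as uncertain as if it were choosing uniformly among $k$ words at each position.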

In [None]:
def perplexity_bg(test_sequence: List[int], corpus: List[List[int]]) -> float:
    """Return the perplexity of `test_sequence` given `corpus` using bigrams."""
    product = 1
    # First word
    product *= (1 / unigram_probability(test_sequence[0], corpus))
    # your code here
    raise NotImplementedError
    return perplexity

Now run the perplexity calculation on a couple of sample sentences. 

In [None]:
test_seq1 = [token_to_id[word] for word in "The quick brown fox jumped over the lazy dog".split()]
test_seq2 = [token_to_id[word] for word in "The quick brown fox jumped over the perplexing dog".split()]

In [None]:
print(perplexity_bg(
    test_seq1, 
    [test_seq1]))

In [None]:
assert perplexity_bg(
    test_seq1, 
    [test_seq1]) < perplexity_bg(
    test_seq1, 
    [test_seq2])

Now we'll improve the model by using trigrams instead of bigrams.


A trigram is a tuple of three words. For example, the sentence "The quick brown fox jumps over the lazy dog" contains 7 trigrams: (The, quick, brown), (quick, brown, fox), (brown, fox, jumps), (fox, jumps, over), (jumps, over, the), (over, the, lazy), (the, lazy, dog). You can use the same formula to compute the conditional probability of a word given two other words. You can also use the start and end tokens to help you compute the probabilities. 

You can adapt the function you wrote earlier to compute the conditional probability of a word given the two words before it. The new function should take a corpus and three words as input and return the conditional probability of the third word given the first two words. For example, the probability of the word "the" occurring given that the words "quick" and "brown" have already occurred is P(the|quick, brown).
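
The trigram count for the example sentence can be confirmed with a short sketch. It works on string tokens for a single sentence, so it is only an illustration of the sliding-window idea and not the `get_trigrams` function, which operates on a corpus of token IDs.

```python
words = "The quick brown fox jumps over the lazy dog".split()

# Zip the sequence with two shifted copies of itself to get consecutive triples.
example_trigrams = list(zip(words, words[1:], words[2:]))

print(len(example_trigrams))  # 7
print(example_trigrams[0])    # ('The', 'quick', 'brown')
print(example_trigrams[-1])   # ('the', 'lazy', 'dog')
```

In general, a sentence of $n$ tokens yields $n - 2$ trigrams, so sentences shorter than three tokens contribute none.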

In [None]:
def get_trigrams(corpus: List[List[int]]) -> List[Tuple[int, int, int]]:
    """Return a list of trigrams in `corpus`."""
    
    # your code here
    raise NotImplementedError

    return trigrams

In [None]:
trigrams = get_trigrams(tokenized_corpus)
assert (1, 2, 3) in trigrams

Now update the conditional probability function to handle trigrams. Then, predict the next word, and a full sentence, just like we did with the bigram model. Remember to use Laplace (add-one) smoothing to handle out-of-vocabulary words.

In [None]:
def conditional_prob_trigram(w_3, w_2, w_1, corpus: List[List[int]]) -> float:
    """Return the conditional probability of `w_3` given `w_2` and `w_1` in `corpus`.
    P(w_3 | w_2, w_1)_Laplace = (count(w_1, w_2, w_3) + 1) / (count(w_1, w_2) + V)
    Uses Laplace (add-one) smoothing.
    """
    
    tokens = [token for tokenized_sentence in corpus for token in tokenized_sentence]
    bigrams = get_bigrams(corpus)
    trigrams = get_trigrams(corpus)
    
    bigram_counts = Counter(bigrams)
    trigram_counts = Counter(trigrams)
    
    # your code here
    raise NotImplementedError
    
    return probability

In [None]:
assert conditional_prob_trigram(
    token_to_id['wrong'], 
    token_to_id['I'], 
    token_to_id['was'], 
    tokenized_corpus) == 0.03571428571428571

In [None]:
def predict_next_word_trigram(w_2, w_1, corpus: List[List[int]]) -> int:
    """Return the most likely next word given `w_2` and `w_1` in `corpus`."""
    
    # your code here
    raise NotImplementedError

In [None]:
assert predict_next_word_trigram(
        token_to_id['very'], 
        token_to_id['was'], 
        tokenized_corpus) == 13 # 13 is 'dirty'

Now generate a full sentence and calculate the perplexity using the trigram model.

In [None]:
stop_token_id = token_to_id['</s>']
def predict_sentence_trigrams(initial_sequence: List[int], corpus: List[List[int]], limit: int = 5) -> List[int]:
    """Return a sentence of at most `len(initial_sequence)`+`limit` words
    predicted from `initial_sequence` in `corpus`."""

    # your code here
    raise NotImplementedError
    return predicted_sequence

In [None]:
id_to_token = {id_: token for token, id_ in token_to_id.items()}
initial_sequence = [token_to_id[word] for word in "Man was".split()]
predicted_sequence = predict_sentence_trigrams(initial_sequence, tokenized_corpus, limit = 5)
reconstructed_sentence = ' '.join([id_to_token[id_] for id_ in predicted_sequence])
assert reconstructed_sentence == 'Man was I wrong.'

Now create a function that calculates perplexity based on the trigram model. 

NOTE: Remember in `perplexity_bg` how we used the `unigram_probability` function to calculate the probability of $w_1$ (i.e. the edge case where there are no preceding tokens)? Well, we'll also have to use the `conditional_prob` (bigram probability) function in the $w_2$ case for trigrams, when there is only *one* preceding token.

In [None]:
def perplexity_tg(test_sequence: List[int], corpus: List[List[int]]) -> float:
    """Return the perplexity of `test_sequence` given `corpus` using trigrams."""

    product = 1
    # First word
    product *= 1 / unigram_probability(test_sequence[0], corpus)
    # First two words
    product *= 1 / conditional_prob(test_sequence[1], test_sequence[0], corpus)
    # your code here
    raise NotImplementedError

In [None]:
print(perplexity_tg(test_seq2, [test_seq1]))

In [None]:
assert perplexity_tg(test_seq1, 
    [test_seq1]) < perplexity_tg(
    test_seq2, 
    [test_seq1])

# BLT Corpus

Now let's try this with a real corpus. We will use the Boulder Lies and Truth (BLT) corpus. Boulder Lies and Truth was developed at the University of Colorado Boulder and contains approximately 1,500 elicited English reviews of hotels and electronics for the purpose of studying deception in written language. Reviews were collected by crowd-sourcing with Amazon Mechanical Turk.

In [None]:
# Let's get the reviews and the sentiment of each review,
# and then explore some of the data.
import csv

file_path = 'blt_corpus.csv'
reviews = []
sentiment = []

with open(file_path, mode='r', encoding='utf-8') as file:
    reader = csv.DictReader(file) 
    for row in reader:
        reviews.append(row['Review'])
        sentiment.append(row['Sentiment Polarity'])

# Add start and end tags to each review
reviews = ['<s> ' + review + ' </s>' for review in reviews]

### Data exploration and preprocessing

In [None]:
reviews[0] # the first review is... hard to decipher...

In [None]:
reviews = reviews[1:] # let's get rid of that review.
sentiment = sentiment[1:]

In [None]:
print('Review:', reviews[0]) # okay, the start and end tokens are there
print('Sentiment:', sentiment[0]) # let's check that the sentiment of this review makes sense too

In [None]:
# Now let's tokenize and convert these reviews to IDs as before
tokenized_reviews, token_to_id = convert_strs_to_ids(reviews) # Tokenize and convert our corpus

In [None]:
# Sanity check...
print('IDs:', tokenized_reviews[0])
id_to_token = {id_: token for token, id_ in token_to_id.items()}
print('Reconverted:', ' '.join([id_to_token[id_] for id_ in tokenized_reviews[0]]))

Now let's try to make a sentence from our models trained on the BLT corpus. This may take some time to run, so let's just play around with a sample of the corpus of size 10.

In [None]:
sentence = "I"
initial_sequence = [token_to_id[word] for word in sentence.split()]
predicted_sequence = predict_sentence_bigrams(initial_sequence, tokenized_reviews[:10], limit = 3)
reconstructed_sentence = ' '.join([id_to_token[id_] for id_ in predicted_sequence])
print('Predicted sentence:', reconstructed_sentence)

In [None]:
# That wasn't very fluent. Let's try with trigrams...
# Note that this will take longer.
sentence = "I have"
initial_sequence = [token_to_id[word] for word in sentence.split()]
predicted_sequence = predict_sentence_trigrams(initial_sequence, tokenized_reviews[:10], limit = 3)
reconstructed_sentence = ' '.join([id_to_token[id_] for id_ in predicted_sequence])
print('Predicted sentence:', reconstructed_sentence)

The trigram model is a bit more fluent, even when we only train on 10 reviews!

### Faster and more flexible functions

One reason our functions are so slow is that they call `get_bigrams` and `get_trigrams` on every invocation, rebuilding the same lists over and over. Let's rewrite them so that they take in bigrams and trigrams as arguments, instead of an unprocessed corpus. And, to cut down on the amount of code we have to write, let's make our functions flexible, so that they accept an ngram of $n \in \{1,2,3\}$.
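
The core idea, counting once and then answering lookups from the stored counts, can be sketched in a few lines. `CachedBigramCounts` is a hypothetical, stripped-down helper for illustration only; the classes defined in the next cell are the ones the assignment actually uses.

```python
from collections import Counter
from typing import List, Tuple

class CachedBigramCounts:
    """Hypothetical helper: count unigrams and bigrams once, then answer lookups."""

    def __init__(self, tokenized_data: List[List[int]]):
        self.unigram_counts = Counter(t for seq in tokenized_data for t in seq)
        self.bigram_counts = Counter(
            pair for seq in tokenized_data for pair in zip(seq, seq[1:])
        )

    def count(self, ngram: Tuple[int, ...]) -> int:
        if len(ngram) == 1:
            return self.unigram_counts[ngram[0]]
        if len(ngram) == 2:
            return self.bigram_counts[ngram]
        raise ValueError("only unigrams and bigrams are supported in this sketch")

cache = CachedBigramCounts([[0, 1, 2, 3], [0, 1, 4, 3]])
print(cache.count((0, 1)), cache.count((3,)), cache.count((2, 4)))  # 2 2 0
```

Each lookup is now a dictionary access instead of a full pass over the corpus, which matters when predicting a word means scoring every entry in the vocabulary.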

In [None]:
# Helpful classes!
class Ngram:
    def __init__(self, w_1: int, w_2: Optional[int] = None, w_3: Optional[int] = None):
        """Initialize an Ngram with up to three words (IDs)."""
        self.w_1 = w_1
        self.w_2 = w_2
        self.w_3 = w_3

    @property
    def n(self) -> int:
        """Return the "n" value of the n-gram (1 for unigram, 2 for bigram, 3 for trigram)."""
        if self.w_3 is not None:
            return 3
        elif self.w_2 is not None:
            return 2
        else:
            return 1

    def as_tuple(self) -> Tuple[int, ...]:
        """Return the n-gram as a tuple of integers."""
        if self.n == 3:
            return (self.w_1, self.w_2, self.w_3)
        elif self.n == 2:
            return (self.w_1, self.w_2)
        else:
            return (self.w_1,)
        
    def __hash__(self):
        return hash(self.as_tuple())
        
    def __eq__(self, other):
        """Check equality of two Ngram instances."""
        if isinstance(other, Ngram):
            return self.as_tuple() == other.as_tuple()
        return False

    def __repr__(self) -> str:
        """Return a string representation of the Ngram."""
        return f"Ngram(n={self.n}, words={self.as_tuple()})"
    
class NgramCorpus():
    def __init__(self, n: int, tokenized_data: List[List[int]]):
        """A handy object to store all our ngram information in one place."""
        self.n = n
        
        if self.n >= 1:
            self.unigrams = [Ngram(id_) for sequence in tokenized_data for id_ in sequence]
            self.total_num_tokens_in_corpus = len(self.unigrams)
            self.unigram_counts = Counter(self.unigrams)
            self.v = len(self.unigram_counts.keys())
        if self.n >= 2:
            self.bigrams = [Ngram(*bg) for bg in get_bigrams(tokenized_data)]
            self.bigram_counts = Counter(self.bigrams)
        if self.n >= 3:
            self.trigrams = [Ngram(*tg) for tg in get_trigrams(tokenized_data)]
            self.trigram_counts = Counter(self.trigrams)

    def count(self, ngram: Ngram) -> int:
        """Get the count of a specific N-gram."""
        
        if ngram.n == 1:
            return self.unigram_counts[ngram]
        elif ngram.n == 2:
            return self.bigram_counts[ngram]
        elif ngram.n == 3:
            return self.trigram_counts[ngram]
        else:
            print('Error')
            return 0

In [None]:
# One ngram corpus to rule them all.
# This will contain all unigrams, bigrams, and trigrams, 
# and all their counts in the BLT corpus.
blt_ngram_corpus = NgramCorpus(n = 3, tokenized_data = tokenized_reviews)

In [None]:
# Now let's rewrite our probability functions into one flexible function 
# using our Ngram and NgramCorpus data types.

def ngram_probability(ngram: Ngram, ngram_corpus: NgramCorpus) -> float:
    """Return the conditional probability of `ngram` in `ngram_corpus`.
    Assume 
    Remember:
    P(w_1)_Laplace = (count(w_1) + 1) / (N + V)
    P(w_2 | w_1)_Laplace = (count(w_1, w_2) + 1) / (count(w_1) + V)
    P(w_3 | w_2, w_1)_Laplace = (count(w_1, w_2, w_3) + 1) / (count(w_1, w_2) + V)
    Uses Laplace (add-one) smoothing.
    """
    # Vocabulary size (number of unique unigrams)
    vocab_size = ngram_corpus.v
    if ngram.n == 1:
        ngram_counts = ngram_corpus.unigram_counts
    elif ngram.n == 2:
        ngram_counts = ngram_corpus.bigram_counts
        lower_order_ngram_counts = ngram_corpus.unigram_counts
    elif ngram.n == 3:
        ngram_counts = ngram_corpus.trigram_counts
        lower_order_ngram_counts = ngram_corpus.bigram_counts
    else:
        print('Error')
        return 0
    
    # Calculate the probability
    # your code here
    raise NotImplementedError
    
    return probability

In [None]:
# Testing our new machinery...
test_sequence_1 = 'I have never'
test_sequence_2 = 'egyd hr fhjfhjtjr'
ids1 = [token_to_id[w] for w in test_sequence_1.split()]
ids2 = [token_to_id[w] for w in test_sequence_2.split()]
ngram1 = Ngram(*ids1)
ngram2 = Ngram(*ids2)

assert ngram_probability(ngram1, blt_ngram_corpus) > ngram_probability(ngram2, blt_ngram_corpus)

In [None]:
def predict_next_word(context: Ngram, ngram_corpus: NgramCorpus) -> int:
    """Predict the most likely next word given the context Ngram using ngram_corpus."""
    if context.n not in [1, 2]:
        raise ValueError("Context must be a unigram or bigram.")

    # Get the list of possible next words
    if context.n == 1:
        candidates = [Ngram(context.w_1, next_word.w_1) for next_word in ngram_corpus.unigram_counts.keys()]
    elif context.n == 2:
        candidates = [Ngram(context.w_1, context.w_2, next_word.w_1) for next_word in ngram_corpus.unigram_counts.keys()]

    # your code here
    raise NotImplementedError
    
    return next_word

In [None]:
test_sequence = 'the product'
ids = [token_to_id[w] for w in test_sequence.split()]
ngram = Ngram(*ids)

next_word = predict_next_word(ngram, blt_ngram_corpus)
id_to_token = {id_: token for token, id_ in token_to_id.items()}
assert id_to_token[next_word] == 'is'

In [None]:
stop_token_id = token_to_id['</s>']
def predict_sentence(start_context: List[int], ngram_corpus: NgramCorpus, limit: int = 5) -> List[int]:
    """Return a sentence of at most `len(start_context)`+`limit` words
    predicted from `start_context` based on `ngram_corpus`."""
    if len(start_context) >= 2:
        # Create a bigram to start the prediction
        ngram_context = Ngram(start_context[-2], start_context[-1])
    else:
        ngram_context = Ngram(start_context[-1]) # just the last word as a unigram

    predicted_sequence = list(start_context)  # Copy the context so the caller's list is not modified
    
    # Hint: use the highest-order context you can to predict the next word
    # (at most a bigram context, which gives a trigram prediction)
    # your code here
    raise NotImplementedError

    return predicted_sequence

In [None]:
test_sequence = 'the product'
ids = [token_to_id[w] for w in test_sequence.split()]

# Cool!
sentence = predict_sentence(ids, blt_ngram_corpus)
print(sentence)
print(' '.join(id_to_token[id_] for id_ in sentence))

In [None]:
def perplexity(test_sequence: List[int], ngram_corpus: NgramCorpus) -> float:
    """Return the perplexity of `test_sequence` given `ngram_corpus` using trigrams
    whenever possible."""

    product = 1
    # First word
    product *= 1 / ngram_probability(Ngram(test_sequence[0]), ngram_corpus)
    # First two words
    product *= 1 / ngram_probability(Ngram(test_sequence[0], test_sequence[1]), ngram_corpus)
    # your code here
    raise NotImplementedError

In [None]:
test_sequence_1 = 'the product is not worth the money.'
test_sequence_2 = 'egyd hr fhjfhjtjr rhjtjt tjfhjfwettert'
ids1 = [token_to_id[w] for w in test_sequence_1.split()]
ids2 = [token_to_id[w] for w in test_sequence_2.split()]

assert perplexity(ids1, blt_ngram_corpus) < perplexity(ids2, blt_ngram_corpus)
print('Woohoo, it works!')
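
For reference, under the standard definition the perplexity of a sequence $W = w_1 \dots w_N$ is its inverse probability normalized by length: $PP(W) = P(w_1 \dots w_N)^{-\frac{1}{N}}$. Multiplying many small probabilities can underflow to zero on long reviews, so one common workaround is to sum logs instead. The sketch below assumes you already have the per-word conditional probabilities in a list; `perplexity_from_probs` is a hypothetical helper, not part of the assignment code.

```python
import math
from typing import List

def perplexity_from_probs(probs: List[float]) -> float:
    """Perplexity of a sequence given the conditional probability of each word."""
    if not probs:
        raise ValueError("need at least one probability")
    # A zero-probability word makes the whole sequence impossible under the model.
    if any(p <= 0 for p in probs):
        return math.inf
    # Sum of logs replaces the product of probabilities, avoiding underflow.
    log_sum = sum(math.log(p) for p in probs)
    # exp(-(1/N) * sum(log p)) equals (product of p) ** (-1/N).
    return math.exp(-log_sum / len(probs))

print(perplexity_from_probs([0.5, 0.5]))
print(perplexity_from_probs([0.25, 0.25, 0.25, 0.25]))
```

A model that assigns every word probability $\frac{1}{k}$ has perplexity $k$, which is why perplexity is often read as the number of equally likely choices the model is hesitating between.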

## Part 2: Sentiment

Let's see if we can use our ngram language modeling techniques to predict attributes of this data. Let's build two different ngram models: one for the positive reviews, and one for the negative reviews. We want to make sure we can evaluate our work, so let's reserve some reviews as a test set first.

In [None]:
from sklearn.model_selection import train_test_split

# We'll use sklearn to reserve 20% of our data as a test set:
train_reviews, test_reviews, train_sentiment, test_sentiment = train_test_split(
    tokenized_reviews, 
    sentiment, 
    test_size=0.2, 
    random_state=42)

In [None]:
# Split apart the positive and negative reviews

positive_indices = [i for i in range(len(train_sentiment)) if train_sentiment[i] == 'pos']
negative_indices = [i for i in range(len(train_sentiment)) if train_sentiment[i] == 'neg']

positive_reviews = [train_reviews[i] for i in positive_indices]
negative_reviews = [train_reviews[i] for i in negative_indices]

print('Size of training set:', len(train_reviews))
print('Size of test set:', len(test_reviews))
print('# positive reviews in training set:', len(positive_reviews))
print('# negative reviews in training set:', len(negative_reviews))

In [None]:
# Now, let's make our two language models. One is trained on only
# positive reviews, and one is trained on only negative reviews.
positive_ngrams = NgramCorpus(n = 3, tokenized_data = positive_reviews)
negative_ngrams = NgramCorpus(n = 3, tokenized_data = negative_reviews)

In [None]:
# Let's play around with these a bit. Are they doing what we
# think they should be doing?
test_sequence = 'the'
ids = [token_to_id[w] for w in test_sequence.split()]

sentence = predict_sentence(ids, positive_ngrams, limit = 6)
print('---------------------------------------------')
print('A positive-sounding sentence:')
print(' '.join(id_to_token[id_] for id_ in sentence))
print('---------------------------------------------')

test_sequence = 'the'
ids = [token_to_id[w] for w in test_sequence.split()]
sentence = predict_sentence(ids, negative_ngrams, limit = 6)
print('A negative-sounding sentence:')
print(' '.join(id_to_token[id_] for id_ in sentence))
print('---------------------------------------------')

In [None]:
def classify_sentiment(ids: List[int]) -> str:
    """A function that classifies the sentiment of text using our language models."""
    positive_perplexity = perplexity(ids, positive_ngrams)
    negative_perplexity = perplexity(ids, negative_ngrams)
    
    if positive_perplexity < negative_perplexity:
        return 'pos'
    else:
        return 'neg'
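
The classifier picks whichever model finds the text less surprising. To see that decision rule in isolation, here is a self-contained toy version. It makes several simplifying assumptions that differ from the assignment: it uses unigram counts instead of trigrams, string tokens instead of ids, and add-one smoothing so that unseen words do not get zero probability. All names are hypothetical.

```python
import math
from collections import Counter
from typing import List

def train_unigram(docs: List[List[str]]) -> Counter:
    """Count how often each token appears across all documents."""
    return Counter(tok for doc in docs for tok in doc)

def toy_perplexity(tokens: List[str], counts: Counter, vocab_size: int) -> float:
    """Perplexity of `tokens` under an add-one smoothed unigram model."""
    total = sum(counts.values())
    log_sum = sum(math.log((counts[t] + 1) / (total + vocab_size)) for t in tokens)
    return math.exp(-log_sum / len(tokens))

# Tiny made-up training sets, one per class.
pos_docs = [["great", "hotel"], ["wonderful", "stay"]]
neg_docs = [["terrible", "hotel"], ["awful", "stay"]]
vocab = {tok for doc in pos_docs + neg_docs for tok in doc}

pos_model = train_unigram(pos_docs)
neg_model = train_unigram(neg_docs)

def toy_classify(tokens: List[str]) -> str:
    """Label the text with the class whose model has lower perplexity on it."""
    pos_pp = toy_perplexity(tokens, pos_model, len(vocab))
    neg_pp = toy_perplexity(tokens, neg_model, len(vocab))
    return "pos" if pos_pp < neg_pp else "neg"

print(toy_classify(["great", "hotel"]))
print(toy_classify(["awful", "hotel"]))
```

Note that ties fall to `'neg'`, mirroring the `else` branch of `classify_sentiment`.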

In [None]:
print(classify_sentiment([token_to_id[w] for w in 'The hotel was wonderful.'.split()]))
print(classify_sentiment([token_to_id[w] for w in 'That hotel stinks!'.split()]))

In [None]:
# Wow, it looks like it is working! Let's do an official test, 
# using our held-out test set...
positive_test_indices = [i for i in range(len(test_sentiment)) if test_sentiment[i] == 'pos']
negative_test_indices = [i for i in range(len(test_sentiment)) if test_sentiment[i] == 'neg']

positive_test_reviews = [test_reviews[i] for i in positive_test_indices]
negative_test_reviews = [test_reviews[i] for i in negative_test_indices]

print('# positive reviews in test set:', len(positive_test_reviews))
print('# negative reviews in test set:', len(negative_test_reviews))

In [None]:
tp = 0
fn = 0
tn = 0
fp = 0

for pos_rev in positive_test_reviews:
    prediction = classify_sentiment(pos_rev)
    if prediction == 'pos':
        tp += 1
    else:
        fn += 1
        
for neg_rev in negative_test_reviews:
    prediction = classify_sentiment(neg_rev)
    if prediction == 'neg':
        tn += 1
    else:
        fp += 1
        
positive_acc = tp/(tp + fn)
negative_acc = tn/(tn + fp)
total_acc = (tp + tn) / (tp + tn + fn + fp)
print(f'Accuracy on positive reviews: {round(positive_acc * 100, 2)}% correct')
print(f'Accuracy on negative reviews: {round(negative_acc * 100, 2)}% correct')
print(f'Total accuracy: {round(total_acc * 100, 2)}% correct')

assert total_acc > 0.5
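
The per-class "accuracy" figures above are what is usually called per-class recall: of all the truly positive (or negative) reviews, the fraction we labeled correctly. If you want to look at the same four counts from other angles, a small helper along these lines may be useful. The function name and the choice to report `0.0` for an empty denominator are assumptions made for this sketch.

```python
from typing import Dict

def summarize(tp: int, fn: int, tn: int, fp: int) -> Dict[str, float]:
    """Compute a few standard metrics from confusion-matrix counts."""
    def safe_div(numerator: int, denominator: int) -> float:
        # Report 0.0 rather than crashing when a class has no examples.
        return numerator / denominator if denominator else 0.0

    return {
        "accuracy": safe_div(tp + tn, tp + tn + fn + fp),
        "recall_pos": safe_div(tp, tp + fn),     # same as positive_acc above
        "recall_neg": safe_div(tn, tn + fp),     # same as negative_acc above
        "precision_pos": safe_div(tp, tp + fp),  # of predicted 'pos', how many were right
    }

# Example with made-up counts:
for name, value in summarize(tp=8, fn=2, tn=6, fp=4).items():
    print(f"{name}: {value:.2f}")
```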

## Conclusion

With this technique and this data, you should be able to beat random guessing (50% accuracy), though perhaps not by much! In the next homework, we'll explore techniques that classify sentiment more accurately. In the meantime, take some time to mull over this assignment. Why do you think this technique is so much more effective on negative reviews than on positive reviews? Do you think this technique could be made to work better, and if so, how?