# Assignment 1

Using the text at http://www.gutenberg.org/files/2600/2600-0.txt:
1. Convert the text to lowercase and remove all punctuation except spaces and dots.
2. Tokenize the text with BPE using vocab_size = 100.
3. Train a 3-gram language model with Laplace smoothing, $\delta=1$.
4. Using beam search with k=10, generate sequences of length 10 conditioned on the provided inputs. Treat dots as terminal tokens.
5. Calculate the perplexity of the language model on the first sentence.

In [1]:
# Load the corpus. The context manager closes the file handle as soon as the
# content is read. 'utf-8-sig' drops a leading byte order mark if present; the
# extra [2:] then skips the first two characters of the file
# (NOTE(review): presumably leading line breaks -- confirm against peace.txt).
with open('peace.txt', 'r', encoding='utf-8-sig') as corpus_file:
    text = corpus_file.read()[2:]
len(text)

3227578

In [2]:
import string
import re

In [3]:
# List the characters of the lowercased corpus that are neither ASCII letters,
# digits, nor part of the explicit keep-list below. The result is sorted so the
# printed order is the same on every run (plain set iteration order is not).
expected_chars = set(string.ascii_lowercase) | set(string.digits) | set(' $.\nëóýêíáèïéîüúçâœæôöäà')
print(''.join(sorted(set(text.lower()) - expected_chars)))

’“:”-(@,*;#—%[?=‘!)]/


In [4]:
def preprocess_text(text):
    """
    normalize raw text before tokenization
    text: string
    returns the lowercased text in which every listed punctuation mark
    (everything except dots) is replaced by a space, every run of
    whitespace is collapsed into one space and the ends are trimmed
    """
    text = text.lower()
    # raw strings keep the regex escapes (\[, \], \s) away from Python's own
    # string-escape processing, which warns about unknown escapes
    text = re.sub(r'[@;*—/?!()“”‘’=#:,\[\]%-]', ' ', text)
    # collapse multiple whitespace characters into one space '   ' -> ' '
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

In [5]:
# Normalize the corpus; the assertion pins the expected length of the
# preprocessed text as a sanity check.
text = preprocess_text(text)
assert len(text) == 3141169

In [6]:
# One entry per dot-delimited piece of the corpus, with the surrounding
# whitespace of every piece trimmed.
texts = list(map(str.strip, text.split('.')))

In [7]:
from collections import Counter
import nltk
from sklearn.base import TransformerMixin


class BPE(TransformerMixin):
    """
    byte pair encoding tokenizer

    the vocabulary starts with the individual characters of the training
    texts; fit keeps merging the most frequent pair of adjacent tokens into
    a new token until vocab_size tokens exist
    """

    def __init__(self, vocab_size=100):
        """
        vocab_size: number of tokens fit tries to reach
        """
        super().__init__()
        self.vocab_size = vocab_size
        # index to token
        self.itos = []
        # token to index
        self.stoi = {}

    def fit(self, texts):
        """
        fit itos and stoi
        texts: list of strings
        returns self; the vocabulary stays smaller than vocab_size when no
        adjacent token pair is left to merge
        """
        unique_symbols = set()
        for text in texts:
            unique_symbols.update(text)
        # sorted, so token ids do not depend on the iteration order of a set
        self.itos = sorted(unique_symbols)
        self.stoi = {token: index for index, token in enumerate(self.itos)}
        # every token is still a single character here, a direct lookup is enough
        texts = [[self.stoi[symbol] for symbol in text] for text in texts]

        while len(self.itos) < self.vocab_size:
            # count bigram frequencies in the text
            bigrams_counter = Counter()
            for text in texts:
                bigrams_counter.update(zip(text, text[1:]))
            if not bigrams_counter:
                # no sequence has two tokens left, nothing more can be merged
                break

            # most common bigram in the text, as ids and as string
            new_token_numbers = bigrams_counter.most_common(1)[0][0]
            new_token_letters = self.decode(new_token_numbers)
            new_id = len(self.itos)

            self.itos.append(new_token_letters)
            self.stoi[new_token_letters] = new_id

            # find occurrences of new_token_numbers in the text and replace them with new_id
            texts = [self._merge_pair(text, new_token_numbers, new_id) for text in texts]

        return self

    @staticmethod
    def _merge_pair(tokens, pair, new_id):
        """
        replace each occurrence of pair in tokens by new_id
        tokens: list of token ids
        pair: tuple of two token ids
        new_id: id of the merged token
        occurrences are matched left to right and do not overlap
        returns a new list
        """
        merged = []
        position = 0
        last = len(tokens) - 1
        while position <= last:
            if position < last and (tokens[position], tokens[position + 1]) == pair:
                merged.append(new_id)
                position += 2
            else:
                merged.append(tokens[position])
                position += 1
        return merged

    def recursive_replace(self, text):
        """
        convert a string into a list of token ids
        text: string
        tokens are tried from the most recently added one to the oldest; the
        first one that occurs in text is cut out at its first occurrence and
        the parts to its left and right are encoded the same way
        raises ValueError when a non-empty part contains no known token
        """
        if not text:
            return []
        for token in reversed(self.itos):
            index = text.find(token)
            if index == -1:
                continue
            return self.recursive_replace(text[:index]) + [self.stoi[token]] + \
                   self.recursive_replace(text[index + len(token):])
        raise ValueError("Impossible to transform the text. Maybe you sent different texts to the fit and transform methods.")

    def transform(self, texts):
        """
        convert texts to a sequences of token ids
        texts: list of strings
        returns list of lists of token ids
        """
        return [self.recursive_replace(text) for text in texts]

    def decode_token(self, tok):
        """
        tok: token id (int)
        returns the string of that token
        """
        return self.itos[tok]

    def decode(self, text):
        """
        convert token ids into text
        text: iterable of token ids
        """
        return ''.join(map(self.decode_token, text))
        
        
# Train the BPE tokenizer on the sentences and encode them in one step
# (fit_transform is not defined in BPE itself; presumably it is inherited
# from TransformerMixin).
vocab_size = 100
bpe = BPE(vocab_size)
tokenized_texts = bpe.fit_transform(texts)

In [8]:
# Decoding the token ids of the first sentence must give back the sentence.
assert bpe.decode(tokenized_texts[0]) == texts[0]

In [9]:
import random
# Spot-check the encode/decode round trip on 50 randomly drawn sentences.
for _ in range(50):
    n = random.randrange(len(texts))
    assert bpe.decode(tokenized_texts[n]) == texts[n]

In [10]:
import numpy as np


# Ids of the sentence boundary tokens: the two integers starting at
# vocab_size, i.e. the first ids above the vocab_size BPE ids.
start_token = vocab_size
end_token = vocab_size + 1


class LM:
    """
    3-gram language model with additive (laplace) smoothing
    """

    def __init__(self, vocab_size, delta=1):
        """
        vocab_size: number of regular token ids; two more ids are reserved
                    for the start and end tokens
        delta: smoothing constant added to every 3-gram count
        """
        self.delta = delta
        self.vocab_size = vocab_size + 2
        # ids of the sentence boundary tokens, the two ids after the regular ones
        self.start_token = vocab_size
        self.end_token = vocab_size + 1
        # 3-gram counters keyed by (a, b, c); a Counter gives 0 for unseen keys,
        # so the model can be queried even before fit
        self.proba = Counter()

    def _weights(self, a, b, tau):
        """
        unnormalized smoothed weights of all 3-grams which start with (a,b)
        raises ValueError if tau is zero
        """
        if tau == 0:
            raise ValueError('tau must be non-zero')
        return [pow(self.delta + self.proba[(a, b, c)], 1 / tau) for c in range(self.vocab_size)]

    def infer(self, a, b, tau=1):
        """
        return vector of probabilities of size self.vocab_size for 3-grams which start with (a,b) tokens
        a: first token id
        b: second token id
        tau: temperature
        raises ValueError if tau is zero
        """
        weights = self._weights(a, b, tau)
        # the normalizer is shared by all entries, compute it only once
        total = sum(weights)
        return [weight / total for weight in weights]

    def get_proba(self, a, b, c, tau=1):
        """
        get probability of 3-gram (a,b,c)
        a: first token id
        b: second token id
        c: third token id
        tau: temperature
        raises ValueError if tau is zero
        """
        weights = self._weights(a, b, tau)
        return pow(self.delta + self.proba[(a, b, c)], 1 / tau) / sum(weights)

    def fit(self, texts):
        """
        train language model on texts
        texts: list of lists
        every text is wrapped in one start token and one end token before
        its 3-grams are counted
        returns self
        """
        trigrams_counter = Counter()
        for text in texts:
            padded = [self.start_token] + list(text) + [self.end_token]
            trigrams_counter.update(zip(padded, padded[1:], padded[2:]))
        self.proba = trigrams_counter
        return self

# Fit the 3-gram model with delta=1 on the BPE-encoded sentences.
lm = LM(vocab_size, 1).fit(tokenized_texts)

In [11]:
import operator
import math

def beam_search(input_seq, lm, max_len=10, k=5, tau=1):
    """
    generate sequence from language model *lm* conditioned on input_seq
    input_seq: sequence of token ids for conditioning
    lm: language model
    max_len: max generated sequence length
    k: size of beam
    tau: temperature
    returns list of at most k (sequence, log probability) tuples, most
    probable first
    raises ValueError if input_seq is empty
    """

    if not input_seq:
        raise ValueError('input_seq is empty')

    by_log_proba = operator.itemgetter(1)

    # beam stores tuples of current sequences and their log probabilities
    beam = [(list(input_seq), 0)]  # 0 as log(1) == 0

    for _ in range(max_len):
        # once every sequence ended with the terminal token (or the beam is
        # empty) further steps cannot change the beam anymore
        if all(snt[-1] == end_token for snt, _snt_proba in beam):
            break

        candidates = []
        for snt, snt_proba in beam:
            if snt[-1] == end_token:
                # finished sequence, keep it as it is
                candidates.append((snt, snt_proba))
                continue

            # probability vector of the next token; a one-token sequence is
            # conditioned on the start token
            if len(snt) == 1:
                proba = lm.infer(start_token, snt[-1], tau)
            else:
                proba = lm.infer(snt[-2], snt[-1], tau)

            # top-k most probable (token id, token proba) pairs
            best_k_pairs = sorted(enumerate(proba), key=by_log_proba, reverse=True)[:k]

            for token_id, token_proba in best_k_pairs:
                if token_proba <= 0:
                    # log is undefined here, such a token can never be generated
                    continue
                candidates.append((snt + [token_id], snt_proba + math.log(token_proba)))

        # select top-k most probable sequences from candidates
        beam = sorted(candidates, key=by_log_proba, reverse=True)[:k]
    return beam

In [12]:
# TODO print decoded generated strings and their probabilities
def print_decoded_sequences(results):
    """
    print one line per result with the decoded string and its log probability
    results: iterable of (sequence of token ids, log probability) tuples
    """
    report_lines = (
        f'genderated string: {bpe.decode(token_ids)} | log probability: {log_proba}'
        for token_ids, log_proba in results
    )
    for line in report_lines:
        print(line)

In [13]:
# strip end token as bpe doesn't have it in self.itos
def strip_end_token(results):
    """
    drop a trailing end token from every sequence
    results: list of (sequence of token ids, log probability) tuples
    the list is updated in place and also returned
    """
    for position, entry in enumerate(results):
        token_ids = entry[0]
        if token_ids[-1] != end_token:
            continue
        results[position] = (token_ids[:-1], entry[1])
    return results

In [14]:
# Encode the prompt 'horse ' and print the continuations kept by beam search
# (beam size 10, at most 10 generated tokens, temperature 0.1).
input1 = 'horse '
input1 = bpe.transform([input1])[0]
results = beam_search(input1, lm, max_len=10, k=10, tau=0.1)
results = strip_end_token(results)
print_decoded_sequences(results)

genderated string: horse of the cound the c | log probability: -1.3536314912661447
genderated string: horse of the said not  | log probability: -1.9001351870506915
genderated string: horse of the countere  | log probability: -2.292039183018149
genderated string: horse of the cound him  | log probability: -2.608492913751813
genderated string: horse of the counted to  | log probability: -2.7446600910824483
genderated string: horse of the cound the s | log probability: -2.8334885942426555
genderated string: horse of the counderstand  | log probability: -3.6449111435014103
genderated string: horse and the cound the c | log probability: -3.660157755865623
genderated string: horse of the prince and | log probability: -3.7028044294026317
genderated string: horse of the french sa | log probability: -3.9261584346778005


In [15]:
# Encode the prompt 'her' and print the continuations kept by beam search
# (beam size 10, at most 10 generated tokens, temperature 0.1).
input1 = 'her'
input1 = bpe.transform([input1])[0]
results = beam_search(input1, lm, max_len=10, k=10, tau=0.1)
results = strip_end_token(results)
print_decoded_sequences(results)

genderated string: her | log probability: -0.05834838774541152
genderated string: hers who had been  | log probability: -4.9607753160300065
genderated string: herself said no | log probability: -4.995819366802682
genderated string: hers were said n | log probability: -5.023525935156409
genderated string: hers | log probability: -5.147449208919879
genderated string: herself and the coun | log probability: -5.652776409930521
genderated string: hers what is the cou | log probability: -5.739949162261469
genderated string: hers what is not b | log probability: -5.90341700199199
genderated string: herself they had b | log probability: -6.443309174549449
genderated string: her who had been s | log probability: -6.481797449679599


In [16]:
# Encode the prompt 'what' and print the continuations kept by beam search
# (beam size 10, at most 10 generated tokens, temperature 1).
input1 = 'what'
input1 = bpe.transform([input1])[0]
results = beam_search(input1, lm, max_len=10, k=10, tau=1)
results = strip_end_token(results)
print_decoded_sequences(results)

genderated string: what | log probability: -3.857214768933151
genderated string: what would not know | log probability: -10.255435891167032
genderated string: what would notion  | log probability: -10.618565066731836
genderated string: what would not kne | log probability: -11.028625779400514
genderated string: what would not been | log probability: -11.439729787892619
genderated string: what would not und | log probability: -11.504062725885246
genderated string: what would not up  | log probability: -11.510622312112565
genderated string: what would not beg | log probability: -11.77715034132269
genderated string: what would notions  | log probability: -11.821996012469603
genderated string: what would not the c | log probability: -11.835417210580742


In [17]:
# Encode the prompt 'gun ' and print the continuations kept by beam search
# (beam size 10, at most 10 generated tokens, temperature 0.1).
input1 = 'gun '
input1 = bpe.transform([input1])[0]
results = beam_search(input1, lm, max_len=10, k=10, tau=0.1)
results = strip_end_token(results)
print_decoded_sequences(results)

genderated string: gun and the cound the c | log probability: -2.0479674572284665
genderated string: gun been said no | log probability: -2.2351840799068863
genderated string: gun t been said n | log probability: -2.4100062859620257
genderated string: gun and the said not  | log probability: -2.5944711530130125
genderated string: gun t you said no | log probability: -2.82762409839334
genderated string: gun and the countere  | log probability: -2.9863751489804704
genderated string: gun and the cound him  | log probability: -3.3028288797141347
genderated string: gun and the counted to  | log probability: -3.4389960570447697
genderated string: gun and the cound the s | log probability: -3.5278245602049774
genderated string: gun said not been | log probability: -3.57709182270748


In [18]:
def perplexity(snt, lm):
    """
    perplexity of one sentence under the language model
    snt: sequence of token ids
    lm: language model
    the sentence is wrapped in start and end tokens; the result is
    2 ** (-(1 / N) * sum of log2 of the 3-gram probabilities) with
    N = len(snt), which equals the number of 3-grams
    raises ValueError if snt is empty
    """
    if not snt:
        raise ValueError('snt is empty')
    padded = [start_token] + snt + [end_token]
    # one base-2 log probability per consecutive token triple
    log_probas = [
        math.log(lm.get_proba(first, second, third), 2)
        for first, second, third in zip(padded, padded[1:], padded[2:])
    ]
    exponent = (-1 / (len(padded) - 2)) * sum(log_probas)
    return pow(2, exponent)

# Perplexity of the first tokenized sentence under the fitted model.
perplexity(tokenized_texts[0], lm)

13.041252131484478