# Week 1 Lesson 1: Subword tokenization using byte pair encoding (BPE)

In [39]:
import nltk
import collections
nltk.download('punkt_tab')
from nltk.tokenize import word_tokenize

[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\kayau\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


## The idea of subword tokenization

## Byte pair encoding: Training

### Step 1: Initial steps

Let's say that these are the inputs we'll be giving the BPE algorithm, including a corpus and a desired vocabulary size:

In [40]:
corpus = ["The sink stinks.", "Sit in the egg.",
"The tinker hikes.", "The hiker kisses the stinking sink."]
V = 20

Step 1(a): We kick off the process by preprocessing the corpus, splitting it by words and removing case:

In [41]:
def preprocess_corpus(corpus):
    """Preprocess a corpus by turning everything lowercase and
    splitting it into words.

    Args:
        corpus (list(chr)): A list of texts.

    Returns:
        list(chr): A list of texts with the pound sign (#) between words.
    """
    processed_corpus = [""] * len(corpus)
    for (i, text) in enumerate(corpus):
        processed_corpus[i] = preprocess_text(text)
    return processed_corpus

def preprocess_text(text):
    """Preprocess a corpus by turning everything lowercase and
    splitting it into words.

    Args:
        corpus (chr): A text.

    Returns:
        list(chr): A list of words from the input text.
    """
    text = text.lower()
    return "#".join(word_tokenize(text))

corpus_preprocessed = preprocess_corpus(corpus)
corpus_preprocessed


['the#sink#stinks#.',
 'sit#in#the#egg#.',
 'the#tinker#hikes#.',
 'the#hiker#kisses#the#stinking#sink#.']

Step 1(b): We get the initial tokenisation by splitting the text into tokens of single characters

In [42]:
def get_init_tokenization(corpus):
    """Get an initial tokenized corpus where each
       character is a token.

    Args:
        corpus (list(str)): A list of preprocessed texts.                            

    Returns:
        list(str): A list of lists, each of which is a single-
                   character token.
    """
    corpus_tokenized = [None] * len(corpus)
    for (i, text) in enumerate(corpus):
        corpus_tokenized[i] = list(text)
    return corpus_tokenized

def print_tokenized_corpus(corpus, include_pound = True):
    """Prints a tokenized corpus with a space between each token
    and a line break between two texts.

    Args:
        corpus (list(list(str))): A list of texts in the form of
                                  lists of tokens.        
    """
    joined_texts = [""] * len(corpus)
    for (i, text) in enumerate(corpus):
        joined_text = " ".join(text)
        if not include_pound:
            joined_text.replace(" #", "")
        joined_texts[i] = joined_text
    print("\n".join(joined_texts))
    
corpus_tokenized = get_init_tokenization(corpus_preprocessed)
print_tokenized_corpus(corpus_tokenized)

t h e # s i n k # s t i n k s # .
s i t # i n # t h e # e g g # .
t h e # t i n k e r # h i k e s # .
t h e # h i k e r # k i s s e s # t h e # s t i n k i n g # s i n k # .
t h e # s i n k # s t i n k s # .
s i t # i n # t h e # e g g # .
t h e # t i n k e r # h i k e s # .
t h e # h i k e r # k i s s e s # t h e # s t i n k i n g # s i n k # .


Step 1(c): Get the vocabulary from the initial tokenisation.

In [43]:
def get_vocab(tokenized_corpus):
    """Get the vocabulary from a tokenzied corpus.

    Args:
        corpus (list(list(str))): A list of texts in the form of
                                  lists of tokens.

    Returns:
        set(str): A set of types in the vocabulary.
    """
    vocab = set()
    for text in tokenized_corpus:
        vocab = vocab.union(set(text))
    vocab.remove("#")
    return vocab

vocab = get_vocab(corpus_tokenized)
vocab

{'.', 'e', 'g', 'h', 'i', 'k', 'n', 'r', 's', 't'}

### Step 2: Main processing step

In each iteration of the main processing step, we need to find all the bigrams in the corpus, count them, get the most frequent one, and merge it in the corpus.

Let's first count all the bigrams:

In [44]:
def count_bigrams_in_corpus(tokenized_corpus):
    """Finds all bigrams in a corpus of texts.

    Args:
        tokenized_corpus (list(list(str))): A list of texts in the form of
                                  lists of tokens.

    Returns:
        A list of lists, where each inner list contains the bigrams found in one text in the corpus."""

    bigrams_all = list()
    for text in tokenized_corpus:
        bigrams_all = bigrams_all + (find_bigrams_in_text(text))
    return collections.Counter(bigrams_all)

def find_bigrams_in_text(tokenized_text):
    """Find all bigrams in a tokenized text.

    Args:
        tokenized_text (list(str)): A list of tokens representing
                                    a text.

    Returns:
        list(tuple(str)): A list of tuples representing bigrams. 
    """
    bigrams_list = list()
    for i in range(len(tokenized_text) - 1):
        if tokenized_text[i] == "#" or tokenized_text[i + 1] == "#":
            continue
        bigrams_list.append((tokenized_text[i], tokenized_text[i + 1]))
    return bigrams_list

bigrams_merged = list()
bigrams = count_bigrams_in_corpus(corpus_tokenized)
print(bigrams)
top_bigram = bigrams.most_common()[0][0]
bigrams_merged.append(top_bigram)
print(top_bigram)

Counter({('i', 'n'): 7, ('t', 'h'): 5, ('h', 'e'): 5, ('n', 'k'): 5, ('s', 'i'): 3, ('t', 'i'): 3, ('k', 'e'): 3, ('s', 't'): 2, ('e', 'r'): 2, ('h', 'i'): 2, ('i', 'k'): 2, ('e', 's'): 2, ('k', 'i'): 2, ('k', 's'): 1, ('i', 't'): 1, ('e', 'g'): 1, ('g', 'g'): 1, ('i', 's'): 1, ('s', 's'): 1, ('s', 'e'): 1, ('n', 'g'): 1})
('i', 'n')
Counter({('i', 'n'): 7, ('t', 'h'): 5, ('h', 'e'): 5, ('n', 'k'): 5, ('s', 'i'): 3, ('t', 'i'): 3, ('k', 'e'): 3, ('s', 't'): 2, ('e', 'r'): 2, ('h', 'i'): 2, ('i', 'k'): 2, ('e', 's'): 2, ('k', 'i'): 2, ('k', 's'): 1, ('i', 't'): 1, ('e', 'g'): 1, ('g', 'g'): 1, ('i', 's'): 1, ('s', 's'): 1, ('s', 'e'): 1, ('n', 'g'): 1})
('i', 'n')


The next step is to merge the bigrams. Let's first write a function to merge two items within a list:

In [45]:
def merge_strlist_items(strlist, i):
    """Merge two tokens inside a list.

    Args:
        strlist (_type_): A list of tokens.
        i (int): The index of the first item to be merged.

    Returns:
        list(str): The new list of tokens.
    """

    j = i + 1
    strlist[i] = strlist[i] + strlist[j]
    strlist.pop(j)
    return strlist

merge_strlist_items(["s", "t", "i", "n", "k"], 2)

['s', 't', 'in', 'k']

Now we can merge the desired bigram in the text:

In [46]:
def find_bigram_in_text(tokenized_text, bigram):
    """Find the location of a bigram in a text.
        If none are found, return None."""
    for i in range(len(tokenized_text) - 1):
        if tokenized_text[i] == bigram[0] and\
            tokenized_text[i + 1] == bigram[1]:
            return i
    return None

def merge_bigram_in_text(tokenized_text, bigram):
    """Merge a specified bigram in a list of tokens.

    Args:
        tokenized_text (list(str)): The list of tokens to be retokenized.
        bigram (tuple): The bigram to be merged.
    """
    done = False
    while not done:
        bigram_loc = find_bigram_in_text(tokenized_text, bigram)
        if bigram_loc is None:
            done = True
            break
        merge_strlist_items(tokenized_text, bigram_loc)

def merge_bigram_in_corpus(tokenized_corpus, bigram):
    """Merge a specified bigram in a corpus.

    Args:
        tokenized_corpus (list(list(str))): The list of tokens to be retokenized.
        bigram (tuple): The bigram to be merged.
    """
    for text in tokenized_corpus:
        merge_bigram_in_text(text, bigram)

merge_bigram_in_corpus(corpus_tokenized, top_bigram)
print_tokenized_corpus(corpus_tokenized)

t h e # s in k # s t in k s # .
s i t # in # t h e # e g g # .
t h e # t in k e r # h i k e s # .
t h e # h i k e r # k i s s e s # t h e # s t in k in g # s in k # .
t h e # s in k # s t in k s # .
s i t # in # t h e # e g g # .
t h e # t in k e r # h i k e s # .
t h e # h i k e r # k i s s e s # t h e # s t in k in g # s in k # .


The final substep is to update the vocabulary to contain the new subword:

In [47]:
vocab.add(top_bigram[0] + top_bigram[1])
print(vocab)

{'h', '.', 'g', 'r', 's', 'in', 't', 'e', 'i', 'n', 'k'}
{'h', '.', 'g', 'r', 's', 'in', 't', 'e', 'i', 'n', 'k'}


Let's put all those together into a function, and tuck it into a `while` loop so that it keeps running until the vocabulary size has reached *V*:

In [48]:
def bpe_train(corpus_tokenized, vocab, V, bigrams_merged=None):    
    """Train a byte-pair encoding tokenizer.

    Args:
        corpus_tokenized  (list(list(str))): The list of tokens to be retokenized.
        vocab (set): The vocabulary so far.
        V (int): The desired vocabulary size.
    """

    i = 1
    if bigrams_merged is None:
        bigrams_merged = list()
    while len(vocab) < V:
        print(f"====Iteration {i}====")
        bigrams = count_bigrams_in_corpus(corpus_tokenized)
        top_bigram = bigrams.most_common()[0][0]
        bigrams_merged.append(top_bigram)
        print(f"Top bigram: {bigrams.most_common()[0]}")
        merge_bigram_in_corpus(corpus_tokenized, top_bigram)
        print(f"Corpus after merge:")
        print_tokenized_corpus(corpus_tokenized)
        vocab.add(top_bigram[0] + top_bigram[1])
        print(f"Vocab after merge (size {len(vocab)}): {vocab}")

        i += 1

    return {"corpus_tokenized": corpus_tokenized, "vocab": vocab, "bigrams_merged": bigrams_merged}

bpe_train(corpus_tokenized, vocab, V, bigrams_merged=bigrams_merged)


====Iteration 1====
Top bigram: (('t', 'h'), 5)
Corpus after merge:
th e # s in k # s t in k s # .
s i t # in # th e # e g g # .
th e # t in k e r # h i k e s # .
th e # h i k e r # k i s s e s # th e # s t in k in g # s in k # .
Vocab after merge (size 12): {'h', '.', 'th', 'g', 'r', 's', 'in', 't', 'e', 'i', 'n', 'k'}
====Iteration 2====
Top bigram: (('th', 'e'), 5)
Corpus after merge:
the # s in k # s t in k s # .
s i t # in # the # e g g # .
the # t in k e r # h i k e s # .
the # h i k e r # k i s s e s # the # s t in k in g # s in k # .
Vocab after merge (size 13): {'h', '.', 'th', 'g', 'the', 'r', 's', 'in', 't', 'e', 'i', 'n', 'k'}
====Iteration 3====
Top bigram: (('in', 'k'), 5)
Corpus after merge:
the # s ink # s t ink s # .
s i t # in # the # e g g # .
the # t ink e r # h i k e s # .
the # h i k e r # k i s s e s # the # s t ink in g # s ink # .
Vocab after merge (size 14): {'ink', 'h', '.', 'th', 'g', 'the', 'r', 's', 'in', 't', 'e', 'i', 'n', 'k'}
====Iteration 4====
Top bi

In [49]:
print("Tokenized corpus:")
print_tokenized_corpus(corpus_tokenized)
print("Final vocabulary:")
print(vocab)
print("Bigrams merged at each step:")
print(bigrams_merged)

Tokenized corpus:
the # sink # stink s # .
s i t # in # the # e g g # .
the # tink er # hik e s # .
the # hik er # k i s s e s # the # stink in g # sink # .
Final vocabulary:
{'sink', 'ink', 'stink', 'h', '.', 'th', 'g', 'the', 'r', 'hi', 's', 'in', 't', 'er', 'e', 'i', 'n', 'tink', 'k', 'hik'}
Bigrams merged at each step:
[('i', 'n'), ('t', 'h'), ('th', 'e'), ('in', 'k'), ('t', 'ink'), ('s', 'ink'), ('s', 'tink'), ('e', 'r'), ('h', 'i'), ('hi', 'k')]
Tokenized corpus:
the # sink # stink s # .
s i t # in # the # e g g # .
the # tink er # hik e s # .
the # hik er # k i s s e s # the # stink in g # sink # .
Final vocabulary:
{'sink', 'ink', 'stink', 'h', '.', 'th', 'g', 'the', 'r', 'hi', 's', 'in', 't', 'er', 'e', 'i', 'n', 'tink', 'k', 'hik'}
Bigrams merged at each step:
[('i', 'n'), ('t', 'h'), ('th', 'e'), ('in', 'k'), ('t', 'ink'), ('s', 'ink'), ('s', 'tink'), ('e', 'r'), ('h', 'i'), ('hi', 'k')]


## Byte pair encoding: Tokenising a new text

In [53]:
def bpe_tokenize_new_text(corpus_new, merge_steps):
    """Tokenize a new text using an existing BPE tokenizer.

    Args:
        corpus_new (list(str)):  A list of raw texts.
        merge_steps (list(tuple(str))): A list of bigrams, representing
                                        the bigrams to be merged, in the
                                        order they are applied.

    Returns:
        list(list(str)): The tokenised texts.
    """
    corpus_new_preprocessed = preprocess_corpus(corpus_new)
    corpus_new_tokenized = get_init_tokenization(corpus_new_preprocessed)
    print(f"Tokenised new text:")
    print_tokenized_corpus(corpus_new_tokenized)
    for bigram in merge_steps:
        print(f"Bigram merged: {bigram}")
        merge_bigram_in_corpus(corpus_new_tokenized, bigram)
        print(f"Corpus after merge:")
        print_tokenized_corpus(corpus_new_tokenized)
    return corpus_new_tokenized


corpus_new = ["The sinks are stinky.", "He kisses the egg."]
bpe_tokenize_new_text(corpus_new, bigrams_merged)

Tokenised new text:
t h e # s i n k s # a r e # s t i n k y # .
h e # k i s s e s # t h e # e g g # .
Bigram merged: ('i', 'n')
Corpus after merge:
t h e # s in k s # a r e # s t in k y # .
h e # k i s s e s # t h e # e g g # .
Bigram merged: ('t', 'h')
Corpus after merge:
th e # s in k s # a r e # s t in k y # .
h e # k i s s e s # th e # e g g # .
Bigram merged: ('th', 'e')
Corpus after merge:
the # s in k s # a r e # s t in k y # .
h e # k i s s e s # the # e g g # .
Bigram merged: ('in', 'k')
Corpus after merge:
the # s ink s # a r e # s t ink y # .
h e # k i s s e s # the # e g g # .
Bigram merged: ('t', 'ink')
Corpus after merge:
the # s ink s # a r e # s tink y # .
h e # k i s s e s # the # e g g # .
Bigram merged: ('s', 'ink')
Corpus after merge:
the # sink s # a r e # s tink y # .
h e # k i s s e s # the # e g g # .
Bigram merged: ('s', 'tink')
Corpus after merge:
the # sink s # a r e # stink y # .
h e # k i s s e s # the # e g g # .
Bigram merged: ('e', 'r')
Corpus after merg

[['the', '#', 'sink', 's', '#', 'a', 'r', 'e', '#', 'stink', 'y', '#', '.'],
 ['h',
  'e',
  '#',
  'k',
  'i',
  's',
  's',
  'e',
  's',
  '#',
  'the',
  '#',
  'e',
  'g',
  'g',
  '#',
  '.']]