# Introduction to Tokenization Using the Byte-Pair Encoding Algorithm  
  
The tokenization process consists of the following steps:  
1. Normalization: In this step, we normalize the text to a standard format. For example, this includes converting all characters to ASCII, removing diacritics, etc.  
2. Pre-tokenization: This step involves splitting the text into word tokens, typically by dividing the text at spaces and newlines.  
3. Subword tokenization: In this phase, we break down the word tokens into smaller subword tokens. To accomplish this, we must first develop a statistical model that informs us of the most effective method for subdivision. The Byte-Pair Encoding algorithm is one such model.  
  
<img src="../../images/tokenization_process.jpg" width="700" />  
  
In this tutorial, we will initially explore the normalization and pre-tokenization steps. Subsequently, we will delve into the Byte-Pair Encoding algorithm and apply it to devise a subword tokenization model.  


In [ ]:
import collections
from tqdm import tqdm  
import unicodedata

## Normalization
Here we define a function that normalizes a Unicode string by removing diacritics, which maps accented Latin text to plain ASCII. It works as follows:
1. Normalize the Unicode string to NFD (Normalization Form Decomposed). NFD breaks down each character into its base character and its diacritics. For example, the character `é` is decomposed into `e` and the combining accent `´`.
2. Filter out the combining marks (Unicode category `Mn`), i.e., the diacritics. For example, the combining accent `´` is removed, leaving only `e`.
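
To make the decomposition step concrete, the following small sketch inspects what NFD produces for a single character. It only uses the standard `unicodedata` module; the variable names are illustrative and not part of the tutorial code.

```python
import unicodedata

# "\u00e9" is the precomposed character é
decomposed = unicodedata.normalize("NFD", "\u00e9")

# List the code point and Unicode category of each resulting character
parts = [(f"U+{ord(ch):04X}", unicodedata.category(ch)) for ch in decomposed]
print(parts)  # [('U+0065', 'Ll'), ('U+0301', 'Mn')]

# A character without a decomposition, such as ø (U+00F8), is left unchanged
unchanged = unicodedata.normalize("NFD", "\u00f8")
print(unchanged == "\u00f8")  # True
```

The second check suggests a caveat: dropping `Mn` characters strips accents, but it does not by itself guarantee that the result is pure ASCII.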

In [ ]:
def normalize(text):
    # Normalize the Unicode string to NFD (Normalization Form Decomposed).
    # NFD breaks down each character into its base character and diacritics (also known as "combining marks").
    text = unicodedata.normalize('NFD', text)
    # Filter out the combining marks so that only the base characters remain
    text = ''.join(char for char in text if not _is_combining_mark(char))
    return text

def _is_combining_mark(char):
    # 'Mn' (Mark, nonspacing) is the Unicode category of combining diacritics
    return unicodedata.category(char) == 'Mn'

result = normalize("Héllò hôw are ü?")
expected = "Hello how are u?"
assert result == expected, f"Expected `{expected}`, got `{result}`"

## Pre-tokenization
In this step, we split the text into word and whitespace tokens. For example, `Hello how are u?` will be split into `["Hello", " ", "how", " ", "are", " ", "u?"]`. Punctuation stays attached to its word because we only split at spaces and newlines. We still keep the `" "` and `"\n"` tokens so that we can later reverse the tokenization process.

Complete the `pre_tokenize` function below by implementing the missing if-statements.
When you are done, it should pass the tests below.
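
For comparison, the same splitting behaviour can be sketched with a regular expression. This is an alternative technique, not the character loop the exercise asks for, and the name `pre_tokenize_regex` is hypothetical.

```python
import re

def pre_tokenize_regex(text: str) -> list[str]:
    # Match either a run of characters that are neither space nor newline (a word),
    # or a single space / newline, which is kept as its own token.
    return re.findall(r"[^ \n]+|[ \n]", text)

tokens = pre_tokenize_regex("Hello how  are u?\nBye")
print(tokens)  # ['Hello', ' ', 'how', ' ', ' ', 'are', ' ', 'u?', '\n', 'Bye']

# Joining the tokens restores the original text
print("".join(tokens) == "Hello how  are u?\nBye")  # True
```

Because every character of the input lands in exactly one token, joining the tokens always reproduces the input, which is the property the loop-based version should also have.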

In [ ]:
def pre_tokenize(text: str):
    word_tokens = []
    current_word = ""
    for char in text:
        if char == " ":
            # YOUR CODE HERE START
            # YOUR CODE HERE END
        elif char == "\n":
            # YOUR CODE HERE START
            # YOUR CODE HERE END
        else:
            current_word += char

    
    if len(current_word) > 0:
        word_tokens.append(current_word)

    return word_tokens

def un_pre_tokenize(tokens):
    return "".join(tokens)

input_text = "3.2.1: Let's  get started!\nMy name is bob_smith"
assert pre_tokenize(input_text) == ["3.2.1:", " ", "Let's", " ", " ", "get", " ", "started!", "\n", "My", " ", "name"," ", "is", " ", "bob_smith"], f"Got `{pre_tokenize(input_text)}`"
assert un_pre_tokenize(pre_tokenize(input_text)) == input_text, f"Got `{un_pre_tokenize(pre_tokenize(input_text))}`"

## Byte-Pair Encoding  
Byte-Pair Encoding (BPE) was originally devised as a text compression algorithm and was later adopted by OpenAI for tokenization when pretraining the GPT models.


The tokenization process is as follows:  
1. Normalize the text and split it into word tokens.  
2. Split each word token into a list of characters.  
3. Sequentially apply the merge rules as they were discovered during the learning process to the subword tokens.  
4. Concatenate and flatten the array of subword lists into a unified list of subword tokens.

As you can see, the tokenization process needs to learn two things from a corpus of text:
- The initial vocabulary of characters: This is learned by collecting all unique characters found in the corpus.
- The merge rules: These rules define how we merge these characters into larger tokens.
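
To illustrate how already-learned merge rules are applied in order, here is a minimal sketch for a single word. It deliberately uses a different representation from the rest of this notebook: the word is stored as a string of space-separated symbols and merged with a regular expression, whereas the exercises below work on Python lists. The function name `encode_word` and the rules in `toy_rules` are made up for this example, and the sketch assumes the word itself contains no whitespace.

```python
import re

def encode_word(word: str, merge_rules: list[tuple[str, str]]) -> list[str]:
    # Represent the word as space-separated symbols, e.g. "h e l l o"
    symbols = " ".join(word)
    for left, right in merge_rules:
        # Match "left right" only at symbol boundaries, never inside a longer symbol
        pattern = re.compile(r"(?<!\S)" + re.escape(left + " " + right) + r"(?!\S)")
        # Replace the pair with the concatenated symbol
        symbols = pattern.sub(lambda match: left + right, symbols)
    return symbols.split(" ")

# Hypothetical merge rules, listed in the order they were learned
toy_rules = [("l", "l"), ("ll", "o"), ("h", "e")]
print(encode_word("hello", toy_rules))  # ['he', 'llo']
print(encode_word("hell", toy_rules))   # ['he', 'll']
```

The order of the rules matters: `("ll", "o")` can only fire because `("l", "l")` was applied first.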
  
The learning process includes the following steps:  
1. Normalize the text and split it into word tokens.  
2. Count the frequency of each word token. This allows us to retain only one copy of each word token in memory.  
3. Construct an initial vocabulary consisting of all unique characters found in the word tokens.  
4. Decompose each word token into the subwords available in the vocabulary. For instance, if the word token is `hello` and the vocabulary contains `{"h", "e", "l", "o"}`, then the word token will be split into `["h", "e", "l", "l", "o"]`.  
5. Calculate the frequency with which each adjacent pair of subword tokens occurs, considering the frequency of the word tokens. For example, if the word token `hello` appears twice, then the pair `("l", "l")` will be counted twice.  
6. Identify the most frequently occurring pair of subwords and merge them into a single subword token. For example, if the pair `("l", "l")` is most frequent, it will be merged to form the subword token `"ll"`. Incorporate this new subword token into the vocabulary and add the corresponding pair to the merge rules.  
7. Apply this new merge rule to all subword tokens. For instance, if the subword token list is `["h", "e", "l", "l", "o"]` and the merge rule is `("l", "l") -> "ll"`, then the list will be transformed into `["h", "e", "ll", "o"]`. Make sure to keep track of the order in which the merge rules were identified.  
8. Repeat steps 5-7 until the vocabulary attains the desired size.  
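
The learning loop above can be sketched on a toy word-frequency table. This is a compact illustration under stated assumptions rather than the implementation built later in this notebook: it stores each word as space-separated symbols and merges with a regular expression, it stops after a fixed number of merges instead of a target vocabulary size, and the names `learn_merges` and `toy_freq` are hypothetical.

```python
import re
from collections import Counter

def learn_merges(word_freq: dict[str, int], num_merges: int) -> list[tuple[str, str]]:
    # Each word starts out as space-separated single characters
    corpus = {" ".join(word): freq for word, freq in word_freq.items()}
    merges = []
    for _ in range(num_merges):
        # Count adjacent symbol pairs, weighted by the word frequency
        pair_counts = Counter()
        for symbols, freq in corpus.items():
            parts = symbols.split(" ")
            for pair in zip(parts, parts[1:]):
                pair_counts[pair] += freq
        if not pair_counts:
            break
        # Pick the most frequent pair; ties go to the pair that was counted first
        best = max(pair_counts, key=pair_counts.get)
        merges.append(best)
        # Apply the new merge rule to every word in the corpus
        pattern = re.compile(r"(?<!\S)" + re.escape(" ".join(best)) + r"(?!\S)")
        corpus = {pattern.sub(lambda m: "".join(best), s): f for s, f in corpus.items()}
    return merges

toy_freq = {"low": 5, "lower": 2, "newest": 6, "widest": 3}
print(learn_merges(toy_freq, 3))  # [('e', 's'), ('es', 't'), ('l', 'o')]
```

In the first round, `("e", "s")` and `("s", "t")` both occur 9 times. The tie is broken by counting order here, which is an arbitrary choice of this sketch.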
  
  
Before we begin implementing the algorithm, we will first define some helper functions.  
Let's start by creating the `count_frequency` function. This function accepts a list of word tokens as input and returns the frequency of each unique word token found within the list. For example, if the input is `["hello", "hello", "world"]`, then the output would be `{"hello": 2, "world": 1}`.  


In [ ]:
def count_frequency(items: list[str]):
    """
    :param items: List of word tokens 
    :return: Dictionary mapping each unique word token to its frequency
    """
    # YOUR CODE HERE START
    # YOUR CODE HERE END

text = "3.2.1: Let's get started!"
token_freq = count_frequency(pre_tokenize(text))
assert token_freq == {"3.2.1:": 1, "Let's": 1, "get": 1, "started!": 1, " ": 3}, f"Got `{token_freq}`"

To determine the merge rules, we need to identify which pairs of tokens occur most frequently together in the corpus. To accomplish this, we utilize the following helper function. 


In [ ]:
def count_pair_frequencies(
    subtoken_per_word_token: dict[str, list[str]],
    word_token_frequency: dict[str, int]
):
    """
    :param subtoken_per_word_token: This is the current mapping of each word token to its subtokens. E.g. if the word "going" is split into ["go", "ing"], then subtoken_per_word_token["going"] == ["go", "ing"]
    :param word_token_frequency: A dictionary mapping each unique word token to its frequency in the corpus. E.g. if the word "going" appears 314 times in the corpus, then word_token_frequency["going"] == 314
    :return: A dictionary mapping each pair of subtokens to its frequency in the corpus. E.g. if the pair ("go", "ing") appears 314 times in the corpus, then the output would be {("go", "ing"): 314}
    """
    pair_frequencies = collections.defaultdict(int)
    
    for token, freq in word_token_frequency.items():
        subtokens = subtoken_per_word_token[token]
        for i in range(len(subtokens) - 1):
            pair = (subtokens[i], subtokens[i + 1])
            pair_frequencies[pair] += freq
   
    return pair_frequencies

subtoken_per_word_token={"going": ["go", "i", "ng"], "boing": ["bo", "i", "ng"]}
word_token_frequency={"going": 314, "boing": 42}
pair_frequencies = count_pair_frequencies(subtoken_per_word_token, word_token_frequency)
assert pair_frequencies[("go", "i")] == 314, f"Got `{pair_frequencies[('go', 'i')]}`"
assert pair_frequencies[("i", "ng")] == 356, f"Got `{pair_frequencies[('i', 'ng')]}`"
assert pair_frequencies[("bo", "i")] == 42, f"Got `{pair_frequencies[('bo', 'i')]}`"

We also need an `apply_merge_rule` function that merges every occurrence of `token_a` immediately followed by `token_b` in the `subtokens` list. For example, if `token_a` is `"l"` and `token_b` is `"l"`, then the function should return a new list where all occurrences of `["l", "l"]` are replaced with `["ll"]`.

In [ ]:
def apply_merge_rule(token_a: str, token_b: str, subtokens: list[str]):
    new_subtokens = []  
  
    # We cannot use a for loop here because we advance by 1 step if we don't merge and by 2 steps if we do.  
    i = 0  
    while i < len(subtokens): 
        # You have to handle the following cases:
        # 1. if token_i == token_a and token_i+1 == token_b, then merge the two tokens
        # 2. if token_i != token_a or token_i+1 != token_b, then keep the token as is
        # YOUR CODE HERE START:
        # YOUR CODE HERE END
        

    return new_subtokens


token_a = "l"
token_b = "l"
subtokens = ["h", "e", "l", "l", "o"]
assert apply_merge_rule(token_a, token_b, subtokens) == ["h", "e", "ll", "o"], f"Got `{apply_merge_rule(token_a, token_b, subtokens)}`"

token_a = "ll"
token_b = "o"
subtokens = ["h", "e", "ll", "o"]
assert apply_merge_rule(token_a, token_b, subtokens) == ["h", "e", "llo"], f"Got `{apply_merge_rule(token_a, token_b, subtokens)}`"

## Learning Merge Rules
Now that we have defined all the helper functions, we can start implementing the learning process.
Most of the code is already written for you. You only need to implement the missing parts which are indicated by the `# YOUR CODE HERE START` and `# YOUR CODE HERE END` comments.

In [ ]:
class BPETokenizer:
    def __init__(
        self, 
        max_vocab_size: int, 
    ) -> None:
        """
        :param max_vocab_size: The maximum size of the vocabulary. 
        """
        self.max_vocab_size = max_vocab_size
        
        self._merge_rules: list[tuple[str, str]] = []
        self.vocab: set[str] = set()

        
        
    def fit(self, text: str):
        """
        Learn the merge rules from the given text corpus.
        
        :param text: The text corpus to learn the merge rules from.
        :return: None
        """
        # 1. Normalizing the text
        text = normalize(text)
        # 2. Pre-tokenizing the text
        word_tokens = pre_tokenize(text)
        # 3. Counting the frequency of each word token
        word_token_frequency = count_frequency(word_tokens)
        
        
        # 4. Constructing an initial vocabulary consisting of all unique characters found in the word tokens
        unique_word_tokens = set(word_token_frequency.keys())
        subtoken_per_word_token: dict[str, list[str]] = {}
        for word_token in unique_word_tokens:
            for char in word_token:
                # Here we add each unique character to the initial vocabulary
                self.vocab.add(char)
            
            # Here we decompose each word token into a list of characters
            # This is the most basic subword token since no merge rules have been applied yet
            subtoken_per_word_token[word_token] = list(word_token)
            

        with tqdm(total=self.max_vocab_size) as progress_bar:
            
            while len(self.vocab) < self.max_vocab_size:
                # 5. Find the most frequent pair of subtokens in the corpus.
                # Use the count_pair_frequencies function to define the pair_frequencies variable.
                # YOUR CODE HERE START
                # YOUR CODE HERE END
                
                if len(pair_frequencies) == 0:
                    print("No more pairs to merge")
                    break
                
                # find the most frequent pair in the pair_frequencies dictionary and assign it to the most_frequent_pair variable
                # YOUR CODE HERE START
                # YOUR CODE HERE END
                token_a, token_b = most_frequent_pair
                
                # 6. Add the merge rule to the list of merge rules and apply it to all subtokens
                self._merge_rules.append(most_frequent_pair)
                self.vocab.add(token_a + token_b)
                
                for word, subtokens in subtoken_per_word_token.items():
                    # we apply the merge rule to each subtoken list and replace it with the new list of subtokens
                    subtoken_per_word_token[word] = apply_merge_rule(token_a, token_b, subtokens)
                
                progress_bar.update(len(self.vocab) - progress_bar.n)
                

    def encode(self, text: str) -> list[str]:
        """
        Encode the given text into a list of subword tokens based on the learned merge rules.
        :param text: The text to encode
        :return: A list of subword tokens
        """
        text = normalize(text)
        word_tokens = pre_tokenize(text)
        
        result = []
        for word_token in word_tokens:
            subtokens = list(word_token)
            for merge_rule in self._merge_rules:
                # apply the merge rule to the subtokens using the apply_merge_rule function
                # YOUR CODE HERE START
                # YOUR CODE HERE END
            
            # We extend the result list with the subtokens of the current word token
            # such that we get a flat list of subtokens
            result.extend(subtokens)
            
        return result
        
    
    def decode(self, sub_tokens: list[str]) -> str:
        """
        Decode the given list of subword tokens into a string.
        :param sub_tokens: The list of subword tokens to decode
        :return: A string.
        """
        
        # The subword tokens contain all characters, including whitespace, so joining them restores the text
        return "".join(sub_tokens)
        

In [ ]:
with open("../../data/shakespeare.txt") as f:
    shakespeare = f.read()
    
max_vocab_size = 250 # 250 is fast, but increase it to a larger value for more realistic results
bpe_tokenizer = BPETokenizer(max_vocab_size=max_vocab_size)
bpe_tokenizer.fit(shakespeare)

inputs = """First Citizen:
He's one honest enough: would all the rest were so!"""
tokens = bpe_tokenizer.encode(inputs)
assert isinstance(tokens, list), f"Got `{type(tokens)}` instead of list"
assert all(isinstance(token, str) for token in tokens), f"Got `{[type(token) for token in tokens]}` instead of list[str]"
assert bpe_tokenizer.decode(tokens) == inputs, f"Got `{bpe_tokenizer.decode(tokens)}`"

100%|██████████| 250/250 [00:08<00:00, 31.20it/s]


In [ ]:
print(f"Vocab size: {len(bpe_tokenizer.vocab)}")
for token in sorted(bpe_tokenizer.vocab, key=len):
    print(f"- `{token}`")

Vocab size: 250
- `:`
- `I`
- `i`
- `,`
- `W`
- `F`
- `T`
- `K`
- `G`
- `g`
- `h`
- `r`
- `3`
- `R`
- `z`
- `n`
- `k`
- `u`
- `
`
- `X`
- `$`
- `?`
- ` `
- `.`
- `a`
- `d`
- `f`
- `p`
- `'`
- `o`
- `v`
- `N`
- `L`
- `A`
- `P`
- `S`
- `m`
- `D`
- `l`
- `E`
- `J`
- `M`
- `j`
- `!`
- `e`
- `;`
- `B`
- `s`
- `w`
- `y`
- `x`
- `Z`
- `b`
- `U`
- `&`
- `C`
- `t`
- `c`
- `V`
- `Y`
- `O`
- `Q`
- `q`
- `H`
- `-`
- `or`
- `le`
- `te`
- `oo`
- `nd`
- `it`
- `po`
- `AR`
- `ha`
- `su`
- `do`
- `li`
- `we`
- `ow`
- `ch`
- `'t`
- `qu`
- `A:`
- `ra`
- `fa`
- `er`
- `ne`
- `is`
- `ay`
- `ld`
- `pe`
- `an`
- `to`
- `ta`
- `me`
- `'d`
- `S:`
- `am`
- `ce`
- `ar`
- `so`
- `un`
- `ve`
- `by`
- `sh`
- `gh`
- `il`
- `al`
- `ca`
- `up`
- `UC`
- `ad`
- `ru`
- `ma`
- `ET`
- `bo`
- `ou`
- `s,`
- `th`
- `se`
- `ri`
- `ur`
- `mu`
- `my`
- `ke`
- `e,`
- `lo`
- `EN`
- `ge`
- `st`
- `en`
- `ut`
- `de`
- `et`
- `id`
- `in`
- `ir`
- `mo`
- `on`
- `Th`
- `ER`
- `ck`
- `pr`
- `be`
- `AN`
- `IN`
- `ro`
- `'s`
- `ho`
- `at`