<a href="https://colab.research.google.com/github/zakariajaadi/data-science-portofolio/blob/main/Byte_Pair_Encoding_Algorithm_from_scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Byte Pair Encoding tokenizer from scratch 🔢

**Definition** :

* Byte Pair Encoding is a subword tokenization method that is used by a lot of Transformer models such as GPT, GPT-2, RoBERTa, BART, and DeBERTa.
* It uses a learned vocabulary to break down a text into subword tokens, which can include characters, parts of words, or whole words.

**Why subwords ?**

Using subwords avoids the need for a massive vocabulary that includes every possible word. for example a word "quickest" might not be present in the vocabulary, but can be represented with two subwords tokens `["quick", "est"]`.

**Algorithm**:

1. **Initialization**: Begin with a vocabulary consisting of the unique characters found in your training corpus, along with the special end-of-word token.
2. **Most frequent Pair Identification**: Identify the most frequently occurring adjacent pair of symbols within the words of the corpus.
3. **In-Word Merging**: Merge the identified pair into a new single symbol. Crucially, this merging operation is performed directly within the words of your corpus, replacing all occurrences of the pair with the new merged symbol. Add this new merged symbol to your vocabulary.
4. **Iteration**: Repeat steps 2 and 3 for a predetermined number of iterations or until no more pairs can be merged.

**The key insight** ✨

The key insight is that this greedy approach of repeatedly merging the most frequent pairs will naturally discover common subword units like prefixes, suffixes, and stems that occur frequently in the language. This is why BPE is considered a subword tokenization method.

The beauty of BPE is that it's completely data-driven. It doesn't need any linguistic knowledge about the language; it simply discovers patterns based on frequency statistics.

# 1- BPE implementation from scratch
BPE requires a special token to mark the end of words. In this implementation, I am using `</w>`.

In [None]:
import re
from collections import defaultdict
import json

class BPE:
    def __init__(self, vocab_size):
        """Initializes the BPE model with a target vocabulary size."""
        self.vocab_size = vocab_size
        self.merges = {}  # Maps pairs to their rank for efficient lookup
        self.vocab = {}   # Final vocabulary after training
        self.token_to_id = {}  # Maps tokens to unique IDs
        self.id_to_token = {}  # Maps IDs back to tokens

    def _preprocess(self, text):
        """Preprocesses the text by lowercasing and splitting into words.

        Args:
            text (str): Input text

        Returns:
            list: List of words
        """
        # Split on whitespace and filter out empty strings
        return [word for word in re.findall(r'\S+', text.lower())]

    def _get_initial_vocab(self, text):
        """Generates initial vocabulary from text, with each word split into characters.

        Args:
            text (str): Input text

        Returns:
            dict: Dictionary mapping character-split words to frequencies
        """
        vocab = defaultdict(int)
        words = self._preprocess(text)

        for word in words:
            # Split word into characters and add end-of-word token
            char_word = ' '.join(list(word)) + ' </w>'
            vocab[char_word] += 1

        return dict(vocab)

    def _get_pair_stats(self, vocab):
        """Counts frequencies of adjacent pairs in the vocabulary.

        Args:
            vocab (dict): Current vocabulary

        Returns:
            dict: Dictionary mapping character pairs to their frequencies
        """
        pairs = defaultdict(int)

        for word, freq in vocab.items():
            symbols = word.split()
            # Count pairs in each word
            for i in range(len(symbols) - 1):
                pairs[(symbols[i], symbols[i + 1])] += freq

        return pairs

    def _merge_pair(self, pair, vocab):
        """Merges the specified pair in the vocabulary.

        Args:
            pair (tuple): Pair of tokens to merge
            vocab (dict): Current vocabulary

        Returns:
            dict: Updated vocabulary with merged pairs
        """
        pair_str = ' '.join(pair)
        merged = ''.join(pair)
        new_vocab = {}

        pattern = re.compile(r'(?<!\S)' + re.escape(pair_str) + r'(?!\S)')

        for word, freq in vocab.items():
            # Use regex for more accurate replacements of the specific pair
            new_word = pattern.sub(merged, word)
            new_vocab[new_word] = freq

        return new_vocab

    def train(self, text):
        """Trains the BPE model on the provided text.

        Args:
            text (str): Training text

        Returns:
            self: Trained BPE model
        """
        vocab = self._get_initial_vocab(text)

        print("#----------Training Started-----------#")

        # Perform merges up to vocab_size or until no more pairs can be merged
        for i in range(self.vocab_size):
            pair_stats = self._get_pair_stats(vocab)
            if not pair_stats:
                break

            # Find the most frequent pair
            best_pair = max(pair_stats, key=pair_stats.get)
            best_freq = pair_stats[best_pair]

            # Print progress info
            print(f"Iteration {i}: Merging {best_pair} with frequency {best_freq}")

            # Store merge with its rank (lower rank = higher priority)
            self.merges[best_pair] = i

            # Update vocabulary with the merged pair
            vocab = self._merge_pair(best_pair, vocab)

        self.vocab = vocab

        # Build token mappings after training
        self._build_token_mappings()

        print(f"#----------Training Finished with {len(self.merges)} merges-----------#")

        return self

    def _build_token_mappings(self):
        """Builds token-to-ID and ID-to-token mappings from the trained vocabulary."""
        # Collect all unique tokens from the vocabulary
        tokens = set()
        for word in self.vocab:
            tokens.update(word.split())

        # Add special tokens if needed
        tokens = sorted(tokens)  # Sort for deterministic IDs

        # Map tokens to IDs and vice versa
        self.token_to_id = {token: idx for idx, token in enumerate(tokens)}
        self.id_to_token = {idx: token for idx, token in enumerate(tokens)}

    def encode(self, text):
        """Encodes text using the trained BPE model.

        Args:
            text (str): Text to encode

        Returns:
            list: List of token IDs
        """
        if not self.merges:
            raise ValueError("Model must be trained before encoding")

        words = self._preprocess(text)
        encoded_ids = []

        for word in words:
            # Initialize word as character sequence with end token
            word_tokens = list(word) + ['</w>']

            # Apply merges in order of priority
            while len(word_tokens) > 1:
                pairs = [(word_tokens[i], word_tokens[i+1])
                         for i in range(len(word_tokens)-1)]

                # Find the highest-priority merge (lowest rank)
                best_pair = None
                best_rank = float('inf')
                best_idx = -1

                for i, pair in enumerate(pairs):
                    if pair in self.merges and self.merges[pair] < best_rank:
                        best_pair = pair
                        best_rank = self.merges[pair]
                        best_idx = i

                # If no merge is applicable, we're done
                if best_pair is None:
                    break

                # Apply the merge
                word_tokens = (word_tokens[:best_idx] +
                               [''.join(best_pair)] +
                               word_tokens[best_idx+2:])

            # Convert tokens to IDs, handling unknown tokens
            for token in word_tokens:
                if token in self.token_to_id:
                    encoded_ids.append(self.token_to_id[token])
                else:
                    # Handle unknown tokens
                    print(f"Warning: Unknown token '{token}'")

        return encoded_ids

    def decode(self, encoded_ids):
        """Decodes the encoded IDs back into text.

        Args:
            encoded_ids (list): List of token IDs

        Returns:
            str: Decoded text
        """
        if not self.id_to_token:
            raise ValueError("Model must be trained before decoding")

        decoded_text = []
        current_word = []

        for token_id in encoded_ids:
            if token_id not in self.id_to_token:
                print(f"Warning: Unknown token ID {token_id}")
                continue

            token = self.id_to_token[token_id]

            # If this is the end-of-word token
            if token == '</w>':
                # Join and add the current word
                decoded_text.append(''.join(current_word))
                current_word = []
            # If this token contains the end-of-word marker
            elif '</w>' in token:
                # Split the token and add the word part
                word_part = token.replace('</w>', '')
                current_word.append(word_part)
                # Join and add the current word
                decoded_text.append(''.join(current_word))
                current_word = []
            else:
                # Add token to current word
                current_word.append(token)

        # Handle any remaining tokens in the buffer
        if current_word:
            decoded_text.append(''.join(current_word))

        return ' '.join(decoded_text)



# 2 Training BPE on a small corpus

I'm using a small corpus for demonstration.

`vocab_size` is hyper-param representing the maximum number of unique subwords that BPE will learn and include in its vocabulary

In [None]:
corpus="""The quick brown fox jumps over the lazy dog.
A quicker brown fox leaps over the lazy dog.
The lazy dog barks at the quick brown fox.
The fox, quick and brown, jumps over the lazy dog.
The dog, lazy and sleepy, ignores the quick brown fox.
A quick brown fox and a lazy dog live in harmony.
The quick brown fox is faster than the lazy dog.
The lazy dog is slower than the quick brown fox.
The fox and the dog are friends, despite their differences.
Quick brown foxes are rare, but lazy dogs are common.
The quick brown fox is a symbol of agility.
The lazy dog is a symbol of relaxation.
Quick and brown, the fox is always on the move.
Lazy and sleepy, the dog is always at rest.
The quick brown fox and the lazy dog are opposites.
Yet, they coexist in the same environment.
The fox jumps, the dog barks, and life goes on."""

# Create and train BPE model
bpe_model = BPE(vocab_size=50)
bpe_model.train(corpus)

#----------Training Started-----------#
Iteration 0: Merging ('e', '</w>') with frequency 33
Iteration 1: Merging ('t', 'h') with frequency 29
Iteration 2: Merging ('th', 'e</w>') with frequency 25
Iteration 3: Merging ('.', '</w>') with frequency 17
Iteration 4: Merging ('s', '</w>') with frequency 16
Iteration 5: Merging ('n', '</w>') with frequency 15
Iteration 6: Merging ('f', 'o') with frequency 14
Iteration 7: Merging ('fo', 'x') with frequency 14
Iteration 8: Merging ('d', 'o') with frequency 14
Iteration 9: Merging ('do', 'g') with frequency 14
Iteration 10: Merging ('r', 'o') with frequency 13
Iteration 11: Merging ('l', 'a') with frequency 13
Iteration 12: Merging ('y', '</w>') with frequency 13
Iteration 13: Merging ('q', 'u') with frequency 12
Iteration 14: Merging ('qu', 'i') with frequency 12
Iteration 15: Merging ('qui', 'c') with frequency 12
Iteration 16: Merging ('quic', 'k') with frequency 12
Iteration 17: Merging ('b', 'ro') with frequency 12
Iteration 18: Merging (

<__main__.BPE at 0x7b110454d4d0>

print learned vocab

In [None]:
# @title
print(sorted(list(bpe_model.token_to_id.keys()),key=lambda x: len(x), reverse=True))

['brown</w>', 'quick</w>', 'dog.</w>', 'fox.</w>', 'lazy</w>', 'over</w>', 'and</w>', 'are</w>', 'dog</w>', 'fox</w>', 'the</w>', 'er</w>', 'es</w>', 'is</w>', 's,</w>', ',</w>', '.</w>', 'a</w>', 'e</w>', 'n</w>', 'quick', 's</w>', 't</w>', 'y</w>', '</w>', 'brow', 'jump', 'dog', 'fox', 'ar', 'en', 'er', 'es', 'la', 'le', 'li', 'nd', 'on', 'ov', 'ro', 'th', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'k', 'l', 'm', 'n', 'o', 'p', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y']


# 3- Test the trained BPE

In [None]:
# Test encoding and decoding
test_text = "the quickest brownest fox"
encoded = bpe_model.encode(test_text)
decoded = bpe_model.decode(encoded)

print("\nInput text:", test_text)
print(f"\nTokenized text: {[bpe_model.id_to_token[token_id] for token_id in encoded]}")
print("\nToken IDs:", encoded)
print("\nDecoded:", decoded)


Input text: the quickest brownest fox

Tokenized text: ['the</w>', 'quick', 'es', 't</w>', 'brow', 'n', 'es', 't</w>', 'fox</w>']

Token IDs: [57, 47, 21, 55, 9, 39, 21, 55, 26]

Decoded: the quickest brownest fox


# Conclusion

In [None]:
print("Input text legth: ",len(list(test_text)))
print("Tokenized text length: ",len(list(encoded)))

Input text legth:  25
Tokenized text length:  9


The 25-character sentence `"the quickest brownest fox"` was encoded into just 9 token IDs using BPE. If we didn't use BPE, we would have needed a token per character, resulting in 25 tokens, which demonstrates a reduction of about two-thirds in input length