# ✅ Byte Pair Encoding (BPE) Tokenizer 

**Byte Pair Encoding (BPE)** is a subword tokenization algorithm that repeatedly merges the most frequent pair of adjacent symbols into a new subword unit. It sits between word-level and character-level tokenization, which keeps the vocabulary size controllable while still generalizing to unseen words. It is used by **language models** such as GPT, RoBERTa, and CLIP.

---

## 🔍 Why Use BPE?

* **Mitigates the out-of-vocabulary (OOV) problem** by breaking rare words into known subword units.
* **Reduces vocabulary size** compared to word-level tokenizers.
* **Captures frequent subwords** such as common prefixes and suffixes, which helps with morphologically rich languages.
* **Balances character-level and word-level modeling**.

---

## 🧠 Intuition Behind BPE

Imagine your corpus contains the following text:

```
"low lower lowest"
```

1. Start with a vocabulary of characters: `l`, `o`, `w`, `e`, `r`, `s`, `t`.
2. Represent each word as a sequence of characters ending with a special end-of-word symbol `</w>`:

   ```
   l o w </w>
   l o w e r </w>
   l o w e s t </w>
   ```
3. Count the frequency of each adjacent symbol pair.
4. Merge the **most frequent pair** into a new symbol (e.g., `l o` → `lo`).
5. Repeat the merge process a fixed number of times (e.g., 10,000 merges).
6. The final vocabulary contains original characters + frequent subword units like `low`, `er`, `est`, etc.
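
Step 3 can be sketched in a few lines. This is a minimal illustration on the toy corpus above, assuming every word occurs once; the helper name `count_pairs` is made up for this example.

```python
from collections import Counter

# Each word is a tuple of symbols ending in the end-of-word marker.
# Assumption: every word occurs exactly once in the toy corpus.
words = {
    ("l", "o", "w", "</w>"): 1,
    ("l", "o", "w", "e", "r", "</w>"): 1,
    ("l", "o", "w", "e", "s", "t", "</w>"): 1,
}

def count_pairs(words):
    """Count adjacent symbol pairs, weighted by word frequency."""
    pairs = Counter()
    for symbols, freq in words.items():
        for left, right in zip(symbols, symbols[1:]):
            pairs[(left, right)] += freq
    return pairs

pairs = count_pairs(words)
print(pairs[("l", "o")], pairs[("o", "w")], pairs[("w", "e")])  # prints: 3 3 2
```

Here `l o` and `o w` are tied at three occurrences, so either could be merged first; implementations differ in how they break such ties.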

---

## ⚙️ Step-by-Step Algorithm

### 🛠️ 1. Prepare Initial Vocabulary

* Split all words in your corpus into individual characters.
* Add a special end-of-word marker like `</w>`.

Example:

```
"lower" → ["l", "o", "w", "e", "r", "</w>"]
```
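
A minimal sketch of this preparation step, assuming whitespace-separated words and the `</w>` marker used above (`to_symbols` is a hypothetical helper name):

```python
from collections import Counter

def to_symbols(word, end_marker="</w>"):
    """Split a word into characters and append the end-of-word marker."""
    return tuple(word) + (end_marker,)

corpus = "low lower lowest"
word_freqs = Counter(corpus.split())

# Map each symbol sequence to the frequency of its word.
splits = {to_symbols(word): freq for word, freq in word_freqs.items()}

# The base vocabulary is the set of all symbols seen so far.
base_vocab = sorted({symbol for symbols in splits for symbol in symbols})

print(to_symbols("lower"))  # ('l', 'o', 'w', 'e', 'r', '</w>')
print(base_vocab)
```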

### 🔁 2. Iteratively Merge Frequent Pairs

* Count frequencies of adjacent pairs (e.g., `('l','o')`, `('o','w')`).
* Merge the most frequent pair into a new token.
* Update all words accordingly.
* Repeat for `N` merge operations.
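
The loop above can be sketched as follows. This is an illustrative, unoptimized version that recounts all pairs after every merge; the names `merge_pair` and `learn_merges` are assumptions for this example, and ties are broken in favor of the pair seen first.

```python
from collections import Counter

def merge_pair(symbols, pair):
    """Replace every adjacent occurrence of `pair` with the merged symbol."""
    merged, i = [], 0
    while i < len(symbols):
        if i + 1 < len(symbols) and (symbols[i], symbols[i + 1]) == pair:
            merged.append(symbols[i] + symbols[i + 1])
            i += 2
        else:
            merged.append(symbols[i])
            i += 1
    return tuple(merged)

def learn_merges(word_freqs, num_merges):
    """Learn up to `num_merges` merge rules from a word-frequency mapping."""
    words = {tuple(w) + ("</w>",): f for w, f in word_freqs.items()}
    merges = []
    for _ in range(num_merges):
        pairs = Counter()
        for symbols, freq in words.items():
            for pair in zip(symbols, symbols[1:]):
                pairs[pair] += freq
        if not pairs:
            break
        best = max(pairs, key=pairs.get)  # ties: the first pair seen wins
        merges.append(best)
        words = {merge_pair(s, best): f for s, f in words.items()}
    return merges

merges = learn_merges({"low": 1, "lower": 1, "lowest": 1}, num_merges=3)
print(merges)  # [('l', 'o'), ('lo', 'w'), ('low', 'e')]
```

Production implementations update pair counts incrementally instead of recounting, but the learned merges follow the same logic.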

### 🧾 3. Final Vocabulary

* Includes all characters + frequent merged tokens.
* Each merge adds one token, so the final size is about the number of base characters plus `N`.
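
One way to turn the result into an ID mapping is sketched below. The ordering (special tokens, then base symbols, then merged tokens) is an assumption made for this example; the key point is that every token gets a unique ID.

```python
# Illustrative inputs; the merges match the toy corpus "low lower lowest".
special_tokens = ["[PAD]", "[CLS]", "[SEP]", "[UNK]", "[MASK]"]
base_symbols = ["</w>", "e", "l", "o", "r", "s", "t", "w"]
merges = [("l", "o"), ("lo", "w"), ("low", "e")]

# Concatenate the three groups and number them consecutively.
tokens = special_tokens + base_symbols + [left + right for left, right in merges]
vocab = {token: idx for idx, token in enumerate(tokens)}
ids_to_tokens = {idx: token for token, idx in vocab.items()}

# A consistent vocabulary maps tokens and IDs one-to-one.
assert len(vocab) == len(ids_to_tokens) == len(tokens)
print(vocab["[PAD]"], vocab["low"])  # prints: 0 14
```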

---

## 🧪 Tokenization Process (After Training)

Given a new word like `"lowered"`:

1. Split the word into characters.
2. Apply the learned merges in the order they were learned:

   ```
   "lowered" → ["low", "er", "ed"]
   ```

If no merge for `e d` was learned, those characters stay as single-character tokens:

```
→ "low", "er", "e", "d"  (fallback to smaller subunits)
```
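
Encoding a new word amounts to replaying the learned merges in order. The sketch below assumes a small, made-up merge list and uses the `</w>` marker from the earlier sections; `bpe_encode` is a hypothetical helper name.

```python
def merge_pair(symbols, pair):
    """Replace every adjacent occurrence of `pair` with the merged symbol."""
    merged, i = [], 0
    while i < len(symbols):
        if i + 1 < len(symbols) and (symbols[i], symbols[i + 1]) == pair:
            merged.append(symbols[i] + symbols[i + 1])
            i += 2
        else:
            merged.append(symbols[i])
            i += 1
    return merged

def bpe_encode(word, merges):
    """Split a word into characters, then apply merges in learned order."""
    symbols = list(word) + ["</w>"]
    for pair in merges:
        symbols = merge_pair(symbols, pair)
    return symbols

# Hypothetical merge list, as if learned from a larger corpus.
merges = [("l", "o"), ("lo", "w"), ("e", "r")]
print(bpe_encode("lowered", merges))  # ['low', 'er', 'e', 'd', '</w>']
```

Because no merge involving `d` exists in this list, the tail of the word stays as single characters.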

---

## 📄 Output Files (Used in Transformers and HuggingFace)

When exporting a BPE tokenizer for reuse, we usually generate:

| File                      | Description                                                    |
| ------------------------- | -------------------------------------------------------------- |
| `tokenizer.json`          | Vocabulary, merge rules, and pre-/post-processing settings     |
| `tokenizer_config.json`   | Tokenizer class and settings such as casing and maximum length |
| `special_tokens_map.json` | Maps special tokens like `[PAD]`, `[CLS]`, `[SEP]`             |
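
When writing such a file by hand, note that JSON object keys must be strings, so merge pairs cannot be stored as tuple keys. The sketch below stores them as an ordered list instead. The layout is a simplified assumption for illustration, not the exact schema that Hugging Face writes.

```python
import json
import os
import tempfile

vocab = {"l": 0, "o": 1, "w": 2, "lo": 3, "low": 4}
merges = [("l", "o"), ("lo", "w")]  # order matters: it is the merge priority

# Store merges as a list of two-element lists to preserve order and types.
payload = {"vocab": vocab, "merges": [list(pair) for pair in merges]}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "tokenizer.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)
    with open(path, encoding="utf-8") as f:
        loaded = json.load(f)

# JSON has no tuple type, so convert the pairs back after loading.
restored = [tuple(pair) for pair in loaded["merges"]]
assert restored == merges
```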

---

## 📘 Pros & Cons of BPE

### ✅ Pros:

* Handles unknown words gracefully (subword fallback).
* Efficient vocab usage.
* Simple to implement and fast.

### ❌ Cons:

* Greedy merging may miss semantically meaningful splits.
* Merge decisions are purely frequency-based.
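* Usually relies on whitespace-based pre-tokenization, which suits languages without explicit word boundaries less well than SentencePiece-style tokenizers that operate on raw text.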

---

## 🧠 BPE vs WordPiece vs Unigram

| Feature        | BPE                              | WordPiece                                 | Unigram                                           |
| -------------- | -------------------------------- | ----------------------------------------- | ------------------------------------------------- |
| Merge Strategy | Greedy merge of most frequent pair | Greedy merge chosen by a likelihood-based score | No merges; prunes a large seed vocabulary by likelihood |
| Token Growth   | One token per merge              | One token per merge                       | Shrinks toward a target size                      |
| Model Examples | GPT-2, RoBERTa                   | BERT, DistilBERT, ELECTRA                 | ALBERT, XLNet, T5, mT5                            |
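
The difference in merge criteria can be illustrated with made-up counts. A commonly cited description of WordPiece scores a pair by its frequency divided by the product of its parts' frequencies; treat that formula as an approximation of the idea, not as a specification of any particular implementation.

```python
# Hypothetical counts: "u n" is frequent, but "q" is almost always followed by "u".
pair_freq = {("u", "n"): 30, ("q", "u"): 8}
symbol_freq = {"u": 60, "n": 50, "q": 8}

def bpe_score(pair):
    """BPE picks the pair with the highest raw frequency."""
    return pair_freq[pair]

def wordpiece_score(pair):
    """WordPiece-style score: pair frequency normalized by its parts."""
    left, right = pair
    return pair_freq[pair] / (symbol_freq[left] * symbol_freq[right])

bpe_choice = max(pair_freq, key=bpe_score)
wordpiece_choice = max(pair_freq, key=wordpiece_score)

print(bpe_choice)        # ('u', 'n')
print(wordpiece_choice)  # ('q', 'u')
```

The normalized score favors pairs whose parts rarely occur apart, even when the pair itself is less frequent.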

---

## 📦 In Practice

Popular models using BPE:

* **GPT-2/GPT-3/GPT-4** (OpenAI): Use byte-level BPE, where the base vocabulary is the set of possible byte values.
* **RoBERTa**: Uses a byte-level BPE tokenizer.
* **CLIP** (OpenAI): Also uses byte-level BPE.
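
Byte-level BPE starts from UTF-8 bytes instead of characters, so any input string can be represented without an unknown token. The sketch below only shows that starting point; real byte-level tokenizers add further steps, such as mapping bytes to printable characters, that are omitted here.

```python
text = "naïve café 🙂"

# Every string becomes a sequence of integers in the range 0..255.
byte_ids = list(text.encode("utf-8"))
assert all(0 <= b < 256 for b in byte_ids)

# The mapping is lossless: decoding the bytes restores the original text.
decoded = bytes(byte_ids).decode("utf-8")
assert decoded == text

# Non-ASCII characters take more than one byte each.
print(len(text), "characters ->", len(byte_ids), "bytes")
```

Merges are then learned over these byte symbols in the same way as over characters.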


In [None]:
import os
import json
from collections import defaultdict, Counter
import torch
import regex as re  # third-party "regex" package; needed for \p{L} and \p{N}

class BPETokenizer:
    def __init__(self, vocab_size=100, special_tokens=None):
        self.vocab_size = vocab_size
        self.vocab = {}
        self.ids_to_tokens = {}
        self.merge_rules = {}  # Maps pairs to their merged token
        self.word_frequencies = Counter()
        self.special_tokens = special_tokens or {
            "[PAD]": 0,
            "[CLS]": 1,
            "[SEP]": 2,
            "[UNK]": 3,
            "[MASK]": 4
        }
        self.vocab.update(self.special_tokens)
        self.ids_to_tokens = {v: k for k, v in self.vocab.items()}
        self.pattern = re.compile(r"""'s|'t|'re|'ve|'m|'ll|'d| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+""")
        
    def train(self, corpus, min_frequency=2):
        # Pre-tokenize the corpus and count how often each word occurs
        words = []
        for sentence in corpus:
            words.extend(self._pre_tokenize(sentence))
        self.word_frequencies = Counter(words)

        # Base vocabulary: every character seen in the corpus.
        # Special tokens keep their reserved IDs; characters take the next
        # free IDs so that no two tokens share an ID.
        chars = sorted({char for word in self.word_frequencies for char in word})
        self.vocab = dict(self.special_tokens)
        next_id = max(self.vocab.values(), default=-1) + 1
        for char in chars:
            if char not in self.vocab:
                self.vocab[char] = next_id
                next_id += 1
        self.ids_to_tokens = {v: k for k, v in self.vocab.items()}
        self.merge_rules = {}

        # Represent each word as a list of symbols (initially single characters)
        splits = {word: list(word) for word in self.word_frequencies}

        # BPE algorithm: merge the most frequent pair until the target size is reached
        while len(self.vocab) < self.vocab_size:
            # Count adjacent symbol pairs, weighted by word frequency
            pairs = defaultdict(int)
            for word, freq in self.word_frequencies.items():
                symbols = splits[word]
                for i in range(len(symbols) - 1):
                    pairs[(symbols[i], symbols[i + 1])] += freq
            if not pairs:
                break

            # Get most frequent pair
            best_pair = max(pairs, key=pairs.get)
            if pairs[best_pair] < min_frequency:
                break

            # Record the merge and add the new token to the vocabulary
            new_token = "".join(best_pair)
            self.merge_rules[best_pair] = new_token
            if new_token not in self.vocab:
                self.vocab[new_token] = next_id
                self.ids_to_tokens[next_id] = new_token
                next_id += 1

            # Apply the merge to every word before recounting pairs
            for word in splits:
                splits[word] = self._merge_pair_in_word(splits[word], best_pair, new_token)

    def _pre_tokenize(self, text):
        # Split text with the GPT-2 style regex (words keep their leading space)
        return self.pattern.findall(text)

    def _merge_pair_in_word(self, symbols, pair, new_token):
        # Replace every adjacent occurrence of `pair` in a list of symbols
        merged = []
        i = 0
        while i < len(symbols):
            if i < len(symbols) - 1 and symbols[i] == pair[0] and symbols[i + 1] == pair[1]:
                merged.append(new_token)
                i += 2
            else:
                merged.append(symbols[i])
                i += 1
        return merged

    def tokenize(self, text):
        bpe_tokens = []
        for token in self._pre_tokenize(text):
            # Start from single characters, then replay merges in learned order
            symbols = list(token)
            for pair, merged in self.merge_rules.items():
                if len(symbols) < 2:
                    break
                symbols = self._merge_pair_in_word(symbols, pair, merged)
            bpe_tokens.extend(symbols)
        return bpe_tokens
    
    def convert_tokens_to_ids(self, tokens):
        return [self.vocab.get(token, self.vocab["[UNK]"]) for token in tokens]
    
    def convert_ids_to_tokens(self, ids):
        return [self.ids_to_tokens.get(i, "[UNK]") for i in ids]
    
    def __call__(self, text, padding=False, truncation=False, return_tensors=None, max_length=20):
        tokens = self.tokenize(text)
        if truncation:
            # Reserve two positions so [CLS] and [SEP] survive truncation
            tokens = tokens[:max(max_length - 2, 0)]
        tokens = ["[CLS]"] + tokens + ["[SEP]"]
        input_ids = self.convert_tokens_to_ids(tokens)
        token_type_ids = [0] * len(input_ids)
        attention_mask = [1] * len(input_ids)

        # Pad up to max_length; padded positions get attention_mask 0
        if padding and len(input_ids) < max_length:
            pad_len = max_length - len(input_ids)
            input_ids += [self.vocab["[PAD]"]] * pad_len
            attention_mask += [0] * pad_len
            token_type_ids += [0] * pad_len

        encoded = {
            "input_ids": input_ids,
            "token_type_ids": token_type_ids,
            "attention_mask": attention_mask
        }

        if return_tensors == "pt":
            encoded = {k: torch.tensor(v).unsqueeze(0) for k, v in encoded.items()}

        return encoded
    
    def save(self, path):
        os.makedirs(path, exist_ok=True)
        with open(os.path.join(path, "tokenizer.json"), "w", encoding="utf-8") as f:
            json.dump({
                "vocab": self.vocab,
                # JSON keys must be strings, so merges are stored as an
                # ordered list of [left, right, merged] entries
                "merge_rules": [[a, b, merged] for (a, b), merged in self.merge_rules.items()],
                "word_frequencies": self.word_frequencies
            }, f, indent=2)

        with open(os.path.join(path, "tokenizer_config.json"), "w", encoding="utf-8") as f:
            json.dump({
                "tokenizer_class": "BPETokenizer",
                "vocab_size": self.vocab_size,
                "model_max_length": 512
            }, f, indent=2)

        with open(os.path.join(path, "special_tokens_map.json"), "w", encoding="utf-8") as f:
            json.dump(self.special_tokens, f, indent=2)
    
    def load(self, path):
        with open(os.path.join(path, "tokenizer.json"), encoding="utf-8") as f:
            data = json.load(f)
            self.vocab = {k: int(v) for k, v in data["vocab"].items()}
            # Rebuild the pair -> merged-token mapping in its saved order
            self.merge_rules = {(a, b): merged for a, b, merged in data["merge_rules"]}
            self.word_frequencies = Counter(data["word_frequencies"])
            self.ids_to_tokens = {int(v): k for k, v in self.vocab.items()}

        with open(os.path.join(path, "tokenizer_config.json")) as f:
            config = json.load(f)
            self.vocab_size = config.get("vocab_size", len(self.vocab))

        with open(os.path.join(path, "special_tokens_map.json")) as f:
            self.special_tokens = json.load(f)
            # Update vocab with any new special tokens
            for token, idx in self.special_tokens.items():
                if token not in self.vocab:
                    self.vocab[token] = idx
                    self.ids_to_tokens[idx] = token

In [None]:
# Training the tokenizer
corpus = [
    "Artificial Intelligence (AI) refers to the technology that allows machines and computers to replicate human intelligence. Enables systems to perform tasks that require human-like decision-making, such as learning from data, identifying patterns, making informed choices and solving complex problems. Improves continuously by utilizing methods like machine learning and deep learning. Used in healthcare for diagnosing diseases, finance for fraud detection, e-commerce for personalized recommendations and transportation for self-driving cars. It also powers virtual assistants like Siri and Alexa, chatbots for customer support and manufacturing robots that automate production processes.",
    "Machine Learning is a subset of artificial intelligence (AI) that focuses on building systems that can learn from and make decisions based on data. Instead of being explicitly programmed to perform a task, a machine learning model uses algorithms to identify patterns within data and improve its performance over time without human intervention.",
    "Generative AI refers to a type of artificial intelligence designed to create new content, whether it's text, images, music, or even video. Unlike traditional AI, which typically focuses on analyzing and classifying data, generative AI goes a step further by using patterns it has learned from large datasets to generate new, original outputs. Essentially, it creates rather than just recognizes."
]

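# vocab_size must exceed the number of base characters plus special tokens,
# otherwise hardly any merges are learned
tokenizer = BPETokenizer(vocab_size=300)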
tokenizer.train(corpus)

# Save the tokenizer
tokenizer.save("my_bpe_tokenizer")

# Load the tokenizer
new_tokenizer = BPETokenizer()
new_tokenizer.load("my_bpe_tokenizer")

# Tokenize a new sentence
text = "This is a test sentence."
tokens = new_tokenizer.tokenize(text)
print("Tokens:", tokens)

ids = new_tokenizer.convert_tokens_to_ids(tokens)
print("Token IDs:", ids)

# Full encoding
encoded = new_tokenizer(text, padding=True, truncation=True, max_length=32)
print("Encoded:", encoded)
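
The `padding` and `max_length` arguments in the call above control the final sequence length. The standalone sketch below shows the idea on a plain list of IDs; `pad_or_truncate` and the example IDs are made up for illustration and are not part of the class.

```python
def pad_or_truncate(input_ids, max_length, pad_id=0):
    """Cut a sequence to max_length, then pad it and build the attention mask."""
    ids = input_ids[:max_length]
    mask = [1] * len(ids)          # 1 marks real tokens
    pad_len = max_length - len(ids)
    return ids + [pad_id] * pad_len, mask + [0] * pad_len  # 0 marks padding

ids, mask = pad_or_truncate([1, 17, 23, 2], max_length=6)
print(ids)   # [1, 17, 23, 2, 0, 0]
print(mask)  # [1, 1, 1, 1, 0, 0]
```

The attention mask lets a model ignore padded positions, which is why it is returned alongside `input_ids`.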