<table style="width:100%">
<tr>
<td style="vertical-align:middle; text-align:left;">
<font size="2">
以下代码为 <a href="http://mng.bz/orYv">《从零开始构建大型语言模型》</a> 一书的补充代码，作者为 <a href="https://sebastianraschka.com">Sebastian Raschka</a><br>
<br>中文翻译和代码详细注释由Lux整理，Github下载地址：<a href="https://github.com/luxianyu">https://github.com/luxianyu</a>
    
<br>Lux的Github上还有吴恩达深度学习Pytorch版学习笔记及中文详细注释的代码下载
    
</font>
</td>
<td style="vertical-align:middle; text-align:left;">
<a href="http://mng.bz/orYv"><img src="https://sebastianraschka.com/images/LLMs-from-scratch-images/cover-small.webp" width="100px"></a>
</td>
</tr>
</table>


# 从零实现字节对编码（BPE）分词器


- 这是一个独立的笔记本，从零实现流行的字节对编码（Byte Pair Encoding, BPE）分词算法，用于教育目的。这种算法被用于 GPT-2 到 GPT-4、Llama 3 等模型中。
- 关于分词的目的的更多细节，请参考 [第 2 章](https://github.com/rasbt/LLMs-from-scratch/blob/main/ch02/01_main-chapter-code/ch02.ipynb)；这里的代码是附加内容，用于解释 BPE 算法。
- OpenAI 为训练原始 GPT 模型实现的 BPE 分词器可以在这里找到 [encoder.py](https://github.com/openai/gpt-2/blob/master/src/encoder.py)。
- BPE 算法最初发表于 1994 年，论文为 Philip Gage 的 "[A New Algorithm for Data Compression](http://www.pennelynn.com/Documents/CUJ/HTML/94HTML/19940045.HTM)"。
- 如今，大多数项目（包括 Llama 3）都使用 OpenAI 的开源 [tiktoken 库](https://github.com/openai/tiktoken)，因为其计算性能优异；它允许加载预训练的 GPT-2 和 GPT-4 分词器（例如，Llama 3 模型也使用 GPT-4 分词器训练）。
- 上述实现与我在本笔记本中的实现的区别在于，我的实现还包含一个用于训练分词器的函数（用于教育目的）。
- 还有一个名为 [minBPE](https://github.com/karpathy/minbpe) 的实现，支持训练，可能性能更好（我这里的实现主要用于教育目的）；与 `minBPE` 相比，我的实现额外支持加载原始 OpenAI 分词器词表和 BPE “merges”（此外，Hugging Face 的分词器也支持训练和加载各种分词器；请参见 [此 GitHub 讨论](https://github.com/rasbt/LLMs-from-scratch/discussions/485)，其中一位读者在尼泊尔语上训练了 BPE 分词器以获取更多信息）。


&nbsp;
# 1. 字节对编码（BPE）的主要思想


- BPE 的主要思想是将文本转换为整数表示（token ID），用于大型语言模型（LLM）训练（参见 [第 2 章](https://github.com/rasbt/LLMs-from-scratch/blob/main/ch02/01_main-chapter-code/ch02.ipynb)）。

<img src="https://sebastianraschka.com/images/LLMs-from-scratch-images/bonus/bpe-from-scratch/bpe-overview.webp" width="600px">


&nbsp;
## 1.1 位与字节


- 在介绍 BPE 算法之前，先引入字节（byte）的概念。
- 考虑将文本转换为字节数组（毕竟 BPE 的全称是 "byte pair encoding"）：


In [1]:
text = "This is some text"
byte_ary = bytearray(text, "utf-8")
print(byte_ary)

bytearray(b'This is some text')


- 当我们对 `bytearray` 对象调用 `list()` 时，每个字节会被视为一个独立的元素，结果是一个整数列表，对应于每个字节的值：


In [2]:
ids = list(byte_ary)
print(ids)

[84, 104, 105, 115, 32, 105, 115, 32, 115, 111, 109, 101, 32, 116, 101, 120, 116]


- 这是一种将文本转换为 token ID 表示的有效方法，可以用于 LLM 的嵌入层。
- 然而，这种方法的缺点是每个字符都会生成一个 ID（对于短文本来说 ID 数量很多！）。
- 也就是说，对于一个 17 个字符的输入文本，我们需要使用 17 个 token ID 作为 LLM 的输入：


In [3]:
print("Number of characters:", len(text))
print("Number of token IDs:", len(ids))

Number of characters: 17
Number of token IDs: 17


- 如果你以前使用过 LLM，你可能知道 BPE 分词器有一个词汇表，其中为整个单词或子词分配 token ID，而不是为每个字符分配。
- 例如，GPT-2 分词器将相同的文本（"This is some text"）分词为 4 个 token，而不是 17 个：`1212, 318, 617, 2420`。
- 你可以使用交互式 [tiktoken 应用](https://tiktokenizer.vercel.app/?model=gpt2) 或 [tiktoken 库](https://github.com/openai/tiktoken) 来验证：

<img src="https://sebastianraschka.com/images/LLMs-from-scratch-images/bonus/bpe-from-scratch/tiktokenizer.webp" width="600px">

```python
import tiktoken

gpt2_tokenizer = tiktoken.get_encoding("gpt2")
gpt2_tokenizer.encode("This is some text")
# 输出 [1212, 318, 617, 2420]

```

- 由于一个字节由 8 位组成，因此单个字节可以表示 2<sup>8</sup> = 256 个可能的值，范围从 0 到 255。
- 你可以通过执行代码 `bytearray(range(0, 257))` 来验证，会提示 `ValueError: byte must be in range(0, 256)`。
- BPE 分词器通常将这 256 个值作为其前 256 个单字符 token；你可以通过运行以下代码来直观地查看：


```python
import tiktoken
gpt2_tokenizer = tiktoken.get_encoding("gpt2")

for i in range(300):
    decoded = gpt2_tokenizer.decode([i])
    print(f"{i}: {decoded}")
"""
prints:
0: !
1: "
2: #
...
255: �  # <---- single character tokens up to here
256:  t
257:  a
...
298: ent
299:  n
"""
```

- 上述代码中，请注意索引 256 和 257 并不是单字符值，而是双字符值（一个空格 + 一个字母），这是原始 GPT-2 BPE 分词器的一个小缺陷（在 GPT-4 分词器中已得到改进）。


&nbsp;
## 1.2 构建词汇表


- BPE 分词算法的目标是构建一个常见子词的词汇表，例如 `298: ent`（可以在 *entangle, entertain, enter, entrance, entity, ...* 中找到），甚至包括完整单词，如 


```
318: is
617: some
1212: This
2420: text
```

- BPE 算法最初由 Philip Gage 于 1994 年提出，发表于论文 “[A New Algorithm for Data Compression](http://www.pennelynn.com/Documents/CUJ/HTML/94HTML/19940045.HTM)”  
- 在进入具体代码实现之前，我们先总结一下当今大型语言模型（LLM）分词器所采用的 BPE 形式，如下所述。


## 1.3 BPE 算法概要

**1. 识别最频繁的符号对（Identify frequent pairs）**  
- 在每次迭代中，扫描文本以找到最常出现的字节对（或字符对）。

**2. 替换并记录（Replace and record）**  
- 将该符号对替换为一个尚未使用的新占位符 ID（例如，如果我们最初使用 0...255，则第一个占位符为 256）。  
- 将此映射关系记录在查找表（lookup table）中。  
- 查找表的大小是一个超参数，也被称为“词汇表大小”（vocabulary size）。  
  （例如，GPT-2 的词汇表大小为 50,257。）

**3. 重复直到无收益（Repeat until no gains）**  
- 持续重复步骤 1 和 2，每次合并出现频率最高的符号对。  
- 当没有任何符号对出现超过一次时（即无法进一步压缩）停止。

**解码（Decompression / Decoding）**  
- 要还原原始文本，只需使用查找表反向替换每个 ID 对应的符号对，按生成顺序逆序替换即可。


&nbsp;
## 1.4 BPE 算法示例

### 1.4.1 编码阶段的具体示例（对应 1.3 节中的步骤 1 & 2）

假设我们有以下文本（训练数据集）： `the cat in the hat` 

我们希望从中构建一个 BPE 分词器的词汇表。


**迭代 1**

1. **识别最频繁的符号对**  
   - 在该文本中，符号对 `"th"` 出现了两次（分别出现在开头和第二个 “e” 之前）。

2. **替换并记录**  
   - 将 `"th"` 替换为一个尚未使用的新 token ID，例如 `256`。  
   - 替换后的新文本为：
     ```
     <256>e cat in <256>e hat
     ```
   - 新增词汇如下：
     ```
     0: ...
     ...
     256: "th"
     ```

**迭代 2**

1. **识别最频繁的符号对**  
   - 在文本 `<256>e cat in <256>e hat` 中，符号对 `<256>e` 出现了两次。

2. **替换并记录**  
   - 将 `<256>e` 替换为新的 token ID，例如 `257`。  
   - 替换后的文本为：
     ```
     <257> cat in <257> hat
     ```
   - 更新后的词汇表为：
     ```
     0: ...
     ...
     256: "th"
     257: "<256>e"
     ```


**迭代 3**

1. **识别最频繁的符号对**  
   - 在文本 `<257> cat in <257> hat` 中，符号对 `<257> ` 出现了两次（分别在开头和 “hat” 前）。

2. **替换并记录**  
   - 将 `<257> ` 替换为新的 token ID，例如 `258`。  
   - 新文本为：
     ```
     <258>cat in <258>hat
     ```
   - 更新后的词汇表为：
     ```
     0: ...
     ...
     256: "th"
     257: "<256>e"
     258: "<257> "
     ```


- 以此类推，不断重复上述步骤，直至没有可合并的频繁符号对。


### 1.4.2 解码阶段的具体示例（对应 1.3 节中的步骤 3）

要恢复原始文本，我们按合并的**逆序**执行替换操作。

 从最终压缩的文本开始: `<258>cat in <258>hat`

 依次反向替换：
- `<258>` → `<257> `  
  ```
  <257> cat in <257> hat
  ```
- `<257>` → `<256>e`  
  ```
  <256>e cat in <256>e hat
  ```
- `<256>` → "th"  
  ```
  the cat in the hat
  ```

最终还原出原始文本： `the cat in the hat`

&nbsp;
## 2. 一个简单的 BPE 实现



- 下面是一个用 Python 类实现的 BPE 算法示例，其接口风格模仿了 `tiktoken` 库的 Python 使用方式。  
- 需要注意的是，上文描述的编码部分主要对应的是通过 `train()` 进行的训练步骤；而这里的 `encode()` 方法工作原理类似（虽然实现上更复杂一些，因为它需要处理特殊标记（special tokens））。  

实现逻辑如下：

1. 将输入文本拆分成单独的字节（bytes）；
2. 不断查找并替换（合并）相邻的 token（成对出现的字节），当它们与已学习到的 BPE 合并规则匹配时（按照“rank”等级，从高到低，即按学习顺序）；
3. 持续合并，直到再也找不到可合并的 token 对；
4. 最终得到的 token ID 列表即为编码（encoded）输出。


In [4]:
from collections import Counter, deque
from functools import lru_cache
import re
import json


class BPETokenizerSimple:
    def __init__(self):
        # Maps token_id to token_str (e.g., {11246: "some"})
        self.vocab = {}
        # Maps token_str to token_id (e.g., {"some": 11246})
        self.inverse_vocab = {}
        # Dictionary of BPE merges: {(token_id1, token_id2): merged_token_id}
        self.bpe_merges = {}

        # For the official OpenAI GPT-2 merges, use a rank dict:
        #  of form {(string_A, string_B): rank}, where lower rank = higher priority
        self.bpe_ranks = {}

    def train(self, text, vocab_size, allowed_special={"<|endoftext|>"}):
        """
        Train the BPE tokenizer from scratch.

        Args:
            text (str): The training text.
            vocab_size (int): The desired vocabulary size.
            allowed_special (set): A set of special tokens to include.
        """

        # Preprocess: Replace spaces with "Ġ"
        # Note that Ġ is a particularity of the GPT-2 BPE implementation
        # E.g., "Hello world" might be tokenized as ["Hello", "Ġworld"]
        # (GPT-4 BPE would tokenize it as ["Hello", " world"])
        processed_text = []
        for i, char in enumerate(text):
            if char == " " and i != 0:
                processed_text.append("Ġ")
            if char != " ":
                processed_text.append(char)
        processed_text = "".join(processed_text)

        # Initialize vocab with unique characters, including "Ġ" if present
        # Start with the first 256 ASCII characters
        unique_chars = [chr(i) for i in range(256)]
        unique_chars.extend(
            char for char in sorted(set(processed_text))
            if char not in unique_chars
        )
        if "Ġ" not in unique_chars:
            unique_chars.append("Ġ")

        self.vocab = {i: char for i, char in enumerate(unique_chars)}
        self.inverse_vocab = {char: i for i, char in self.vocab.items()}

        # Add allowed special tokens
        if allowed_special:
            for token in allowed_special:
                if token not in self.inverse_vocab:
                    new_id = len(self.vocab)
                    self.vocab[new_id] = token
                    self.inverse_vocab[token] = new_id

        # Tokenize the processed_text into token IDs
        token_ids = [self.inverse_vocab[char] for char in processed_text]

        # BPE steps 1-3: Repeatedly find and replace frequent pairs
        for new_id in range(len(self.vocab), vocab_size):
            pair_id = self.find_freq_pair(token_ids, mode="most")
            if pair_id is None:
                break
            token_ids = self.replace_pair(token_ids, pair_id, new_id)
            self.bpe_merges[pair_id] = new_id

        # Build the vocabulary with merged tokens
        for (p0, p1), new_id in self.bpe_merges.items():
            merged_token = self.vocab[p0] + self.vocab[p1]
            self.vocab[new_id] = merged_token
            self.inverse_vocab[merged_token] = new_id

    def load_vocab_and_merges_from_openai(self, vocab_path, bpe_merges_path):
        """
        Load pre-trained vocabulary and BPE merges from OpenAI's GPT-2 files.

        Args:
            vocab_path (str): Path to the vocab file (GPT-2 calls it 'encoder.json').
            bpe_merges_path (str): Path to the bpe_merges file  (GPT-2 calls it 'vocab.bpe').
        """
        # Load vocabulary
        with open(vocab_path, "r", encoding="utf-8") as file:
            loaded_vocab = json.load(file)
            # encoder.json is {token_str: id}; we want id->str and str->id
            self.vocab = {int(v): k for k, v in loaded_vocab.items()}
            self.inverse_vocab = {k: int(v) for k, v in loaded_vocab.items()}
    
        # Must have GPT-2's printable newline character 'Ċ' (U+010A) at id 198
        if "Ċ" not in self.inverse_vocab or self.inverse_vocab["Ċ"] != 198:
            raise KeyError("Vocabulary missing GPT-2 newline glyph 'Ċ' at id 198.")
    
        # Must have <|endoftext|> at 50256
        if "<|endoftext|>" not in self.inverse_vocab or self.inverse_vocab["<|endoftext|>"] != 50256:
            raise KeyError("Vocabulary missing <|endoftext|> at id 50256.")
    
        # Provide a convenience alias for '\n' -> 198
        # Keep printable character 'Ċ' in vocab so BPE merges keep working
        if "\n" not in self.inverse_vocab:
            self.inverse_vocab["\n"] = self.inverse_vocab["Ċ"]

        if "\r" not in self.inverse_vocab:
            if 201 in self.vocab:
                self.inverse_vocab["\r"] = 201
            else:
                raise KeyError("Vocabulary missing carriage return token at id 201.")

        # Load GPT-2 merges and store ranks
        self.bpe_ranks = {}
        with open(bpe_merges_path, "r", encoding="utf-8") as file:
            lines = file.readlines()
            if lines and lines[0].startswith("#"):
                lines = lines[1:]
    
            rank = 0
            for line in lines:
                token1, *rest = line.strip().split()
                if len(rest) != 1:
                    continue
                token2 = rest[0]
                if token1 in self.inverse_vocab and token2 in self.inverse_vocab:
                    self.bpe_ranks[(token1, token2)] = rank
                    rank += 1
                else:
                    # Safe to skip pairs whose symbols are not in vocab
                    pass


    def encode(self, text, allowed_special=None):
        """
        Encode the input text into a list of token IDs, with tiktoken-style handling of special tokens.
    
        Args:
            text (str): The input text to encode.
            allowed_special (set or None): Special tokens to allow passthrough. If None, special handling is disabled.
    
        Returns:
            List of token IDs.
        """
    
        # ---- This section is to mimic tiktoken in terms of allowed special tokens ----
        specials_in_vocab = [
            tok for tok in self.inverse_vocab
            if tok.startswith("<|") and tok.endswith("|>")
        ]
        if allowed_special is None:
            # Nothing is allowed
            disallowed = [tok for tok in specials_in_vocab if tok in text]
            if disallowed:
                raise ValueError(f"Disallowed special tokens encountered in text: {disallowed}")
        else:
            # Some spefic tokens are allowed (e.g., we use this for <|endoftext|>)
            disallowed = [tok for tok in specials_in_vocab if tok in text and tok not in allowed_special]
            if disallowed:
                raise ValueError(f"Disallowed special tokens encountered in text: {disallowed}")
        # -----------------------------------------------------------------------------

        token_ids = []
        # If some specials are allowed, split around them and passthrough those ids
        if allowed_special is not None and len(allowed_special) > 0:
            special_pattern = "(" + "|".join(
                re.escape(tok) for tok in sorted(allowed_special, key=len, reverse=True)
            ) + ")"
    
            last_index = 0
            for match in re.finditer(special_pattern, text):
                prefix = text[last_index:match.start()]
                token_ids.extend(self.encode(prefix, allowed_special=None))  # encode prefix normally
    
                special_token = match.group(0)
                if special_token in self.inverse_vocab:
                    token_ids.append(self.inverse_vocab[special_token])
                else:
                    raise ValueError(f"Special token {special_token} not found in vocabulary.")
                last_index = match.end()
    
            text = text[last_index:]  # remainder to process normally
    
            # Extra guard for any other special literals left over
            disallowed = [
                tok for tok in self.inverse_vocab
                if tok.startswith("<|") and tok.endswith("|>") and tok in text and tok not in allowed_special
            ]
            if disallowed:
                raise ValueError(f"Disallowed special tokens encountered in text: {disallowed}")

    
        # ---- Newline and carriage return handling ----
        tokens = []
        parts = re.split(r'(\r\n|\r|\n)', text)
        for part in parts:
            if part == "":
                continue
            if part == "\r\n":
                tokens.append("\r")
                tokens.append("\n")
                continue
            if part == "\r":
                tokens.append("\r")
                continue
            if part == "\n":
                tokens.append("\n")
                continue
    
            # Normal chunk without line breaks:
            # - If spaces precede a word, prefix the first word with 'Ġ' and
            #   add standalone 'Ġ' for additional spaces
            # - If spaces trail the chunk (e.g., before a newline) add
            #   standalone 'Ġ' tokens (tiktoken produces id 220 for 'Ġ')
            pending_spaces = 0
            for m in re.finditer(r'( +)|(\S+)', part):
                if m.group(1) is not None:
                    pending_spaces += len(m.group(1))
                else:
                    word = m.group(2)
                    if pending_spaces > 0:
                        tokens.append("Ġ" + word) # one leading space
                        for _ in range(pending_spaces - 1):
                            tokens.append("Ġ")  # remaining spaces as standalone
                        pending_spaces = 0
                    else:
                        tokens.append(word)
            # Trailing spaces (no following word): add standalone 'Ġ' tokens
            for _ in range(pending_spaces):
                tokens.append("Ġ")
        # ---------------------------------------------------------------
    
        # Map tokens -> ids (BPE if needed)
        for tok in tokens:
            if tok in self.inverse_vocab:
                token_ids.append(self.inverse_vocab[tok])
            else:
                token_ids.extend(self.tokenize_with_bpe(tok))
    
        return token_ids

    def tokenize_with_bpe(self, token):
        """
        Tokenize a single token using BPE merges.

        Args:
            token (str): The token to tokenize.

        Returns:
            List[int]: The list of token IDs after applying BPE.
        """
        # Tokenize the token into individual characters (as initial token IDs)
        token_ids = [self.inverse_vocab.get(char, None) for char in token]
        if None in token_ids:
            missing_chars = [char for char, tid in zip(token, token_ids) if tid is None]
            raise ValueError(f"Characters not found in vocab: {missing_chars}")

        # If we haven't loaded OpenAI's GPT-2 merges, use my approach
        if not self.bpe_ranks:
            can_merge = True
            while can_merge and len(token_ids) > 1:
                can_merge = False
                new_tokens = []
                i = 0
                while i < len(token_ids) - 1:
                    pair = (token_ids[i], token_ids[i + 1])
                    if pair in self.bpe_merges:
                        merged_token_id = self.bpe_merges[pair]
                        new_tokens.append(merged_token_id)
                        # Uncomment for educational purposes:
                        # print(f"Merged pair {pair} -> {merged_token_id} ('{self.vocab[merged_token_id]}')")
                        i += 2  # Skip the next token as it's merged
                        can_merge = True
                    else:
                        new_tokens.append(token_ids[i])
                        i += 1
                if i < len(token_ids):
                    new_tokens.append(token_ids[i])
                token_ids = new_tokens
            return token_ids

        # Otherwise, do GPT-2-style merging with the ranks:
        # 1) Convert token_ids back to string "symbols" for each ID
        symbols = [self.vocab[id_num] for id_num in token_ids]

        # Repeatedly merge all occurrences of the lowest-rank pair
        while True:
            # Collect all adjacent pairs
            pairs = set(zip(symbols, symbols[1:]))
            if not pairs:
                break

            # Find the pair with the best (lowest) rank
            min_rank = float("inf")
            bigram = None
            for p in pairs:
                r = self.bpe_ranks.get(p, float("inf"))
                if r < min_rank:
                    min_rank = r
                    bigram = p

            # If no valid ranked pair is present, we're done
            if bigram is None or bigram not in self.bpe_ranks:
                break

            # Merge all occurrences of that pair
            first, second = bigram
            new_symbols = []
            i = 0
            while i < len(symbols):
                # If we see (first, second) at position i, merge them
                if i < len(symbols) - 1 and symbols[i] == first and symbols[i+1] == second:
                    new_symbols.append(first + second)  # merged symbol
                    i += 2
                else:
                    new_symbols.append(symbols[i])
                    i += 1
            symbols = new_symbols

            if len(symbols) == 1:
                break

        # Finally, convert merged symbols back to IDs
        merged_ids = [self.inverse_vocab[sym] for sym in symbols]
        return merged_ids

    def decode(self, token_ids):
        """
        Decode a list of token IDs back into a string.

        Args:
            token_ids (List[int]): The list of token IDs to decode.

        Returns:
            str: The decoded string.
        """
        out = []
        for tid in token_ids:
            if tid not in self.vocab:
                raise ValueError(f"Token ID {tid} not found in vocab.")
            tok = self.vocab[tid]

            # Map GPT-2 special chars back to real chars
            if tid == 198 or tok == "\n":
                out.append("\n")
            elif tid == 201 or tok == "\r":
                out.append("\r")
            elif tok.startswith("Ġ"):
                out.append(" " + tok[1:])
            else:
                out.append(tok)
        return "".join(out)

    def save_vocab_and_merges(self, vocab_path, bpe_merges_path):
        """
        Save the vocabulary and BPE merges to JSON files.

        Args:
            vocab_path (str): Path to save the vocabulary.
            bpe_merges_path (str): Path to save the BPE merges.
        """
        # Save vocabulary
        with open(vocab_path, "w", encoding="utf-8") as file:
            json.dump(self.vocab, file, ensure_ascii=False, indent=2)

        # Save BPE merges as a list of dictionaries
        with open(bpe_merges_path, "w", encoding="utf-8") as file:
            merges_list = [{"pair": list(pair), "new_id": new_id}
                           for pair, new_id in self.bpe_merges.items()]
            json.dump(merges_list, file, ensure_ascii=False, indent=2)

    def load_vocab_and_merges(self, vocab_path, bpe_merges_path):
        """
        Load the vocabulary and BPE merges from JSON files.

        Args:
            vocab_path (str): Path to the vocabulary file.
            bpe_merges_path (str): Path to the BPE merges file.
        """
        # Load vocabulary
        with open(vocab_path, "r", encoding="utf-8") as file:
            loaded_vocab = json.load(file)
            self.vocab = {int(k): v for k, v in loaded_vocab.items()}
            self.inverse_vocab = {v: int(k) for k, v in loaded_vocab.items()}

        # Load BPE merges
        with open(bpe_merges_path, "r", encoding="utf-8") as file:
            merges_list = json.load(file)
            for merge in merges_list:
                pair = tuple(merge["pair"])
                new_id = merge["new_id"]
                self.bpe_merges[pair] = new_id

    @lru_cache(maxsize=None)
    def get_special_token_id(self, token):
        return self.inverse_vocab.get(token, None)

    @staticmethod
    def find_freq_pair(token_ids, mode="most"):
        pairs = Counter(zip(token_ids, token_ids[1:]))

        if not pairs:
            return None

        if mode == "most":
            return max(pairs.items(), key=lambda x: x[1])[0]
        elif mode == "least":
            return min(pairs.items(), key=lambda x: x[1])[0]
        else:
            raise ValueError("Invalid mode. Choose 'most' or 'least'.")

    @staticmethod
    def replace_pair(token_ids, pair_id, new_id):
        dq = deque(token_ids)
        replaced = []

        while dq:
            current = dq.popleft()
            if dq and (current, dq[0]) == pair_id:
                replaced.append(new_id)
                # Remove the 2nd token of the pair, 1st was already removed
                dq.popleft()
            else:
                replaced.append(current)

        return replaced

- 上述 `BPETokenizerSimple` 类中包含了大量代码，详细讲解超出了本笔记本的范围。不过下一节将提供一个简要的使用概览，帮助更好地理解类的方法。


## 3. BPE 实现讲解


- 在实际应用中，我强烈推荐使用 [tiktoken](https://github.com/openai/tiktoken)，因为我上面的实现主要侧重于可读性和教学目的，而非性能优化。  
- 不过，其使用方法与 tiktoken 大致相似，只是 tiktoken 没有提供训练方法。  
- 下面通过一些示例来演示我上面 `BPETokenizerSimple` Python 代码的工作方式（详细的代码讲解超出本笔记本范围）。


### 3.1 训练、编码与解码


- 首先，让我们考虑一些示例文本，作为训练数据集：


In [5]:
import os
import requests

def download_file_if_absent(url, filename, search_dirs):
    for directory in search_dirs:
        file_path = os.path.join(directory, filename)
        if os.path.exists(file_path):
            print(f"{filename} already exists in {file_path}")
            return file_path

    target_path = os.path.join(search_dirs[0], filename)
    try:
        response = requests.get(url, stream=True, timeout=60)
        response.raise_for_status()
        with open(target_path, "wb") as out_file:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    out_file.write(chunk)
        print(f"Downloaded {filename} to {target_path}")
    except Exception as e:
        print(f"Failed to download {filename}. Error: {e}")

    return target_path


verdict_path = download_file_if_absent(
    url=(
         "https://raw.githubusercontent.com/rasbt/"
         "LLMs-from-scratch/main/ch02/01_main-chapter-code/"
         "the-verdict.txt"
    ),
    filename="the-verdict.txt",
    search_dirs=["ch02/01_main-chapter-code/", "../01_main-chapter-code/", "."]
)

with open(verdict_path, "r", encoding="utf-8") as f: # added ../01_main-chapter-code/
    text = f.read()

the-verdict.txt already exists in ../01_main-chapter-code/the-verdict.txt


- 接下来，让我们初始化并训练 BPE 分词器，设定词汇表大小为 1,000。  
- 注意，由于之前讨论的字节（byte）值，词汇表默认已经包含 256 个条目，因此我们实际只是在“学习”744 个新词汇条目（如果考虑 `<|endoftext|>` 特殊标记和 `Ġ` 空格标记，则精确为 742 条）。  
- 作为对比，GPT-2 的词汇表有 50,257 个 token，GPT-4 的词汇表有 100,256 个 token（在 tiktoken 中为 `cl100k_base`），而 GPT-4o 使用 199,997 个 token（`o200k_base`）；它们的训练集远比我们上面示例文本要大得多。


In [6]:
tokenizer = BPETokenizerSimple()
tokenizer.train(text, vocab_size=1000, allowed_special={"<|endoftext|>"})

- 你可能想查看词汇表的内容（但请注意，这会生成一份很长的列表）。


In [7]:
# print(tokenizer.vocab)
print(len(tokenizer.vocab))

1000


- 这个词汇表是通过合并 742 次生成的（`= 1000 - len(range(0, 256)) - len(special_tokens) - "Ġ" = 1000 - 256 - 1 - 1 = 742`）。


In [8]:
print(len(tokenizer.bpe_merges))

742


- 这意味着前 256 个条目是单字符 token。


- 接下来，让我们使用 `encode` 方法和已创建的合并规则对一些文本进行编码：


In [9]:
input_text = "Jack embraced beauty through art and life."
token_ids = tokenizer.encode(input_text)
print(token_ids)

[424, 256, 654, 531, 302, 311, 256, 296, 97, 465, 121, 595, 841, 116, 287, 466, 256, 326, 972, 46]


In [10]:
input_text = "Jack embraced beauty through art and life.<|endoftext|> "
token_ids = tokenizer.encode(input_text, allowed_special={"<|endoftext|>"})
print(token_ids)

[424, 256, 654, 531, 302, 311, 256, 296, 97, 465, 121, 595, 841, 116, 287, 466, 256, 326, 972, 46, 257, 256]


In [11]:
print("Number of characters:", len(input_text))
print("Number of token IDs:", len(token_ids))

Number of characters: 56
Number of token IDs: 22


- 从上面的长度可以看出，一个 42 个字符的句子被编码为 20 个 token ID，相比基于字符字节的编码，输入长度几乎减半。


- 注意，词汇表本身在 `decode()` 方法中被使用，它可以将 token ID 映射回原始文本：


In [12]:
print(token_ids)

[424, 256, 654, 531, 302, 311, 256, 296, 97, 465, 121, 595, 841, 116, 287, 466, 256, 326, 972, 46, 257, 256]


In [13]:
print(tokenizer.decode(token_ids))

Jack embraced beauty through art and life.<|endoftext|> 


- 遍历每个 token ID 可以帮助我们更好地理解这些 token ID 是如何通过词汇表解码回文本的：


In [14]:
for token_id in token_ids:
    print(f"{token_id} -> {tokenizer.decode([token_id])}")

424 -> Jack
256 ->  
654 -> em
531 -> br
302 -> ac
311 -> ed
256 ->  
296 -> be
97 -> a
465 -> ut
121 -> y
595 ->  through
841 ->  ar
116 -> t
287 ->  a
466 -> nd
256 ->  
326 -> li
972 -> fe
46 -> .
257 -> <|endoftext|>
256 ->  


- 如我们所见，大多数 token ID 代表的是两个字符的子词（subword）；这是因为训练数据文本非常短，重复词汇不多，并且我们使用的词汇表大小相对较小。


- 总结来说，调用 `decode(encode())` 应该能够重现任意输入文本：


In [15]:
tokenizer.decode(
    tokenizer.encode("This is some text.")
)

'This is some text.'

In [16]:
tokenizer.decode(
    tokenizer.encode("This is some text with \n newline characters.")
)

'This is some text with \n newline characters.'

### 3.2 保存与加载分词器


- 接下来，让我们看看如何保存训练好的分词器，以便后续重用：


In [17]:
# Save trained tokenizer
tokenizer.save_vocab_and_merges(vocab_path="vocab.json", bpe_merges_path="bpe_merges.txt")

In [18]:
# Load tokenizer
tokenizer2 = BPETokenizerSimple()
tokenizer2.load_vocab_and_merges(vocab_path="vocab.json", bpe_merges_path="bpe_merges.txt")

- 加载后的分词器应能生成与之前相同的结果：


In [19]:
print(tokenizer2.decode(token_ids))

Jack embraced beauty through art and life.<|endoftext|> 


In [20]:
tokenizer2.decode(
    tokenizer2.encode("This is some text with \n newline characters.")
)

'This is some text with \n newline characters.'

&nbsp;
### 3.3 从 OpenAI 加载原始 GPT-2 BPE 分词器


- 最后，让我们加载 OpenAI 的 GPT-2 分词器文件


In [21]:
# Download files if not already present in this directory

# Define the directories to search and the files to download
search_directories = ["ch02/02_bonus_bytepair-encoder/gpt2_model/", "../02_bonus_bytepair-encoder/gpt2_model/", "."]

files_to_download = {
    "https://openaipublic.blob.core.windows.net/gpt-2/models/124M/vocab.bpe": "vocab.bpe",
    "https://openaipublic.blob.core.windows.net/gpt-2/models/124M/encoder.json": "encoder.json"
}

# Ensure directories exist and download files if needed
paths = {}
for url, filename in files_to_download.items():
    paths[filename] = download_file_if_absent(url, filename, search_directories)

vocab.bpe already exists in ../02_bonus_bytepair-encoder/gpt2_model/vocab.bpe
encoder.json already exists in ../02_bonus_bytepair-encoder/gpt2_model/encoder.json


- 接下来，我们通过 `load_vocab_and_merges_from_openai` 方法加载这些文件：


In [22]:
tokenizer_gpt2 = BPETokenizerSimple()
tokenizer_gpt2.load_vocab_and_merges_from_openai(
    vocab_path=paths["encoder.json"], bpe_merges_path=paths["vocab.bpe"]
)

- 词汇表大小应为 `50257`，我们可以通过下面的代码确认：


In [23]:
len(tokenizer_gpt2.vocab)

50257

- 现在，我们可以通过我们的 `BPETokenizerSimple` 对象使用 GPT-2 分词器：


In [24]:
input_text = "This is some text"
token_ids = tokenizer_gpt2.encode(input_text)
print(token_ids)

[1212, 318, 617, 2420]


In [25]:
print(tokenizer_gpt2.decode(token_ids))

This is some text


- 你可以通过交互式 [tiktoken 应用](https://tiktokenizer.vercel.app/?model=gpt2) 或 [tiktoken 库](https://github.com/openai/tiktoken) 再次确认这会生成正确的 token：

<img src="https://sebastianraschka.com/images/LLMs-from-scratch-images/bonus/bpe-from-scratch/tiktokenizer.webp" width="600px">

```python
import tiktoken

gpt2_tokenizer = tiktoken.get_encoding("gpt2")
gpt2_tokenizer.encode("This is some text")
# 输出 [1212, 318, 617, 2420]

```


&nbsp;
# 4. 总结


- 就这样！这就是 BPE 的基本原理，同时提供了一个训练方法，用于创建新的分词器，或者从原始 OpenAI GPT-2 模型加载 GPT-2 分词器的词汇表和合并规则。  
- 希望这个简短的教程对你的学习有帮助；如果有任何问题，请随时在 [这里](https://github.com/rasbt/LLMs-from-scratch/discussions/categories/q-a)开启新的讨论。  
- 若想与其他分词器实现进行性能对比，请参见 [此笔记本](https://github.com/rasbt/LLMs-from-scratch/blob/main/ch02/02_bonus_bytepair-encoder/compare-bpe-tiktoken.ipynb)。
