In [None]:
#BPE+WordPiece代码实践：https://zhuanlan.zhihu.com/p/673692328
#BPE+WordPiece代码实践解析：https://zhuanlan.zhihu.com/p/651430181

### 1.BPE分词训练

In [None]:
# Load the toy training corpus: four short English sentences.
corpus = [
    "This is the Hugging Face Course.",
    "This chapter is about tokenization.",
    "This section shows several tokenizer algorithms.",
    "Hopefully, you will be able to understand how they are trained and generate tokens.",
]

In [None]:
# 加载预分词器
from transformers import AutoTokenizer

# Build the pre-tokenization function from the pretrained GPT-2 tokenizer.
gpt2_tokenizer = AutoTokenizer.from_pretrained("gpt2")
gpt2_pre_tokenizer = gpt2_tokenizer.backend_tokenizer.pre_tokenizer
pre_tokenize_function = gpt2_pre_tokenizer.pre_tokenize_str
# Pre-tokenize every sentence of the corpus.
pre_tokenized_corpus = list(map(pre_tokenize_function, corpus))

In [None]:
# Count how many times each pre-tokenized word appears in the corpus.
from collections import defaultdict

word2count = defaultdict(int)
for pre_tokenized_sentence in pre_tokenized_corpus:
    # Each entry is a pair; only its first element (the word) is used here.
    for word, _ in pre_tokenized_sentence:
        word2count[word] += 1

# Show the resulting frequency table.
word2count

In [None]:

# Build the initial vocabulary from every character that occurs in the corpus words.
vocab_chars = set()

for word in word2count:
    vocab_chars.update(word)

# The original read the undefined name `vocab` here (NameError).
# Sorting makes the vocabulary order reproducible, since set iteration order
# for strings is not stable across kernel restarts.
vocabs = sorted(vocab_chars)

In [None]:
# Put the special end-of-text token at the front of the initial vocabulary.
vocabs = ["<|endoftext|>", *vocabs]
# Start from a character-level segmentation of every word.
word2splits = {word: list(word) for word in word2count}

# 统计相邻两个pair的词频
def _compute_pair2score(word2split,word2count):
    pair2count=defaultdict(int)
    for word,word_count in word2count.items():
        split=word2split[word]
        if len(split)==1:
            continue
        for i in range(len(split)-1):
            pair=(split[i],split[i+1])
            pair2count[pair]+=word_count
    return pair2count

# Pair frequencies for the initial character-level segmentation.
# The segmentation dict is named `word2splits`; the original passed the
# undefined name `word2split`, which raised NameError.
pair2count=_compute_pair2score(word2splits,word2count)

In [None]:
# 返回最高频的pair
def _compute_most_score_pair(pair2count):
    best_pair=None
    max_freq=None
    for pair,pair_count in pair2count.items():
        if max_freq is None or max_freq<pair_count:
            best_pair=pair
            max_freq=pair_count
    return best_pair

# Pick the most frequent adjacent pair from the counts computed above.
best_pair=_compute_most_score_pair(pair2count)

In [None]:
# Record the most frequent pair as the first merge rule and add the merged
# token to the vocabulary.
merge_rules = []
best_pair = _compute_most_score_pair(pair2count)
vocabs.append("".join(best_pair))
merge_rules.append(best_pair)

In [None]:
# 利用更新后的vocabs重新对词进行切分
# 因为每次只合并一个pair，可以通用merge_rules从旧的word2split去更新新的word2split
def _merge_pair(a,b,word2splits):
    new_word2splits=dict()
    for word,split in word2splits.items():
        if len(split)==1:
            new_word2splits[word]=split
        i=0
        while i<len(split)-1:
            if split[i]==a and split[i+1]==b:
                split=split[:i]+[a+b]+split[i+2:]
            else:
                i+=1
        new_word2splits[word]=split 
        
    return new_word2splits

# Apply the first merge to the working segmentation. Without writing the
# result back to `word2splits`, the next round of pair counting would select
# the same pair again and record it twice.
word2splits=_merge_pair(best_pair[0],best_pair[1],word2splits)
# Keep the original variable name available as an alias.
new_word2splits=word2splits

In [None]:
# Keep merging the most frequent pair until the vocabulary reaches the target size.
vocab_size=50

while len(vocabs)<vocab_size:
    pair2count=_compute_pair2score(word2split=word2splits,word2count=word2count)
    best_pair=_compute_most_score_pair(pair2count)
    if best_pair is None:
        # Every word is already a single token: nothing is left to merge, so
        # stop instead of failing on `best_pair[0]`.
        break
    word2splits=_merge_pair(best_pair[0],best_pair[1],word2splits)
    # Skip entries that are already recorded (e.g. the first merge, which an
    # earlier cell already appended) so no duplicates are produced.
    if best_pair not in merge_rules:
        merge_rules.append(best_pair)
    new_token=best_pair[0]+best_pair[1]
    if new_token not in vocabs:
        vocabs.append(new_token)

### 2.BPE分词推理

In [None]:
#给定一个句子，将其切分成一个token序列。实现步骤为：1）进行预分词pre-tokenize。2）将词切分成字符级别。3）利用合并规则进行合并。

In [None]:
from typing import List

def tokenize(text:str) -> List[str]:
    """Tokenize ``text`` with the learned BPE merge rules.

    The text is pre-tokenized into words, each word is split into characters,
    and every rule in ``merge_rules`` is then applied in order.

    Returns:
        The tokens of all words concatenated into one flat list.
    """
    # Character-level segmentation of every pre-tokenized word.
    pieces_per_word = [list(word) for word, _ in pre_tokenize_function(text)]

    # Replay the merge rules in the order they are stored.
    for left, right in merge_rules:
        for word_idx, pieces in enumerate(pieces_per_word):
            pos = 0
            while pos + 1 < len(pieces):
                if pieces[pos] == left and pieces[pos + 1] == right:
                    pieces = pieces[:pos] + [left + right] + pieces[pos + 2:]
                else:
                    pos += 1
            pieces_per_word[word_idx] = pieces

    # Flatten the per-word token lists into a single list.
    flat_tokens = []
    for pieces in pieces_per_word:
        flat_tokens.extend(pieces)
    return flat_tokens

# Try the BPE tokenizer on a sample sentence.
tokenize("This is not a token.")

#  #########################

### 3.WordPiece分词训练

In [None]:
from typing import List

# Prepare the corpus (the same four sentences as in the BPE section).
corpus = [
    "This is the Hugging Face Course.",
    "This chapter is about tokenization.",
    "This section shows several tokenizer algorithms.",
    "Hopefully, you will be able to understand how they are trained and generate tokens.",
]

In [None]:
from transformers import AutoTokenizer

# Build the pre-tokenization function from the pretrained BERT (cased) tokenizer.
bert_tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
bert_pre_tokenizer = bert_tokenizer.backend_tokenizer.pre_tokenizer
pre_tokenize_function = bert_pre_tokenizer.pre_tokenize_str

# Pre-tokenize every sentence of the corpus.
pre_tokenized_corpus = list(map(pre_tokenize_function, corpus))


In [None]:
from collections import defaultdict

# Count how many times each pre-tokenized word appears in the corpus.
word2count = defaultdict(int)

for pre_tokenized_sentence in pre_tokenized_corpus:
    # Each entry is a pair; only its first element (the word) is used here.
    for word, _ in pre_tokenized_sentence:
        word2count[word] += 1

# Show the resulting frequency table.
word2count

In [None]:
# Build the initial character-level vocabulary.
# The first character of a word is kept as is; every later character gets the
# "##" continuation prefix.
vocab_set=set()
for word in word2count:
    vocab_set.add(word[0])
    vocab_set.update("##"+c for c in word[1:])

# Prepend the special tokens AFTER the alphabet is built. The original
# prepended them to the leftover BPE `vocabs` and then overwrote the result
# with `list(vocab_set)`, so the special tokens were lost.
# Sorting keeps the vocabulary order reproducible across kernel restarts.
vocabs=["[PAD]", "[UNK]", "[CLS]", "[SEP]", "[MASK]"]+sorted(vocab_set)


In [None]:
# Initial segmentation: first character as is, the remaining ones "##"-prefixed.
word2splits = {
    word: [word[0], *("##" + c for c in word[1:])]
    for word in word2count
}

# 统计相邻两个pair的互信息
def _compute_pair2score(word2splits,word2count):
    vocab2count=defaultdict(int)
    pair2count=defaultdict(int)
    for word,word_count in word2count.items():
        splits=word2splits[word]
        if len(splits)==1:
            vocab2count[splits[0]]+=word_count
        for i in range(len(splits)-1):
            pair=(splits[i],splits[i+1])
            vocab2count[splits[i]]+=word_count
            pair2count[pair]+=word_count
        vocab2count[splits[-1]]+=word_count # 每个splits的最后一个在for循环中没有统计到，需要单独添加
        
    scores={
        pair: freq/(vocab2count[pair[0]]*vocab2count[pair[1]]) for pair,freq in pair2count.items()
    }
    
    return scores

# Score every adjacent pair of the initial segmentation.
pair2score=_compute_pair2score(word2splits,word2count)

In [None]:

# 返回最高互信息的pair
def _compute_most_score_pair(pair2score):
    best_pair=None
    best_score=None 
    for pair, score in pair2score.items():
        if best_score is None or best_score<score:
            best_pair=pair
            best_score=score 
            
    return best_pair

# Pick the best-scoring adjacent pair from the scores computed above.
best_pair=_compute_most_score_pair(pair2score)

In [None]:
# Merge the best-scoring pair and add the resulting token to the vocabulary.
# The parentheses matter: without them the conditional expression covers the
# whole sum, so the else branch yields only best_pair[1] and drops best_pair[0].
new_token=best_pair[0]+(best_pair[1][2:] if best_pair[1].startswith('##') else best_pair[1])
# Guard against a duplicate entry when the cell is executed more than once.
if new_token not in vocabs:
    vocabs.append(new_token)


In [None]:
# 利用更新后的词表重新对word2count进行切分
def _merge_pair(a,b,word2splits):
    new_word2splits=dict()
    for word,split in word2splits.items():
        if len(split)==1: 
            new_word2splits[word]=split
            continue
        i=0
        while(i<len(split)-1):
            if split[i]==a and split[i+1]==b:
                merge=a+b[2:] if b.startswith('##') else a+b 
                split=split[:i]+[merge]+split[i+2:]
            else:
                i+=1
        new_word2splits[word]=split 
        
    return new_word2splits

# Apply the first merge to the working segmentation. Without writing the
# result back to `word2splits`, the next round of scoring would select the
# same pair again and add its token to the vocabulary twice.
word2splits=_merge_pair(best_pair[0],best_pair[1],word2splits)
# Keep the original variable name available as an alias.
new_word2splits=word2splits


In [None]:
# Keep merging the best-scoring pair until the vocabulary reaches the target size.
vocab_size=70

while len(vocabs)<vocab_size:
    pair2score=_compute_pair2score(word2splits,word2count)
    best_pair=_compute_most_score_pair(pair2score)
    if best_pair is None:
        # Every word is already a single token: nothing is left to merge, so
        # stop instead of failing on `best_pair[0]`.
        break
    word2splits=_merge_pair(best_pair[0],best_pair[1],word2splits)
    # The parentheses matter: without them the else branch yields only
    # best_pair[1] and drops best_pair[0].
    new_token=best_pair[0]+(best_pair[1][2:] if best_pair[1].startswith('##') else best_pair[1])
    # Skip tokens that are already present so the vocabulary has no duplicates.
    if new_token not in vocabs:
        vocabs.append(new_token)

### 4.WordPiece的推理

In [None]:
def _encode_word(word):
    tokens=[]
    while len(word)>0:
        i=len(word)
        while i>0 and word[:i] not in vocabs:
            i-=1
        if i==0:
            return [['UNK']]
        tokens.append(word[:i])
        word=word[i:]
        if len(word)>0:
            word=f"##{word}"
            
    return tokens

def tokenize(text):
    """Tokenize ``text`` with the learned WordPiece vocabulary.

    The text is pre-tokenized into words, each word is encoded with
    ``_encode_word``, and the per-word results are concatenated into one list.
    """
    flat_tokens = []
    for word, _ in pre_tokenize_function(text):
        flat_tokens.extend(_encode_word(word))
    return flat_tokens

# Try the WordPiece tokenizer on a sample sentence.
tokenize("This is not a token.")