In [1]:
# ------------------------- LOAD CORPUS -------------------------
import re
from collections import Counter

def load_corpus(path="corpus.txt", lowercase=True):
    try:
        with open(path, "r", encoding="utf-8") as f:
            text = f.read()
    except:
        text = """
        this is a demo corpus used only if corpus.txt not found
        byte pair encoding and wordpiece tokenization implemented
        from scratch using python without any libraries
        """

    if lowercase:
        text = text.lower()

    text = re.sub(r"\s+", " ", text)
    words = [w for w in text.split(" ") if w.strip() != ""]
    return words


In [2]:
# ------------------------- BPE IMPLEMENTATION -------------------------
def split_word(word):
    return list(word) + ["</w>"]

def build_vocab(words):
    freq = Counter(words)
    vocab = Counter()
    for w, f in freq.items():
        vocab[tuple(split_word(w))] += f
    return vocab

def get_pair_counts(vocab):
    pairs = Counter()
    for word, freq in vocab.items():
        for i in range(len(word)-1):
            pairs[(word[i], word[i+1])] += freq
    return pairs

def merge_vocab(vocab, pair):
    a, b = pair
    merged = a + b
    new_vocab = Counter()

    for word, freq in vocab.items():
        new_word = []
        i = 0
        while i < len(word):
            if i < len(word)-1 and word[i] == a and word[i+1] == b:
                new_word.append(merged)
                i += 2
            else:
                new_word.append(word[i])
                i += 1
        new_vocab[tuple(new_word)] += freq

    return new_vocab, merged

def train_bpe(words, merge_steps=1000, vocab_size=None):
    vocab = build_vocab(words)
    merges = []
    token_set = set([s for w in vocab for s in w])

    for step in range(merge_steps):
        pairs = get_pair_counts(vocab)
        if not pairs:
            break

        best = pairs.most_common(1)[0][0]
        vocab, merged_sym = merge_vocab(vocab, best)
        merges.append((best, merged_sym))
        token_set.add(merged_sym)

        if vocab_size and len(token_set) >= vocab_size:
            break

    final_vocab = set(token_set)
    return merges, final_vocab

def apply_bpe(word, merges):
    tokens = split_word(word)

    for pair, merged in merges:
        i = 0
        while i < len(tokens)-1:
            if tokens[i] == pair[0] and tokens[i+1] == pair[1]:
                tokens[i:i+2] = [merged]
            else:
                i += 1

    if tokens[-1] == "</w>":
        tokens = tokens[:-1]

    return tokens


In [3]:

# ------------------------- WORDPIECE IMPLEMENTATION -------------------------
def train_wordpiece(words, vocab_size=32000, merge_steps=1000):
    vocab = build_vocab(words)
    merges = []
    token_set = set([s for w in vocab for s in w])

    for step in range(merge_steps):
        pairs = get_pair_counts(vocab)
        if not pairs:
            break
        
        best = pairs.most_common(1)[0][0]
        merged = best[0] + best[1]   # simple merge
        merges.append((best, merged))

        new_vocab = Counter()
        for word, freq in vocab.items():
            new_word = []
            i = 0
            while i < len(word):
                if i < len(word)-1 and word[i] == best[0] and word[i+1] == best[1]:
                    new_word.append(merged)
                    i += 2
                else:
                    new_word.append(word[i])
                    i += 1
            new_vocab[tuple(new_word)] += freq
        vocab = new_vocab
        token_set.add(merged)

        if len(token_set) >= vocab_size:
            break

    final_vocab = set(token_set)
    return merges, final_vocab

def apply_wordpiece(word, merges):
    tokens = split_word(word)

    for pair, merged in merges:
        i = 0
        while i < len(tokens)-1:
            if tokens[i] == pair[0] and tokens[i+1] == pair[1]:
                tokens[i:i+2] = [merged]
            else:
                i += 1

    if tokens[-1] == "</w>":
        tokens = tokens[:-1]

    return tokens


In [4]:

# ------------------------- RUN TRAINING -------------------------
words = load_corpus("corpus.txt")
print("Loaded tokens:", len(words))

# For your assignment:
MERGES = 32000
VOCAB = 32000

bpe_merges, bpe_vocab = train_bpe(words, merge_steps=MERGES, vocab_size=VOCAB)
wp_merges, wp_vocab = train_wordpiece(words, vocab_size=VOCAB, merge_steps=MERGES)

print("BPE vocab size:", len(bpe_vocab))
print("WordPiece vocab size:", len(wp_vocab))


Loaded tokens: 25
BPE vocab size: 129
WordPiece vocab size: 129


In [6]:
sample =" byte pair encoding and wordpiece tokenization"
print("Sample:", sample)

print("BPE →", apply_bpe(sample, bpe_merges))
print("WordPiece →", apply_wordpiece(sample, wp_merges))


Sample:  byte pair encoding and wordpiece tokenization
BPE → [' ', 'byt', 'e', ' ', 'pair', ' ', 'encod', 'ing', ' ', 'an', 'd', ' ', 'wordpiec', 'e', ' ', 'tokenization</w>']
WordPiece → [' ', 'byt', 'e', ' ', 'pair', ' ', 'encod', 'ing', ' ', 'an', 'd', ' ', 'wordpiec', 'e', ' ', 'tokenization</w>']
