In [61]:
import re
from operator import itemgetter
from collections import Counter, defaultdict
import os
import time
import numpy as np
from operator import itemgetter
from typing import Dict, Tuple, List, Set

def build_vocab(corpus: str) -> dict:
    """Step 1. Build the initial BPE vocabulary from a text corpus.

    The corpus is split on whitespace. Every word is rewritten as its
    characters separated by single spaces, followed by the end-of-word
    marker ``</w>`` (e.g. ``"am"`` becomes ``"a m </w>"``).

    Args:
        corpus: Raw text; words are whatever ``str.split()`` yields.

    Returns:
        A ``Counter`` mapping each space-separated word form to the number
        of times the word occurs in ``corpus``.
    """
    word_counts = Counter()

    for raw_word in corpus.split():
        # Iterating a str yields its characters, so join() spaces them out.
        spaced = " ".join(raw_word) + " </w>"
        word_counts[spaced] += 1

    return word_counts


def get_stats(vocab: dict) -> dict:
    """Step 2. Count how often each pair of adjacent symbols occurs.

    Args:
        vocab: Maps a word, written as space-separated symbols, to its
            frequency.

    Returns:
        A ``defaultdict(int)`` mapping ``(left, right)`` symbol tuples to
        the summed frequency of every word in which the two symbols appear
        next to each other (counted once per occurrence).
    """
    pair_counts = defaultdict(int)

    for entry, count in vocab.items():
        tokens = entry.split()
        # zip() against the list shifted by one walks all neighbouring pairs.
        for left, right in zip(tokens, tokens[1:]):
            pair_counts[left, right] += count

    return pair_counts


def merge_vocab(pair: tuple, v_in: dict) -> dict:
    """Step 3. Merge every occurrence of ``pair`` in the vocabulary.

    Each key of ``v_in`` is a word written as space-separated symbols.
    Wherever the two symbols of ``pair`` appear next to each other as whole
    symbols, they are replaced by their concatenation.

    Args:
        pair: The two symbols to merge, e.g. ``('a', 'm')``.
        v_in: Maps space-separated words to their frequency.

    Returns:
        A new dict with the merged word forms as keys. ``v_in`` is not
        modified. If several input keys collapse to the same merged key,
        their frequencies are added together.
    """
    merged_symbol = ''.join(pair)
    bigram = re.escape(' '.join(pair))
    # The lookarounds require whitespace (or string edge) on both sides, so
    # only whole symbols match, never a suffix/prefix of a longer symbol.
    pattern = re.compile(r'(?<!\S)' + bigram + r'(?!\S)')

    v_out = {}
    for word, frequency in v_in.items():
        # A callable replacement is inserted literally. A plain replacement
        # string would have its backslashes interpreted as template escapes,
        # corrupting (or raising on) symbols that contain '\'.
        w_out = pattern.sub(lambda _match: merged_symbol, word)
        # Accumulate rather than assign so colliding keys do not lose counts.
        v_out[w_out] = v_out.get(w_out, 0) + frequency

    return v_out

corpus = "I am Ivo ,Whats up fellas"
test_sentence = "Hello IVO how are you fellas"

# Step 1: character-level vocabulary with word frequencies.
vocab = build_vocab(corpus)

num_merges = 50  # Hyperparameter: upper bound on the number of merges learned
bpe_codes = {}   # (left, right) symbol pair -> index of the merge that created it

for i in range(num_merges):
    # Step 2: count adjacent symbol pairs in the current vocabulary.
    pairs = get_stats(vocab)
    if len(pairs) == 0:
        # Every word is already a single symbol; nothing left to merge.
        break

    # Step 3: record the most frequent pair and apply the merge everywhere.
    best = max(pairs, key=lambda candidate: pairs[candidate])
    bpe_codes[best] = i
    print(bpe_codes)
    vocab = merge_vocab(best, vocab)

{('s', '</w>'): 0}
{('s', '</w>'): 0, ('I', '</w>'): 1}
{('s', '</w>'): 0, ('I', '</w>'): 1, ('a', 'm'): 2}
{('s', '</w>'): 0, ('I', '</w>'): 1, ('a', 'm'): 2, ('am', '</w>'): 3}
{('s', '</w>'): 0, ('I', '</w>'): 1, ('a', 'm'): 2, ('am', '</w>'): 3, ('I', 'v'): 4}
{('s', '</w>'): 0, ('I', '</w>'): 1, ('a', 'm'): 2, ('am', '</w>'): 3, ('I', 'v'): 4, ('Iv', 'o'): 5}
{('s', '</w>'): 0, ('I', '</w>'): 1, ('a', 'm'): 2, ('am', '</w>'): 3, ('I', 'v'): 4, ('Iv', 'o'): 5, ('Ivo', '</w>'): 6}
{('s', '</w>'): 0, ('I', '</w>'): 1, ('a', 'm'): 2, ('am', '</w>'): 3, ('I', 'v'): 4, ('Iv', 'o'): 5, ('Ivo', '</w>'): 6, (',', 'W'): 7}
{('s', '</w>'): 0, ('I', '</w>'): 1, ('a', 'm'): 2, ('am', '</w>'): 3, ('I', 'v'): 4, ('Iv', 'o'): 5, ('Ivo', '</w>'): 6, (',', 'W'): 7, (',W', 'h'): 8}
{('s', '</w>'): 0, ('I', '</w>'): 1, ('a', 'm'): 2, ('am', '</w>'): 3, ('I', 'v'): 4, ('Iv', 'o'): 5, ('Ivo', '</w>'): 6, (',', 'W'): 7, (',W', 'h'): 8, (',Wh', 'a'): 9}
{('s', '</w>'): 0, ('I', '</w>'): 1, ('a', 'm'): 2,

In [48]:
original_word = 'Ivoto'
# Split the word into characters and terminate it with the end-of-word marker.
word = [*original_word, '</w>']
word

['I', 'v', 'o', 't', 'o', '</w>']

In [57]:
def get_pairs(word: list) -> set:
    """Return the set of adjacent symbol pairs in ``word``.

    Args:
        word: Sequence of symbols, e.g. ``['I', 'v', 'o', '</w>']``.

    Returns:
        A set of ``(left, right)`` tuples, one for every position where
        ``left`` is immediately followed by ``right``. A sequence with
        fewer than two symbols (including an empty one) yields an empty set.
    """
    # Pairing the sequence with itself shifted by one produces every
    # neighbouring pair and is naturally empty for 0- or 1-element input,
    # so no explicit guard for an empty word is needed.
    return set(zip(word, word[1:]))

# Demo: adjacent symbol pairs of `word`, which must already exist in the
# kernel (it is built in the cell that splits 'Ivoto' into characters).
pairs = get_pairs(word)
pairs

{('I', 'v'), ('o', '</w>'), ('o', 't'), ('t', 'o'), ('v', 'o')}

In [68]:
# Recompute the adjacent symbol pairs of the demo word.
pairs = get_pairs(word)

# Keep only the pairs that were learned as merges, tagged with their merge index.
bpe_codes_pairs = [
    (candidate, bpe_codes[candidate])
    for candidate in pairs
    if candidate in bpe_codes
]
print(bpe_codes_pairs)
print(bpe_codes)
print(pairs)

# The pair with the smallest merge index was learned first, so it is applied first.
pair_to_merge = min(bpe_codes_pairs, key=lambda ranked: ranked[1])[0]
pair_to_merge

[(('I', 'v'), 4)]
{('s', '</w>'): 0, ('I', '</w>'): 1, ('a', 'm'): 2, ('am', '</w>'): 3, ('I', 'v'): 4, ('Iv', 'o'): 5, ('Ivo', '</w>'): 6, (',', 'W'): 7, (',W', 'h'): 8, (',Wh', 'a'): 9, (',Wha', 't'): 10, (',What', 's</w>'): 11, ('u', 'p'): 12, ('up', '</w>'): 13, ('f', 'e'): 14, ('fe', 'l'): 15, ('fel', 'l'): 16, ('fell', 'a'): 17, ('fella', 's</w>'): 18}
{('t', 'o'), ('o', 't'), ('v', 'o'), ('o', '</w>'), ('I', 'v')}


('I', 'v')

In [74]:
def create_new_word(word: List[str], pair_to_merge: Tuple[str, str]):
    """Apply one merge to a word.

    Scans ``word`` from left to right and replaces each non-overlapping
    occurrence of the two symbols in ``pair_to_merge`` (first immediately
    followed by second) with their concatenation.

    Args:
        word: The word as a list of symbols.
        pair_to_merge: ``(first, second)`` symbols to fuse.

    Returns:
        A new list of symbols; ``word`` itself is left untouched.
    """
    left, right = pair_to_merge
    fused = left + right
    total = len(word)

    result = []
    pos = 0
    while pos < total:
        followed_by_right = pos + 1 < total and word[pos + 1] == right
        if word[pos] == left and followed_by_right:
            # Consume both symbols so merges never overlap.
            result.append(fused)
            pos += 2
        else:
            result.append(word[pos])
            pos += 1

    return result

# Demo: apply the selected merge once. Relies on `word` and `pair_to_merge`
# having been defined by earlier cells in the running kernel.
new_word = create_new_word(word, pair_to_merge)
new_word

['Iv', 'o', 't', 'o', '</w>']

In [76]:
def token_segmenter(original_word: str, bpe_codes: Dict[Tuple[str, str], int]):
    """Segment a word into subword symbols using learned BPE merges.

    The word is split into characters plus the ``</w>`` end marker. Then,
    repeatedly, the adjacent pair with the lowest value in ``bpe_codes`` is
    merged, until no adjacent pair of the word is present in ``bpe_codes``.

    Args:
        original_word: The word to segment.
        bpe_codes: Maps a symbol pair to its merge index (lower is applied
            first).

    Returns:
        The list of resulting symbols. NOTE: a word of exactly one
        character is returned unchanged as the input ``str`` (no list, no
        ``</w>`` marker).
    """
    if len(original_word) == 1:
        return original_word

    symbols = list(original_word) + ['</w>']

    while True:
        # Merge index of every adjacent pair that is a known merge.
        known = {
            candidate: bpe_codes[candidate]
            for candidate in get_pairs(symbols)
            if candidate in bpe_codes
        }
        if not known:
            return symbols

        symbols = create_new_word(symbols, min(known, key=known.get))

# Demo: segment a word that is not in the training corpus, using the
# `bpe_codes` learned by the training cell.
original_word = 'Ivoto'
token_segmenter(original_word, bpe_codes)

['Ivo', 't', 'o', '</w>']