In [38]:
import re
import html
from typing import List, Dict, Tuple
from collections import Counter, defaultdict
from tqdm import tqdm
import pickle
from pathlib import Path

class BpeTokenizer:
    def __init__(self):
        self.vocab_stoi = None
        self.vocab_itos = None

        # predefine tokens
        self.meta_tokens = dict(unk="<unk>", pad="<pad>", bos="<bos>", eos="<eos>", fld="<fld>")
        self.preprocess_tokens = dict(maj="<maj>", upp="<upp>", rep="<rep>", wrep="<wrep>",
                                     htag="<#tag>", ctag="<$tag>", atag="<@tag>")

        self.preprocess_args = dict(
            lowercase=True,
            spec_add_spaces=True,
            remove_useless_spaces=True,
            fix_html=True,
        )

        self.learn_bpe_args = dict(
            vocab_size=50_000,
            pairable_chars="a-zA-Z", # regex char set that can be merged in a BPE (`unpairable_chars` must be `None`)
            unpairable_chars=None, # regex char set that can't be merged in a BPE (`pairable_chars` must be `None`)
            unpairable_str="", # regex OR set that can't be in a BPE (will include `meta_tokens`, `preprocess_tokens`)

            required_chars = [], # list of chars to force include in vocab
            required_subwords = [], # list of subwords (e.g. morphemes such as `-er`) to force include in vocab
            required_words = [], # list of words to force include in vocab

            num_chars = -1, # number of chars from corpus (in order of freq) to force include in vocab (-1:all, 0:none)
            num_words = 0, # number of words from corpus (in order of freq) to force include in vocab (-1: all, 0: none)

            max_bpe_size = 0, # max number of chars per BPE (0: unlimited)
            bpe_per_merge = 10, # merge multiple pairs per iteration for speed
        )

    @classmethod
    def from_corpus(cls, corpus: str, save_path: Path,
                    preprocess_args: Dict = None, learn_bpe_args: Dict = None):

        bpet = cls()
        # TODO: clean up constructor api, find altenatives to arg dicts
        if preprocess_args is None:
            preprocess_args = {}
        if learn_bpe_args is None:
            learn_bpe_args = {}
        for k,v in preprocess_args.items():
            bpet.preprocess_args[k] = v
        for k,v in learn_bpe_args.items():
            bpet.learn_bpe_args[k] = v

        bpet._learn_bpe_vocab(corpus)
        bpet.save(save_path)
        return bpet

    def _learn_bpe_vocab(self, corpus: str):
        args = self.learn_bpe_args

        corpus = self._preprocess(corpus)
        word_counts, unpairable_counts = self._count_words(corpus, args)
        vocab_stoi, vocab_itos = self._init_vocab(corpus, word_counts, args)

        word_encodings = {word: [c for c in word] for word in word_counts.keys()}

        num_bpe = args['vocab_size'] - len(vocab_itos)
        num_merges = num_bpe//args['bpe_per_merge']
        for _ in tqdm(range(num_merges)):
            # generate new bytepair frequencies
            bp_counts = defaultdict(int)
            bp_words = defaultdict(set)
            for word, encodings in word_encodings.items():
                for bytepair in zip(encodings[:-1], encodings[1:]):
                    bp = "".join(bytepair)
                    if bp not in vocab_stoi and (len(bp) <= args['max_bpe_size'] or not args['max_bpe_size']):
                        bp = " ".join(bytepair) # space to facilitate word encodings update below
                        bp_counts[bp] += word_counts[word]
                        bp_words[bp].add(word)

            if len(bp_counts) == 0:
                break

            # update stoi/itos and word_encodings
            best_bp = sorted(bp_counts, key=bp_counts.get, reverse=True)[:args['bpe_per_merge']]
            for bp in best_bp:
                merged_bp = bp.replace(" ", "")
                vocab_itos += [merged_bp]
                vocab_stoi[merged_bp] = len(vocab_itos) - 1
                for word in bp_words[bp]:
                    word_encodings[word] = " ".join(word_encodings[word]).replace(bp, merged_bp).split(" ")

        self.vocab_stoi = vocab_stoi
        self.vocab_itos = vocab_itos

    def _preprocess(self, text: str) -> str:
        if self.preprocess_args['lowercase']:
            text = text.lower()

        if self.preprocess_args['spec_add_spaces']:
            text = re.sub(r'([/#\n])', r' \1 ', text)

        if self.preprocess_args['remove_useless_spaces']:
            text = re.sub(' {2,}', ' ', text)

        if self.preprocess_args['fix_html']:
            UNK = self.meta_tokens['unk']
            re1 = re.compile(r'  +')
            text = text.replace('#39;', "'").replace('amp;', '&').replace('#146;', "'").replace(
                'nbsp;', ' ').replace('#36;', '$').replace('\\n', "\n").replace('quot;', "'").replace(
                '<br />', "\n").replace('\\"', '"').replace('<unk>', UNK).replace(' @.@ ', '.').replace(
                ' @-@ ', '-').replace(' @,@ ', ',').replace('\\', ' \\ ')
            return re1.sub(' ', html.unescape(text))

        return text

    def _count_words(self, corpus: str, args: Dict) -> Tuple[Counter, Counter]:
        """Obtains corpus word counts used to learn BPE.

        A word is a continuous sequence of "pairable" characters.
        Tokens made from unpairable characters are also counted in case we want
        to include them by frequency instead of all by default.

        :param corpus: (str) text to learn BPEs from
        :param pairable_chars: (str) regex char set e.g. "a-z" ([ ] not included)
        :param unpairable_chars: (str) regex exclusion char set e.g. "0-9.?!'amp;'" ([ ] and ^ not included)
        :param unpairable_str: (str) regex string set e.g. "(dog|cat|axolotl)"
        :return word_counts: (Counter) count of words
        """
        pairable_chars = args['pairable_chars']
        unpairable_chars = args['unpairable_chars']
        unpairable_str = args['unpairable_str']

        if pairable_chars and unpairable_chars:
            raise ValueError("Only one of `pairable_chars` or `unpairable_chars` can be provided")

        unpairable_counts = Counter(re.findall("|".join(unpairable_str), corpus))
        corpus = re.sub("|".join(unpairable_str),"", corpus)

        # count words (pairabled chars) and unpairable chars
        if pairable_chars:
            word_counts = Counter(re.findall(f"[{pairable_chars}]+", corpus))
            unpairable_counts.update(re.findall(f"[^{pairable_chars}]", corpus))
        elif unpairable_chars:
            word_counts = Counter(re.findall(f"[^{unpairable_chars}]+", corpus))
            unpairable_counts.update(re.findall(f"[{unpairable_chars}]", corpus))
        else:
            word_counts = Counter(re.findall("[a-zA-Z]+", corpus))
            unpairable_counts.update(re.findall("^[a-zA-Z]", corpus))

        return word_counts, unpairable_counts

    def _init_vocab(self, corpus: str, word_counts: Counter, args: Dict) -> Tuple[Dict[str, int], List[str]]:
        """Initialize vocabulary based on predefined tokens (use temporary lookup set to avoid duplicates)

        """
        tmp_vocab_itos = [v for v in self.meta_tokens.values()] + [v for v in self.preprocess_tokens.values()] + \
                     args['required_chars'] + args['required_subwords'] + args['required_words']

        # update vocabulary with corpus words and chars
        if args['num_chars']:
            char_counts = Counter(corpus)
            if args['num_chars'] == -1:
                args['num_chars'] = len(char_counts)
            tmp_vocab_itos += sorted(char_counts, key=char_counts.get, reverse=True)[:args['num_chars']]
        if args['num_words']:
            if args['num_words'] == -1:
                args['num_words'] = len(word_counts)
            tmp_vocab_itos += sorted(word_counts, key=word_counts.get, reverse=True)[:args['num_words']]

        tmp_lookup = set()  # a temporary lookup set
        vocab_itos = [x for x in tmp_vocab_itos if x not in tmp_lookup and tmp_lookup.add(x) is None]
        vocab_stoi = {s:i for i,s in enumerate(vocab_itos)}
        unk_i = vocab_stoi[self.meta_tokens['unk']]
        vocab_stoi = defaultdict(lambda: unk_i, vocab_stoi)

        return vocab_stoi, vocab_itos

    def tokenize(self, text: str) -> List[str]:
        tokens = []
        token = None

        maj_flag = False
        upp_flag = False

        for c in text:
            # expand previous token by one character or append previous token to tokens
            if token is not None:
                new_token = token + c.lower()
                if (new_token not in self.vocab_stoi) or \
                        (len(token)>1 and c.isupper() and maj_flag) or \
                        (c.isupper() and not maj_flag and not upp_flag) or \
                        (c.islower() and upp_flag):
                    if maj_flag:
                        tokens.append(self.preprocess_tokens['maj'])
                        maj_flag = False
                    elif upp_flag:
                        tokens.append(self.preprocess_tokens['upp'])
                        upp_flag = False
                    tokens.append(token)
                    token = None
                else:
                    token = new_token
                    if c.isupper() and maj_flag:
                        upp_flag = True
                        maj_flag = False

            # handle unpairable tokens
            if c.lower() not in self.vocab_stoi:
                tokens.append(c)

            # begin new token
            elif token is None:
                if c.isupper():
                    maj_flag = True
                    token = c.lower()
                else:
                    token = c
        if token:
            tokens.append(token)

        return tokens

    def detokenize(self, tokens: List[str]) -> str:
        for i,token in enumerate(tokens):
            if token == self.preprocess_tokens['maj']:
                tokens[i+1] = tokens[i+1].title() # TODO: fix
            if token == self.preprocess_tokens['upp']:
                tokens[i+1] = tokens[i+1].upper()
        tokens = [token for token in tokens if token not in self.preprocess_tokens.values()]
        text = "".join(tokens)
        return text

    def encode(self, text: str) -> List[int]:
        return [self.vocab_stoi[s] for s in self.tokenize(text)]

    def decode(self, encodings: List[int]) -> str:
        return self.detokenize([self.vocab_itos[i] for i in encodings])

    def save(self, path: Path):
        pickle.dump(self.vocab_itos, path.open('wb'))

    @classmethod
    def load(cls, path: Path):
        bpet = cls()
        itos = pickle.load(path.open('rb'))
        stoi = {s: i for i,s in enumerate(itos)}
        bpet.vocab_itos = itos
        bpet.vocab_stoi = stoi
        return bpet

In [63]:
class BPE:
    def __init__(self, num_merges):
        self.num_merges = num_merges
        self.bpe_codes = {}

    def get_stats(self, vocabulary):
        pairs = {}
        for word, freq in vocabulary.items():
            symbols = word.split()
            for i in range(len(symbols)-1):
                pair = (symbols[i], symbols[i+1])
                if pair in pairs:
                    pairs[pair] += freq
                else:
                    pairs[pair] = freq
        return pairs

    def merge_vocabulary(self, pair, v_in):
        v_out = {}
        bigram = ' '.join(pair)
        replacement = ''.join(pair)
        for word in v_in:
            w_out = word.replace(bigram, replacement)
            v_out[w_out] = v_in[word]
        return v_out

    def build_bpe(self, text):
        vocabulary = {}
        for word in text.split(' '):
            word = ' '.join(list(word)) + ' </w>'
            if word in vocabulary:
                vocabulary[word] += 1
            else:
                vocabulary[word] = 1

        for i in range(self.num_merges):
            pairs = self.get_stats(vocabulary)
            if not pairs:
                break
            best = max(pairs, key=pairs.get)
            vocabulary = self.merge_vocabulary(best, vocabulary)
            self.bpe_codes[best] = i

    def encode(self, text):
        encoded_text = text.split()
        for i in range(len(encoded_text)):
            word = encoded_text[i]
            for pair in self.bpe_codes:
                if pair[0] in word and pair[1] in word:
                    word = word.replace(''.join(pair), ''.join(pair))
            encoded_text[i] = word
        return encoded_text


In [39]:
import nltk
nltk.download('gutenberg')
from nltk.corpus import gutenberg

[nltk_data] Downloading package gutenberg to
[nltk_data]     /Users/paarthvisharma/nltk_data...
[nltk_data]   Package gutenberg is already up-to-date!


In [40]:
files = gutenberg.fileids()

print("Available Books:")
for book in files:
    print(book)

Available Books:
austen-emma.txt
austen-persuasion.txt
austen-sense.txt
bible-kjv.txt
blake-poems.txt
bryant-stories.txt
burgess-busterbrown.txt
carroll-alice.txt
chesterton-ball.txt
chesterton-brown.txt
chesterton-thursday.txt
edgeworth-parents.txt
melville-moby_dick.txt
milton-paradise.txt
shakespeare-caesar.txt
shakespeare-hamlet.txt
shakespeare-macbeth.txt
whitman-leaves.txt


In [41]:
selected_book = gutenberg.raw('shakespeare-hamlet.txt')
print("\nSample Text from 'shakespeare-hamlet.txt':")
print(selected_book[:500])


Sample Text from 'shakespeare-hamlet.txt':
[The Tragedie of Hamlet by William Shakespeare 1599]


Actus Primus. Scoena Prima.

Enter Barnardo and Francisco two Centinels.

  Barnardo. Who's there?
  Fran. Nay answer me: Stand & vnfold
your selfe

   Bar. Long liue the King

   Fran. Barnardo?
  Bar. He

   Fran. You come most carefully vpon your houre

   Bar. 'Tis now strook twelue, get thee to bed Francisco

   Fran. For this releefe much thankes: 'Tis bitter cold,
And I am sicke at heart

   Barn. Haue you had quiet Guard?
  Fran. Not


In [42]:
learn_bpe_args = dict(
    vocab_size=1000,
    pairable_chars="a-zA-Z0-9",
)
path = Path("shakespeare.vocab")
bpe = BpeTokenizer()
bpet = bpe.from_corpus(selected_book, path , learn_bpe_args=learn_bpe_args)

100%|██████████| 94/94 [00:01<00:00, 66.93it/s]


In [72]:
# Example usage
bpe = BPE(num_merges=500)
text = "This is a simple example to illustrate the BPE algorithm."
bpe.build_bpe(selected_book)
encoded_text = bpe.encode(selected_book)
print("Encoded Text: ", encoded_text)



In [43]:
bpet = bpe.load(path)

In [44]:
test_books = gutenberg.raw('whitman-leaves.txt')
test_books



In [36]:
from nltk.tokenize import word_tokenize
testing = bpet.tokenize("I am here, how are you?")
print(testing)
nltk_tokens_testing = word_tokenize("I am here, how are you?")
print(nltk_tokens_testing)

['<maj>', 'i', ' ', 'am', ' ', 'he', 're', ',', ' ', 'how', ' ', 'are', ' ', 'y', 'ou', '?']
['I', 'am', 'here', ',', 'how', 'are', 'you', '?']


In [45]:
bpe_tokens = bpet.tokenize(test_books)
bpe_tokens

['[',
 '<maj>',
 'lea',
 'v',
 'es',
 ' ',
 'of',
 ' ',
 '<maj>',
 'gra',
 's',
 's',
 ' ',
 'by',
 ' ',
 '<maj>',
 'wal',
 't',
 ' ',
 '<maj>',
 'whi',
 't',
 'man',
 ' ',
 '1',
 '8',
 '5',
 '5',
 ']',
 '\n',
 '\n',
 '\n',
 '<maj>',
 'come',
 ',',
 ' ',
 'sa',
 'id',
 ' ',
 'my',
 ' ',
 'sou',
 'l',
 ',',
 '\n',
 '<maj>',
 'su',
 'ch',
 ' ',
 'v',
 'ers',
 'es',
 ' ',
 'for',
 ' ',
 'my',
 ' ',
 '<maj>',
 'bo',
 'dy',
 ' ',
 'let',
 ' ',
 'us',
 ' ',
 'wr',
 'ite',
 ',',
 ' ',
 '(',
 'for',
 ' ',
 'we',
 ' ',
 'are',
 ' ',
 'one',
 ',',
 ')',
 '\n',
 '<maj>',
 'th',
 'at',
 ' ',
 'sh',
 'ou',
 'ld',
 ' ',
 '<maj>',
 'i',
 ' ',
 'af',
 'ter',
 ' ',
 'ret',
 'ur',
 'n',
 ',',
 '\n',
 '<maj>',
 'or',
 ',',
 ' ',
 'lo',
 'ng',
 ',',
 ' ',
 'lo',
 'ng',
 ' ',
 'hen',
 'ce',
 ',',
 ' ',
 'in',
 ' ',
 'o',
 'th',
 'er',
 ' ',
 'sp',
 'he',
 're',
 's',
 ',',
 '\n',
 '<maj>',
 'th',
 'er',
 'e',
 ' ',
 'to',
 ' ',
 'so',
 'me',
 ' ',
 'gr',
 'ou',
 'p',
 ' ',
 'of',
 ' ',
 'mat',
 'es',
 ' ',

In [48]:
from nltk.tokenize import word_tokenize
nltk_tokens = word_tokenize(test_books)
nltk_tokens_set =set(nltk_tokens)
nltk_tokens

['[',
 'Leaves',
 'of',
 'Grass',
 'by',
 'Walt',
 'Whitman',
 '1855',
 ']',
 'Come',
 ',',
 'said',
 'my',
 'soul',
 ',',
 'Such',
 'verses',
 'for',
 'my',
 'Body',
 'let',
 'us',
 'write',
 ',',
 '(',
 'for',
 'we',
 'are',
 'one',
 ',',
 ')',
 'That',
 'should',
 'I',
 'after',
 'return',
 ',',
 'Or',
 ',',
 'long',
 ',',
 'long',
 'hence',
 ',',
 'in',
 'other',
 'spheres',
 ',',
 'There',
 'to',
 'some',
 'group',
 'of',
 'mates',
 'the',
 'chants',
 'resuming',
 ',',
 '(',
 'Tallying',
 'Earth',
 "'s",
 'soil',
 ',',
 'trees',
 ',',
 'winds',
 ',',
 'tumultuous',
 'waves',
 ',',
 ')',
 'Ever',
 'with',
 'pleas',
 "'d",
 'smile',
 'I',
 'may',
 'keep',
 'on',
 ',',
 'Ever',
 'and',
 'ever',
 'yet',
 'the',
 'verses',
 'owning',
 '--',
 'as',
 ',',
 'first',
 ',',
 'I',
 'here',
 'and',
 'now',
 'Signing',
 'for',
 'Soul',
 'and',
 'Body',
 ',',
 'set',
 'to',
 'them',
 'my',
 'name',
 ',',
 'Walt',
 'Whitman',
 '[',
 'BOOK',
 'I.',
 'INSCRIPTIONS',
 ']',
 '}',
 "One's-Self",
 'I'

In [73]:
bpe_tokens = bpe.encode(test_books)
bpe_tokens

['[Leaves',
 'of',
 'Grass',
 'by',
 'Walt',
 'Whitman',
 '1855]',
 'Come,',
 'said',
 'my',
 'soul,',
 'Such',
 'verses',
 'for',
 'my',
 'Body',
 'let',
 'us',
 'write,',
 '(for',
 'we',
 'are',
 'one,)',
 'That',
 'should',
 'I',
 'after',
 'return,',
 'Or,',
 'long,',
 'long',
 'hence,',
 'in',
 'other',
 'spheres,',
 'There',
 'to',
 'some',
 'group',
 'of',
 'mates',
 'the',
 'chants',
 'resuming,',
 '(Tallying',
 "Earth's",
 'soil,',
 'trees,',
 'winds,',
 'tumultuous',
 'waves,)',
 'Ever',
 'with',
 "pleas'd",
 'smile',
 'I',
 'may',
 'keep',
 'on,',
 'Ever',
 'and',
 'ever',
 'yet',
 'the',
 'verses',
 'owning--as,',
 'first,',
 'I',
 'here',
 'and',
 'now',
 'Signing',
 'for',
 'Soul',
 'and',
 'Body,',
 'set',
 'to',
 'them',
 'my',
 'name,',
 'Walt',
 'Whitman',
 '[BOOK',
 'I.',
 'INSCRIPTIONS]',
 '}',
 "One's-Self",
 'I',
 'Sing',
 "One's-self",
 'I',
 'sing,',
 'a',
 'simple',
 'separate',
 'person,',
 'Yet',
 'utter',
 'the',
 'word',
 'Democratic,',
 'the',
 'word',
 'E

In [74]:
from nltk.tokenize import word_tokenize
nltk_tokens = word_tokenize(test_books)
nltk_tokens_set =set(nltk_tokens)
nltk_tokens

['[',
 'Leaves',
 'of',
 'Grass',
 'by',
 'Walt',
 'Whitman',
 '1855',
 ']',
 'Come',
 ',',
 'said',
 'my',
 'soul',
 ',',
 'Such',
 'verses',
 'for',
 'my',
 'Body',
 'let',
 'us',
 'write',
 ',',
 '(',
 'for',
 'we',
 'are',
 'one',
 ',',
 ')',
 'That',
 'should',
 'I',
 'after',
 'return',
 ',',
 'Or',
 ',',
 'long',
 ',',
 'long',
 'hence',
 ',',
 'in',
 'other',
 'spheres',
 ',',
 'There',
 'to',
 'some',
 'group',
 'of',
 'mates',
 'the',
 'chants',
 'resuming',
 ',',
 '(',
 'Tallying',
 'Earth',
 "'s",
 'soil',
 ',',
 'trees',
 ',',
 'winds',
 ',',
 'tumultuous',
 'waves',
 ',',
 ')',
 'Ever',
 'with',
 'pleas',
 "'d",
 'smile',
 'I',
 'may',
 'keep',
 'on',
 ',',
 'Ever',
 'and',
 'ever',
 'yet',
 'the',
 'verses',
 'owning',
 '--',
 'as',
 ',',
 'first',
 ',',
 'I',
 'here',
 'and',
 'now',
 'Signing',
 'for',
 'Soul',
 'and',
 'Body',
 ',',
 'set',
 'to',
 'them',
 'my',
 'name',
 ',',
 'Walt',
 'Whitman',
 '[',
 'BOOK',
 'I.',
 'INSCRIPTIONS',
 ']',
 '}',
 "One's-Self",
 'I'

In [75]:
# First : Calculating Tokenization Accuracy
correct_tokens = sum(token in nltk_tokens_set for token in bpe_tokens)
accuracy = (correct_tokens / len(nltk_tokens)) * 100

print("Tokenization accuracy : " , accuracy)


Tokenization accuracy :  65.32865061226131


In [76]:
# Second : Tokenization Coverage
# Identifying the unique tokens covered by implemented BPE algorithm:
bpe_tokens_set = set(bpe_tokens)
coverage = (len(bpe_tokens_set.intersection(nltk_tokens_set)) / len(nltk_tokens_set)) * 100
print("Tokenization coverage : " , coverage)

Tokenization coverage :  74.69353700019255


In [77]:
# Third: Precision, Recall, F1-Score
TP = len(bpe_tokens_set.intersection(set(nltk_tokens)))
FP = len(bpe_tokens_set - set(nltk_tokens))
FN = len(set(nltk_tokens) - set(bpe_tokens))
precision = TP / (TP + FP)
recall = TP / (TP + FN)
f1_score = 2 * (precision * recall) / (precision + recall)

print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1-Score: {f1_score}")


Precision: 0.5032431030009513
Recall: 0.7469353700019254
F1-Score: 0.6013382592295967


In [79]:
TP = len(bpe_tokens_set.intersection(nltk_tokens_set))
FP = sum(token not in nltk_tokens_set for token in bpe_tokens_set)
FN = sum(token not in bpe_tokens_set for token in nltk_tokens_set)

precision = TP / (TP + FP)
recall = TP / (TP + FN)
f1_score = 2 * (precision * recall) / (precision + recall)

print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1-Score: {f1_score}")

Precision: 0.5032431030009513
Recall: 0.7469353700019254
F1-Score: 0.6013382592295967


In [78]:

# Jaccard Similarity
jaccard_similarity = len(bpe_tokens_set.intersection(nltk_tokens_set)) / len(bpe_tokens_set.union(nltk_tokens_set))
jaccard_similarity

0.4299383058110754