In [5]:
import typing
from pathlib import Path
from collections import defaultdict
import re

In [6]:
paths = [x for x in Path('/home/maskedpirate/repos/DATA/nlpdata/bookcorpus').glob('*.txt')]

In [5]:
class BPE:
    def __init__(self, vocab_size=50):
        self.vocab_size = vocab_size
        self.vocab = set() 
        self.split = list()


    @staticmethod 
    def __create_corpus(path: Path):
        '''Creates the first word corpus for BPE.

        Parameters:
            path[Path]: path to the txt file 
        '''
        with open(path, 'r') as f:
            file = f.read()
        return re.sub(r'[^\x00-\x7F]+', '', file).split(' ')


    def __split_and_initiate(self, wordcorpus: list):
        for word in wordcorpus:
            for char in word:
                self.split.append(char)
                self.vocab.add(char)
        
        
    def __count(self):
        '''Creates a defaultdict which shows frequency of 
        repeated elements.
        
        Parameters:
            placeholder 
        Returns:
            splits[defaultdict]: defaultdict of frequencies
        '''
        pairs = defaultdict(int)
        splint_len = len(self.split)
        for i in range(splint_len-1):
            pair = f'{self.split[i]}<sp>{self.split[i+1]}'
            pairs[pair] += 1
            if len(pairs) > 50:
                break
        return pairs


    def __add_to_vocab(self, pairs, max_freq):
        '''Adds given tokens to vocab'''
        for tup in pairs.items():
            if tup[1] == max_freq:
                self.vocab.add(tup[0].replace('<sp>', ''))
                
            
    def __merge(self, pair):
        f, s = pair.split('<sp>')
        ind = 0
        while ind < len(self.split) - 1:
            if self.split[ind] == f and self.split[ind+1] == s:
                self.split = self.split[:ind] + [f+s] + self.split[ind+2:]
            else:
                ind += 1
                    
    def __find_best(self):
        pairs = self.__count()
        max_freq = max(pairs.values())
        highest_freq_vals = [k for k, v in pairs.items() if v == max_freq]
        self.__add_to_vocab(pairs, max_freq)
        for item in highest_freq_vals:
            self.__merge(item)

 
    def train(self, path, vocab_size):
        corpus = self.__create_corpus(path)
        self.__split_and_initiate(corpus)
        while len(self.vocab) < vocab_size:
            self.__find_best()
            print(f'vocab: {self.vocab}')
            print(f'splits: {self.split}')

In [6]:
paths

[PosixPath('/home/maskedpirate/repos/DATA/nlpdata/bookcorpus/books_large_p1.txt'),
 PosixPath('/home/maskedpirate/repos/DATA/nlpdata/bookcorpus/books_large_p2.txt')]

In [7]:
tokenizer = BPE()
tokenizer.train('./toy_data/example.txt', 200)

vocab: {'g', '3', '4', 'd', 'c', 'z', '0', 'j', 'b', '.', 'm', '-', 'er', '1', 'i', ':', '7', 'f', 'p', 't', 'k', 'o', 'y', 'a', 'h', 'n', ',', 'r', '9', 'e', '?', 'x', 'v', 's', '2', '\n', 'w', 'l', '8', 'q', 'u'}
splits: ['t', 'h', 'e', 'h', 'a', 'l', 'f', '-', 'l', 'i', 'n', 'g', 'b', 'o', 'o', 'k', 'o', 'n', 'e', 'i', 'n', 't', 'h', 'e', 'f', 'a', 'l', 'l', 'o', 'f', 'i', 'g', 'n', 'e', 'er', 'i', 'a', 's', 'er', 'i', 'e', 's', 'k', 'a', 'y', 'l', 'e', 'e', 's', 'o', 'd', 'er', 'b', 'u', 'r', 'g', 'c', 'o', 'p', 'y', 'r', 'i', 'g', 'h', 't', '2', '0', '1', '3', 'k', 'a', 'y', 'l', 'e', 'e', 's', 'o', 'd', 'er', 'b', 'u', 'r', 'g', 'a', 'l', 'l', 'r', 'i', 'g', 'h', 't', 's', 'r', 'e', 's', 'er', 'v', 'e', 'd', '.', '\n', 'i', 's', 'b', 'n', ':', '1', '4', '9', '2', '9', '1', '3', '7', '3', '1', 'i', 's', 'b', 'n', '-', '1', '3', ':', '9', '7', '8', '-', '1', '4', '9', '2', '9', '1', '3', '7', '3', '3', 'f', 'o', 'r', 'm', 'y', 'f', 'a', 'm', 'i', 'l', 'y', ',', 'w', 'h', 'o', 'e', 

In [9]:
class BPE:
    def __init__(self, vocab_size=50):
        self.vocab_size = vocab_size
        self.vocab = set()
        self.split = list()

    @staticmethod
    def __create_corpus(path: Path):
        '''Creates the first word corpus for BPE.

        Parameters:
            path[Path]: path to the txt file
        '''
        with open(path, 'r') as f:
            file = f.read()
        return re.sub(r'[^\x00-\x7F]+', '', file).split(' ')

    def __split_and_initiate(self, wordcorpus: list):
        for word in wordcorpus:
            for i, char in enumerate(word):
                self.split.append(char)
                self.vocab.add(char)
                if i < len(word) - 1:
                    self.split.append(' ')
                    self.vocab.add(' ')

    def __count(self):
        '''Creates a defaultdict which shows frequency of
        repeated elements.

        Parameters:
            placeholder
        Returns:
            splits[defaultdict]: defaultdict of frequencies
        '''
        pairs = defaultdict(int)
        splint_len = len(self.split)
        for i in range(splint_len - 1):
            if self.split[i] != ' ' and self.split[i + 1] != ' ':
                pair = f'{self.split[i]}<sp>{self.split[i+1]}'
                pairs[pair] += 1
                if len(pairs) > 50:
                    break
        return pairs

    def __add_to_vocab(self, pairs, max_freq):
        '''Adds given tokens to vocab'''
        for tup in pairs.items():
            if tup[1] == max_freq:
                self.vocab.add(tup[0].replace('<sp>', ''))

    def __merge(self, pair):
        f, s = pair.split('<sp>')
        ind = 0
        while ind < len(self.split) - 1:
            if self.split[ind] == f and self.split[ind + 1] == s:
                self.split = self.split[:ind] + [f + s] + self.split[ind + 2:]
            else:
                ind += 1
                
    def __find_best(self):
        return self.__count()

    def train(self, path, vocab_size):
        corpus = self.__create_corpus(path)
        self.__split_and_initiate(corpus)
        while len(self.vocab) < vocab_size:
            pairs = self.__find_best()
            if not pairs:
                print('No more pairs to process')
                break
            max_freq = max(pairs.values())
            highest_freq_vals = [k for k, v in pairs.items() if v == max_freq]
            self.__add_to_vocab(pairs, max_freq)
            for item in highest_freq_vals:
                self.__merge(item)
 
        


In [11]:
tokenizer = BPE()
tokenizer.train('toy_data/example.txt', 2000)
print(tokenizer.vocab)

No more pairs to process
{'nih', 'u', 'lf', 'ew', '3if', 'dit', 'yg', 't', 'ca', 'y;i', 'sj', 'r,n', 'un', 'd,n', 'diw', 'nj', 'lo', 'tt', 'wf', 'gah', 'lt', 'mw', 'fir', 'dab', 't,w', 'md', 'pa', 'n,w', '-', 'do', 'h', 'o', 'ia', 'gal', 'fh', 'd', 'e,t', 'eik', 'mo', 'ie', 'ee', 'fl', 'f,', 'h,ir', 'og', 'lm', 'eij', 'y!', 'g!', 'oo', 'ls', 'hiw', 'saf', 's,p', 'el', ',t', ':w', 'haf', 'g,i', 'hj', '.', 'w,', 'l;i', 'r?', 'eiw', ';t', 'r.', 'eh', 'xm', 'eaq', 'e,d', 't,t', 'ed', 'mf', 'fc', 't,g', 'ub', 'hb', 'gac', 'yj', 'r,o', 'sic', 'fio', 'm.', 'rf', 's,aw', 'k', 'f.', 'iq', 'le', 'j', 'fp', 'n,l', 'ws', 'kh', 'e,w', 'ph', '.w', 'tp', 'ku', 'fs', ' ', 'so', 'hih', 's,', 'sc', ',s', 'bt', 'e,e', 'kl', 'il', 'nir', 'sf', '1', '\n', 'rn', 'sad', 'h,', 'gi', 'saw', 'hm', 'nd', 'dp', 'q', 'ik', 'dia', 't,l', ',f', 'e,b', 'gic', 'lg', 'tan', 'gak', 'ej', 'i,h', 's;w', ',n', 'rak', 'di', 'gb', 'ns', 'f', 'n,d', 'dap', 'cb', '8', 'lic', ',w', 'd,w', 'ok', 'r,f', 'ha', 'e,m', 'p,s', 'd,a',