# Byte Pair Encoding

In [1]:
#Unicode
#Strings are immutable sequences of Unicode code points. 

var = "Hello world !"

for word in var:
    # ord ---> returns unicode for character not string tho!
    print(ord(word))

unicode_list = [(ord(x)) for x in var]


72
101
108
108
111
32
119
111
114
108
100
32
33


In [2]:
unicode_list

[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 33]

In [2]:
base_para = str("Artificial intelligence, or AI, is concerned with building systems that simulate intelligent behavior. It encompasses a wide range of approaches, including those based on logic, search, and probabilistic reasoning. Machine learning is a subset of AI that learns to make decisions by fitting mathematical models to observed data. This area has seen explosive growth and is now (incorrectly) almost synonymous with the term AI. A deep neural network (or deep network for short) is a type of machine learning model, and the process of fitting these models to data is referred to as deep learning. At the time of writing, deep networks are the most powerful and practical machine learning models and are often encountered in day-to-day life. It is commonplace to translate text to another language using a natural language processing algorithm, to search for images of a given object using a computer vision system, or to converse with a digital assistant via a speech recognition interface. All of these applications are powered by deep learning.")
byte_object = base_para[:500].encode("utf-8")
print(byte_object, len(byte_object))

print("_____________")
byte_list = list(byte_object) #list(map(int, tokens))
print(byte_list, len(byte_list))
print("_____________")



b'Artificial intelligence, or AI, is concerned with building systems that simulate intelligent behavior. It encompasses a wide range of approaches, including those based on logic, search, and probabilistic reasoning. Machine learning is a subset of AI that learns to make decisions by fitting mathematical models to observed data. This area has seen explosive growth and is now (incorrectly) almost synonymous with the term AI. A deep neural network (or deep network for short) is a type of machine lea' 500
_____________
[65, 114, 116, 105, 102, 105, 99, 105, 97, 108, 32, 105, 110, 116, 101, 108, 108, 105, 103, 101, 110, 99, 101, 44, 32, 111, 114, 32, 65, 73, 44, 32, 105, 115, 32, 99, 111, 110, 99, 101, 114, 110, 101, 100, 32, 119, 105, 116, 104, 32, 98, 117, 105, 108, 100, 105, 110, 103, 32, 115, 121, 115, 116, 101, 109, 115, 32, 116, 104, 97, 116, 32, 115, 105, 109, 117, 108, 97, 116, 101, 32, 105, 110, 116, 101, 108, 108, 105, 103, 101, 110, 116, 32, 98, 101, 104, 97, 118, 105, 111, 114,

In [15]:
import regex
from collections import defaultdict
from typing import List, Dict, Tuple, Any


class BytePairEncoder:
    """
    A class implementing Byte Pair Encoding (BPE) compression algorithm.
    
    BPE iteratively replaces the most frequent pair of consecutive bytes/tokens
    with a new token, effectively compressing the data by reducing repeated patterns.
    """
    
    def __init__(self, lower_frequency_limit: int = 1, max_iterations: int = 100, vocabulary:Dict[Tuple[Any, Any], int] = {} ):
        """
        Initialize the BPE encoder.
        
        Args:
            lower_frequency_limit: Minimum frequency threshold for pair merging
            max_iterations: Maximum number of compression iterations
            vocabulary: predefined dictionary of current vocabulary in use
        """
        self.lower_frequency_limit = lower_frequency_limit
        self.max_iterations = max_iterations
        self.vocab = vocabulary
        
    def _count_pair_frequencies(self, sequence: List[int]) -> Dict[Tuple[Any, Any], int]:
        """
        Count frequencies of adjacent pairs in the sequence.
        
        Args:
            sequence: List of tokens to analyze
            
        Returns:
            Dictionary mapping token pairs to their frequencies
        """
        freq_map = defaultdict(int)
        for i in range(len(sequence) - 1):
            pair = (sequence[i], sequence[i + 1])
            freq_map[pair] += 1
        return freq_map
    
    def _find_most_frequent_pairs(self, freq_map: Dict[Tuple[Any, Any], int]) -> List[Tuple[Any, Any]]:
        """
        Find pairs that occur most frequently, above the lower frequency limit.
        
        Args:
            freq_map: Dictionary of pair frequencies
            
        Returns:
            List of pairs that meet the frequency criteria
        """
        if not freq_map:
            return []
        most_freq_pairs = [x for x in freq_map.keys() if freq_map[x] == max(max(freq_map.values()), self.lower_frequency_limit)]
        return most_freq_pairs

    

    
    def _update_vocabulary(self, pairs):
        """
        Assign new token values to frequent pairs in the vocabulary.
        
        Args:
            pairs: List of pairs to add to vocabulary
        """
        if not self.vocab:
            next_token = 0
        else:
            next_token = max(self.vocab.values()) + 1
            
        for pair in pairs:
            self.vocab[pair] = next_token
            next_token += 1
    
    def _compress_sequence(self, sequence: List[Any], vocab) -> List[Any]:
        """
        Compress the sequence by replacing frequent pairs with their token values.
        
        Args:
            sequence: List of tokens to compress
            vocabulary
            
        Returns:
            Compressed sequence
        """
        compressed = []
        i, flag = 0, 0
        while i < len(sequence) - 1:
            pair = (sequence[i], sequence[i + 1])
            if pair in vocab:
                flag = 1
                compressed.append(vocab[pair])
                i += 2
            else:
                compressed.append(sequence[i])
                i += 1
                
        # Handle last token if we're at the end
        if i == len(sequence) - 1:
            compressed.append(sequence[-1])

        return compressed

    def regex_chunking(self, text: str) -> List[str]:
        pattern = regex.compile(r"""'s|'t|'re|'ve|'m|'ll|'d| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+""")
        text_chunks = regex.findall(pattern , text)
        return text_chunks
    
    def train_bpe(self, train_text: str):
        """
        Encode the input sequence using BPE algorithm.
        
        Args:
            sequence: Input sequence to compress
            
        Returns:
            None
        """
        text_chunks = self.regex_chunking(train_text)
        
        current_sequence_list = [list(chunk.encode('utf-8')) for chunk in text_chunks]
        
        
        
        for i in range(self.max_iterations):
            freq_map = defaultdict(int)
            
            # Count frequencies of adjacent pairs
            for seq in current_sequence_list:
                for k, v in self._count_pair_frequencies(seq).items():
                    freq_map[k] += v

            #Find frequent pairs. Note multiple pairs can have same frequency.
            frequent_pairs = self._find_most_frequent_pairs(freq_map)
            
           
            # If no pairs meet criteria, stop iteration
            if not frequent_pairs:
                print("No more frequent pairs found. Training completed!")
                break
                
            # Update vocabulary with new tokens
            self._update_vocabulary(frequent_pairs)

          
            # Compress sequence using updated vocabulary
            new_sequence_list = [self._compress_sequence(seq, self.vocab) for seq in current_sequence_list]   
            current_sequence_list = new_sequence_list

        print(f"length of training corpus {len(train_text)}, #new_tokens added:{len(self.vocab)-256}. Training completed")
        

    def encode(self, text):
        pass
        
            
    def get_vocabulary(self) -> Dict[Tuple[Any, Any], int]:
        """
        Get the current BPE vocabulary.
        
        Returns:
            Dictionary mapping token pairs to their assigned values
        """
        return self.vocab.copy()

    

    

'a'

In [9]:
lower_lim = 3
max_iter = 5
initial_vocab = {chr(i):i for i in range(256)}
BPE = BytePairEncoder(lower_lim, max_iter, initial_vocab)
com_seq = BPE.train_bpe(base_para)
vocabulary = BPE.get_vocabulary()
print(vocabulary)

length of training corpus 1043, #new_tokens added:5. Training completed
{'\x00': 0, '\x01': 1, '\x02': 2, '\x03': 3, '\x04': 4, '\x05': 5, '\x06': 6, '\x07': 7, '\x08': 8, '\t': 9, '\n': 10, '\x0b': 11, '\x0c': 12, '\r': 13, '\x0e': 14, '\x0f': 15, '\x10': 16, '\x11': 17, '\x12': 18, '\x13': 19, '\x14': 20, '\x15': 21, '\x16': 22, '\x17': 23, '\x18': 24, '\x19': 25, '\x1a': 26, '\x1b': 27, '\x1c': 28, '\x1d': 29, '\x1e': 30, '\x1f': 31, ' ': 32, '!': 33, '"': 34, '#': 35, '$': 36, '%': 37, '&': 38, "'": 39, '(': 40, ')': 41, '*': 42, '+': 43, ',': 44, '-': 45, '.': 46, '/': 47, '0': 48, '1': 49, '2': 50, '3': 51, '4': 52, '5': 53, '6': 54, '7': 55, '8': 56, '9': 57, ':': 58, ';': 59, '<': 60, '=': 61, '>': 62, '?': 63, '@': 64, 'A': 65, 'B': 66, 'C': 67, 'D': 68, 'E': 69, 'F': 70, 'G': 71, 'H': 72, 'I': 73, 'J': 74, 'K': 75, 'L': 76, 'M': 77, 'N': 78, 'O': 79, 'P': 80, 'Q': 81, 'R': 82, 'S': 83, 'T': 84, 'U': 85, 'V': 86, 'W': 87, 'X': 88, 'Y': 89, 'Z': 90, '[': 91, '\\': 92, ']': 93, 

In [6]:
def detect_circular_mappings(vocab):
    for pair, token in vocab.items():
        if (token,) in vocab:
            
            print(f"Circular mapping detected: {pair} -> {token}")

    print("no")

detect_circular_mappings(vocabulary)

no


In [16]:
text = "My name is Hello World!"
encoded = BPE.encode(text)

initial byte list: [77, 121, 32, 110, 97, 109, 101, 32, 105, 115, 32, 72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33]
compressed byte list: [77, 121, 32, 110, 97, 109, 101, 32, 105, 115, 32, 72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33]


In [15]:
print(com_seq)

[65, 114, 116, 105, 102, 269, 105, 270, 32, 257, 271, 108, 282, 103, 272, 99, 101, 273, 259, 283, 273, 265, 99, 266, 99, 101, 114, 275, 267, 284, 260, 32, 98, 117, 105, 108, 100, 268, 261, 121, 115, 271, 109, 256, 260, 97, 262, 285, 109, 117, 108, 97, 116, 258, 257, 271, 108, 282, 103, 272, 262, 98, 101, 104, 97, 118, 105, 259, 276, 73, 262, 272, 99, 111, 109, 112, 97, 115, 263, 256, 97, 32, 284, 100, 258, 114, 97, 110, 103, 258, 286, 277, 112, 112, 287, 288, 101, 115, 273, 257, 99, 108, 117, 100, 268, 289, 290, 258, 98, 97, 263, 267, 266, 32, 108, 111, 103, 269, 44, 261, 291, 278, 273, 97, 110, 267, 112, 287, 98, 97, 98, 105, 282, 115, 116, 269, 32, 114, 264, 115, 266, 268, 276, 77, 288, 257, 258, 292, 114, 110, 268, 32, 293, 261, 117, 98, 263, 262, 286, 283, 289, 97, 262, 292, 114, 110, 256, 116, 111, 294, 107, 258, 281, 99, 105, 285, 266, 256, 98, 121, 32, 102, 105, 116, 116, 268, 294, 260, 101, 280, 116, 269, 270, 279, 111, 281, 108, 256, 116, 111, 32, 111, 98, 263, 114, 118, 101, 

In [50]:
from collections import defaultdict

def foo(byte_list, lower_lim, vocab, max_iterations):
    iteration = 0
    while iteration < max_iterations:
        if iteration > 0: byte_list = new_byte_list
        freq_map = defaultdict(int) #default value for int is 0
        
        # Count frequencies
        for i in range(len(byte_list)-1):
            pair = (byte_list[i], byte_list[i+1])
            freq_map[pair] += 1
        
        #get most frequently occuring pairs
        most_freq_pairs = [x for x in freq_map.keys() if freq_map[x] == max(max(freq_map.values()), lower_lim)]

    
        #create a dictionary of the identified pairs and allot them a token value
        last_token = vocab[list(vocab.keys())[-1]]

        for i, pair in enumerate(most_freq_pairs):
            print(i+last_token)
            vocab[pair]= i + last_token +1
        
        
        #compress the current byte list
        new_byte_list = []
        
        i = 0
        while i < len(byte_list)-1:
            pair = (byte_list[i], byte_list[i+1])
            if pair in most_freq_pairs:
                new_byte_list.append(vocab[pair])
                i += 2  # Skip both tokens in the pair
            else:
                new_byte_list.append(byte_list[i])
                i += 1
                
            if i == len(byte_list)-1:  # Handle last token
                new_byte_list.append(byte_list[-1])
        iteration+=1

In [51]:
lower_lim =4
max_iterations = 5
loop = 0
foo(byte_list, lower_lim, initial_vocab, max_iterations )

    


[(115, 32)]
ÿ 255
255
[(105, 110)]
(115, 32) 256
256
[(101, 32)]
(105, 110) 257
257
[(111, 114), (116, 104)]
(101, 32) 258
258
259
[(32, 115), (116, 32), (115, 101), (101, 97)]
(116, 104) 260
260
261
262
263


In [52]:
print(initial_vocab)

{'\x00': 0, '\x01': 1, '\x02': 2, '\x03': 3, '\x04': 4, '\x05': 5, '\x06': 6, '\x07': 7, '\x08': 8, '\t': 9, '\n': 10, '\x0b': 11, '\x0c': 12, '\r': 13, '\x0e': 14, '\x0f': 15, '\x10': 16, '\x11': 17, '\x12': 18, '\x13': 19, '\x14': 20, '\x15': 21, '\x16': 22, '\x17': 23, '\x18': 24, '\x19': 25, '\x1a': 26, '\x1b': 27, '\x1c': 28, '\x1d': 29, '\x1e': 30, '\x1f': 31, ' ': 32, '!': 33, '"': 34, '#': 35, '$': 36, '%': 37, '&': 38, "'": 39, '(': 40, ')': 41, '*': 42, '+': 43, ',': 44, '-': 45, '.': 46, '/': 47, '0': 48, '1': 49, '2': 50, '3': 51, '4': 52, '5': 53, '6': 54, '7': 55, '8': 56, '9': 57, ':': 58, ';': 59, '<': 60, '=': 61, '>': 62, '?': 63, '@': 64, 'A': 65, 'B': 66, 'C': 67, 'D': 68, 'E': 69, 'F': 70, 'G': 71, 'H': 72, 'I': 73, 'J': 74, 'K': 75, 'L': 76, 'M': 77, 'N': 78, 'O': 79, 'P': 80, 'Q': 81, 'R': 82, 'S': 83, 'T': 84, 'U': 85, 'V': 86, 'W': 87, 'X': 88, 'Y': 89, 'Z': 90, '[': 91, '\\': 92, ']': 93, '^': 94, '_': 95, '`': 96, 'a': 97, 'b': 98, 'c': 99, 'd': 100, 'e': 101

In [30]:
print(500 - len(compressed_list))

147


In [24]:
print(new_list_1[:10])
print(new_list[:10])

NameError: name 'new_list_1' is not defined