In [3]:
# simple_tokenizer.py
import os, re, string, pickle
from collections import Counter
from typing import List

class SimpleTokenizer:
    """
    Word-level tokenizer with:
      • special tokens  : <PAD> =0, <UNK>=1
      • optional fixed-size *word* vocab
      • guaranteed 1-char tokens <C_a> … <C_z>, <C_0> … <C_9>
        so an OOV word is split into characters instead of <UNK>.
    """

    CHAR_TOKENS = list(string.ascii_lowercase) + list(string.digits)

    def __init__(self, vocab_size: int | None = 10_000, add_char_fallback: bool = True):
        """
        Parameters
        ----------
        vocab_size : int | None
            Max *word* tokens *excluding* special + char tokens.
            None ⇒ unlimited.
        add_char_fallback : bool
            If True, reserves tokens <C_a> … and uses them when a word is OOV.
        """
        self.vocab_size = vocab_size
        self.add_char_fallback = add_char_fallback

        self.word_to_id = {"<PAD>": 0, "<UNK>": 1}
        self.id_to_word = {0: "<PAD>", 1: "<UNK>"}
        self.next_id = 2  # start after PAD, UNK

        # will be filled in fit()
        self.char_token_ids: dict[str, int] = {}

    # ────────────────────────────────────────────────────────────────────────
    # public API
    # ────────────────────────────────────────────────────────────────────────
    def fit(self, texts: List[str]):
        """Build vocabulary from a list of raw texts."""
        # 1) optional char tokens
        if self.add_char_fallback:
            for ch in self.CHAR_TOKENS:
                self._add_token(f"<C_{ch}>", force=True)   # always add

        # 2) word statistics
        counts = Counter()
        for txt in texts:
            counts.update(txt.lower().split())

        # 3) take most common words up to vocab_size
        limit = (self.vocab_size or len(counts))           # None → unlimited
        for word, _ in counts.most_common(limit):
            self._add_token(word)

        # store quick look-up for char fallback
        self.char_token_ids = {
            ch: self.word_to_id.get(f"<C_{ch}>") for ch in self.CHAR_TOKENS
        }

        print(f"Vocabulary built: {len(self.word_to_id)} tokens "
              f"(words ≤ {self.vocab_size}, chars × {len(self.CHAR_TOKENS)})")
        return self

    def tokenize(self, text: str) -> List[int]:
        """Convert a sentence to a list of token IDs with char fallback."""
        ids: List[int] = []
        for word in text.lower().split():
            tid = self.word_to_id.get(word)
            if tid is not None:
                ids.append(tid)
            elif self.add_char_fallback:
                # decompose into characters
                ids.extend(self.char_token_ids.get(ch, 1)  # 1 = <UNK> char
                           for ch in word)
            else:
                ids.append(1)      # <UNK>
        return ids or [1]           # never return []

    def decode(self, ids: List[int]) -> List[str]:
        """IDs → tokens (words or <C_x>)."""
        return [self.id_to_word.get(i, "<UNK>") for i in ids]

    # alias for HuggingFace-style API
    convert_ids_to_tokens = decode

    # ───────────────────────────────────────────── storage helpers ──────────
    def save(self, path):
        with open(path, "wb") as f:
            pickle.dump(self.__dict__, f)
        print(f"Tokenizer saved to {path}")

    @classmethod
    def load(cls, path):
        with open(path, "rb") as f:
            state = pickle.load(f)
        tok = cls(vocab_size=state["vocab_size"],
                  add_char_fallback=state["add_char_fallback"])
        tok.__dict__.update(state)
        print(f"Tokenizer loaded from {path} ({len(tok.word_to_id)} tokens)")
        return tok

    # ───────────────────────────────────────────── private  ────────────────
    def _add_token(self, token: str, force: bool = False):
        """Add a single token to vocab (internal)."""
        if token in self.word_to_id:
            return
        if (self.vocab_size is not None
                and not force
                and (self.next_id - 2 - len(self.CHAR_TOKENS)) >= self.vocab_size):
            return      # reached word budget
        self.word_to_id[token] = self.next_id
        self.id_to_word[self.next_id] = token
        self.next_id += 1


def process_text_files(folder_path):
    """Process all .txt files in a folder and return cleaned texts."""
    punct_pattern = re.compile(f'[{re.escape(string.punctuation)}]')
    texts = []
    
    # Get list of all text files
    txt_files = [f for f in os.listdir(folder_path) if f.endswith('.txt')]
    print(f"Found {len(txt_files)} text files in {folder_path}")
    
    # Process each file
    for filename in txt_files:
        filepath = os.path.join(folder_path, filename)
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                text = f.read()
                
            # Remove punctuation and convert to lowercase
            text = punct_pattern.sub(' ', text).lower()
            # Normalize whitespace
            text = ' '.join(text.split())
            
            texts.append(text)
            
        except Exception as e:
            print(f"Error processing {filename}: {e}")
    
    print(f"Processed {len(texts)} text files")
    return texts

def main():
    # Configuration
    folder_path = "../../data/voicebank_demand/trainset_28spk_txt"  # Change this to your folder path
    vocab_size = 1000
    output_path = "simple_tokenizer.pkl"
    
    # Process text files
    texts = process_text_files(folder_path)
    
    # Train tokenizer
    tokenizer = SimpleTokenizer(vocab_size=vocab_size)
    tokenizer.fit(texts)
    
    # Save tokenizer
    tokenizer.save(output_path)
    
    # Example usage
    if texts:
        sample_text = texts[0][:100]  # First 100 chars of first text
        print(f"\nSample text: '{sample_text}'")
        
        tokens = tokenizer.tokenize(sample_text)
        print(f"Tokenized: {tokens}")
        
        decoded = tokenizer.decode(tokens)
        print(f"Decoded: {' '.join(decoded)}")
        
        # Vocabulary stats
        print(f"\nVocabulary size: {len(tokenizer.word_to_id)}")
        print(f"Top 10 words: {list(tokenizer.word_to_id.keys())[:12]}")  # First 12 includes <PAD> and <UNK>

if __name__ == "__main__":
    main()

Found 11572 text files in ../../data/voicebank_demand/trainset_28spk_txt
Processed 11572 text files
Vocabulary built: 1038 tokens (words ≤ 1000, chars × 36)
Tokenizer saved to simple_tokenizer.pkl

Sample text: 'but you can go beyond that condition'
Tokenized: [74, 70, 80, 153, 209, 50, 314]
Decoded: but you can go beyond that condition

Vocabulary size: 1038
Top 10 words: ['<PAD>', '<UNK>', '<C_a>', '<C_b>', '<C_c>', '<C_d>', '<C_e>', '<C_f>', '<C_g>', '<C_h>', '<C_i>', '<C_j>']
