# In [None]:
import json
import os
import re
import time
import requests
import sys
import logging
from typing import Dict, List, Optional, Tuple, Iterable, Iterator

import psutil
import cProfile
import pstats
import io
from memory_profiler import memory_usage

from tokenizers import Tokenizer, models, trainers, pre_tokenizers
from tokenizers.processors import TemplateProcessing

# ======================
# Configuration Parameters
# ======================
# Settings for one tokenizer-training run: the corpus location, the path the
# trained tokenizer is written to, the target vocabulary size, the special
# tokens, and whether the post-training report is printed.
CONFIG = dict(
    data_url="https://huggingface.co/datasets/roneneldan/TinyStories/resolve/main/TinyStoriesV2-GPT4-valid.txt",
    output_path="./tinystories_bpe_5k.json",
    vocab_size=5000,
    special_tokens=["<|endoftext|>"],
    enable_profiling=True,
)

# ======================
# Data Handling Utilities
# ======================
def download_dataset(url: str, filename: str = "dataset.txt", timeout: float = 30.0) -> str:
    """Download ``url`` to ``filename`` unless that file already exists.

    The body is streamed into ``<filename>.part`` and renamed to ``filename``
    only after the whole response has been written, so an interrupted
    download never leaves a truncated file that a later run would mistake
    for a complete dataset.

    Args:
        url: HTTP(S) location of the dataset.
        filename: Destination path. An existing file is reused untouched.
        timeout: Seconds ``requests`` waits for the server before giving up.

    Returns:
        ``filename``.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-success HTTP status.
        OSError: If the destination cannot be written.
    """
    if os.path.exists(filename):
        print(f"Dataset {filename} already exists.")
        return filename

    print(f"Downloading dataset from {url}...")
    partial_path = f"{filename}.part"
    try:
        # The context manager releases the streamed connection even when
        # writing fails part-way through.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        os.replace(partial_path, filename)
    finally:
        # After a successful rename the partial file is gone and this is a
        # no-op; after a failure it removes the incomplete download.
        if os.path.exists(partial_path):
            os.remove(partial_path)

    print(f"Dataset saved to {filename}")
    return filename

def load_text_data(filepath: str) -> str:
    """Read a UTF-8 text file and collapse every whitespace run to one space.

    Newlines and tabs count as whitespace, so the result is a single line.
    Leading and trailing whitespace is collapsed but not stripped, and
    non-ASCII characters are kept as they are.

    Args:
        filepath: Path of the UTF-8 encoded text file.

    Returns:
        The whitespace-normalised file contents ("" for an empty file).

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    with open(filepath, "r", encoding="utf-8") as f:
        text = f.read()

    # Whitespace normalisation only. The former UTF-8 encode/decode
    # round-trip removed nothing from text that was just decoded from UTF-8;
    # it only copied the whole corpus twice, so it is gone.
    text = re.sub(r"\s+", " ", text)
    print(f"\N{OPEN BOOK} Loaded {len(text)} characters")
    return text

# ======================
# BPE Tokenizer Implementation
# ======================
class BPETokenizer:
    """In-memory BPE vocabulary, merge table and special-token registry.

    Attributes are plain dictionaries:

    * ``vocab``          token -> id, as supplied
    * ``merges``         (left, right) -> value, as supplied
    * ``special_tokens`` lower-cased special token -> id
    * ``token_to_id``    ``vocab`` overlaid with ``special_tokens``
    * ``id_to_token``    reverse lookup covering vocab and special tokens

    NOTE(review): this class used to inherit from ``tokenizers.Tokenizer``.
    That base is gone for three reasons: the constructor loaded a hard-coded
    "tokenizer.json" that has nothing to do with the arguments, the
    ``token_to_id`` / ``id_to_token`` attributes shadowed the base-class
    methods of the same names, and the native ``Tokenizer`` type does not
    appear to allow Python subclassing at all -- confirm against the
    installed ``tokenizers`` version.
    """

    def __init__(self, vocab: Dict[str, int], merges: Dict[Tuple[str, str], int], special_tokens: Optional[Dict[str, int]] = None):
        """Build the lookup tables.

        Args:
            vocab: Token -> id mapping. Copied, never mutated.
            merges: Merge pair -> value mapping. Keys may be any iterable of
                tokens, or a "left right" string (see ``_as_pair``).
            special_tokens: Special token -> id mapping. ``None`` or an empty
                mapping selects ``{"<|endoftext|>": 50256}``.
        """
        # Initialize special tokens (the original read an undefined name
        # `specials` here and could never be constructed).
        self.special_tokens = special_tokens if special_tokens else {"<|endoftext|>": 50256}
        self._normalize_special_tokens()

        # Build vocabulary mappings
        self.vocab = dict(vocab)
        self.merges = {self._as_pair(k): v for k, v in merges.items()}
        self._build_mappings()

    @staticmethod
    def _as_pair(key) -> tuple:
        """Convert one merge key to a tuple of tokens.

        JSON object keys are always strings, so a merge table loaded from
        JSON cannot carry tuple keys. A string key containing a space is
        split on whitespace; anything else goes through ``tuple`` unchanged.
        NOTE(review): the "left right" key format is an assumption -- verify
        against whatever writes the merges file.
        """
        if isinstance(key, str) and " " in key:
            return tuple(key.split())
        return tuple(key)

    def _normalize_special_tokens(self):
        """Lower-case special-token names and guarantee ``<|endoftext|>``."""
        # Rebinding to a new dict leaves the caller's mapping untouched.
        self.special_tokens = {k.lower(): v for k, v in self.special_tokens.items()}
        self.special_tokens.setdefault("<|endoftext|>", 50256)

    def _build_mappings(self):
        """Rebuild ``token_to_id`` and ``id_to_token`` from current state."""
        self.token_to_id = dict(self.vocab)
        self.token_to_id.update(self.special_tokens)

        self.id_to_token = {v: k for k, v in self.vocab.items()}
        # Special tokens were previously missing from the reverse lookup.
        # setdefault keeps every existing vocab entry if an id collides.
        for token, token_id in self.special_tokens.items():
            self.id_to_token.setdefault(token_id, token)

    @classmethod
    def from_pretrained(cls, vocab_path: str, merges_path: Optional[str] = None):
        """Create a tokenizer from JSON files.

        Args:
            vocab_path: JSON file holding the token -> id object.
            merges_path: Optional JSON file holding the merge object.

        Returns:
            A new instance using the default special tokens.

        Raises:
            OSError: If a file cannot be opened.
            json.JSONDecodeError: If a file is not valid JSON.
        """
        # Load vocabulary
        with open(vocab_path, "r", encoding="utf-8") as f:
            vocab = json.load(f)

        # Load merge operations
        merges = {}
        if merges_path:
            with open(merges_path, "r", encoding="utf-8") as f:
                merges = json.load(f)

        # The original passed an undefined SPECIAL_TOKENS name here; the
        # constructor default provides the special tokens instead.
        return cls(vocab=vocab, merges=merges)

# ======================
# Training Pipeline
# ======================
def train_tokenizer():
    """Train a BPE tokenizer on the configured dataset and save it.

    Reads ``data_url``, ``vocab_size``, ``special_tokens``, ``output_path``
    and ``enable_profiling`` from ``CONFIG``.

    Returns:
        The trained ``Tokenizer`` with its post-processor attached.
    """
    data_path = download_dataset(CONFIG["data_url"])

    unk_token = "<|unk|>"
    # Tokens named by the post-processing template. They must be part of the
    # trained vocabulary; otherwise the ids attached to them belong to
    # unrelated tokens and decoding emits those instead.
    template_tokens = ["[SOS]", "[EOS]", "[SEP]"]
    # dict.fromkeys removes duplicates while keeping first-seen order, so the
    # unknown token stays first followed by the configured special tokens.
    special_tokens = list(
        dict.fromkeys([unk_token, *CONFIG["special_tokens"], *template_tokens])
    )

    tokenizer = Tokenizer(models.BPE(unk_token=unk_token))
    tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()

    trainer = trainers.BpeTrainer(
        vocab_size=CONFIG["vocab_size"],
        special_tokens=special_tokens,
        min_frequency=2,
    )

    print(f"\nTraining BPE model ({CONFIG['vocab_size']} vocabulary size)")
    start_time = time.perf_counter()

    tokenizer.train(files=[data_path], trainer=trainer)
    elapsed = time.perf_counter() - start_time

    # Look the template ids up in the trained vocabulary instead of assuming
    # fixed positions.
    tokenizer.post_processor = TemplateProcessing(
        single="[SOS] $A [EOS]",
        pair="[SEP] $A [SEP] $B [SEP]",
        special_tokens=[
            (token, tokenizer.token_to_id(token)) for token in template_tokens
        ],
    )

    # Save before the optional report so a reporting error cannot throw away
    # a finished training run.
    tokenizer.save(CONFIG["output_path"])
    print(f"Tokenizer saved to {CONFIG['output_path']}")

    if CONFIG["enable_profiling"]:
        analyze_performance(elapsed, tokenizer)

    return tokenizer

# ======================
# Performance Analysis
# ======================
def analyze_performance(elapsed_time: float, tokenizer: Tokenizer):
    """Print a short report about a finished training run.

    Args:
        elapsed_time: Training duration in seconds.
        tokenizer: Trained tokenizer; only ``get_vocab()`` is called on it.
    """
    print("\nPerformance Analysis Report:")
    print(f"Total training time: {elapsed_time:.2f} seconds")

    # NOTE(review): psutil.virtual_memory() describes the whole machine, not
    # just this process -- confirm that is the figure wanted here.
    mem = psutil.virtual_memory()
    print(f"Memory usage: {mem.percent}%")

    vocab = tokenizer.get_vocab()
    # Measure the token strings themselves. The previous code measured
    # (token, id) tuples and therefore always reported 2. default=0 keeps an
    # empty vocabulary from raising.
    longest_token = max((len(token) for token in vocab), default=0)
    print("Vocabulary stats:")
    print(f"  Total entries: {len(vocab)}")
    print(f"  Max token length: {longest_token}")

# ======================
# Testing & Validation
# ======================
def test_tokenizer(tokenizer):
    """Print the ids, tokens and decoded text for three fixed sentences.

    Args:
        tokenizer: Object providing ``encode(text)`` (whose result exposes
            ``ids`` and ``tokens``) and ``decode(ids)``.
    """
    samples = (
        "Once upon a time,",
        "The quick brown fox jumps over the lazy dog.",
        "AI models are transforming the world.",
    )

    for sample in samples:
        encoding = tokenizer.encode(sample)
        print(f"\nInput: {sample}")
        print(f"Token IDs: {encoding.ids}")
        print(f"Tokens: {encoding.tokens}")
        # Decode last, after the other lines are printed, as before.
        round_trip = tokenizer.decode(encoding.ids)
        print(f"Decoded: {round_trip}")

# ======================
# Main Execution Flow
# ======================
if __name__ == "__main__":
    # Script entry point: train the tokenizer, then print sample encodings.
    logging.basicConfig(level=logging.INFO)

    try:
        tokenizer = train_tokenizer()
        test_tokenizer(tokenizer)

    except Exception as e:
        # logging.exception records the traceback together with the message,
        # so a failed run can be diagnosed; printing only str(e) discarded it.
        logging.exception("Critical error: %s", e)
        sys.exit(1)