In [1]:
import unicodedata
from langchain_text_splitters import (
    SentenceTransformersTokenTextSplitter,
)
from transformers import (
    AutoTokenizer, 
    AutoConfig,
)
from sentence_transformers import SentenceTransformer

In [2]:
%%html
<style>
table {float:left}
</style>

# Embedding Model (snowflake-arctic-embed-s)

> The snowflake-arctic-embedding models achieve state-of-the-art performance on the MTEB/BEIR leaderboard for each of their size variants.



* [snowflake-arctic-embed-s](https://huggingface.co/Snowflake/snowflake-arctic-embed-s/blob/main/config.json)
```
{
  "_name_or_path": "/data/.model_and_tokenizer_cache/86c70943a1386ead2399854a9324005efb9328b6a9a50b66353fe62386fd6257",
  "architectures": [
    "BertModel"
  ],
  "attention_probs_dropout_prob": 0.1,
  "classifier_dropout": null,
  "hidden_act": "gelu",
  "hidden_dropout_prob": 0.1,
  "hidden_size": 384,              # <---
  "initializer_range": 0.02,
  "intermediate_size": 1536,
  "layer_norm_eps": 1e-12,
  "max_position_embeddings": 512,
  "model_type": "bert",
  "num_attention_heads": 12,
  "num_hidden_layers": 12,
  "pad_token_id": 0,
  "position_embedding_type": "absolute",
  "torch_dtype": "float32",
  "transformers_version": "4.36.1",
  "type_vocab_size": 2,
  "use_cache": true,
  "vocab_size": 30522
}
```

In [7]:
# Model identifier handed to AutoTokenizer.from_pretrained / AutoConfig.from_pretrained below.
EMBEDDING_MODEL: str = "Snowflake/snowflake-arctic-embed-s"

## Max Chunk Size (Max number of tokens)

When both fields match, that shared value is the context size (the maximum number of tokens per chunk).

| Field                          | Meaning                                              |
|--------------------------------|------------------------------------------------------|
| tokenizer.model_max_length     | How many tokens the tokenizer will allow             |
| config.max_position_embeddings | The max positions of tokens the model can represent. |

In [9]:
# Load the tokenizer and the model configuration for the embedding model.
tokenizer = AutoTokenizer.from_pretrained(EMBEDDING_MODEL)
config = AutoConfig.from_pretrained(EMBEDDING_MODEL)

print(f"Tokenizer max length:{tokenizer.model_max_length}")
print("Config max position embeddings:", config.max_position_embeddings)

# Compare the tokenizer limit against the model's positional limit. (The
# previous check compared the tokenizer limit with itself, so it could never
# detect a mismatch.)
if tokenizer.model_max_length == config.max_position_embeddings:
    CONTEXT_WINDOW_SIZE: int = tokenizer.model_max_length
else:
    # When the two disagree, the smaller value is the only one both the
    # tokenizer and the model accept, so use it and say so.
    CONTEXT_WINDOW_SIZE: int = min(
        tokenizer.model_max_length, config.max_position_embeddings
    )
    print(
        "WARNING: tokenizer and config limits differ; "
        f"using the smaller value: {CONTEXT_WINDOW_SIZE}"
    )

Tokenizer max length:512
Config max position embeddings: 512


In [32]:
class ChunkConfig:
    """Configuration for text chunking.

    Attributes:
        max_tokens: Maximum tokens per chunk (model's context window).
        safety_margin: Number of tokens held back from ``max_tokens`` by
            chunkers that compute an effective limit of
            ``max_tokens - safety_margin``.
        overlap_tokens: Number of tokens to overlap between chunks.
        max_lookback: Maximum number of tokens to look back from a chunk's
            tentative end when searching for a word boundary.
        validate_integrity: If True, verify chunks match original text.
    """

    def __init__(
        self,
        max_tokens: int = 512,
        safety_margin: int = 32,
        overlap_tokens: int = 50,
        max_lookback: int = 100,
        validate_integrity: bool = True
    ):
        """Initialize chunk configuration.

        Args:
            max_tokens: Maximum tokens per chunk. For Arctic Embed, use 512.
            safety_margin: Tokens reserved below ``max_tokens``; must be >= 0.
            overlap_tokens: Tokens to overlap between chunks for context.
            max_lookback: How far back (in tokens) to search for a word
                boundary; must be > 0.
            validate_integrity: Whether to validate chunk integrity.

        Raises:
            ValueError: If configuration is invalid.
        """
        if max_tokens <= 0:
            raise ValueError(f'max_tokens must be positive, got {max_tokens}')
        if safety_margin < 0:
            # Must raise an exception class; the original called the int
            # `safety_margin` itself, which produced a TypeError instead.
            raise ValueError(
                f'safety_margin must be positive or 0, got {safety_margin}'
            )
        if overlap_tokens < 0:
            raise ValueError(
                f'overlap_tokens must be non-negative, got {overlap_tokens}'
            )
        if max_lookback <= 0:
            raise ValueError(f'max_lookback must be positive, got {max_lookback}')
        if overlap_tokens >= max_tokens:
            raise ValueError(
                f'overlap_tokens ({overlap_tokens}) must be less than '
                f'max_tokens ({max_tokens})'
            )

        self.max_tokens = max_tokens
        self.safety_margin = safety_margin
        self.overlap_tokens = overlap_tokens
        self.max_lookback = max_lookback
        self.validate_integrity = validate_integrity


In [35]:
"""Simple, robust text chunking for BERT-based sentence transformers.

This module provides text chunking with guaranteed integrity - every chunk
can be perfectly reconstructed back to the original text. Designed for
embedding models like Snowflake Arctic Embed that use BERT tokenization.

Key features:
- Guaranteed chunk integrity (lossless tokenization)
- Word boundary-aware chunking
- Configurable chunk size and overlap
"""

from typing import List, Tuple, Optional
import logging

logger = logging.getLogger(__name__)


def chunk_text(
    text: str,
    tokenizer,
    config: Optional[ChunkConfig] = None
) -> List[str]:
    """Chunk a single text document, optionally validating integrity.

    The text is tokenized once, the token sequence is split into overlapping
    ranges by ``_create_chunks``, and every range is decoded back to a string.
    The returned chunks are the tokenizer's decoding of each range, so they
    reflect whatever normalisation the tokenizer applies.

    Algorithm:
    1. Tokenize the full document (no special tokens, no truncation)
    2. Split tokens into overlapping chunks at word boundaries
    3. Decode each chunk back to text and drop chunks that decode to ''
    4. If ``config.validate_integrity`` is set, run ``_validate_integrity``

    Args:
        text: Text document to chunk.
        tokenizer: HuggingFace BERT tokenizer (e.g., from Arctic Embed).
        config: Chunking configuration. Defaults to ``ChunkConfig()``.

    Returns:
        List of text chunks. Empty list if the text is empty/blank or the
        tokenizer produces no tokens for it.

    Raises:
        ValueError: If chunking cannot make progress or the integrity
            validation fails.

    Example:
        >>> from sentence_transformers import SentenceTransformer
        >>> model = SentenceTransformer('Snowflake/snowflake-arctic-embed-s')
        >>> chunks = chunk_text(long_document, model.tokenizer)
        >>> embeddings = model.encode(chunks)
    """
    config = config or ChunkConfig()

    # Handle empty documents
    if not text or not text.strip():
        return []

    # Tokenize the whole document; special tokens ([CLS], [SEP]) are left out
    # because the chunks are re-encoded later by the embedding model.
    tokens = tokenizer.encode(
        text,
        add_special_tokens=False,
        truncation=False
    )

    # Non-blank text can still yield zero tokens (presumably when every
    # character is discarded by the tokenizer). There is nothing to chunk,
    # and the helper below must not be unpacked for an empty sequence.
    if not tokens:
        return []

    # Split tokens into chunks at word boundaries. The helper also returns a
    # status flag, which is not needed here.
    token_ranges, _ = _create_chunks(tokens, tokenizer, config)

    # Decode chunks back to text
    text_chunks = [
        tokenizer.decode(
            tokens[start:end],
            skip_special_tokens=True,
            clean_up_tokenization_spaces=True
        ).strip()
        for start, end in token_ranges
    ]

    # Remove empty chunks (can happen with whitespace-only sections)
    text_chunks = [chunk for chunk in text_chunks if chunk]

    # Validate integrity: ensure the token ranges reconstruct the original
    if config.validate_integrity:
        _validate_integrity(text, tokens, token_ranges, tokenizer)

    return text_chunks


def _create_chunks(
    tokens: List[int],
    tokenizer,
    config: ChunkConfig
) -> Tuple[List[Tuple[int, int]], bool]:
    """Split tokens into overlapping chunks with word boundaries.

    For BERT WordPiece tokenization:
    - Tokens starting with "##" are word continuations
    - Tokens NOT starting with "##" are word starts
    - We try to break chunks at word starts for clean boundaries

    Algorithm:
    1. Start at position 0
    2. Try to create chunk of max_tokens size
    3. Look back up to ``config.max_lookback`` positions for a word boundary
    4. If found, break there; otherwise use hard cut at max_tokens
    5. Advance by (chunk_end - overlap) and repeat

    Args:
        tokens: List of token IDs.
        tokenizer: Tokenizer instance (for decoding to check boundaries).
        config: Chunking configuration (``max_tokens``, ``overlap_tokens``
            and ``max_lookback`` are read).

    Returns:
        Tuple ``(token_ranges, is_valid)``: ``token_ranges`` is a list of
        ``(start, end)`` half-open index pairs into ``tokens`` and
        ``is_valid`` is True whenever the function returns normally. An empty
        ``tokens`` list yields ``([], True)`` so callers can always unpack.

    Raises:
        ValueError: If the overlap would prevent forward progress.
    """
    if not tokens:
        return [], True

    token_ranges: List[Tuple[int, int]] = []
    total = len(tokens)
    start = 0

    while start < total:
        # Calculate initial chunk end position
        end = min(start + config.max_tokens, total)

        # Try to find a word boundary if we're not at the document end
        if end < total:
            # Never look back as far as `start` itself: ending the chunk at
            # `start` would create a zero-length chunk.
            window = min(config.max_lookback, end - start - 1)
            for lookback in range(1, window + 1):
                test_pos = end - lookback

                # Decode the single token to see whether it starts a word
                # (WordPiece continuation pieces decode with a "##" prefix).
                token_text = tokenizer.decode(
                    [tokens[test_pos]],
                    skip_special_tokens=True,
                    clean_up_tokenization_spaces=True
                )

                if _is_word_boundary(token_text):
                    # End the chunk just before this word start
                    end = test_pos
                    break
            # If no boundary was found in the window, `end` is unchanged and
            # the chunk is cut hard at max_tokens.

        # Record this chunk's range
        token_ranges.append((start, end))

        # Check if we're done processing the document
        if end >= total:
            break

        # Advance to next chunk with overlap
        next_start = end - config.overlap_tokens

        # Ensure we make forward progress (prevent infinite loop)
        if next_start <= start:
            raise ValueError(
                f'Chunking failed to make progress: start={start}, '
                f'next_start={next_start}, end={end}. '
                f'Try reducing overlap_tokens or increasing max_tokens.'
            )

        start = next_start

    return token_ranges, True


def _is_word_boundary(token_text: str) -> bool:
    """Check if a token represents a word boundary.
    
    For BERT WordPiece tokenization:
    - Tokens starting with "##" are word continuations (not boundaries)
    - Tokens with leading whitespace are word boundaries
    - Tokens that are just whitespace are boundaries
    
    Args:
        token_text: Decoded text of a single token.
    
    Returns:
        True if this token represents a word boundary, False otherwise.
    """
    if not token_text:
        return False
    
    # Whitespace-only tokens are boundaries
    if not token_text.strip():
        return True
    
    # Tokens starting with whitespace are word boundaries
    # (The whitespace indicates separation from previous word)
    if token_text[0] in ' \t\n\r':
        return True
    
    # WordPiece continuation tokens start with ##
    # These are NOT word boundaries
    if token_text.startswith('##'):
        return False
    
    # Everything else is considered a word boundary
    return True


def _validate_integrity(
    original_text: str,
    tokens: List[int],
    token_ranges: List[Tuple[int, int]],
    tokenizer
) -> None:
    """Validate that chunks can perfectly reconstruct the original text.
    
    This is the critical integrity check that ensures no data loss during
    the tokenization -> chunking -> decoding process.
    
    We verify two things:
    1. All tokens decode back to the original text (checks tokenization)
    2. Non-overlapping chunks reconstruct the original (checks chunking logic)
    
    Args:
        original_text: Original input text before tokenization.
        tokens: Token IDs from the original text.
        token_ranges: List of (start, end) tuples for chunk boundaries.
        tokenizer: Tokenizer instance for decoding.
    
    Raises:
        ValueError: If integrity check fails, with detailed error message
                   explaining what went wrong.
    """
    # Check 1: Verify full token sequence decodes to original text
    # This ensures the tokenizer can handle this text correctly
    full_decoded = tokenizer.decode(
        tokens, skip_special_tokens=True, clean_up_tokenization_spaces=True
    )

    # Normalize whitespace for comparison
    # Tokenizers often normalize multiple spaces to single space
    original_normalized = ''.join(unicodedata.normalize('NFKC',original_text).split()).lower()
    decoded_normalized = ''.join(unicodedata.normalize('NFKC',full_decoded).split()).lower()
    
    if original_normalized != decoded_normalized:
        # Calculate where the difference occurs for debugging
        diff_pos = _find_first_diff(original_normalized, decoded_normalized)
        raise ValueError(
            f'Integrity check failed: decoded text does not match original.\n'
            f'Original length: {len(original_text)}\n'
            f'Decoded length: {len(full_decoded)}\n'
            f'First difference at position: {diff_pos}\n'
            f'This suggests the tokenizer cannot handle this text correctly.\n'
            f'Original excerpt: {original_normalized[max(0, diff_pos-20):diff_pos+20]}\n'
            f'Decoded excerpt: {decoded_normalized[max(0, diff_pos-20):diff_pos+20]}'
        )
    
    # Check 2: Verify non-overlapping chunks reconstruct the original
    # This ensures our chunking logic is correct
    reconstructed_parts = []
    prev_end = 0
    
    for start, end in token_ranges:
        # Extract only the non-overlapping portion of this chunk
        # If chunks overlap (start < prev_end), skip the overlapping tokens
        if start < prev_end:
            # This chunk overlaps with the previous chunk
            # Only decode the new tokens: prev_end to end
            chunk_tokens = tokens[prev_end:end]
        else:
            # No overlap, decode the full chunk
            chunk_tokens = tokens[start:end]
        
        # Decode this chunk's tokens
        chunk_text = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
        reconstructed_parts.append(chunk_text)
        prev_end = end
    
    # Concatenate all non-overlapping chunks
    reconstructed = unicodedata.normalize('NFKC', ' '.join(reconstructed_parts))
    reconstructed_normalized = ''.join(reconstructed.split()).lower()
    
    if original_normalized != reconstructed_normalized:
        diff_pos = _find_first_diff(original_normalized, reconstructed_normalized)
        
        raise ValueError(
            f'Integrity check failed: reconstructed chunks do not match original.\n'
            f'Original length: {len(original_text)}\n'
            f'Reconstructed length: {len(reconstructed)}\n'
            f'Number of chunks: {len(token_ranges)}\n'
            f'First difference at position: {diff_pos}\n'
            f'This suggests a chunking logic error.\n'
            f'Original excerpt: {original_normalized[max(0, diff_pos-20):diff_pos+20]}\n'
            f'Reconstructed excerpt: {reconstructed_normalized[max(0, diff_pos-20):diff_pos+20]}'
        )


def _find_first_diff(str1: str, str2: str) -> int:
    """Find the position of the first character difference between strings.
    
    Helper function for generating detailed integrity validation errors.
    
    Args:
        str1: First string to compare.
        str2: Second string to compare.
    
    Returns:
        Position (index) of first difference, or -1 if strings are equal.
    """
    min_len = min(len(str1), len(str2))
    
    # Find first position where characters differ
    for i in range(min_len):
        if str1[i] != str2[i]:
            return i
    
    # If we get here, one string is a prefix of the other
    # The difference is at the end of the shorter string
    if len(str1) != len(str2):
        return min_len
    
    # Strings are identical
    return -1

Loading Snowflake Arctic Embed model...


Token indices sequence length is longer than the specified maximum sequence length for this model (1000 > 512). Running this sequence through the model will result in indexing errors



EXAMPLE 1: Basic Chunking

Document split into 1 chunks

Chunk sizes:
  Chunk 1: 181 tokens, 1070 characters

First chunk preview:
  machine learning is a subset of artificial intelligence that focuses on developing systems that can learn from and make decisions based on data. unlik...

Generating embeddings for 1 chunks...
Embeddings shape: (1, 384)
Embedding dimension: 384

EXAMPLE 2: Custom Configuration

With smaller chunks: 1 chunks
  Chunk 1: 181 tokens

EXAMPLE 3: Processing Multiple Documents
Document 1: 1 chunks
Document 2: 1 chunks
Document 3: 1 chunks

Total chunks across all documents: 3
Generated 3 embeddings

EXAMPLE 4: Integrity Validation in Action
✓ Integrity check passed for 3 chunks

Empty text produces: []

Tutorial complete!


In [28]:
"""Text chunking with batch tokenization support.

This module provides efficient text chunking functionality with support for
batch processing and multiple tokenizer types (BPE, SentencePiece, WordPiece).

The chunking algorithm splits documents into token-based chunks while attempting
to break at natural word boundaries rather than mid-word. It uses a hybrid
approach: pre-decoding all tokens for small documents, and on-demand caching
for large documents to balance memory usage and performance.
"""

from typing import List, Optional, Tuple, Callable
import logging

logger = logging.getLogger(__name__)

# Module-level constant for punctuation characters used in boundary detection.
# It is passed to _is_word_boundary_fast as its `punctuation_set` argument.
# Using frozenset for O(1) lookup performance and immutability.
# Besides punctuation, the literal also contains the control characters
# newline, tab and carriage return.
# NOTE(review): the straight double and single quotes each appear twice in
# the literal, which is redundant inside a set. Possibly typographic (curly)
# quotes were intended and lost in an encoding round-trip; confirm.
_PUNCTUATION_SET = frozenset('.,!?;:()[]{}""\'\'`-–—…/\\|\n\t\r')


def chunk_texts_batch(
    texts: List[str],
    tokenizer,
    config: Optional['ChunkConfig'] = None,
    metrics: Optional['ChunkingMetrics'] = None,
    batch_size: int = 32
) -> List[Tuple[List[str], bool]]:
    """Batch process multiple documents with batched tokenization.

    Documents are tokenized ``batch_size`` at a time with a single tokenizer
    call per batch, then each document is chunked individually by
    ``_chunk_single_document``.

    Args:
        texts: List of documents to chunk.
        tokenizer: HuggingFace tokenizer instance.
        config: Chunking configuration. Defaults to ChunkConfig().
        metrics: Optional metrics tracking object.
        batch_size: Number of documents to tokenize at once. Must be
            positive. Larger batches use more memory.

    Returns:
        List of (chunks, is_valid) tuples, one per input document in input
        order, where:
            - chunks: List of text chunks for the document
            - is_valid: Boolean validation flag from _chunk_single_document

    Raises:
        ValueError: If ``batch_size`` is not positive. Errors raised by
            ``_chunk_single_document`` also propagate.
    """
    # A zero step makes range() fail with an unrelated message, and a
    # negative step silently yields no batches (so no results at all).
    if batch_size <= 0:
        raise ValueError(f'batch_size must be positive, got {batch_size}')

    config = config or ChunkConfig()
    results = []

    for batch_start in range(0, len(texts), batch_size):
        batch_end = min(batch_start + batch_size, len(texts))
        batch_texts = texts[batch_start:batch_end]

        # Batch tokenization: single call to tokenize multiple documents
        # Key parameters:
        #   - add_special_tokens=False: We don't want [CLS], [SEP], etc.
        #   - truncation=False: We want to process the full document
        #   - padding=False: No need to pad since we're not doing inference
        #   - return_attention_mask=False: Not needed for chunking
        try:
            batch_encodings = tokenizer(
                batch_texts,
                add_special_tokens=False,
                truncation=False,
                return_attention_mask=False,
                return_token_type_ids=False,
                padding=False
            )
        except Exception as e:
            # Best-effort fallback: retry the whole batch with truncation.
            # Truncated documents lose their tail, so the event is logged
            # and counted once per document in the batch.
            logger.warning(
                'Batch tokenization failed: %s, falling back to truncation', e
            )
            batch_encodings = tokenizer(
                batch_texts,
                add_special_tokens=False,
                truncation=True,
                return_attention_mask=False,
                return_token_type_ids=False,
                padding=False
            )
            if metrics:
                metrics.truncation_warnings += len(batch_texts)

        # Process each document in the batch individually
        # batch_encodings['input_ids'] is a list of token lists
        for text, token_ids in zip(batch_texts, batch_encodings['input_ids']):
            chunks, is_valid = _chunk_single_document(
                text=text,
                tokens=token_ids,
                tokenizer=tokenizer,
                config=config,
                metrics=metrics
            )
            results.append((chunks, is_valid))

    return results


def detect_tokenizer_type(tokenizer) -> str:
    """Identify the underlying algorithm of the HuggingFace tokenizer.

    This is a heuristic based on substrings of the lower-cased
    ``str(type(tokenizer))`` (module path plus class name). The order of the
    checks matters because several model names contain "bert" without using
    WordPiece (e.g. "roberta"), so the more specific names are tested first.

    Args:
        tokenizer: Tokenizer instance; only its type name is inspected.

    Returns:
        One of "wordpiece", "sentencepiece", "bpe" or "generic".
    """
    name = str(type(tokenizer)).lower()

    # SentencePiece-based models whose names contain "bert"/"roberta".
    if any(
        marker in name
        for marker in ("albert", "camembert", "xlm_roberta", "xlmroberta")
    ):
        return "sentencepiece"
    # RoBERTa uses byte-level BPE; must be tested before the "bert" check.
    if "roberta" in name:
        return "bpe"
    if "bert" in name:
        return "wordpiece"
    if "llama" in name or "sp" in name:
        return "sentencepiece"
    if "gpt" in name:
        return "bpe"
    return "generic"

    
def _chunk_single_document(
    text: str,
    tokens: List[int],
    tokenizer,
    config: 'ChunkConfig',
    metrics: Optional['ChunkingMetrics'] = None
) -> Tuple[List[str], bool]:
    """Chunk a single document using pre-tokenized tokens.

    Taking the token IDs as an argument lets callers tokenize many documents
    in one batch and then chunk each one here.

    Core algorithm:
    1. Start at position 0
    2. Try to create chunk of size effective_limit
       (``config.max_tokens - config.safety_margin``)
    3. Look back up to max_lookback positions for a word boundary
    4. If found, use that position; otherwise, hard cut at effective_limit
    5. Advance by (chunk_end - overlap) and repeat until document is covered

    Args:
        text: Original text of the document (used for validation).
        tokens: Pre-tokenized token IDs from the document.
        tokenizer: HuggingFace tokenizer instance.
        config: Chunking configuration. ``max_tokens``, ``safety_margin``,
            ``overlap_tokens`` and ``max_lookback`` are required. The
            optional attributes ``warn_on_fallback`` (log a warning on hard
            cuts) and ``micro_chunk_threshold`` (ratio below which a chunk is
            counted as a micro-chunk) are honoured when present.
        metrics: Optional metrics tracking object.

    Returns:
        Tuple of (chunks, is_valid) where:
            - chunks: List of text chunks
            - is_valid: False if either the token-coverage check or the
              chunk-integrity check failed, True otherwise

    Raises:
        ValueError: If the effective chunk size is not positive, a
            zero-length chunk is detected, or no progress is made.
    """
    # Handle empty documents early
    if not text or not text.strip():
        return [], True

    # Detect tokenizer type to use appropriate word boundary markers
    tokenizer_type = detect_tokenizer_type(tokenizer)

    # Effective chunk size after reserving the safety margin
    effective_limit = config.max_tokens - config.safety_margin
    if tokens and effective_limit <= 0:
        # Without this check the loop below fails with a misleading
        # "zero-length chunk" error.
        raise ValueError(
            f'Effective chunk size must be positive: max_tokens='
            f'{config.max_tokens}, safety_margin={config.safety_margin}'
        )

    # These attributes are optional on the config object; read them
    # defensively so a config without them does not raise AttributeError
    # in the middle of chunking.
    warn_on_fallback = getattr(config, 'warn_on_fallback', False)
    micro_chunk_threshold = getattr(config, 'micro_chunk_threshold', None)

    # Per-token decoding strategy is chosen by document size
    token_threshold = 10_000
    get_token_text = _create_token_text_getter(
        tokens, tokenizer, token_threshold
    )

    # Lookback window: capped at 1/5 of the chunk size. Loop-invariant, so
    # computed once.
    max_lookback = min(config.max_lookback, effective_limit // 5)

    # Track chunks and their token ranges for validation
    chunks = []
    chunk_token_ranges = []  # List of (start, end) tuples
    start = 0

    # Main chunking loop: process tokens from start to end
    while start < len(tokens):
        # Initial end position: start + chunk size, capped at document end
        end = min(start + effective_limit, len(tokens))
        boundary_found = False

        # Word boundary search: only needed if we're not at the end
        if end < len(tokens):
            # Start at lookback=1 (not 0) to check position end-1 first
            for lookback in range(1, max_lookback + 1):
                test_end = end - lookback

                # Stop if we've looked back to the start position
                # This prevents creating zero-length chunks
                if test_end <= start:
                    break

                # Decode token at test position to check if it's a boundary
                token_text = get_token_text(test_end)
                is_boundary, boundary_type = _is_word_boundary_fast(
                    token_text, tokenizer_type, _PUNCTUATION_SET
                )

                # If boundary found, end the chunk just before that token
                if is_boundary:
                    end = test_end
                    boundary_found = True
                    if metrics:
                        # Track what type of boundary we found
                        metrics.boundary_types[boundary_type] += 1
                    break

        # Handle fallback: no boundary found, use hard cut
        if not boundary_found and end < len(tokens):
            if metrics:
                metrics.boundary_fallbacks += 1
            if warn_on_fallback:
                logger.warning(
                    'No boundary found (%d-%d). Using hard cut.', start, end
                )

        # Extract tokens for this chunk
        chunk_tokens = tokens[start:end]

        # Sanity check: zero-length chunks indicate a logic error
        if not start < end:
            raise ValueError(
                f'Zero-length chunk detected: start={start}, end={end}'
            )

        # Decode chunk tokens back to text and trim surrounding whitespace
        chunk_str = tokenizer.decode(
            chunk_tokens, skip_special_tokens=True, clean_up_tokenization_spaces=True
        ).strip()

        # Only add non-empty chunks
        if chunk_str:
            if metrics:
                metrics.total_chunks += 1
                # Track micro-chunks (chunks much smaller than limit); only
                # when the config defines a threshold.
                if micro_chunk_threshold is not None:
                    chunk_ratio = len(chunk_tokens) / effective_limit
                    if chunk_ratio < micro_chunk_threshold:
                        metrics.micro_chunks += 1

            chunks.append(chunk_str)
            chunk_token_ranges.append((start, end))

        # Check if we've processed the entire document
        if end >= len(tokens):
            break

        # Advance to next chunk with overlap
        # Example: if end=500 and overlap=50, next_start=450
        # This means tokens 450-500 appear in both chunks
        next_start = end - config.overlap_tokens

        # Sanity check: if next_start <= start, we'd loop forever
        if next_start <= start and end < len(tokens):
            raise ValueError(
                f'No progress made: start={start}, next_start={next_start}, '
                f'end={end}, total={len(tokens)}'
            )

        start = next_start

    # Validate that chunks cover all tokens without gaps or skips
    is_valid = validate_token_coverage_fast(tokens, chunk_token_ranges)

    if not is_valid and metrics:
        metrics.validation_failures += 1

    # Validate chunk integrity: decoded chunks are checked against the
    # original text by validate_chunk_integrity.
    integrity_valid, error_msg = validate_chunk_integrity(
        original_text=text,
        tokens=tokens,
        chunk_token_ranges=chunk_token_ranges,
        tokenizer=tokenizer,
        config=config
    )

    if not integrity_valid:
        logger.error('Chunk integrity check failed: %s', error_msg)
        if metrics:
            metrics.integrity_failures += 1
        # Mark overall validation as failed
        is_valid = False

    return chunks, is_valid


def _create_token_text_getter(
    tokens: List[int], tokenizer, threshold: int
) -> Callable[[int], str]:
    """Create a token text getter function based on document size.
    
    This implements the hybrid strategy:
    - Short documents: Pre-decode all tokens into a list (O(1) access)
    - Long documents: Decode on-demand with caching (memory-efficient)
    
    The threshold-based decision balances speed vs memory:
    - Below 10K tokens: ~100KB memory, instant access
    - Above 10K tokens: Minimal memory, slight decoding overhead
    
    Args:
        tokens: List of token IDs.
        tokenizer: HuggingFace tokenizer instance.
        threshold: Token count threshold for strategy selection.
    
    Returns:
        Function that takes token index and returns decoded text.
    """
    if len(tokens) < threshold:
        # SHORT DOCUMENTS: Pre-decode all tokens
        # This creates a list where token_texts[i] = decoded text of tokens[i]
        # Memory: O(n) where n is number of tokens
        # Access time: O(1) - simple list lookup
        token_texts = [
            tokenizer.decode([tok], skip_special_tokens=True, clean_up_tokenization_spaces=True)
            for tok in tokens
        ]
        return lambda idx: token_texts[idx]
    else:
        # LONG DOCUMENTS: On-demand decoding with caching
        # Only decode tokens when needed, cache results
        # Memory: O(k) where k is number of accessed tokens (much less than n)
        # Access time: O(1) for cached, O(decode) for first access
        decode_cache = {}
        cache_size_limit = 5000
        
        def get_token_text(idx: int) -> str:
            """Get decoded text for token at index with caching.
            
            Cache eviction strategy: Simple clear when limit reached.
            This is not LRU, but works well because we access tokens
            sequentially (during lookback) so recently used tokens
            will be re-cached if needed.
            """
            if idx not in decode_cache:
                # Prevent unbounded cache growth
                # When cache gets too large, clear it completely
                # This is OK because we typically access tokens in sequence
                if len(decode_cache) >= cache_size_limit:
                    decode_cache.clear()
                    
                # Decode single token and cache result
                decode_cache[idx] = tokenizer.decode(
                    [tokens[idx]], skip_special_tokens=True, clean_up_tokenization_spaces=True
                )
            return decode_cache[idx]
        
        return get_token_text


def _is_word_boundary_fast(
    token_text: str,
    tokenizer_type: str,
    punctuation_set: frozenset
) -> Tuple[bool, Optional[str]]:
    """Fast boundary check for word tokenization.
    
    Different tokenizers use different conventions for marking word starts:
    - BPE (GPT): Uses Ġ (U+0120) for space before word
    - SentencePiece (LLaMA/T5): Uses ▁ (U+2581) for space before word
    - WordPiece (BERT): Tokens NOT starting with ## are word starts
    
    Checks in order of likelihood for performance:
    1. Whitespace (most common)
    2. Tokenizer-specific markers
    3. Punctuation
    
    Args:
        token_text: Decoded text of the token to check.
        tokenizer_type: Type of tokenizer ('bpe', 'sentencepiece', 'wordpiece').
        punctuation_set: Set of punctuation characters for O(1) lookup.
    
    Returns:
        Tuple of (is_boundary, boundary_type) where:
            - is_boundary: Boolean indicating if this is a word boundary
            - boundary_type: String description of boundary type or None
    """
    # Empty tokens are not boundaries
    if not token_text:
        return False, None
    
    # Whitespace-only tokens are boundaries
    # Example: "   " -> boundary
    if not token_text.strip():
        return True, 'whitespace'
    
    first_char = token_text[0]
    
    # Fast path: Check for whitespace at start
    # Common in most tokenizers
    if first_char in ' \t\n\r':
        return True, 'whitespace'
    
    # Tokenizer-specific word start markers
    # These are special Unicode characters used by different tokenizers
    
    # BPE (GPT models): Ġ indicates space before word
    # Example token: "Ġhello" represents " hello"
    if tokenizer_type == 'bpe' and first_char == '\u0120':
        return True, 'bpe_start'
    
    # SentencePiece (LLaMA, T5): ▁ indicates space before word
    # Example token: "▁hello" represents " hello"
    if tokenizer_type == 'sentencepiece' and first_char == '\u2581':
        return True, 'sentencepiece_start'
    
    # WordPiece (BERT): Tokens starting with ## are word continuations
    # So tokens NOT starting with ## are word starts
    # Example: "hello" is start, "##ing" is continuation
    if tokenizer_type == 'wordpiece' and not token_text.startswith('##'):
        return True, 'wordpiece_start'
    
    # Check if token starts with punctuation (after stripping whitespace)
    # Example: " ." or "." both count as punctuation boundary
    # We strip first to handle tokens like "  ,"
    stripped = token_text.lstrip()
    if stripped and stripped[0] in punctuation_set:
        return True, 'punctuation'
    
    # Not a boundary: this token is in the middle of a word
    return False, None


def validate_token_coverage_fast(
    tokens: List[int],
    ranges: List[Tuple[int, int]]
) -> bool:
    """Check that chunk ranges cover the whole token sequence without gaps.

    Only the chunk boundaries are inspected, so the cost grows with the
    number of chunks rather than with the number of tokens. Overlapping
    chunks are accepted (overlap is intentional, it provides context).

    Valid example (overlapping), for 100 tokens:
        ranges = [(0, 50), (40, 90), (80, 100)]
        - first chunk starts at 0
        - last chunk ends at 100
        - every chunk starts at or before the end of its predecessor

    Invalid example (gap):
        ranges = [(0, 50), (60, 100)]
        - tokens 50..59 are in no chunk because 60 > 50

    Args:
        tokens: Token IDs of the original document (only its length is used).
        ranges: (start, end) pairs describing the chunks, in document order.

    Returns:
        True when all three conditions hold, False otherwise:
            1. The first chunk starts at index 0
            2. The last chunk ends at len(tokens)
            3. No consecutive pair of chunks leaves a gap
        With no ranges at all, the result is True only for an empty
        token list.
    """
    total = len(tokens)

    # No chunks: acceptable only when there was nothing to cover.
    if not ranges:
        return total == 0

    # The outer edges must line up with the document: otherwise leading
    # tokens (before the first start) or trailing tokens (after the last
    # end) would be missing.
    if ranges[0][0] != 0 or ranges[-1][1] != total:
        return False

    # Walk adjacent pairs; a chunk that starts after its predecessor ended
    # leaves the tokens in between uncovered.
    has_gap = any(
        following[0] > current[1]
        for current, following in zip(ranges, ranges[1:])
    )
    return not has_gap


def validate_chunk_integrity(
    original_text: str,
    tokens: List[int],
    chunk_token_ranges: List[Tuple[int, int]],
    tokenizer,
    config: 'ChunkConfig'
) -> Tuple[bool, Optional[str]]:
    """Validate that chunks can reconstruct the original text.

    Two independent checks are performed:
    1. Decoding the full token sequence reproduces the original text.
    2. Decoding only the non-overlapping part of every chunk and joining
       the pieces reproduces the original text.

    A failure of either check typically points at a lossy tokenizer
    round-trip (unknown characters, special-token handling, whitespace or
    case normalization) or at chunk ranges that skip tokens.

    Args:
        original_text: Original input text before tokenization.
        tokens: Token IDs produced from ``original_text``.
        chunk_token_ranges: (start, end) token index pairs, one per chunk,
            in document order. Ranges may overlap.
        tokenizer: Object exposing a HuggingFace-style
            ``decode(ids, skip_special_tokens=..., clean_up_tokenization_spaces=...)``.
        config: Chunking configuration. Only the optional attribute
            ``strict_integrity_check`` is read (default: False).

    Returns:
        Tuple of (is_valid, error_message):
            - is_valid: True if both checks pass
            - error_message: Description of the mismatch, or None on success

    Note:
        Normal mode compares the texts with ALL whitespace removed and
        lower-cased, so whitespace and case normalization by the tokenizer
        do not cause failures. With ``config.strict_integrity_check=True``
        the texts must match exactly apart from leading/trailing
        whitespace.
    """
    # Check 1: the complete token sequence must decode back to the input.
    try:
        full_decoded = tokenizer.decode(
            tokens, skip_special_tokens=True, clean_up_tokenization_spaces=True
        )
    except Exception as e:
        return False, f'Failed to decode tokens: {e}'

    # Normalized forms used by the lenient comparison: whitespace removed
    # and lower-cased on BOTH sides.
    original_normalized = ''.join(original_text.split()).lower()
    decoded_normalized = ''.join(full_decoded.split()).lower()

    # Strict mode is opt-in; configs without the attribute are lenient.
    strict_mode = getattr(config, 'strict_integrity_check', False)

    if strict_mode:
        # Strict mode: require exact match (only strip outer whitespace).
        # The comparison guard is essential: without it strict mode would
        # report a failure for every input.
        if original_text.strip() != full_decoded.strip():
            return False, (
                f'Strict integrity check failed. '
                f'Original length: {len(original_text)}, '
                f'Decoded length: {len(full_decoded)}, '
                f'First difference at position: '
                f'{_find_first_diff(original_text, full_decoded)}'
            )
    else:
        # Normal mode: allow whitespace/case normalization
        if original_normalized != decoded_normalized:
            return False, (
                f'Decoded text does not match original after normalization. '
                f'Original normalized length: {len(original_normalized)}, '
                f'Decoded normalized length: {len(decoded_normalized)}, '
                f'First difference at position: '
                f'{_find_first_diff(original_normalized, decoded_normalized)}'
            )

    # Check 2: rebuild the text from the chunks, decoding every token at
    # most once. Tokens skipped by a gap between chunks are never decoded,
    # so a gap shows up as a mismatch below.
    reconstructed_parts = []
    prev_end = 0

    for start, end in chunk_token_ranges:
        # Skip the part of this chunk that the previous chunks already
        # covered (start < prev_end means the chunks overlap).
        chunk_tokens = tokens[max(start, prev_end):end]

        # Never move the watermark backwards: a chunk fully contained in
        # earlier chunks must not cause later tokens to be decoded twice.
        prev_end = max(prev_end, end)

        # Nothing new in this chunk; an empty part would only add a
        # spurious separator to the joined text.
        if not chunk_tokens:
            continue

        try:
            # Same decode options as the full-sequence decode above so both
            # checks compare like with like.
            chunk_text = tokenizer.decode(
                chunk_tokens,
                skip_special_tokens=True,
                clean_up_tokenization_spaces=True
            )
            reconstructed_parts.append(chunk_text)
        except Exception as e:
            return False, (
                f'Failed to decode chunk tokens [{start}:{end}]: {e}'
            )

    # Join the pieces; this should equal the original text (modulo
    # whitespace in normal mode).
    # NOTE(review): the single-space separator can differ from the original
    # when a chunk boundary falls inside a word or before punctuation, which
    # matters only in strict mode - confirm this is the intended behavior.
    reconstructed = ' '.join(reconstructed_parts)
    # Normalize exactly like original_normalized (including lower-casing),
    # otherwise case-preserving tokenizers would always fail here.
    reconstructed_normalized = ''.join(reconstructed.split()).lower()

    if strict_mode:
        if original_text.strip() != reconstructed.strip():
            return False, (
                f'Reconstructed chunks do not match original (strict mode). '
                f'Reconstruction length: {len(reconstructed)}'
            )
    else:
        if original_normalized != reconstructed_normalized:
            return False, (
                f'Reconstructed chunks do not match original. '
                f'Original normalized length: {len(original_normalized)}, '
                f'Reconstructed normalized length: '
                f'{len(reconstructed_normalized)}, '
                f'First difference at position: '
                f'{_find_first_diff(original_normalized, reconstructed_normalized)}'
            )

    # Both checks passed: integrity is valid
    return True, None


def _find_first_diff(str1: str, str2: str) -> int:
    """Find the position of the first character difference between strings.
    
    Helper function for integrity validation error messages.
    
    Args:
        str1: First string to compare.
        str2: Second string to compare.
    
    Returns:
        Position (index) of first difference, or -1 if strings are equal.
    """
    min_len = min(len(str1), len(str2))
    
    for i in range(min_len):
        if str1[i] != str2[i]:
            return i
    
    # If we get here, one string is a prefix of the other
    # The difference is at the end of the shorter string
    if len(str1) != len(str2):
        return min_len
    
    # Strings are identical
    return -1


def chunk_text(
    text: str,
    tokenizer,
    config: Optional['ChunkConfig'] = None,
    metrics: Optional['ChunkingMetrics'] = None
) -> Tuple[List[str], bool]:
    """Chunk one text document.

    Entry point for single-document chunking: the text is tokenized here
    and the actual splitting is delegated to _chunk_single_document().
    When many documents are processed, chunk_texts_batch() is the
    recommended alternative because it tokenizes in batches.

    Args:
        text: Text document to chunk.
        tokenizer: HuggingFace tokenizer instance (its ``encode`` is used).
        config: Chunking configuration; a default ChunkConfig() is created
            when none is supplied.
        metrics: Optional metrics object; its ``truncation_warnings``
            counter is incremented when the truncation fallback is used.

    Returns:
        Tuple of (chunks, is_valid) where:
            - chunks: List of text chunks
            - is_valid: Boolean indicating validation success
        Empty or whitespace-only input yields ([], True).

    Raises:
        ValueError: If chunking produces invalid results (not raised in
            this function itself; presumably propagated from
            _chunk_single_document - confirm against its implementation).
    """
    effective_config = config if config else ChunkConfig()

    # Nothing to chunk for empty / whitespace-only input.
    if not (text and text.strip()):
        return [], True

    # First attempt: tokenize the document as-is, without special tokens
    # and without truncation.
    try:
        token_ids = tokenizer.encode(
            text, add_special_tokens=False, truncation=False
        )
    except Exception as exc:
        # Fallback: retry with truncation enabled and record the event.
        logger.warning('Tokenization error: %s, using truncation', exc)
        token_ids = tokenizer.encode(
            text, add_special_tokens=False, truncation=True
        )
        if metrics:
            metrics.truncation_warnings += 1

    # The shared single-document routine does the actual chunking.
    return _chunk_single_document(
        text, token_ids, tokenizer, effective_config, metrics
    )

In [33]:
# Example usage and tutorial
#
# Tutorial: Using chunking with Snowflake Arctic Embed
#
# This cell shows how to chunk long documents with chunk_text() and embed
# the resulting chunks using the Snowflake Arctic Embed sentence
# transformer model.
if __name__ == '__main__':
    print("Loading Snowflake Arctic Embed model...")
    model = SentenceTransformer('Snowflake/snowflake-arctic-embed-s')
    
    # Example long document (in practice, this would be much longer)
    long_document = """
    Machine learning is a subset of artificial intelligence that focuses on
    developing systems that can learn from and make decisions based on data.
    Unlike traditional programming where rules are explicitly coded, machine
    learning algorithms build models based on sample data, known as training data.
    
    The field of machine learning is closely related to computational statistics,
    which focuses on making predictions using computers. It has strong ties to
    mathematical optimization, which delivers methods, theory and application
    domains to the field.
    
    Deep learning, a subset of machine learning, uses neural networks with
    multiple layers to progressively extract higher-level features from raw input.
    For example, in image processing, lower layers may identify edges, while
    higher layers may identify concepts relevant to a human such as digits,
    letters, or faces.
    
    Modern machine learning has many applications including computer vision,
    speech recognition, email filtering, agriculture, and medicine. When applied
    to business problems, it is known as predictive analytics.
    """
    
    print("\n" + "="*70)
    print("EXAMPLE 1: Basic Chunking")
    print("="*70)
    
    # Configure chunking for Arctic Embed
    # Arctic Embed has a 512 token context window
    # NOTE(review): chunk_text() tokenizes with add_special_tokens=False,
    # while the embedding model presumably adds its own special tokens at
    # encode time - confirm whether max_tokens should leave room for them.
    config = ChunkConfig(
        max_tokens=512,          # Model's maximum context length
        overlap_tokens=50,       # Overlap for context continuity
        validate_integrity=True  # Always validate! (default)
    )
    is_valid: bool = False
    
    # Chunk the document
    chunks, is_valid = chunk_text(long_document, model.tokenizer, config)
    
    print(f"\nDocument split into {len(chunks)} chunks")
    print("\nChunk sizes:")
    for i, chunk in enumerate(chunks):
        # Re-tokenize each chunk only to report its size
        tokens = model.tokenizer.encode(chunk, add_special_tokens=False)
        print(f"  Chunk {i+1}: {len(tokens)} tokens, {len(chunk)} characters")
    
    print("\nFirst chunk preview:")
    print(f"  {chunks[0][:150]}...")
    
    # Embed all chunks
    print(f"\nGenerating embeddings for {len(chunks)} chunks...")
    embeddings = model.encode(chunks)
    
    print(f"Embeddings shape: {embeddings.shape}")
    print(f"Embedding dimension: {embeddings[0].shape[0]}")
    
    print("\n" + "="*70)
    print("EXAMPLE 2: Custom Configuration")
    print("="*70)
    
    # Smaller chunks with more overlap for fine-grained search
    small_config = ChunkConfig(
        max_tokens=256,      # Smaller chunks
        overlap_tokens=100,  # More overlap for better context
        validate_integrity=True
    )
    
    small_chunks, is_valid = chunk_text(long_document, model.tokenizer, small_config)
    
    print(f"\nWith smaller chunks: {len(small_chunks)} chunks")
    for i, chunk in enumerate(small_chunks):
        tokens = model.tokenizer.encode(chunk, add_special_tokens=False)
        print(f"  Chunk {i+1}: {len(tokens)} tokens")
    
    print("\n" + "="*70)
    print("EXAMPLE 3: Processing Multiple Documents")
    print("="*70)
    
    documents = [
        "First document about AI and machine learning...",
        "Second document about deep learning architectures...",
        "Third document about natural language processing..."
    ]
    
    all_chunks = []
    for i, doc in enumerate(documents):
        doc_chunks, is_valid = chunk_text(doc, model.tokenizer, config)
        print(f"Document {i+1}: {len(doc_chunks)} chunks")
        all_chunks.extend(doc_chunks)
    
    print(f"\nTotal chunks across all documents: {len(all_chunks)}")
    
    # Embed all chunks
    all_embeddings = model.encode(all_chunks)
    print(f"Generated {len(all_embeddings)} embeddings")
    
    print("\n" + "="*70)
    print("EXAMPLE 4: Integrity Validation in Action")
    print("="*70)
    
    # This is expected to succeed. The outcome is reported from the returned
    # is_valid flag; a ValueError raised by chunk_text is reported as well.
    try:
        test_text = "The quick brown fox jumps over the lazy dog. " * 100
        test_chunks, is_valid = chunk_text(test_text, model.tokenizer, config)
        if is_valid:
            print(f"✓ Integrity check passed for {len(test_chunks)} chunks")
        else:
            print(f"✗ Integrity check failed for {len(test_chunks)} chunks")
    except ValueError as e:
        print(f"✗ Integrity check failed: {e}")
    
    # Example: What happens with empty text
    # chunk_text returns a (chunks, is_valid) tuple, so unpack it like the
    # calls above instead of binding the whole tuple to empty_chunks.
    empty_chunks, is_valid = chunk_text("", model.tokenizer, config)
    print(f"\nEmpty text produces: {empty_chunks} (is_valid={is_valid})")
    
    print("\n" + "="*70)
    print("Tutorial complete!")
    print("="*70)

Loading Snowflake Arctic Embed model...


Token indices sequence length is longer than the specified maximum sequence length for this model (1000 > 512). Running this sequence through the model will result in indexing errors



EXAMPLE 1: Basic Chunking

Document split into 1 chunks

Chunk sizes:
  Chunk 1: 181 tokens, 1070 characters

First chunk preview:
  machine learning is a subset of artificial intelligence that focuses on developing systems that can learn from and make decisions based on data. unlik...

Generating embeddings for 1 chunks...
Embeddings shape: (1, 384)
Embedding dimension: 384

EXAMPLE 2: Custom Configuration

With smaller chunks: 1 chunks
  Chunk 1: 181 tokens

EXAMPLE 3: Processing Multiple Documents
Document 1: 1 chunks
Document 2: 1 chunks
Document 3: 1 chunks

Total chunks across all documents: 3
Generated 3 embeddings

EXAMPLE 4: Integrity Validation in Action
✓ Integrity check passed for 3 chunks

Empty text produces: ([], True)

Tutorial complete!
