In [1]:
%pip install chonkie langchain-community langchain-ollama langchain-experimental faiss-cpu pandas pydantic tiktoken pyarrow unstructured

Note: you may need to restart the kernel to use updated packages.


In [17]:
import os
import json
import hashlib
import time
from typing import List, Optional, Dict, Any, Literal
from datetime import datetime
import pandas as pd
import tiktoken
from pydantic import BaseModel, Field

# --- 1. Metadata Schema (Strict Pydantic Model) ---
class ChunkMetadata(BaseModel):
    doc_id: str
    chunk_id: str
    source_path: str
    mime_type: str
    page_number: Optional[int] = None
    bbox: Optional[List[float]] = None # [x1, y1, x2, y2]
    section_title: Optional[str] = "General"
    content_type: Literal["narrative", "tabular", "code", "markdown"]
    token_count: int
    hash_sha256: str
    vector_metric: str = "cosine"
    embedding_model: str
    table_schema: Optional[str] = None # For tabular chunks
    neighbors: List[str] = [] # IDs of adjacent chunks
    created_at: str = Field(default_factory=lambda: datetime.now().isoformat())

class Chunk(BaseModel):
    text: str
    tokens: List[int]
    start_char: int
    end_char: int
    metadata: ChunkMetadata

# --- 2. Configuration ---
EMBEDDING_MODEL_NAME = "nomic-embed-text:latest"
HOST_IP = "10.255.255.254"
OLLAMA_URL = f"http://{HOST_IP}:11434"

TOKEN_ENCODING = "cl100k_base" # Standard for many LLMs



In [19]:
import hashlib
from typing import List
from langchain_core.embeddings import Embeddings
from langchain_community.embeddings import OllamaEmbeddings

# --- Define Fallback Embedder (Best Practice) ---
# Define the DummyEmbeddings class once, outside the try/except block.
# It's also good practice to inherit from LangChain's base Embeddings class.
class DummyEmbeddings(Embeddings):
    """
    A deterministic fallback embedder that creates vectors from a SHA256 hash.
    This is useful for testing or when a real embedding service is unavailable.
    """
    def __init__(self, length: int = 16):
        self.length = length

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        out = []
        for t in texts:
            h = hashlib.sha256(t.encode("utf-8")).digest()
            # Create a deterministic vector from the hash
            vec = [((b % 128) - 64) / 64.0 for b in h[:self.length]]
            # Normalize the vector
            norm = sum(x * x for x in vec) ** 0.5
            if norm > 1e-12:
                vec = [x / norm for x in vec]
            out.append(vec)
        return out

    def embed_query(self, text: str) -> List[float]:
        return self.embed_documents([text])[0]

# --- Initialize Embedder with Fallback Logic ---
# These should be defined in your configuration
EMBEDDING_MODEL_NAME = "nomic-embed-text:latest"
HOST_IP = "10.255.255.254"
OLLAMA_URL = f"http://{HOST_IP}:11434"

embedder = None

try:
    # 1. Attempt to initialize and use the Ollama embedder.
    # The network call happens on the first use, not during initialization.
    ollama_embedder = OllamaEmbeddings(model=EMBEDDING_MODEL_NAME, base_url=OLLAMA_URL)
    vec = ollama_embedder.embed_query("Hello world")
    embedder = ollama_embedder
    print("Successfully connected to Ollama and generated embedding.")
    print("Vector length:", len(vec))

except Exception as e:
    # 2. If any exception occurs (e.g., ConnectionError), fall back.
    print(f"Warning: Failed to use Ollama embedder: {e}")
    print("Falling back to deterministic DummyEmbeddings.")
    embedder = DummyEmbeddings()
    vec = embedder.embed_query("Hello world")
    print("Using DummyEmbeddings. Vector length:", len(vec))

# Now, you can use the `embedder` variable throughout your notebook.
# It will be either OllamaEmbeddings or DummyEmbeddings.


  ollama_embedder = OllamaEmbeddings(model=EMBEDDING_MODEL_NAME, base_url=OLLAMA_URL)


Falling back to deterministic DummyEmbeddings.
Using DummyEmbeddings. Vector length: 16


In [8]:
from chonkie import TokenChunker, RecursiveChunker, SemanticChunker
from langchain_ollama import OllamaEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.docstore.in_memory import InMemoryDocstore
import faiss

class AgenticChunkingOrchestrator:
    def __init__(self, embedder=None, allow_fallback=True):
        """
        Initialize tokenizer and embeddings.

        If Ollama isn't available we'll fall back to a lightweight deterministic
        DummyEmbeddings class so the notebook can run locally without requiring
        an Ollama server. You can pass a custom embedder to override behavior.
        """
        self.tokenizer = tiktoken.get_encoding(TOKEN_ENCODING)
        self.chunks_buffer: List[Chunk] = []
        self.report_log: List[str] = []
        self.embedder_available = False

        # If user provided an embedder, use it
        if embedder is not None:
            self.embedder = embedder
            self.embedder_available = True
            return

        # Try to instantiate Ollama; if it fails, fall back to a safe dummy embedder
        try:
            self.embedder = OllamaEmbeddings(
                model=EMBEDDING_MODEL_NAME,
                base_url=OLLAMA_URL
            )
            # success
            self.embedder_available = True
        except Exception as e:
            # Keep processing possible â€” add warning to report and optionally use fallback
            self.report_log.append(
                f"Warning: Could not initialize OllamaEmbeddings: {e}. Using fallback embeddings."
            )
            if not allow_fallback:
                # re-raise so caller can handle the failure explicitly
                raise

            # Lightweight deterministic fallback embedder (works without external services)
            class DummyEmbeddings:
                def embed_documents(self, texts: List[str]) -> List[List[float]]:
                    out = []
                    for t in texts:
                        h = hashlib.sha256(t.encode("utf-8")).digest()
                        # build a small fixed-size vector (16 dimensions)
                        vec = [((b % 128) - 64) / 64.0 for b in h[:16]]
                        # L2 normalize
                        norm = sum(x * x for x in vec) ** 0.5
                        if norm > 1e-12:
                            vec = [x / norm for x in vec]
                        out.append(vec)
                    return out

                def embed_query(self, text: str) -> List[float]:
                    return self.embed_documents([text])[0]

            self.embedder = DummyEmbeddings()
            self.embedder_available = False

    def _compute_hash(self, text: str) -> str:
        return hashlib.sha256(text.encode('utf-8')).hexdigest()

    def _count_tokens(self, text: str) -> int:
        return len(self.tokenizer.encode(text))

    # --- STRATEGY ROUTER ---
    def determine_strategy(self, file_path: str, content_sample: str) -> str:
        """
        Agentic decision making based on file extension and content signals.
        """
        ext = os.path.splitext(file_path)[1].lower()
        
        # 1. Code/Markdown Check
        if ext in ['.md', '.py', '.js', '.json', '.yaml']:
            return "recursive_markdown"
        
        # 2. Tabular Check (CSV or Excel)
        if ext in ['.csv', '.xlsx', '.parquet']:
            return "tabular_row"

        # 3. Narrative Check (PDFs, Docx, TXT)
        # Simple heuristic: specific keywords or lack of code symbols
        if "table of contents" in content_sample.lower() or len(content_sample) > 1000:
            return "semantic_narrative"
            
        return "token_fallback"

    # --- CHUNKING ENGINES ---
    def process_document(self, file_path: str, text_content: str, doc_id: str):
        strategy = self.determine_strategy(file_path, text_content[:2000])
        self.report_log.append(f"Doc: {doc_id} | Strategy: {strategy}")
        
        raw_chunks = []
        
        # Strategy 1: Narrative (Semantic)
        if strategy == "semantic_narrative":
            # Chonkie Semantic Chunker (presuming similarity thresholding)
            chunker = SemanticChunker(
                embedding_model=self.embedder, 
                threshold=0.75, 
                chunk_size=512
            )
            raw_chunks = chunker.chunk(text_content)

        # Strategy 2: Code/Markdown (Recursive)
        elif strategy == "recursive_markdown":
            chunker = RecursiveChunker(
                chunk_size=1024,
                # chunk_overlap=100 # ~10% overlap
            )
            raw_chunks = chunker.chunk(text_content)

        # Strategy 3: Tabular (Custom Logic)
        elif strategy == "tabular_row":
            # Basic CSV row grouping simulation
            rows = text_content.split('\n')
            headers = rows[0]
            # Group every 5 rows to maintain context
            current_chunk = []
            for row in rows[1:]:
                current_chunk.append(row)
                if len(current_chunk) >= 5:
                    raw_chunks.append(f"Schema: {headers}\nData:\n" + "\n".join(current_chunk))
                    current_chunk = []
            if current_chunk:
                raw_chunks.append(f"Schema: {headers}\nData:\n" + "\n".join(current_chunk))

        # Strategy 4: Fallback
        else:
            chunker = TokenChunker()
            raw_chunks = chunker.chunk(text_content)

        # --- ENRICHMENT PHASE ---
        self._enrich_and_store(raw_chunks, doc_id, file_path, strategy)

    def _enrich_and_store(self, raw_chunks: List[Any], doc_id: str, source: str, strategy: str):
        total_chunks = len(raw_chunks)
        
        for i, chunk_obj in enumerate(raw_chunks):
            # Handle Chonkie object vs string
            text = chunk_obj.text if hasattr(chunk_obj, 'text') else str(chunk_obj)
            start = chunk_obj.start_index if hasattr(chunk_obj, 'start_index') else 0
            end = chunk_obj.end_index if hasattr(chunk_obj, 'end_index') else 0
            
            token_ids = self.tokenizer.encode(text)
            count = len(token_ids)
            
            # Generate ID: doc_id:chunk_index
            chunk_id = f"{doc_id}:ch_{i}"
            
            # Identify Neighbors
            prev_id = f"{doc_id}:ch_{i-1}" if i > 0 else None
            next_id = f"{doc_id}:ch_{i+1}" if i < total_chunks - 1 else None
            neighbors = list(filter(None, [prev_id, next_id]))

            meta = ChunkMetadata(
                doc_id=doc_id,
                chunk_id=chunk_id,
                source_path=source,
                mime_type="text/plain", # Simplified for demo
                content_type="tabular" if "tabular" in strategy else "narrative",
                token_count=count,
                hash_sha256=self._compute_hash(text),
                embedding_model=EMBEDDING_MODEL_NAME,
                vector_metric="cosine",
                neighbors=neighbors
            )
            final_chunk = Chunk(
                text=text,
                tokens=token_ids,
                start_char=start,
                end_char=end,
                metadata=meta
            )
            self.chunks_buffer.append(final_chunk)

    # --- PERSISTENCE & INDEXING ---
    def save_results(self, output_dir="output"):
        os.makedirs(output_dir, exist_ok=True)
        
        # 1. JSONL Store
        jsonl_path = f"{output_dir}/chunks.jsonl"
        with open(jsonl_path, 'w') as f:
            for ch in self.chunks_buffer:
                f.write(ch.model_dump_json() + "\n")
        
        # 2. Parquet Store (Metadata only for fast scanning)
        meta_dicts = [ch.metadata.model_dump() for ch in self.chunks_buffer]
        df = pd.DataFrame(meta_dicts)
        df.to_parquet(f"{output_dir}/chunk_metadata.parquet")

        # 3. Vector Indexing (FAISS)
        print("Generating Embeddings...")
        texts = [ch.text for ch in self.chunks_buffer]
        metadatas = [ch.metadata.model_dump() for ch in self.chunks_buffer]
        
        # Initialize FAISS with Cosine Similarity (Normalize L2)
        vectorstore = FAISS.from_texts(
            texts=texts,
            embedding=self.embedder,
            metadatas=metadatas,
            docstore=InMemoryDocstore(),
            index_to_docstore_id={}
        )
        vectorstore.save_local(f"{output_dir}/faiss_index")

        # 4. Report
        with open(f"{output_dir}/chunking_report.md", "w") as f:
            f.write("# Chunking Strategy Report\n\n")
            f.write(f"**Total Chunks:** {len(self.chunks_buffer)}\n")
            f.write("## Operations Log\n")
            for log in self.report_log:
                f.write(f"- {log}\n")

        print(f"Processing Complete. Outputs saved to {output_dir}/")

# --- EXECUTION ---
if __name__ == "__main__":
    # Mock Data Creation for Demonstration
    sample_text = """
    # Agentic RAG
    RAG systems require smart chunking. 
    
    ## Strategy
    1. Semantic chunking is best for stories.
    2. Recursive is best for code.
    """
    
    orchestrator = AgenticChunkingOrchestrator()
    orchestrator.process_document("manual.md", sample_text, "doc_001")
    orchestrator.save_results()

Generating Embeddings...


ConnectionError: Failed to connect to Ollama. Please check that Ollama is downloaded, running and accessible. https://ollama.com/download