In [0]:
%pip install -U langchain langchain-community langchain-databricks faiss-cpu tiktoken

Collecting langchain
  Downloading langchain-1.2.0-py3-none-any.whl.metadata (4.9 kB)
Collecting langchain-community
  Downloading langchain_community-0.4.1-py3-none-any.whl.metadata (3.0 kB)
Collecting langchain-databricks
  Downloading langchain_databricks-0.1.2-py3-none-any.whl.metadata (3.3 kB)
Collecting faiss-cpu
  Downloading faiss_cpu-1.13.2-cp310-abi3-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl.metadata (7.6 kB)
Collecting tiktoken
  Downloading tiktoken-0.12.0-cp312-cp312-manylinux_2_28_aarch64.whl.metadata (6.7 kB)
Collecting langchain-core<2.0.0,>=1.2.1 (from langchain)
  Downloading langchain_core-1.2.5-py3-none-any.whl.metadata (3.7 kB)
Collecting langgraph<1.1.0,>=1.0.2 (from langchain)
  Downloading langgraph-1.0.5-py3-none-any.whl.metadata (7.4 kB)
Collecting langchain-classic<2.0.0,>=1.0.0 (from langchain-community)
  Downloading langchain_classic-1.0.1-py3-none-any.whl.metadata (4.2 kB)
Collecting SQLAlchemy<3.0.0,>=1.4.0 (from langchain-community)
  Downloadin

In [0]:
dbutils.library.restartPython()

In [0]:
import re
from typing import List, Callable
from langchain_text_splitters import TokenTextSplitter
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import TokenTextSplitter
from langchain_databricks import ChatDatabricks, DatabricksEmbeddings


### Raw Token Chunks

In [0]:
# -----------------------
# 1) Databricks LLM + Embeddings
# -----------------------
# Make sure your Databricks auth is configured (e.g., DATABRICKS_HOST + DATABRICKS_TOKEN)
LLM_ENDPOINT_NAME = "databricks-meta-llama-3-1-8b-instruct"
EMBEDDING_ENDPOINT_NAME = "databricks-bge-large-en"  # <-- change to your embedding endpoint name

llm = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME, temperature=0.1)
embeddings = DatabricksEmbeddings(endpoint=EMBEDDING_ENDPOINT_NAME)

  llm = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME, temperature=0.1)
  embeddings = DatabricksEmbeddings(endpoint=EMBEDDING_ENDPOINT_NAME)


In [0]:
# -----------------------
# 2) Sentence split + sentence-window splitter (LlamaIndex-like)
# -----------------------
_SENT_SPLIT_RE = re.compile(r"(?<=[。！？!?])\s+|\n+")

def split_sentences(text: str) -> List[str]:
    parts = [p.strip() for p in _SENT_SPLIT_RE.split(text) if p.strip()]
    return parts

def sentence_window_splitter(
    documents: List[Document],
    window_size: int = 2,
) -> List[Document]:
    """
    Mimics LlamaIndex SentenceWindowNodeParser:
    - Each chunk is one core sentence
    - metadata includes:
        - original_text: core sentence
        - window: context window (core +/- window_size sentences)
    """
    out: List[Document] = []
    for doc in documents:
        sents = split_sentences(doc.page_content)
        for i, core in enumerate(sents):
            lo = max(0, i - window_size)
            hi = min(len(sents), i + window_size + 1)
            window_text = " ".join(sents[lo:hi])

            out.append(
                Document(
                    page_content=core,
                    metadata={
                        **(doc.metadata or {}),
                        "original_text": core,
                        "window": window_text,
                        "sent_index": i,
                        "window_size": window_size,
                    },
                )
            )
    return out

In [0]:
# -----------------------
# 3) Prompt + QA
# -----------------------
QA_PROMPT = ChatPromptTemplate.from_messages([
    ("system",
     "You are a helpful technical assistant. Answer using ONLY the provided context. "
     "If the context is insufficient, say what is missing."),
    ("human",
     "Question:\n{question}\n\nContext:\n{context}\n\nAnswer in English:")
])

def answer_with_retrieval(
    docs: List[Document],
    question: str,
    top_k: int = 5,
    use_window_metadata: bool = False,
) -> None:
    """
    Build FAISS index, retrieve top_k chunks, print retrieved chunks, then ask the Databricks LLM.
    If use_window_metadata=True, feed metadata['window'] as context (Sentence Window style).
    """
    vs = FAISS.from_documents(docs, embeddings)
    retrieved = vs.similarity_search_with_score(question, k=top_k)

    print("\n--- Top retrieved chunks ---")
    context_blocks = []
    for rank, (d, score) in enumerate(retrieved, 1):
        if use_window_metadata and "window" in (d.metadata or {}):
            window = d.metadata.get("window", d.page_content)
            core = d.metadata.get("original_text", d.page_content)
            print(f"\n[{rank}] score={score:.4f}")
            print(f"Core: {core}")
            print(f"Window:\n{window}")
            context_blocks.append(window)
        else:
            print(f"\n[{rank}] score={score:.4f}")
            print(d.page_content)
            context_blocks.append(d.page_content)

    context = "\n\n".join(context_blocks)
    msg = QA_PROMPT.format_messages(question=question, context=context)
    resp = llm.invoke(msg)

    print("\n--- LLM Answer ---")
    print(resp.content)


In [0]:
# -----------------------
# 4) Runner (prints raw chunks + retrieval behavior)
# -----------------------
def evaluate_splitter(
    splitter_name: str,
    split_fn: Callable[[], List[Document]],
    question: str,
    use_window_metadata: bool = False,
    top_k: int = 5,
    max_print_chunks: int = 50,
) -> None:
    print(f"\n{'='*60}")
    print(f"Testing splitter: {splitter_name}")
    print(f"{'='*60}")

    chunks = split_fn()

    print(f"\n[Raw chunks generated] total={len(chunks)}")
    for i, d in enumerate(chunks[:max_print_chunks], 1):
        print(f"\n--- Chunk {i} ---")
        if use_window_metadata and "window" in (d.metadata or {}):
            print(f"Core: {d.metadata.get('original_text')}")
            print(f"Window: {d.metadata.get('window')}")
        else:
            print(d.page_content)

    if len(chunks) > max_print_chunks:
        print(f"\n... (only printed first {max_print_chunks} chunks)")

    print(f"\nQuestion: {question}")
    answer_with_retrieval(
        docs=chunks,
        question=question,
        top_k=top_k,
        use_window_metadata=use_window_metadata,
    )

    print(f"\n{splitter_name} done.")
    print(f"{'='*60}\n")


In [0]:
# -----------------------
# 5) Example document + question (English version)
# -----------------------
documents = [
    Document(page_content="""
Retrieval-Augmented Generation (RAG) is a common architecture for building
LLM-powered applications that combine external knowledge retrieval with
text generation.
Instead of relying solely on a model’s internal parameters, RAG systems
retrieve relevant information from external data sources and use it as
context during answer generation.
This approach is widely used for applications such as question answering,
knowledge assistants, and domain-specific search.

A typical RAG pipeline consists of several core components, including
document ingestion, text chunking, embedding generation, vector indexing,
retrieval, and response synthesis.
Documents from various sources—such as PDFs, databases, APIs, or web pages—
are first ingested and preprocessed.
They are then split into smaller units, often referred to as chunks,
to make retrieval more precise and efficient.

Each chunk is converted into a vector representation using an embedding model
and stored in a vector index.
During inference, a user query is embedded and compared against the indexed
vectors to retrieve the most relevant chunks.
These retrieved chunks are provided to a Large Language Model (LLM) as context,
allowing the model to generate answers that are grounded in external knowledge.

--- The following content is less directly related to the RAG topic ---

In addition, Python, as a general-purpose programming language, is widely used
in the AI field due to its simplicity and rich ecosystem.
For example, NumPy and Pandas are foundational tools for data processing,
providing powerful capabilities for numerical computation and structured data.
Scikit-learn offers a comprehensive suite of machine learning algorithms
for tasks such as classification, regression, and clustering.
Together, these tools form a powerful toolbox for data scientists and AI practitioners,
enabling efficient development and deployment of complex AI models.

--- The following is another related but conceptually independent section ---

Sentence window chunking is an advanced chunking strategy in which each chunk
contains a target sentence along with a configurable number of surrounding
“window” sentences as context.
This approach aims to provide rich local context to the LLM during retrieval,
thereby improving the coherence and factual consistency of generated answers.
Semantic chunking, on the other hand, attempts to split text based on semantic
content rather than relying solely on fixed character counts or sentence boundaries.
It leverages embedding models to compute semantic similarity between sentences
or phrases and identify natural breakpoints where topics or meanings shift.
Both advanced methods can significantly improve retrieval quality and
downstream generation performance in RAG applications.
Choosing the right chunking strategy typically depends on the characteristics
of the data and the expected query types.
""")
]

question = (
    "What are the main components of a Retrieval-Augmented Generation (RAG) system, "
    "and how do sentence window chunking and semantic chunking differ?"
)

# -----------------------
# 6) Run splitters
# -----------------------
# Token-based split (chunk_size=30, overlap=0)
splitter_a = TokenTextSplitter(chunk_size=30, chunk_overlap=0)
evaluate_splitter(
    splitter_name="Token Split (chunk_size=30, overlap=0)",
    split_fn=lambda: splitter_a.split_documents(documents),
    question=question,
    use_window_metadata=False,
    top_k=5
)

# Token-based split (chunk_size=30, overlap=10)
splitter_b = TokenTextSplitter(chunk_size=30, chunk_overlap=10)
evaluate_splitter(
    splitter_name="Token Split (chunk_size=30, overlap=10)",
    split_fn=lambda: splitter_b.split_documents(documents),
    question=question,
    use_window_metadata=False,
    top_k=5
)

# Sentence-window split (window_size=2)
evaluate_splitter(
    splitter_name="Sentence Window Split (window_size=2)",
    split_fn=lambda: sentence_window_splitter(documents, window_size=2),
    question=question,
    use_window_metadata=True,   # feed metadata['window'] into LLM context
    top_k=5
)



Testing splitter: Token Split (chunk_size=30, overlap=0)

[Raw chunks generated] total=20

--- Chunk 1 ---

Retrieval-Augmented Generation (RAG) is a common architecture for building
LLM-powered applications that combine external knowledge retrieval with


--- Chunk 2 ---
text generation.
Instead of relying solely on a model’s internal parameters, RAG systems
retrieve relevant information from external data sources and

--- Chunk 3 ---
 use it as
context during answer generation.
This approach is widely used for applications such as question answering,
knowledge assistants, and domain-specific

--- Chunk 4 ---
 search.

A typical RAG pipeline consists of several core components, including
document ingestion, text chunking, embedding generation, vector index

--- Chunk 5 ---
ing,
retrieval, and response synthesis.
Documents from various sources—such as PDFs, databases, APIs, or web pages—

--- Chunk 6 ---

are first ingested and preprocessed.
They are then split into smaller units, oft

### Evaluate Token Text Splitter

In [0]:

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA

sentence_splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=50,
)


def run_rag_pipeline(
    splitter,
    documents: List[Document],
    question: str,
    splitter_name: str,
):
    print(f"\n{'=' * 70}")
    print(f"Running RAG pipeline with splitter: {splitter_name}")
    print(f"{'=' * 70}\n")

    # -----------------------
    # Step 1: Split documents
    # -----------------------
    chunks = splitter.split_documents(documents)

    print(f"[{splitter_name}] Generated document chunks:")
    for i, chunk in enumerate(chunks, 1):
        print(f"\n--- Chunk {i} ---")
        print(chunk.page_content.strip())
        print("-" * 40)

    # -----------------------
    # Step 2: Build vector store
    # -----------------------
    print("\nBuilding vector store...")
    vectorstore = FAISS.from_documents(chunks, embeddings)

    retriever = vectorstore.as_retriever(
        search_kwargs={"k": 5}
    )

    # -----------------------
    # Step 3: Build RAG chain
    # -----------------------
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        retriever=retriever,
        chain_type="stuff",
        return_source_documents=True,
    )

    # -----------------------
    # Step 4: Ask question
    # -----------------------
    print(f"\nQuestion:\n{question}\n")
    print("Model Answer:\n")

    result = qa_chain(question)
    print(result["result"])

    # -----------------------
    # Step 5: Show retrieved context
    # -----------------------
    print(f"\n[{splitter_name}] Retrieved source documents:")
    for i, doc in enumerate(result["source_documents"], 1):
        print(f"\n--- Source Document {i} ---")
        print(doc.page_content.strip())
        print("-" * 60)

    print(f"\nFinished RAG pipeline with {splitter_name}")
    print(f"{'=' * 70}\n")


run_rag_pipeline(
    splitter=sentence_splitter,
    documents=documents,
    question=question,
    splitter_name="RecursiveCharacterTextSplitter",
)


Running RAG pipeline with splitter: RecursiveCharacterTextSplitter

[RecursiveCharacterTextSplitter] Generated document chunks:

--- Chunk 1 ---
Retrieval-Augmented Generation (RAG) is a common architecture for building
LLM-powered applications that combine external knowledge retrieval with
text generation.
Instead of relying solely on a model’s internal parameters, RAG systems
retrieve relevant information from external data sources and use it as
context during answer generation.
This approach is widely used for applications such as question answering,
knowledge assistants, and domain-specific search.
----------------------------------------

--- Chunk 2 ---
A typical RAG pipeline consists of several core components, including
document ingestion, text chunking, embedding generation, vector indexing,
retrieval, and response synthesis.
Documents from various sources—such as PDFs, databases, APIs, or web pages—
are first ingested and preprocessed.
They are then split into smaller units,

  result = qa_chain(question)


According to the provided context, the main components of a Retrieval-Augmented Generation (RAG) system are:

1. Document ingestion
2. Text chunking
3. Embedding generation
4. Vector indexing
5. Retrieval
6. Response synthesis

Regarding sentence window chunking and semantic chunking, here's how they differ:

* Sentence window chunking: This approach involves splitting text into chunks that contain a target sentence along with a configurable number of surrounding "window" sentences as context. This provides rich local context to the Large Language Model (LLM) during retrieval, aiming to improve the coherence and factual consistency of generated answers.
* Semantic chunking: This method attempts to split text based on semantic content rather than relying solely on fixed character counts or sentence boundaries. Each chunk is converted into a vector representation using an embedding model and stored in a vector index.

[RecursiveCharacterTextSplitter] Retrieved source documents:

--- Sour

### Evaluate Sentence Window Chunking

In [0]:
class SentenceWindowTextSplitter:
    def __init__(
        self,
        window_size: int = 3,
        window_metadata_key: str = "window",
        original_text_metadata_key: str = "original_text",
    ):
        self.window_size = window_size
        self.window_metadata_key = window_metadata_key
        self.original_text_metadata_key = original_text_metadata_key

    def split_documents(self, documents: List[Document]) -> List[Document]:
        chunks = []

        for doc in documents:
            sentences = [
                s.strip()
                for s in re.split(r"(?<=[.!?])\s+", doc.page_content)
                if s.strip()
            ]

            for i, sentence in enumerate(sentences):
                start = max(0, i - self.window_size)
                end = min(len(sentences), i + self.window_size + 1)

                window_sentences = sentences[start:end]
                window_text = " ".join(window_sentences)

                chunks.append(
                    Document(
                        page_content=window_text,
                        metadata={
                            self.window_metadata_key: window_text,
                            self.original_text_metadata_key: sentence,
                        },
                    )
                )

        return chunks


def run_rag_pipeline(
    splitter,
    documents: List[Document],
    question: str,
    splitter_name: str,
):
    print(f"\n{'=' * 70}")
    print(f"Running RAG with splitter: {splitter_name}")
    print(f"{'=' * 70}\n")

    # --- Split documents ---
    chunks = splitter.split_documents(documents)

    print(f"[{splitter_name}] Generated chunks:")
    for i, chunk in enumerate(chunks, 1):
        print(f"\n--- Chunk {i} ---")
        if "original_text" in chunk.metadata:
            print("Center sentence:")
            print(chunk.metadata["original_text"])
            print("\nWindow context:")
            print(chunk.metadata["window"])
        else:
            print(chunk.page_content)
        print("-" * 40)

    # --- Vector store ---
    print("\nBuilding vector store...")
    vectorstore = FAISS.from_documents(chunks, embeddings)

    retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

    # --- RAG chain ---
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        retriever=retriever,
        chain_type="stuff",
        return_source_documents=True,
    )

    # --- Query ---
    print(f"\nQuestion:\n{question}\n")
    print("Model Answer:\n")

    result = qa_chain(question)
    print(result["result"])

    # --- Retrieved context ---
    print(f"\n[{splitter_name}] Retrieved source documents:")
    for i, doc in enumerate(result["source_documents"], 1):
        print(f"\n--- Source Document {i} ---")
        if "original_text" in doc.metadata:
            print("Center sentence:")
            print(doc.metadata["original_text"])
            print("\nWindow context:")
            print(doc.metadata["window"])
        else:
            print(doc.page_content)
        print("-" * 60)

    print(f"\nFinished RAG with {splitter_name}")
    print(f"{'=' * 70}\n")


In [0]:
# =========================================================
# 5. Run Sentence Window Chunking (LlamaIndex-style)
# =========================================================

sentence_window_splitter = SentenceWindowTextSplitter(
    window_size=3,
    window_metadata_key="window",
    original_text_metadata_key="original_text",
)

run_rag_pipeline(
    splitter=sentence_window_splitter,
    documents=documents,
    question=question,
    splitter_name="Sentence Window (LangChain)",
)


Running RAG with splitter: Sentence Window (LangChain)

[Sentence Window (LangChain)] Generated chunks:

--- Chunk 1 ---
Center sentence:
Retrieval-Augmented Generation (RAG) is a common architecture for building
LLM-powered applications that combine external knowledge retrieval with
text generation.

Window context:
Retrieval-Augmented Generation (RAG) is a common architecture for building
LLM-powered applications that combine external knowledge retrieval with
text generation. Instead of relying solely on a model’s internal parameters, RAG systems
retrieve relevant information from external data sources and use it as
context during answer generation. This approach is widely used for applications such as question answering,
knowledge assistants, and domain-specific search. A typical RAG pipeline consists of several core components, including
document ingestion, text chunking, embedding generation, vector indexing,
retrieval, and response synthesis.
------------------------------------

### Evaluate Sliding Window Splitter

In [0]:
class SlidingWindowTextSplitter:
    """
    Sliding window over sentences:
    - window_size: number of sentences per chunk
    - stride: how many sentences to move forward each step
      (stride < window_size => overlap, stride == window_size => no overlap)
    """

    def __init__(
        self,
        window_size: int = 5,
        stride: int = 2,
        window_metadata_key: str = "window",
        start_idx_metadata_key: str = "start_sentence_idx",
        end_idx_metadata_key: str = "end_sentence_idx",
    ):
        if window_size <= 0:
            raise ValueError("window_size must be > 0")
        if stride <= 0:
            raise ValueError("stride must be > 0")
        if stride > window_size:
            # Allowed, but usually not what you want (gaps between windows)
            pass

        self.window_size = window_size
        self.stride = stride
        self.window_metadata_key = window_metadata_key
        self.start_idx_metadata_key = start_idx_metadata_key
        self.end_idx_metadata_key = end_idx_metadata_key

    def _split_sentences(self, text: str) -> List[str]:
        # Same splitting logic as your sentence window splitter
        return [
            s.strip()
            for s in re.split(r"(?<=[.!?])\s+", text)
            if s.strip()
        ]

    def split_documents(self, documents: List[Document]) -> List[Document]:
        chunks: List[Document] = []

        for doc in documents:
            sentences = self._split_sentences(doc.page_content)
            n = len(sentences)
            if n == 0:
                continue

            # Sliding windows
            for start in range(0, n, self.stride):
                end = min(start + self.window_size, n)
                window_sentences = sentences[start:end]
                window_text = " ".join(window_sentences)

                chunks.append(
                    Document(
                        page_content=window_text,
                        metadata={
                            self.window_metadata_key: window_text,
                            self.start_idx_metadata_key: start,
                            self.end_idx_metadata_key: end - 1,
                        },
                    )
                )

                if end == n:
                    break  # reached the end

        return chunks

In [0]:
splitter = SlidingWindowTextSplitter(window_size=5, stride=2)

run_rag_pipeline(
    splitter=splitter,
    documents=documents,
    question=question,
    splitter_name="Sliding Window (size=5, stride=2)",
)


Running RAG with splitter: Sliding Window (size=5, stride=2)

[Sliding Window (size=5, stride=2)] Generated chunks:

--- Chunk 1 ---
Retrieval-Augmented Generation (RAG) is a common architecture for building
LLM-powered applications that combine external knowledge retrieval with
text generation. Instead of relying solely on a model’s internal parameters, RAG systems
retrieve relevant information from external data sources and use it as
context during answer generation. This approach is widely used for applications such as question answering,
knowledge assistants, and domain-specific search. A typical RAG pipeline consists of several core components, including
document ingestion, text chunking, embedding generation, vector indexing,
retrieval, and response synthesis. Documents from various sources—such as PDFs, databases, APIs, or web pages—
are first ingested and preprocessed.
----------------------------------------

--- Chunk 2 ---
This approach is widely used for applications such 

### Semantic Splitter

In [0]:
import re
import html
import numpy as np
from typing import List, Dict, Any, Optional, Tuple

In [0]:
# -----------------------
# 2) Analysis helpers (chunk stats + retrieval scores)
# -----------------------
def _approx_token_count(text: str) -> int:
    # Cheap proxy; good enough for comparing splitters
    return max(1, len(text.split()))


def analyze_chunks(chunks: List[Document], name: str) -> Dict[str, Any]:
    lengths = [_approx_token_count(c.page_content) for c in chunks]
    lengths_sorted = sorted(lengths)
    n = len(lengths_sorted)

    def _p(pct: float) -> int:
        if n == 0:
            return 0
        idx = min(n - 1, int(round((pct / 100.0) * (n - 1))))
        return lengths_sorted[idx]

    out = {
        "splitter": name,
        "num_chunks": n,
        "min_tokens": min(lengths) if lengths else 0,
        "p50_tokens": _p(50),
        "p90_tokens": _p(90),
        "max_tokens": max(lengths) if lengths else 0,
        "avg_tokens": round(sum(lengths) / n, 2) if n else 0,
        "exact_duplicate_chunks": n - len({c.page_content for c in chunks}),
    }
    return out


def retrieve_with_scores(vectorstore: FAISS, query: str, k: int = 5) -> List[Tuple[Document, float]]:
    # FAISS supports similarity_search_with_score
    return vectorstore.similarity_search_with_score(query, k=k)


def analyze_retrieval(
    retrieved: List[Tuple[Document, float]],
    keywords: Optional[List[str]] = None,
) -> Dict[str, Any]:
    scores = [s for _, s in retrieved]
    out = {
        "k": len(retrieved),
        "best_score": float(min(scores)) if scores else None,
        "worst_score": float(max(scores)) if scores else None,
        "avg_score": float(sum(scores) / len(scores)) if scores else None,
    }

    if keywords:
        hits = 0
        for doc, _ in retrieved:
            t = doc.page_content.lower()
            if any(kw.lower() in t for kw in keywords):
                hits += 1
        out["keyword_hits"] = hits
        out["keywords"] = keywords
    return out


def print_analysis(chunk_stats: Dict[str, Any], retrieval_stats: Dict[str, Any]) -> None:
    print("\n" + "=" * 70)
    print("ANALYSIS")
    print("=" * 70)

    print("Chunk stats:")
    for k, v in chunk_stats.items():
        print(f"  - {k}: {v}")

    print("\nRetrieval stats (FAISS similarity_search_with_score):")
    for k, v in retrieval_stats.items():
        print(f"  - {k}: {v}")

    print("=" * 70 + "\n")


In [0]:
# -----------------------
# 3) Semantic Splitter (LangChain version)
#    - Sentence tokenize
#    - Embed each sentence
#    - Compute adjacent cosine similarity
#    - Break at low-similarity points by percentile threshold
#    - Optional buffer_size to include neighbors around boundaries
# -----------------------
def sentence_tokenize_en(text: str) -> List[str]:
    # Simple sentence splitter; you can swap with nltk/spacy if you prefer
    sents = [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]
    return sents


def _cosine_sim(a: np.ndarray, b: np.ndarray) -> float:
    denom = (np.linalg.norm(a) * np.linalg.norm(b))
    if denom == 0:
        return 0.0
    return float(np.dot(a, b) / denom)


class SemanticTextSplitter:
    """
    LangChain-style semantic chunker inspired by LlamaIndex SemanticSplitterNodeParser.

    breakpoint_percentile_threshold:
        If 95: break on the lowest 5% similarities (conservative, fewer breaks)
        If 5:  break on the lowest 95% similarities (aggressive, many breaks)
    """

    def __init__(
        self,
        embedder,
        sentence_splitter=sentence_tokenize_en,
        breakpoint_percentile_threshold: float = 95,
        buffer_size: int = 0,
        min_sentences_per_chunk: int = 1,
        max_sentences_per_chunk: Optional[int] = None,
    ):
        self.embedder = embedder
        self.sentence_splitter = sentence_splitter
        self.breakpoint_percentile_threshold = breakpoint_percentile_threshold
        self.buffer_size = buffer_size
        self.min_sentences_per_chunk = min_sentences_per_chunk
        self.max_sentences_per_chunk = max_sentences_per_chunk

    def split_documents(self, documents: List[Document]) -> List[Document]:
        out_chunks: List[Document] = []

        for doc in documents:
            sentences = self.sentence_splitter(doc.page_content)
            if len(sentences) == 0:
                continue
            if len(sentences) == 1:
                out_chunks.append(Document(page_content=sentences[0], metadata=dict(doc.metadata or {})))
                continue

            # Embed sentences (Databricks embeddings returns list[list[float]])
            sent_vecs = self.embedder.embed_documents(sentences)
            sent_vecs = [np.array(v, dtype=np.float32) for v in sent_vecs]

            sims = [_cosine_sim(sent_vecs[i], sent_vecs[i + 1]) for i in range(len(sent_vecs) - 1)]
            # Breakpoints are where similarity is BELOW threshold value
            threshold_val = float(np.percentile(sims, self.breakpoint_percentile_threshold))

            break_idxs = [i for i, sim in enumerate(sims) if sim < threshold_val]
            # i means "between sentence i and i+1" -> boundary at i+1
            boundaries = set([0, len(sentences)])
            for i in break_idxs:
                boundaries.add(i + 1)

            boundaries = sorted(boundaries)

            # Create initial chunks from boundaries
            chunks_sent_ranges: List[Tuple[int, int]] = []
            for a, b in zip(boundaries[:-1], boundaries[1:]):
                if a < b:
                    chunks_sent_ranges.append((a, b))

            # Apply buffer_size (expand ranges)
            if self.buffer_size > 0:
                expanded: List[Tuple[int, int]] = []
                for a, b in chunks_sent_ranges:
                    aa = max(0, a - self.buffer_size)
                    bb = min(len(sentences), b + self.buffer_size)
                    expanded.append((aa, bb))
                # Merge overlaps after expansion
                expanded.sort()
                merged: List[Tuple[int, int]] = []
                for a, b in expanded:
                    if not merged or a > merged[-1][1]:
                        merged.append((a, b))
                    else:
                        merged[-1] = (merged[-1][0], max(merged[-1][1], b))
                chunks_sent_ranges = merged

            # Enforce min/max sentences per chunk (optional)
            final_ranges: List[Tuple[int, int]] = []
            for a, b in chunks_sent_ranges:
                # Ensure minimum sentences by merging forward if needed
                if (b - a) < self.min_sentences_per_chunk and final_ranges:
                    prev_a, prev_b = final_ranges.pop()
                    final_ranges.append((prev_a, b))
                else:
                    final_ranges.append((a, b))

            if self.max_sentences_per_chunk is not None:
                # split overly large chunks
                limited: List[Tuple[int, int]] = []
                for a, b in final_ranges:
                    cur = a
                    while cur < b:
                        nxt = min(b, cur + self.max_sentences_per_chunk)
                        limited.append((cur, nxt))
                        cur = nxt
                final_ranges = limited

            # Build Documents
            base_meta = dict(doc.metadata or {})
            for a, b in final_ranges:
                text = " ".join(sentences[a:b]).strip()
                if not text:
                    continue
                meta = dict(base_meta)
                # optional debug metadata
                meta.update(
                    {
                        "semantic_threshold_val": threshold_val,
                        "semantic_percentile": self.breakpoint_percentile_threshold,
                        "semantic_buffer_size": self.buffer_size,
                        "sentence_range": (a, b),
                    }
                )
                out_chunks.append(Document(page_content=text, metadata=meta))

        return out_chunks



In [0]:
def run_rag_pipeline(
    splitter,
    documents: List[Document],
    question: str,
    splitter_name: str,
    top_k: int = 5,
    print_raw_chunks: bool = True,
):
    print(f"\n{'=' * 70}")
    print(f"Running RAG with splitter: {splitter_name}")
    print(f"{'=' * 70}\n")

    # --- Split documents ---
    chunks = splitter.split_documents(documents)

    if print_raw_chunks:
        print(f"[{splitter_name}] Generated chunks: total={len(chunks)}")
        for i, chunk in enumerate(chunks, 1):
            print(f"\n--- Chunk {i} ---")
            print(chunk.page_content)
            if chunk.metadata:
                # show only the semantic fields for readability
                semantic_keys = ["sentence_range", "semantic_percentile", "semantic_threshold_val", "semantic_buffer_size"]
                shown = {k: chunk.metadata.get(k) for k in semantic_keys if k in chunk.metadata}
                if shown:
                    print(f"\n[metadata] {shown}")
            print("-" * 40)

    # --- Vector store ---
    print("\nBuilding vector store...")
    vectorstore = FAISS.from_documents(chunks, embeddings)

    # --- Analysis: chunk stats + retrieval scores ---
    chunk_stats = analyze_chunks(chunks, splitter_name)
    retrieved = retrieve_with_scores(vectorstore, question, k=top_k)
    keywords = ["retrieval", "chunk", "chunking", "semantic", "embedding", "vector", "index", "rag"]
    retrieval_stats = analyze_retrieval(retrieved, keywords=keywords)
    print_analysis(chunk_stats, retrieval_stats)

    # --- RAG chain (stuff) ---
    retriever = vectorstore.as_retriever(search_kwargs={"k": top_k})
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        retriever=retriever,
        chain_type="stuff",
        return_source_documents=True,
    )

    # --- Query ---
    print(f"Question:\n{question}\n")
    print("--- LLM Answer ---")
    result = qa_chain(question)
    print(result["result"])

    # --- Retrieved context (with scores) ---
    print(f"\n[{splitter_name}] Retrieved source documents (with scores):")
    for i, (doc, score) in enumerate(retrieved, 1):
        print(f"\n--- Source Document {i} --- score={score:.4f}")
        print(doc.page_content)
        # show semantic debug metadata if present
        if doc.metadata:
            semantic_keys = ["sentence_range", "semantic_percentile", "semantic_threshold_val", "semantic_buffer_size"]
            shown = {k: doc.metadata.get(k) for k in semantic_keys if k in doc.metadata}
            if shown:
                print(f"\n[metadata] {shown}")
        print("-" * 60)

    print(f"\nFinished RAG with {splitter_name}")
    print(f"{'=' * 70}\n")


In [0]:
# -----------------------
# 6) Run Semantic Splitter experiments (conservative vs aggressive)
# -----------------------
# Conservative: break at lowest 5% similarities (95th percentile threshold)
semantic_splitter_conservative = SemanticTextSplitter(
    embedder=embeddings,
    sentence_splitter=sentence_tokenize_en,
    breakpoint_percentile_threshold=95,
    buffer_size=1,                 # include 1 neighboring sentence around boundaries
    min_sentences_per_chunk=1,
)

run_rag_pipeline(
    splitter=semantic_splitter_conservative,
    documents=documents,
    question=question,
    splitter_name="Semantic Split (percentile=95, buffer=1) - conservative",
    print_raw_chunks=True,
)



Running RAG with splitter: Semantic Split (percentile=95, buffer=1) - conservative

[Semantic Split (percentile=95, buffer=1) - conservative] Generated chunks: total=1

--- Chunk 1 ---
Retrieval-Augmented Generation (RAG) is a common architecture for building
LLM-powered applications that combine external knowledge retrieval with
text generation. Instead of relying solely on a model’s internal parameters, RAG systems
retrieve relevant information from external data sources and use it as
context during answer generation. This approach is widely used for applications such as question answering,
knowledge assistants, and domain-specific search. A typical RAG pipeline consists of several core components, including
document ingestion, text chunking, embedding generation, vector indexing,
retrieval, and response synthesis. Documents from various sources—such as PDFs, databases, APIs, or web pages—
are first ingested and preprocessed. They are then split into smaller units, often referred to

In [0]:
# Aggressive: break at lowest 95% similarities (5th percentile threshold)
semantic_splitter_aggressive = SemanticTextSplitter(
    embedder=embeddings,
    sentence_splitter=sentence_tokenize_en,
    breakpoint_percentile_threshold=5,
    buffer_size=1,
    min_sentences_per_chunk=1,
)

run_rag_pipeline(
    splitter=semantic_splitter_aggressive,
    documents=documents,
    question=question,
    splitter_name="Semantic Split (percentile=5, buffer=1) - aggressive",
    top_k=5,
    print_raw_chunks=True,
)



Running RAG with splitter: Semantic Split (percentile=5, buffer=1) - aggressive

[Semantic Split (percentile=5, buffer=1) - aggressive] Generated chunks: total=1

--- Chunk 1 ---
Retrieval-Augmented Generation (RAG) is a common architecture for building
LLM-powered applications that combine external knowledge retrieval with
text generation. Instead of relying solely on a model’s internal parameters, RAG systems
retrieve relevant information from external data sources and use it as
context during answer generation. This approach is widely used for applications such as question answering,
knowledge assistants, and domain-specific search. A typical RAG pipeline consists of several core components, including
document ingestion, text chunking, embedding generation, vector indexing,
retrieval, and response synthesis. Documents from various sources—such as PDFs, databases, APIs, or web pages—
are first ingested and preprocessed. They are then split into smaller units, often referred to as ch

### HybridTextSplitter

In [0]:
class HybridTextSplitter:
    def __init__(
        self,
        primary_splitter,                 # e.g., SemanticTextSplitter
        secondary_splitter,               # e.g., SlidingWindowTextSplitter
        max_chunk_tokens: int = 300,
        debug_print: bool = True,
    ):
        self.primary_splitter = primary_splitter
        self.secondary_splitter = secondary_splitter
        self.max_chunk_tokens = max_chunk_tokens
        self.debug_print = debug_print

    def _token_len(self, text: str) -> int:
        # Uses the same approximate token proxy as the previous code
        return max(1, len(text.split()))

    def split_documents(self, documents: List[Document]) -> List[Document]:
        if self.debug_print:
            print("--- Starting HYBRID splitting ---")
            print("Step 1: semantic split...")

        primary_chunks = self.primary_splitter.split_documents(documents)

        if self.debug_print:
            print(f"\n{'='*25} Step 1 (Semantic) Output {'='*25}")
            print(f"Produced {len(primary_chunks)} semantic chunks.")
            for i, c in enumerate(primary_chunks, 1):
                print(f"\n[Semantic chunk {i}] tokens≈{self._token_len(c.page_content)}")
                print("-" * 60)
                print(c.page_content.strip())
                print("-" * 60)

            print(f"\n{'='*25} Step 2 (Check + Secondary Split) {'='*25}")

        final_chunks: List[Document] = []
        for i, chunk in enumerate(primary_chunks, 1):
            tlen = self._token_len(chunk.page_content)

            if self.debug_print:
                print(f"\n>>> Checking semantic chunk {i} tokens≈{tlen} ...")

            if tlen <= self.max_chunk_tokens:
                if self.debug_print:
                    print(f"  └─ OK (<= {self.max_chunk_tokens}). Keep as-is.")
                final_chunks.append(chunk)
            else:
                if self.debug_print:
                    print(f"  └─ Too large (> {self.max_chunk_tokens}). Apply sliding window split.")
                    print("     [Original oversized chunk]")
                    print("     " + "-" * 50)
                    print("     " + chunk.page_content.strip().replace("\n", "\n     "))
                    print("     " + "-" * 50)

                # Secondary split acts on the oversized chunk as a single Document
                sub_docs = self.secondary_splitter.split_documents(
                    [Document(page_content=chunk.page_content, metadata=dict(chunk.metadata or {}))]
                )

                if self.debug_print:
                    print(f"\n     [Secondary split produced {len(sub_docs)} sub-chunks]")
                    for j, sd in enumerate(sub_docs, 1):
                        print(f"\n     [Sub-chunk {i}.{j}] tokens≈{self._token_len(sd.page_content)}")
                        print("     " + "-" * 40)
                        print("     " + sd.page_content.strip().replace("\n", "\n     "))
                        print("     " + "-" * 40)

                final_chunks.extend(sub_docs)

        if self.debug_print:
            print("\n--- HYBRID splitting complete ---")
            print(f"Final chunk count: {len(final_chunks)}")

        return final_chunks


In [0]:
semantic_primary = SemanticTextSplitter(
    embedder=embeddings,
    sentence_splitter=sentence_tokenize_en,   # or your chinese tokenizer if your doc is Chinese
    breakpoint_percentile_threshold=95,       # conservative
    buffer_size=1,
    min_sentences_per_chunk=1,
)

sliding_secondary = SlidingWindowTextSplitter(
    window_size=256,
    stride=2)

hybrid_splitter = HybridTextSplitter(
    primary_splitter=semantic_primary,
    secondary_splitter=sliding_secondary,
    max_chunk_tokens=300,      # align with your example's max_chunk_size idea
    debug_print=True,
)

# Now run the exact same RAG pipeline you already have:
run_rag_pipeline(
    splitter=hybrid_splitter,
    documents=documents,   # reuse your RAG document list
    question=question,
    splitter_name="Hybrid Split (Semantic -> Sliding Window fallback)",
    top_k=5,
    print_raw_chunks=True,
)


Running RAG with splitter: Hybrid Split (Semantic -> Sliding Window fallback)

--- Starting HYBRID splitting ---
Step 1: semantic split...

Produced 1 semantic chunks.

[Semantic chunk 1] tokens≈414
------------------------------------------------------------
Retrieval-Augmented Generation (RAG) is a common architecture for building
LLM-powered applications that combine external knowledge retrieval with
text generation. Instead of relying solely on a model’s internal parameters, RAG systems
retrieve relevant information from external data sources and use it as
context during answer generation. This approach is widely used for applications such as question answering,
knowledge assistants, and domain-specific search. A typical RAG pipeline consists of several core components, including
document ingestion, text chunking, embedding generation, vector indexing,
retrieval, and response synthesis. Documents from various sources—such as PDFs, databases, APIs, or web pages—
are first ingested 