In [2]:
# rich's print replaces the builtin for the rest of the notebook; it is used below to render Panel objects.
from rich import print
from rich.panel import Panel
from dotenv import load_dotenv
# Presumably loads variables from a local .env file into os.environ (API keys, GCP settings) — confirm the file exists.
load_dotenv()

True

# Document Conversion

## MarkItDown
[MarkItDown](https://github.com/microsoft/markitdown) is a utility for converting various files to Markdown (e.g., for indexing, text analysis, etc).
It supports:

- PDF
- PowerPoint
- Word
- Excel
- Images (EXIF metadata and OCR)
- Audio (EXIF metadata and speech transcription)
- HTML
- Text-based formats (CSV, JSON, XML)
- ZIP files (iterates over contents)
- YouTube URLs
- ... and more!

To install MarkItDown together with all of its optional dependencies, use pip:

```bash
pip install markitdown[all]
```


In [None]:
from markitdown import MarkItDown
from openai import OpenAI

# No credentials are passed explicitly; presumably the client reads them from the environment — confirm.
client = OpenAI()
# The OpenAI client and model name are handed to MarkItDown as its LLM backend.
md = MarkItDown(llm_client=client, llm_model="gpt-4o-mini")
result = md.convert("papers/2501.07391v1.pdf")

# Render the converted text inside a titled panel.
print(Panel.fit(result.text_content, title="MarkItDown Output"))

## Docling


[Docling](https://github.com/DS4SD/docling) simplifies document processing, parsing diverse formats — including advanced PDF understanding — and providing seamless integrations with the gen AI ecosystem.

### Features

* 🗂️ Parsing of multiple document formats incl. PDF, DOCX, XLSX, HTML, images, and more
* 📑 Advanced PDF understanding incl. page layout, reading order, table structure, code, formulas, image classification, and more
* 🧬 Unified, expressive DoclingDocument representation format
* ↪️ Various export formats and options, including Markdown, HTML, and lossless JSON
* 🔒 Local execution capabilities for sensitive data and air-gapped environments
* 🤖 Plug-and-play integrations incl. LangChain, LlamaIndex, Crew AI & Haystack for agentic AI
* 🔍 Extensive OCR support for scanned PDFs and images
* 💻 Simple and convenient CLI





In [None]:
import logging
from pathlib import Path
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import (
    PdfPipelineOptions,
)
from docling.document_converter import DocumentConverter, PdfFormatOption

# Emit INFO-level log records for the rest of the session.
logging.basicConfig(level=logging.INFO)

input_doc_path = Path("papers/2501.07391v1.pdf")

# Pipeline configured with remote services and picture description both switched off.
pipeline_options = PdfPipelineOptions(
    enable_remote_services=False
)
pipeline_options.do_picture_description = False

# Apply the options above to PDF inputs only.
doc_converter = DocumentConverter(
    format_options={
        InputFormat.PDF: PdfFormatOption(
            pipeline_options=pipeline_options,
        )
    }
)
# NOTE: rebinds `result`, which previously held the MarkItDown conversion result.
result = doc_converter.convert(input_doc_path)

In [None]:

# Render the Docling document, exported as Markdown, inside a titled panel.
print(Panel.fit(result.document.export_to_markdown(), title="Docling Output"))

# Chunking

## Recursive Chunker

In [None]:
import re
from typing import List, Optional


def recursive_text_splitter(
    text: str,
    chunk_size: int = 1000,
    min_chunk_size: int = 200,
    chunk_overlap: int = 200,
    separators: Optional[List[str]] = None) -> List[str]:
    """
    Split text recursively using a list of separators, ensuring minimum chunk size.

    The text is split on the first separator; pieces that are still longer
    than ``chunk_size`` are split again with the next separator, and the
    resulting pieces are merged back into chunks. Once the last separator
    level is reached the text is sliced into fixed-size windows.

    Pieces shorter than ``min_chunk_size`` are dropped unless they would be
    the first chunk of a merge pass, or they can be appended to the previous
    chunk without exceeding ``chunk_size``. A text that already fits in a
    single chunk is returned as-is, whatever its length.

    Args:
        text: The text to split
        chunk_size: Maximum size of each chunk
        min_chunk_size: Minimum size of each chunk (clamped to chunk_size)
        chunk_overlap: Overlap between chunks
        separators: List of separators to use in order of preference

    Returns:
        List of text chunks; an empty list for empty input
    """
    # Default separators if none provided
    if separators is None:
        separators = ["\n\n", "\n", " ", ""]

    # Ensure min_chunk_size is not larger than chunk_size
    min_chunk_size = min(min_chunk_size, chunk_size)

    # Stride for fixed-size slicing. Clamped to 1 so that chunk_overlap >= chunk_size
    # can never hand range() a zero step (ValueError) or a negative step (empty range).
    stride = max(1, chunk_size - chunk_overlap)

    def merge_chunks(splits: List[str], separator: str) -> List[str]:
        """Merge splits into chunks with overlap, ensuring minimum size."""
        chunks = []
        current_chunk = []
        current_length = 0

        for split in splits:
            # The separator is only counted when the split joins an existing chunk
            split_length = len(split) + (len(separator) if current_chunk else 0)

            # If adding this split would exceed chunk size, finalize current chunk
            if current_length + split_length > chunk_size:
                # Save current chunk if it meets minimum size
                if current_chunk:
                    chunk_text = separator.join(current_chunk)
                    if len(chunk_text) >= min_chunk_size or not chunks:
                        chunks.append(chunk_text)

                    # Create overlap for next chunk: take trailing pieces, newest
                    # first, while they still fit into the overlap budget
                    overlap_chunks = []
                    overlap_length = 0

                    for item in reversed(current_chunk):
                        sep_len = len(separator) if overlap_chunks else 0
                        if len(item) + sep_len + overlap_length > chunk_overlap:
                            break
                        overlap_chunks.insert(0, item)
                        overlap_length += len(item) + sep_len

                    current_chunk = overlap_chunks
                    current_length = overlap_length

                # Handle splits larger than chunk_size
                if split_length > chunk_size:
                    for i in range(0, len(split), stride):
                        chunk = split[i:min(i + chunk_size, len(split))]
                        if len(chunk) >= min_chunk_size or not chunks:
                            chunks.append(chunk)

                    current_chunk = []
                    current_length = 0
                    continue

            # Add to current chunk
            current_chunk.append(split)
            current_length += split_length

        # Handle the final chunk
        if current_chunk:
            final_text = separator.join(current_chunk)
            if len(final_text) >= min_chunk_size or not chunks:
                chunks.append(final_text)
            elif chunks and len(chunks[-1]) + len(separator) + len(final_text) <= chunk_size:
                # Merge with previous chunk if too small and fits
                chunks[-1] = chunks[-1] + separator + final_text

        return chunks

    def split_text(text: str, level: int = 0) -> List[str]:
        """Split text using separators at current level."""
        # Base case: the text already fits in one chunk. Recursive calls always pass
        # text longer than chunk_size, so this is only reached for the caller's input;
        # it is kept whole even when shorter than min_chunk_size so that a short
        # document is not silently discarded.
        if len(text) <= chunk_size:
            return [text] if text else []

        # If at the character level, chunk by size
        if level >= len(separators) - 1:
            chunks = []
            for i in range(0, len(text), stride):
                chunk = text[i:i + chunk_size]
                if len(chunk) >= min_chunk_size or not chunks:
                    chunks.append(chunk)
            return chunks

        # Try to split with current separator (str.split rejects an empty separator)
        separator = separators[level]
        splits = list(text) if separator == "" else text.split(separator)

        # If splitting doesn't work, try next separator
        if len(splits) <= 1:
            return split_text(text, level + 1)

        # Process each split
        results = []
        for split in splits:
            if len(split) <= chunk_size:
                results.append(split)
            else:
                results.extend(split_text(split, level + 1))

        # Merge the results
        return merge_chunks(results, separator)

    return split_text(text)

In [None]:
# Chunk the Markdown export of `result` using the splitter's default sizes.
chunks = recursive_text_splitter(result.document.export_to_markdown())

In [None]:
# Number of chunks produced by the recursive splitter.
len(chunks)

## Semantic Chunker

In [None]:
from typing import List, Dict, Any
import numpy as np
from openai import OpenAI

# Notebook-global OpenAI client; calculate_distances reads this name when requesting embeddings.
client = OpenAI()


def calculate_distances(sentences: List[str], buffer_size: int = 3) -> List[float]:
    """
    Calculates semantic distances between adjacent sentences with context.

    Each sentence is joined with up to ``buffer_size`` neighbours on either
    side, the combined strings are embedded with the notebook-global OpenAI
    ``client`` (model ``text-embedding-3-small``), and the cosine distance
    between consecutive embeddings is returned.

    Args:
        sentences: List of sentence strings
        buffer_size: Number of sentences to include as context before and after

    Returns:
        distances: List of ``len(sentences) - 1`` semantic distances, where
        entry ``i`` is the distance between sentence ``i`` and ``i + 1``.
        Empty when fewer than two sentences are given (no API call is made).
    """
    # Without at least one adjacent pair there is nothing to compare
    if len(sentences) < 2:
        return []

    # Embed in batches to keep each request bounded
    BATCH_SIZE = 500
    batches = []

    for batch_start in range(0, len(sentences), BATCH_SIZE):
        batch_end = min(batch_start + BATCH_SIZE, len(sentences))

        # Combine every sentence in this batch with its surrounding context
        batch_combined = [
            ' '.join(sentences[max(0, i - buffer_size):min(len(sentences), i + buffer_size + 1)])
            for i in range(batch_start, batch_end)
        ]

        # Get embeddings for this batch using OpenAI API
        response = client.embeddings.create(model='text-embedding-3-small', input=batch_combined)
        batches.append(np.array([item.embedding for item in response.data]))

    embedding_matrix = np.concatenate(batches, axis=0)

    # Normalize embeddings so that a dot product equals cosine similarity
    norms = np.linalg.norm(embedding_matrix, axis=1, keepdims=True)
    embedding_matrix = embedding_matrix / norms

    # Only neighbouring rows are compared, so take row-wise dot products of
    # rows i and i + 1 instead of materialising the full N x N similarity matrix
    adjacent_similarity = np.sum(embedding_matrix[:-1] * embedding_matrix[1:], axis=1)

    return (1 - adjacent_similarity).tolist()


def get_cut_indices(distances, target_cuts):
    """
    Find cut indices based on semantic distances and target number of cuts.

    A threshold in [0, 1] is located by bisection: the smallest threshold
    (to within 1e-6) for which the number of distances strictly above it does
    not exceed ``target_cuts``. Every position whose distance exceeds that
    threshold becomes a cut.

    Args:
        distances: List of semantic distances between adjacent sentences
        target_cuts: Target number of cuts

    Returns:
        List of cut indices in ascending order, followed by a trailing ``-1``
        sentinel so the list is never empty. For non-negative ``target_cuts``
        and distances no larger than 1.0, at most ``target_cuts`` real indices
        are returned.
    """
    # Binary search for optimal threshold
    lower_limit, upper_limit = 0.0, 1.0
    distances_np = np.array(distances)

    while upper_limit - lower_limit > 1e-6:
        threshold = (upper_limit + lower_limit) / 2.0
        cuts = np.sum(distances_np > threshold)

        if cuts > target_cuts:
            lower_limit = threshold
        else:
            upper_limit = threshold

    # upper_limit is the converged bound on the "not too many cuts" side. The last
    # probed midpoint may lie on the other side and yield more cuts than requested.
    threshold = upper_limit

    # Find cut points based on threshold
    cut_indices = [i for i, d in enumerate(distances) if d > threshold] + [-1]

    return cut_indices

def semantic_text_splitter(text: str, 
                           avg_chunk_size: int = 1600, 
                           min_chunk_size: int = 800,
                           max_chunk_size: int = 4000) -> List[str]:
    """
    Split text into chunks of approximately avg_chunk_size characters based on semantic similarity.

    The text is first cut into small units with ``recursive_text_splitter``,
    the semantic distance between neighbouring units is computed with
    ``calculate_distances``, and units are then accumulated into chunks that
    end at the positions chosen by ``get_cut_indices``.

    Args:
        text: The input text to be split
        avg_chunk_size: Target average size of chunks in characters
        min_chunk_size: Maximum size of the initial units passed to the
            recursive splitter (their minimum size is half of this value)
        max_chunk_size: Size at which an accumulating chunk is closed even
            without a semantic cut. A single unit longer than this is still
            emitted as one chunk.

    Returns:
        List of text chunks; empty for empty input
    """
    if not text:
        return []

    # Split text into minimal sentence units
    sentences = recursive_text_splitter(text, min_chunk_size, int(min_chunk_size*0.5), chunk_overlap=0)
    if not sentences:
        return []

    # Calculate distances between sentences
    distances = calculate_distances(sentences)

    # Determine number of cuts needed based on character count
    total_length = sum(len(s) for s in sentences)
    target_cuts = total_length // avg_chunk_size

    cut_indices = get_cut_indices(distances, target_cuts)

    # Create chunks based on cut points
    chunks = []
    current_chunk = ''
    sentence_pointer = 0
    while sentence_pointer < len(sentences):
        sentence = sentences[sentence_pointer]
        # Close the chunk before it grows past max_chunk_size. This is only done when
        # something has been accumulated: flushing an empty chunk would neither consume
        # the sentence nor advance the pointer, and the loop would never terminate.
        if current_chunk and len(current_chunk) + len(sentence) > max_chunk_size:
            chunks.append(current_chunk.strip())
            current_chunk = ''
            # Re-plan the remaining cuts from the current position; indices returned
            # for the tail are shifted back to absolute positions and the -1 sentinel
            # is re-attached unshifted.
            remaining_cuts = get_cut_indices(distances[sentence_pointer:], target_cuts-len(chunks))
            cut_indices = [n+sentence_pointer for n in remaining_cuts[:-1]] + [-1]
            continue
        current_chunk += f'\n{sentence}' if current_chunk else sentence
        # The head of cut_indices is the next position after which a chunk ends; the
        # trailing -1 sentinel never matches, so the list cannot run empty.
        if sentence_pointer == cut_indices[0]:
            chunks.append(current_chunk.strip())
            current_chunk = ''
            cut_indices.pop(0)
        sentence_pointer += 1
    if current_chunk:
        chunks.append(current_chunk.strip())
    return chunks

In [None]:
# Positional arguments are avg_chunk_size=1600 and min_chunk_size=800. Rebinds `chunks` from the recursive-splitter result.
chunks = semantic_text_splitter(result.document.export_to_markdown(), 1600, 800)

In [None]:
# Preview at most the first ten chunks; bounding by len(chunks) avoids an
# IndexError when the splitter returned fewer than ten.
for i in range(min(10, len(chunks))):
    print(Panel.fit(chunks[i], title=f"Chunk {i+1}"))

In [None]:
from openai import OpenAI

# Notebook-global OpenAI client; llm_text_splitter reads this name for chat completions.
client = OpenAI()


# System message sent with every split request.
SYSTEM_PROMPT = "You are an assistant specialized in splitting text into thematically consistent sections."

# User-message template, filled with str.format:
#   {chunked_input} — the chunks wrapped in <|start_chunk_X|> / <|end_chunk_X|> tags
#   {current_chunk} — lower bound the model is told its split IDs must respect
USER_MSG = """The text has been divided into chunks, each marked with <|start_chunk_X|> and <|end_chunk_X|> tags, where X is the chunk number.
Your task is to identify the points where splits should occur, such that consecutive chunks of similar themes stay together. Try to avoid splitting in the middle of a topic/section/paragraph

{chunked_input}

Respond with a list of chunk IDs where you believe a split should be made. For example, if chunks 1 and 2 belong together but chunk 3 starts a new topic, you would suggest a split after chunk 2.
THE CHUNKS MUST BE IN ASCENDING ORDER.
Your response should be in the form: 'split_after: 3, 5'
Respond only with the IDs of the chunks where you believe a split should occur.
YOU MUST RESPOND WITH AT LEAST ONE SPLIT. THESE SPLITS MUST BE IN ASCENDING ORDER AND EQUAL OR LARGER THAN: {current_chunk}
"""

def llm_text_splitter(text,
                      min_chunk_size: int = 800,
                      n_chunks_per_prompt: int = 10,
                      max_retries: int = 5):
    """
    Split text into thematically consistent sections chosen by an LLM.

    The text is first cut into small chunks with ``recursive_text_splitter``.
    Windows of up to ``n_chunks_per_prompt`` consecutive chunks, tagged with
    1-based IDs, are sent to the chat model, which answers with the IDs after
    which a split should occur. The next window starts right after the last
    split. Prompting stops once four or fewer chunks remain unassigned; the
    tail is then emitted as part of the final section.

    Args:
        text: The input text to be split
        min_chunk_size: Maximum size of the initial chunks passed to the
            recursive splitter (their minimum size is half of this value)
        n_chunks_per_prompt: Number of chunks shown to the model per request
        max_retries: Attempts per window before giving up

    Returns:
        List of section strings, each the space-joined chunks of that section

    Raises:
        ValueError: If no valid answer is obtained for a window within
            ``max_retries`` attempts. An answer is valid when it contains a
            ``split_after:`` line with at least one ID, the IDs are in
            ascending order, and every ID belongs to the window just shown.
    """
    chunks = recursive_text_splitter(text, min_chunk_size, int(min_chunk_size*0.5), chunk_overlap=0)
    split_indices = []
    # 0-based index of the first chunk not yet assigned to a section; this equals
    # the 1-based ID of the chunk after which the previous split was made.
    current_chunk = 0
    while current_chunk < len(chunks) - 4:
        window_end = min(len(chunks), current_chunk + n_chunks_per_prompt)
        chunked_input = '\n'.join(
            f"<|start_chunk_{i+1}|>{chunks[i]}<|end_chunk_{i+1}|>"
            for i in range(current_chunk, window_end)
        )
        # The IDs shown to the model are 1-based, so the smallest ID it may return
        # is current_chunk + 1 (the first chunk of this window).
        original_prompt = USER_MSG.format(chunked_input=chunked_input, current_chunk=current_chunk + 1)
        prompt = original_prompt
        final_answer = None
        for _ in range(max_retries):
            completion = client.chat.completions.create(model='gpt-4o-mini', messages=[{"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": prompt}], max_tokens=200, temperature=0.2)
            result_string = completion.choices[0].message.content or ''
            split_after_lines = [line for line in result_string.split('\n') if 'split_after:' in line]
            # A reply without a 'split_after:' line is treated as an invalid (empty) answer
            numbers = [int(n) for n in re.findall(r'\d+', split_after_lines[0])] if split_after_lines else []
            # Every ID must lie inside the window just shown. Requiring IDs strictly
            # greater than current_chunk guarantees the outer loop makes progress.
            is_valid = (
                bool(numbers)
                and numbers == sorted(numbers)
                and all(current_chunk < number <= window_end for number in numbers)
            )
            if is_valid:
                final_answer = numbers
                break
            prompt = original_prompt + f"\nThe previous response of {numbers} was invalid. DO NOT REPEAT THIS ARRAY OF NUMBERS. Please try again." 
        if final_answer is None:
            raise ValueError("Failed to retrieve valid split")
        split_indices.extend(final_answer)
        current_chunk = final_answer[-1]
    # Convert 1-based "split after ID" values to 0-based chunk indices
    chunks_to_split_after = {i - 1 for i in split_indices}
    docs = []
    current_doc = ''
    for i, chunk in enumerate(chunks):
        current_doc += chunk + ' '
        if i in chunks_to_split_after:
            docs.append(current_doc.strip())
            current_doc = ''
    if current_doc:
        docs.append(current_doc.strip())
    return docs

In [None]:
# The second positional argument is min_chunk_size=800.
chunks_llm = llm_text_splitter(result.document.export_to_markdown(), 800)

In [None]:
# Preview at most the first ten sections; bounding by len(chunks_llm) avoids an
# IndexError when the splitter returned fewer than ten.
for i in range(min(10, len(chunks_llm))):
    print(Panel.fit(chunks_llm[i], title=f"Chunk {i+1}"))

## LLMs for OCR + Chunking

In [3]:
# Instruction text sent to Gemini together with the PDF: transcribe to Markdown and wrap
# semantically coherent sections in <chunk-page-X> tags. The text is sent verbatim.
CHUNKING_PROMPT = """\
OCR the following page into Markdown. Tables should be formatted as HTML. 
Do not sorround your output with triple backticks.

Chunk the document into sections of roughly 250 - 1000 words. Our goal is 
to identify parts of the page with same semantic theme. These chunks will 
be embedded and used in a RAG pipeline. 

Surround the chunks with <chunk-page-X> </chunk-page-X> tags and X should be the page on which the the chunk starts. Keep paragarphs and sections, but ignore linebreaks.
"""

In [None]:
# `os` is imported here because no earlier cell imports it; without it this cell
# raises NameError on a fresh kernel.
import os

import vertexai

# Project and location come from the environment; a missing variable raises KeyError.
vertexai.init(project=os.environ["GCP_PROJECT"], location=os.environ["GCP_LOCATION"])

In [4]:

from vertexai.generative_models import GenerativeModel, Part


model = GenerativeModel("gemini-2.0-flash-001")

# The PDF is read from disk and sent inline as bytes. For a file stored in Google
# Cloud Storage, the Part.from_uri class method can be used to create the Part instead.
with open("papers/2501.07391v1.pdf", "rb") as f:
    pdf_data = f.read()
# Reuse the model created above rather than constructing a second identical one.
response = model.generate_content(
  contents=[
    CHUNKING_PROMPT,
    Part.from_data(
        data=pdf_data,
        mime_type="application/pdf",
    ),
  ],
)




ValueError: Unable to find your project. Please provide a project ID by:
- Passing a constructor argument
- Using vertexai.init()
- Setting project using 'gcloud config set project my-project'
- Setting a GCP environment variable
- To create a Google Cloud project, please follow guidance at https://developers.google.com/workspace/guides/create-project

In [None]:
# Render the model's text response inside a titled panel.
print(Panel.fit(response.text, title="Gemini Output"))

## LLMs for OCR + Chunking + Retrieval

In [1]:
import os
from google import genai
from google.genai import types
from vertexai.generative_models import GenerativeModel, Part

# NOTE(review): this rebinds the notebook-global `client`, which the chunking cells
# above use as an OpenAI client, and the request below does not use it. Re-running
# calculate_distances or llm_text_splitter after this cell will hit the wrong client —
# confirm whether this client is needed here at all.
client = genai.Client(
  vertexai=True, project=os.environ["GCP_PROJECT"], location=os.environ["GCP_LOCATION"]
)

# Prompt template, filled with str.format: {topic}, {num_chunks}, {num_pages}.
# The worked example directly follows the sentence that introduces it, and the page
# numbers named in that sentence match the example tags (10, 145, 4).
RETRIEVAL_PROMPT = """\
OCR the following page into Markdown. Tables should be formatted as HTML. Keep paragarphs and sections, but ignore linebreaks.
Do not sorround your output with triple backticks.
Return the chunks from the document that contain most relevant information on the following questions/topic: '{topic}'.
Don't return more than {num_chunks} chunks. Surround the relevant pages with <chunk-page-X>exact text of the chunk</chunk-page-X> tags and X should be number of the page from which the chunk is returned.
If less than {num_chunks} chunks are relevant return only the relevant chunks. For every chunk return at least 100 words to give enough context.
If the document is shorter than {num_pages} pages, return all pages of the document and use 1 chunk per page.
So for example, if pages 10, 145, and 4 contain text chunks of relevance. The output should look like this.
<chunk-page-10>
 The prompt shapes how the model interprets its task and utilizes retrieved information (Sun et al., 2024).    
...                                                                          
Q4. How does the size of the knowledge base impact the overall performance? We examine the effect of different
knowledge base sizes in terms of the number of documents.                                                                                                
</chunk-page-10>
<chunk-page-145>
Small prompt changes may influence alignment, affecting response quality. We not only examine these small    
variations but also test counterfactual prompts, to explore the model's behavior under opposite guidance and 
...     
how different prompt crafting strategies can optimize performance.                                           
Q3. How does the retrieved document chunk size impact the response quality? Chunk size affects the balance 
</chunk-page-145>
<chunk-page-4>
...
</chunk-page-4>
...
"""


# The PDF is read from disk and sent inline as bytes. For a file stored in Google
# Cloud Storage, the Part.from_uri class method can be used to create the Part instead.
with open("papers/eu-ai-act.pdf", "rb") as f: # eu-ai-act.pdf
    pdf_data = f.read()
response = GenerativeModel(model_name='gemini-2.0-flash-001').generate_content(
  contents=[
    RETRIEVAL_PROMPT.format(topic="How high are the fines for companies not compliant with the EU AI ACT?", num_pages=5, num_chunks=5),
    Part.from_data(
        data=pdf_data,
        mime_type="application/pdf",
    ),
  ],
)


KeyError: 'GCP_PROJECT'

In [None]:
# Render the retrieved chunks returned by the model inside a titled panel.
print(Panel.fit(response.text, title="Gemini Output"))

In [52]:
# Token accounting reported with the response (prompt, candidates and total counts).
response.usage_metadata

prompt_token_count: 1197
candidates_token_count: 417
total_token_count: 1614

In [56]:
# Estimated cost of the request: token counts scaled to millions and weighted 0.1 (prompt) and 0.4 (candidates).
# Presumably these are USD prices per 1M input/output tokens — confirm against current pricing.
response.usage_metadata.prompt_token_count / 1_000_000 * 0.1 + response.usage_metadata.candidates_token_count / 1_000_000 * 0.4 

0.0002865