# Car Manual Explorer - GCP + MDB

Uses GCP (Document AI + Vertex AI) to parse and chunk a car manual and to create embeddings from it; the embeddings are stored in a MongoDB Atlas collection. They will be used to build a Technical Document Intelligence System.




## 1. Create a GCP project and set up its configuration

Installing GCP dependencies.

In [None]:
# Pinned versions of the GCP client libraries used in this notebook.
!pip install --upgrade --quiet google-cloud-storage==2.17.0
!pip install --upgrade --quiet google-cloud-documentai==2.29.1
!pip install --upgrade --quiet google-cloud-aiplatform==1.49.0
!pip install --upgrade --quiet vertexai==1.49.0

In [None]:
# Log in for Application Default Credentials and select the active gcloud project.
# NOTE(review): `gcloud config set project` expects the project ID; replace
# <PROJECT_NAME> with your project's ID.
!gcloud auth application-default login
!gcloud config set project <PROJECT_NAME>
!gcloud config get-value project

In [None]:
# Set the quota project used by Application Default Credentials (same placeholder as above).
!gcloud auth application-default set-quota-project <PROJECT_NAME>

In [None]:
#getting the Google Cloud Project ID and storing it in a variable
# The `!cmd` capture yields the command's output lines; the ID is the first line.
PROJECT_ID_DETAILS = !gcloud config get-value project
PROJECT_ID = PROJECT_ID_DETAILS[0]

In [None]:
#retrieve and store the Google Cloud Platform (GCP) project number
# `$PROJECT_ID` interpolates the Python variable captured in the previous cell.
PROJECT_NUMBER_DETAILS = !gcloud projects describe $PROJECT_ID --format="value(projectNumber)"
PROJECT_NUMBER = PROJECT_NUMBER_DETAILS[0]  # The project number is item 0 in the list returned by the gcloud command

## 2. Parse using Document AI

**2.1** Use the **Document OCR** processor from Document AI to parse the car manual.

Split the car manual PDF into 15-page chunks to stay within Document AI's 30-page limit.

1. PDF SPLITTER FUNCTION

In [None]:
# 1. Ensure PyPDF2 (PDF reading/writing) and tqdm (progress bars) are installed
!pip install PyPDF2 tqdm

# 2. Save the UPDATED PDF splitter code to a file
# The module is written to disk so a later cell can do
# `from pdf_splitter import split_pdf_for_document_ai`.
# split_pdf_for_document_ai(input_pdf_path, output_dir, pages_per_chunk=15, start_page_number=1)
# writes one PDF per chunk into output_dir (created if missing) and returns
# (list_of_chunk_paths, total_pages_written); it returns ([], 0) if the source
# cannot be read or start_page_number is past the end.
# NOTE(review): pages_per_chunk must be > 0 (it is used as a divisor).
# NOTE: everything inside the string below IS the generated module's source;
# editing it changes pdf_splitter.py.
with open('pdf_splitter.py', 'w') as f:
    f.write('''
from PyPDF2 import PdfReader, PdfWriter
import os
import math

def split_pdf_for_document_ai(
    input_pdf_path,
    output_dir,
    pages_per_chunk=15, # Reduced default for safety margin
    start_page_number=1 # 1-based page number to start processing from
    ):
    \"\"\"
    Split a PDF into smaller chunks for Document AI processing, optionally skipping initial pages.

    Args:
        input_pdf_path: Path to the input PDF file.
        output_dir: Directory to save the split PDF files.
        pages_per_chunk: Number of pages per chunk.
        start_page_number: The 1-based page number to start splitting from.

    Returns:
        Tuple: (List of paths to the split PDF files, total_pages_processed)
    \"\"\"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    pdf_name = os.path.splitext(os.path.basename(input_pdf_path))[0]

    try:
        pdf = PdfReader(input_pdf_path)
        total_source_pages = len(pdf.pages)
    except Exception as e:
        print(f"Error reading PDF {input_pdf_path}: {e}")
        return [], 0

    # Adjust for 0-based indexing and calculate the first page index to include
    start_page_index = max(0, start_page_number - 1)

    if start_page_index >= total_source_pages:
        print(f"Warning: Start page ({start_page_number}) is beyond the total number of pages ({total_source_pages}). No pages to process.")
        return [], 0

    # Calculate the number of pages to actually process
    pages_to_process_count = total_source_pages - start_page_index
    num_chunks = math.ceil(pages_to_process_count / pages_per_chunk)

    chunk_paths = []
    total_pages_processed_in_chunks = 0

    print(f"Source PDF has {total_source_pages} pages. Processing from page {start_page_number} ({pages_to_process_count} pages).")
    print(f"Splitting into {num_chunks} chunks of up to {pages_per_chunk} pages each.")

    for i in range(num_chunks):
        output = PdfWriter()

        # Calculate start and end page indices relative to the *source* PDF
        chunk_start_index_rel_processed = i * pages_per_chunk
        chunk_end_index_rel_processed = min((i + 1) * pages_per_chunk, pages_to_process_count)

        # Map back to source page indices
        source_start_page_index = start_page_index + chunk_start_index_rel_processed
        source_end_page_index = start_page_index + chunk_end_index_rel_processed

        pages_in_this_chunk = 0
        for page_num_idx in range(source_start_page_index, source_end_page_index):
            # Ensure we don't go beyond the actual number of pages
            if page_num_idx < total_source_pages:
                try:
                    output.add_page(pdf.pages[page_num_idx])
                    pages_in_this_chunk += 1
                except Exception as e:
                     print(f"Error adding page {page_num_idx + 1} to chunk {i+1}: {e}")


        if pages_in_this_chunk > 0:
            output_path = os.path.join(output_dir, f"{pdf_name}_chunk_{i+1}_pages_{source_start_page_index+1}-{source_end_page_index}.pdf")
            try:
                with open(output_path, "wb") as output_file:
                    output.write(output_file)
                chunk_paths.append(output_path)
                total_pages_processed_in_chunks += pages_in_this_chunk
                print(f"Created chunk {i+1}/{num_chunks}: pages {source_start_page_index+1}-{source_end_page_index} -> {output_path}")
            except Exception as e:
                print(f"Error writing chunk {i+1} to {output_path}: {e}")
        else:
            print(f"Skipping chunk {i+1} as it contains no valid pages.")


    print(f"Finished splitting. Total pages included in chunks: {total_pages_processed_in_chunks}")
    return chunk_paths, total_pages_processed_in_chunks
''')

Define the function that sends chunks of the manual PDF to Document AI for **OCR parsing**.

In [None]:
# Keep parse_document function as is
from google.api_core.client_options import ClientOptions
from google.cloud import documentai_v1 as documentai
from typing import List, Dict, Tuple, Any
import tqdm
import os # Make sure os is imported
from PyPDF2 import PdfReader

# Import the updated splitter
from pdf_splitter import split_pdf_for_document_ai

def parse_document(file_path, project_id, location, processor_id):
    """
    Send one local PDF to a Document AI processor and return the parsed document.

    Args:
        file_path: Local path of the PDF to process (e.g. one chunk written by the splitter).
        project_id: GCP project ID that owns the processor.
        location: Processor location (e.g. "us"); also selects the regional API endpoint.
        processor_id: ID of the Document AI processor to invoke.

    Returns:
        The ``document`` field of the processing response, or ``None`` when the file
        cannot be read or the API call raises. Errors are printed, never raised.
    """
    from google.cloud import documentai  # local import keeps the function self-contained

    # A new client is created on every call; the regional endpoint must match the processor.
    endpoint = f"{location}-documentai.googleapis.com"
    docai_client = documentai.DocumentProcessorServiceClient(
        client_options=ClientOptions(api_endpoint=endpoint)
    )

    processor_name = f"projects/{project_id}/locations/{location}/processors/{processor_id}"

    # Load the whole PDF into memory; read failures are reported and yield None.
    try:
        with open(file_path, "rb") as pdf_file:
            pdf_bytes = pdf_file.read()
    except FileNotFoundError:
        print(f"Error: Chunk file not found at {file_path}")
        return None
    except Exception as read_err:
        print(f"Error reading chunk file {file_path}: {read_err}")
        return None

    raw_pdf = documentai.RawDocument(content=pdf_bytes, mime_type="application/pdf")
    process_request = documentai.ProcessRequest(name=processor_name, raw_document=raw_pdf)

    # Online (synchronous) processing call; any API error is reported and yields None.
    try:
        response = docai_client.process_document(request=process_request)
        return response.document
    except Exception as api_err:
        print(f"Error processing document {file_path} with Document AI: {api_err}")
        return None


# --- REFACTORED Batch Processing Function ---
def process_pdf_in_batches_ocr_only_list_output(
    pdf_path: str,
    project_id: str,
    location: str,
    ocr_processor_id: str,
    output_dir: str = "split_pdfs",
    pages_per_chunk: int = 15,
    parallel_processing: bool = True,
    max_concurrent: int = 3
) -> Tuple[List[documentai.Document], int]:
    """
    Split a PDF into chunks and OCR each chunk with a Document AI processor.

    Args:
        pdf_path: Path to the PDF file.
        project_id: GCP project ID.
        location: GCP location of the processor (e.g. "us").
        ocr_processor_id: Document AI OCR processor ID.
        output_dir: Directory where split_pdf_for_document_ai writes the chunk PDFs.
        pages_per_chunk: Number of pages per chunk.
        parallel_processing: If True, chunks are sent to the API from a thread pool.
            Otherwise they are processed one by one, and the first chunk's structure is
            dumped with explore_ocr_document when that function has been defined.
        max_concurrent: Maximum number of concurrent API calls (values < 1 are treated as 1).

    Returns:
        Tuple ``(documents, expected_pages)``:
          - documents: the Document AI documents of the chunks that succeeded and
            contained pages, in chunk order.
          - expected_pages: the page count of the source PDF, or -1 if it could not be read.
        Failed or empty chunks are omitted from ``documents``; the printed page-count
        check reports the resulting gap.
    """
    # Page count of the source PDF; used to validate the API results at the end.
    try:
        expected_pages_processed = len(PdfReader(pdf_path).pages)
    except Exception as e:
        print(f"Error reading original PDF for page count: {e}")
        return [], -1

    print(f"Splitting PDF '{pdf_path}' into chunks...")
    chunk_paths, total_pages_in_chunks = split_pdf_for_document_ai(
        pdf_path, output_dir, pages_per_chunk
    )

    if not chunk_paths:
        print("No PDF chunks were created. Exiting processing.")
        return [], expected_pages_processed

    # Pages the splitter could not write will never come back from the API; report it up front.
    if total_pages_in_chunks != expected_pages_processed:
        print(f"Warning: The splitter wrote {total_pages_in_chunks} pages into chunks, "
              f"but the source PDF has {expected_pages_processed}.")

    num_chunks = len(chunk_paths)
    print(f"\nProcessing {num_chunks} PDF chunks with Document AI OCR...")

    def ocr_chunk(idx, chunk_path, log):
        """OCR one chunk; return (document, page_count), or (None, 0) on failure or no pages."""
        log(f"\nProcessing chunk {idx+1}/{num_chunks}: {os.path.basename(chunk_path)}")
        ocr_document = parse_document(chunk_path, project_id, location, ocr_processor_id)
        if not ocr_document:
            log(f"  ✗ OCR processing failed for chunk {idx+1} ({os.path.basename(chunk_path)})")
            return None, 0
        if not (hasattr(ocr_document, 'pages') and ocr_document.pages):
            log(f"  ✓ OCR processing successful for chunk {idx+1}, but no pages found.")
            return None, 0
        pages_in_doc = len(ocr_document.pages)
        log(f"  ✓ OCR processing successful for chunk {idx+1} ({pages_in_doc} pages)")
        return ocr_document, pages_in_doc

    if parallel_processing:
        from concurrent.futures import ThreadPoolExecutor
        import threading

        # Serialise prints from worker threads so their messages do not interleave.
        print_lock = threading.Lock()

        def locked_print(message):
            with print_lock:
                print(message)

        def worker(idx_chunk_tuple):
            idx, chunk_path = idx_chunk_tuple
            return ocr_chunk(idx, chunk_path, locked_print)

        # ThreadPoolExecutor rejects max_workers < 1.
        with ThreadPoolExecutor(max_workers=max(1, max_concurrent)) as executor:
            # executor.map yields results in input (chunk) order.
            results = list(tqdm.tqdm(
                executor.map(worker, enumerate(chunk_paths)),
                total=num_chunks,
                desc="Processing Chunks"
            ))
    else:
        results = []
        for i, chunk_path in enumerate(tqdm.tqdm(chunk_paths, desc="Processing Chunks")):
            ocr_document, pages_in_doc = ocr_chunk(i, chunk_path, print)
            results.append((ocr_document, pages_in_doc))
            if i == 0 and ocr_document is not None:
                # explore_ocr_document is defined in a later notebook cell. Look it up at
                # call time so a missing definition skips the dump instead of raising NameError.
                explore = globals().get("explore_ocr_document")
                if explore is None:
                    print("  (Skipping first-chunk debug dump: explore_ocr_document is not defined yet.)")
                else:
                    print("\n=== OCR Document Debug (First Processed Chunk) ===")
                    explore(ocr_document)
                    print("===============================================\n")

    processed_documents = [doc for doc, _ in results if doc is not None]
    actual_pages_processed_api = sum(pages for _, pages in results)

    print("\nFinished processing chunks.")
    print(f"Total pages expected to be processed: {expected_pages_processed}")
    print(f"Total pages found in successful API results: {actual_pages_processed_api}")

    if expected_pages_processed > 0 and actual_pages_processed_api != expected_pages_processed:
        print(f"Warning: Mismatch between expected pages ({expected_pages_processed}) and pages in API results ({actual_pages_processed_api}). Check logs for errors.")
    elif expected_pages_processed == actual_pages_processed_api:
        print("Page count validation successful.")

    return processed_documents, expected_pages_processed

Calling **OCR Parser**.

In [None]:
pdf_filename = "/content/ABC Manual.pdf" # Make sure this path is correct
# NOTE: this overrides the PROJECT_ID captured from `gcloud config get-value project`
# in the setup cells; replace the placeholder, or delete this line to keep that value.
PROJECT_ID = "ENTER_YOUR_ID_HERE"
LOCATION = "us"
ocr_processor_id = "ENTER_PROCESSOR_ID"

# Defined up front so later cells always find these names, even when processing is skipped below.
list_of_ocr_documents, expected_pages = [], -1

# Settings still holding an "ENTER_..." placeholder would be sent to the API as-is.
unset_settings = [
    name
    for name, value in (("PROJECT_ID", PROJECT_ID), ("ocr_processor_id", ocr_processor_id))
    if value.startswith("ENTER_")
]

if unset_settings:
    print(f"Error: replace the placeholder value(s) for {', '.join(unset_settings)} before running.")
elif not os.path.exists(pdf_filename):
    # Check if PDF exists before processing
    print(f"Error: PDF file not found at {pdf_filename}")
else:
    # Process the PDF in batches, getting a list of documents
    list_of_ocr_documents, expected_pages = process_pdf_in_batches_ocr_only_list_output(
        pdf_path=pdf_filename,
        project_id=PROJECT_ID,
        location=LOCATION,
        ocr_processor_id=ocr_processor_id,
        output_dir="split_pdfs",
        pages_per_chunk=15
    )

    if list_of_ocr_documents:
        print(f"\nSuccessfully processed {len(list_of_ocr_documents)} chunks.")
        # You can now pass list_of_ocr_documents to the updated text anchor processor
    else:
        print("\nNo documents were successfully processed.")


Inspect **OCR Document**.


In [None]:
def explore_ocr_document(ocr_document):
    """
    Print a diagnostic tour of an OCR document to show where its text is stored.

    The function inspects:
      - the document-level ``text``;
      - the first page, and that page's first block;
      - the block's text anchor and first text segment;
      - the block's first paragraph, and that paragraph's first word.

    At each level it prints the public, non-callable attributes it finds, then prints
    a recommendation for how to extract text. Every level is probed with ``hasattr``
    and emptiness checks, so missing or empty levels are skipped rather than raising.

    Args:
        ocr_document: A Document AI ``Document`` (or any object of similar shape).

    Returns:
        True. The function is used for its printed output.
    """
    def public_attrs(obj):
        # Public, non-callable attributes: the data fields worth inspecting.
        return [attr for attr in dir(obj) if not attr.startswith('_') and not callable(getattr(obj, attr))]

    print("\n===== EXPLORING OCR DOCUMENT STRUCTURE =====")

    # Check document-level text
    if hasattr(ocr_document, 'text'):
        print(f"Document has text attribute with {len(ocr_document.text)} characters")
        print(f"Example: {ocr_document.text[:100]}...")
    else:
        print("Document does NOT have a text attribute")

    # Check document attributes
    print(f"\nDocument attributes: {public_attrs(ocr_document)}")

    # These stay None when the corresponding level is absent or empty, so the
    # recommendation section can test them safely (hasattr(None, ...) is False).
    first_page = first_block = first_para = None

    # Check first page in detail
    if hasattr(ocr_document, 'pages') and len(ocr_document.pages) > 0:
        first_page = ocr_document.pages[0]
        print(f"\nFirst page attributes: {public_attrs(first_page)}")

        # Look for blocks
        if hasattr(first_page, 'blocks') and len(first_page.blocks) > 0:
            print(f"\nPage has {len(first_page.blocks)} blocks")
            first_block = first_page.blocks[0]
            print(f"First block attributes: {public_attrs(first_block)}")

            # Try to find text in blocks
            if hasattr(first_block, 'text'):
                print(f"Block has text: {first_block.text[:50]}...")

            # Try to find text anchor (offsets into document.text)
            if hasattr(first_block, 'layout') and hasattr(first_block.layout, 'text_anchor'):
                print("Block has text_anchor - text might be referenced via offsets")

                text_anchor = first_block.layout.text_anchor
                print(f"Text anchor attributes: {public_attrs(text_anchor)}")

                if hasattr(text_anchor, 'text_segments') and len(text_anchor.text_segments) > 0:
                    segment = text_anchor.text_segments[0]
                    print(f"Text segment attributes: {public_attrs(segment)}")

                    # Try to extract text from segment
                    if hasattr(segment, 'start_index') and hasattr(segment, 'end_index') and hasattr(ocr_document, 'text'):
                        start = segment.start_index
                        end = segment.end_index
                        print(f"Text segment points to: {ocr_document.text[start:end]}")

            # Look for paragraphs
            if hasattr(first_block, 'paragraphs') and len(first_block.paragraphs) > 0:
                print(f"\nBlock has {len(first_block.paragraphs)} paragraphs")
                first_para = first_block.paragraphs[0]
                print(f"First paragraph attributes: {public_attrs(first_para)}")

                if hasattr(first_para, 'text'):
                    print(f"Paragraph has text: {first_para.text[:50]}...")

                # Check for words in paragraph
                if hasattr(first_para, 'words') and len(first_para.words) > 0:
                    print(f"Paragraph has {len(first_para.words)} words")
                    first_word = first_para.words[0]
                    print(f"First word attributes: {public_attrs(first_word)}")

                    if hasattr(first_word, 'text'):
                        print(f"Word has text: {first_word.text}")

    # Check text_segments if available directly
    if hasattr(ocr_document, 'text_segments'):
        print(f"\nDocument has {len(ocr_document.text_segments)} text segments")
        if len(ocr_document.text_segments) > 0:
            print(f"First segment: {ocr_document.text_segments[0]}")

    print("\n=== RECOMMENDATION FOR TEXT EXTRACTION ===")
    # first_para is only non-None when page -> block -> paragraph all exist, so the
    # paragraph-based recommendations never touch an unbound or missing level.
    if hasattr(ocr_document, 'text') and ocr_document.text:
        print("1. RECOMMENDED: Extract text using text_anchors that reference document.text")
    elif hasattr(first_para, 'text'):
        print("2. RECOMMENDED: Extract text from block -> paragraph -> text")
    elif hasattr(first_para, 'words'):
        print("3. RECOMMENDED: Extract text from block -> paragraph -> words -> text")
    else:
        print("4. FALLBACK: Use simpler text extraction approach")

    print("==========================================\n")

    return True

VALIDATION FUNCTION

In [None]:
from PyPDF2 import PdfReader
import difflib

def validate_parsed_text(
    combined_data: Dict[Tuple[int, float], Dict[str, Any]],
    original_pdf_path: str,
    start_page_number: int,
    expected_pages_processed: int,
    num_sample_pages: int = 3
    ):
    """
    Validate parsed text against the original PDF and sanity-check its structure.

    Runs four report-only checks; results are printed, and nothing is returned or raised:
      1. The number of distinct pages in ``combined_data`` vs ``expected_pages_processed``.
      2. A difflib similarity ratio, per sampled page, between PyPDF2's extraction and the
         parsed text. Up to ``num_sample_pages`` pages are sampled evenly over the processed
         range, always including the first and last page when two or more are sampled.
      3. Elements whose text is empty.
      4. A basic section-hierarchy consistency check on a sample of elements.

    Args:
        combined_data: Mapping ``(page_number, sort_position) -> element`` as produced by
            process_ocr_list_with_text_anchors. Each element is read for ``'id'`` and
            ``'text'``, plus ``'section_hierarchy'`` if present.
        original_pdf_path: Path to the source PDF.
        start_page_number: 1-based page of the source PDF where processing started.
        expected_pages_processed: Expected page count; values <= 0 skip the count check.
        num_sample_pages: Maximum number of pages to compare; values <= 0 skip the comparison.
    """
    import re  # otherwise only imported in a later notebook cell

    print("\n--- Starting Validation ---")

    if not combined_data:
        print("Validation Error: No combined data provided.")
        return

    processed_pages = sorted({key[0] for key in combined_data})
    if not processed_pages:
        print("Validation Error: No pages found in combined data.")
        return

    actual_pages_count = len(processed_pages)
    min_page = processed_pages[0]
    max_page = processed_pages[-1]

    # 1. Page count check
    print(f"Found {actual_pages_count} unique page numbers in processed data (Range: {min_page}-{max_page}).")
    if expected_pages_processed > 0:
        if actual_pages_count == expected_pages_processed:
            print(f"Page Count Check: OK (Matches expected {expected_pages_processed} pages)")
        else:
            print(f"Page Count Check: MISMATCH (Expected {expected_pages_processed}, Found {actual_pages_count})")
    else:
        print("Page Count Check: Skipped (Could not get expected count from source PDF)")

    # 2. Text comparison for sample pages.
    # Evenly spaced indices over the sorted pages; for n >= 2 samples, index 0 and the
    # last index are always hit, so the first and last processed pages are included.
    if num_sample_pages <= 0:
        sample_pages = []
    elif num_sample_pages >= actual_pages_count:
        sample_pages = list(processed_pages)
    elif num_sample_pages == 1:
        sample_pages = [min_page]
    else:
        last_idx = actual_pages_count - 1
        sample_pages = sorted({
            processed_pages[(i * last_idx) // (num_sample_pages - 1)]
            for i in range(num_sample_pages)
        })

    # NOTE(review): page numbers in combined_data are assumed to be relative to the first
    # processed page (page 1 == start_page_number), matching process_ocr_list_with_text_anchors
    # starting its page offset at 0. Shift them to index the original PDF; no-op when start is 1.
    page_offset = max(0, start_page_number - 1)

    print(f"\nComparing text for {len(sample_pages)} sample pages: {sample_pages}")
    if sample_pages:
        try:
            pdf = PdfReader(original_pdf_path)
            total_source_pages = len(pdf.pages)

            for page_num_to_check in sample_pages:
                page_num_0_based = page_num_to_check - 1 + page_offset
                print(f"\n--- Comparing Page {page_num_to_check} (source PDF page {page_num_0_based + 1}) ---")

                # Extract text from original PDF using PyPDF2
                if 0 <= page_num_0_based < total_source_pages:
                    try:
                        original_text = pdf.pages[page_num_0_based].extract_text()
                        if not original_text:
                            original_text = "[PyPDF2 extracted no text]"
                        else:
                            original_text = re.sub(r'\s+', ' ', original_text).strip()  # Normalize whitespace
                    except Exception as e:
                        original_text = f"[Error extracting text with PyPDF2: {e}]"
                else:
                    original_text = "[Page number out of range for original PDF]"

                # Extract text from processed data, ordered by element ID for consistency
                processed_elements_on_page = sorted(
                    [elem for key, elem in combined_data.items() if key[0] == page_num_to_check],
                    key=lambda x: x['id']
                )
                processed_text = " ".join(elem['text'] for elem in processed_elements_on_page)
                processed_text = re.sub(r'\s+', ' ', processed_text).strip()  # Normalize

                # Compare texts
                print(f"Original PDF (PyPDF2):\n\"{original_text[:300]}...\"")
                print(f"\nProcessed (Document AI):\n\"{processed_text[:300]}...\"")

                # Simple similarity check (ratio)
                similarity = difflib.SequenceMatcher(None, original_text.lower(), processed_text.lower()).ratio()
                print(f"\nText Similarity Ratio: {similarity:.2f}")
                if similarity < 0.7:  # Threshold can be adjusted
                    print("WARNING: Low similarity detected. Manual review recommended.")
                else:
                    print("Similarity Check: OK")

        except Exception as e:
            print(f"Error during text comparison: {e}")

    # 3. Check for empty text elements
    empty_text_elements = [elem['id'] for elem in combined_data.values() if not elem['text'].strip()]
    if empty_text_elements:
        print(f"\nWarning: Found {len(empty_text_elements)} elements with empty text (e.g., {empty_text_elements[:5]}).")
    else:
        print("\nEmpty Text Check: OK (No empty text elements found)")

    # 4. Check section hierarchy consistency (basic check)
    print("\nChecking Section Hierarchy (Sample):")
    hierarchy_issues = 0
    last_hierarchy = {}
    # Every Nth element, giving roughly 20 evenly spaced samples (all elements if fewer than 40).
    sample_elements = list(combined_data.values())[::max(1, len(combined_data) // 20)]

    for elem in sample_elements:
        current_hierarchy = elem.get('section_hierarchy', {})
        # A lower-level heading should not appear without a higher-level one set
        # concurrently or in the previously sampled element.
        if current_hierarchy.get(3) and not current_hierarchy.get(2) and not last_hierarchy.get(2):
            hierarchy_issues += 1
        if current_hierarchy.get(2) and not current_hierarchy.get(1) and not last_hierarchy.get(1):
            hierarchy_issues += 1
        last_hierarchy = current_hierarchy  # Update for next comparison

    if hierarchy_issues > 0:
        print(f"WARNING: Found {hierarchy_issues} potential hierarchy inconsistencies in sample. Review 'section_hierarchy' in data.")
    else:
        print("Section Hierarchy Check: OK (Basic consistency in sample)")

    print("\n--- Validation Complete ---")



---

TEXT ANCHOR



---



In [None]:
import numpy as np
import re
from typing import List, Dict, Tuple, Any
from google.cloud import documentai_v1 as documentai # Ensure import if needed

# --- REFACTORED Text Anchor Processing Function ---
def process_ocr_list_with_text_anchors(
    ocr_documents: List[documentai.Document]
) -> Dict[Tuple[int, float], Dict[str, Any]]:
    """
    Extract text and infer structure from a list of OCR document results using text anchors.
    Enhanced for car manual structure.

    Args:
        ocr_documents: List of Document AI results, one per processed chunk.

    Returns:
        Dictionary of position tuples to element dictionaries for the entire document.
    """
    print(f"Processing {len(ocr_documents)} OCR document chunks using text anchors...")
    combined_data = {}
    element_id_counter = 0
    current_doc_page_offset = 0

    # Track document hierarchical structure ACROSS chunks
    current_title = None
    current_section = None
    current_heading = None
    current_subheading = None

    # Car manual specific patterns
    car_manual_section_patterns = [
        r'(?i)^(chapter|section)\s+\d+',
        r'(?i)^(introduction|overview|specifications|maintenance|troubleshooting|appendix)'
    ]

    warning_patterns = [
        r'(?i)^(warning|caution|note|danger|attention|important):',
        r'(?i)^(warning|caution|note|danger|attention|important)$'
    ]

    procedure_patterns = [
        r'(?i)^\d+\.\s+[A-Z]',  # Numbered steps starting with capital letter
        r'(?i)^to\s+[a-z]+',    # "To do something" procedure titles
    ]

    spec_patterns = [
        r'(?i)specifications',
        r'(?i)technical data',
        r'[\d.]+ (mm|cm|in|inches|kg|lbs|L|liters|gal|gallons)'  # Measurements
    ]

    # Ignore patterns for context (avoid these as section headers)
    ignore_patterns = [
        r'Part Number:', r'Copyright', r'Ford Motor Company', r'www\.ford\.com',
        r'\d{8}', r'^\d{6}$', r'@', r'^\s*$'
    ]

    # Process each document (chunk) in the list
    for doc_idx, ocr_document in enumerate(ocr_documents):
        # Check for document text
        if not hasattr(ocr_document, 'text') or not ocr_document.text:
            print(f"Warning: Document chunk {doc_idx+1} has no text attribute. Skipping.")
            continue

        full_text = ocr_document.text # Text is relative to THIS document chunk
        print(f"Processing document chunk {doc_idx+1} with {len(full_text)} characters.")

        # Check if OCR document has pages
        if not hasattr(ocr_document, 'pages') or not ocr_document.pages:
            print(f"Warning: Document chunk {doc_idx+1} has no pages. Skipping.")
            continue

        pages_in_current_doc = len(ocr_document.pages)

        # Process each page within the current document chunk
        for page_idx_in_doc, page in enumerate(ocr_document.pages):
            # Calculate the absolute page number based on the progress
            absolute_page_num = current_doc_page_offset + page_idx_in_doc + 1

            # Get blocks on the page
            if not hasattr(page, 'blocks') or not page.blocks:
                continue

            # Process each block on the page
            for block_idx, block in enumerate(page.blocks):
                if not hasattr(block, 'layout') or not hasattr(block.layout, 'text_anchor'):
                    continue

                text_anchor = block.layout.text_anchor
                if not hasattr(text_anchor, 'text_segments') or not text_anchor.text_segments:
                    continue

                # Get text segments within the block
                block_text_parts = []
                segment_start_indices = []

                for segment in text_anchor.text_segments:
                    if not hasattr(segment, 'start_index') or not hasattr(segment, 'end_index'):
                        continue

                    start_index = segment.start_index
                    end_index = segment.end_index

                    if start_index < 0 or end_index > len(full_text) or start_index >= end_index:
                        continue

                    # Extract text using indices relative to the CURRENT document's text
                    segment_text = full_text[start_index:end_index]
                    block_text_parts.append(segment_text)
                    segment_start_indices.append(start_index)

                # Combine text parts for the block and clean it
                text = "".join(block_text_parts).strip()
                text = re.sub(r'\s+', ' ', text) # Normalize whitespace

                if not text: # Skip empty blocks
                    continue

                # Use the start index of the *first* segment for positioning/sorting
                sort_pos = (segment_start_indices[0] / 1000000.0) if segment_start_indices else (block_idx / 1000.0)
                position_key = (absolute_page_num, sort_pos)

                # Enhanced element type detection for car manuals
                element_type = "PARAGRAPH"
                is_potential_header = False

                # Car manual specific classifications
                if any(re.search(pattern, text) for pattern in warning_patterns):
                    element_type = "WARNING"
                elif any(re.search(pattern, text) for pattern in spec_patterns):
                    element_type = "SPECIFICATION"
                elif any(re.search(pattern, text) for pattern in procedure_patterns):
                    element_type = "PROCEDURE"
                # Hierarchy patterns
                elif text.isupper() and len(text.split()) <= 6 and len(text) > 3:
                    if any(re.search(pattern, text) for pattern in car_manual_section_patterns):
                        element_type = "SECTION"
                        is_potential_header = True
                    elif not any(re.search(pattern, text, re.IGNORECASE) for pattern in ignore_patterns):
                        element_type = "HEADING"
                        is_potential_header = True
                elif (re.match(r'^\d+\.', text.strip()) or
                      (re.match(r'^[A-Z][a-zA-Z\s]+:', text.strip()) and len(text) < 100)) and \
                     not any(re.search(pattern, text, re.IGNORECASE) for pattern in ignore_patterns):
                    element_type = "HEADING"
                    is_potential_header = True
                elif re.match(r'^[A-Z].{5,50}:$', text.strip()) and \
                     not any(re.search(pattern, text, re.IGNORECASE) for pattern in ignore_patterns):
                    element_type = "SUBHEADING"
                    is_potential_header = True
                elif text.strip().startswith(('•', '-', '*')) or re.match(r'^\d+[\.)]', text.strip()):
                    element_type = "LIST_ITEM"
                elif bool(re.search(r'figure|fig\.', text, re.IGNORECASE)):
                    element_type = "FIGURE"
                elif text.count('|') > 2 or text.count('\t') > 2 or (':' in text and text.count('.') > 3):
                    element_type = "TABLE"

                # Update hierarchy if this element is a header type
                if element_type == 'SECTION':
                    current_section = text
                    current_heading = None  # Reset lower levels
                    current_subheading = None
                elif element_type == 'HEADING':
                    current_heading = text
                    current_subheading = None  # Reset lower level
                elif element_type == 'SUBHEADING':
                    current_subheading = text

                # Construct current hierarchy snapshot
                current_hierarchy_snapshot = {}
                if current_section and not any(re.search(pattern, current_section, re.IGNORECASE) for pattern in ignore_patterns):
                    current_hierarchy_snapshot[1] = current_section
                if current_heading and not any(re.search(pattern, current_heading, re.IGNORECASE) for pattern in ignore_patterns):
                    current_hierarchy_snapshot[2] = current_heading
                if current_subheading and not any(re.search(pattern, current_subheading, re.IGNORECASE) for pattern in ignore_patterns):
                    current_hierarchy_snapshot[3] = current_subheading

                # Bounding box
                bbox = {"x1": 0.1, "y1": block_idx / 100.0, "x2": 0.9, "y2": (block_idx / 100.0) + 0.05}
                if hasattr(block.layout, 'bounding_poly') and block.layout.bounding_poly.normalized_vertices:
                    vertices = block.layout.bounding_poly.normalized_vertices
                    if len(vertices) >= 4:
                        bbox = {
                            "x1": vertices[0].x,
                            "y1": vertices[0].y,
                            "x2": vertices[2].x,
                            "y2": vertices[2].y
                        }

                # Create the structured element
                element = {
                    "bbox": bbox,
                    "type": element_type,
                    "confidence": block.layout.confidence if hasattr(block.layout, 'confidence') else 0.9,
                    "page_number": absolute_page_num,
                    "text": text,
                    "id": f"element_{element_id_counter}",
                    "section_hierarchy": current_hierarchy_snapshot,
                    "doc_chunk_index": doc_idx
                }
                element_id_counter += 1

                # Store in combined data
                if position_key in combined_data:
                    print(f"Warning: Position key collision for page {absolute_page_num}. Overwriting.")
                combined_data[position_key] = element

        # Update the page offset for the next document chunk
        current_doc_page_offset += pages_in_current_doc

    print(f"Finished processing text anchors. Extracted {len(combined_data)} structured elements.")
    return combined_data



---

EXECUTE VALIDATION



---



In [None]:
START_PAGE = 1

if not list_of_ocr_documents:
    # Nothing came back from the OCR batch step, so there is nothing to structure or validate.
    print("\nSkipping text anchor processing and validation as OCR processing yielded no documents.")
else:
    # Build the {(page, position): element} mapping from the OCR documents.
    combined_data_structured = process_ocr_list_with_text_anchors(
        ocr_documents=list_of_ocr_documents
    )

    if not combined_data_structured:
        print("\nText anchor processing failed to produce data. Cannot validate.")
    else:
        # Validate the extracted elements; see validate_parsed_text for what is checked.
        validate_parsed_text(
            combined_data=combined_data_structured,
            original_pdf_path=pdf_filename,
            start_page_number=START_PAGE,
            expected_pages_processed=expected_pages,  # count reported by the batch processor
        )
        # The chunking section below consumes 'combined_data_structured'.
        print("\nReady to proceed to chunking using 'combined_data_structured'.")

# 3. Chunking

In [None]:
# Install NLTK, which the chunking cell below uses for sentence splitting.
# NOTE(review): bare `pip` relies on IPython automagic; `%pip install` is the explicit form that targets this kernel.
pip install --quiet nltk

In [None]:
# Pre-download NLTK's 'punkt' sentence-tokenizer data.
# NOTE(review): recent NLTK releases look up 'punkt_tab' for sent_tokenize; consider downloading it as well.
!python -m nltk.downloader punkt

In [None]:
import re
import copy
import datetime
import json
import gc
import os
from typing import List, Dict, Tuple, Any, Generator, Set, Optional
from tqdm import tqdm

# --- Optional: NLTK for better sentence splitting ---
# Set USE_NLTK to False to force the regex-based splitter even when NLTK is available.
USE_NLTK = True


def _regex_sent_tokenize(text):
    """Split ``text`` into sentences at '.', '!' or '?' followed by whitespace.

    Fallback used whenever NLTK or its tokenizer data is unavailable. Empty fragments are dropped.
    """
    sentences = re.split(r'(?<=[.!?])\s+', text)
    return [s for s in sentences if s]  # Filter empty strings


# Start with the regex splitter; upgrade to NLTK below only if it actually works.
sent_tokenize = _regex_sent_tokenize

if USE_NLTK:
    try:
        import nltk
        # NLTK >= 3.8.2 loads 'punkt_tab' while older releases load 'punkt'; fetch both.
        for _resource in ('punkt', 'punkt_tab'):
            nltk.download(_resource, quiet=True)
        from nltk.tokenize import sent_tokenize as _nltk_sent_tokenize
        # Missing tokenizer data only surfaces as a LookupError on first use (not at import),
        # so probe once here instead of crashing later in the middle of chunking.
        _nltk_sent_tokenize("Probe sentence. Second sentence.")
    except (ImportError, LookupError, OSError):
        USE_NLTK = False
        print("NLTK not found or 'punkt' not downloaded. Using regex-based sentence splitting.")
    else:
        sent_tokenize = _nltk_sent_tokenize
        print("Using NLTK for sentence splitting.")
else:
    print("USE_NLTK is False. Using regex sentence splitting.")

# --- Configuration ---
MAX_CHUNK_SIZE = 1800  # Target maximum characters per chunk
MIN_CHUNK_SIZE = 500   # Target minimum characters per chunk (increased for better context)
OVERLAP_SENTENCE_COUNT = 1  # Sentences carried from the previous chunk as overlap (0 disables)
HEADING_SPLIT_THRESHOLD_FACTOR = 0.8  # Split before a heading only once the chunk exceeds 80% of max_chunk_size
AGGRESSIVE_MERGE_THRESHOLD = 150      # Chunks shorter than this are merged with looser context checks



# --- Memory-Optimized Chunking Implementation ---
def process_elements_in_batches(
    combined_data: Dict[Tuple[int, float], Dict[str, Any]],
    max_chunk_size: int = MAX_CHUNK_SIZE,
    min_chunk_size: int = MIN_CHUNK_SIZE,
    overlap_sentences: int = OVERLAP_SENTENCE_COUNT,
    batch_size: int = 1000
) -> List[Dict[str, Any]]:
    """
    Chunk a large element mapping in fixed-size batches to bound peak memory.

    Keys are sorted and elements are handed to ``create_enriched_chunks``
    ``batch_size`` at a time. Between batches, the last ``overlap_sentences``
    sentences of the previous batch's final chunk and its heading context
    ('heading_level_1'..'heading_level_3') are carried forward, so chunking resumes
    where the previous batch stopped. All chunks then go through ``post_process_chunks``.

    Args:
        combined_data: mapping of (page_number, position) keys to element dicts.
        max_chunk_size: target maximum chunk length in characters.
        min_chunk_size: target minimum chunk length in characters.
        overlap_sentences: sentences of overlap carried between chunks (0 disables overlap).
        batch_size: number of elements per batch; must be positive.

    Returns:
        The post-processed list of chunk dicts ([] for empty input).

    Raises:
        ValueError: if ``batch_size`` is not positive.
    """
    if not combined_data:
        return []
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    sorted_keys = sorted(combined_data.keys())
    total_elements = len(sorted_keys)
    total_batches = (total_elements + batch_size - 1) // batch_size

    print(f"Processing {total_elements} elements in batches of {batch_size}...")

    total_pages = len({key[0] for key in sorted_keys})
    print(f"Manual spans {total_pages} pages across {total_elements} elements.")

    all_chunks = []
    carry_sentences = []  # overlap sentences handed to the next batch
    carry_context = {}    # heading context of the previous batch's final chunk
    context_levels = ('heading_level_1', 'heading_level_2', 'heading_level_3')

    for batch_number, start in enumerate(range(0, total_elements, batch_size), start=1):
        batch_keys = sorted_keys[start:start + batch_size]
        batch_elements = {key: combined_data[key] for key in batch_keys}
        batch_pages = [key[0] for key in batch_keys]

        print(f"\nProcessing batch {batch_number}/{total_batches}: "
              f"Elements {start + 1}-{start + len(batch_keys)} "
              f"(Pages {min(batch_pages)}-{max(batch_pages)})")

        batch_chunks = create_enriched_chunks(
            batch_elements,
            max_chunk_size,
            min_chunk_size,
            overlap_sentences,
            context_from_previous_batch=carry_context,
            overlap_sentences_from_previous=carry_sentences
        )

        if batch_chunks:
            last_chunk = batch_chunks[-1]
            # Guard the disabled case: sentences[-0:] would return *every* sentence.
            if overlap_sentences > 0:
                carry_sentences = sent_tokenize(last_chunk['text'])[-overlap_sentences:]
            else:
                carry_sentences = []
            carry_context = {level: last_chunk.get(level) for level in context_levels}

        all_chunks.extend(batch_chunks)

        del batch_elements  # release this batch's mapping before building the next one
        gc.collect()

        print(f"Batch complete. Generated {len(batch_chunks)} chunks. "
              f"Total chunks so far: {len(all_chunks)}")

    print("\nFinalizing all chunks...")
    final_chunks = post_process_chunks(all_chunks)

    print(f"Processing complete. Total chunks: {len(final_chunks)}")
    return final_chunks



---

MERGE CHUNK - HELPER FUNCTION



---



In [None]:
# --- Helper Function for Merging Chunks ---
# Matches the "[Overlap Start] ... [Overlap End]" prefix that create_enriched_chunks puts on new chunks.
_OVERLAP_MARKER_RE = re.compile(r'\[Overlap Start\].*?\[Overlap End\]\s*', re.DOTALL)


def _perform_merge(prev_chunk: Dict[str, Any], current_chunk: Dict[str, Any], overlap_sentences: int):
    """
    Merge ``current_chunk`` into ``prev_chunk`` in place.

    The text is appended after a "---" separator, and page numbers are unioned and
    sorted. Features are then recomputed with ``extract_chunk_features`` on the merged
    text. 'content_type' values and the 'systems'/'parts' metadata lists are unioned,
    and the 'has_torque'/'has_safety' flags are OR-ed in.

    Within create_enriched_chunks, a chunk's overlap prefix is always built from the tail
    of the chunk it gets merged into. That prefix is therefore dropped before appending,
    so those sentences are not duplicated. If the marker is absent, the text is appended unchanged.

    Returns:
        The last ``overlap_sentences`` sentences of the merged text ([] when
        ``overlap_sentences`` <= 0), for use as the next chunk's overlap.
    """
    incoming_text = _OVERLAP_MARKER_RE.sub('', current_chunk['text'], count=1).strip()
    if incoming_text:
        # Merge text with a distinct separator
        prev_chunk['text'] += "\n\n---\n\n" + incoming_text

    # Update page numbers
    prev_chunk['page_numbers'] = sorted(set(prev_chunk['page_numbers']) | set(current_chunk['page_numbers']))

    # Re-run feature extraction on the *merged* text.
    # Note: this assumes extract_chunk_features works purely on text.
    prev_chunk.update(extract_chunk_features(prev_chunk['text']))

    # Union content types
    prev_chunk['content_type'] = sorted(set(prev_chunk.get('content_type', []) + current_chunk.get('content_type', [])))

    # Update metadata
    metadata = prev_chunk.setdefault('metadata', {})
    incoming_metadata = current_chunk.get('metadata', {})
    metadata['chunk_length'] = len(prev_chunk['text'])
    metadata['page_count'] = len(prev_chunk['page_numbers'])
    for key in ('systems', 'parts'):
        if key in incoming_metadata:
            metadata[key] = sorted(set(metadata.get(key, [])) | set(incoming_metadata.get(key, [])))
    if incoming_metadata.get('has_torque'):
        metadata['has_torque'] = True
    if incoming_metadata.get('has_safety'):
        metadata['has_safety'] = True

    # Sentences for the next chunk's overlap. Guard 0: sentences[-0:] would return every sentence.
    if overlap_sentences <= 0:
        return []
    merged_sentences = sent_tokenize(prev_chunk['text'])
    return merged_sentences[-overlap_sentences:] if merged_sentences else []



---

MAIN CHUNKING FUNCTION


---



In [None]:
# --- Main Chunking Function ---
# NOTE: a later notebook cell redefines create_enriched_chunks (without merge logging);
# when the notebook runs top to bottom, that later definition is the one callers get.
def create_enriched_chunks(
    combined_data: Dict[Tuple[int, float], Dict[str, Any]],
    max_chunk_size: int = MAX_CHUNK_SIZE,
    min_chunk_size: int = MIN_CHUNK_SIZE,
    overlap_sentences: int = OVERLAP_SENTENCE_COUNT,
    context_from_previous_batch: Optional[Dict[str, str]] = None,
    overlap_sentences_from_previous: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
    """
    Group ordered layout elements into enriched, contextualized chunks (logs merges).

    Elements are visited in key order. A new chunk starts before an element when:
      1. adding it would exceed ``max_chunk_size`` and the chunk already holds
         at least ``min_chunk_size`` characters;
      2. it is a heading and the chunk is at least
         ``HEADING_SPLIT_THRESHOLD_FACTOR * max_chunk_size``;
      3. the page changes, the chunk is at least 60% of ``min_chunk_size``,
         and a list or table is not being continued.
    Chunks shorter than ``AGGRESSIVE_MERGE_THRESHOLD`` are merged into the previous
    chunk when only H1 matches. Chunks shorter than ``min_chunk_size`` are merged when
    H1 and H2 match. Each kind of merge has its own size cap.

    'TITLE' and 'SECTION' are both treated as level-1 headings. The OCR parsing step
    in this notebook emits 'SECTION' (never 'TITLE'), so without it heading level 1
    would never be set.

    Args:
        combined_data: mapping of sortable keys to element dicts with at least
            'text', 'type' and 'page_number'. Not mutated.
        max_chunk_size: target maximum chunk length in characters.
        min_chunk_size: target minimum chunk length in characters.
        overlap_sentences: sentences copied from the previous chunk into the next
            one, wrapped in "[Overlap Start] ... [Overlap End]" (0 disables).
        context_from_previous_batch: optional 'heading_level_1'..'heading_level_3'
            values to resume the heading context from.
        overlap_sentences_from_previous: overlap sentences prepended to the first chunk.

    Returns:
        Chunk dicts produced by ``_finalize_chunk`` (small ones merged via ``_perform_merge``).
    """
    level1_types = ('TITLE', 'SECTION')
    heading_types = level1_types + ('HEADING', 'SUBHEADING')
    # A heading-triggered break before these starts a fresh section, so no overlap is carried.
    no_overlap_types = level1_types + ('HEADING',)

    final_chunks = []
    if not combined_data:
        return final_chunks

    def tail_sentences(text):
        """Return the last ``overlap_sentences`` sentences of ``text`` ([] if overlap is disabled)."""
        if overlap_sentences <= 0:
            return []
        sentences = sent_tokenize(text)
        return sentences[-overlap_sentences:] if sentences else []

    def overlap_prefix(sentences):
        """Build the marked overlap text that opens a new chunk ('' when there is nothing to carry)."""
        overlap_text = " ".join(sentences)
        return f"[Overlap Start] {overlap_text} [Overlap End]\n\n" if overlap_text else ""

    def try_merge(chunk, is_final=False):
        """Merge ``chunk`` into the last finalized chunk if it is small and context-compatible.

        Returns the overlap sentences produced by the merge, or None when no merge happened.
        """
        if not final_chunks:
            return None
        prev_chunk = final_chunks[-1]
        chunk_len = len(chunk['text'])
        # Approximate: the real merge also inserts a separator.
        combined_len = len(prev_chunk['text']) + chunk_len
        same_h1 = prev_chunk.get('heading_level_1') == chunk.get('heading_level_1')
        qualifier = "final " if is_final else ""

        if chunk_len < AGGRESSIVE_MERGE_THRESHOLD:
            # Very small chunk: only H1 must match; allow a slightly larger result.
            if not (same_h1 and combined_len < max_chunk_size * 1.3):
                return None
            suffix = "." if is_final else " based on H1 context."
            print(f"\nAggressively merging {qualifier}very small chunk ({chunk_len} chars, Page {chunk['page_numbers'][0]}) "
                  f"into previous (Pages {prev_chunk['page_numbers']}){suffix}")
        elif chunk_len < min_chunk_size:
            # Small chunk: H1 and H2 must both match.
            same_h2 = prev_chunk.get('heading_level_2') == chunk.get('heading_level_2')
            if not (same_h1 and same_h2 and combined_len < max_chunk_size * 1.2):
                return None
            print(f"\nMerging {qualifier}small chunk ({chunk_len} chars, Page {chunk['page_numbers'][0]}) "
                  f"into previous (Pages {prev_chunk['page_numbers']}).")
        else:
            return None
        return _perform_merge(prev_chunk, chunk, overlap_sentences)

    # Resume the heading hierarchy from the previous batch, if any.
    context = context_from_previous_batch or {}
    current_title = context.get('heading_level_1')
    current_heading = context.get('heading_level_2')
    current_subheading = context.get('heading_level_3')

    current_chunk_elements = []
    current_chunk_text_parts = []
    current_size = 0

    # Overlap carried in from the previous batch.
    if overlap_sentences > 0 and overlap_sentences_from_previous:
        carried = overlap_prefix(overlap_sentences_from_previous)
        if carried:
            current_chunk_text_parts.append(carried)
            current_size += len(carried)

    last_chunk_last_sentences_for_overlap = []

    sorted_elements = sorted(combined_data.items(), key=lambda item: item[0])
    for _, element in tqdm(sorted_elements, desc="Creating chunks", unit="element"):
        element_text = element['text'].strip()
        if not element_text:
            continue

        element_type = element['type']
        is_heading = element_type in heading_types
        is_list_item = element_type == 'LIST_ITEM'
        is_table_related = element_type == 'TABLE'

        # --- Decide whether a new chunk must start BEFORE this element ---
        new_chunk_needed = False
        break_reason = ""

        # 1. Size: break only if the current chunk is already substantial.
        if current_size > 0 and current_size + len(element_text) + 2 > max_chunk_size \
                and current_size >= min_chunk_size:
            new_chunk_needed, break_reason = True, "size"

        # 2. Headings: break only when the current chunk is already large.
        if is_heading and current_size >= max_chunk_size * HEADING_SPLIT_THRESHOLD_FACTOR:
            new_chunk_needed, break_reason = True, f"heading_{element_type}"

        # 3. Page turn: break unless a list/table continues or the chunk is still small.
        if current_chunk_elements and element['page_number'] > current_chunk_elements[-1]['page_number']:
            prev_type = current_chunk_elements[-1]['type']
            continues_list = is_list_item and prev_type == 'LIST_ITEM'
            continues_table = is_table_related and prev_type == 'TABLE'
            if not continues_list and not continues_table and current_size >= min_chunk_size * 0.6:
                new_chunk_needed, break_reason = True, "page"

        # --- Finalize the previous chunk if needed ---
        if new_chunk_needed and current_chunk_elements:
            finalized_chunk = _finalize_chunk(
                current_chunk_elements,
                current_chunk_text_parts,
                last_chunk_last_sentences_for_overlap
            )
            merged_tail = try_merge(finalized_chunk)
            if merged_tail is None:
                final_chunks.append(finalized_chunk)
                last_chunk_last_sentences_for_overlap = tail_sentences(finalized_chunk['text'])
            else:
                last_chunk_last_sentences_for_overlap = merged_tail

            current_chunk_elements = []
            current_chunk_text_parts = []
            current_size = 0

            skip_overlap = break_reason.startswith("heading_") and element_type in no_overlap_types
            if overlap_sentences > 0 and not skip_overlap:
                carried = overlap_prefix(last_chunk_last_sentences_for_overlap)
                if carried:
                    current_chunk_text_parts.append(carried)
                    current_size += len(carried)

        # --- Update hierarchy (after any split, so it describes this element's context) ---
        if element_type in level1_types:
            current_title, current_heading, current_subheading = element_text, None, None
        elif element_type == 'HEADING':
            current_heading, current_subheading = element_text, None
        elif element_type == 'SUBHEADING':
            current_subheading = element_text
        # Shallow copy so the caller's combined_data is not mutated.
        element = {**element, 'section_hierarchy': {1: current_title, 2: current_heading, 3: current_subheading}}

        # --- Choose the separator from the previously added element ---
        prefix = ""
        if current_chunk_text_parts:
            prev_type = current_chunk_elements[-1]['type'] if current_chunk_elements else None
            if is_heading:
                prefix = "\n\n"  # More space before headings
            elif is_list_item or prev_type in ('LIST_ITEM', 'TABLE'):
                prefix = "\n"    # Keep lists tight; less space after lists/tables
            else:
                prefix = "\n\n"  # Paragraph spacing (also after headings)

        current_chunk_elements.append(element)
        current_chunk_text_parts.append(prefix + element_text)
        current_size += len(element_text) + len(prefix)

    # --- Finalize the very last chunk ---
    if current_chunk_elements:
        finalized_chunk = _finalize_chunk(
            current_chunk_elements,
            current_chunk_text_parts,
            last_chunk_last_sentences_for_overlap
        )
        if try_merge(finalized_chunk, is_final=True) is None:
            final_chunks.append(finalized_chunk)

    return final_chunks

In [None]:
# --- Main Chunking Function  ---
def create_enriched_chunks(
    combined_data: Dict[Tuple[int, float], Dict[str, Any]],
    max_chunk_size: int = MAX_CHUNK_SIZE,
    min_chunk_size: int = MIN_CHUNK_SIZE,
    overlap_sentences: int = OVERLAP_SENTENCE_COUNT,
    context_from_previous_batch: Optional[Dict[str, str]] = None,
    overlap_sentences_from_previous: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
    """
    Group ordered layout elements into enriched, contextualized chunks.

    Elements are visited in key order. A new chunk starts before an element when:
      1. adding it would exceed ``max_chunk_size`` and the chunk already holds
         at least ``min_chunk_size`` characters;
      2. it is a heading and the chunk is at least
         ``HEADING_SPLIT_THRESHOLD_FACTOR * max_chunk_size``;
      3. the page changes, the chunk is at least 60% of ``min_chunk_size``,
         and a list or table is not being continued.
    Chunks shorter than ``AGGRESSIVE_MERGE_THRESHOLD`` are merged into the previous
    chunk when only H1 matches. Chunks shorter than ``min_chunk_size`` are merged when
    H1 and H2 match. Each kind of merge has its own size cap.

    'TITLE' and 'SECTION' are both treated as level-1 headings. The OCR parsing step
    in this notebook emits 'SECTION' (never 'TITLE'), so without it heading level 1
    would never be set.

    Args:
        combined_data: mapping of sortable keys to element dicts with at least
            'text', 'type' and 'page_number'. Not mutated.
        max_chunk_size: target maximum chunk length in characters.
        min_chunk_size: target minimum chunk length in characters.
        overlap_sentences: sentences copied from the previous chunk into the next
            one, wrapped in "[Overlap Start] ... [Overlap End]" (0 disables).
        context_from_previous_batch: optional 'heading_level_1'..'heading_level_3'
            values to resume the heading context from.
        overlap_sentences_from_previous: overlap sentences prepended to the first chunk.

    Returns:
        Chunk dicts produced by ``_finalize_chunk`` (small ones merged via ``_perform_merge``).
    """
    level1_types = ('TITLE', 'SECTION')
    heading_types = level1_types + ('HEADING', 'SUBHEADING')
    # A heading-triggered break before these starts a fresh section, so no overlap is carried.
    no_overlap_types = level1_types + ('HEADING',)

    final_chunks = []
    if not combined_data:
        return final_chunks

    def tail_sentences(text):
        """Return the last ``overlap_sentences`` sentences of ``text`` ([] if overlap is disabled)."""
        if overlap_sentences <= 0:
            return []
        sentences = sent_tokenize(text)
        return sentences[-overlap_sentences:] if sentences else []

    def overlap_prefix(sentences):
        """Build the marked overlap text that opens a new chunk ('' when there is nothing to carry)."""
        overlap_text = " ".join(sentences)
        return f"[Overlap Start] {overlap_text} [Overlap End]\n\n" if overlap_text else ""

    def try_merge(chunk):
        """Merge ``chunk`` into the last finalized chunk if it is small and context-compatible.

        Returns the overlap sentences produced by the merge, or None when no merge happened.
        """
        if not final_chunks:
            return None
        prev_chunk = final_chunks[-1]
        chunk_len = len(chunk['text'])
        # Approximate: the real merge also inserts a separator.
        combined_len = len(prev_chunk['text']) + chunk_len
        same_h1 = prev_chunk.get('heading_level_1') == chunk.get('heading_level_1')
        if chunk_len < AGGRESSIVE_MERGE_THRESHOLD:
            # Very small chunk: only H1 must match; allow a slightly larger result.
            can_merge = same_h1 and combined_len < max_chunk_size * 1.3
        elif chunk_len < min_chunk_size:
            # Small chunk: H1 and H2 must both match.
            can_merge = (same_h1
                         and prev_chunk.get('heading_level_2') == chunk.get('heading_level_2')
                         and combined_len < max_chunk_size * 1.2)
        else:
            can_merge = False
        return _perform_merge(prev_chunk, chunk, overlap_sentences) if can_merge else None

    # Resume the heading hierarchy from the previous batch, if any.
    context = context_from_previous_batch or {}
    current_title = context.get('heading_level_1')
    current_heading = context.get('heading_level_2')
    current_subheading = context.get('heading_level_3')

    current_chunk_elements = []
    current_chunk_text_parts = []
    current_size = 0

    # Overlap carried in from the previous batch.
    if overlap_sentences > 0 and overlap_sentences_from_previous:
        carried = overlap_prefix(overlap_sentences_from_previous)
        if carried:
            current_chunk_text_parts.append(carried)
            current_size += len(carried)

    last_chunk_last_sentences_for_overlap = []

    sorted_elements = sorted(combined_data.items(), key=lambda item: item[0])
    for _, element in tqdm(sorted_elements, desc="Creating chunks", unit="element"):
        element_text = element['text'].strip()
        if not element_text:
            continue

        element_type = element['type']
        is_heading = element_type in heading_types
        is_list_item = element_type == 'LIST_ITEM'
        is_table_related = element_type == 'TABLE'

        # --- Decide whether a new chunk must start BEFORE this element ---
        new_chunk_needed = False
        break_reason = ""

        # 1. Size: break only if the current chunk is already substantial.
        if current_size > 0 and current_size + len(element_text) + 2 > max_chunk_size \
                and current_size >= min_chunk_size:
            new_chunk_needed, break_reason = True, "size"

        # 2. Headings: break only when the current chunk is already large.
        if is_heading and current_size >= max_chunk_size * HEADING_SPLIT_THRESHOLD_FACTOR:
            new_chunk_needed, break_reason = True, f"heading_{element_type}"

        # 3. Page turn: break unless a list/table continues or the chunk is still small.
        if current_chunk_elements and element['page_number'] > current_chunk_elements[-1]['page_number']:
            prev_type = current_chunk_elements[-1]['type']
            continues_list = is_list_item and prev_type == 'LIST_ITEM'
            continues_table = is_table_related and prev_type == 'TABLE'
            if not continues_list and not continues_table and current_size >= min_chunk_size * 0.6:
                new_chunk_needed, break_reason = True, "page"

        # --- Finalize the previous chunk if needed ---
        if new_chunk_needed and current_chunk_elements:
            finalized_chunk = _finalize_chunk(
                current_chunk_elements,
                current_chunk_text_parts,
                last_chunk_last_sentences_for_overlap
            )
            merged_tail = try_merge(finalized_chunk)
            if merged_tail is None:
                final_chunks.append(finalized_chunk)
                last_chunk_last_sentences_for_overlap = tail_sentences(finalized_chunk['text'])
            else:
                last_chunk_last_sentences_for_overlap = merged_tail

            current_chunk_elements = []
            current_chunk_text_parts = []
            current_size = 0

            skip_overlap = break_reason.startswith("heading_") and element_type in no_overlap_types
            if overlap_sentences > 0 and not skip_overlap:
                carried = overlap_prefix(last_chunk_last_sentences_for_overlap)
                if carried:
                    current_chunk_text_parts.append(carried)
                    current_size += len(carried)

        # --- Update hierarchy (after any split, so it describes this element's context) ---
        if element_type in level1_types:
            current_title, current_heading, current_subheading = element_text, None, None
        elif element_type == 'HEADING':
            current_heading, current_subheading = element_text, None
        elif element_type == 'SUBHEADING':
            current_subheading = element_text
        # Shallow copy so the caller's combined_data is not mutated.
        element = {**element, 'section_hierarchy': {1: current_title, 2: current_heading, 3: current_subheading}}

        # --- Choose the separator from the previously added element ---
        prefix = ""
        if current_chunk_text_parts:
            prev_type = current_chunk_elements[-1]['type'] if current_chunk_elements else None
            if is_heading:
                prefix = "\n\n"  # More space before headings
            elif is_list_item or prev_type in ('LIST_ITEM', 'TABLE'):
                prefix = "\n"    # Keep lists tight; less space after lists/tables
            else:
                prefix = "\n\n"  # Paragraph spacing (also after headings)

        current_chunk_elements.append(element)
        current_chunk_text_parts.append(prefix + element_text)
        current_size += len(element_text) + len(prefix)

    # --- Finalize the very last chunk ---
    if current_chunk_elements:
        finalized_chunk = _finalize_chunk(
            current_chunk_elements,
            current_chunk_text_parts,
            last_chunk_last_sentences_for_overlap
        )
        if try_merge(finalized_chunk) is None:
            final_chunks.append(finalized_chunk)

    return final_chunks



---

ENHANCED FEATURE EXTRACTION



---




In [None]:
def _finalize_chunk(
    elements: List[Dict[str, Any]],
    text_parts: List[str],
    last_chunk_sentences_for_overlap: List[str] # Used only if overlap logic needs it here
) -> Dict[str, Any]:
    """Build the finished chunk dict from its source elements and text fragments.

    Steps:
      1. Join ``text_parts`` (each fragment already carries its separator prefix)
         and strip the result.
      2. Find safety notices with the module-level ``WARNING_PATTERN``, drop
         notices whose content overlaps one already kept, remove the matched spans
         from the body and re-insert them as a leading "⚠️ TYPE: content" block.
      3. Collapse blank-line runs to a single blank line and space/tab runs to a
         single space.
      4. Resolve the heading hierarchy (the last non-empty value per level across
         ``elements``) and build a " > " breadcrumb from it.
      5. Run ``extract_chunk_features`` on the final text, derive ``content_type``
         tags and a ``metadata`` summary, then drop the top-level keys that the
         metadata summarises.

    Args:
        elements: Source elements of the chunk. Each must have 'page_number'; an
            optional 'section_hierarchy' dict maps level (1-3) to heading text.
        text_parts: Text fragments of the chunk, in order.
        last_chunk_sentences_for_overlap: Not used by this function; kept so
            existing callers keep working.

    Returns:
        Chunk dict with 'text', 'context', 'page_numbers', 'breadcrumb_trail',
        'heading_level_1'..'heading_level_3', 'has_overlap_prefix', the feature
        keys returned by ``extract_chunk_features`` (minus 'vehicle_systems',
        'torque_specifications' and 'safety_notices'), an optional sorted
        'content_type' list and a 'metadata' dict.
    """
    # Fragments already include their own "\n" / "\n\n" prefixes, so join with "".
    raw_text = "".join(text_parts).strip()

    # --- Centralized safety-warning handling ---
    # Entries are (TYPE, cleaned content, exact matched span in raw_text).
    warnings_found = []
    for match in WARNING_PATTERN.finditer(raw_text):
        warning_type = match.group(1).upper()
        # NOTE(review): assumes group 2 = notice body and group 3 = terminator
        # (.!? or blank line). Both are guarded against None in case the pattern
        # makes either of them optional.
        content = ((match.group(2) or "") + (match.group(3) or "")).strip()
        # Skip a notice whose content is contained in (or contains) one already
        # kept, so the same warning is not listed twice.
        if any(content in kept or kept in content for _, kept, _ in warnings_found):
            continue
        warnings_found.append((warning_type, content, match.group(0)))

    warning_block = ""
    processed_text = raw_text
    if warnings_found:
        warning_block = "\n".join(
            f"⚠️ {wtype}: {wcontent}" for wtype, wcontent, _ in warnings_found
        ) + "\n\n"
        # Remove the original spans from the body, longest first, one occurrence
        # each, so a longer span is removed before any shorter one it contains.
        for _, _, span in sorted(warnings_found, key=lambda w: len(w[2]), reverse=True):
            processed_text = processed_text.replace(span, "", 1)

    # Warnings go first, then whitespace is normalised.
    final_text = warning_block + processed_text
    final_text = re.sub(r'\n\s*\n', '\n\n', final_text)  # collapse blank-line runs
    final_text = re.sub(r'[ \t]+', ' ', final_text)       # collapse space/tab runs
    final_text = final_text.strip()

    # --- Heading hierarchy: the last non-empty value seen for each level wins ---
    current_title = current_heading = current_subheading = None
    for element in elements:
        h = element.get('section_hierarchy') or {}
        if h.get(1):
            current_title = h[1]
        if h.get(2):
            current_heading = h[2]
        if h.get(3):
            current_subheading = h[3]
    final_hierarchy = {1: current_title, 2: current_heading, 3: current_subheading}

    breadcrumb_trail = ' > '.join(
        heading for _, heading in sorted(final_hierarchy.items()) if heading
    )

    # Features are extracted from the final text, including the warning block.
    features = extract_chunk_features(final_text)

    page_numbers = sorted({elem['page_number'] for elem in elements})

    # The overlap marker is written as "[Overlap]" in this check but as
    # "[Overlap Start] ... [Overlap End]" by the HTML viewer in this notebook.
    # Accept either spelling and ignore leading whitespace on the first fragment.
    has_overlap_prefix = bool(text_parts) and text_parts[0].lstrip().startswith(
        ("[Overlap]", "[Overlap Start]")
    )

    chunk = {
        'text': final_text,
        'context': breadcrumb_trail, # For simpler access
        'page_numbers': page_numbers,
        'breadcrumb_trail': breadcrumb_trail, # Explicitly keep
        'heading_level_1': final_hierarchy.get(1),
        'heading_level_2': final_hierarchy.get(2),
        'heading_level_3': final_hierarchy.get(3),
        'has_overlap_prefix': has_overlap_prefix,
    }
    chunk.update(features)

    # Content-type tags derived from the extracted features.
    content_type = set()
    if chunk.get('is_procedure', False): content_type.add('procedure')
    if chunk.get('contains_table', False): content_type.add('table')
    if chunk.get('figure_references'): content_type.add('figure')
    if chunk.get('safety_notices'): content_type.add('safety')
    if chunk.get('is_diagnostic_content', False): content_type.add('diagnostic')
    if chunk.get('torque_specifications'): content_type.add('specification')
    if chunk.get('maintenance_intervals'): content_type.add('maintenance')
    if chunk.get('contains_diagnostic_codes'): content_type.add('dtc')
    if content_type:
        chunk['content_type'] = sorted(content_type)

    # Consolidated metadata summary.
    metadata = {
        'source_pages': f"{min(page_numbers)}-{max(page_numbers)}" if page_numbers else "N/A",
        'page_count': len(page_numbers),
        'chunk_length': len(final_text),
    }
    if chunk.get('vehicle_systems'): metadata['systems'] = chunk['vehicle_systems']
    if chunk.get('torque_specifications'): metadata['has_torque'] = True
    if chunk.get('safety_notices'): metadata['has_safety'] = True
    chunk['metadata'] = metadata

    # Drop the top-level keys that the metadata now summarises.
    for key in ('vehicle_systems', 'torque_specifications', 'safety_notices'):
        chunk.pop(key, None)

    return chunk

# --- Enhanced Feature Extraction Function ---
# Patterns run against the final chunk text, in which safety warnings have been
# hoisted into a leading "⚠️ TYPE: content" block followed by a blank line.
def extract_chunk_features(text: str) -> Dict[str, Any]:
    """Extract searchable features from a chunk's text (Ford-manual oriented).

    Only detected features are included in the result. Possible keys:
      'safety_notices'            list of {'type', 'content'} from "⚠️ TYPE: content" lines
      'is_procedure'              True if list markers or "step N" appear
      'torque_specifications'     list of {'value', 'unit'} (e.g. 25 Nm, 18 lb-ft)
      'contains_table'            True for pipe- or +/- drawn tables (heuristic)
      'figure_references'         sorted unique "Fig. N" / "Figure N" references
      'vehicle_systems'           sorted system names matched by keyword
      'maintenance_intervals'     list of {'value', 'unit'} (miles, km, months, ...)
      'contains_diagnostic_codes' / 'diagnostic_codes'   OBD-II codes like P0301
      'is_diagnostic_content'     True if diagnostic vocabulary appears

    Args:
        text: Chunk text.

    Returns:
        Dict of detected features (empty if none).
    """
    features: Dict[str, Any] = {}

    # Safety notices in the "⚠️ TYPE: content" form. Content stops at the next
    # notice, at a blank line (end of the hoisted warning block) or at the end of
    # the text. Without the blank-line stop, the last notice would swallow the
    # whole chunk body.
    safety_matches = re.findall(
        r"⚠️\s*(WARNING|CAUTION|DANGER|IMPORTANT|NOTE|ATTENTION):\s*(.*?)(?=\n⚠️|\n\s*\n|\Z)",
        text,
        re.DOTALL,
    )
    if safety_matches:
        features['safety_notices'] = [
            {'type': ntype.upper(), 'content': body.strip()}
            for ntype, body in safety_matches
        ]

    # Procedures: lines starting with a list marker, or explicit "step N" wording.
    step_indicators = re.findall(r'(?m)^\s*(\d+\.|\*|\-|•)\s+', text)
    numbered_steps = re.search(r'\b(step\s+\d+)\b', text, re.IGNORECASE)
    if step_indicators or numbered_steps:
        features['is_procedure'] = True

    # Torque specifications. The unit is captured as well, and a trailing \b
    # prevents matches inside longer words.
    torque_pattern = re.compile(
        r'(\d+(?:\.\d+)?)\s*(Nm|N·m|lb-ft|lb·ft|ft-lb|ft·lb|lb-in|lb·in|in-lb|in·lb)\b',
        re.IGNORECASE,
    )
    torque_specs = torque_pattern.findall(text)
    if torque_specs:
        features['torque_specifications'] = [
            {'value': value, 'unit': unit} for value, unit in torque_specs
        ]

    # Table detection (heuristic): several pipe-delimited lines, or +/- borders.
    lines = text.splitlines()
    pipe_lines = [line for line in lines if '|' in line.strip()]
    plus_lines = [line for line in lines if '+' in line.strip() and '-' in line.strip()]
    if len(pipe_lines) > 2 or len(plus_lines) > 1:
        features['contains_table'] = True

    # Figure references such as "Fig. 3", "Figure 12a".
    figure_pattern = re.compile(r'\b(Fig(?:ure)?\.?\s+\d+[a-zA-Z]?)\b', re.IGNORECASE)
    figures = figure_pattern.findall(text)
    if figures:
        features['figure_references'] = sorted(set(figures))

    # Vehicle systems: whole-word keyword match on the lower-cased text.
    systems_dict = {
        'engine': ['engine', 'ecoboost', 'powerstroke', 'cylinder', 'piston', 'timing'],
        'transmission': ['transmission', 'gearbox', 'clutch', 'transaxle', 'powershift', 'atf'],
        'electrical': ['battery', 'alternator', 'starter', 'fuse', 'relay', 'wiring', 'module', 'sensor', 'pcm', 'bcm'],
        'fuel': ['fuel tank', 'fuel pump', 'fuel filter', 'injector', 'emission'],
        'cooling': ['coolant', 'radiator', 'thermostat', 'water pump'],
        'brakes': ['brake', 'abs', 'rotor', 'caliper', 'pad'],
        'suspension': ['suspension', 'shock', 'strut', 'spring', 'control arm', 'alignment'],
        'drivetrain': ['transfer case', '4x4', 'awd', 'differential', 'axle', 'driveshaft'],
        'steering': ['steering', 'epas', 'rack', 'pinion', 'tie rod'],
        'body': ['door', 'hood', 'trunk', 'liftgate', 'bumper', 'windshield', 'wiper', 'mirror', 'body control module'],
        'hvac': ['climate', 'a/c', 'heater', 'compressor', 'condenser'],
        'exhaust': ['exhaust', 'muffler', 'catalytic converter'],
    }
    text_lower = text.lower()
    detected_systems = {
        system
        for system, keywords in systems_dict.items()
        if any(re.search(r'\b' + re.escape(keyword) + r'\b', text_lower) for keyword in keywords)
    }
    if detected_systems:
        features['vehicle_systems'] = sorted(detected_systems)

    # Maintenance intervals. The trailing \b stops "5 minutes" from being read
    # as 5 "mi" (miles).
    interval_pattern = re.compile(
        r'(\d{1,3}(?:,\d{3})*|\d+)\s+(miles|mi|kilometers|km|months|years)\b',
        re.IGNORECASE,
    )
    intervals = interval_pattern.findall(text)
    if intervals:
        features['maintenance_intervals'] = [
            {'value': value.replace(',', ''), 'unit': unit.lower()}
            for value, unit in intervals
        ]

    # OBD-II diagnostic trouble codes (P/B/U/C + 4 digits).
    obd_code_pattern = re.compile(r'\b([PBUC]\d{4})\b')
    dtcs = obd_code_pattern.findall(text)
    if dtcs:
        features['contains_diagnostic_codes'] = True
        features['diagnostic_codes'] = sorted(set(dtcs))

    # Diagnostic vocabulary, including inflected and plural forms
    # (diagnostics, troubleshooting, codes, faults, ...).
    diagnostic_keywords = re.search(
        r'\b(diagnos\w*|troubleshoot\w*|symptoms?|faults?|errors?|dtcs?|codes?|check engine)\b',
        text,
        re.IGNORECASE,
    )
    if diagnostic_keywords:
        features['is_diagnostic_content'] = True

    return features



---

POST PROCESSING



---





In [None]:
# --- Post Processing Function ---
def post_process_chunks(chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Assign sequential IDs plus prev/next and related-chunk links.

    ``chunks`` is sorted in place by first page number. The sort is stable, so
    chunks starting on the same page keep their incoming order. Then:
      * 'id' is set to "chunk_00001", "chunk_00002", ... in that order;
      * 'prev_chunk_id' / 'next_chunk_id' link neighbours (absent at the ends);
      * 'related_chunks' holds up to 5 other chunk ids (sorted) that share the
        same full heading path.

    Chunks that have an H3 are grouped by (H1, H2, H3). Chunks without an H3 but
    with an H2 are grouped by (H1, H2). Grouping on the full path keeps
    sub-headings that recur under different sections from being cross-linked.
    Links left over from an earlier call are cleared first.

    Args:
        chunks: Chunk dicts; 'page_numbers' and 'heading_level_1..3' are read.
            The list and its dicts are modified in place.

    Returns:
        The same (sorted, annotated) list, or a new empty list if ``chunks`` is
        empty.
    """
    if not chunks:
        return []

    # IDs have not been assigned yet, so only the first page is used as the key.
    chunks.sort(key=lambda c: min(c['page_numbers']) if c.get('page_numbers') else 0)

    for index, chunk in enumerate(chunks, start=1):
        chunk['id'] = f"chunk_{index:05d}"
        for stale_key in ('prev_chunk_id', 'next_chunk_id', 'related_chunks'):
            chunk.pop(stale_key, None)

    # Neighbour links.
    for prev_chunk, next_chunk in zip(chunks, chunks[1:]):
        next_chunk['prev_chunk_id'] = prev_chunk['id']
        prev_chunk['next_chunk_id'] = next_chunk['id']

    def _group_key(chunk: Dict[str, Any]):
        """Full heading path used to group related chunks, or None if too coarse."""
        h1 = chunk.get('heading_level_1')
        h2 = chunk.get('heading_level_2')
        h3 = chunk.get('heading_level_3')
        if h3:  # prefer the most specific level
            return ('h3', h1, h2, h3)
        if h2:
            return ('h2', h1, h2)
        return None

    keys = [_group_key(chunk) for chunk in chunks]
    groups: Dict[Any, List[str]] = {}
    for chunk, key in zip(chunks, keys):
        if key is not None:
            groups.setdefault(key, []).append(chunk['id'])

    for chunk, key in zip(chunks, keys):
        if key is None:
            continue
        related = sorted(cid for cid in groups[key] if cid != chunk['id'])
        if related:
            chunk['related_chunks'] = related[:5]  # limit for clarity

    return chunks



---

VALIDATION



---



In [None]:
# --- Validation Function ---
def validate_chunking(
    final_chunks: List[Dict[str, Any]],
    min_size: int = MIN_CHUNK_SIZE,
    max_size: int = MAX_CHUNK_SIZE,
    overlap_expected: bool = (OVERLAP_SENTENCE_COUNT > 0)
    ):
    """Print a diagnostic report on chunk sizes, overlap markers and heading continuity.

    Nothing is returned; every finding is printed.

    Args:
        final_chunks: Chunks with at least a 'text' key. 'has_overlap_prefix',
            'heading_level_1'..'heading_level_3' and 'breadcrumb_trail' are read
            when present.
        min_size: Length (chars) below which a chunk counts as undersized.
        max_size: Length (chars) above which a chunk counts as oversized; 1.2x
            this is the "significantly oversized" threshold.
        overlap_expected: Whether to report how many chunks carry an overlap prefix.
    """
    print("\n--- Starting Chunk Validation ---")
    total = len(final_chunks)
    if not total:
        print("Validation Error: No chunks were generated.")
        return

    print(f"Total Chunks Generated: {total}")

    lengths = [len(c['text']) for c in final_chunks]
    n_small = sum(length < min_size for length in lengths)
    # Some overshoot past max_size is tolerated because splits land on sentence boundaries.
    big_limit = max_size * 1.20
    n_large = sum(length > max_size for length in lengths)
    n_huge = sum(length > big_limit for length in lengths)
    mean_len = sum(lengths) / total

    print("\nChunk Size Analysis:")
    print(f"  Average Size: {mean_len:.0f} characters")
    print(f"  Min Size: {min(lengths)} | Max Size: {max(lengths)}")
    print(f"  Chunks below Min ({min_size}): {n_small} ({n_small/total:.1%})")
    print(f"  Chunks above Max ({max_size}): {n_large} ({n_large/total:.1%})")
    print(f"  Chunks significantly above Max ({big_limit:.0f}): {n_huge} ({n_huge/total:.1%})")

    # Merging should leave very few undersized chunks (>2% is flagged);
    # more than 5% well above the limit points at the splitting logic.
    if n_small > total * 0.02:
        print(f"  WARNING: Still found {n_small} chunks below {min_size} chars. Review merge logic or MIN_CHUNK_SIZE.")
    if n_huge > total * 0.05:
        print(f"  WARNING: {n_huge} chunks significantly exceed max size ({big_limit:.0f}). Review splitting logic or MAX_CHUNK_SIZE.")

    if not overlap_expected:
        print("\nOverlap Analysis: Overlap Disabled.")
    else:
        with_overlap = sum(1 for c in final_chunks if c.get('has_overlap_prefix'))
        print(f"\nOverlap Analysis (Expected sentences: {OVERLAP_SENTENCE_COUNT}):")
        print(f"  Chunks with Overlap Prefix: {with_overlap} ({with_overlap/total:.1%})")

    # Look at the transitions between the first (up to) 10 chunks.
    print("\nContext Coherence Check (Sample):")
    jumps_seen = False
    for pos, (before, after) in enumerate(zip(final_chunks[:9], final_chunks[1:10]), start=1):
        # Any H1 change is a jump. An H2 change under the same H1 is flagged only
        # when an H3 is present on either side.
        top_changed = before.get('heading_level_1') != after.get('heading_level_1')
        mid_changed = (
            before.get('heading_level_2') != after.get('heading_level_2')
            and (before.get('heading_level_3') is not None
                 or after.get('heading_level_3') is not None)
        )
        if top_changed or mid_changed:
            jumps_seen = True
            print("  Potential Context Jump:")
            print(f"    Chunk {pos} Context: {before.get('breadcrumb_trail', 'N/A')}")
            print(f"    Chunk {pos + 1} Context: {after.get('breadcrumb_trail', 'N/A')}")

    if not jumps_seen:
        print("  Context appears generally coherent in sample based on hierarchy.")

    print("\nReadability Check:")
    print("  Please manually review chunks in the generated HTML file ('enriched_chunks_viewer.html'),")
    print("  especially those containing lists, procedures, tables, or warnings,")
    print("  to ensure text flows correctly and formatting is acceptable.")

    print("\n--- Validation Complete ---")



---

CHUNK HTML EXPORT



---




In [None]:
import html # Import the html module for escaping

# --- HTML Export Function ---
def create_html_viewer(chunks: List[Dict[str, Any]], filename: str = "enriched_chunks_viewer.html"):
    """Write a standalone HTML page for browsing chunks.

    Each chunk is rendered as a card containing:
      * its id with Previous/Next links;
      * page range, length, breadcrumb context and content types;
      * extracted features (safety flag, systems, parts, figures, DTCs, related links);
      * the HTML-escaped chunk text, with the overlap prefix and ⚠️ markers highlighted.

    Args:
        chunks: Chunk dicts. Keys read: 'id', 'prev_chunk_id', 'next_chunk_id',
            'page_numbers', 'text', 'breadcrumb_trail', 'content_type', 'metadata'
            ('has_safety', 'systems', 'parts'), 'figure_references',
            'diagnostic_codes', 'related_chunks'.
        filename: Output path of the HTML file.

    Returns:
        ``filename`` on success. ``None`` if ``chunks`` is empty or the file
        could not be written; the error is printed, not raised.
    """
    import re

    if not chunks:
        print("No chunks to generate HTML viewer for.")
        return None

    def esc(value: Any) -> str:
        # Escape everything interpolated into markup or attribute values.
        return html.escape(str(value), quote=True)

    # Overlap text carried over from the previous chunk, ending at the first blank
    # line. Both marker spellings used in this notebook are accepted: "[Overlap]"
    # and "[Overlap Start] ... [Overlap End]". The pattern is searched rather than
    # anchored at ^ because hoisted safety warnings may precede it.
    overlap_re = re.compile(r'\[Overlap(?: Start)?\].*?(?:\[Overlap End\])?\n\n', re.DOTALL)

    css = """
        body { font-family: sans-serif; line-height: 1.5; padding: 20px; }
        .chunk {
            background-color: #f8f9fa;
            border-left: 4px solid #3a7bd5;
            border-radius: 5px;
            padding: 15px;
            margin-bottom: 15px;
            box-shadow: 0 2px 4px rgba(0, 0, 0, 0.05);
            page-break-inside: avoid; /* Try to prevent printing breaks inside */
        }
        .chunk-header {
            border-bottom: 1px solid #eee;
            margin-bottom: 10px;
            padding-bottom: 10px;
            display: flex;
            justify-content: space-between;
            align-items: center;
        }
        .chunk-id { font-weight: bold; font-size: 1.1em; color: #333; }
        .chunk-nav a { text-decoration: none; margin-left: 10px; color: #007bff; }
        .chunk-nav a:hover { text-decoration: underline; }
        .chunk-meta { font-size: 0.9em; color: #555; margin-bottom: 10px; }
        .chunk-meta strong { color: #333; }
        .chunk-text { white-space: pre-wrap; word-wrap: break-word; }
        /* <pre> defaults to white-space: pre, which would stop long lines wrapping */
        .chunk-text pre { white-space: pre-wrap; word-wrap: break-word; margin: 0; font-family: inherit; }
        .features { margin-top: 10px; font-size: 0.9em; }
        .features strong { display: block; margin-bottom: 5px; color: #444; }
        .features ul { margin: 0; padding-left: 20px; list-style: disc; }
        .features li { margin-bottom: 3px; }
        .warning { color: #d9534f; font-weight: bold; } /* Style for ⚠️ */
        .overlap-prefix { color: #777; font-style: italic; font-size: 0.9em; }
"""

    pieces = [f"""
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Chunk Viewer</title>
    <style>{css}    </style>
</head>
<body>
    <h1>Chunk Viewer</h1>
    <p>Displaying {len(chunks)} chunks from Ford car manual</p>
"""]

    for i, chunk in enumerate(chunks):
        chunk_id = chunk.get('id', f'chunk_{i+1}')
        prev_id = chunk.get('prev_chunk_id')
        next_id = chunk.get('next_chunk_id')
        text = chunk.get('text') or ''

        # Header with ID and navigation.
        nav = ''
        if prev_id:
            nav += f'<a href="#{esc(prev_id)}">Previous</a>'
        if next_id:
            nav += f'<a href="#{esc(next_id)}">Next</a>'
        pieces.append(f'<div class="chunk" id="{esc(chunk_id)}">\n')
        pieces.append('  <div class="chunk-header">\n')
        pieces.append(f'    <span class="chunk-id">{esc(chunk_id)}</span>\n')
        pieces.append(f'    <span class="chunk-nav">{nav}    </span>\n')
        pieces.append('  </div>\n')

        # Metadata line.
        pages = chunk.get('page_numbers') or []
        page_str = f"{min(pages)}-{max(pages)}" if pages else "N/A"
        context = chunk.get('breadcrumb_trail') or 'N/A'
        pieces.append('  <div class="chunk-meta">\n')
        pieces.append(f"    <strong>Pages:</strong> {page_str} ({len(pages)}) | ")
        pieces.append(f"<strong>Length:</strong> {len(text)} chars | ")
        pieces.append(f"<strong>Context:</strong> {esc(context)}<br/>\n")
        if chunk.get('content_type'):
            types = ', '.join(esc(t) for t in chunk['content_type'])
            pieces.append(f"    <strong>Content Types:</strong> {types}\n")
        pieces.append('  </div>\n')

        # Extracted features.
        meta = chunk.get('metadata') or {}
        feature_items = []
        if meta.get('has_safety'):
            feature_items.append("<li>Safety Warnings Present</li>")
        if meta.get('systems'):
            systems = ', '.join(esc(s) for s in meta['systems'])
            feature_items.append(f"<li><strong>Systems:</strong> {systems}</li>")
        if meta.get('parts'):
            parts_list = ', '.join(esc(p) for p in meta['parts'])
            feature_items.append(f"<li><strong>Parts:</strong> {parts_list}</li>")
        if chunk.get('figure_references'):
            figures = ', '.join(esc(fig) for fig in chunk['figure_references'])
            feature_items.append(f"<li><strong>Figures:</strong> {figures}</li>")
        if chunk.get('diagnostic_codes'):
            codes = ', '.join(esc(dtc) for dtc in chunk['diagnostic_codes'])
            feature_items.append(f"<li><strong>DTCs:</strong> {codes}</li>")
        if chunk.get('related_chunks'):
            links = ' '.join(f'<a href="#{esc(rid)}">{esc(rid)}</a>' for rid in chunk['related_chunks'])
            feature_items.append(f'<li><strong>Related:</strong> {links}</li>')
        if feature_items:
            pieces.append(
                '<div class="features"><strong>Extracted Features:</strong><ul>'
                + ''.join(feature_items)
                + '</ul></div>\n'
            )

        # Chunk text: escape first, then add highlight markup.
        text_display = esc(text)
        overlap = overlap_re.search(text_display)
        if overlap:
            text_display = (
                text_display[:overlap.start()]
                + f'<span class="overlap-prefix">{overlap.group(0)}</span>'
                + text_display[overlap.end():]
            )
        text_display = text_display.replace("⚠️", '<span class="warning">⚠️</span>')
        pieces.append(f'  <div class="chunk-text"><pre>{text_display}</pre></div>\n')
        pieces.append('</div>\n\n')

    pieces.append("""
</body>
</html>
""")
    html_content = ''.join(pieces)

    try:
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(html_content)
    except (OSError, UnicodeError) as e:
        print(f"Error saving HTML viewer: {e}")
        return None
    print(f"HTML viewer saved to '{filename}'")
    return filename



---

MAIN



---



In [None]:
import nltk

# Fetch NLTK's "punkt_tab" tokenizer data package (Punkt sentence-splitting
# tables). nltk.download returns True once the package is available.
nltk.download('punkt_tab')

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


True

In [None]:
# --- Main Execution ---
# Chunk the structured Document AI output, validate the result, then persist it
# as JSON (for the embedding step) and as an HTML page (for manual review).
if not ('combined_data_structured' in locals() and combined_data_structured):
    print("Error: 'combined_data_structured' not found or is empty. Please ensure it's loaded correctly before running this script.")
else:
    print("Starting enhanced chunk creation process...")

    # The batched path is used even for small inputs so every run goes through
    # the same code; batch_size is left at the helper's default.
    enriched_final_chunks = process_elements_in_batches(
        combined_data=combined_data_structured,
        max_chunk_size=MAX_CHUNK_SIZE,
        min_chunk_size=MIN_CHUNK_SIZE,
        overlap_sentences=OVERLAP_SENTENCE_COUNT
    )

    # Print size / overlap / coherence statistics for the generated chunks.
    validate_chunking(enriched_final_chunks)

    output_filename_json = 'final_chunks_output.json'
    try:
        # json cannot serialise datetime/date values natively; emit them as
        # ISO-8601 strings and defer everything else to the stock encoder.
        class CustomEncoder(json.JSONEncoder):
            def default(self, obj):
                if isinstance(obj, (datetime.datetime, datetime.date)):
                    return obj.isoformat()
                return super().default(obj)

        with open(output_filename_json, 'w', encoding='utf-8') as f:
            json.dump(
                enriched_final_chunks,
                f,
                indent=2,
                ensure_ascii=False,
                cls=CustomEncoder,
            )
        print(f"\nFinal chunks saved to '{output_filename_json}'")
    except Exception as e:
        # Saving is best-effort: report the problem and still build the viewer.
        print(f"Error saving final chunks to JSON: {e}")

    # HTML page for a manual readability review of every chunk.
    create_html_viewer(enriched_final_chunks, "enriched_chunks_viewer.html")

Starting enhanced chunk creation process...
Processing 8172 elements in batches of 1000...
Manual spans 486 pages across 8172 elements.

Processing batch 1/9: Elements 1-1000 (Pages 1-70)


Creating chunks: 100%|██████████| 1000/1000 [00:00<00:00, 2790.26element/s]


Batch complete. Generated 99 chunks. Total chunks so far: 99

Processing batch 2/9: Elements 1001-2000 (Pages 70-128)


Creating chunks: 100%|██████████| 1000/1000 [00:00<00:00, 3530.58element/s]


Batch complete. Generated 85 chunks. Total chunks so far: 184

Processing batch 3/9: Elements 2001-3000 (Pages 128-184)


Creating chunks: 100%|██████████| 1000/1000 [00:00<00:00, 3998.52element/s]


Batch complete. Generated 84 chunks. Total chunks so far: 268

Processing batch 4/9: Elements 3001-4000 (Pages 184-244)


Creating chunks: 100%|██████████| 1000/1000 [00:00<00:00, 3109.36element/s]


Batch complete. Generated 96 chunks. Total chunks so far: 364

Processing batch 5/9: Elements 4001-5000 (Pages 244-304)


Creating chunks: 100%|██████████| 1000/1000 [00:00<00:00, 3058.57element/s]


Batch complete. Generated 98 chunks. Total chunks so far: 462

Processing batch 6/9: Elements 5001-6000 (Pages 304-346)


Creating chunks: 100%|██████████| 1000/1000 [00:00<00:00, 6943.07element/s]


Batch complete. Generated 49 chunks. Total chunks so far: 511

Processing batch 7/9: Elements 6001-7000 (Pages 346-422)


Creating chunks: 100%|██████████| 1000/1000 [00:00<00:00, 2587.51element/s]


Batch complete. Generated 114 chunks. Total chunks so far: 625

Processing batch 8/9: Elements 7001-8000 (Pages 422-475)


Creating chunks: 100%|██████████| 1000/1000 [00:00<00:00, 4323.79element/s]


Batch complete. Generated 71 chunks. Total chunks so far: 696

Processing batch 9/9: Elements 8001-8172 (Pages 475-486)


Creating chunks: 100%|██████████| 172/172 [00:00<00:00, 2706.25element/s]


Batch complete. Generated 18 chunks. Total chunks so far: 714

Finalizing all chunks...
Processing complete. Total chunks: 714

--- Starting Chunk Validation ---
Total Chunks Generated: 714

Chunk Size Analysis:
  Average Size: 1240 characters
  Min Size: 300 | Max Size: 2119
  Chunks below Min (500): 43 (6.0%)
  Chunks above Max (1800): 19 (2.7%)
  Chunks significantly above Max (2160): 0 (0.0%)

Overlap Analysis (Expected sentences: 1):
  Chunks with Overlap Prefix: 0 (0.0%)

Context Coherence Check (Sample):
  Context appears generally coherent in sample based on hierarchy.

Readability Check:
  Please manually review chunks in the generated HTML file ('enriched_chunks_viewer.html'),
  to ensure text flows correctly and formatting is acceptable.

--- Validation Complete ---

Final chunks saved to 'final_chunks_output.json'
HTML viewer saved to 'enriched_chunks_viewer.html'


# 4. Create Embeddings with Vertex AI and Store in MongoDB Atlas Cluster

We now send the final chunks to a **Vertex AI embedding model** and store each chunk, together with its embedding, in a MongoDB Atlas collection using the following steps:

1. Generate embeddings using Vertex AI
2. Format the data for MongoDB
3. Store the chunks and embeddings in MongoDB

In [None]:
# Install the MongoDB driver and the Vertex AI SDK in a single pip call
!pip install pymongo google-cloud-aiplatform

In [None]:
import json
import os
import datetime
import time
from google.cloud import aiplatform
from typing import List, Dict, Any, Optional

def safe_mongodb_imports():
    """Resolve pymongo's client class and bson's JSON helpers on demand.

    The imports live inside this function as the notebook's workaround for the
    "SON conflict": pymongo is imported first so that ``bson`` is presumably
    resolved to pymongo's bundled copy rather than a standalone ``bson``
    package (NOTE(review): confirm which package caused the conflict).

    Returns:
        tuple: ``(MongoClient, json_util)`` - the ``pymongo.MongoClient`` class
        and the ``bson.json_util`` module.
    """
    from pymongo import MongoClient as client_cls  # pymongo must load before bson
    from bson import json_util as bson_json_util

    return client_cls, bson_json_util

# --- Use the safe import function defined previously ---
try:
    MongoClient, json_util = safe_mongodb_imports()
    print("MongoDB modules imported safely.")
except NameError:
    print("Error: 'safe_mongodb_imports' not defined. Please run the cell that defines it (e.g., Bdbbnyonzsma).")
    # Fallback to direct imports, hoping the environment is fixed
    from pymongo import MongoClient, ASCENDING, IndexModel
    from pymongo.errors import ConnectionFailure, OperationFailure
    from bson import json_util

# The fallback branch above binds these names but the primary path does not.
# Import them unconditionally so later code (e.g. store_in_mongodb's
# ``except ConnectionFailure`` clause) never fails with NameError. pymongo is
# already loaded on both paths here, so this should not reintroduce the SON
# conflict the safe import works around.
from pymongo import ASCENDING, IndexModel
from pymongo.errors import ConnectionFailure, OperationFailure


# --- Configuration ---
project_id = "ENTER_PROJECT_ID" # Your GCP Project ID
location = "ENTER_LOCATION" # GCP location for Vertex AI
model_name = "text-embedding-005" # Or the specific model you use
# Embedding dimension for the model (e.g., text-embedding-005 uses 768)
EXPECTED_EMBEDDING_DIMENSION = 768

# Connection strings typically carry credentials: prefer providing it via the
# MONGODB_URI environment variable rather than saving it in the notebook.
mongodb_uri = os.environ.get("MONGODB_URI", "YOUR_URI") # or enter the correct uri here
db_name = "DB_NAME" # Your DB name
collection_name = "COLLECTION_NAME" # Your Collection name

MongoDB modules imported safely.




---

GENERATE EMBEDDINGS



---



In [None]:
# Generate embeddings using Vertex AI with batching and error handling
def generate_embeddings_vertex(
    chunks: List[Dict[str, Any]],
    project_id: str,
    location: str = "us-central1",
    model_id: str = "text-embedding-005",
    task_type: str = "RETRIEVAL_DOCUMENT",
    output_dim: Optional[int] = None,
    batch_size: int = 5
):
    """
    Embed each chunk's text with a Vertex AI text-embedding model.

    Chunks are sent ``batch_size`` at a time. A request that raises is not
    retried; instead every slot of that batch is filled with ``None`` so the
    returned list stays index-aligned with ``chunks``.

    Args:
        chunks: Chunk dicts (their ``'text'`` value is embedded) or plain strings
        project_id: GCP project passed to ``vertexai.init``
        location: Vertex AI region (e.g. "us-central1")
        model_id: Embedding model name (e.g. "text-embedding-005")
        task_type: Task hint attached to every input (e.g. "RETRIEVAL_DOCUMENT")
        output_dim: If given, forwarded to the model as ``output_dimensionality``
        batch_size: Number of chunks per request

    Returns:
        One entry per chunk: the embedding vector (``emb.values``), or ``None``
        when the batch containing that chunk failed.

    Raises:
        ImportError: If the Vertex AI SDK is not installed.
    """
    try:
        from vertexai.language_models import TextEmbeddingModel, TextEmbeddingInput
    except ImportError:
        raise ImportError(
            "The vertexai package is required. Install with: pip install google-cloud-aiplatform"
        )

    import vertexai
    vertexai.init(project=project_id, location=location)

    model = TextEmbeddingModel.from_pretrained(model_id)
    print(f"Successfully loaded embedding model: {model_id}")

    # Forward output_dimensionality only when the caller asked for it.
    request_kwargs = {} if output_dim is None else {'output_dimensionality': output_dim}

    total = len(chunks)
    batch_starts = range(0, total, batch_size)
    batch_count = len(batch_starts)
    vectors = []

    for batch_no, start in enumerate(batch_starts, start=1):
        batch = chunks[start:start + batch_size]
        print(f"Processing embedding batch {batch_no}/{batch_count}")

        texts = [item['text'] if isinstance(item, dict) else item for item in batch]
        inputs = [TextEmbeddingInput(text=t, task_type=task_type) for t in texts]

        try:
            response = model.get_embeddings(inputs, **request_kwargs)
            vectors.extend([emb.values for emb in response])

            # Short pause between requests to soften rate limiting; skipped after the last batch.
            if start + batch_size < total:
                time.sleep(0.1)
        except Exception as e:
            print(f"Error generating embeddings for batch {batch_no}: {str(e)}")
            # Placeholders keep the output aligned with the input chunks.
            vectors.extend([None] * len(batch))

    failed = sum(vec is None for vec in vectors)
    if failed > 0:
        print(f"Warning: {failed} chunks failed to generate embeddings")

    return vectors



---

CREATE MONGODB DOCUMENTS



---



In [None]:
# Create MongoDB documents with chunks and embeddings
def create_mongodb_documents(chunks, embeddings):
    """
    Build MongoDB-ready documents by pairing each chunk with its embedding.

    Chunks whose embedding is ``None`` (failed generation) are skipped. Each
    document is a deep copy of its chunk, so the in-place key sanitisation
    applied to the document never mutates the caller's ``chunks``.

    Args:
        chunks: List of chunk dicts (plain strings are wrapped as {"text": chunk})
        embeddings: Embeddings aligned index-by-index with ``chunks``;
            ``None`` marks a chunk whose embedding failed

    Returns:
        List of documents ready for MongoDB insertion, each carrying an
        ``embedding`` field and an ``embedding_timestamp`` (local time,
        ISO-8601 string).
    """
    import copy  # local import keeps this notebook cell self-contained

    documents = []
    for chunk, embedding in zip(chunks, embeddings):
        # Skip if embedding generation failed
        if embedding is None:
            continue

        # Deep copy: _sanitize_document renames keys inside nested dicts in
        # place, and a shallow copy would share those dicts with the input chunk.
        document = copy.deepcopy(chunk) if isinstance(chunk, dict) else {"text": chunk}
        document["embedding"] = embedding
        document["embedding_timestamp"] = datetime.datetime.now().isoformat()

        # Sanitize document for MongoDB
        _sanitize_document(document)

        documents.append(document)

    return documents

# Helper function to sanitize documents for MongoDB compatibility
def _sanitize_document(doc):
    """Clean document for MongoDB compatibility"""
    # Replace '.' in keys with '_' (MongoDB doesn't allow dots in field names)
    keys_to_update = [k for k in doc.keys() if '.' in k]
    for key in keys_to_update:
        new_key = key.replace('.', '_')
        doc[new_key] = doc.pop(key)

    # Handle nested dictionaries
    for key, value in doc.items():
        if isinstance(value, dict):
            _sanitize_document(value)
        elif isinstance(value, list):
            for item in value:
                if isinstance(item, dict):
                    _sanitize_document(item)

# Store documents in MongoDB with proper connection handling
def store_in_mongodb(documents, mongodb_uri, db_name, collection_name):
    """
    Insert documents into a MongoDB collection in batches.

    Opens a fresh client, pings the server so a bad URI or unreachable host
    fails within ~5 seconds, inserts in batches of at most 1000 documents and
    always closes the client afterwards.

    Args:
        documents: List of documents to store
        mongodb_uri: MongoDB connection URI
        db_name: Database name
        collection_name: Collection name

    Returns:
        The first batch's insert result with ``inserted_ids`` extended to cover
        every batch, or None when ``documents`` is empty.

    Raises:
        ValueError: If the URI, database name or collection name is empty.
        Exception: Connection and insert errors are printed, then re-raised.
    """
    if not documents:
        print("No documents to store in MongoDB")
        return None

    # Validate MongoDB connection parameters
    if not mongodb_uri or not db_name or not collection_name:
        raise ValueError("MongoDB URI, database name, and collection name are required")

    # Imported locally so the ``except ConnectionFailure`` clause below does not
    # depend on a module-level import that may never have run; without it, any
    # error inside the try block would surface as a NameError instead.
    from pymongo.errors import ConnectionFailure

    client = None
    try:
        # Connect to MongoDB with a timeout
        client = MongoClient(mongodb_uri, serverSelectionTimeoutMS=5000)

        # Test the connection before doing any work
        client.admin.command('ping')

        collection = client[db_name][collection_name]

        # Insert in batches of at most 1000 documents
        batch_size = min(1000, len(documents))
        total_batches = (len(documents) - 1) // batch_size + 1
        result = None

        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            batch_result = collection.insert_many(batch)

            if result is None:
                result = batch_result
            else:
                # Accumulate ids onto the first result so callers see every inserted _id
                result.inserted_ids.extend(batch_result.inserted_ids)

            print(f"Inserted batch {i // batch_size + 1}/{total_batches} ({len(batch)} documents)")

        return result
    except ConnectionFailure as e:
        print(f"MongoDB connection error: {str(e)}")
        raise
    except Exception as e:
        print(f"Error storing documents in MongoDB: {str(e)}")
        raise
    finally:
        # Ensure the client connection is closed; compare to None rather than
        # relying on the truthiness of a driver object.
        if client is not None:
            client.close()
            print("MongoDB connection closed")

# Main pipeline with comprehensive error handling
def process_chunks_to_mongodb(
    final_chunks,
    project_id,
    location,
    model_id,
    mongodb_uri,
    db_name,
    collection_name,
    task_type="RETRIEVAL_DOCUMENT",
    output_dim=None
):
    """
    Run the embed -> build documents -> insert pipeline for a list of chunks.

    Steps:
        1. generate_embeddings_vertex embeds every chunk's text.
        2. create_mongodb_documents pairs chunks with embeddings, dropping
           chunks whose embedding is None.
        3. store_in_mongodb inserts the documents into db_name.collection_name.

    Args:
        final_chunks: List of processed chunks
        project_id: GCP project ID
        location: GCP location
        model_id: Model ID for embeddings
        mongodb_uri: MongoDB connection URI
        db_name: MongoDB database name
        collection_name: MongoDB collection name
        task_type: Task type for embeddings
        output_dim: Optional output dimensionality

    Returns:
        Whatever store_in_mongodb returns for the insert.

    Raises:
        Any exception from the three steps, after printing it.
    """
    embedding_options = dict(
        project_id=project_id,
        location=location,
        model_id=model_id,
        task_type=task_type,
        output_dim=output_dim,
    )

    try:
        print(f"Generating embeddings for {len(final_chunks)} chunks...")
        vectors = generate_embeddings_vertex(chunks=final_chunks, **embedding_options)
        print(f"Generated {len(vectors)} embeddings")

        print("Creating MongoDB documents...")
        docs = create_mongodb_documents(final_chunks, vectors)
        print(f"Created {len(docs)} documents")

        print(f"Storing documents in MongoDB ({db_name}.{collection_name})...")
        return store_in_mongodb(docs, mongodb_uri, db_name, collection_name)
    except Exception as e:
        print(f"Error in processing pipeline: {str(e)}")
        raise