# 🧠 Seclock Document Q&A – Multimodal Document Pipeline (Streamlit UI)

This script powers an interactive Streamlit app that answers technical questions about Seclock’s door hardware documents using **multimodal AI**—combining vector search, LLM reasoning, and image validation via Snowflake Cortex.

## 🔧 High-Level Features

- 📥 **Document Ingestion & Processing**
  - Splits PDFs into per-page PDFs and images, then extracts page text using `parse_document()`.
  - Stores images in a Snowflake stage and text/metadata in structured tables.
  - Generates image embeddings using `embed_image_1024`.
---
- 🔍 **Semantic Search & Retrieval**
  - Converts user questions into temporary query images to leverage image embeddings.
  - Embeds the image and performs vector search via Cortex Search Service.
  - Retrieves top matching enriched chunks (text + metadata + image reference).
---
- 🧠 **LLM-Based Text Answering**
  - Feeds retrieved context into Claude (`claude-3-7-sonnet`) to generate:
    - Direct answers
    - Confidence scores
    - Justifications
    - Markdown-linked citations (`[Document - page X](presigned_url)`)
---
- 🖼️ **Image-Based Reasoning**
  - Identifies cited pages from the text answer and filters matching images.
  - Submits images asynchronously to Cortex for answer validation.
  - Uses image metadata and visual context to extract direct answers or critiques.
---
- 🧪 **Answer Synthesis**
  - Combines text and image-based answers into a final, human-readable response.
  - Rewrites for clarity, accuracy, and directness.
  - Prioritizes newer or more reliable sources and flags conflicting evidence.
  - Provides full citation trail with links to specific document pages.
---
- 🔗 **Embedding & Retrieval Enrichment**
  - Merges document metadata, summaries, page-level chunks, and vectors into a retrieval-optimized format.
  - Allows precise filtering and future expansion (e.g., by product line or brand).
---
- 💬 **Streamlit Chat UI**
  - Provides an interactive chatbot interface with step-by-step progress feedback.
  - Includes expandable debug sections for:
    - Source documents used
    - Text answer reasoning
    - Image-based critiques
    - LLM prompt previews


In [None]:
# Import python packages
import os
import sys
import json
import shutil
import datetime
import re
import time
import hashlib
from difflib import SequenceMatcher
import tempfile
from textwrap import dedent
import streamlit as st
from PIL import Image, ImageDraw, ImageFont
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import contextmanager
from dataclasses import dataclass
from typing import List, Tuple
import snowflake.snowpark.session as session
import pdfplumber
import PyPDF2
from snowflake.snowpark.context import get_active_session
from snowflake.core import Root
from snowflake.cortex import complete, CompleteOptions
sp_session = get_active_session()

## 📄 PDF Preprocessing Pipeline for Document Analysis

Preprocesses PDFs stored in a Snowflake stage, preparing them for downstream AI document analysis. It lists all PDF files in a specified input stage, downloads each file temporarily, and performs two key operations: 

1. Splitting the PDF into individual pages and uploading each as a separate PDF file
2. Converting each page into a high-resolution image, optionally scaled to a maximum dimension, and uploading the images back to a specified output stage. 
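
The scaling step can be sketched in isolation. This is a minimal sketch, assuming page sizes are given in PDF points (1/72 inch) and that the cap applies to the longest side in pixels; `capped_resolution` is a hypothetical helper name, not part of the pipeline code.

```python
def capped_resolution(
    width_pt: float, height_pt: float, dpi: int = 300, max_dimension: int = 1500
) -> int:
    """Return a render DPI whose longest side stays within max_dimension pixels."""
    # PDF page sizes are expressed in points (1 point = 1/72 inch)
    longest_px = max(width_pt, height_pt) * dpi / 72
    if longest_px <= max_dimension:
        return dpi
    # Scale the DPI down proportionally; int() rounds down so the cap holds
    return int(dpi * max_dimension / longest_px)


# A US Letter page (612 x 792 points) would be 2550 x 3300 px at 300 DPI,
# so the resolution is lowered to keep the longest side within 1500 px.
print(capped_resolution(612, 792))
```

Lowering the render resolution avoids rendering a large image only to shrink it afterwards; resizing the rendered image would be an equally valid choice.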

In [None]:
def print_info(msg: str) -> None:
    """Print info message"""
    print(f"INFO: {msg}", file=sys.stderr)


def print_error(msg: str) -> None:
    """Print error message"""
    print(f"ERROR: {msg}", file=sys.stderr)
    if hasattr(st, "error"):
        st.error(msg)


def print_warning(msg: str) -> None:
    """Print warning message"""
    print(f"WARNING: {msg}", file=sys.stderr)


@dataclass
class Config:
    input_stage: str = "@UTILS.AI.STOCK_IKB_DOCUMENTS/pre_processed/"
    output_stage: str = (
        "@UTILS.AI.STOCK_IKB_DOCUMENTS/"  # Base output stage without subdirectories
    )
    input_path: str = "pre_processed"
    output_pdf_path: str = "paged_pdf"
    output_image_path: str = "paged_image"
    allowed_extensions: List[str] = None
    max_dimension: int = 1500  # Maximum dimension in pixels before scaling
    dpi: int = 300  # Default DPI for image conversion

    def __post_init__(self):
        if self.allowed_extensions is None:
            self.allowed_extensions = [".pdf"]


class PDFProcessingError(Exception):
    """Base exception for PDF processing errors"""


class FileDownloadError(PDFProcessingError):
    """Raised when file download fails"""


class PDFConversionError(PDFProcessingError):
    """Raised when PDF conversion fails"""


@contextmanager
def managed_temp_file(suffix: str = None):
    """Context manager that yields the path of a new temporary file.

    The file is not deleted on exit; the caller handles cleanup
    (for example with cleanup_temp_file).
    """
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    # Close the handle right away so the path can be reopened without leaking it
    temp_file.close()
    yield temp_file.name


def cleanup_temp_file(file_path: str) -> None:
    """Clean up a temporary file"""
    try:
        if os.path.exists(file_path):
            os.unlink(file_path)
    except OSError as e:
        print_warning(f"Failed to delete temporary file {file_path}: {e}")


def list_pdf_files(session: session.Session, config: Config) -> List[dict]:
    """List all PDF files in the source stage"""
    try:
        # Use LIST command instead of DIRECTORY function
        query = f"""
        LIST {config.input_stage}
        """

        file_list = session.sql(query).collect()

        # Filter for PDF files
        pdf_files = []
        for file_info in file_list:
            full_path = file_info["name"]
            # Extract just the filename from the full path
            file_name = os.path.basename(full_path)

            if any(
                file_name.lower().endswith(ext) for ext in config.allowed_extensions
            ):
                pdf_files.append(
                    {
                        "RELATIVE_PATH": file_name,  # Use just the filename
                        "FULL_STAGE_PATH": full_path,  # Use full path for download
                        "SIZE": file_info["size"] if "size" in file_info else 0,
                    }
                )

        print_info(f"Found {len(pdf_files)} PDF files in the stage")
        return pdf_files
    except Exception as e:
        print_error(f"Failed to list files: {e}")
        raise


def download_file_from_stage(
    session: session.Session, file_path: str, config: Config
) -> str:
    """Download a file from stage using session.file.get"""
    # Create a temporary directory
    temp_dir = tempfile.mkdtemp()
    try:
        # Ensure there are no double slashes in the path
        stage_path = f"{config.input_stage.rstrip('/')}/{file_path.lstrip('/')}"

        # Get the file from stage
        get_result = session.file.get(stage_path, temp_dir)
        if not get_result or get_result[0].status != "DOWNLOADED":
            raise FileDownloadError(f"Failed to download file: {file_path}")

        # Construct the local path where the file was downloaded
        local_path = os.path.join(temp_dir, os.path.basename(file_path))
        if not os.path.exists(local_path):
            raise FileDownloadError(f"Downloaded file not found at: {local_path}")

        return local_path
    except Exception as e:
        print_error(f"Error downloading {file_path}: {e}")
        # Clean up the temporary directory (shutil is imported at module level)
        try:
            shutil.rmtree(temp_dir)
        except Exception as cleanup_error:
            print_warning(f"Failed to clean up temporary directory: {cleanup_error}")
        # Chain the original exception so the root cause stays in the traceback
        raise FileDownloadError(f"Failed to download file: {e}") from e


def upload_file_to_stage(
    session: session.Session, file_path: str, output_path: str, config: Config
) -> str:
    """Upload file to the output stage"""
    try:
        # Get the directory and filename from the output path
        output_dir = os.path.dirname(output_path)
        base_name = os.path.basename(output_path)

        # Create the full stage path with subdirectory
        stage_path = f"{config.output_stage.rstrip('/')}/{output_dir.lstrip('/')}"

        # Read the content of the original file
        with open(file_path, "rb") as f:
            file_content = f.read()

        # Create a new file with the correct name
        temp_dir = tempfile.gettempdir()
        temp_file_path = os.path.join(temp_dir, base_name)

        # Write the content to the new file
        with open(temp_file_path, "wb") as f:
            f.write(file_content)

        # Upload the file using session.file.put with compression disabled
        put_result = session.file.put(
            temp_file_path, stage_path, auto_compress=False, overwrite=True
        )

        # Check upload status
        if not put_result or len(put_result) == 0:
            raise Exception(f"Failed to upload file: {base_name}")

        if put_result[0].status not in ["UPLOADED", "SKIPPED"]:
            raise Exception(f"Upload failed with status: {put_result[0].status}")

        # Clean up the temporary file
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

        return f"Successfully uploaded {base_name} to {stage_path}"
    except Exception as e:
        print_error(f"Error uploading file: {e}")
        raise


def process_pdf_files(config: Config) -> None:
    """Main process to orchestrate the PDF splitting"""
    try:
        session = get_active_session()
        pdf_files = list_pdf_files(session, config)

        for file_info in pdf_files:
            file_path = file_info["RELATIVE_PATH"]
            print_info(f"Processing: {file_path}")

            try:
                # Download the PDF file
                local_pdf_path = download_file_from_stage(session, file_path, config)

                # Get base filename without extension
                base_name = os.path.splitext(os.path.basename(file_path))[0]

                # Extract individual PDF pages
                with open(local_pdf_path, "rb") as file:
                    pdf_reader = PyPDF2.PdfReader(file)
                    num_pages = len(pdf_reader.pages)
                    print_info(f"Converting PDF to {num_pages} pages of PDFs")

                    for i in range(num_pages):
                        page_num = i + 1
                        s3_pdf_output_path = (
                            f"{config.output_pdf_path}/{base_name}_page_{page_num}.pdf"
                        )
                        pdf_writer = PyPDF2.PdfWriter()
                        pdf_writer.add_page(pdf_reader.pages[i])
                        # Write the single-page PDF to a named temporary file;
                        # the with-block closes the handle before the upload
                        with tempfile.NamedTemporaryFile(
                            delete=False, suffix=".pdf"
                        ) as output_file:
                            pdf_writer.write(output_file)
                            local_pdf_tmp_file_name = output_file.name
                        
                        upload_file_to_stage(
                            session, local_pdf_tmp_file_name, s3_pdf_output_path, config
                        )
                        cleanup_temp_file(local_pdf_tmp_file_name)
                            
                # Convert PDF to images                
                with pdfplumber.open(local_pdf_path) as pdf:
                    print_info(f"Converting PDF to {len(pdf.pages)} images")
                    for i, page in enumerate(pdf.pages):
                        page_num = i + 1
                        # Page dimensions are in PDF points (1/72 inch); convert
                        # the longest side to pixels at the configured DPI
                        max_dim_px = (
                            float(max(page.width, page.height)) * config.dpi / 72
                        )

                        # Lower the render resolution if the image would exceed
                        # max_dimension pixels on its longest side
                        resolution = config.dpi
                        if max_dim_px > config.max_dimension:
                            resolution = int(
                                config.dpi * config.max_dimension / max_dim_px
                            )

                        img = page.to_image(resolution=resolution)
                        # Reserve a temporary path, closing the handle before saving
                        with tempfile.NamedTemporaryFile(
                            delete=False, suffix=".png"
                        ) as temp_file:
                            local_image_tmp_file_name = temp_file.name
                        img.save(local_image_tmp_file_name)

                        s3_image_output_path = (
                            f"{config.output_image_path}/{base_name}_page_{page_num}.png"
                        )
                        
                        upload_file_to_stage(
                            session, local_image_tmp_file_name, s3_image_output_path, config
                        )
                        cleanup_temp_file(local_image_tmp_file_name)
                        
                # Clean up the original downloaded file
                cleanup_temp_file(local_pdf_path)

            except Exception as e:
                print_error(f"Error processing {file_path}: {e}")
                continue

    except Exception as e:
        print_error(f"Fatal error in process_pdf_files: {e}")
        raise

In [None]:
config = Config(dpi=300)
process_pdf_files(config)

## 🔍 Document Image Preview

To check everything has been processed as planned, we can look at an image representing a page from the PDFs.

In [None]:
image = sp_session.file.get_stream(
    "@UTILS.AI.STOCK_IKB_DOCUMENTS/paged_image/2025-sargent-price-book_page_16.png",
    decompress=False,
).read()
st.image(image)

In [None]:
use role utils__ai__owner__s_role

## 🧠 Batch Image Embedding with Cortex and Snowpark

This workflow performs batch image embedding using a Python stored procedure:

1. **Identify Unprocessed Images**
   A temporary table (`limit_directory_table`) is created by listing all image files in the stage (`@utils.ai.stock_ikb_documents/paged_image/`) and filtering out those already embedded in the `output_vector_table`.

2. **Assign Row Numbers for Batching**
   Each unprocessed image file is assigned a `row_number()` so batches can be defined by row ranges (`start_rn` to `end_rn`).

3. **Define Embedding Procedure**
   A Python stored procedure `run_image_embedding_batch(start_rn, end_rn)` is created. It:

   * Reads a batch of image files from the temporary table.
   * Extracts file and metadata (e.g. file name, page number).
   * Computes an image embedding using `snowflake.cortex.embed_image_1024` with the `voyage-multimodal-3` model.
   * Saves the embeddings to `output_vector_table`.

4. **Queue Up Batch Jobs**
   The total number of batches is calculated, and a list of SQL `CALL` statements is built, one per batch.

5. **Run Jobs Concurrently**
   A loop manages job execution with up to 5 concurrent asynchronous jobs at a time. Each job is submitted using `.collect_nowait()` and polled until it completes.

6. **Monitor Jobs**
   Each batch is logged upon completion or failure. Failed batches are not retried automatically; the loop continues until the queue is empty and no jobs remain active.

This setup allows high-throughput embedding of images inside Snowflake, using Cortex's multimodal capabilities with minimal manual orchestration.
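
The batching and concurrency logic can be tried out without a Snowflake session. The sketch below is illustrative only: `batch_ranges`, `FakeJob`, and `run_with_limit` are hypothetical names, and `FakeJob` merely stands in for an asynchronous query handle by reporting completion after a fixed number of polls.

```python
from typing import List, Tuple


def batch_ranges(total_rows: int, batch_size: int) -> List[Tuple[int, int]]:
    """Return inclusive (start_rn, end_rn) pairs covering rows 1..total_rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [
        (start, min(start + batch_size - 1, total_rows))
        for start in range(1, total_rows + 1, batch_size)
    ]


class FakeJob:
    """Stand-in for an async job: done after `polls_needed` calls to is_done()."""

    def __init__(self, label: str, polls_needed: int) -> None:
        self.label = label
        self._remaining = polls_needed

    def is_done(self) -> bool:
        self._remaining -= 1
        return self._remaining <= 0


def run_with_limit(
    labels: List[str], max_concurrent: int, polls_needed: int = 2
) -> Tuple[List[str], int]:
    """Run all jobs with bounded concurrency; return (completed labels, peak active)."""
    queue = list(labels)
    active: List[FakeJob] = []
    completed: List[str] = []
    peak = 0
    while queue or active:
        # Launch new jobs while there is spare capacity
        while queue and len(active) < max_concurrent:
            active.append(FakeJob(queue.pop(0), polls_needed))
        peak = max(peak, len(active))
        # Poll a copy of the list so finished jobs can be removed safely
        for job in list(active):
            if job.is_done():
                completed.append(job.label)
                active.remove(job)
    return completed, peak


ranges = batch_ranges(25, 10)
labels = [f"RN {start}-{end}" for start, end in ranges]
done, peak = run_with_limit(labels, max_concurrent=2)
print(ranges, done, peak)
```

A real loop would sleep between polling rounds, as the notebook does, rather than polling in a tight loop.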


In [None]:
create or replace procedure run_image_embedding_batch(start_rn int, end_rn int)
returns string
language python
runtime_version = 3.9
packages = ('snowflake-snowpark-python')
handler = 'embed_handler'
AS
$$
def embed_handler(session, start_rn, end_rn):
    df = session.sql(f'''
        select
            concat('paged_image/', split_part(relative_path, '/', -1)) as file_name,
            regexp_substr(file_name, 'paged_image/(.*)\\.png$', 1, 1, 'e', 1) as paged_file_name,
            split_part(paged_file_name, '_page_', 0) as original_file_name,
            split_part(paged_file_name, '_page_', 2)::int as page_number,
            '@utils.ai.stock_ikb_documents' as stage_prefix,
            to_file(file_url)  as image_file,
            snowflake.cortex.embed_image_1024(
                'voyage-multimodal-3', 
                '@utils.ai.stock_ikb_documents', 
                concat('paged_image/', split_part(relative_path, '/', -1))
            ) as image_vector
        from limit_directory_table
        where rn between {start_rn} and {end_rn}
    ''')
    df.write.save_as_table("OUTPUT_VECTOR_TABLE", mode="append")
    return f"Embedded and saved RN {start_rn} to {end_rn}"
$$;

In [None]:
BATCH_SIZE = 10
MAX_CONCURRENT = 5

# 1. (Re)create the temporary LIMIT_DIRECTORY_TABLE of images not yet embedded
sp_session.sql("""
    create or replace temporary table limit_directory_table as
    select
        *,
        row_number() over (order by relative_path) as rn
    from
        directory(@utils.ai.stock_ikb_documents)
    where
        relative_path like '%paged_image/%'
        and
        relative_path not in (
            select file_name from output_vector_table
        )
""").collect()

# 2. Get total batches (max(rn) is NULL when there is nothing left to embed)
max_rn = sp_session.sql("select max(rn) AS max_rn from limit_directory_table").collect()[0]["MAX_RN"] or 0
total_batches = (max_rn + BATCH_SIZE - 1) // BATCH_SIZE

# 3. Prepare all batch configs
batch_queue = []
for i in range(total_batches):
    start_rn = i * BATCH_SIZE + 1
    end_rn = min((i + 1) * BATCH_SIZE, max_rn)
    label = f"Batch {i+1}: RN {start_rn}-{end_rn}"
    sql = f"call run_image_embedding_batch({start_rn}, {end_rn})"
    batch_queue.append((sql, label))

# 4. Loop with at most MAX_CONCURRENT jobs running at a time
active_jobs = []

while batch_queue or active_jobs:
    # Launch jobs if we have capacity
    while batch_queue and len(active_jobs) < MAX_CONCURRENT:
        sql, label = batch_queue.pop(0)
        print(f"🚀 Submitting async job for {label}")
        try:
            job = sp_session.sql(sql).collect_nowait()
            active_jobs.append((job, label))
        except Exception as e:
            print(f"❌ Failed to submit {label}: {e}")

    # Poll active jobs
    for job, label in active_jobs.copy():
        if job.is_done():
            try:
                result = job.result()
                print(f"✅ {label} completed: {result}")
            except Exception as e:
                print(f"❌ {label} failed: {e}")
            active_jobs.remove((job, label))

    if active_jobs:
        time.sleep(15)

print("🎉 All batches processed.")

## 🔖 Extract Text from PDF Pages

This SQL script creates a table (`pdf_pages`) that extracts and stores parsed text content from individual PDF pages:

1. **Filter Input Files**
   It queries the stage `@utils.ai.stock_ikb_documents` and filters files whose path matches the pattern `%paged_pdf/%`, meaning individual page PDFs from previously split documents.

2. **Extract File Metadata**
   For each PDF file:

   * `file_name` is constructed by prefixing the file's base name (the last segment of the relative path) with `paged_pdf/`.
   * `paged_file_name` extracts just the PDF filename using regex.
   * `original_file_name` removes the `_page_X` suffix to get the base document name.
   * `page_number` is parsed from the filename to track the page.

3. **Generate File References**
   The `to_file(file_url)` function creates a file object for use in Cortex functions.

4. **Parse PDF Content with Cortex**
   The `snowflake.cortex.parse_document` function is called on each page in `LAYOUT` mode to extract its text and layout. The result is cast to a string and parsed as JSON, and its `content` field is stored in the `pdf_text` column.

5. **Output the Resulting Table**
   The final table `pdf_pages` includes:

   * File path and name metadata
   * Page number
   * File reference
   * Structured PDF content parsed by Cortex

This process enables structured, searchable access to individual page-level text from large documents using Cortex's layout-aware parsing engine.
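
The filename conventions above can be checked locally. This is a rough Python equivalent of the metadata extraction, assuming paths follow the `<name>_page_<n>.pdf` (or `.png`) pattern produced by the splitting step; `PageRef` and `parse_paged_path` are hypothetical names, and unlike the SQL `split_part` calls the regex splits on the last `_page_` in the name.

```python
import re
from typing import NamedTuple, Optional


class PageRef(NamedTuple):
    paged_file_name: str      # file name without directory or extension
    original_file_name: str   # base document name without the _page_N suffix
    page_number: int


# Optional directory prefix, then <original>_page_<number>.<pdf|png>
_PAGED = re.compile(
    r"^(?:.*/)?(?P<paged>(?P<orig>[^/]+)_page_(?P<page>\d+))\.(?:pdf|png)$"
)


def parse_paged_path(relative_path: str) -> Optional[PageRef]:
    """Split a staged page path into its metadata, or return None if it does not match."""
    match = _PAGED.match(relative_path)
    if match is None:
        return None
    return PageRef(match["paged"], match["orig"], int(match["page"]))


print(parse_paged_path("paged_pdf/2025-sargent-price-book_page_16.pdf"))
```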


In [None]:
create or replace table pdf_pages as
select
    concat('paged_pdf/', split_part(relative_path, '/', -1)) as file_name,
    regexp_substr(file_name, 'paged_pdf/(.*)\\.pdf$', 1, 1, 'e', 1) as paged_file_name,
    split_part(paged_file_name, '_page_', 0) as original_file_name,
    split_part(paged_file_name, '_page_', 2)::int as page_number,
    '@UTILS.AI.STOCK_IKB_DOCUMENTS' as stage_prefix,
    to_file(file_url) as pdf_file,
    parse_json(
        to_varchar(
            snowflake.cortex.parse_document(
                '@UTILS.AI.STOCK_IKB_DOCUMENTS',
                file_name,
                {'mode': 'LAYOUT'}
            )
        )
    ):content as pdf_text
from
    directory(@utils.ai.stock_ikb_documents)
where
    relative_path like '%paged_pdf/%'
;

## 📖 Enriching PDF Pages with Metadata and Text Chunks for Semantic Search

This SQL pipeline creates a comprehensive table (`utils.ai.pdf_images_joined`) that combines page-level text, image embeddings, structured metadata, and semantically formatted chunks optimized for multimodal document retrieval using Snowflake Cortex.

### ✅ Steps:

1. **🖇️ Join PDF Pages with Image Embeddings**

   * Merges parsed PDF page data from `pdf_pages` with vector embeddings from `output_vector_table` via `paged_file_name`.
---
2. **📄 Select Representative Pages for Metadata**

   * Uses `row_number()` to select:

     * The **first 10 pages** (for coverage of typical document headers).
     * The **last 2 pages** (often contain part indexes or summaries).
---
3. **🧠 Generate Document-Level Metadata**

   * Concatenates the selected pages’ text and feeds it into `ai_complete()` (with `llama4-scout`) to extract:

     * `manufacturer`
     * `product_line`
     * `document_type`
     * `effective_date`, `year_of_publication`, `copyright_year`
     * `category`
     * `concise_document_summary`
---
4. **📝 Generate Page-Level Metadata**

   * Runs `ai_complete()` (with `llama4-scout`) on each page’s text to extract:

     * `page_title`
     * `concise_page_summary`
---
5. **🔗 Join Metadata with Full Page Content**

   * Combines document-level and page-level metadata with:

     * Raw page text
     * Vector embeddings
     * File references
---
6. **✂️ Split Pages into Chunks**

   * Uses `cortex.split_text_recursive_character()` to break page text into markdown-aware blocks of up to ~1800 characters with a 200-character overlap, keeping related content together for chunk-level retrieval.
---
7. **🔍 Enrich Chunks with Visual Context**

   * For each chunk:

     * Runs `ai_complete()` (with `llama4-maverick`) using the **full page image** and **chunk text**.
     * Extracts structured visual context such as:

       * **Page region**
       * **Table sections or headers**
       * **Related elements not captured in the chunk**
     * Encourages bullet-point or key-value output grounded in visual layout.
---
8. **🧱 Build Final Enriched Chunks**

   * Combines:

     * Source file
     * Document and page metadata
     * Chunk visual context
     * Raw chunk text
   * Stores final result in an `enriched_chunk` field optimized for LLM prompts and semantic indexing.
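
Two of the steps above are easy to illustrate outside SQL: choosing the representative pages and assembling the final chunk text. The sketch below is a loose Python rendering under stated assumptions; `representative_pages` and `build_enriched_chunk` are hypothetical helpers, and the chunk layout only approximates the `concat()` in the SQL.

```python
from typing import Iterable, List, Optional


def representative_pages(
    page_numbers: Iterable[int], first_n: int = 10, last_n: int = 2
) -> List[int]:
    """Keep the first `first_n` and last `last_n` pages, without duplicates."""
    ordered = sorted(set(page_numbers))
    # ordered[-0:] would return the whole list, so handle last_n == 0 explicitly
    tail = ordered[-last_n:] if last_n > 0 else []
    return sorted(set(ordered[:first_n]) | set(tail))


def _or_na(value: Optional[str]) -> str:
    """Mimic SQL coalesce(value, 'N/A') for missing metadata."""
    return "N/A" if value is None else value


def build_enriched_chunk(
    original_file_name: str,
    page_number: int,
    chunk_text: str,
    chunk_context: str,
    document_metadata: Optional[str] = None,
    page_metadata: Optional[str] = None,
) -> str:
    """Assemble source, metadata, visual context, and raw text into one block."""
    divider = "----------------"
    return (
        f"**Source File:** {original_file_name}\n"
        f"**Document Metadata:** {_or_na(document_metadata)}\n\n"
        f"{divider}\n\n"
        f"**Page Metadata:** {_or_na(page_metadata)}\n"
        f"**Page Number:** {page_number}\n\n"
        f"{divider}\n\n"
        f"**Chunk Context:** {chunk_context}\n\n"
        f"{divider}\n\n"
        f"**Chunk:**\n\n{chunk_text}"
    )


print(representative_pages(range(1, 31)))
print(build_enriched_chunk("2025-sargent-price-book", 16, "PART No. | QTY", "Position: top"))
```

Short documents simply yield all of their pages, since the first-10 and last-2 selections overlap.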

In [None]:
create or replace table utils.ai.pdf_images_joined as
with pdf_images_joined as (
    select
        pdfs.file_name as pdf_file_name,
        images.file_name as image_file_name,
        pdfs.original_file_name,
        pdfs.page_number,
        pdfs.pdf_file,
        images.image_file,
        images.image_vector,
        pdfs.pdf_text
    from
        pdf_pages as pdfs
    join
        output_vector_table as images
        on
        pdfs.paged_file_name = images.paged_file_name
),
first_10_pages as (
  select
    original_file_name,
    page_number,
    pdf_text,
    row_number() over (partition by original_file_name order by page_number) as row_num_start,
    row_number() over (partition by original_file_name order by page_number desc) as row_num_end
  from
    pdf_images_joined
),
limited_pages as (
  select
    original_file_name,
    page_number,
    pdf_text
  from 
    first_10_pages
  where 
    row_num_start <= 10
    or
    row_num_end <= 2
),
document_text as (
  select
    original_file_name,
    listagg(pdf_text, '\n\n') within group (order by page_number) as full_text
  from 
    limited_pages
  group by 
    original_file_name
),
get_document_summary as (
    select
      original_file_name,
      full_text,
      ai_complete(
        model => 'llama4-scout',
        prompt => concat(
          'You are a document summarization agent processing technical manuals and sales documents ',
          'from Seclock, a wholesale distributor of electrical and mechanical door hardware. ',
          'I am going to provide a document which will be indexed by a retrieval system containing ',
          'many similar documents. I want you to provide key information associated with this document ',
          'that can help differentiate this document in the index. Follow these instructions:\n',
          '    1. Do not dwell on low level details. Only provide key high level information that a ',
          'human might be expected to provide when searching for this doc.\n\n',
          '    2. Do not use any formatting, just provide keys and values using a colon to separate key ',
          'and value. Have each key and value be on a new line.\n\n',
          '    3. Only extract at most the following information. If you are not confident with pulling ',
          'any one of these keys, then do not include that key:\n',
            array_to_string(
                array_construct(
                    'manufacturer',
                    'product_line',
                    'document_type',
                    'effective_date',
                    'year_of_publication',
                    'copyright_year',
                    'category',
                    'concise_document_summary'
                ),
                '\t\t* '
            ),
          '\n',
          '    4. Return *nothing* but the key:value pairs.\n\n',
          'Doc starts here:\n', full_text, '\nDoc ends here\n\n'
        ),
        model_parameters => {
          'temperature': 0.2
        }
      )::string as document_metadata
    from 
        document_text
),
describe_pages as (
    select
        pdf_file_name,
        image_file_name,
        original_file_name,
        page_number,
        pdf_file,
        image_file,
        image_vector,
        pdf_text,
        ai_complete(
          model => 'llama4-scout',
          prompt => concat(
            'You are a metadata extraction agent working with individual pages from technical manuals and sales documents ',
            'from Seclock, a wholesale distributor of electrical and mechanical door hardware.\n\n',
            'I am going to provide the full text of one page. I want you to extract high-level, distinguishing metadata from this page ',
            'that could help index it effectively within a larger document retrieval system.\n\n',
            'Follow these rules:\n',
            '   1. Do not dwell on low-level or repetitive details.\n',
            '   2. Only provide the following keys as colon-separated key-value pairs, one per line:\n',
              array_to_string(
                array_construct(
                  'page_title',
                  'concise_page_summary'
                ),
                '\t\t* '
              ), '\n',
            '   3. If you are not confident about a key, omit it entirely.\n',
            '   4. Return *nothing* but the key:value pairs.\n\n',
            'Doc starts here:\n', pdf_text, '\nDoc ends here\n\n'
          ),
          model_parameters => {
            'temperature': 0.1,
            'max_tokens': 1024
          }
        )::string as page_metadata
    from
        pdf_images_joined
),
pages_with_metadata as (
  select
    page.pdf_file_name,
    page.image_file_name,
    page.original_file_name,
    page.page_number,
    page.pdf_file,
    page.image_file,
    page.image_vector,
    page.pdf_text,
    page.page_metadata,
    doc.document_metadata
  from
    describe_pages page
  join
    get_document_summary doc
    on 
        page.original_file_name = doc.original_file_name
),
split_pages_into_chunks as (
    select
        pdf_file_name,
        image_file_name,
        original_file_name,
        page_number,
        image_vector,
        pdf_text,
        document_metadata,
        page_metadata,
        ai_complete(
            model => 'llama4-maverick',
            predicate => concat(
                'You are a metadata tagging agent working with scanned document **images** from Seclock, ',
                'a wholesale distributor of door hardware.\n\n',
                
                'You will be shown:\n',
                '- A **full image** of one page from a technical manual\n',
                '- A **chunk of extracted text** from that page\n\n',
                
                'Your job is to briefly describe what this chunk represents **in context** of the full page image.\n\n',
                
                'If the page is a table, focus on answering these precisely:\n',
                '1. Position: where the chunk appears in the page\n',
                '2. Section Names: section breaks in the table (if they exist)\n',
                '3. Column Headers: e.g. PART No., QTY\n',

                'If the page is not a table, focus on answering these precisely:\n',
                '1. Position: where the chunk appears in the page\n',
                '2. Relevant Details: precise and brief list of relevant item from the page without which ',
                'the chunk cannot be understood (if any)\n',
                
                '* Be brief. Use bullet points or key:value format.\n',
                '* Do not repeat the chunk.\n',
                '* Use the page to extract details relevant to the chunk, not just the chunk itself.\n',
                '* Do not speculate beyond the chunk or image.\n\n',
                
                '---\n\n',
                'Chunk Text:\n', value::string, '\n\n'
            ),
            file => image_file,
          model_parameters => {
            'temperature': 0.1,
            'max_tokens': 1024
          }
        )::string as chunk_context,
        concat(
            '**Source File:** ', original_file_name, '\n',
            '**Document Metadata:** ', coalesce(document_metadata, 'N/A'), '\n\n',
            '----------------\n\n',
            '**Page Metadata:** ', coalesce(page_metadata, 'N/A'), '\n',
            '**Page Number:** ', page_number::string, '\n\n',
            '----------------\n\n',
            '**Chunk Context:**', chunk_context,
             '\n\n----------------\n\n',
            '**Chunk:**\n\n', value::string
        ) as enriched_chunk
    from
        pages_with_metadata,
    lateral flatten(
        input=>snowflake.cortex.split_text_recursive_character(
            pdf_text,
            'markdown',
            1800,
            200
        )
    )
)
select
    pdf_file_name,
    image_file_name,
    original_file_name,
    page_number,
    image_vector,
    pdf_text,
    enriched_chunk
from
    split_pages_into_chunks
;

## 🕵️‍♀️ Build Cortex Search Service

We're using the [User-Provided Vector Embeddings in Cortex Search](https://docs.snowflake.com/LIMITEDACCESS/cortex-search/user-provided-vectors) private preview, which lets us supply precomputed vector embeddings for Cortex Search to index and query. This is how the page image embeddings become part of the search service.

In [None]:
create or replace cortex search service docs_search_service
    text indexes (enriched_chunk)
    vector indexes (image_vector)
    warehouse = compute_wh
    target_lag = '1 day'
    as 
    select 
        pdf_file_name,
        image_file_name,
        original_file_name,
        page_number,
        image_vector,
        pdf_text,
        enriched_chunk
    from 
        utils.ai.pdf_images_joined
;

## 🤖 Multimodal Document Question Answering with Image-Aided Semantic Search

This system answers precise technical questions about door hardware products using both **text** and **image-based** reasoning across PDFs. It completes a three-step multimodal process:

1. **Text-Based Answering** using enriched document chunks.
2. **Image-Based Answer Validation** via OCR page images.
3. **Answer Synthesis** to revise and finalize the response.

This is done using the following steps:

### 1. **Convert User Question into Image Embedding**

* `get_text_embedding_via_image()`:

  * Renders the user's question as a temporary PNG image.
  * Uploads it to a Snowflake stage (`@utils.ai.stock_ikb_documents/queries/`).
  * Uses `snowflake.cortex.embed_image_1024()` to generate a **multimodal embedding** via `voyage-multimodal-3`.
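
The rendered query image is named after an MD5 hash of the normalized question, so a repeated question maps to the same staged file and the upload can be skipped. The sketch below isolates that naming step; `query_image_filename` is a hypothetical helper name used only for illustration, and the normalization (trim plus lowercase) follows `create_temp_image_from_text()`.

```python
import hashlib


def query_image_filename(question: str) -> str:
    """Return a deterministic PNG file name for a question (hypothetical helper)."""
    # Normalize first so casing and surrounding whitespace do not change the hash
    normalized = question.strip().lower()
    digest = hashlib.md5(normalized.encode("utf-8")).hexdigest()
    return f"{digest}.png"


a = query_image_filename("  What is the part number for an L9460 armor front? ")
b = query_image_filename("what is the part number for an l9460 armor front?")
print(a == b)  # True
```

MD5 serves here only as a cache key, not as a security measure.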

---

### 2. **Perform Semantic Search with Text + Embedding**

* `query_search_service()`:

  * Submits both the raw question and the image embedding to a **Cortex search service**.
  * Returns the top 50 `ENRICHED_CHUNK`s (text + metadata + image references) relevant to the question.

---

### 3. **Rephrase Question for Search Compatibility**

* `rephrase_for_search()`:

  * Normalizes the user query by trimming and lowercasing it before it is sent to the search service.
  * Helps align question formatting with indexed content.

---

### 4. **Build Context and Generate Text-Based Answer**

* `ai_complete_on_text()`:

  * Constructs a markdown-rich prompt from the top retrieved chunks.
  * Includes clickable links to the original PDF pages via `presigned_url`.
  * Uses `claude-3-7-sonnet` to answer the question directly and include:

    * A clear **direct answer**,
    * A **confidence score** (0–1),
    * A short **justification**, and
    * Properly formatted **CITED SOURCES**:
      [`Document Name - page X`](presigned_url)
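
Because the response follows a fixed set of labelled fields, it can be split back into those fields for downstream checks. The following is a minimal sketch, assuming the model emits each label at the start of a line exactly as requested; `parse_structured_answer` is a hypothetical helper, and the sample text and URL are made up.

```python
import re

FIELDS = ("DIRECT ANSWER", "CONFIDENCE", "JUSTIFICATION", "CITED SOURCES")

# One pattern that matches any field label at the start of a line
_HEADER = re.compile(
    r"^(" + "|".join(re.escape(name) for name in FIELDS) + r"):[ \t]*",
    re.IGNORECASE | re.MULTILINE,
)


def parse_structured_answer(text: str) -> dict:
    """Split a labelled answer into {LABEL: value} (hypothetical helper)."""
    headers = list(_HEADER.finditer(text))
    parsed = {}
    for i, header in enumerate(headers):
        # A field's value runs until the next label, or to the end of the text
        end = headers[i + 1].start() if i + 1 < len(headers) else len(text)
        parsed[header.group(1).upper()] = text[header.end():end].strip()
    return parsed


sample = """DIRECT ANSWER: 09-665
CONFIDENCE: 0.9
JUSTIFICATION: Listed in the parts table.
CITED SOURCES:
[`Example Catalog - page 12`](https://example.invalid/p12)"""

fields = parse_structured_answer(sample)
print(fields["CONFIDENCE"])
```

If the model decorates the labels (for example with bold markers), the pattern would need to be loosened accordingly.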

---

### 5. **Deduplicate and Filter Pages for Image Critique**

* Duplicates are removed based on `(PDF_FILE_NAME, IMAGE_FILE_NAME)` pairs.
* `extract_cited_docs_and_pages()` ensures only **relevant document pages** are processed.
* `extract_page_number()` helps match images to cited pages.

---

### 6. **Run Image-Based Validations (Async)**

* `ai_complete_on_image_async()`:

  * Submits each matched image to `claude-3-7-sonnet` via `ai_complete()` using:

    * Document metadata
    * Page number
    * The original text answer for critique
  * Prompt instructs the model to confirm or revise the answer, ensuring it's grounded in the visual page content.

* `resolve_async_job()`:

  * Waits for each job to finish and returns fields such as:

    * `RESULT`, `ORIGINAL_FILE_NAME`, `IMAGE_FILE_NAME`, `PRESIGNED_URL` (the app adds `PAGE_NUMBER` afterwards via `extract_page_number()`).

---

### 7. **Filter Image-Based Answers by Confidence**

* `filter_by_confidence()`:

  * Retains only image completions with `CONFIDENCE >= 0.5`.
  * Helps ensure only high-quality critiques contribute to the final answer.
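
The filtering rule can be sketched as follows. A response without a `CONFIDENCE:` line scores 0.0 and is dropped, so the image prompt has to request that field for any critique to survive. The names `confidence_of` and `keep_confident` and the sample responses are illustrative assumptions, not part of the notebook.

```python
import re

# Plain decimal only, so trailing punctuation such as "0.85." is not captured
CONFIDENCE_RE = re.compile(r"CONFIDENCE:\s*([0-9]*\.?[0-9]+)", re.IGNORECASE)


def confidence_of(result_text: str) -> float:
    """Return the reported confidence, or 0.0 when none is present."""
    match = CONFIDENCE_RE.search(result_text)
    return float(match.group(1)) if match else 0.0


def keep_confident(responses, threshold=0.5):
    """Keep only responses whose reported confidence meets the threshold."""
    return [r for r in responses if confidence_of(r["RESULT"]) >= threshold]


responses = [
    {"RESULT": "CRITIQUE_RESULT: Correct\nCONFIDENCE: 0.85."},
    {"RESULT": "CRITIQUE_RESULT: Not visible on this page\nCONFIDENCE: 0.2"},
    {"RESULT": "CRITIQUE_RESULT: Correct"},  # no score, treated as 0.0
]
kept = keep_confident(responses)
print(len(kept))  # 1
```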

---

### 8. **Synthesize the Final Answer for the User**

* `synthesise_all_answers()`:

  * Merges text and image critiques into a unified prompt.
  * Final LLM prompt includes:

    * Original text result
    * All image critiques (linked to page images)
  * The LLM is instructed to:

    * Revise or reaffirm the answer
    * Rephrase for user clarity (customer-facing tone)
    * Exclude technical fields like `CONFIDENCE` or `JUSTIFICATION`
    * Append a **"Cited Sources"** section with markdown hyperlinks

---

In [None]:
def query_search_service(session, my_service, query_text):
    query_embedding = get_text_embedding_via_image(session, query_text)
    resp = my_service.search(
        query = query_text,
        experimental = {
            "QueryEmbedding": query_embedding
        },
        columns=[
            "ENRICHED_CHUNK",
            "PDF_FILE_NAME",
            "IMAGE_FILE_NAME",
            "ORIGINAL_FILE_NAME",
            "PAGE_NUMBER"
        ],
        limit=50
    )
    return resp.to_json()


def create_temp_image_from_text(text: str) -> tuple[str, str]:
    query_hash = hashlib.md5(text.strip().lower().encode()).hexdigest()
    image_filename = f"{query_hash}.png"

    temp_file = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
    file_path = temp_file.name
    temp_file.close()

    image = Image.new("RGB", (1000, 200), "white")
    draw = ImageDraw.Draw(image)
    font = ImageFont.load_default()
    draw.text((10, 10), text, fill="black", font=font)
    image.save(file_path)


    return file_path, image_filename


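def extract_cited_docs_and_pages(text_answer_str):
    """Map each cited document name (lowercased) to the set of cited page numbers."""
    cited = {}
    # DOTALL lets the capture span every line after "CITED SOURCES:", not only the first
    matches = re.findall(r"CITED SOURCES:\s*(.+)", text_answer_str, re.IGNORECASE | re.DOTALL)
    for match in matches:
        doc_page_pairs = re.findall(r"([a-zA-Z0-9._ -]+?)\s*-\s*page\s*(\d+)", match, re.IGNORECASE)
        for doc, page in doc_page_pairs:
            doc = doc.strip().lower()
            page = page.strip()
            cited.setdefault(doc, set()).add(page)
    return cited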

def extract_page_number(image_file_name: str) -> str:
    match = re.search(r'_page_(\d+)\.png$', image_file_name)
    return match.group(1) if match else "N/A"


def file_exists_in_stage(session, stage_name: str, file_path: str) -> bool:
    result = session.sql(f"list @{stage_name}/{file_path}").collect()
    return bool(result)


def fuzzy_match(a, b, threshold=0.6):
    return SequenceMatcher(None, a.lower(), b.lower()).ratio() >= threshold


def upload_file_to_stage(session, local_path: str, stage_name: str, dest_file_name: str):
    temp_dir = tempfile.gettempdir()
    temp_named_path = os.path.join(temp_dir, dest_file_name)

    os.makedirs(os.path.dirname(temp_named_path), exist_ok=True)
    shutil.copyfile(local_path, temp_named_path)

    try:
        result = session.file.put(
            temp_named_path,
            f"@{stage_name}/queries",
            overwrite=True,
            auto_compress=False
        )
        
    finally:
        os.remove(temp_named_path)


def get_text_embedding_via_image(
    session, 
    text: str, 
    stage_name="@utils.ai.stock_ikb_documents"
):
    temp_path, image_filename = create_temp_image_from_text(text)
    stage_subpath = f"queries/{image_filename}"

    try:
        if not file_exists_in_stage(session, stage_name.lstrip("@"), stage_subpath):
            upload_file_to_stage(session, temp_path, stage_name.lstrip("@"), stage_subpath)
            
        query = f"""
            select 
                snowflake.cortex.embed_image_1024(
                    'voyage-multimodal-3', 
                    '{stage_name}',
                    '{stage_subpath.lstrip('/')}'
                )
        """
        embedding = session.sql(query).collect()[0][0]
    finally:
        os.remove(temp_path)

    return embedding


def resolve_async_job(job):
    try:
        row = job.result()[0].asDict()
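        return {
            "RESULT": row["RESULT"],
            "ORIGINAL_FILE_NAME": row["ORIGINAL_FILE_NAME"],
            "IMAGE_FILE_NAME": row["IMAGE_FILE_NAME"],
            # Pass the metadata columns through so the synthesis prompt can display them
            "DOCUMENT_METADATA": row.get("DOCUMENT_METADATA", "N/A"),
            "PAGE_METADATA": row.get("PAGE_METADATA", "N/A"),
            "PRESIGNED_URL": row.get("PRESIGNED_URL", "#")
        }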
    except Exception as e:
        return {
            "RESULT": f"Error: {e}",
            "ORIGINAL_FILE_NAME": None,
            "IMAGE_FILE_NAME": None,
            "PRESIGNED_URL": "#"
        }


def rephrase_for_search(question):
    return question.strip().lower()


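def filter_by_confidence(responses, threshold=0.5):
    filtered = []
    for item in responses:
        # Match a plain decimal so trailing punctuation (e.g. "0.8.") cannot break float()
        match = re.search(r"CONFIDENCE:\s*([0-9]*\.?[0-9]+)", item["RESULT"], re.IGNORECASE)
        # Responses without a CONFIDENCE field score 0.0 and are dropped
        score = float(match.group(1)) if match else 0.0
        if score >= threshold:
            filtered.append(item)
    return filtered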


def sql_escape(value):
    return str(value).replace("'", "''") if value is not None else ""


def run_model(model_name, llm_prompt, session, temperature, max_tokens, top_p, guardrails, stream):
    return complete(
        model=model_name,
        prompt=[{"role": "user", "content": llm_prompt}],
        session=session,
        options=CompleteOptions(
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            guardrails=guardrails
        ),
        stream=stream
    )


def ai_complete_on_text(session, question, retrieved_chunks):
    seen = set()
    enriched_context_blocks = []

    for chunk in retrieved_chunks:
        enriched_chunk = chunk["ENRICHED_CHUNK"]
        original_file = chunk.get("ORIGINAL_FILE_NAME")
        image_file = chunk.get("IMAGE_FILE_NAME")

        if not image_file or not original_file:
            continue

        key = (original_file, image_file, enriched_chunk)
        if key in seen:
            continue
        seen.add(key)

        # Generate presigned URL
        presigned_url = session.sql(
            f"SELECT GET_PRESIGNED_URL(@utils.ai.stock_ikb_documents, '{sql_escape(image_file)}')"
        ).collect()[0][0]

        # Format for the model
        block = dedent(f"""
        ---
        📄 **Source**: [{original_file}]({presigned_url})
        📜 **Extracted Content**:
        {enriched_chunk}
        """).strip()

        enriched_context_blocks.append(block)

    if not enriched_context_blocks:
        # Keep the same keys as the normal return value so callers can read "prompt"
        return {"result": "No usable context.", "metadata": {}, "prompt": ""}

    full_context = "\n\n".join(enriched_context_blocks)

    prompt = dedent(f"""
    You are a document reasoning assistant helping users answer precise technical 
    questions about door hardware products, based on structured document 
    text (e.g., catalogs, price lists, manuals).
    
    ---
    
    ## Your Task:
    Answer the user's question using only the provided context. 
    Each context block includes extracted text from one document page.
    
    ---
    ## Precision Requirements:
    - Only use rows or facts that match **all explicit constraints** 
        in the question (e.g., function, design, series, finish, cylinder type, etc.).
    - Do not include unrelated variants 
        (e.g., do not include "passage" if the question is about "cylinders").
    - If a match is ambiguous or unsupported, say so rather than guessing.
    - Avoid over-including part numbers or configurations that don't clearly satisfy the question.
    - If conflicting information exists, use the context to determine which source is more up-to-date.
    - Cited sources MUST include hyperlinks to the original document using the presigned_url
    
    ---
    ## Response Format:
    - Start with a **DIRECT ANSWER** — a clear response to the question (price, part number, YES/NO, etc.).
    - Then include:
      - `CONFIDENCE:` (a float from 0.0 to 1.0)
      - `JUSTIFICATION:` why this answer is supported by the data
      - `CITED SOURCES:` in the form [`Document Name - page X`](presigned_url)
    
    ---
    
    ## Question:
    {question.strip()}
    
    ## Context:
    {full_context}
    
    ---
    
    ## Output Format:
    DIRECT ANSWER:  
    CONFIDENCE:  
    JUSTIFICATION:  
    CITED SOURCES:
    """)


    result = complete(
        model="claude-3-7-sonnet",
        prompt=[{"role": "user", "content": prompt}],
        session=session,
        options=CompleteOptions(
            temperature=0,
            max_tokens=2048,
            top_p=1,
            guardrails=False
        ),
        stream=False
    )

    return {
        "result": "".join(result),
        "metadata": {
            "source": "TEXT",
            "num_chunks": len(retrieved_chunks)
        },
        "prompt": prompt
    }


def ai_complete_on_image_async(session, question, item, text_answer):
    image_file_name = item["IMAGE_FILE_NAME"]
    original_file_name = item.get("ORIGINAL_FILE_NAME", "")
    page_number = item.get("PAGE_NUMBER", "")

    # Escape for SQL
    image_file_escaped = sql_escape(image_file_name)
    original_file_escaped = sql_escape(original_file_name)
    document_metadata_escaped = sql_escape(original_file_name)
    page_metadata_escaped = sql_escape(str(page_number))

    prompt = dedent(f"""
    You are validating a textual answer using the actual document page image.

    ---
    **User Question**: {question.strip()}

    ---
    **Answer to Critique**:
    {text_answer["result"]}

    ---
    **Image Source**: Document: `{original_file_name}`, Page: {page_number}

    Your job is to check if the provided answer is correct and properly grounded in the image.
    - Identify any factual errors, missing context, or overclaims.
    - Be strict: only endorse what is clearly present in the image.
    - If the image is not useful for critiquing the answer then say so.
    - If the response is incorrect, provide the correct answer.
    
    Output format:
    CRITIQUE_RESULT:  
    CONFIDENCE: (a float from 0.0 to 1.0)
    JUSTIFICATION:  
    CITED SOURCES:
    """)

    # Escape backslashes first, then single quotes, for the SQL string literal
    prompt_escaped = prompt.replace("\\", "\\\\").replace("'", "\\'")

    df = session.sql(f"""
        select 
            '{original_file_escaped}' as original_file_name,
            '{image_file_escaped}' as image_file_name,
            '{document_metadata_escaped}' as document_metadata,
            '{page_metadata_escaped}' as page_metadata,
            get_presigned_url('@utils.ai.stock_ikb_documents', '{image_file_escaped}') as presigned_url,
            ai_complete(
                'claude-3-7-sonnet',
                '{prompt_escaped}',
                to_file('@utils.ai.stock_ikb_documents', '{image_file_escaped}'),
                object_construct('temperature', 0.2, 'top_p', 1.0, 'max_tokens', 2048, 'guardrails', FALSE)
            ) as result
    """)
    return df.collect_nowait()


def synthesise_all_answers(session, question, text_answer_dict, image_answer_dicts):
    text_result = text_answer_dict["result"]
    text_meta = text_answer_dict.get("metadata", {})

    image_sections = []
    for img in image_answer_dicts:
        presigned_link = img.get("PRESIGNED_URL", "#")
        section = dedent(f"""
        --- 
        📄 **Source**: [{img["ORIGINAL_FILE_NAME"]}]({presigned_link})
        🖼️ Image File: `{img["IMAGE_FILE_NAME"]}`

        📘 Page Metadata:
        {img.get("PAGE_METADATA", "N/A")}

        📚 Document Metadata:
        {img.get("DOCUMENT_METADATA", "N/A")}

        📌 Critique of Text Answer:
        {img["RESULT"]}
        """)
        image_sections.append(section.strip())

    image_critique_block = "\n\n".join(image_sections)

    prompt = dedent(f"""\
    You are refining a text-based answer using critiques from document image analysis.

    ## User Question:
    {question}

    ## Original Text Answer:
    {text_result}

    ## Image-Based Critiques:
    {image_critique_block}

    ---
    ## Final Synthesized Answer:
    - Revise the original text answer if needed.
    - Improve factual accuracy using the critiques.
    - Remove anything unsupported.
    - Clearly cite updated sources.
    - Final answer should be **concise, accurate, and directly responsive** to the user's question.
    - Rephrase the technical answer so it's clear and understandable to the user.
    - Do **not** include system fields like CONFIDENCE or JUSTIFICATION.
    - Instead, write the answer as if you were explaining it to a knowledgeable customer.
    - At the end, include a **Cited Sources** section with **Markdown hyperlinks** using the provided list.

    ## Final Output:
    - ANSWER:
    - CITED SOURCES: 
        - [<file_name> - page <number>](url)
    """)

    result = complete(
        model="claude-3-7-sonnet",
        prompt=prompt,
        session=session,
        options=CompleteOptions(
            temperature=0.2, max_tokens=2048, top_p=1.0, guardrails=False
        ),
        stream=False
    )

    return "".join(result)


## 📘 Streamlit App: Seclock Multimodal Q&A Workflow

This app provides an interactive interface for querying technical door hardware documents using a **multimodal pipeline**—combining semantic search, text comprehension, image-based validation, and answer synthesis.

### 🔢 **Workflow Overview**

The pipeline consists of **7 sequential steps**, shown with real-time status updates in the UI:

---

### 1. **Document Search (Cortex Semantic Search)**

* The user enters a question via `st.chat_input()`.
* The question is normalized using `rephrase_for_search()`.
* `query_search_service()` performs an **embedding + keyword** search using the Cortex Search Service.
* Returns up to 50 enriched document chunks with metadata, image filenames, and associated PDF references.
* If no chunks are found, the assistant replies with a fallback message.

---

### 2. **Answer Generation from Text Chunks**

* `ai_complete_on_text()` receives the retrieved chunks and builds a prompt with:

  * **Clickable presigned URLs**
  * Clean markdown formatting
  * Strong precision constraints to avoid hallucinations
* The model (`claude-3-7-sonnet`) returns:

  * A **direct answer**
  * A **confidence score**
  * A **justification**
  * **CITED SOURCES** with Markdown links

---

### 3. **Image Deduplication**

* To reduce cost and noise, `(PDF_FILE_NAME, IMAGE_FILE_NAME)` pairs are deduplicated.
* This ensures each page is only submitted once for image-based validation.

---

### 4. **Extract Cited Sources & Pages**

* The model's output from Step 2 is parsed using `extract_cited_docs_and_pages()` to identify only the **relevant documents and pages** to check against image data.
* Uses regex to extract all `Document - page X` citations for focused validation.
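
The citation parsing can be sketched on a made-up answer as shown below. The function name `extract_citations`, the sample documents, and the URLs are illustrative assumptions. The `re.DOTALL` flag is used here so that citations spread over several lines are all captured; without it, `.+` stops at the first line break.

```python
import re


def extract_citations(answer: str) -> dict:
    """Map lowercased document names to the set of cited page numbers."""
    cited = {}
    section = re.search(r"CITED SOURCES:\s*(.+)", answer, re.IGNORECASE | re.DOTALL)
    if not section:
        return cited
    pairs = re.findall(
        r"([A-Za-z0-9._ -]+?)\s*-\s*page\s*(\d+)", section.group(1), re.IGNORECASE
    )
    for doc, page in pairs:
        cited.setdefault(doc.strip().lower(), set()).add(page)
    return cited


answer = """DIRECT ANSWER: 09-665
CITED SOURCES:
- [`Example Catalog - page 12`](https://example.invalid/a)
- [`Example Price Book - page 3`](https://example.invalid/b)"""

citations = extract_citations(answer)
print(sorted(citations))  # ['example catalog', 'example price book']
```

Page numbers are kept as strings so they compare directly with the value returned by `extract_page_number()`.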

---

### 5. **Filter Images to Cited Pages**

* The deduplicated images are **filtered** to those matching the cited documents and pages.
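* Matching checks whether the cited document name is a substring of the file's base name (lowercased, extension removed) and whether the page number extracted from the image file name is among the cited pages.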
* Each matched page is then submitted to Cortex with `ai_complete_on_image_async()` for image critique.
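
A minimal sketch of this matching step, mirroring the app code: the cited document name is compared by substring against the file's base name, and the page number is read from the image file name. `page_of` and `is_cited` are hypothetical helper names, the file names are made up, and the `_page_<n>.png` suffix is assumed from `extract_page_number()`.

```python
import os
import re


def page_of(image_file_name: str) -> str:
    """Read the page number from a name ending in '_page_<n>.png'."""
    match = re.search(r"_page_(\d+)\.png$", image_file_name)
    return match.group(1) if match else "N/A"


def is_cited(item: dict, cited: dict) -> bool:
    """True when the item's document and page both appear in the citations."""
    base_name = os.path.splitext(item.get("ORIGINAL_FILE_NAME", ""))[0].lower()
    page = page_of(item.get("IMAGE_FILE_NAME", ""))
    return any(doc in base_name and page in pages for doc, pages in cited.items())


cited = {"example catalog": {"12"}}
items = [
    {"ORIGINAL_FILE_NAME": "Example Catalog.pdf", "IMAGE_FILE_NAME": "example_catalog_page_12.png"},
    {"ORIGINAL_FILE_NAME": "Example Catalog.pdf", "IMAGE_FILE_NAME": "example_catalog_page_13.png"},
    {"ORIGINAL_FILE_NAME": "Other Book.pdf", "IMAGE_FILE_NAME": "other_book_page_12.png"},
]
matched = [i["IMAGE_FILE_NAME"] for i in items if is_cited(i, cited)]
print(matched)  # ['example_catalog_page_12.png']
```

A substring test is strict: a citation that includes the `.pdf` extension or abbreviates the title will not match, which is one reason a similarity threshold could be considered instead.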

---

### 6. **Run Image-Based Validation (Async)**

* Cortex jobs are resolved using `resolve_async_job()` and visual progress is displayed.
* Errors are handled gracefully, and failed results are tagged with placeholders.
* The results include `RESULT`, `PRESIGNED_URL`, and other metadata fields.
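
The submit-everything-then-resolve pattern can be illustrated without Snowflake. In the sketch below, `concurrent.futures` stands in for Snowpark's `collect_nowait()` jobs, and `critique_page` is a hypothetical placeholder for the Cortex image call; only the control flow is meant to carry over.

```python
from concurrent.futures import ThreadPoolExecutor


def critique_page(page: int) -> dict:
    """Hypothetical stand-in for one image critique call."""
    if page == 13:
        raise RuntimeError("image not found")
    return {"RESULT": f"CRITIQUE_RESULT: page {page} supports the answer",
            "PAGE_NUMBER": str(page)}


def resolve(job, page: int) -> dict:
    """Wait for one job; turn a failure into a placeholder result."""
    try:
        return job.result()
    except Exception as exc:
        return {"RESULT": f"Error: {exc}", "PAGE_NUMBER": str(page)}


pages = [12, 13, 14]
with ThreadPoolExecutor(max_workers=3) as pool:
    # Submit every job first so they run concurrently
    jobs = [(page, pool.submit(critique_page, page)) for page in pages]
    # Then resolve them in submission order
    answers = [resolve(job, page) for page, job in jobs]

print(answers[1]["RESULT"])  # Error: image not found
```

Because all jobs are submitted before any is awaited, total latency is closer to the slowest single critique than to the sum of all of them.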

---

### 7. **Final Answer Synthesis**

* Image responses that pass `filter_by_confidence()` are passed to `synthesise_all_answers()` along with the original text answer.
* The synthesis prompt:

  * Merges, reconciles, and improves factual accuracy.
  * Rephrases the result clearly for human readers.
  * **Cites each document+page with a working hyperlink**.
* The result is then displayed to the user in the chat.

---

In [None]:
user_question = st.chat_input("Ask a question about door hardware documents...")

root = Root(sp_session)
search_service = (root
  .databases["UTILS"]
  .schemas["AI"]
  .cortex_search_services["DOCS_SEARCH_SERVICE"]
)

if user_question:
    overall_start = time.time()
    st.chat_message("user").write(user_question)

    steps = [
        "Search documents",
        "Generate text answer",
        "Deduplicate image chunks",
        "Extract cited pages",
        "Filter images to cited pages",
        "Run image-based critique",
        "Synthesize final answer"
    ]

    with st.spinner("Thinking..."):
        # Init UI placeholders
        status_step = st.empty()
        status_stage = st.empty()
        status_detail = st.empty()
        status_progress = st.empty()
        status_total_time = st.empty()

        # --- Step 1: Search documents ---
        step = 0
        start = time.time()
        status_step.markdown(f"### 🔍 Step {step + 1} of {len(steps)}: {steps[step]}")
        status_stage.markdown("Searching document chunks using Cortex search...")
        search_query = rephrase_for_search(user_question)
        raw_results = query_search_service(sp_session, search_service, search_query)
        results = json.loads(raw_results)["results"]
        retrieved_chunks = results
        status_detail.markdown(f"Found `{len(retrieved_chunks)}` document chunks in `{time.time() - start:.2f}` seconds.")

        if not retrieved_chunks:
            st.chat_message("assistant").write("I couldn't find anything relevant in the documents.")
        else:
            # --- Step 2: Text generation ---
            step += 1
            start = time.time()
            status_step.markdown(f"### 📝 Step {step + 1} of {len(steps)}: {steps[step]}")
            status_stage.markdown("Generating structured answer from retrieved text...")
            answer_text = ai_complete_on_text(sp_session, user_question, retrieved_chunks)
            status_detail.markdown(f"Answer generated in `{time.time() - start:.2f}` seconds.")

            # --- Step 3: Deduplicate images ---
            step += 1
            start = time.time()
            status_step.markdown(f"### 🧼 Step {step + 1} of {len(steps)}: {steps[step]}")
            status_stage.markdown("Removing duplicate images by document + page...")
            seen = set()
            deduped_results = []
            for item in results:
                key = (item["PDF_FILE_NAME"], item["IMAGE_FILE_NAME"])
                if key not in seen:
                    seen.add(key)
                    deduped_results.append(item)
            status_detail.markdown(f"Deduplicated to `{len(deduped_results)}` unique images in `{time.time() - start:.2f}` seconds.")

            # --- Step 4: Extract cited pages ---
            step += 1
            status_step.markdown(f"### 📄 Step {step + 1} of {len(steps)}: {steps[step]}")
            status_stage.markdown("Parsing cited documents and pages from answer...")
            cited_docs_pages = extract_cited_docs_and_pages(answer_text["result"])
            status_detail.markdown(f"Found `{sum(len(pgs) for pgs in cited_docs_pages.values())}` total cited pages.")

            # --- Step 5: Filter to cited pages ---
            step += 1
            start = time.time()
            status_step.markdown(f"### 🔎 Step {step + 1} of {len(steps)}: {steps[step]}")
            status_stage.markdown("Matching deduplicated images to cited documents and pages...")

            jobs = []
            matched_files = []
            skipped_files = []

            for item in deduped_results:
                original_file = item.get("ORIGINAL_FILE_NAME", "")
                base_name = os.path.splitext(original_file)[0].lower()
                image_file = item.get("IMAGE_FILE_NAME", "")
                page_number = extract_page_number(image_file)

                matched = any(
                    cited_doc in base_name and page_number in cited_docs_pages[cited_doc]
                    for cited_doc in cited_docs_pages
                )

                if matched:
                    matched_files.append(f"{base_name} - page {page_number}")
                    job = ai_complete_on_image_async(sp_session, user_question, item, answer_text)
                    if job:
                        jobs.append((item, job))
                else:
                    skipped_files.append(f"{base_name} - page {page_number}")

            status_detail.markdown(f"Matched `{len(matched_files)}` / `{len(deduped_results)}` images in `{time.time() - start:.2f}` seconds.")

            # --- Step 6: Run image-based critique ---
            step += 1
            start = time.time()
            status_step.markdown(f"### 🧠 Step {step + 1} of {len(steps)}: {steps[step]}")
            status_stage.markdown("Submitting matched images to Cortex for validation...")

            image_answers = []
            total_jobs = len(jobs)
            progress_bar = status_progress.progress(0.0, text="Processing image critiques...")

            for i, (item, job) in enumerate(jobs, start=1):
                try:
                    result = resolve_async_job(job)
                    result["PAGE_NUMBER"] = extract_page_number(item["IMAGE_FILE_NAME"])
                    image_answers.append(result)
                except Exception as e:
                    image_answers.append({
                        "RESULT": f"Error: {e}",
                        "ORIGINAL_FILE_NAME": item.get("ORIGINAL_FILE_NAME"),
                        "IMAGE_FILE_NAME": item.get("IMAGE_FILE_NAME"),
                        "PAGE_NUMBER": extract_page_number(item.get("IMAGE_FILE_NAME", "")),
                        "PRESIGNED_URL": "#"
                    })
                progress_bar.progress(i / total_jobs, text=f"Processing image critiques... ({i}/{total_jobs})")

            status_detail.markdown(f"Image critique completed in `{time.time() - start:.2f}` seconds.")

            # --- Step 7: Synthesize answer ---
            step += 1
            start = time.time()
            status_step.markdown(f"### 🧪 Step {step + 1} of {len(steps)}: {steps[step]}")
            status_stage.markdown("Synthesizing text + image answers into a final response...")

            image_answers_filtered = filter_by_confidence(image_answers)
            final_answer = synthesise_all_answers(
                sp_session,
                user_question,
                answer_text,
                image_answers_filtered
            )
            status_detail.markdown(f"Final synthesis completed in `{time.time() - start:.2f}` seconds.")

            # --- Display ---
            total_time = time.time() - overall_start
            status_total_time.markdown(f"⏱️ **Total time taken**: `{total_time:.2f}` seconds")
            st.chat_message("assistant").markdown(f"**Final Answer:**\n{final_answer}")

            with st.expander("🔍 Debug - Raw Text Answer"):
                with st.chat_message("assistant"):
                    if results:
                        st.write("📚 **Documents included:**")
                        for doc in sorted(set(item["ORIGINAL_FILE_NAME"] for item in results)):
                            st.write(f"• `{doc}`")
                    st.markdown("---")
                    st.markdown("📝 **Text-Based Answer:**")
                    st.markdown(answer_text["result"])

            with st.expander("🔍 Debug - Raw Image Answers"):
                for i, ans in enumerate(image_answers, start=1):
                    with st.chat_message("assistant"):
                        st.markdown(f"**Image Answer {i}:**")
                        st.write(f"📄 **Document**: `{ans['ORIGINAL_FILE_NAME']}`")
                        st.write(f"🖼️ **Image File**: `{ans['IMAGE_FILE_NAME']}`")
                        st.markdown(ans["RESULT"])


            with st.expander("Prompt"):
                st.write(answer_text["prompt"])

## Groundtruth Questions

1. What is the part number for a 10 line Y design cylinder?

    **🔴 Seclock Answer**: 10-0613 left, 10-0614 right

    **🤖 Solution Answer**:

    > The part number for a 10 line Y design cylinder is 10-0613 (left-handed) or 10-0614 (right-handed). These are part of the Coastal Series and include both the cylinder and emergency release components.

---
2. Is chexit available on a 9947-F?

    **🔴 Seclock Answer**: No, dogging is not an option on fire rated devices

    **🤖 Solution Answer**:

    > Yes, Chexit is available on a 9947-F device.
    > 
    > The documentation confirms that Chexit (CX) is compatible with the 9947/9947-F model. This is shown in the minimum door opening sizes table for CX devices, which specifically lists "CX9847/9847-F/9947/9947-F" as compatible devices. Additionally, the Von Duprin price book states that "CHEXIT - Available on all 98/99 Series devices," which includes the 9947-F model.
    > 
    > When ordering, you would use the prefix "CXA" before the device model number to specify the Chexit option.

---
3. What is the case quantity and weight of a 281 Series O function?

    **🔴 Seclock Answer**: Average weight is 24lbs, case quantity is 2

    **🤖 Solution Answer**:
    > The 281 Series O function has a case quantity of 2 and weighs 24 pounds per case.
    > 
    > This information comes from the Sargent 2025 Price Book's Freight Weight Chart section, which specifically lists these specifications for the 281 Series.

---
4. On a Schlage ND, is vandlgard available in double cylinder functions?

    **🔴 Seclock Answer**: Yes, functions 72 with XN12-003, 93 and 98

    **🤖 Solution Answer**:
    > Yes, Vandlgard is available in double cylinder functions on the Schlage ND Series. Specifically, it's available in these models:
    > 
    > * ND72 with XN12-003 (F80): Vandlgard communicating lock
    > * ND93 (F88): Vandlgard vestibule lock
    > * ND98 (F110): Vandlgard classroom security lock with 180-degree lockdown
    > 
    > The Vandlgard feature allows the outside lever to rotate freely when locked, which helps prevent damage from vandalism by limiting the force that can be applied to the internal lock components.

---
5. What’s the part number for an L9460 armor front?

    **🔴 Seclock Answer**: 09-665, you must also specify the finish

    **🤖 Solution Answer**:

    > The part number for an L9460 armor front is 09-665. This part is specifically designed for deadbolt configurations including the L9460 model.

---
6. What is the prefix for rail monitoring on a Sargent 80 device?

    **🔴 Seclock Answer**: 55

    **🤖 Solution Answer**:
    > The prefix for rail monitoring on a Sargent 80 device is "55-". This prefix designates the Request-to-Exit option, which provides push rail monitoring functionality for various applications on Sargent's 80 Series exit devices. When ordering, you would include this prefix before the model number (for example: 55-8913F x ETMA x 32D x 36").

---
7. What’s the list price for a Sargent 13-0404 cylinder plug size 44?

    **🔴 Seclock Answer**: $44.44

    **🤖 Solution Answer**: 
    
    > The list price for a Sargent 13-0404 cylinder plug size 44 is $44.44.
    
---
8. What finishes do Sargent mortise cylinder barrels come in?
    
    **🔴 Seclock Answer**: US4, US15, and BSP

    **🤖 Solution Answer**:

    > Sargent mortise cylinder barrels come in three finishes: US4 (satin brass), US15 (satin nickel), and BSP (black suede powder coat).
    > 
    > It's important to note that while the overall mortise cylinders are available in many different finishes (including US3, US9, US10, US10B, US10BE, US10BL, US20D, US32, US32D, BSP, WSP), the cylinder plugs/barrels specifically are only offered in these three finish options.

---
9. In ALX, what are the Less FSIC cylinder options?
    
    **🔴 Seclock Answer**: J-CO6 Less FSIC, made to fit Corbin 8000, J-CO7 Less FSIC, made to fit Corbin 8000-7, J-SAR Less FSIC, made to fit Sargent 6300

    **🤖 Solution Answer**:
    > In the ALX Series, the Less FSIC (Full Size Interchangeable Core) cylinder options are:
    > 
    > * J-SAR: Less cylinder, made to fit Sargent 6300 6-pin FSIC
    > * J-CO6: Less cylinder, made to fit Corbin 8000 6-pin FSIC
    > * J-CO7: Less cylinder, made to fit Corbin 8000-7 7-pin FSIC
    > 
    > These options allow you to use your existing FSIC cylinders with ALX Series locks, providing flexibility when integrating with existing key systems.

---
10. Is Schlage 7800 KE3/KE4 available with double cylinders?
    
    **🔴 Seclock Answer**: No, designed for exposed barrels only

    **🤖 Solution Answer**:
    
    > No, the Schlage 7800 KE3/KE4 is not available with double cylinders. The product catalog specifically states that the KE3/KE4 escutcheon design cannot be used with double cylinder functions, which is an important limitation to note when considering this particular lock option.