# Implementing multimodal retrieval using Cortex Search Service

Welcome! This tutorial walks through a lightweight example in which a customer has pump datasheet PDFs and wants to search them and ask natural-language questions about them. At a high level, this tutorial demonstrates how to:

- Convert long PDF files to document screenshots (images).
- (Optional but highly recommended) Run parse_document on PDFs for auxiliary text retrieval to further improve quality.
- Embed document screenshots with `voyage-multimodal-3` (called through `AI_EMBED` in this notebook; EMBED_IMAGE_1024 (PrPr) runs the same model under the hood).
- **NEW: Add metadata attributes (vendor, product, section titles) for better targeting**
- Create a Cortex Search Service using multimodal embeddings and OCR text.
- Retrieve top pages using Cortex Search.
- Get a natural-language answer with multimodal RAG!


In [None]:
%pip install pdfplumber


In [None]:
%pip install pypdf2


In [None]:
LS @DEMODB.DATASHEET_RAG.MULTIMODAL_DEMO_INTERNAL/raw_pdf/


Now let's run some python code:

The purpose is to split the raw PDFs into individual pages, saved in both image and PDF format. Images are for multimodal retrieval, while the per-page PDFs are for better OCR quality (optional). As long as you set up the `Config` values correctly, you are good to go!

```
@dataclass
class Config:
    """Example settings; point the stage names at your own internal stage."""

    input_stage: str = "@CORTEX_SEARCH_DB.PYU.MULTIMODAL_DEMO_INTERNAL/raw_pdf/"
    output_stage: str = "@CORTEX_SEARCH_DB.PYU.MULTIMODAL_DEMO_INTERNAL/"
    input_path: str = "raw_pdf"
    output_pdf_path: str = "paged_pdf"
    output_image_path: str = "paged_image"
    allowed_extensions: List[str] = None
    max_dimension: int = 1500  # Maximum dimension in pixels before scaling
    dpi: int = 300  # Default DPI for image conversion

    def __post_init__(self):
        # Runs only because of @dataclass: default to PDFs when no list is given
        if self.allowed_extensions is None:
            self.allowed_extensions = [".pdf"]
```

**Make sure the output_stage is an internal stage**, because `embed_image_1024` only works with internal stages at the moment.


In [None]:
# Import python packages
import os
import shutil
import sys
import tempfile
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Iterator
from typing import List
from typing import Optional
from typing import Tuple

import pdfplumber
import PyPDF2
import snowflake.snowpark.session as session
import streamlit as st
from snowflake.snowpark.context import get_active_session


def print_info(msg: str) -> None:
    """Write an ``INFO:``-prefixed line for *msg* to standard error."""
    line = f"INFO: {msg}"
    print(line, file=sys.stderr)


def print_error(msg: str) -> None:
    """Write an ``ERROR:``-prefixed line to standard error and show it via Streamlit.

    The Streamlit call is made only when the imported ``st`` module exposes
    an ``error`` attribute.
    """
    line = f"ERROR: {msg}"
    print(line, file=sys.stderr)
    if not hasattr(st, "error"):
        return
    st.error(msg)


def print_warning(msg: str) -> None:
    """Write a ``WARNING:``-prefixed line for *msg* to standard error."""
    line = f"WARNING: {msg}"
    print(line, file=sys.stderr)


@dataclass
class Config:
    """Settings for splitting staged PDFs into per-page PDFs and page images.

    ``allowed_extensions`` may be left as ``None``; ``__post_init__`` then
    replaces it with ``[".pdf"]`` so every instance gets its own list.
    """

    # Stage location that is listed and downloaded from
    input_stage: str = "@DEMODB.DATASHEET_RAG.MULTIMODAL_DEMO_INTERNAL/raw_pdf/"
    output_stage: str = (
        "@DEMODB.DATASHEET_RAG.MULTIMODAL_DEMO_INTERNAL/"  # Base output stage without subdirectories
    )
    # NOTE(review): input_path is not read by the functions visible in this cell
    input_path: str = "raw_pdf"
    # Sub-directories (under output_stage) for the per-page PDFs and PNGs
    output_pdf_path: str = "paged_pdf"
    output_image_path: str = "paged_image"
    # File-name suffixes treated as PDFs; matched case-insensitively when listing
    allowed_extensions: Optional[List[str]] = None
    max_dimension: int = 1500  # Maximum dimension in pixels before scaling
    dpi: int = 300  # Default DPI for image conversion

    def __post_init__(self):
        # A None default avoids sharing one mutable list between instances
        if self.allowed_extensions is None:
            self.allowed_extensions = [".pdf"]


class PDFProcessingError(Exception):
    """Root of the exception hierarchy used by the PDF pipeline."""


class FileDownloadError(PDFProcessingError):
    """A file could not be fetched from the stage."""


class PDFConversionError(PDFProcessingError):
    """A PDF could not be converted."""


@contextmanager
def managed_temp_file(suffix: Optional[str] = None) -> Iterator[str]:
    """Create an empty temporary file and yield its path.

    The file is intentionally NOT deleted when the ``with`` block exits;
    the caller is responsible for removing it (see ``cleanup_temp_file``).

    Args:
        suffix: Optional file-name suffix such as ``".pdf"``.

    Yields:
        The path of the newly created temporary file.
    """
    # Only the path is handed out, so close the handle right away instead of
    # leaving an open descriptor behind for the lifetime of the process.
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
        temp_path = temp_file.name
    yield temp_path


def cleanup_temp_file(file_path: str) -> None:
    """Delete *file_path* if it exists; a failed delete is only reported as a warning."""
    try:
        if not os.path.exists(file_path):
            return
        os.unlink(file_path)
    except OSError as exc:
        print_warning(f"Failed to delete temporary file {file_path}: {exc}")


def list_pdf_files(session: session.Session, config: Config) -> List[dict]:
    """Return one dict per staged file whose name ends with an allowed extension.

    Each dict carries ``RELATIVE_PATH`` (bare file name), ``FULL_STAGE_PATH``
    (the name reported by ``LIST``) and ``SIZE`` (0 when the listing row has
    no ``size`` entry). Any failure is reported through ``print_error`` and
    re-raised.
    """
    try:
        # LIST is used here rather than the DIRECTORY table function
        query = f"""
        LIST {config.input_stage}
        """

        listing = session.sql(query).collect()

        pdf_files = []
        for entry in listing:
            stage_name = entry["name"]
            # Keep only the last path component as the relative name
            short_name = os.path.basename(stage_name)
            lowered = short_name.lower()

            if not any(lowered.endswith(ext) for ext in config.allowed_extensions):
                continue

            size = entry["size"] if "size" in entry else 0
            pdf_files.append(
                {
                    "RELATIVE_PATH": short_name,  # bare file name
                    "FULL_STAGE_PATH": stage_name,  # name as reported by LIST
                    "SIZE": size,
                }
            )

        print_info(f"Found {len(pdf_files)} PDF files in the stage")
        return pdf_files
    except Exception as exc:
        print_error(f"Failed to list files: {exc}")
        raise


def download_file_from_stage(
    session: session.Session, file_path: str, config: Config
) -> str:
    """Download one file from ``config.input_stage`` into a fresh temp directory.

    Args:
        session: Snowpark session used for ``session.file.get``.
        file_path: File name relative to ``config.input_stage``.
        config: Pipeline settings; only ``input_stage`` is read here.

    Returns:
        Local path of the downloaded file. It lives inside a directory created
        with ``tempfile.mkdtemp``; the caller owns both and must remove them.

    Raises:
        FileDownloadError: If the GET does not report ``DOWNLOADED``, the file
            is missing afterwards, or any other error occurs (the original
            error is chained as ``__cause__``).
    """
    temp_dir = tempfile.mkdtemp()
    try:
        # Ensure there are no double slashes in the path
        stage_path = f"{config.input_stage.rstrip('/')}/{file_path.lstrip('/')}"

        get_result = session.file.get(stage_path, temp_dir)
        if not get_result or get_result[0].status != "DOWNLOADED":
            raise FileDownloadError(f"Failed to download file: {file_path}")

        local_path = os.path.join(temp_dir, os.path.basename(file_path))
        if not os.path.exists(local_path):
            raise FileDownloadError(f"Downloaded file not found at: {local_path}")

        return local_path
    except Exception as e:
        print_error(f"Error downloading {file_path}: {e}")
        # Nothing usable was produced, so drop the temp directory again
        try:
            shutil.rmtree(temp_dir)
        except OSError as cleanup_error:
            print_warning(f"Failed to clean up temporary directory: {cleanup_error}")
        if isinstance(e, FileDownloadError):
            # Already the right type: re-raise as-is instead of wrapping it a
            # second time (which used to duplicate the message prefix).
            raise
        raise FileDownloadError(f"Failed to download file: {e}") from e


def upload_file_to_stage(
    session: session.Session, file_path: str, output_path: str, config: Config
) -> str:
    """Upload a local file to ``config.output_stage`` under the name in *output_path*.

    Args:
        session: Snowpark session used for ``session.file.put``.
        file_path: Local file whose content is uploaded.
        output_path: Target path relative to the output stage; its directory
            part selects the stage sub-directory and its base name the file name.
        config: Pipeline settings; only ``output_stage`` is read here.

    Returns:
        A human-readable success message.

    Raises:
        PDFProcessingError: If PUT returns no result or a status other than
            ``UPLOADED``/``SKIPPED``.
        Exception: Any other error is reported via ``print_error`` and re-raised.
    """
    try:
        output_dir = os.path.dirname(output_path)
        base_name = os.path.basename(output_path)

        # Full stage path including the sub-directory
        stage_path = f"{config.output_stage.rstrip('/')}/{output_dir.lstrip('/')}"

        # The upload is done from a copy named `base_name` so the staged file
        # gets that name. A private temporary directory guarantees the copy is
        # removed even when PUT fails and cannot collide with other files in
        # the shared temp dir.
        with tempfile.TemporaryDirectory() as staging_dir:
            staged_copy = os.path.join(staging_dir, base_name)
            shutil.copyfile(file_path, staged_copy)

            # Compression is disabled so the staged file keeps its extension
            put_result = session.file.put(
                staged_copy, stage_path, auto_compress=False, overwrite=True
            )

        if not put_result:
            raise PDFProcessingError(f"Failed to upload file: {base_name}")

        status = put_result[0].status
        if status not in ("UPLOADED", "SKIPPED"):
            raise PDFProcessingError(f"Upload failed with status: {status}")

        return f"Successfully uploaded {base_name} to {stage_path}"
    except Exception as e:
        print_error(f"Error uploading file: {e}")
        raise


def _page_render_resolution(page_width, page_height, config: Config) -> int:
    """Return the DPI to render a page at, reduced for oversized pages.

    A page whose larger side (as reported by pdfplumber) exceeds
    ``config.max_dimension`` is rendered at a proportionally lower
    resolution; every other page is rendered at ``config.dpi``.
    """
    max_dim = max(page_width, page_height)
    if max_dim > config.max_dimension:
        scale_factor = config.max_dimension / max_dim
        return max(1, int(config.dpi * scale_factor))
    return config.dpi


def _split_pdf_into_pages(
    session: session.Session, local_pdf_path: str, base_name: str, config: Config
) -> None:
    """Write each page of the PDF as a single-page PDF and upload it."""
    with open(local_pdf_path, "rb") as file:
        pdf_reader = PyPDF2.PdfReader(file)
        num_pages = len(pdf_reader.pages)
        print_info(f"Converting PDF to {num_pages} pages of PDFs")

        for page_num, page in enumerate(pdf_reader.pages, start=1):
            stage_pdf_path = (
                f"{config.output_pdf_path}/{base_name}_page_{page_num}.pdf"
            )
            pdf_writer = PyPDF2.PdfWriter()
            pdf_writer.add_page(page)

            page_file = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf")
            try:
                # Write through the handle and close it before uploading
                with page_file:
                    pdf_writer.write(page_file)
                upload_file_to_stage(session, page_file.name, stage_pdf_path, config)
            finally:
                cleanup_temp_file(page_file.name)


def _render_pdf_pages_as_images(
    session: session.Session, local_pdf_path: str, base_name: str, config: Config
) -> None:
    """Render each page of the PDF to a PNG and upload it."""
    with pdfplumber.open(local_pdf_path) as pdf:
        print_info(f"Converting PDF to {len(pdf.pages)} images")
        for page_num, page in enumerate(pdf.pages, start=1):
            resolution = _page_render_resolution(page.width, page.height, config)
            img = page.to_image(resolution=resolution)

            stage_image_path = (
                f"{config.output_image_path}/{base_name}_page_{page_num}.png"
            )

            # mkstemp returns an open descriptor; close it so only the path
            # is used and no handle is leaked.
            fd, local_image_path = tempfile.mkstemp(suffix=".png")
            os.close(fd)
            try:
                img.save(local_image_path)
                upload_file_to_stage(
                    session, local_image_path, stage_image_path, config
                )
            finally:
                cleanup_temp_file(local_image_path)


def _remove_downloaded_pdf(local_pdf_path: str) -> None:
    """Delete the downloaded PDF and, if it is then empty, its parent directory."""
    cleanup_temp_file(local_pdf_path)
    try:
        # download_file_from_stage places the file in its own mkdtemp()
        # directory; rmdir only succeeds on an empty directory, so anything
        # else is left untouched.
        os.rmdir(os.path.dirname(local_pdf_path))
    except OSError:
        pass  # best effort: directory not empty or already gone


def process_pdf_files(config: Config) -> None:
    """Split every staged PDF into per-page PDFs and per-page PNG images.

    For each PDF listed on ``config.input_stage`` the file is downloaded, each
    page is uploaded as ``<output_pdf_path>/<name>_page_<n>.pdf`` and as
    ``<output_image_path>/<name>_page_<n>.png``, and the local copies are
    removed afterwards, also when processing fails.

    A failure on one file is reported and the loop moves on to the next file.
    Failures outside the per-file loop (getting the session, listing the
    stage) are reported and re-raised.

    Args:
        config: Pipeline settings (stages, output sub-directories, DPI,
            maximum page dimension).
    """
    try:
        session = get_active_session()
        pdf_files = list_pdf_files(session, config)

        for file_info in pdf_files:
            file_path = file_info["RELATIVE_PATH"]
            print_info(f"Processing: {file_path}")

            local_pdf_path = None
            try:
                local_pdf_path = download_file_from_stage(session, file_path, config)

                # Base filename without extension, used to name the page files
                base_name = os.path.splitext(os.path.basename(file_path))[0]

                _split_pdf_into_pages(session, local_pdf_path, base_name, config)
                _render_pdf_pages_as_images(
                    session, local_pdf_path, base_name, config
                )
            except Exception as e:
                # Keep going: one bad file must not stop the whole batch
                print_error(f"Error processing {file_path}: {e}")
            finally:
                if local_pdf_path is not None:
                    _remove_downloaded_pdf(local_pdf_path)

    except Exception as e:
        print_error(f"Fatal error in process_pdf_files: {e}")
        raise


# Run the pipeline at 200 DPI instead of the Config default of 300.
config = Config(dpi=200)
process_pdf_files(config)


Check out one image and see if it's clear. If you can't read it clearly, neural models won't be able to either!


In [None]:
session = get_active_session()

# change to one image on your stage
sample_image_path = "@DEMODB.DATASHEET_RAG.MULTIMODAL_DEMO_INTERNAL/paged_image/AHLSTAREndSuctionSingleStage_E10083_page_2.png"

# Read the staged PNG as raw bytes and display it in the notebook
image = session.file.get_stream(sample_image_path, decompress=False).read()
st.image(image)


Now let's start the multimodal embedding part! We first create an intermediate table that holds the relative file names of the images, and then call `AI_EMBED` with the `voyage-multimodal-3` model to turn them into vectors!


In [None]:
-- Catalogue of page images found under paged_image/ on the stage.
CREATE OR REPLACE TABLE DEMODB.DATASHEET_RAG.DATASHEET_IMAGE_CORPUS AS
SELECT
    -- Stage-relative path: 'paged_image/' + last path segment of the staged file name
    CONCAT('paged_image/', split_part(metadata$filename, '/', -1)) AS FILE_NAME,
    -- Page number captured from the '_page_<n>' part of the file name
    REGEXP_SUBSTR(metadata$filename, '_page_([0-9]+)', 1, 1, 'e',1)::INTEGER as PAGE_NUMBER,
    '@DEMODB.DATASHEET_RAG.MULTIMODAL_DEMO_INTERNAL' AS STAGE_PREFIX
FROM
    @DEMODB.DATASHEET_RAG.MULTIMODAL_DEMO_INTERNAL/paged_image/
-- Grouping by every output column leaves one row per distinct combination
GROUP BY 1, 2, 3
;

-- Preview a few rows
SELECT * FROM DEMODB.DATASHEET_RAG.DATASHEET_IMAGE_CORPUS LIMIT 5;


In [None]:
-- Embed every page image listed in DATASHEET_IMAGE_CORPUS with voyage-multimodal-3.
CREATE OR REPLACE TABLE DEMODB.DATASHEET_RAG.DATASHEET_VM3_VECTORS AS
SELECT
    FILE_NAME,
    STAGE_PREFIX,
    -- TO_FILE builds the file reference from the stage name and the stage-relative FILE_NAME
    AI_EMBED(
        'voyage-multimodal-3',
        TO_FILE('@DEMODB.DATASHEET_RAG.MULTIMODAL_DEMO_INTERNAL', FILE_NAME)
    ) AS IMAGE_VECTOR
FROM DEMODB.DATASHEET_RAG.DATASHEET_IMAGE_CORPUS;


-- Preview a few rows
SELECT * FROM DEMODB.DATASHEET_RAG.DATASHEET_VM3_VECTORS LIMIT 5;


Similarly, we call `SNOWFLAKE.CORTEX.PARSE_DOCUMENT` to extract text from the PDF pages. We have found that, although multimodal retrieval is powerful, augmenting it with text retrieval for keyword matching can improve quality on certain types of search tasks and queries.


In [None]:
-- Catalogue of single-page PDFs found under paged_pdf/ on the stage
-- (same shape as the image corpus table).
CREATE OR REPLACE TABLE DEMODB.DATASHEET_RAG.DATASHEET_PDF_CORPUS AS
SELECT
    -- Stage-relative path: 'paged_pdf/' + last path segment of the staged file name
    CONCAT('paged_pdf/', split_part(metadata$filename, '/', -1)) AS FILE_NAME,
    -- Page number captured from the '_page_<n>' part of the file name
    REGEXP_SUBSTR(metadata$filename, '_page_([0-9]+)', 1, 1, 'e',1)::INTEGER as PAGE_NUMBER,
    '@DEMODB.DATASHEET_RAG.MULTIMODAL_DEMO_INTERNAL' AS STAGE_PREFIX
FROM
    @DEMODB.DATASHEET_RAG.MULTIMODAL_DEMO_INTERNAL/paged_pdf/
-- Grouping by every output column leaves one row per distinct combination
GROUP BY 1, 2, 3
;

-- Run PARSE_DOCUMENT in LAYOUT mode on every single-page PDF and keep the
-- 'content' field of its output as PARSE_DOC_OUTPUT.
CREATE OR REPLACE TABLE DEMODB.DATASHEET_RAG.DATASHEET_PARSE_DOC AS
    SELECT
        FILE_NAME,
        PAGE_NUMBER,
        STAGE_PREFIX,
        -- The result is serialised to text and re-parsed before ':content' is extracted
        PARSE_JSON(TO_VARCHAR(SNOWFLAKE.CORTEX.PARSE_DOCUMENT(
            '@DEMODB.DATASHEET_RAG.MULTIMODAL_DEMO_INTERNAL',
            FILE_NAME,
            {'mode': 'LAYOUT'}
        ))):content AS PARSE_DOC_OUTPUT
    FROM DEMODB.DATASHEET_RAG.DATASHEET_PDF_CORPUS
;

-- Preview a few rows
SELECT * FROM DEMODB.DATASHEET_RAG.DATASHEET_PARSE_DOC LIMIT 5;


## Adding metadata to Cortex Search Services

Before creating the final search service, let's add metadata attributes that will help with better targeting and filtering:

- **Directory-level metadata**: Vendor, Product ID, Pump Model, Datasheet Type
- **Page-level metadata**: Section Title derived from OCR text

This metadata will be projected as attributes in the Cortex Search Service, allowing for better filtering and context in search results.


In [None]:
-- Directory table (one row per datasheet)
-- FILE_NAME is the original PDF file name; it is the join key used when the
-- per-page rows are enriched with vendor/product metadata.
CREATE OR REPLACE TABLE DEMODB.DATASHEET_RAG.DATASHEET_DIRECTORY (
  FILE_NAME STRING PRIMARY KEY,
  VENDOR STRING,
  PRODUCT_ID STRING,
  PUMP_MODEL STRING,
  DATASHEET_TYPE STRING
);

-- INSERT OVERWRITE replaces any existing rows, so re-running the cell does not duplicate entries.
-- Edit this list to match the PDFs on your own stage.
INSERT OVERWRITE INTO DEMODB.DATASHEET_RAG.DATASHEET_DIRECTORY (FILE_NAME, VENDOR, PRODUCT_ID, PUMP_MODEL, DATASHEET_TYPE) VALUES
  ('AHLSTAREndSuctionSingleStage_E10083.pdf','Sulzer','AHLSTAR-E10083','AHLSTAR End Suction Single Stage','Performance Datasheet'),
  ('B_3196i.pdf','Goulds','3196-iFRAME','3196 i-FRAME','ANSI Process Datasheet'),
  ('BEEndSuctionSingleStageCentrifugalPump60HzUS.pdf','Sulzer','BE-60Hz-US','BE End Suction Single Stage','Technical Specification'),
  ('Centrifugal-Curvebook-2020-1.pdf','Fristam','Curvebook-2020','Centrifugal Sanitary Pumps','Curve Book'),
  ('Pump-Selection-Guide-brochure.pdf','Various','Selection-Guide','General Selection','Selection Guide');

-- Show the directory contents
SELECT * FROM DEMODB.DATASHEET_RAG.DATASHEET_DIRECTORY;


In [None]:
-- Extract section titles from parsed document content
CREATE OR REPLACE TABLE DEMODB.DATASHEET_RAG.DATASHEET_PAGE_METADATA AS
WITH section_extraction AS (
  SELECT
    FILE_NAME,
    PAGE_NUMBER,
    PARSE_DOC_OUTPUT,
    -- Candidate section title: the first run that starts with a capital letter and
    -- continues with at least three capitals, digits, spaces, '/', parentheses or '-'
    TRIM(REGEXP_SUBSTR(TO_VARCHAR(PARSE_DOC_OUTPUT), '[A-Z][A-Z0-9 /\\(\\)\\-]{3,}')) AS SECTION_TITLE
  FROM DEMODB.DATASHEET_RAG.DATASHEET_PARSE_DOC
)
SELECT
  FILE_NAME,
  PAGE_NUMBER,
  PARSE_DOC_OUTPUT,
  -- Pages without a match fall back to the 'GENERAL' title
  COALESCE(SECTION_TITLE, 'GENERAL') AS SECTION_TITLE,
  -- Extract base filename for joining with directory:
  -- 'paged_pdf/<name>_page_<n>.pdf' becomes '<name>.pdf'
  REGEXP_SUBSTR(FILE_NAME, 'paged_pdf/(.*)_page_[0-9]+\\.pdf$', 1, 1, 'e', 1) || '.pdf' AS BASE_FILE_NAME
FROM section_extraction;

-- Preview a few rows
SELECT * FROM DEMODB.DATASHEET_RAG.DATASHEET_PAGE_METADATA LIMIT 5;


Now we join image vectors and texts into a single table with metadata attributes, and create a Cortex Search service!


In [None]:
-- One row per page: image vector + parsed text + datasheet metadata.
CREATE OR REPLACE TABLE DEMODB.DATASHEET_RAG.DATASHEET_JOINED_DATA AS
SELECT
    v.FILE_NAME,
    p.PAGE_NUMBER,
    v.IMAGE_VECTOR AS VECTOR_MAIN,
    p.PARSE_DOC_OUTPUT AS TEXT,
    -- Stage-relative image path, returned by the search service so the page can be shown/answered on
    v.FILE_NAME AS IMAGE_FILEPATH,
    -- Add metadata attributes
    d.VENDOR,
    d.PRODUCT_ID,
    d.PUMP_MODEL,
    d.DATASHEET_TYPE,
    p.SECTION_TITLE
FROM
    DEMODB.DATASHEET_RAG.DATASHEET_VM3_VECTORS v
JOIN
    DEMODB.DATASHEET_RAG.DATASHEET_PAGE_METADATA p
ON
    -- Match image and PDF of the same page by their shared '<name>_page_<n>' stem
    REGEXP_SUBSTR(v.FILE_NAME, 'paged_image/(.*)\\.png$', 1, 1, 'e', 1) = REGEXP_SUBSTR(p.FILE_NAME, 'paged_pdf/(.*)\\.pdf$', 1, 1, 'e', 1)
-- LEFT JOIN: pages whose datasheet is missing from the directory are kept, with NULL metadata
LEFT JOIN
    DEMODB.DATASHEET_RAG.DATASHEET_DIRECTORY d
ON
    d.FILE_NAME = p.BASE_FILE_NAME;


-- Multi-index search service over the joined page table:
-- TEXT is the text index, VECTOR_MAIN the vector index, and the five
-- metadata columns are declared as ATTRIBUTES.
-- Change WAREHOUSE to a warehouse available in your account.
CREATE OR REPLACE CORTEX SEARCH SERVICE DEMODB.DATASHEET_RAG.DATASHEET_CORTEX_SEARCH_SERVICE
  TEXT INDEXES TEXT
  VECTOR INDEXES VECTOR_MAIN
  ATTRIBUTES VENDOR, PRODUCT_ID, PUMP_MODEL, DATASHEET_TYPE, SECTION_TITLE
  WAREHOUSE='COMPUTE_WH'
  TARGET_LAG='1 day'
AS (
    SELECT 
        -- TEXT is cast to VARCHAR for the text index
        TO_VARCHAR(TEXT) AS TEXT, 
        PAGE_NUMBER, 
        VECTOR_MAIN,
        IMAGE_FILEPATH,
        VENDOR,
        PRODUCT_ID,
        PUMP_MODEL,
        DATASHEET_TYPE,
        SECTION_TITLE
    FROM DEMODB.DATASHEET_RAG.DATASHEET_JOINED_DATA
);


We have created a multi-index Cortex Search Service with both text and vector indexes, plus metadata attributes. This allows us to perform hybrid search combining keyword matching on text content and semantic similarity on vector embeddings, with the ability to filter by vendor, product, or section type. We'll embed queries directly with `SNOWFLAKE.CORTEX.EMBED_TEXT_1024` and use the new multi-index query syntax to search across both index types.

**Note:** The multi-index query syntax requires Snowflake Python API version 1.6.0 or later.


In [None]:
session = get_active_session()
demo_query_text = "What is the NPSH required at 120% flow for Sulzer pump?"

# Pass the prompt as a bind parameter rather than splicing it into the SQL
# text, so a question containing a quote character cannot break the statement.
embed_prompt = f"Represent the query for retrieving supporting documents:  {demo_query_text}"
sql_output = session.sql(
    "SELECT SNOWFLAKE.CORTEX.EMBED_TEXT_1024('voyage-multimodal-3', ?)",
    params=[embed_prompt],
).collect()

# The statement returns a single row with a single column: the query embedding
query_vector = sql_output[0][0]
print(query_vector)


In [None]:
from snowflake.core import Root

def multi_modal_answer(question_text):
    """Retrieve the best-matching datasheet pages and answer the question from the top one.

    Steps: embed the question with ``voyage-multimodal-3``, run a multi-index
    query (text + vector) against ``DATASHEET_CORTEX_SEARCH_SERVICE``, print
    the ranked hits with their metadata, then ask ``claude-3-7-sonnet`` to
    answer the question using the image of the top-ranked page.

    Uses the notebook-level ``session`` variable.

    Args:
        question_text: Natural-language question.

    Returns:
        The search response object. When the search returns no results the
        answer step is skipped and the response is returned as-is.
    """
    # Bind the prompt as a parameter so quotes in the question cannot break
    # (or alter) the SQL statement.
    embed_prompt = f"Represent the query for retrieving supporting documents:  {question_text}"
    sql_output = session.sql(
        "SELECT SNOWFLAKE.CORTEX.EMBED_TEXT_1024('voyage-multimodal-3', ?)",
        params=[embed_prompt],
    ).collect()
    # Single row, single column: the query embedding
    query_vector = sql_output[0][0]

    ## Use Multi-Index Querying
    root = Root(session)
    # fetch service
    my_service = (root
        .databases["DEMODB"]
        .schemas["DATASHEET_RAG"]
        .cortex_search_services["DATASHEET_CORTEX_SEARCH_SERVICE"]
    )

    # query service using multi-index query syntax: keyword match on TEXT
    # combined with vector similarity on VECTOR_MAIN
    resp = my_service.search(
        multi_index_query={
            "TEXT": [{"text": question_text}],
            "VECTOR_MAIN": [{"vector": query_vector}],
        },
        columns=["TEXT", "PAGE_NUMBER", "IMAGE_FILEPATH", "VENDOR", "PRODUCT_ID", "PUMP_MODEL", "SECTION_TITLE"],
        limit=5,
    )
    results = resp.to_dict()["results"]

    # Display search results with metadata
    print("\n=== SEARCH RESULTS ===")
    for rank, result in enumerate(results, start=1):
        print(f"\nRank {rank}:")
        print(f"  Vendor: {result.get('VENDOR', 'N/A')}")
        print(f"  Product: {result.get('PRODUCT_ID', 'N/A')} - {result.get('PUMP_MODEL', 'N/A')}")
        print(f"  Section: {result.get('SECTION_TITLE', 'N/A')}")
        print(f"  Page: {result.get('PAGE_NUMBER', 'N/A')}")
        print(f"  Image: {result.get('IMAGE_FILEPATH', 'N/A')}")

    if not results:
        # Nothing to answer from; avoid an IndexError on results[0]
        print("\nNo matching pages found; skipping answer generation.")
        return resp

    # grab the relative path from search result
    top_page_path = results[0]["IMAGE_FILEPATH"]

    sql = """
        SELECT AI_COMPLETE(
        'claude-3-7-sonnet',
        PROMPT(
        'Answer the question using the image {0}. Question: {1}',
        TO_FILE('@DEMODB.DATASHEET_RAG.MULTIMODAL_DEMO_INTERNAL', ?),
        ?
        )
        ) AS answer
    """
    row = session.sql(sql, params=[top_page_path, question_text]).collect()[0]
    print("\n=== TOP RESULT ===")
    print(f"Image: {top_page_path}")
    print("\n=== ANSWER ===")
    print(row["ANSWER"])

    return resp


In [None]:
# Example question about the Sulzer BE pump (NPSH required at a given flow).
multi_modal_answer("What is the NPSH required at 120% flow for the Sulzer BE pump?")


In [None]:
# Example question about the Goulds 3196 pump (maximum operating temperature).
multi_modal_answer("What is the maximum operating temperature specified for the Goulds 3196 pump?")


In [None]:
# Example question about the Ahlstar pump datasheet (casting material).
multi_modal_answer("What is the casting material listed in the Ahlstar pump datasheet?")


In [None]:
# Example question about the Fristam FKL pump (maximum flow rate).
multi_modal_answer("What is the maximum flow rate capacity of the Fristam FKL pump?")


In [None]:
# Example question about the Goulds 3196 high-temperature option.
multi_modal_answer("What is the maximum liquid temperature with the high-temperature option for the Goulds 3196?")


In [None]:
# Example question that requires reading values off a performance curve.
# NOTE(review): the original question text was cut off mid-word ("NPSH requi");
# the ending below is a reconstruction - adjust it to the intended wording.
multi_modal_answer("Using the performance curve, at 50 GPM and 135 ft head, what impeller diameter and NPSH required are indicated?")
