# Part 3: Interactive Multimodal RAG with boto3

**[EN]** This notebook implements a complete, interactive multimodal RAG demo. It uses ColPali for visual search and calls Amazon Bedrock's Claude 3 Haiku model directly via boto3 to analyze the top-ranked image and generate a response. All code comments are in English.<br>
**[KR]** 이 Notebook은 완전한 대화형 멀티모달 RAG 데모를 구현합니다. ColPali를 사용하여 시각적 검색을 수행하고, boto3를 통해 Amazon Bedrock의 Claude 3 Haiku 모델을 직접 호출하여 1위 이미지를 분석하고 답변을 생성합니다. 모든 코드 주석은 영어로 작성되었습니다.

### Step 1: Environment Setup and Library Installation

**[EN]** Install the libraries the RAG pipeline needs, including boto3 for direct communication with AWS.<br>
**[KR]** AWS와 직접 통신하기 위한 boto3를 포함하여, RAG 파이프라인에 필요한 라이브러리를 설치합니다.

In [1]:
!pip install -q "git+https://github.com/illuin-tech/colpali.git"
!pip install -q elasticsearch python-dotenv Pillow "transformers>=4.41.0" accelerate numpy torch ipywidgets boto3
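
A quick check like the one below can confirm that the installs succeeded before moving on. This is a minimal sketch: the helper name `missing_packages` is hypothetical, the distribution names are taken from the pip commands above, and `colpali-engine` is assumed to be the distribution name installed from the ColPali repository.

```python
from importlib import metadata

def missing_packages(names):
    """Return the distribution names from `names` that are not installed."""
    missing = []
    for name in names:
        try:
            metadata.version(name)
        except metadata.PackageNotFoundError:
            missing.append(name)
    return missing

# Distribution names from the pip commands; "colpali-engine" is an assumption.
required = [
    "elasticsearch", "python-dotenv", "Pillow", "transformers", "accelerate",
    "numpy", "torch", "ipywidgets", "boto3", "colpali-engine",
]
print("Missing packages:", missing_packages(required) or "none")
```

If anything is listed as missing, rerunning the install cell (or restarting the kernel after installation) is usually the first thing to try.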


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


### Step 2: Load Credentials and Configure Connections

**[EN]** Load environment variables from `elastic.env` and `aws.env` to configure connections to Elastic Cloud and Amazon Bedrock.<br>
**[KR]** `elastic.env`와 `aws.env` 파일에서 환경 변수를 로드하여 Elastic Cloud와 Amazon Bedrock 연결 정보를 설정합니다.

In [2]:
from dotenv import load_dotenv
import os
from elasticsearch import Elasticsearch
import boto3

# Load environment variables from .env files
load_dotenv(dotenv_path='elastic.env')
load_dotenv(dotenv_path='aws.env', override=True)

# Elastic Cloud connection details
ES_URL = os.getenv("ES_URL")
ES_API_KEY = os.getenv("ES_API_KEY")
if not ES_URL or not ES_API_KEY:
    raise ValueError("Please set ES_URL and ES_API_KEY in elastic.env")

# Amazon Bedrock connection details
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY", "")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY", "")
AWS_REGION = os.getenv("AWS_REGION", "ap-northeast-2")
if not AWS_ACCESS_KEY or not AWS_SECRET_KEY or AWS_ACCESS_KEY == "<your-aws-access-key>":
    raise ValueError("Please set valid AWS credentials in aws.env")

# Create the Elasticsearch client
if ':' in ES_URL and not ES_URL.startswith('http'):
    es = Elasticsearch(cloud_id=ES_URL, api_key=ES_API_KEY, request_timeout=30)
else:
    es = Elasticsearch(hosts=[ES_URL], api_key=ES_API_KEY, request_timeout=30)
print(f"Connected to Elasticsearch version: {es.info()['version']['number']}")

# Create the Bedrock client
bedrock = boto3.client(
    "bedrock-runtime",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)
print(f"Connected to Bedrock in region: {AWS_REGION}")

Connected to Elasticsearch version: 8.11.0
Connected to Bedrock in region: ap-northeast-2
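
The cell above decides between a Cloud ID and a plain URL with a simple string check. The sketch below isolates that decision in a small function so it can be inspected without opening a connection. The name `es_connection_kwargs` and the example values are hypothetical; the heuristic is the one used in the cell.

```python
def es_connection_kwargs(es_url, api_key, timeout=30):
    """Build keyword arguments for Elasticsearch() from a URL or a Cloud ID."""
    # Same heuristic as the cell above: a Cloud ID contains ':' but has no http(s) scheme.
    looks_like_cloud_id = ":" in es_url and not es_url.startswith("http")
    target = {"cloud_id": es_url} if looks_like_cloud_id else {"hosts": [es_url]}
    return {**target, "api_key": api_key, "request_timeout": timeout}

# Hypothetical values, for illustration only.
print(es_connection_kwargs("my-deployment:ZXhhbXBsZQ==", "example-key"))
print(es_connection_kwargs("https://example.es.io:443", "example-key"))
```

One consequence of this heuristic is that a scheme-less address such as `localhost:9200` would be treated as a Cloud ID, so `ES_URL` should include `http://` or `https://` when it is a URL.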


### Step 3: Define Helpers and Load the Embedding Model

**[EN]** Define all helper functions for visualization, vector generation, and the new Bedrock multimodal call. Load the same `ColQwen` model that was used for indexing so that query vectors have the same dimensions as the indexed vectors.<br>
**[KR]** 시각화, 벡터 생성, 그리고 새로운 Bedrock 멀티모달 호출을 위한 모든 헬퍼 함수를 정의합니다. 검색 시 벡터 차원을 일치시키기 위해 `ColQwen` 모델을 로드합니다.

In [3]:
import base64
from IPython.display import display, HTML
import torch
import numpy as np
import json
import time
from colpali_engine.models import ColQwen2_5, ColQwen2_5_Processor

BEDROCK_MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"

def display_results(hits):
    # Renders search results as a wrapping row of HTML cards (rank, thumbnail, ID, score, category).
    if not hits:
        print("No documents found.")
        return
    card_style = "margin:10px; padding:10px; border:1px solid #ddd; text-align:center; width:220px;"
    placeholder_style = "width:200px; height:200px; background-color:#f0f0f0; margin-top:5px; display:flex; align-items:center; justify-content:center; font-size:12px;"
    html = "<div style='display:flex; flex-wrap:wrap;'>"
    for i, hit in enumerate(hits):
        doc_id = hit["_id"]
        score = hit["_score"]
        path = hit["_source"].get("image_path", "")
        category = hit["_source"].get("category", "N/A")
        meta = f"<div style='font-size:12px; margin-top:5px;'><b>ID:</b> {doc_id[:15]}...<br><b>Score:</b> {score:.4f}<br><b>Category:</b> {category}</div>"
        try:
            with open(path, "rb") as image_file:
                img_str = base64.b64encode(image_file.read()).decode()
            # Use the file's own media type instead of assuming PNG.
            body = f"<img src='data:{get_media_type(path)};base64,{img_str}' style='width:200px; height:auto; margin-top:5px;'><br>"
        except Exception:
            # Missing or unreadable image: show a grey placeholder instead.
            body = f"<div style='{placeholder_style}'>Image not available</div>"
        html += f"<div style='{card_style}'><b>Rank #{i+1}</b><br>{body}{meta}</div>"
    html += "</div>"
    display(HTML(html))

def get_media_type(image_path):
    # Determines the media type from the file extension.
    ext = os.path.splitext(image_path)[1].lower()
    return {".jpeg": "image/jpeg", ".jpg": "image/jpeg", ".png": "image/png", ".webp": "image/webp"}.get(ext, "image/jpeg")

def generate_llm_answer_with_image(bedrock_client, query_text, hits):
    # Encodes the top image and generates an answer using Bedrock.
    if not hits:
        print("No documents found, cannot generate answer.")
        return
    top_hit = hits[0]
    image_path = top_hit["_source"].get("image_path")
    if not image_path or not os.path.exists(image_path):
        print("Top result has no valid image path.")
        return
    print(f"\nAnalyzing top image for context: {image_path}")
    with open(image_path, "rb") as image_file:
        image_base64 = base64.b64encode(image_file.read()).decode('utf-8')
    
    system_prompt = """You are an intelligent document analysis assistant. Your task is to analyze a document image that a search system has retrieved in response to a user's query. Your response should be structured as follows:\n\n1.  **Relevance Assessment**: Start by explaining how relevant the document is to the user's query.\n2.  **Summary**: Provide a concise summary of the document's key information.\n3.  **Direct Answer**: Directly answer the user's query based on the document's content.\n4.  **Contextual Explanation**: If the document is not a perfect match for the query, explain why it was likely the most relevant result found. You can describe the relationship between the user's query terms and the document's content. For instance, you could say, 'While this document does not specifically mention [a key term from the query], it discusses [a related topic in the document], making it the most relevant document found.'"""
    
    bedrock_request_body = {
        "anthropic_version": "bedrock-2023-05-31", 
        "max_tokens": 2048,
        "system": system_prompt,
        "messages": [{
            "role": "user",
            "content": [
                {"type": "image", "source": {"type": "base64", "media_type": get_media_type(image_path), "data": image_base64}},
                {"type": "text", "text": f"User Query: '{query_text}'\n\nPlease analyze the provided document image based on the instructions in the system prompt."}
            ]}]
    }
    response = bedrock_client.invoke_model(
        body=json.dumps(bedrock_request_body), modelId=BEDROCK_MODEL_ID,
        contentType="application/json", accept="application/json"
    )
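    response_body = json.loads(response["body"].read())
    # Join every text block; fall back to a placeholder if the model returned none.
    text_blocks = [block.get("text", "") for block in response_body.get("content", []) if block.get("type") == "text"]
    result = "".join(text_blocks) or "No response generated."
    print(f"\n[FINAL LLM RESPONSE]:\n{result}")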

# Set up the device (GPU or CPU)
device_map = "cpu"
if torch.backends.mps.is_available():
    device_map = "mps"
elif torch.cuda.is_available():
    device_map = "cuda:0"
print(f"Using device: {device_map}")

# Load the ColQwen model used for indexing.
MODEL_NAME = "tsystems/colqwen2.5-3b-multilingual-v1.0"
model = ColQwen2_5.from_pretrained(MODEL_NAME, torch_dtype=torch.bfloat16 if device_map != "cpu" else torch.float32, device_map=device_map).eval()
processor = ColQwen2_5_Processor.from_pretrained(MODEL_NAME)
print(f"Embedding model '{MODEL_NAME}' loaded successfully.")

def create_colqwen_query_vectors(query_text, model, processor):
    # Creates multi-vector embeddings for a text query.
    inputs = processor.process_queries([query_text]).to(model.device)
    with torch.no_grad():
        outputs = model(**inputs)
    # Drop the batch dimension: the result is one vector per query token.
    return outputs.cpu().to(torch.float32).numpy().tolist()[0]

def to_avg_vector(vectors):
    # Calculates a single, normalized average vector.
    vectors_array = np.array(vectors)
    avg_vector = np.mean(vectors_array, axis=0)
    norm = np.linalg.norm(avg_vector)
    return (avg_vector / norm).tolist() if norm > 0 else avg_vector.tolist()

Using device: cuda:0



Embedding model 'tsystems/colqwen2.5-3b-multilingual-v1.0' loaded successfully.
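
The Bedrock call in `generate_llm_answer_with_image` mixes file I/O, request construction, and response parsing. The sketch below separates the two pure steps so the request shape can be inspected without calling AWS. The function names `build_image_request` and `extract_text` and the sample bytes are hypothetical; the request fields mirror the body built in the cell above.

```python
import base64
import json
import os

MEDIA_TYPES = {".jpeg": "image/jpeg", ".jpg": "image/jpeg", ".png": "image/png", ".webp": "image/webp"}

def build_image_request(image_bytes, image_name, query_text, system_prompt, max_tokens=2048):
    """Build a Claude Messages request body with one image block and one text block."""
    ext = os.path.splitext(image_name)[1].lower()
    return {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "system": system_prompt,
        "messages": [{
            "role": "user",
            "content": [
                {"type": "image", "source": {
                    "type": "base64",
                    "media_type": MEDIA_TYPES.get(ext, "image/jpeg"),  # same fallback as get_media_type
                    "data": base64.b64encode(image_bytes).decode("utf-8"),
                }},
                {"type": "text", "text": f"User Query: '{query_text}'"},
            ],
        }],
    }

def extract_text(response_body):
    """Join all text blocks of a parsed response; return '' if there are none."""
    blocks = response_body.get("content") or []
    return "".join(b.get("text", "") for b in blocks if b.get("type") == "text")

# Placeholder bytes stand in for a real image file.
body = build_image_request(b"not a real image", "page.PNG", "What is this?", "You are a helpful assistant.")
payload = json.dumps(body)  # this string is what would be passed as `body=` to invoke_model
print(body["messages"][0]["content"][0]["source"]["media_type"])
```

Keeping these steps free of network calls makes it easier to check the payload, for example the media type chosen for an uppercase extension, before spending a model invocation.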


### Step 4: Create and Display the Interactive RAG Chatbot

**[EN]** This cell creates an interactive UI using `ipywidgets`. You can select a search strategy, input a query, and trigger the full multimodal RAG pipeline.<br>
**[KR]** 이 셀은 `ipywidgets`를 사용하여 대화형 UI를 생성합니다. 검색 전략을 선택하고, 쿼리를 입력하여 전체 멀티모달 RAG 파이프라인을 실행할 수 있습니다.

In [4]:
import ipywidgets as widgets
from IPython.display import display

# --- 1. Define Constants and Main RAG Pipeline Function ---
SEARCH_MODES = {
    "A. Colpali(colqwen) RAG search (Part 1)": {
        "index": "colqwen-rvlcdip-demo-part1",
        "multi_vector_field": "colqwen_vectors"
    },
    "B. Average RAG search (Part 2 - KNN Only)": {
        "index": "colqwen-rvlcdip-demo-part2",
        "avg_vector_field": "colqwen_avg_vector"
    },
    "C. Rescore RAG search (Part 2 - KNN + Rescore)": {
        "index": "colqwen-rvlcdip-demo-part2",
        "avg_vector_field": "colqwen_avg_vector",
        "multi_vector_field": "colqwen_vectors"
    }
}

def run_rag_pipeline(query_text, search_mode):
    # This function contains the logic for all three search modes.
    mode_config = SEARCH_MODES[search_mode]
    index_name = mode_config["index"]
    es_query_body = None

    print(f"--- Running RAG for query: '{query_text}' (Mode: {search_mode}) ---")
    try:
        # Timing covers query embedding plus the Elasticsearch round trip.
        start_time = time.time()
        query_multi_vectors = create_colqwen_query_vectors(query_text, model, processor)
        if not query_multi_vectors:
            raise ValueError("Failed to create query vectors.")

        if search_mode.startswith("A."):
            es_query_body = {"size": 10, "query": {"script_score": {"query": {"match_all": {}}, "script": {"source": f"maxSimDotProduct(params.query_vector, '{mode_config['multi_vector_field']}')", "params": {"query_vector": query_multi_vectors}}}}, "_source": ["image_path", "category"]}
        elif search_mode.startswith("B."):
            query_avg_vector = to_avg_vector(query_multi_vectors)
            es_query_body = {"size": 10, "knn": {"field": mode_config['avg_vector_field'], "query_vector": query_avg_vector, "k": 200, "num_candidates": 500}, "_source": ["image_path", "category"]}
        else:
            query_avg_vector = to_avg_vector(query_multi_vectors)
            knn_query = {"field": mode_config['avg_vector_field'], "query_vector": query_avg_vector, "k": 200, "num_candidates": 500}
            rescore_definition = {"window_size": 50, "query": {"rescore_query": {"script_score": {"query": {"match_all": {}}, "script": {"source": f"maxSimDotProduct(params.query_vector, '{mode_config['multi_vector_field']}')", "params": {"query_vector": query_multi_vectors}}}}, "query_weight": 0.0, "rescore_query_weight": 1.0}}
            es_query_body = {"size": 10, "knn": knn_query, "rescore": rescore_definition, "_source": ["image_path", "category"]}

        response = es.search(index=index_name, body=es_query_body)
        end_time = time.time()
        latency_ms = (end_time - start_time) * 1000
        hits = response["hits"]["hits"]

        print(f"\n[VISUAL SEARCH RESULTS] - Retrieved {len(hits)} documents in {latency_ms:.2f} ms:")
        display_results(hits)
        
        # Generate answer from the top image using Bedrock
        generate_llm_answer_with_image(bedrock, query_text, hits)

    except Exception as e:
        print(f"\nAn error occurred: {e}")

# --- 2. Create and Display the UI Widgets ---
query_input = widgets.Text(placeholder='Enter your query here and press Enter', description='Query:', layout=widgets.Layout(width='95%'))
search_mode_selector = widgets.RadioButtons(options=list(SEARCH_MODES.keys()), description='Search Mode:', disabled=False, layout=widgets.Layout(width='max-content'))
output_area = widgets.Output()
example_queries = ["Do you have a benefits policy change notice from HR?", "HR에서 보내온 복리후생 정책 변경 안내문이 있나?"]
example_buttons = [widgets.Button(description=q, layout=widgets.Layout(width='auto')) for q in example_queries]
button_box = widgets.HBox([widgets.Label("Examples:")] + example_buttons)

def on_search_triggered(query):
    if not query:
        with output_area:
            output_area.clear_output()
            print("Please enter a query.")
        return
    with output_area:
        output_area.clear_output(wait=True)
        run_rag_pipeline(query, search_mode_selector.value)

def handle_text_submit(sender):
    on_search_triggered(sender.value)

def handle_button_click(button):
    query_input.value = button.description
    on_search_triggered(button.description)

# Note: on_submit is deprecated in ipywidgets 8; it still works but emits a DeprecationWarning.
query_input.on_submit(handle_text_submit)
for btn in example_buttons: btn.on_click(handle_button_click)

ui = widgets.VBox([search_mode_selector, button_box, query_input, output_area])
display(ui)
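
The three search modes differ in how documents are scored. The toy sketch below uses plain Python lists in place of Elasticsearch to show the idea behind each one: late-interaction MaxSim scoring (mode A), a single averaged vector (mode B), and an averaged-vector shortlist re-ranked by MaxSim (mode C). The function names, the tiny 2-dimensional vectors, and the `window_size` value are illustrative assumptions. The MaxSim definition used here, the sum over query vectors of the best dot product against the document's vectors, is the standard late-interaction score and is assumed to match what `maxSimDotProduct` computes.

```python
import math

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def max_sim(query_vectors, doc_vectors):
    """For each query vector take its best dot product over the doc vectors, then sum."""
    return sum(max(dot(q, d) for d in doc_vectors) for q in query_vectors)

def avg_vector(vectors):
    """Mean of the vectors, scaled to unit length (left as-is if the mean is zero)."""
    dim = len(vectors[0])
    mean = [sum(v[i] for v in vectors) / len(vectors) for i in range(dim)]
    norm = math.sqrt(dot(mean, mean))
    return [x / norm for x in mean] if norm > 0 else mean

def two_stage_search(query_vectors, docs, window_size=2):
    """Shortlist by average-vector similarity, then re-rank the shortlist by MaxSim."""
    q_avg = avg_vector(query_vectors)
    shortlist = sorted(docs, key=lambda name: dot(q_avg, avg_vector(docs[name])), reverse=True)[:window_size]
    return sorted(shortlist, key=lambda name: max_sim(query_vectors, docs[name]), reverse=True)

# Toy data: two query token vectors and three small "documents".
query = [[1.0, 0.0], [0.0, 1.0]]
docs = {
    "doc_a": [[1.0, 0.0], [0.0, 1.0]],
    "doc_b": [[0.7, 0.7]],
    "doc_c": [[-1.0, 0.0]],
}
print(two_stage_search(query, docs))
```

In this toy data `doc_a` and `doc_b` look almost identical after averaging, but MaxSim separates them because `doc_a` matches each query vector individually. That is the trade-off the rescore mode is meant to address: the averaged vector is cheap to search but loses per-token detail, which the second stage restores for a small window of candidates.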


### Step 5: Clean Up Memory (Optional)

**[EN]** As a best practice, explicitly delete the model and processor to free up GPU or system memory after the demonstration is complete.<br>
**[KR]** 모범 사례로서, 데모가 완료된 후 모델과 프로세서를 명시적으로 삭제하여 GPU 또는 시스템 메모리를 확보합니다.

In [5]:
import gc

try:
    del model
    del processor
    print("Model and processor variables deleted.")
except NameError:
    print("Model and processor variables not found, skipping deletion.")

# Run Python's garbage collector first so the deleted objects are actually released.
gc.collect()

# Then return cached blocks from PyTorch's allocator to the device.
if 'torch' in locals() and torch.cuda.is_available():
    torch.cuda.empty_cache()
    print("CUDA cache cleared.")
elif 'torch' in locals() and torch.backends.mps.is_available():
    torch.mps.empty_cache()
    print("MPS cache cleared.")

print("Memory cleanup complete.")

Model and processor variables deleted.
CUDA cache cleared.
Memory cleanup complete.
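
Deleting a name only frees memory once no other reference to the object remains. The sketch below illustrates this with a stand-in class and a weak reference. `FakeModel` and `release` are hypothetical names used for illustration, not part of the notebook.

```python
import gc
import weakref

class FakeModel:
    """Stand-in for a large model object (hypothetical, for illustration only)."""

def release(namespace, *names):
    """Remove the given names from a namespace dict and run the garbage collector."""
    for name in names:
        namespace.pop(name, None)  # no error if the name is already gone
    gc.collect()

ns = {"model": FakeModel()}
tracker = weakref.ref(ns["model"])  # a weak reference does not keep the object alive
alias = ns["model"]                 # a second strong reference, e.g. another variable

release(ns, "model")
print("Freed after removing the name:", tracker() is None)

del alias
gc.collect()
print("Freed after dropping the alias:", tracker() is None)
```

In a notebook, other references can linger in places such as closures or the output history (`_`, `Out`), so if memory is not reclaimed after the cleanup cell, restarting the kernel is the most reliable fallback.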
