<a href="https://colab.research.google.com/github/vinaykrshnn-git2026/advanced-rag/blob/main/09_utility_code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## General Purpose Utility

In [None]:
# Clone the refactored RAG repository into the current working directory
!git clone https://github.com/vinaykrshnn-git2026/advanced-rag-refactored.git
# Make the cloned repository the working directory for the cells that follow
%cd advanced-rag-refactored
# Quietly install the Python dependencies listed in the repository's requirements file
!pip install -q -r requirement_rag_refactored.txt

In [None]:
# Mount Google Drive at /content/drive; later cells read from and write to paths under /content/drive/MyDrive

from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Copy all pdf files from Drive to RAG Labs

import os
import shutil
import glob
from pathlib import Path
import fnmatch

# Source tree that is searched recursively, and the single flat folder that
# receives every PDF found. Edit these two paths to match your Drive layout.
SRC_ROOT = '/content/drive/MyDrive'
DEST_ROOT = '/content/drive/MyDrive/RAG_Labs/pdf_files'


def copy_pdfs_flat(src_root, dest_root):
    """Copy every ``*.pdf`` under ``src_root`` into the flat folder ``dest_root``.

    The directory structure below ``src_root`` is not reproduced: all PDFs land
    directly in ``dest_root``. A file is skipped when a file of the same name is
    already present in ``dest_root`` (so same-named PDFs from different folders
    are NOT both kept - only the first one encountered is copied).

    ``dest_root`` may live inside ``src_root``. The destination tree is excluded
    from the walk so that already-copied files are not visited as sources again,
    which would otherwise inflate the skip count and re-flatten anything that
    was organised into sub-folders of the destination.

    Args:
        src_root: Directory to search recursively.
        dest_root: Directory that receives the copies; created if missing.

    Returns:
        A ``(copied, skipped)`` tuple of counts.
    """
    dest_abs = os.path.abspath(dest_root)

    if not os.path.exists(dest_root):
        os.makedirs(dest_root)
        print(f"Created folder: {dest_root}")

    copied = 0
    skipped = 0

    for root, dirs, files in os.walk(src_root):
        # Prune the destination folder in place so os.walk never descends into it.
        dirs[:] = [
            d for d in dirs
            if os.path.abspath(os.path.join(root, d)) != dest_abs
        ]
        # Covers the case where src_root itself is the destination folder.
        if os.path.abspath(root) == dest_abs:
            continue

        for file in files:
            if not file.lower().endswith('.pdf'):
                continue

            source_file_path = os.path.join(root, file)
            destination_file_path = os.path.join(dest_root, file)

            if os.path.exists(destination_file_path):
                print(f"Skipped (exists): {file}")
                skipped += 1
            else:
                # copy2 also carries over file metadata such as timestamps.
                shutil.copy2(source_file_path, destination_file_path)
                print(f"Copied: {file}")
                copied += 1

    return copied, skipped


# Only touch the filesystem when the source tree exists (i.e. Drive is mounted);
# otherwise the destination folders would be created on the local disk instead.
if os.path.isdir(SRC_ROOT):
    files_copied, files_skipped = copy_pdfs_flat(SRC_ROOT, DEST_ROOT)
else:
    files_copied = files_skipped = 0
    print(f"Source folder not found: {SRC_ROOT} (is Google Drive mounted?)")

print(f"\nSummary:\nCopied: {files_copied}\nSkipped: {files_skipped}")



## Upsert a new image into an existing Qdrant collection

In [None]:
from qdrant_client import QdrantClient
from qdrant_client.http.exceptions import UnexpectedResponse
from colpali_engine.models import ColPali, ColPaliProcessor
from google.colab import userdata
import torch

#####################################################################
#   Initializing Cloud Qdrant collection
#####################################################################


from qdrant_client import QdrantClient

# Endpoint of the Qdrant Cloud cluster; replace with your own cluster URL.
# The API key is read from Colab's secret store (key name 'QDRANT_API_KEY').
QDRANT_URL = "https://f7369634-b961-4d15-ba60-8b230e810658.us-east4-0.gcp.cloud.qdrant.io"

try:
    # Initialize the Cloud Client
    qdrant_client = QdrantClient(
        url=QDRANT_URL,
        api_key=userdata.get('QDRANT_API_KEY'),
    )
    # A successfully constructed client does not prove the URL and API key are
    # valid, so issue one cheap request before reporting success.
    qdrant_client.get_collections()
    print("Connected to Qdrant Cloud!")
except Exception as e:
    print(f"Cloud connection failed: {e}")
    # Re-raise so the cell stops here instead of continuing with a missing or
    # unusable `qdrant_client` that would only fail later in another cell.
    raise



#####################################################################
#   Initializing Colpali
#####################################################################


# Initialize ColPali model and processor
model_name = "vidore/colpali-v1.2"  # Use the latest version available

# Prefer the first CUDA GPU; fall back to CPU so this cell still runs on a
# runtime without a GPU (much slower). Set this to "mps" by hand on Apple Silicon.
colpali_device = "cuda:0" if torch.cuda.is_available() else "cpu"

colpali_model = ColPali.from_pretrained(
    model_name,
    torch_dtype=torch.bfloat16,
    device_map=colpali_device,
)
# The processor is loaded from a different checkpoint id than the model weights.
colpali_processor = ColPaliProcessor.from_pretrained(
    "vidore/colpaligemma-3b-pt-448-base"
)


In [None]:
import PIL.Image
import torch
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
import base64
import uuid
from io import BytesIO

# Step 1 - read the source photo and normalise it to RGB
image_path = "/content/drive/MyDrive/PHOTO-2022-01-12-09-24-39.jpg"
image = PIL.Image.open(image_path).convert("RGB")

# Step 2 - serialise the picture as PNG and Base64-encode it, so the encoded
# image can travel inside the Qdrant payload and be rendered later
buffered = BytesIO()
image.save(buffered, format="PNG")
png_bytes = buffered.getvalue()
base64_string = base64.b64encode(png_bytes).decode("utf-8")

# Step 3 - embed the image with the ColPali model/processor created in an earlier cell
with torch.no_grad():
    batch_images = colpali_processor.process_images([image])
    batch_images = batch_images.to(colpali_model.device)
    image_embeddings = colpali_model(**batch_images)
    # The batch holds a single image: take its entry and turn it into nested Python lists
    vector = image_embeddings[0].cpu().float().numpy().tolist()

# Step 4 - write one point, keyed by a fresh random UUID, into the collection
unique_id = str(uuid.uuid4())
point_payload = {
    "doc": "manual_upload",
    "page": 1,
    "base64_image": base64_string,
}
try:
    qdrant_client.upsert(
        collection_name="identity_documents",
        points=[PointStruct(id=unique_id, vector=vector, payload=point_payload)],
    )
except Exception as e:
    # Report the failure and let the notebook keep running
    print(f"Error during upsert: {e}")

In [None]:
#####################################
###### CODE TO DISPLAY IMAGES IN THE SEARCH RESULT   ###########
######################################

import matplotlib.pyplot as plt
from IPython import display
from base64 import b64decode
from PIL import Image
from io import BytesIO

# Keep at most six hits from `search_result` - enough to fill a 2x3 grid
top_images = search_result.points[:6]

num_images = len(top_images)
if not num_images:
    print("No images to display.")
else:
    # Grid layout: three columns, as many rows as needed (ceiling division)
    cols = 3
    rows = -(-num_images // cols)

    fig, axs = plt.subplots(rows, cols, figsize=(cols * 5, rows * 5))
    axs = axs.flatten()  # 1-D view so the axes can be indexed by hit position

    # Draw one hit per axis
    for i, point in enumerate(top_images):
        ax = axs[i]
        base64_string = point.payload.get('base64_image')

        if not base64_string:
            # Payload carries no encoded image: label the empty cell and move on
            ax.set_title("Image not found")
            ax.axis('off')
            continue

        img = Image.open(BytesIO(b64decode(base64_string)))
        ax.imshow(img)
        pdf_file = point.payload.get('doc', 'N/A')
        page_num = point.payload.get('page', 'N/A')
        ax.set_title(f"Score: {point.score:.2f}\nDoc: {pdf_file}, Page: {page_num}")
        ax.axis('off')  # hide ticks and frame around the picture

    # Drop the grid cells that received no hit
    for spare_ax in axs[num_images:]:
        fig.delaxes(spare_ax)

    plt.tight_layout()
    plt.show()

In [None]:
#####################################
###### CODE TO PRINT ALL DOCUMENTS IN COLLECTION   ###########
######################################

def get_unique_doc_names(collection_name="rag_documents_v2", client=None):
    """Return the distinct ``doc`` payload values stored in a Qdrant collection.

    The collection is paged through with ``scroll`` in batches of 100 points,
    requesting only the ``doc`` payload field and no vectors.

    Args:
        collection_name: Name of the collection to scan.
        client: Object exposing a Qdrant-style ``scroll`` method. When omitted,
            the notebook-level ``qdrant_client`` is used.

    Returns:
        A list with each distinct ``doc`` value once, in no particular order.
    """
    if client is None:
        client = qdrant_client

    doc_names = set()
    next_page_offset = None

    while True:
        # Fetch the next batch; the returned offset is fed back in to resume.
        points, next_page_offset = client.scroll(
            collection_name=collection_name,
            limit=100,  # Adjust batch size as needed
            with_payload=["doc"],  # Only fetch the 'doc' field to save bandwidth
            with_vectors=False,  # We don't need the vectors for this
            offset=next_page_offset,
        )

        for point in points:
            # Guard against points whose payload is None.
            payload = point.payload or {}
            if "doc" in payload:
                doc_names.add(payload["doc"])

        # A None offset means there are no further pages.
        if next_page_offset is None:
            break

    return list(doc_names)

# Usage: collect the distinct 'doc' values from the default collection ("rag_documents_v2") and print them
unique_docs = get_unique_doc_names()
print(f"Documents found: {unique_docs}")

In [None]:
#####################################
###### CODE TO UPSERT A NEW PDF INTO AN EXISTING COLLECTION   ###########
######################################

import torch
from pdf2image import convert_from_path
from qdrant_client import QdrantClient
from qdrant_client.http import models
from colpali_engine.models import ColPali
#from colpali_engine.processor import ColPaliProcessor
from colpali_engine.models import ColPali, ColPaliProcessor
import base64
import uuid
from io import BytesIO

def process_and_upsert_pdf(
    pdf_path: str,
    collection_name: str = "rag_documents_v2",
    doc_name: str = "Ishya_Passport",
) -> None:
    """Render each page of a PDF, embed it with ColPali and upsert it to Qdrant.

    Every page becomes one point with a random UUID id, the page's ColPali
    multivector as its vector, and a payload holding the document name, the
    1-based page number and the page image as a Base64-encoded PNG. Pages are
    upserted one at a time; a failed upsert is reported and the remaining
    pages are still processed.

    Relies on the notebook-level ``colpali_processor``, ``colpali_model`` and
    ``qdrant_client`` objects created in earlier cells.

    Args:
        pdf_path: Path of the PDF file to ingest.
        collection_name: Target Qdrant collection.
        doc_name: Value stored under the ``doc`` payload key. The default is
            kept only so existing calls behave as before; pass an explicit
            name when ingesting any other document.
    """
    # One PIL image per page, rendered at 150 DPI (the resolution chosen by the
    # original author as a size/quality trade-off).
    images = convert_from_path(pdf_path, dpi=150)

    pages_failed = 0

    for i, image in enumerate(images):
        page_num = i + 1
        print(f"Processing page {page_num}...")

        # Embed this single page; gradients are not needed for inference.
        with torch.no_grad():
            batch_images = colpali_processor.process_images([image]).to(colpali_model.device)
            image_embeddings = colpali_model(**batch_images)

        # Keep only the token vectors flagged as real by the attention mask.
        # A boolean mask works wherever any padding sits, unlike a prefix slice.
        keep = batch_images.attention_mask[0].bool()
        filtered_embedding = image_embeddings[0][keep].cpu().float().numpy().tolist()

        # Encode the CURRENT page (not images[0]) so each point carries the
        # image that matches its own embedding and page number.
        buffered = BytesIO()
        image.save(buffered, format="PNG")
        base64_string = base64.b64encode(buffered.getvalue()).decode("utf-8")

        point = models.PointStruct(
            id=str(uuid.uuid4()),
            vector=filtered_embedding,
            payload={
                "doc": doc_name,
                "page": page_num,
                "base64_image": base64_string,
            },
        )

        try:
            qdrant_client.upsert(
                collection_name=collection_name,
                points=[point],
            )
        except Exception as e:
            # Report and move on to the next page rather than aborting the run.
            print(f"Error during upsert: {e}")
            pages_failed += 1

    if pages_failed:
        print(f"{pages_failed} of {len(images)} page(s) failed to upsert")
    print(f"Finished upserting {pdf_path}")

# Run the pipeline on a single PDF
# NOTE(review): the setup cell clones and cd's into `advanced-rag-refactored`, while this path
# points under /content/advanced-rag - confirm the file really exists at this location.
process_and_upsert_pdf("/content/advanced-rag/data/Ishya Passport 2026.pdf")


In [None]:
#####################################
###### CODE TO UPSERT A NEW IMAGE INTO AN EXISTING COLLECTION   ###########
######################################

import PIL.Image
import torch
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
import base64
import uuid
from io import BytesIO

# Step 1 - read the source image and normalise it to RGB
image_path = "/content/drive/MyDrive/RAG_Labs/pdf_files/medicare_files/savi_medicare_img.jpg"
image = PIL.Image.open(image_path).convert("RGB")

# Step 2 - serialise the picture as PNG and Base64-encode it, so the encoded
# image can travel inside the Qdrant payload and be rendered later
buffered = BytesIO()
image.save(buffered, format="PNG")
png_bytes = buffered.getvalue()
base64_string = base64.b64encode(png_bytes).decode("utf-8")

# Step 3 - embed the image with the ColPali model/processor created in an earlier cell
with torch.no_grad():
    batch_images = colpali_processor.process_images([image])
    batch_images = batch_images.to(colpali_model.device)
    image_embeddings = colpali_model(**batch_images)
    # The batch holds a single image: take its entry and turn it into nested Python lists
    vector = image_embeddings[0].cpu().float().numpy().tolist()

# Step 4 - write one point, keyed by a fresh random UUID, into the collection
unique_id = str(uuid.uuid4())
point_payload = {
    "doc": "manual_upload",
    "page": 1,
    "base64_image": base64_string,
}
try:
    qdrant_client.upsert(
        collection_name="rag_documents_v2",
        points=[PointStruct(id=unique_id, vector=vector, payload=point_payload)],
    )
except Exception as e:
    # Report the failure and let the notebook keep running
    print(f"Error during upsert: {e}")