In [None]:
!pip install faiss-cpu google-genai python-dotenv

In [None]:
from pathlib import Path

# Pre-flight check: report whether the artefacts the RAG class reads are on disk.
data_dir = Path("./data")
embeddings_dir = data_dir / "embeddings"
page_image_files = list((data_dir / "page_images").glob("*.png"))

print(f"Embeddings exist: {(embeddings_dir / 'embeddings.pt').exists()}")
print(f"Metadata exists: {(embeddings_dir / 'metadata.json').exists()}")
print(f"Images count: {len(page_image_files)}")

In [None]:
import torch
import faiss
import numpy as np
import json
from pathlib import Path
from google import genai
from PIL import Image
import os
from dotenv import load_dotenv

# Load environment variables from a local .env file so that GEMINI_API_KEY
# can be read with os.getenv() when the Gemini client is created below.
load_dotenv()

class UrbanPlanningRAG:
    """Retrieval-augmented QA over page images: ColQwen + FAISS + Gemini.

    Pipeline, as implemented by the methods below:

    1. Load per-page embeddings (``embeddings.pt``) and page metadata
       (``metadata.json``) from ``<data_dir>/embeddings``.
    2. Average every page embedding over its first dimension, L2-normalise the
       result and store it in a flat inner-product FAISS index (inner product
       of unit vectors == cosine similarity).
    3. Encode a text query with the ColQwen model, pooled the same way.
    4. Send the images of the best-matching pages plus the question to Gemini.

    Args:
        data_dir: Directory containing ``embeddings/`` and ``page_images/``.
        device: Torch device the query encoder runs on (``"cpu"`` or ``"cuda"``).
        gemini_model: Name of the Gemini model used to generate the answer.

    Raises:
        ValueError: If ``GEMINI_API_KEY`` is not set or no pages are listed
            in the metadata.
    """

    QUERY_MODEL_ID = "TomoroAI/tomoro-colqwen3-embed-8b"
    DEFAULT_GEMINI_MODEL = 'gemini-3-flash-preview'

    def __init__(self, data_dir: str = "./data", device: str = "cpu",
                 gemini_model: str = DEFAULT_GEMINI_MODEL):
        self.data_dir = Path(data_dir)
        self.embeddings_path = self.data_dir / "embeddings" / "embeddings.pt"
        self.metadata_path = self.data_dir / "embeddings" / "metadata.json"
        self.images_dir = self.data_dir / "page_images"
        # Single source of truth for the encoder device (used by both the
        # model loader and encode_query) and for the generation model name.
        self.device = device
        self.gemini_model = gemini_model

        print("=" * 60)
        print("üöÄ Initializing Urban Planning RAG System")
        print("=" * 60)

        # Load embeddings and metadata
        print("\nüìÇ Loading embeddings...")
        # NOTE(security): torch.load unpickles the file, which can execute
        # arbitrary code. Only load embedding files you produced yourself;
        # consider weights_only=True if the file holds nothing but tensors.
        self.embeddings_data = torch.load(self.embeddings_path, map_location='cpu')

        print("üìÇ Loading metadata...")
        # Explicit encoding: source file names may contain non-ASCII
        # characters and the platform default encoding is not always UTF-8.
        with open(self.metadata_path, 'r', encoding='utf-8') as f:
            self.metadata = json.load(f)

        # Build FAISS index
        print("üóÑÔ∏è  Building FAISS index...")
        self._build_faiss_index()

        # Load ColQwen for query encoding
        print("üì¶ Loading ColQwen for query encoding...")
        self._load_query_encoder()

        # Initialize Gemini
        print("ü§ñ Initializing Gemini VLM...")
        self._init_gemini()

        print(f"\n‚úÖ RAG system ready with {len(self.metadata)} pages indexed")
        print("=" * 60)

    def _build_faiss_index(self):
        """Build a cosine-similarity FAISS index with one vector per page.

        Each entry ``self.embeddings_data[i]`` is averaged over its first
        dimension (presumably the token/patch axis - TODO confirm) so that
        every page is represented by a single vector.

        Raises:
            ValueError: If the metadata lists no pages.
        """
        pooled = [
            self.embeddings_data[idx].float().mean(dim=0).numpy()
            for idx in range(len(self.metadata))
        ]
        if not pooled:
            raise ValueError("metadata.json lists no pages; nothing to index")

        embeddings_matrix = np.vstack(pooled).astype('float32')
        self.embedding_dim = embeddings_matrix.shape[1]

        # Inner product over L2-normalised vectors is cosine similarity.
        self.index = faiss.IndexFlatIP(self.embedding_dim)
        faiss.normalize_L2(embeddings_matrix)
        self.index.add(embeddings_matrix)

        print(f"  ‚úÖ Indexed {self.index.ntotal} pages")

    def _load_query_encoder(self):
        """Load the ColQwen processor and model used to embed text queries."""
        # Imported lazily: transformers is heavy and only needed here.
        from transformers import AutoModel, AutoProcessor

        self.processor = AutoProcessor.from_pretrained(
            self.QUERY_MODEL_ID,
            trust_remote_code=True,
            max_num_visual_tokens=1280,
        )
        self.model = AutoModel.from_pretrained(
            self.QUERY_MODEL_ID,
            torch_dtype=torch.bfloat16,
            attn_implementation="sdpa",
            trust_remote_code=True,
            device_map=self.device,
        ).eval()

        print(f"  ‚úÖ ColQwen loaded")

    def _init_gemini(self):
        """Create the Gemini client from the ``GEMINI_API_KEY`` env variable.

        Raises:
            ValueError: If ``GEMINI_API_KEY`` is unset or empty.
        """
        api_key = os.getenv('GEMINI_API_KEY')
        if not api_key:
            raise ValueError("GEMINI_API_KEY not found in .env")

        self.gemini_client = genai.Client(api_key=api_key)
        print(f"  ‚úÖ Gemini client ready")

    def encode_query(self, query: str):
        """Embed a text query as a single NumPy vector.

        The model output for the query is averaged over its first dimension,
        mirroring the pooling applied to page embeddings in the index.

        Args:
            query: Natural-language question.

        Returns:
            numpy.ndarray: The pooled query embedding (float32 values).
        """
        features = self.processor.process_texts([query])
        # Move only tensors; the processor may also return non-tensor values.
        features = {
            key: value.to(self.device) if isinstance(value, torch.Tensor) else value
            for key, value in features.items()
        }

        with torch.inference_mode():
            out = self.model(**features)
            query_vec = out.embeddings[0].float().mean(dim=0).cpu().numpy()

        return query_vec

    def _image_path_for(self, item) -> str:
        """Return the expected PNG path for a metadata entry."""
        source_name = item['source'].replace('.pdf', '').replace(' ', '_').lower()
        image_filename = f"{source_name}__page_{item['page']:04d}.png"
        return str(self.images_dir / image_filename)

    def retrieve(self, query: str, top_k: int = 3):
        """Retrieve the ``top_k`` most similar pages for ``query``.

        Args:
            query: Natural-language question.
            top_k: Maximum number of pages to return.

        Returns:
            list[dict]: One dict per hit with keys ``source``, ``page``,
            ``total_pages``, ``image_path`` and ``similarity``, best first.
            May contain fewer than ``top_k`` entries when the index holds
            fewer pages.
        """
        print(f"\nüîç Query: '{query}'")
        print(f"üìä Retrieving top {top_k} pages...")

        # Encode query
        query_vec = self.encode_query(query)
        query_norm = query_vec.reshape(1, -1).astype('float32')
        faiss.normalize_L2(query_norm)

        # Search
        distances, indices = self.index.search(query_norm, top_k)

        # Format results
        results = []
        for score, raw_idx in zip(distances[0], indices[0]):
            idx = int(raw_idx)
            # FAISS pads missing neighbours with -1. Without the lower bound
            # a -1 would silently select the *last* metadata entry.
            if not 0 <= idx < len(self.metadata):
                continue

            item = self.metadata[idx]
            results.append({
                'source': item['source'],
                'page': item['page'],
                'total_pages': item['total_pages'],
                'image_path': self._image_path_for(item),
                'similarity': float(score)
            })

        return results

    def answer_query(self, query: str, top_k: int = 3):
        """Complete RAG: retrieve relevant pages, then generate an answer.

        Args:
            query: Natural-language question.
            top_k: Number of pages to retrieve and show to Gemini.

        Returns:
            str: Gemini's answer text, or an error message if none of the
            retrieved pages has an image on disk.
        """
        # Retrieve relevant pages
        retrieved = self.retrieve(query, top_k=top_k)

        print(f"\nüìã Retrieved pages:")
        for i, r in enumerate(retrieved, 1):
            print(f"  {i}. {r['source']} - Page {r['page']} (similarity: {r['similarity']:.3f})")

        # Load page images
        print(f"\nüñºÔ∏è  Loading page images...")
        page_images = []
        page_parts = []
        try:
            for r in retrieved:
                img_path = Path(r['image_path'])
                if not img_path.exists():
                    continue
                image = Image.open(img_path)
                page_images.append(image)
                # Label each image so the model can actually cite the source
                # document and page number the prompt asks for.
                page_parts.append(f"[{r['source']} - Page {r['page']}]")
                page_parts.append(image)

            if not page_images:
                return "‚ùå No valid page images found"

            # Generate answer with Gemini
            print(f"ü§ñ Generating answer with Gemini...")

            prompt = f"""You are an expert in Indian urban planning regulations. 

Question: {query}

I've provided {len(page_images)} relevant pages from planning documents. Please:
1. Answer the question based on the provided pages
2. Cite which page number contains the information
3. If the information is not in the provided pages, say so

Be concise and specific."""

            response = self.gemini_client.models.generate_content(
                model=self.gemini_model,
                contents=[prompt] + page_parts
            )

            return response.text
        finally:
            # Image.open keeps the file handle open; release it once the
            # request has been sent (or if anything above failed).
            for image in page_images:
                image.close()

# Initialize the RAG system with the default data directory ("./data").
# The constructor loads the embeddings and metadata, builds the FAISS index,
# loads the ColQwen query encoder and creates the Gemini client, and raises
# ValueError if GEMINI_API_KEY is not set.
rag = UrbanPlanningRAG()

In [None]:
# Smoke test: run one question end-to-end through retrieval and generation.
# (The query text is sent verbatim to the encoder and to Gemini, so spelling matters.)
answer = rag.answer_query("what are indicators of good governance", top_k=3)

print("\n" + "=" * 60)
print("üìù ANSWER:")
print("=" * 60)
print(answer)