In [1]:
from dotenv import load_dotenv
import torch
import io
from rag_system.embedding import load_sentence_embedding_model, load_clip_model
from rag_system.faiss_utils import load_faiss_index
from rag_system.mongo_utils import init_mongo_collections
from rag_system.prompt_builder import init_gemini_client
from rag_system.multimodal_rag import MultimodalRAG
from rag_system.ingest import batch_ingest_all
from rag_system.utils import normalize_embedding
import numpy as np
from PIL import Image
from config import (
    FAISS_INDEX_PATH,
    DB_NAME,
    GEMINI_API_KEY,
    GEMINI_MODEL_NAME,
    TOP_K,
    FAISS_DIM,
    TEXT_COLLECTION_NAME,
    MONGO_URI,
    IMAGE_COLLECTION_NAME,
)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:

# Build every dependency of the RAG pipeline from the rag_system helpers.
# Environment variables are loaded first; the remaining calls keep their order.
load_dotenv()

gemini_client = init_gemini_client(GEMINI_API_KEY)
faiss_index = load_faiss_index(FAISS_INDEX_PATH)
mongo_client, text_col, image_col = init_mongo_collections()
text_model = load_sentence_embedding_model()
clip_model, clip_processor = load_clip_model()

# Gather the constructor arguments in one mapping, then assemble the pipeline.
_rag_kwargs = {
    "gemini_client": gemini_client,
    "faiss_index": faiss_index,
    "text_collection": text_col,
    "image_collection": image_col,
    "text_model": text_model,
    "clip_model": clip_model,
    "clip_processor": clip_processor,
    "gemini_model_name": GEMINI_MODEL_NAME,
    "top_k": TOP_K,
}
rag = MultimodalRAG(**_rag_kwargs)

In [3]:
import faiss
def init_faiss_index(dim: int = FAISS_DIM, index_path: str = FAISS_INDEX_PATH):
    """
    Create a new, empty inner-product FAISS index and write it to disk.

    No existing file is deleted explicitly: the empty index is simply
    written to `index_path` with faiss.write_index, so it takes the place
    of whatever index was stored at that path.

    Args:
        dim: Dimensionality of the vectors the index will hold.
        index_path: File the empty index is written to. Defaults to
            FAISS_INDEX_PATH, which keeps existing callers unchanged.

    Returns:
        The new, empty faiss.IndexFlatIP.
    """
    index = faiss.IndexFlatIP(dim)
    # Persist right away so the file on disk matches the in-memory index.
    faiss.write_index(index, index_path)
    return index
# NOTE: running this cell writes an EMPTY index to FAISS_INDEX_PATH. The `rag`
# object built earlier still holds the index it loaded before this call.
init_faiss_index()

<faiss.swigfaiss_avx2.IndexFlatIP; proxy of <Swig Object of type 'faiss::IndexFlatIP *' at 0x775252c387b0> >

In [4]:
def get_mongo_cursor(collection, batch_size: int):
    """
    Open a cursor that matches every document in `collection`.

    `batch_size` is forwarded unchanged as the `batch_size` option of
    `collection.find`, and the object returned by `find` is handed back
    to the caller as-is.
    """
    match_everything = {}  # empty filter: no document is excluded
    return collection.find(match_everything, batch_size=batch_size)

# Cursor over every document of the text collection, requested 100 at a time.
cursor = get_mongo_cursor(collection=text_col, batch_size=100)

In [5]:

def embed_text_chunks(rag, docs: list) -> np.ndarray:
    """
    Embed the "text_chunk" field of every document in one batch.

    Args:
        rag: Object exposing `text_model`; its
            `encode(list_of_str, convert_to_numpy=True)` produces the raw
            embeddings.
        docs: Documents to embed. Each must contain a "text_chunk" key.

    Returns:
        A float32 array with one row per document, each row having been
        passed through `normalize_embedding`. For an empty `docs` list an
        empty array of shape (0, FAISS_DIM) is returned.

    Raises:
        KeyError: If a document has no "text_chunk" key.
    """
    if not docs:
        # np.vstack raises ValueError on an empty sequence, so return an
        # empty matrix the caller can treat as a no-op batch instead.
        return np.zeros((0, FAISS_DIM), dtype=np.float32)

    chunks = [doc["text_chunk"] for doc in docs]
    with torch.no_grad():
        raw_embs = rag.text_model.encode(chunks, convert_to_numpy=True)
    # Normalize row by row with the project helper, then cast for FAISS.
    return np.vstack([normalize_embedding(vec) for vec in raw_embs]).astype(np.float32)



In [6]:
def embed_image_docs(rag, docs: list) -> np.ndarray:
    """
    Embed a list of image-collection documents, one vector per document.

    The document's "type" field selects how it is embedded:
      - "image": the bytes behind "image_url" are fetched with
        `rag.download_image_bytes`, opened as an RGB PIL image and passed
        to `rag.embed_image`.
      - "ocr" / "gemini_desc" / "alt_text": the text in "ocr_text" /
        "desc_text" / "alt_text" is passed to `rag.embed_text`.
      - any other type: the empty string is passed to `rag.embed_text`.

    A document that cannot be embedded (no image bytes, or any exception
    while downloading, decoding or embedding) contributes a zero vector of
    length FAISS_DIM, so row i of the result always belongs to docs[i].

    Args:
        rag: Object providing `download_image_bytes`, `embed_image` and
            `embed_text`.
        docs: Documents to embed.

    Returns:
        A float32 array with one row per document; shape (0, FAISS_DIM)
        when `docs` is empty. Rows are whatever the `rag` embedding methods
        return, cast to float32.
    """
    if not docs:
        # np.vstack raises ValueError on an empty sequence.
        return np.zeros((0, FAISS_DIM), dtype=np.float32)

    text_field_by_type = {
        "ocr": "ocr_text",
        "gemini_desc": "desc_text",
        "alt_text": "alt_text",
    }

    def _embed_one(doc, mtype):
        """Return the float32 embedding for `doc`, or None if no image bytes were obtained."""
        if mtype == "image":
            img_bytes = rag.download_image_bytes(doc.get("image_url", ""))
            if not img_bytes:
                return None
            pil = Image.open(io.BytesIO(img_bytes)).convert("RGB")
            return rag.embed_image(pil).astype(np.float32)
        field = text_field_by_type.get(mtype)
        text = doc.get(field, "") if field else ""
        return rag.embed_text(text).astype(np.float32)

    embs = []
    for doc in docs:
        mtype = doc.get("type")
        try:
            # The download is inside the try as well: one unreachable image
            # must not abort the whole batch.
            emb = _embed_one(doc, mtype)
        except Exception as e:
            print(f"Failed embedding for {mtype}: {e}")
            emb = None
        if emb is None:
            # Placeholder keeps rows aligned with `docs`.
            emb = np.zeros(FAISS_DIM, dtype=np.float32)
        embs.append(emb)

    return np.vstack(embs)

In [7]:
def _flush_to_faiss_and_mongo(rag, 
                              text_embs: np.ndarray, text_docs: list,
                              img_embs: np.ndarray, img_docs: list):
    """
    1) Bulk-add text_embs (shape: [N_text, dim]) to FAISS, assign vector_id to text_docs.
    2) Bulk-add img_embs  (shape: [N_img,  dim]) to FAISS, assign vector_id to img_docs.
    3) Bulk-update MongoDB docs: set new vector_id for each.
    """
    # 1) Flush text embeddings
    if len(text_embs) > 0:
        start = rag.faiss_index.ntotal
        rag.faiss_index.add(text_embs)
        # Now assign vector_id back to text_docs
        for i, doc in enumerate(text_docs):
            vid = start + i
            doc["vector_id"] = int(vid)

        # Bulk‐write updated vector_id fields into MongoDB
        for doc in text_docs:
            _id = doc["_id"]
            new_vid = doc["vector_id"]
            # write each doc back to MongoDB
            rag.text_collection.update_one(
                {"_id": _id},
                {"$set": {"vector_id": new_vid}}
            )


    # 2) Flush image embeddings
    if len(img_embs) > 0:
        start = rag.faiss_index.ntotal
        rag.faiss_index.add(img_embs)
        for i, doc in enumerate(img_docs):
            vid = start + i
            doc["vector_id"] = int(vid)

        for doc in img_docs:
            _id = doc["_id"]
            new_vid = doc["vector_id"]
            # write each doc back to MongoDB
            rag.image_collection.update_one(
                {"_id": _id},
                {"$set": {"vector_id": new_vid}}
            )
    print(f"Flushed {len(text_embs)} text and {len(img_embs)} image embeddings to FAISS and MongoDB.")
    # 3) Flush text and image docs to MongoDB
    # (this is already done in the above loops)

In [8]:
from pymongo import MongoClient
def _iter_mini_batches(cursor, mini_batch_size: int):
    """Yield consecutive lists of documents from `cursor`, each flushed once it reaches `mini_batch_size`; the last may be shorter."""
    pending = []
    for doc in cursor:
        pending.append(doc)
        if len(pending) >= mini_batch_size:
            yield pending
            pending = []
    if pending:
        yield pending


def reindex_all(rag, batch_size: int = 100, mini_batch_size: int = 500):
    """
    Rebuild the FAISS index from the documents stored in MongoDB.

    1) Replaces rag.faiss_index with a new empty index from init_faiss_index.
    2) Reads the text collection through a cursor (batch_size per fetch),
       and for every mini_batch_size documents embeds them and calls
       _flush_to_faiss_and_mongo with an empty image half.
    3) Does the same for the image collection with an empty text half, so
       all text vectors precede all image vectors in the index.
    4) Writes the rebuilt index to FAISS_INDEX_PATH.

    Args:
        rag: Pipeline object whose faiss_index is replaced and which is
            passed to the embedding and flush helpers.
        batch_size: Cursor batch size handed to get_mongo_cursor.
        mini_batch_size: Number of documents embedded and flushed at once.
    """
    rag.faiss_index = init_faiss_index(dim=FAISS_DIM)

    # Stand-in for the modality that is not part of a given flush.
    no_vectors = np.zeros((0, FAISS_DIM), dtype=np.float32)

    client = MongoClient(MONGO_URI)
    try:
        db = client[DB_NAME]
        text_col = db[TEXT_COLLECTION_NAME]
        img_col  = db[IMAGE_COLLECTION_NAME]

        text_cursor = get_mongo_cursor(text_col, batch_size=batch_size)
        for tdocs in _iter_mini_batches(text_cursor, mini_batch_size):
            text_emb_batch = embed_text_chunks(rag, tdocs)
            _flush_to_faiss_and_mongo(rag, text_emb_batch, tdocs, no_vectors, [])

        img_cursor = get_mongo_cursor(img_col, batch_size=batch_size)
        for idocs in _iter_mini_batches(img_cursor, mini_batch_size):
            img_emb_batch = embed_image_docs(rag, idocs)
            _flush_to_faiss_and_mongo(rag, no_vectors, [], img_emb_batch, idocs)
    finally:
        # This client was opened only for this run; release its connections
        # even when embedding or flushing fails part-way.
        client.close()

    # rag.faiss_index is the raw faiss index returned by init_faiss_index,
    # which has no save() method; persist it with faiss.write_index, the
    # same call init_faiss_index uses.
    faiss.write_index(rag.faiss_index, FAISS_INDEX_PATH)
    print(f"Reindexed all documents. FAISS index saved to {FAISS_INDEX_PATH}.")

In [10]:
# Demo: ask the populated RAG system a question and show the generated answer.
query = "Give me a summary of the latest news on AI ethics."
results = rag.query_and_generate(query)
print("Query:", query)
print("Results:", results, sep="\n")

Query: Give me a summary of the latest news on AI ethics.
Results:
Here's a summary of the latest news on AI ethics:

There is a growing need for more actionable AI ethics guidelines developed by the AI community itself, not just corporations and governments, to ensure the community's values are reflected and followed. As mentioned in ["AI ethics must be actionable"](https://www.deeplearning.ai/the-batch/ai-ethics-must-be-actionable), deeplearning.ai hosted a Pie & AI event on AI and ethics across four cities, and participants suggested actionable ethics statements for AI engineers.

However, many companies investing in AI are not adequately addressing the ethical issues and social biases that AI raises. As reported in ["Irresponsible AI"](https://www.deeplearning.ai/the-batch/irresponsible-ai), a Fico report revealed that while AI investments are growing, efforts to ensure AI is ethical, responsible, and free of bias are not keeping pace. A substantial percentage of large companies ar