In [1]:
# Install packages for multi-format loading + indexing
# - Checks which packages are already installed
# - Installs only missing ones
import sys
import subprocess
import importlib.util

REQUIRED_PACKAGES = {
    "sentence-transformers": "sentence_transformers",
    "faiss-cpu": "faiss",
    "transformers": "transformers",
    "accelerate": "accelerate",
    "pypdf": "pypdf",
    "python-docx": "docx",
    "beautifulsoup4": "bs4",
    "rank-bm25": "rank_bm25",
    "tqdm": "tqdm",
    "streamlit": "streamlit",
    "requests": "requests"
}

def is_installed(module_name: str) -> bool:
    return importlib.util.find_spec(module_name) is not None

missing = []
for pip_name, module_name in REQUIRED_PACKAGES.items():
    if not is_installed(module_name):
        missing.append(pip_name)

if missing:
    print("ðŸ“¦ Installing missing packages:", missing)
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", *missing])
    print("âœ… Installation complete")
else:
    print("âœ… All required packages already installed")



âœ… All required packages already installed


In [2]:
# Define project folders

from pathlib import Path
import sys

if "google.colab" in sys.modules:
    # Running in Google Colab
    from google.colab import drive
    drive.mount('/content/drive')
    BASE = Path("/content/drive/MyDrive/IR_RAG_App")
    print("âœ… Running in Google Colab")
else:
    # Running locally (Jupyter / VS Code)
    BASE = Path("./IR_RAG_App")
    print("âœ… Running locally")

DATA_DIR    = BASE / "data_raw"
STORAGE_DIR = BASE / "storage"
APP_DIR     = BASE / "webapp"

DATA_DIR.mkdir(parents=True, exist_ok=True)
STORAGE_DIR.mkdir(parents=True, exist_ok=True)
APP_DIR.mkdir(parents=True, exist_ok=True)

print("BASE:", BASE.resolve())
print("DATA_DIR:", DATA_DIR.resolve())
print("STORAGE_DIR:", STORAGE_DIR.resolve())
print("APP_DIR:", APP_DIR.resolve())


âœ… Running locally
BASE: C:\Users\reychel\Documents\GitHub\food-rag-web\IR_RAG_App
DATA_DIR: C:\Users\reychel\Documents\GitHub\food-rag-web\IR_RAG_App\data_raw
STORAGE_DIR: C:\Users\reychel\Documents\GitHub\food-rag-web\IR_RAG_App\storage
APP_DIR: C:\Users\reychel\Documents\GitHub\food-rag-web\IR_RAG_App\webapp


In [3]:
# Load documents from DATA_DIR (supports txt/pdf/docx/html)

from pathlib import Path
from typing import List, Dict
import re

from pypdf import PdfReader
import docx
from bs4 import BeautifulSoup

SUPPORTED_EXT = {".txt", ".md", ".pdf", ".docx", ".html", ".htm"}

def clean_text(text: str) -> str:
    text = text.replace("\x00", " ")
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()

def load_txt_file(path: Path) -> str:
    return clean_text(path.read_text(encoding="utf-8", errors="ignore"))

def load_pdf_file(path: Path) -> str:
    reader = PdfReader(str(path))
    pages = [(p.extract_text() or "") for p in reader.pages]
    return clean_text("\n".join(pages))

def load_docx_file(path: Path) -> str:
    d = docx.Document(str(path))
    return clean_text("\n".join(p.text for p in d.paragraphs))

def load_html_file(path: Path) -> str:
    html = path.read_text(encoding="utf-8", errors="ignore")
    soup = BeautifulSoup(html, "html.parser")
    for tag in soup(["script", "style", "nav", "footer", "header"]):
        tag.decompose()
    return clean_text(soup.get_text("\n"))

def load_one_file(path: Path) -> str:
    ext = path.suffix.lower()
    if ext in {".txt", ".md"}:
        return load_txt_file(path)
    if ext == ".pdf":
        return load_pdf_file(path)
    if ext == ".docx":
        return load_docx_file(path)
    if ext in {".html", ".htm"}:
        return load_html_file(path)
    return ""

def load_documents(data_dir: Path) -> List[Dict]:
    docs = []
    for p in sorted(data_dir.glob("**/*")):
        if p.is_dir():
            continue
        if p.suffix.lower() not in SUPPORTED_EXT:
            continue

        text = load_one_file(p)
        if len(text) < 400:   # filter tiny/noisy docs
            continue

        docs.append({
            "source": p.name,
            "path": str(p),
            "ext": p.suffix.lower(),
            "text": text
        })
    return docs

# Quick preview
docs = load_documents(DATA_DIR)
print(f"âœ… Loaded {len(docs)} documents from: {DATA_DIR}")
if docs:
    print("Example:", docs[0]["source"], "| ext:", docs[0]["ext"], "| chars:", len(docs[0]["text"]))


âœ… Loaded 0 documents from: IR_RAG_App\data_raw


In [4]:
# Chunk the documents (chunking improves retrieval + generation)

CHUNK_SIZE = 900
CHUNK_OVERLAP = 150

def chunk_text(text: str, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
    chunks = []
    n = len(text)
    step = max(1, chunk_size - overlap)
    start = 0
    while start < n:
        end = min(start + chunk_size, n)
        c = text[start:end].strip()
        if c:
            chunks.append(c)
        start += step
    return chunks

chunks = []
for d in docs:
    parts = chunk_text(d["text"])
    for i, c in enumerate(parts):
        chunks.append({
            "source": d["source"],
            "path": d["path"],
            "ext": d["ext"],
            "chunk_id": f"{d['source']}::chunk{i}",
            "text": c
        })

print(f"âœ… Created {len(chunks)} chunks from {len(docs)} documents")
print("Preview:", chunks[0]["chunk_id"], "|", chunks[0]["text"][:180], "...")


âœ… Created 0 chunks from 0 documents


IndexError: list index out of range

In [None]:
# Build embeddings + FAISS index and save to STORAGE_DIR (persistent)
# Re-run this block whenever you add new files.

import json
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer

EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

INDEX_PATH = STORAGE_DIR / "faiss.index"
META_PATH  = STORAGE_DIR / "chunks_meta.json"
EMB_PATH   = STORAGE_DIR / "chunk_embs.npy"  # saved embeddings for MMR/rerank later

embedder = SentenceTransformer(EMBED_MODEL)

texts = [c["text"] for c in chunks]
X = embedder.encode(
    texts,
    batch_size=64,
    show_progress_bar=True,
    normalize_embeddings=True
).astype("float32")

index = faiss.IndexFlatIP(X.shape[1])  # cosine similarity using normalized vectors
index.add(X)

faiss.write_index(index, str(INDEX_PATH))
META_PATH.write_text(json.dumps(chunks, ensure_ascii=False, indent=2), encoding="utf-8")
np.save(EMB_PATH, X)

print("âœ… Embeddings shape:", X.shape)
print("âœ… FAISS index size:", index.ntotal)
print("âœ… Saved index:", INDEX_PATH)
print("âœ… Saved metadata:", META_PATH)
print("âœ… Saved embeddings:", EMB_PATH)


In [None]:
# Retrieval = Hybrid (BM25 + FAISS) + Rerank
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder
import numpy as np
import re, json, faiss

# load persistent artifacts
index = faiss.read_index(str(INDEX_PATH))
chunks_meta = json.loads(META_PATH.read_text(encoding="utf-8"))

def tokenize(text):
    text = text.lower()
    text = re.sub(r"[^a-z0-9\s]", " ", text)
    return [t for t in text.split() if len(t) > 2]

# BM25 index
bm25 = BM25Okapi([tokenize(c["text"]) for c in chunks_meta])

# Reranker model
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def retrieve_hybrid(query, faiss_k=30, bm25_k=30, alpha=0.6):
    qv = embedder.encode([query], normalize_embeddings=True).astype("float32")
    faiss_scores, faiss_ids = index.search(qv, faiss_k)
    faiss_scores, faiss_ids = faiss_scores[0], faiss_ids[0]

    q_tokens = tokenize(query)
    bm25_scores = bm25.get_scores(q_tokens)
    bm25_top = np.argsort(bm25_scores)[::-1][:bm25_k]

    # normalize for merge
    faiss_s = (faiss_scores - faiss_scores.min()) / (faiss_scores.max() - faiss_scores.min() + 1e-9)
    bm25_s = (bm25_scores - bm25_scores.min()) / (bm25_scores.max() - bm25_scores.min() + 1e-9)

    cand = set(map(int, faiss_ids.tolist())) | set(map(int, bm25_top.tolist()))
    faiss_map = {int(i): float(s) for i, s in zip(faiss_ids, faiss_s)}

    merged = []
    for cid in cand:
        score = alpha * faiss_map.get(cid, 0.0) + (1 - alpha) * float(bm25_s[cid])
        item = chunks_meta[cid]
        merged.append({**item, "score": float(score)})

    merged.sort(key=lambda x: x["score"], reverse=True)
    return merged

def rerank(query, docs, top_n=5):
    pairs = [(query, d["text"]) for d in docs[:50]]  # rerank only top-50 for speed
    scores = reranker.predict(pairs)
    for d, s in zip(docs[:50], scores):
        d["rerank_score"] = float(s)
    docs[:50] = sorted(docs[:50], key=lambda x: x.get("rerank_score", -1e9), reverse=True)
    return docs[:top_n]

def retrieve_best(query, k=5):
    cands = retrieve_hybrid(query)
    top = rerank(query, cands, top_n=k)
    return top

# quick test
test = retrieve_best("What are calories and how are they measured?", k=4)
[(t["source"], round(t["score"],3), round(t["rerank_score"],3)) for t in test]


In [None]:
#Final RAG answering (short answers + source citations)

import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

LLM_NAME = "google/flan-t5-base"
tokenizer = AutoTokenizer.from_pretrained(LLM_NAME)
llm = AutoModelForSeq2SeqLM.from_pretrained(LLM_NAME)
device = "cuda" if torch.cuda.is_available() else "cpu"
llm = llm.to(device)

def build_prompt(question, contexts):
    sources_block = "\n\n".join([f"[{i+1}] ({c['source']} | {c['chunk_id']})\n{c['text']}"
                                 for i, c in enumerate(contexts)])
    return (
        "Answer clearly in 3â€“5 short sentences.\n"
        "Use simple language.\n"
        "Use ONLY the sources.\n"
        "If not enough info, say: I don't have enough information in the indexed data.\n"
        "End with: Sources: [1], [2], ... (only those you used)\n\n"
        f"Question: {question}\n\n"
        f"Sources:\n{sources_block}\n\n"
        "Answer:"
    )

def rag_answer(question, k=5, max_new_tokens=170):
    contexts = retrieve_best(question, k=k)
    prompt = build_prompt(question, contexts)

    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=2048).to(device)
    with torch.no_grad():
        out = llm.generate(**inputs, max_new_tokens=max_new_tokens, num_beams=4)
    answer = tokenizer.decode(out[0], skip_special_tokens=True).strip()

    return answer, contexts

# demo
q = "Why is junk food considered unhealthy?"
ans, ctx = rag_answer(q, k=4)
print(ans)
print("\nTop sources:", [c["source"] for c in ctx])

In [5]:
# Step 3B: Copy files from another folder into DATA_DIR
# Change SOURCE_FOLDER to where your files currently are.

import shutil
from pathlib import Path

SOURCE_FOLDER = Path(r"C:\path\to\your\files")  # <-- change this

allowed = {".txt",".md",".pdf",".docx",".html",".htm"}

copied = 0
for p in SOURCE_FOLDER.glob("**/*"):
    if p.is_file() and p.suffix.lower() in allowed:
        shutil.copy2(p, DATA_DIR / p.name)
        copied += 1

print("âœ… Copied files:", copied)
print("Now DATA_DIR contains:", len(list(DATA_DIR.glob('*'))), "items")


âœ… Copied files: 0
Now DATA_DIR contains: 0 items
