In [2]:
# Cell 2 — Imports and global config
import os
import json
import time
import pickle
from typing import List, Dict, Any
from pathlib import Path

# For scraping & parsing
import trafilatura
from bs4 import BeautifulSoup
import requests

# For embeddings & vectors
from sentence_transformers import SentenceTransformer
import numpy as np
import faiss

# For model fallback generation if OpenAI isn't available
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline

# For API server (when we write app.py)
from fastapi import FastAPI, HTTPException, Request, Depends, Header
from fastapi.responses import JSONResponse

# For reading .env (optional)
from dotenv import load_dotenv
load_dotenv()

# On-disk layout: every persisted artefact lives under one data directory
DATA_DIR = Path("rag_data")
DATA_DIR.mkdir(exist_ok=True)
FAISS_INDEX_PATH = DATA_DIR.joinpath("faiss_index.index")
METADATA_PATH = DATA_DIR.joinpath("metadata.pkl")
INDEXED_URLS_PATH = DATA_DIR.joinpath("indexed_urls.json")

# Secrets are read from the environment (optionally populated from a .env file)
# API_TOKEN falls back to a placeholder value; override it for real usage
API_TOKEN = os.environ.get("API_TOKEN", "CHANGE_ME_API_TOKEN")
# OPENAI_API_KEY is optional; it is None when the variable is not set
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

# sentence-transformers model used for embeddings: small and fast (dim=384)
EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"

# Retrieval hyperparameters
TOP_K = 5               # number of docs to retrieve
CHUNK_SIZE_WORDS = 400  # approximate words per chunk
CHUNK_OVERLAP = 50      # overlap in words

# Report what was picked up without echoing the secret values themselves
print("Configuration loaded. API_TOKEN set:", API_TOKEN != "CHANGE_ME_API_TOKEN")
print("OPENAI available:", bool(OPENAI_API_KEY))


Configuration loaded. API_TOKEN set: False
OPENAI available: False


In [2]:
# Print the path of the Python interpreter this kernel is running on.
import sys
print(sys.executable)


c:\Users\saubh\AppData\Local\Programs\Python\Python310\python.exe


In [3]:
# Cell 3 — fetch_text_from_url(url) using trafilatura with BeautifulSoup fallback
def fetch_text_from_url(url: str) -> Dict[str, str]:
    """
    Fetch clean text from a URL.

    Tries trafilatura first; if that raises, returns nothing, or yields 50
    characters or fewer, falls back to requests + BeautifulSoup with simple
    block-level heuristics.

    Args:
        url: address of the page to download.

    Returns:
        dict: {"url": url, "title": title, "text": text}. The title falls back
        to the URL itself when the page has no usable <title>.

    Raises:
        ValueError: if the fallback path cannot download the page or extracts
            no text. The underlying exception is chained as __cause__.
    """
    try:
        downloaded = trafilatura.fetch_url(url)
        if downloaded:
            text = trafilatura.extract(downloaded, include_comments=False, include_tables=False)
            if text and len(text) > 50:
                # Only parse the HTML for a title once we know the text is usable
                soup = BeautifulSoup(downloaded, "html.parser")
                title = soup.title.string.strip() if soup.title and soup.title.string else url
                return {"url": url, "title": title, "text": text}
    except Exception as e:
        # Best effort: report and continue with the requests + BeautifulSoup fallback
        print(f"trafilatura failed for {url}: {e}")

    # Fallback
    block_tags = ["p", "article", "div"]
    try:
        resp = requests.get(url, timeout=15, headers={"User-Agent": "Mozilla/5.0"})
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")

        # Remove scripts/styles and page chrome
        for s in soup(["script", "style", "header", "footer", "nav", "aside"]):
            s.extract()
        # Heuristic: keep only the innermost blocks. A container that holds another
        # p/article/div would repeat its descendants' text (get_text is recursive),
        # so the same paragraph used to be emitted once per nesting level.
        # Trade-off: text sitting directly in a container next to nested blocks is skipped.
        paragraphs = [
            el.get_text(separator=" ", strip=True)
            for el in soup.find_all(block_tags)
            if el.find(block_tags) is None
        ]
        text = "\n".join([p for p in paragraphs if len(p) > 40])
        title = soup.title.string.strip() if soup.title and soup.title.string else url
        if not text.strip():
            raise ValueError("No text extracted from page")
        return {"url": url, "title": title, "text": text}
    except Exception as e:
        # Chain the cause so the original traceback is not lost
        raise ValueError(f"Could not fetch or parse {url}: {e}") from e

# Quick test (optional)
# print(fetch_text_from_url("https://huyenchip.com/2024/07/25/genai-platform.html")["title"])


In [4]:
# Cell 4 — chunk_text helper
import re

def chunk_text(text: str, chunk_size_words: int = CHUNK_SIZE_WORDS, overlap: int = CHUNK_OVERLAP):
    """
    Split text into word-based chunks where consecutive chunks share `overlap` words.

    Args:
        text: input text; it is split on any run of whitespace.
        chunk_size_words: maximum number of words per chunk (must be > 0).
        overlap: number of words shared between one chunk and the next
            (must satisfy 0 <= overlap < chunk_size_words).

    Returns:
        list of dicts: {"chunk_id": str, "text": str, "start": int, "end": int}
        where start/end are word offsets into the text (end exclusive).
        Empty or whitespace-only text yields an empty list.

    Raises:
        ValueError: if chunk_size_words or overlap is out of range. An overlap
            that is not smaller than the chunk size would otherwise never advance.
    """
    if chunk_size_words <= 0:
        raise ValueError("chunk_size_words must be a positive integer")
    if not 0 <= overlap < chunk_size_words:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size_words")

    words = text.split()
    # Each chunk starts `stride` words after the previous one, which already leaves
    # exactly `overlap` shared words; shifting the start back again would double it.
    stride = chunk_size_words - overlap
    chunks = []
    for chunk_idx, start in enumerate(range(0, len(words), stride)):
        end = min(len(words), start + chunk_size_words)
        chunks.append({
            "chunk_id": f"chunk_{chunk_idx}",
            "text": " ".join(words[start:end]),
            "start": start,
            "end": end,
        })
        if end == len(words):
            # The text is fully covered; a further chunk would only repeat the tail
            break
    return chunks

# Quick test
# sample = " ".join([f"word{i}" for i in range(1000)])
# print(len(chunk_text(sample)))


In [5]:
# Cell 5 — VectorStoreFAISS class (embedding + faiss)
class VectorStoreFAISS:
    """
    Sentence-embedding vector store backed by a flat FAISS inner-product index.

    Vectors are L2-normalized before being added or queried, so inner-product
    scores are cosine similarities. Per-vector metadata is kept in a list whose
    position matches the vector's position in the index, and is pickled next to
    the index file.
    """

    def __init__(self, embedding_model_name=EMBEDDING_MODEL_NAME, index_path=FAISS_INDEX_PATH, metadata_path=METADATA_PATH):
        """Load the embedding model, then load or create the index and metadata."""
        self.embedding_model_name = embedding_model_name
        self.embedding_model = SentenceTransformer(self.embedding_model_name)
        self.dim = self.embedding_model.get_sentence_embedding_dimension()
        self.index_path = Path(index_path)
        self.metadata_path = Path(metadata_path)
        self._init_index_and_metadata()

    def _init_index_and_metadata(self):
        """
        Load the persisted index and metadata when both files exist; otherwise
        create an empty store and persist it immediately.

        Raises:
            ValueError: if a loaded index has a different dimension than the
                embedding model produces.
        """
        if self.index_path.exists() and self.metadata_path.exists():
            print("Loading existing FAISS index and metadata...")
            self.index = faiss.read_index(str(self.index_path))
            # NOTE(security): pickle.load can execute arbitrary code; only load
            # metadata files that this notebook wrote itself.
            with open(self.metadata_path, "rb") as f:
                self.metadata = pickle.load(f)
            if self.index.d != self.dim:
                # Fail early with a clear message instead of a FAISS assertion later
                raise ValueError(
                    f"FAISS index at {self.index_path} has dimension {self.index.d}, "
                    f"but embedding model '{self.embedding_model_name}' produces {self.dim}-d vectors"
                )
            if self.index.ntotal != len(self.metadata["documents"]):
                print(
                    "Warning: FAISS index holds", self.index.ntotal,
                    "vectors but metadata holds", len(self.metadata["documents"]), "entries"
                )
        else:
            print("Creating new FAISS index and metadata...")
            # We'll use Inner Product on normalized vectors for cosine similarity
            self.index = faiss.IndexFlatIP(self.dim)
            self.metadata = {"documents": [], "next_id": 0}
            self.save()

    def save(self):
        """Write the FAISS index and the pickled metadata to their configured paths."""
        faiss.write_index(self.index, str(self.index_path))
        with open(self.metadata_path, "wb") as f:
            pickle.dump(self.metadata, f)

    def _embed_texts(self, texts: List[str]) -> np.ndarray:
        """Embed texts and return L2-normalized float32 vectors, one row per text."""
        embs = self.embedding_model.encode(texts, convert_to_numpy=True, show_progress_bar=False)
        # FAISS works on C-contiguous float32 matrices
        embs = np.ascontiguousarray(embs, dtype=np.float32)
        # Normalize for cosine similarity with inner product (in place)
        faiss.normalize_L2(embs)
        return embs

    def add_documents(self, docs: List[Dict[str, Any]]):
        """
        Embed and store documents, then persist the store.

        Args:
            docs: list of dicts with keys: 'text', 'source', 'source_title', 'chunk_id'.
                Only 'text' is required.

        Returns:
            dict: {"added": number of docs added, "total": vectors now in the
            index, "was": vectors in the index before the call}.
        """
        n_before = self.index.ntotal
        if not docs:
            # Nothing to embed; an empty batch has no 2-D shape to hand to FAISS
            return {"added": 0, "total": n_before, "was": n_before}

        texts = [d["text"] for d in docs]
        embs = self._embed_texts(texts)
        self.index.add(embs)
        # Record metadata per vector; list order must match insertion order in the index
        for d in docs:
            entry = {
                "id": self.metadata["next_id"],
                "text": d["text"],
                "source": d.get("source"),
                "source_title": d.get("source_title"),
                "chunk_id": d.get("chunk_id"),
            }
            self.metadata["documents"].append(entry)
            self.metadata["next_id"] += 1
        self.save()
        return {"added": len(docs), "total": self.index.ntotal, "was": n_before}

    def search(self, query: str, top_k: int = TOP_K):
        """
        Return up to top_k stored chunks most similar to the query.

        Args:
            query: free-text query to embed.
            top_k: maximum number of results.

        Returns:
            list of dicts with keys "score", "id", "text", "source",
            "source_title", in the order FAISS returns them. Empty when the
            index is empty or top_k is not positive.
        """
        if top_k <= 0 or self.index.ntotal == 0:
            return []
        q_emb = self._embed_texts([query])
        # Never ask for more neighbours than exist; FAISS pads the rest with -1
        D, I = self.index.search(q_emb, min(top_k, self.index.ntotal))
        results = []
        for score, idx in zip(D[0], I[0]):
            idx = int(idx)
            # Skip padding (-1) and vectors without a metadata entry
            if idx < 0 or idx >= len(self.metadata["documents"]):
                continue
            md = self.metadata["documents"][idx]
            results.append({"score": float(score), "id": md["id"], "text": md["text"], "source": md["source"], "source_title": md.get("source_title")})
        return results

# Create vector store (instantiates model, may take a moment)
# This module-level instance is the one index_url() and generate_answer() use.
vector_store = VectorStoreFAISS()
# index.ntotal is the number of vectors currently held by the FAISS index
print("Vector store ready. Current vectors:", vector_store.index.ntotal)


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Creating new FAISS index and metadata...
Vector store ready. Current vectors: 0


In [6]:
# Cell 6 — Indexing functions
def load_indexed_urls():
    """
    Read the list of already-indexed URLs from INDEXED_URLS_PATH.
    Returns the parsed JSON content, or an empty list when the file does not exist.
    """
    if not INDEXED_URLS_PATH.exists():
        return []
    with INDEXED_URLS_PATH.open("r") as handle:
        return json.loads(handle.read())

def save_indexed_urls(urls):
    """
    Overwrite INDEXED_URLS_PATH with the given URLs as JSON indented by two spaces.
    """
    with INDEXED_URLS_PATH.open("w") as handle:
        handle.write(json.dumps(urls, indent=2))

def index_url(url: str, force: bool = False) -> Dict[str, Any]:
    """
    Fetch a page, chunk it, and add the chunks to the vector store.

    Args:
        url: page to index.
        force: index the page even if the URL is already recorded as indexed.
            Note that the chunks are added again; earlier vectors for the same
            URL are not removed from the store.

    Returns:
        dict with a "status" key:
        - {"status": "skipped", "reason": "already indexed", "url": url}
        - {"status": "success", "url": url, "added_chunks": int}
        - {"status": "failed", "url": url, "error": str}
        Errors are reported through the "failed" result rather than raised.
    """
    indexed = load_indexed_urls()
    already_indexed = url in indexed
    if already_indexed and not force:
        return {"status": "skipped", "reason": "already indexed", "url": url}
    try:
        page = fetch_text_from_url(url)
        title = page.get("title", url)
        docs = [
            {
                "text": c["text"],
                "source": url,
                "source_title": title,
                "chunk_id": c["chunk_id"],
            }
            for c in chunk_text(page["text"])
        ]
        if not docs:
            # Report a readable failure instead of handing an empty batch to the store
            raise ValueError("No text chunks produced from page")
        res = vector_store.add_documents(docs)
        # Record the URL once only; a forced re-index must not duplicate the entry
        if not already_indexed:
            indexed.append(url)
            save_indexed_urls(indexed)
        return {"status": "success", "url": url, "added_chunks": res["added"]}
    except Exception as e:
        # Deliberately broad: one bad page must not abort a batch of URLs
        return {"status": "failed", "url": url, "error": str(e)}

def index_urls(urls: List[str], force=False):
    """
    Index several URLs one after another via index_url.

    Returns {"indexed_url": [...], "failed_url": [...]}: URLs whose status is
    "success" go to the first list; every other outcome (including "skipped")
    goes to the second as {"url": ..., "reason": ...}, where the reason is the
    result's "reason" if present, else its "error".
    """
    succeeded = []
    not_indexed = []
    for target in urls:
        outcome = index_url(target, force=force)
        if outcome["status"] != "success":
            why = outcome.get("reason", outcome.get("error"))
            not_indexed.append({"url": target, "reason": why})
            continue
        succeeded.append(target)
    return {"indexed_url": succeeded, "failed_url": not_indexed}

# Example usage:
# print(index_url("https://huyenchip.com/2024/07/25/genai-platform.html"))


In [7]:
# Cell 7 — RAG answer generation with citation
import os
import textwrap

# Optional: OpenAI client if API key available
# Best effort: if the import (or anything else in the try body) raises, `openai`
# is bound to None so later code can test `if openai` before using it.
try:
    import openai
    if OPENAI_API_KEY:
        # Set the key on the module only when one was configured
        openai.api_key = OPENAI_API_KEY
except Exception:
    openai = None

# Hugging Face fallback model and tokenizer, populated on first use
HF_MODEL = None
HF_TOKENIZER = None
def init_hf_model():
    """
    Return the (model, tokenizer) pair for the seq2seq fallback, loading both
    the first time the function is called and caching them in module globals.
    """
    global HF_MODEL, HF_TOKENIZER
    if HF_MODEL is not None:
        # Already loaded on an earlier call
        return HF_MODEL, HF_TOKENIZER
    model_name = "google/flan-t5-base"  # balanced model
    print("Loading HF model (this may take time)... Model:", model_name)
    HF_TOKENIZER = AutoTokenizer.from_pretrained(model_name)
    HF_MODEL = AutoModelForSeq2SeqLM.from_pretrained(model_name)
    return HF_MODEL, HF_TOKENIZER

def build_prompt_with_context(question: str, retrieved: List[Dict[str, Any]]):
    """
    Create a prompt instructing the model to use only the provided context chunks.
    Citation markers [1], [2], ... map to unique source URLs in first-seen order.

    Args:
        question: the user's question.
        retrieved: chunks to use as context; each dict needs "source" and "text"
            and may carry "source_title".

    Returns:
        (prompt, sources): the prompt string and the list of unique source URLs,
        where sources[n - 1] is the URL cited as [n].
    """
    # Number each unique source in first-seen order (dicts keep insertion order)
    source_numbers = {}
    for r in retrieved:
        if r["source"] not in source_numbers:
            source_numbers[r["source"]] = len(source_numbers) + 1
    sources = list(source_numbers)

    # Build context: one section per chunk, headed by its citation marker
    context_parts = []
    for r in retrieved:
        # A title that is missing or None falls back to the URL
        title = r.get("source_title") or r["source"]
        header = f"[{source_numbers[r['source']]}] {title} ({r['source']})"
        context_parts.append(f"{header}\n{r['text']}")
    context = "\n\n---\n\n".join(context_parts)

    # Assemble line by line. Dedenting an indented template after interpolation
    # does not work: the context contains lines that start at column 0, so the
    # common margin is empty and the instructions would keep their indentation.
    prompt = "\n".join([
        "You are an assistant that must answer the question using ONLY the provided CONTEXT sections. ",
        "Use the context to answer and do NOT hallucinate extra facts.",
        "For each factual statement that comes from a context, include an inline citation like [1], [2] referencing the source list below.",
        "If the context doesn't contain the answer, say \"I don't know based on the provided sources.\" and suggest where to search.",
        "",
        "CONTEXT:",
        context,
        "",
        "QUESTION:",
        question,
        "",
        "ANSWER (provide concise answer; include citations inline and at the end list unique source URLs):",
    ]).strip()
    return prompt, sources

def call_llm(prompt: str, max_tokens: int = 512, retrieved: List[Dict[str, Any]] = None) -> str:
    """
    Generate an answer for the prompt, trying backends in order of preference.

    1. OpenAI chat completions, when the openai package imported and a key is set.
    2. The local Hugging Face seq2seq model from init_hf_model().
    3. A plain concatenation of the retrieved chunk snippets plus a note.

    Args:
        prompt: full prompt text.
        max_tokens: completion budget for OpenAI; the HF path generates at most
            min(max_tokens, 256) new tokens.
        retrieved: optional chunks (dicts with a "text" key) used only by the
            last-resort fallback. May be omitted.

    Returns:
        The generated (or fallback) answer text.
    """
    # Prefer OpenAI chat if available
    if openai and OPENAI_API_KEY:
        try:
            chat_messages = [{"role": "system", "content": "You are a helpful assistant."},
                             {"role": "user", "content": prompt}]
            chat_api = getattr(openai, "chat", None)
            if chat_api is not None and hasattr(chat_api, "completions"):
                # openai>=1.0: ChatCompletion was removed in favour of chat.completions
                resp = chat_api.completions.create(
                    model="gpt-3.5-turbo",
                    messages=chat_messages,
                    max_tokens=max_tokens,
                    temperature=0.0,
                )
            else:
                # Legacy openai<1.0 interface
                resp = openai.ChatCompletion.create(
                    model="gpt-3.5-turbo",
                    messages=chat_messages,
                    max_tokens=max_tokens,
                    temperature=0.0,
                )
            return (resp.choices[0].message.content or "").strip()
        except Exception as e:
            # Best effort: report and fall through to the local model
            print("OpenAI call failed:", e)

    # Fallback to HF model
    try:
        model, tokenizer = init_hf_model()
        input_ids = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1024).input_ids
        # Honour smaller budgets while keeping the 256-token ceiling
        outputs = model.generate(input_ids, max_new_tokens=max(1, min(max_tokens, 256)), do_sample=False)
        out = tokenizer.decode(outputs[0], skip_special_tokens=True)
        return out.strip()
    except Exception as e:
        print("HF generation failed:", e)
        # Final fallback: return concatenated contexts + a note. The chunks must be
        # passed in explicitly; they are not visible from this function otherwise.
        snippets = " / ".join([c['text'][:300] for c in (retrieved or [])])
        return snippets + "\n\n[Note: fallback generation used; set OPENAI_API_KEY for better answers]"

def generate_answer(messages: List[Dict[str, str]], top_k: int = TOP_K):
    """
    Answer the most recent user message using retrieved context.

    Args:
        messages: list of {"role": "user"/"assistant", "content": "..."}.
            Only the last message with role "user" is used.
        top_k: number of chunks to retrieve from the vector store.

    Returns:
        {"answer": "...", "citations": ["url1", "url2"]}. Citations are the unique
        source URLs of the retrieved chunks, in the order they are numbered in
        the prompt.

    Raises:
        ValueError: if there is no user message with non-empty content.
    """
    # Get last user message content
    last_user = None
    for m in reversed(messages):
        if m.get("role") == "user":
            last_user = m.get("content")
            break
    if not last_user:
        raise ValueError("No user message found in messages")

    # Retrieve similar chunks
    retrieved = vector_store.search(last_user, top_k=top_k)
    if not retrieved:
        return {"answer": "No indexed content found. Please index some URLs first.", "citations": []}

    # Build prompt and call LLM; the chunks go along for the last-resort fallback
    prompt, sources = build_prompt_with_context(last_user, retrieved)
    answer_text = call_llm(prompt, retrieved=retrieved)
    return {"answer": answer_text, "citations": sources}

# Quick test (after indexing pages): 
# messages = [{"role":"user","content":"What is a GenAI platform?"}]
# print(generate_answer(messages))


In [8]:
# Cell 8 — Write full FastAPI app to app.py
# Source of the FastAPI service, written to app.py below.
# NOTE(review): the endpoints import index_urls / generate_answer from __main__.
# Under `uvicorn app:app` the __main__ module is uvicorn's launcher rather than this
# notebook, so those imports are expected to fail and the endpoints to answer 500
# unless the helpers are made importable some other way. Confirm before deploying.
APP_CODE = r'''
from fastapi import FastAPI, HTTPException, Request, Header
from pydantic import BaseModel
from typing import List, Optional
import os, json
import secrets

# import local helpers (we assume running from same folder)
from pathlib import Path
import pickle
import time

# We'll import functions/classes defined in the notebook environment by reloading
# For the simple local setup, we also provide a thin wrapper that reuses the
# vector_store and functions if the notebook started the objects; else we lazy import.

app = FastAPI(title="RAG Website Retrieval API")

# Auth dependency
API_TOKEN = os.getenv("API_TOKEN", "CHANGE_ME_API_TOKEN")
def check_api_key(x_api_key: Optional[str] = Header(None)):
    # Constant-time comparison so response timing does not reveal how much of the
    # supplied token matched. Compare bytes: compare_digest rejects non-ASCII str.
    supplied = (x_api_key or "").encode("utf-8")
    if not secrets.compare_digest(supplied, API_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid or missing X-API-KEY header")

class IndexRequest(BaseModel):
    url: List[str]

class ChatMessage(BaseModel):
    content: str
    role: str

class ChatRequest(BaseModel):
    messages: List[ChatMessage]

@app.post("/api/v1/index")
async def index_endpoint(body: IndexRequest, x_api_key: str = Header(None)):
    check_api_key(x_api_key)
    # We'll call functions from the notebook environment if available (for local dev)
    # Try to import helper functions from rag notebook module or rely on index_urls being in globals
    try:
        from __main__ import index_urls
    except Exception:
        raise HTTPException(status_code=500, detail="Indexer functions not found. Start server from notebook or ensure helper functions are accessible.")
    # index the urls
    results = index_urls(body.url)
    return {"status": "success", "indexed_url": results.get("indexed_url", []), "failed_url": results.get("failed_url", None)}

@app.post("/api/v1/chat")
async def chat_endpoint(body: ChatRequest, x_api_key: str = Header(None)):
    check_api_key(x_api_key)
    try:
        from __main__ import generate_answer
    except Exception:
        raise HTTPException(status_code=500, detail="RAG functions not found. Start server from notebook or ensure helper functions are accessible.")
    # Convert pydantic message objects to dicts
    messages = [{"role": m.role, "content": m.content} for m in body.messages]
    try:
        resp = generate_answer(messages)
        return {"response": [{"answer": {"content": resp["answer"], "role": "assistant"}, "citation": resp.get("citations", [])}]}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
'''

# Write to app.py in the current directory
with open("app.py", "w", encoding="utf-8") as f:
    f.write(APP_CODE)

print("Wrote app.py to current directory. Run it with:")
print("uvicorn app:app --reload --port 8000")
print("Remember to set environment variable API_TOKEN to match X-API-KEY used by the client.")


Wrote app.py to current directory. Run it with:
uvicorn app:app --reload --port 8000
Remember to set environment variable API_TOKEN to match X-API-KEY used by the client.


In [9]:
# Cell 9 — Write streamlit_app.py (UI) to disk
# Source of the Streamlit front end, written to streamlit_app.py below.
# It talks to the API over HTTP only: /api/v1/index to index a URL and
# /api/v1/chat to ask a question, sending the token in the X-API-KEY header.
STREAMLIT_CODE = r'''
import streamlit as st
import requests
import json
from urllib.parse import urljoin

# Generous ceiling: indexing and first-time model loading can be slow, but a
# request without any timeout would hang the page forever if the API stalls.
REQUEST_TIMEOUT_SECONDS = 300

st.set_page_config(page_title="RAG Website QA", layout="centered")

st.title("RAG Website Q&A (with citations)")

# Inputs: API base + API key
api_base = st.sidebar.text_input("API Base URL", value="http://localhost:8000")
api_key = st.sidebar.text_input("X-API-KEY (API token)", type="password")

st.sidebar.markdown("**Index a URL**")
url_to_index = st.sidebar.text_input("URL to index")
if st.sidebar.button("Index URL"):
    if not api_key:
        st.sidebar.error("Set X-API-KEY in sidebar first")
    elif not url_to_index.strip():
        st.sidebar.error("Enter a URL to index")
    else:
        try:
            resp = requests.post(
                urljoin(api_base, "/api/v1/index"),
                json={"url":[url_to_index.strip()]},
                headers={"X-API-KEY": api_key},
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            st.sidebar.write(resp.json())
        except Exception as e:
            st.sidebar.error(str(e))

st.header("Ask a question")
messages = st.session_state.get("messages", [{"role":"assistant", "content":"Index some URLs first (use the sidebar) and then ask questions."}])
user_input = st.text_input("Your question", key="user_input")

if st.button("Ask"):
    if not api_key:
        st.error("Set X-API-KEY in the sidebar.")
    elif not user_input.strip():
        st.error("Enter a question first.")
    else:
        # prepare messages history (we send only the last user message suffice for RAG)
        payload = {"messages": [{"role":"user","content":user_input}]}
        try:
            resp = requests.post(
                urljoin(api_base, "/api/v1/chat"),
                json=payload,
                headers={"X-API-KEY": api_key},
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            if not resp.ok:
                # Error bodies do not have the "response" key; show them as they are
                st.error(f"API returned HTTP {resp.status_code}: {resp.text}")
            else:
                data = resp.json()
                # Response format: {"response":[{"answer":{"content": "...", "role":"assistant"}, "citation":[...]}]}
                ans = data["response"][0]["answer"]["content"]
                citations = data["response"][0].get("citation", [])
                st.markdown("### Answer")
                st.write(ans)
                if citations:
                    st.markdown("### Citations")
                    for c in citations:
                        st.write(f"- {c}")
        except Exception as e:
            st.error(str(e))
'''

with open("streamlit_app.py", "w", encoding="utf-8") as f:
    f.write(STREAMLIT_CODE)

print("Wrote streamlit_app.py. Run it with:")
print("streamlit run streamlit_app.py --server.port 8501")


Wrote streamlit_app.py. Run it with:
streamlit run streamlit_app.py --server.port 8501
