# Simple RAG System (Patched) — LLM Answer via FLAN-T5 Base

This notebook implements a **Retrieval-Augmented Generation (RAG)** pipeline and uses a pre-trained LLM (**google/flan-t5-base**) to synthesize final answers from the retrieved context chunks.

- Embeddings: Sentence Transformers (preferred); if unavailable, falls back to scikit-learn TF–IDF, and then to a minimal built-in TF–IDF.
- Vector store: simple **pandas DataFrame**.
- Retrieval: cosine similarity on normalized embeddings.
- Final answer: **LLM generation** (replaces the previous chunk-stitching step).


In [1]:
# Imports & environment detection
from __future__ import annotations
import os
from typing import List, Tuple, Optional, Dict

import numpy as np
import pandas as pd

_SENTENCE_TRANSFORMERS_AVAILABLE = False
_TFIDF_SKLEARN_AVAILABLE = False

try:
    from sentence_transformers import SentenceTransformer
    _SENTENCE_TRANSFORMERS_AVAILABLE = True
except Exception:
    _SENTENCE_TRANSFORMERS_AVAILABLE = False

try:
    from sklearn.feature_extraction.text import TfidfVectorizer
    _TFIDF_SKLEARN_AVAILABLE = True
except Exception:
    _TFIDF_SKLEARN_AVAILABLE = False

print("sentence-transformers available:", _SENTENCE_TRANSFORMERS_AVAILABLE)
print("scikit-learn TF-IDF available:", _TFIDF_SKLEARN_AVAILABLE)


sentence-transformers available: True
scikit-learn TF-IDF available: True


In [2]:
# Chunking utility (word-based with overlap)
def chunk_text(text: str, chunk_size: int = 500, chunk_overlap: int = 100) -> List[str]:
    """Split text into whitespace-delimited word chunks that overlap.

    Args:
        text: Source text; it is tokenized with ``str.split()``.
        chunk_size: Maximum number of words per chunk (must be >= 1).
        chunk_overlap: Number of trailing words of one chunk that are repeated
            at the start of the next (must satisfy 0 <= overlap < chunk_size).

    Returns:
        The chunks in document order, each with its words joined by single
        spaces. An empty or whitespace-only text yields an empty list.

    Raises:
        ValueError: If the size/overlap combination would prevent the window
            from advancing (which would otherwise loop forever) or would skip
            words.
    """
    if chunk_size <= 0:
        raise ValueError('chunk_size must be a positive integer.')
    if chunk_overlap < 0 or chunk_overlap >= chunk_size:
        raise ValueError('chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size.')

    words = text.split()
    # Each window starts `stride` words after the previous one.
    stride = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(words), stride):
        chunks.append(" ".join(words[start:start + chunk_size]))
        # Stop once a window reaches the end so no redundant tail chunk is emitted.
        if start + chunk_size >= len(words):
            break
    return chunks

def _normalize(v: np.ndarray) -> np.ndarray:
    n = np.linalg.norm(v)
    return v if n == 0 else (v / n)


In [3]:
# Embedding backends
class SentenceTransformerEmbedder:
    """Embedding backend that wraps a ``SentenceTransformer`` model."""

    def __init__(self, model_name: str = 'sentence-transformers/all-MiniLM-L6-v2'):
        """Load the named model.

        Raises:
            RuntimeError: If sentence-transformers could not be imported, or
                if constructing the model fails for any reason.
        """
        if not _SENTENCE_TRANSFORMERS_AVAILABLE:
            raise RuntimeError('sentence-transformers not available')
        try:
            loaded = SentenceTransformer(model_name)
        except Exception as e:
            message = (
                "Failed to load SentenceTransformer model '{}'\n\n"
                "Ensure it's cached locally. Original error: {}"
            ).format(model_name, e)
            raise RuntimeError(message)
        self.model = loaded

    def encode(self, texts: List[str]) -> np.ndarray:
        """Encode texts into a numpy array, requesting normalized embeddings."""
        return self.model.encode(
            texts,
            convert_to_numpy=True,
            normalize_embeddings=True,
        )

class SklearnTfidfEmbedder:
    """TF-IDF embedding backend built on scikit-learn's ``TfidfVectorizer``."""

    def __init__(self):
        """Create an unfitted vectorizer that drops English stop words.

        Raises:
            RuntimeError: If scikit-learn could not be imported.
        """
        if not _TFIDF_SKLEARN_AVAILABLE:
            raise RuntimeError('scikit-learn not available')
        self.vectorizer = TfidfVectorizer(stop_words='english')
        self.fitted = False

    def fit(self, texts: List[str]):
        """Learn the vocabulary and IDF weights from texts."""
        self.vectorizer.fit(texts)
        self.fitted = True

    def encode(self, texts: List[str]) -> np.ndarray:
        """Return one L2-normalized dense TF-IDF row per text.

        Raises:
            RuntimeError: If ``fit`` has not been called yet.
        """
        if not self.fitted:
            raise RuntimeError('Vectorizer not fitted. Call fit(texts) first.')
        dense_rows = self.vectorizer.transform(texts).toarray()
        unit_rows = [_normalize(row) for row in dense_rows]
        return np.vstack(unit_rows)

    def encode_query(self, query: str) -> np.ndarray:
        """Return the L2-normalized dense TF-IDF vector for a single query."""
        query_row = self.vectorizer.transform([query]).toarray()[0]
        return _normalize(query_row)

class MinimalTfidfEmbedder:
    """Dependency-free TF-IDF embedder (numpy only).

    Tokens are lower-cased whitespace-separated words. The vocabulary is
    indexed in first-occurrence order so that embeddings are reproducible
    across interpreter runs (iterating a ``set`` of strings is not).
    """

    def __init__(self):
        # token -> column index in the embedding vector
        self.vocab: Dict[str, int] = {}
        # smoothed inverse document frequency per vocabulary column
        self.idf: Optional[np.ndarray] = None
        self.fitted = False

    @staticmethod
    def _tokenize(text: str) -> List[str]:
        """Split on whitespace and lower-case every token."""
        return [t.lower() for t in text.split()]

    def fit(self, texts: List[str]):
        """Build the vocabulary and smoothed IDF weights from texts.

        IDF is computed as ``log((N + 1) / (df + 1)) + 1`` where ``N`` is the
        number of texts and ``df`` the number of texts containing the token.
        """
        tokenized = [self._tokenize(t) for t in texts]
        vocab: Dict[str, int] = {}
        doc_freq: List[float] = []
        for doc in tokenized:
            # dict.fromkeys de-duplicates while preserving first-seen order,
            # which keeps the column assignment deterministic.
            for tok in dict.fromkeys(doc):
                idx = vocab.setdefault(tok, len(vocab))
                if idx == len(doc_freq):
                    doc_freq.append(0.0)
                doc_freq[idx] += 1.0
        self.vocab = vocab
        df = np.asarray(doc_freq, dtype=float)
        n_docs = float(len(tokenized))
        self.idf = np.log((n_docs + 1.0) / (df + 1.0)) + 1.0
        self.fitted = True

    def _tfidf_vec(self, text: str) -> np.ndarray:
        """Return the L2-normalized TF-IDF vector for one text.

        Out-of-vocabulary tokens are ignored; a text with no known tokens
        maps to the all-zero vector.

        Raises:
            RuntimeError: If ``fit`` has not been called yet.
        """
        if not self.fitted:
            raise RuntimeError('Embedder not fitted')
        vec = np.zeros(len(self.vocab), dtype=float)
        counts: Dict[str, int] = {}
        for tok in self._tokenize(text):
            if tok in self.vocab:
                counts[tok] = counts.get(tok, 0) + 1
        if not counts:
            return vec
        total = sum(counts.values())
        for tok, count in counts.items():
            idx = self.vocab[tok]
            weight = self.idf[idx] if self.idf is not None else 1.0
            vec[idx] = (count / total) * weight
        return _normalize_rowless(vec)

    def encode(self, texts: List[str]) -> np.ndarray:
        """Return a ``(len(texts), len(vocab))`` matrix of TF-IDF rows.

        An empty ``texts`` yields an empty ``(0, len(vocab))`` matrix instead
        of failing inside ``np.vstack``.

        Raises:
            RuntimeError: If ``fit`` has not been called yet.
        """
        if not self.fitted:
            raise RuntimeError('Embedder not fitted')
        rows = [self._tfidf_vec(t) for t in texts]
        if not rows:
            return np.zeros((0, len(self.vocab)), dtype=float)
        return np.vstack(rows)

    def encode_query(self, query: str) -> np.ndarray:
        """Return the TF-IDF vector for a single query string."""
        return self._tfidf_vec(query)


def _normalize_rowless(vec: np.ndarray) -> np.ndarray:
    """Scale vec to unit L2 norm (zero vectors are returned unchanged).

    Local to MinimalTfidfEmbedder so the class does not depend on helpers
    defined in other notebook cells.
    """
    norm = np.linalg.norm(vec)
    return vec if norm == 0 else vec / norm


In [4]:
# === Retrieval helper functions (top-1 to top-2 chunks) ===
import numpy as np
from typing import List, Dict, Any

def retrieve_top_chunks(rag, query: str, top_k: int = 2) -> List[Dict[str, Any]]:
    """
    Rank knowledge-base chunks against a query and return the best matches.

    Similarity is the dot product between each stored embedding and the query
    embedding produced by ``rag._embed_query``.

    Args:
        rag: Object exposing ``kb_df`` (with 'chunk_id', 'text' and
            'embedding' columns) and ``_embed_query``.
        query: The user's query string.
        top_k: How many chunks to return; must be an int >= 1.

    Returns:
        A list of dicts ordered from most to least similar, each with:
            - 'chunk_id': int
            - 'score': float
            - 'text': chunk content

    Raises:
        RuntimeError: If the knowledge base is missing or empty.
        ValueError: If top_k is not a positive integer.
    """
    kb = rag.kb_df
    if kb is None or len(kb) == 0:
        raise RuntimeError('KB not built yet. Call rag.build_kb(file_path) first.')
    if not isinstance(top_k, int) or top_k < 1:
        raise ValueError('top_k must be a positive integer.')

    query_vec = rag._embed_query(query)
    emb_matrix = np.vstack(kb['embedding'].values)
    scores = emb_matrix @ query_vec
    # Negate so argsort's ascending order ranks the highest score first.
    best_positions = np.argsort(-scores)[:top_k]

    hits = []
    for pos in best_positions:
        record = kb.iloc[pos]
        hit = {
            'chunk_id': int(record['chunk_id']),
            'score': float(scores[pos]),
            'text': record['text'],
        }
        hits.append(hit)
    return hits


def retrieve_context(rag, query: str, top_k: int = 2, sep: str = "\n\n") -> str:
    """Join the texts of the top retrieved chunks into one context string.

    Chunks come from ``retrieve_top_chunks`` (most similar first) and are
    concatenated with ``sep`` between them.
    """
    top_hits = retrieve_top_chunks(rag, query, top_k=top_k)
    chunk_texts = [hit["text"] for hit in top_hits]
    return sep.join(chunk_texts)


In [5]:


# === Build final RAG prompt (query + retrieved context) ===
def build_rag_prompt(query: str, context_chunks: list, max_words: int = 600) -> str:
    """
    Construct an instruction-style prompt for an LLM from query + retrieved context.

    Args:
        query: The user's question, inserted verbatim.
        context_chunks: Retrieved chunk texts, most relevant first. Entries
            that are not strings or are blank are skipped.
        max_words: Approximate cap on the total number of context words, to
            keep the prompt within the model's input length. Negative values
            are treated as 0.

    Returns:
        The prompt string: instructions, a "Context:" section with chunks
        separated by blank lines, a "Question:" section, and a trailing
        "Answer:" cue.

    The word budget is spent chunk by chunk so that the blank-line separators
    between chunks survive truncation; only the chunk that crosses the limit
    is cut (and re-joined with single spaces).
    """
    remaining = max(0, max_words)
    kept = []
    for chunk in context_chunks:
        if not isinstance(chunk, str) or not chunk.strip():
            continue
        if remaining <= 0:
            break
        words = chunk.split()
        if len(words) <= remaining:
            # Fits entirely: keep the original text untouched.
            kept.append(chunk)
            remaining -= len(words)
        else:
            kept.append(" ".join(words[:remaining]))
            remaining = 0
    context = "\n\n".join(kept)

    prompt = (
        "You are a helpful assistant. Respond in English.\n\n"
        "Answer the user's question using ONLY the provided context. "
        "If the answer is not in the context, say: I don't know.\n\n"
        + "Context:\n" + context + "\n\n"
        + "Question:\n" + query + "\n\n"
        + "Answer:\n"
    )
    return prompt



In [6]:

# === LLM generation via Hugging Face (default: google/flan-t5-base) ===
try:
    from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModelForCausalLM, AutoConfig
    _TRANSFORMERS_AVAILABLE = True
except Exception:
    _TRANSFORMERS_AVAILABLE = False

# Loaded (tokenizer, model, is_encoder_decoder) triples keyed by model name, so
# repeated questions do not reload the weights on every call.
_LLM_CACHE = {}


def _load_llm(model_name: str):
    """Return (tokenizer, model, is_encoder_decoder) for model_name, loading it once."""
    cached = _LLM_CACHE.get(model_name)
    if cached is None:
        cfg = AutoConfig.from_pretrained(model_name)
        is_enc_dec = bool(getattr(cfg, "is_encoder_decoder", False))
        tok = AutoTokenizer.from_pretrained(model_name)
        model_cls = AutoModelForSeq2SeqLM if is_enc_dec else AutoModelForCausalLM
        mdl = model_cls.from_pretrained(model_name)
        cached = (tok, mdl, is_enc_dec)
        _LLM_CACHE[model_name] = cached
    return cached


def generate_llm_answer(
    prompt: str,
    model_name: str = "google/flan-t5-base",
    max_new_tokens: int = 128,
    temperature: float = 0.0
) -> str:
    """
    Generate an answer for prompt with a Hugging Face model (default FLAN-T5 base).

    - Truncates long inputs to the tokenizer's max length (512 if the
      tokenizer reports no usable limit).
    - Uses beam search and anti-repetition constraints to avoid loops.
    - Samples only when temperature > 0; otherwise decoding is deterministic
      and no temperature is passed to ``generate``.
    - Encoder-decoder configs load a seq2seq model; anything else is loaded as
      a causal LM, whose echoed prompt tokens are stripped from the result.

    Args:
        prompt: Full prompt text.
        model_name: Hugging Face model identifier or local path.
        max_new_tokens: Upper bound on generated tokens.
        temperature: Sampling temperature; 0.0 disables sampling.

    Returns:
        The decoded answer. If transformers is unavailable, or loading or
        inference fails, a parenthesised diagnostic line followed by the
        original prompt is returned instead of raising.
    """
    if not _TRANSFORMERS_AVAILABLE:
        return "(Transformers unavailable)\n" + prompt

    try:
        tok, mdl, is_enc_dec = _load_llm(model_name)

        # Respect tokenizer max length (commonly 512 for T5)
        max_len = getattr(tok, "model_max_length", 512)
        if not isinstance(max_len, int) or max_len <= 0 or max_len > 100000:
            max_len = 512  # defensive default

        inputs = tok(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=max_len
        )

        sampling = temperature > 0.0
        gen_kwargs = dict(
            max_new_tokens=max_new_tokens,
            # Decoding constraints
            do_sample=sampling,
            num_beams=4,
            length_penalty=1.2,
            repetition_penalty=1.2,
            no_repeat_ngram_size=3,
            early_stopping=True,
            eos_token_id=tok.eos_token_id,
        )
        if sampling:
            # temperature is only meaningful (and only valid) when sampling.
            gen_kwargs["temperature"] = temperature
        if not is_enc_dec:
            # Causal tokenizers often have no pad token; reuse EOS for padding.
            gen_kwargs["pad_token_id"] = tok.eos_token_id

        gen = mdl.generate(**inputs, **gen_kwargs)
        output_ids = gen[0]
        if not is_enc_dec:
            # Decoder-only models return prompt + continuation; keep the continuation.
            output_ids = output_ids[inputs["input_ids"].shape[1]:]
        return tok.decode(output_ids, skip_special_tokens=True)

    except Exception as e:
        return f"(Model load/inference failed: {e})\n" + prompt


In [7]:
# RAG system using pandas DataFrame as vector store (LLM-based answer)
class RAGSystem:
    """Minimal RAG pipeline that keeps its vector store in a pandas DataFrame.

    ``build_kb`` chunks a text file and embeds each chunk; ``retrieve`` ranks
    chunks by dot product with the query embedding; ``answer`` feeds the top
    chunks to an LLM through ``build_rag_prompt`` / ``generate_llm_answer``.
    """

    def __init__(self, embedder: str = 'auto'):
        """Create the system and instantiate the requested embedding backend.

        Args:
            embedder: 'sentence-transformer', 'sklearn-tfidf', 'minimal-tfidf',
                or 'auto' (first backend that can be constructed, in that order).
        """
        self.embedder_type = embedder
        self.embedder = None
        # Columns: chunk_id, text, embedding. None until build_kb() runs.
        self.kb_df: Optional[pd.DataFrame] = None
        self._init_embedder(embedder)

    def _init_embedder(self, embedder: str):
        """Instantiate the embedding backend selected by ``embedder``.

        Raises:
            ValueError: If ``embedder`` is not a recognised backend name.
        """
        if embedder == 'sentence-transformer':
            self.embedder = SentenceTransformerEmbedder()
        elif embedder == 'sklearn-tfidf':
            self.embedder = SklearnTfidfEmbedder()
        elif embedder == 'minimal-tfidf':
            self.embedder = MinimalTfidfEmbedder()
        elif embedder == 'auto':
            import warnings

            candidates = []
            if _SENTENCE_TRANSFORMERS_AVAILABLE:
                candidates.append(SentenceTransformerEmbedder)
            if _TFIDF_SKLEARN_AVAILABLE:
                candidates.append(SklearnTfidfEmbedder)
            for backend in candidates:
                try:
                    self.embedder = backend()
                    return
                except Exception as exc:
                    # Best-effort fallback: report why this backend was skipped
                    # instead of hiding the failure, then try the next one.
                    warnings.warn(
                        f"{backend.__name__} could not be initialised ({exc}); "
                        "falling back to the next embedder."
                    )
            self.embedder = MinimalTfidfEmbedder()
        else:
            raise ValueError('Unknown embedder type')

    def build_kb(self, file_path: str, chunk_size: int = 500, chunk_overlap: int = 100) -> pd.DataFrame:
        """Read a UTF-8 text file, chunk it, embed the chunks and store them.

        Args:
            file_path: Path to the knowledge-base text file.
            chunk_size: Words per chunk, forwarded to ``chunk_text``.
            chunk_overlap: Overlapping words, forwarded to ``chunk_text``.

        Returns:
            The knowledge-base DataFrame (also stored on ``self.kb_df``) with
            columns 'chunk_id', 'text' and 'embedding'.

        Raises:
            FileNotFoundError: If file_path does not exist.
            ValueError: If the file yields no chunks.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f'Knowledge base file not found: {file_path}')
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        chunks = chunk_text(content, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        if not chunks:
            raise ValueError('No chunks generated; the document might be empty.')
        # The TF-IDF backends must learn their vocabulary from the corpus first.
        if isinstance(self.embedder, (SklearnTfidfEmbedder, MinimalTfidfEmbedder)):
            self.embedder.fit(chunks)
        embs = self.embedder.encode(chunks)
        df = pd.DataFrame({
            'chunk_id': list(range(len(chunks))),
            'text': chunks,
            'embedding': list(embs)
        })
        self.kb_df = df
        return df

    def _embed_query(self, query: str) -> np.ndarray:
        """Embed a single query with the active backend.

        Raises:
            RuntimeError: If the active embedder is not one of the known backends.
        """
        if isinstance(self.embedder, SentenceTransformerEmbedder):
            return self.embedder.encode([query])[0]
        if isinstance(self.embedder, (SklearnTfidfEmbedder, MinimalTfidfEmbedder)):
            return self.embedder.encode_query(query)
        raise RuntimeError('Unknown embedder type')

    @staticmethod
    def _cosine_similarity_matrix(A: np.ndarray, b: np.ndarray) -> np.ndarray:
        """Return ``A @ b``; this equals cosine similarity when rows of A and b are unit-norm."""
        return A @ b

    def retrieve(self, query: str, top_k: int = 3) -> pd.DataFrame:
        """Return the top_k most similar chunks with an added 'score' column.

        Raises:
            RuntimeError: If the knowledge base has not been built.
            ValueError: If top_k is not a positive integer (a negative value
                would otherwise slice from the end of the ranking).
        """
        if self.kb_df is None or len(self.kb_df) == 0:
            raise RuntimeError('KB not built yet. Call build_kb(file_path) first.')
        if isinstance(top_k, bool) or not isinstance(top_k, (int, np.integer)) or top_k < 1:
            raise ValueError('top_k must be a positive integer.')
        q = self._embed_query(query)
        A = np.vstack(self.kb_df['embedding'].tolist())
        sims = self._cosine_similarity_matrix(A, q)
        # Negate so argsort's ascending order puts the best match first.
        top_idx = np.argsort(-sims)[:top_k]
        result = self.kb_df.iloc[top_idx].copy()
        result['score'] = sims[top_idx]
        return result

    def answer(self, query: str, top_k: int = 3, context_max_words: int = 600, model_name: str = 'google/flan-t5-base', temperature: float = 0.0, max_new_tokens: int = 256) -> Tuple[str, pd.DataFrame]:
        """Generate an LLM-based answer using retrieved context chunks.

        Args:
            query: The user's question.
            top_k: Number of chunks to retrieve as context.
            context_max_words: Word cap passed to ``build_rag_prompt``.
            model_name: Model passed to ``generate_llm_answer``.
            temperature: Sampling temperature passed to ``generate_llm_answer``.
            max_new_tokens: Generation length cap passed to ``generate_llm_answer``.

        Returns (llm_answer, retrieved_df), where retrieved_df has the columns
        'chunk_id', 'score' and 'text'.
        """
        retrieved = self.retrieve(query, top_k=top_k)
        context_chunks = retrieved['text'].tolist()
        prompt = build_rag_prompt(query, context_chunks, max_words=context_max_words)
        llm_answer = generate_llm_answer(prompt, model_name=model_name, max_new_tokens=max_new_tokens, temperature=temperature)
        return llm_answer, retrieved[['chunk_id', 'score', 'text']]


## Configuration
Set your file path and chunking parameters here.


In [8]:
# Knowledge-base location and chunking parameters (tune these)
FILE_PATH = 'pikachu_detailed_article.txt'  # change to your file
CHUNK_SIZE = 40
CHUNK_OVERLAP = 10
EMBEDDER = 'auto'

# Demo convenience: if FILE_PATH is missing, write a small sample document so
# the rest of the notebook can run end to end.
if os.path.exists(FILE_PATH):
    print('Using existing file:', FILE_PATH)
else:
    sample = (
        'Pikachu is an Electric-type Pokémon known for its ability to store electricity in its cheeks. '
        'Its signature move is Thunderbolt, and it often uses Quick Attack for agility. '
        'Pikachu evolves from Pichu via friendship and can evolve into Raichu when exposed to a Thunder Stone. '
        'It prefers forests and grassy plains, where it forages and interacts with its group. '
        'When threatened, Pikachu releases bursts of electricity to deter predators. '
        'Trainers often focus on speed and special attack builds to maximize battle effectiveness.'
    )
    with open(FILE_PATH, 'w', encoding='utf-8') as f:
        f.write(sample)
    print('Created sample file:', FILE_PATH)


Using existing file: pikachu_detailed_article.txt


## Build the Knowledge Base
This step reads the file, chunks the content, computes embeddings, and stores them in a DataFrame.


In [9]:
# Instantiate the pipeline with the embedder chosen in the configuration cell.
rag = RAGSystem(embedder=EMBEDDER)
# Chunk + embed the file; the returned DataFrame is also kept on rag.kb_df.
kb_df = rag.build_kb(FILE_PATH, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
print('KB chunks:', len(kb_df))
# Last expression of the cell: preview the first rows of the knowledge base.
kb_df.head()


KB chunks: 15


Unnamed: 0,chunk_id,text,embedding
0,0,Pikachu is one of the most recognizable Pokémo...,"[-0.03211138, 0.02871644, -0.042600363, 0.0217..."
1,1,Pikachu was intended to be a cute and approach...,"[-0.038644757, 0.022668956, -0.023038378, 0.06..."
2,2,Pikachu evolves from Pichu when it reaches a h...,"[-0.03887519, 0.009050059, -0.029975615, 0.069..."
3,3,the Pokémon evolutionary chain. The design of ...,"[-0.06461291, 0.03256252, 0.02205632, 0.040416..."
4,4,"As the franchise grew, Pikachu's design became...","[0.0045826556, 0.039224394, 0.012920311, -0.01..."


## Ask (LLM-powered)
Use `rag.answer(query, top_k=2)` to retrieve context and synthesize an answer via FLAN-T5 base.


In [12]:
def ask_llm(query: str, top_k: int = 2):
    """Answer query with the global ``rag`` system and print context plus answer.

    Retrieves ``top_k`` chunks, caps the prompt context at 100 words, generates
    with google/flan-t5-base, then prints each retrieved chunk (first 400
    characters) followed by the LLM answer.
    """
    answer, docs = rag.answer(
        query,
        top_k=top_k,
        context_max_words=100,
        model_name='google/flan-t5-base',
    )
    print('=== Retrieved Context (top chunks) ===')
    for chunk_id, score, text in zip(docs['chunk_id'], docs['score'], docs['text']):
        print(f"[chunk {chunk_id}] score={score:.4f}\n{text[:400]}\n")
    print('=== LLM Answer ===')
    print(answer)

# Example question
ask_llm('How does Pikachu evolve?')


=== LLM Answer ===
when it reaches a high level of friendship


In [13]:

# Same evolution question as the previous cell, with top_k passed explicitly.
ask_llm("How does Pikachu evolve?", top_k=2)
# A specific stat question — presumably probes the prompt's "I don't know"
# fallback when the retrieved context lacks the answer (verify against the KB).
ask_llm("What is Pikachu's base Speed stat in the original games?", top_k=2)
# A summarization-style request rather than a single-fact lookup.
ask_llm("In two sentences, summarize Pikachu's role in the games and anime.", top_k=2)


=== LLM Answer ===
when it reaches a high level of friendship
=== LLM Answer ===
I don't know
=== LLM Answer ===
Pikachu became a cultural icon.
