### 📦 Install Required Dependencies

This cell installs all low-level libraries needed to build a **pure RAG system without LangChain**.

Libraries used:
- `faiss-cpu` → vector similarity search
- `pypdf` → extract raw text from PDFs
- `sentence-transformers` → generate embeddings
- `google-generativeai` → interact with Gemini models

These are core building blocks. No framework abstractions.


In [62]:
!pip install faiss-cpu pypdf sentence-transformers google-generativeai




### 🔐 Configure Google Gemini API

This cell:
1. Fetches the Gemini API key securely from **Colab Secrets**
2. Sets it as an environment variable
3. Configures the Google Generative AI client

This avoids hard-coding sensitive keys and keeps the notebook safe for GitHub.
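Outside Colab, `google.colab` cannot be imported. A minimal sketch of a fallback, assuming the key may instead be supplied through an environment variable; `load_api_key` is a hypothetical helper written for illustration, not part of any library:

```python
import os

def load_api_key(secret_name="RAGAGENTKEY", env_var="GOOGLE_API_KEY"):
    """Return the API key from Colab Secrets if available, else from the environment."""
    try:
        # google.colab only exists inside a Colab runtime.
        from google.colab import userdata
        key = userdata.get(secret_name)
    except Exception:
        # Not running in Colab, or the secret is not accessible: use the environment.
        key = os.environ.get(env_var)
    if not key:
        raise RuntimeError(f"No API key found in Colab Secrets or ${env_var}")
    return key
```

The returned value can then be passed to `genai.configure(api_key=...)` exactly as in the cell below.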


In [63]:
from google.colab import userdata
import os
import google.generativeai as genai

os.environ["GOOGLE_API_KEY"] = userdata.get("RAGAGENTKEY")
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])


### 🧠 PocketFlow Core Engine

This cell defines a **minimal workflow engine** inspired by PocketFlow.

Key concepts:
- **Node** → one unit of work
- **Flow** → orchestrates node execution
- **Actions** → determine next node (conditional routing)
- **Shared state** → used to pass data between nodes

This replaces LangChain’s hidden pipelines with **explicit, debuggable execution logic**.
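The routing idea can be shown without the full engine. The sketch below is a deliberately stripped-down stand-in (the names `MiniNode` and `run_flow` are illustrative, not PocketFlow's API): each node returns an action string, and the loop follows the successor registered for that action until none is found.

```python
class MiniNode:
    """A unit of work that reads/writes shared state and returns an action string."""
    def __init__(self, name, work):
        self.name, self.work, self.successors = name, work, {}

    def on(self, action, target):
        # Register which node runs next when this node emits `action`.
        self.successors[action] = target
        return target

def run_flow(start, shared):
    """Run nodes from `start`, following actions until no successor matches."""
    node, trace = start, []
    while node is not None:
        action = node.work(shared)
        trace.append((node.name, action))
        node = node.successors.get(action)
    return trace

# The first node loops on itself until the counter reaches 3.
def count_up(shared):
    shared["n"] = shared.get("n", 0) + 1
    return "again" if shared["n"] < 3 else "done"

def report(shared):
    shared["message"] = f"counted to {shared['n']}"
    return "finished"

counter = MiniNode("counter", count_up)
reporter = MiniNode("reporter", report)
counter.on("again", counter)
counter.on("done", reporter)

shared = {}
trace = run_flow(counter, shared)
print(trace)
print(shared["message"])
```

All data travels through `shared`; the action strings only decide which node runs next.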


In [64]:
import asyncio, warnings, copy, time

class BaseNode:
    def __init__(self): self.params,self.successors={},{}
    def set_params(self,params): self.params=params
    def next(self,node,action="default"):
        if action in self.successors: warnings.warn(f"Overwriting successor for action '{action}'")
        self.successors[action]=node; return node
    def prep(self,shared): pass
    def exec(self,prep_res): pass
    def post(self,shared,prep_res,exec_res): pass
    def _exec(self,prep_res): return self.exec(prep_res)
    def _run(self,shared):
        # Returns exec()'s result rather than post()'s, so the value returned by exec() is the action Flow routes on.
        p=self.prep(shared); e=self._exec(p); self.post(shared,p,e); return e
    def run(self,shared):
        if self.successors: warnings.warn("Node won't run successors. Use Flow.")
        return self._run(shared)
    def __rshift__(self,other): return self.next(other)
    def __sub__(self,action):
        if isinstance(action,str): return _ConditionalTransition(self,action)
        raise TypeError("Action must be a string")

class _ConditionalTransition:
    def __init__(self,src,action): self.src,self.action=src,action
    def __rshift__(self,tgt): return self.src.next(tgt,self.action)

class Node(BaseNode):
    def __init__(self,max_retries=1,wait=0): super().__init__(); self.max_retries,self.wait=max_retries,wait
    def exec_fallback(self,prep_res,exc): raise exc
    def _exec(self,prep_res):
        for self.cur_retry in range(self.max_retries):
            try: return self.exec(prep_res)
            except Exception as e:
                if self.cur_retry==self.max_retries-1: return self.exec_fallback(prep_res,e)
                if self.wait>0: time.sleep(self.wait)

class BatchNode(Node):
    def _exec(self,items): return [super(BatchNode,self)._exec(i) for i in (items or [])]

class Flow(BaseNode):
    def __init__(self,start=None): super().__init__(); self.start_node=start
    def start(self,start): self.start_node=start; return start
    def get_next_node(self,curr,action):
        nxt=curr.successors.get(action or "default")
        if not nxt and curr.successors: warnings.warn(f"Flow ends: '{action}' not found in {list(curr.successors)}")
        return nxt
    def _orch(self,shared,params=None):
        curr,p,last_action =copy.copy(self.start_node),(params or {**self.params}),None
        while curr: curr.set_params(p); last_action=curr._run(shared); curr=copy.copy(self.get_next_node(curr,last_action))
        return last_action
    def _run(self,shared): p=self.prep(shared); o=self._orch(shared); return self.post(shared,p,o)
    def post(self,shared,prep_res,exec_res): return exec_res

class BatchFlow(Flow):
    def _run(self,shared):
        pr=self.prep(shared) or []
        for bp in pr: self._orch(shared,{**self.params,**bp})
        return self.post(shared,pr,None)

class AsyncNode(Node):
    async def prep_async(self,shared): pass
    async def exec_async(self,prep_res): pass
    async def exec_fallback_async(self,prep_res,exc): raise exc
    async def post_async(self,shared,prep_res,exec_res): pass
    async def _exec(self,prep_res):
        for self.cur_retry in range(self.max_retries):
            try: return await self.exec_async(prep_res)
            except Exception as e:
                if self.cur_retry==self.max_retries-1: return await self.exec_fallback_async(prep_res,e)
                if self.wait>0: await asyncio.sleep(self.wait)
    async def run_async(self,shared):
        if self.successors: warnings.warn("Node won't run successors. Use AsyncFlow.")
        return await self._run_async(shared)
    async def _run_async(self,shared): p=await self.prep_async(shared); e=await self._exec(p); return await self.post_async(shared,p,e)
    def _run(self,shared): raise RuntimeError("Use run_async.")

class AsyncBatchNode(AsyncNode,BatchNode):
    async def _exec(self,items): return [await super(AsyncBatchNode,self)._exec(i) for i in items]

class AsyncParallelBatchNode(AsyncNode,BatchNode):
    async def _exec(self,items): return await asyncio.gather(*(super(AsyncParallelBatchNode,self)._exec(i) for i in items))

class AsyncFlow(Flow,AsyncNode):
    async def _orch_async(self,shared,params=None):
        curr,p,last_action =copy.copy(self.start_node),(params or {**self.params}),None
        while curr: curr.set_params(p); last_action=await curr._run_async(shared) if isinstance(curr,AsyncNode) else curr._run(shared); curr=copy.copy(self.get_next_node(curr,last_action))
        return last_action
    async def _run_async(self,shared): p=await self.prep_async(shared); o=await self._orch_async(shared); return await self.post_async(shared,p,o)
    async def post_async(self,shared,prep_res,exec_res): return exec_res

class AsyncBatchFlow(AsyncFlow,BatchFlow):
    async def _run_async(self,shared):
        pr=await self.prep_async(shared) or []
        for bp in pr: await self._orch_async(shared,{**self.params,**bp})
        return await self.post_async(shared,pr,None)

class AsyncParallelBatchFlow(AsyncFlow,BatchFlow):
    async def _run_async(self,shared):
        pr=await self.prep_async(shared) or []
        await asyncio.gather(*(self._orch_async(shared,{**self.params,**bp}) for bp in pr))
        return await self.post_async(shared,pr,None)

### 📄 Load PDF File

This cell:
- Uses `pypdf` to read a PDF file
- Extracts raw text from every page
- Combines all page text into a single string

This is the first step of the RAG pipeline: **getting raw knowledge**.


In [65]:
from pypdf import PdfReader

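def load_pdf(path):
    """Extract text from every page of a PDF, one trailing newline per page."""
    reader = PdfReader(path)
    # Guard against pages that yield no text (e.g. scanned images) by treating them as "".
    return "".join((page.extract_text() or "") + "\n" for page in reader.pages)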

raw_text = load_pdf("/content/drive/MyDrive/PDF/RAG.pdf")


### ✂️ Split Text into Chunks

Embedding models accept a limited input length, so a large text cannot be embedded in one piece.

This function:
- Breaks text into chunks of 200 characters
- Uses 50-character overlap to preserve context
- Produces a list of small, searchable text chunks

This replaces LangChain text splitters with a transparent approach.
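With a chunk size of 200 and an overlap of 50, each chunk starts 150 characters after the previous one. A small sketch of that arithmetic (`chunk_starts` is an illustrative helper, not part of the notebook):

```python
def chunk_starts(text_len, chunk_size=200, overlap=50):
    """Start offsets of sliding-window chunks; the stride is chunk_size - overlap."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    stride = chunk_size - overlap
    return list(range(0, text_len, stride))

starts = chunk_starts(500)
print(starts)
# Each (start, end) window; consecutive windows share `overlap` characters.
windows = [(s, min(s + 200, 500)) for s in starts]
print(windows)
```

For a 500-character text this gives four windows, the last of which is only 50 characters long because the text runs out.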


In [66]:
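def split_text(text, chunk_size=200, overlap=50):
    """Split text into fixed-size character chunks that overlap by `overlap` characters."""
    if not 0 <= overlap < chunk_size:
        # With overlap >= chunk_size, `start` would never advance and the loop would not end.
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        start = end - overlap  # step forward by chunk_size - overlap
    return chunks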

chunks = split_text(raw_text)


### 🧬 Generate Embeddings

This cell:
- Loads a lightweight embedding model (`all-MiniLM-L6-v2`)
- Converts each text chunk into a numerical vector

Embeddings allow semantic similarity search instead of keyword matching.
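To make "semantic similarity" concrete: embeddings are compared geometrically, for example by cosine similarity or Euclidean distance. A minimal sketch with hand-made 3-dimensional vectors standing in for real embeddings (the values are invented for illustration; real `all-MiniLM-L6-v2` vectors have 384 dimensions):

```python
import numpy as np

def cosine_similarity(a, b):
    """Cosine of the angle between two vectors: 1.0 means same direction."""
    a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

# Toy "embeddings": the first two point in similar directions, the third does not.
cat = [0.9, 0.1, 0.0]
kitten = [0.8, 0.2, 0.0]
invoice = [0.0, 0.1, 0.9]

print(cosine_similarity(cat, kitten))   # close to 1
print(cosine_similarity(cat, invoice))  # close to 0
```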


In [67]:
from sentence_transformers import SentenceTransformer

embed_model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = embed_model.encode(chunks)


### 🗂️ Store Embeddings in FAISS

This cell:
- Creates a FAISS index based on embedding dimensions
- Stores all embeddings for fast similarity search

FAISS is used directly, with no LangChain wrappers.
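`IndexFlatL2` performs exact (brute-force) search and reports squared Euclidean distances. As a rough mental model, the same computation can be sketched in plain NumPy; `l2_search` is an illustrative stand-in, not FAISS code, and it ignores everything FAISS does for speed:

```python
import numpy as np

def l2_search(vectors, queries, top_k):
    """Exact nearest-neighbour search by squared L2 distance.

    Returns (distances, indices), each of shape (n_queries, top_k),
    sorted from nearest to farthest.
    """
    vectors = np.asarray(vectors, dtype="float32")
    queries = np.asarray(queries, dtype="float32")
    # Pairwise squared distances via broadcasting: shape (n_queries, n_vectors).
    diffs = queries[:, None, :] - vectors[None, :, :]
    dists = (diffs ** 2).sum(axis=2)
    order = np.argsort(dists, axis=1)[:, :top_k]
    return np.take_along_axis(dists, order, axis=1), order

stored = [[0.0, 0.0], [1.0, 0.0], [0.0, 2.0], [3.0, 3.0]]
distances, indices = l2_search(stored, [[0.9, 0.1]], top_k=2)
print(indices)    # nearest stored vectors first
print(distances)  # squared distances, not square-rooted
```

Because the search is exhaustive, its cost grows linearly with the number of stored chunks, which is acceptable for a single PDF.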


In [68]:
import faiss
import numpy as np

dimension = embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(np.asarray(embeddings, dtype="float32"))  # FAISS expects float32 vectors


### 🔍 Semantic Retriever

This function:
1. Converts the user query into an embedding
2. Searches FAISS for the most similar chunks
3. Returns the top-K relevant text chunks

This is the **retrieval** part of RAG.


In [69]:
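def retrieve(query, top_k=5):
    """Return up to top_k chunks closest to the query in embedding space."""
    q_embedding = embed_model.encode([query])
    distances, indices = index.search(np.asarray(q_embedding, dtype="float32"), top_k)
    # FAISS pads results with -1 when the index holds fewer than top_k vectors.
    return [chunks[i] for i in indices[0] if i >= 0]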


### 🤖 Initialize Gemini Model

This cell:
- Loads the Gemini Flash model
- Prepares it for text generation

Gemini is used only for **generation**, not retrieval.

### 📤 Gemini Call Wrapper

This helper function:
- Sends a prompt to Gemini
- Returns only the generated text

All Gemini interaction is centralized here.


In [70]:
model = genai.GenerativeModel("gemini-2.5-flash")

def call_gemini(prompt):
    response = model.generate_content(prompt)
    return response.text


### ⌨️ Input Node

This node:
- Accepts user questions from the terminal
- Detects `exit` to stop the flow
- Stores the current question in shared state
- Emits an action to control flow routing

This is the entry point for each RAG query.


In [71]:
class InputNode(Node):
    def exec(self, _):
        q = input("Ask a question (or exit): ").strip()
        if q.lower() == "exit":
            return "exit_flow"
        self.question_input = q # Temporarily store to pass via post
        return "question_available" # Action for flow to proceed

    def post(self, shared, prep_res, exec_res):
        if exec_res == "question_available":
            shared['current_question'] = self.question_input
        elif 'current_question' in shared: # If exiting, clear question
            del shared['current_question']
        return exec_res # Mirrors exec()'s return value, which Flow uses as the action

### 🔎 Retrieve Node

This node:
- Reads the current question
- Retrieves relevant chunks from FAISS
- Stores `{question, context}` in shared state

This connects user input to document knowledge.


In [72]:
class RetrieveNode(Node):
    def prep(self, shared):
        return shared.get('current_question') # Get question from shared state

    def exec(self, question):
        if question is None:
            return "exit_flow"
        context = retrieve(question)
        self.retrieved_data = {"question": question, "context": context}
        return "context_available"

    def post(self, shared, prep_res, exec_res):
        if exec_res == "context_available":
            shared['retrieved_data'] = self.retrieved_data
        elif 'retrieved_data' in shared:
            del shared['retrieved_data']
        return exec_res # Pass action to Flow

### 🧾 Prompt Builder Node

This node:
- Combines retrieved context + user question
- Instructs the model to “answer only from context”
- Discourages hallucination (the instruction reduces it but cannot guarantee it)

This is where **retrieval becomes generation**.
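The prompt assembly can also be factored into a plain function, which makes it easy to inspect or test without running the flow. A sketch only: `build_prompt` is a hypothetical helper, and both the numbered-chunk formatting and the "say so" instruction are assumptions, not what the notebook's `PromptNode` does.

```python
def build_prompt(question, context_chunks):
    """Assemble a context-grounded prompt from retrieved chunks and a question."""
    # Number the chunks so the boundaries between retrieved passages stay visible.
    context_text = "\n".join(
        f"[{i}] {chunk.strip()}" for i, chunk in enumerate(context_chunks, start=1)
    )
    return (
        "You are a smart assistant.\n"
        "Answer ONLY using the context.\n"
        "If the context does not contain the answer, say so.\n\n"
        f"Context:\n{context_text}\n\n"
        f"Question:\n{question}\n"
    )

print(build_prompt("What is RAG?", ["RAG combines retrieval with generation. ", "It uses a retriever."]))
```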


In [73]:
class PromptNode(Node):
    def prep(self, shared):
        return shared.get('retrieved_data')

    def exec(self, data):
        if data is None or data.get('question') is None:
            return "exit_flow"

        context_text = "\n".join(data["context"])
        prompt = f"""
You are a smart assistant.
Answer ONLY using the context.
Do not hallucinate.

Context:
{context_text}

Question:
{data['question']}
"""
        self.prepared_prompt = prompt
        return "prompt_ready"

    def post(self, shared, prep_res, exec_res):
        if exec_res == "prompt_ready":
            shared['generated_prompt'] = self.prepared_prompt
        elif 'generated_prompt' in shared:
            del shared['generated_prompt']
        return exec_res

### 🤖 Gemini Generation Node

This node:
- Sends the prepared prompt to Gemini
- Handles safety-blocked or empty responses
- Stores the generated answer

API errors are caught and returned as the answer text, so a failed call does not stop the loop.
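The response checks can be isolated in a small helper. The sketch below assumes only the attribute layout the node itself relies on (`candidates`, `content.parts`) plus a `text` attribute on each part; `safe_text` is a hypothetical name, and the objects built with `SimpleNamespace` are stand-ins that mimic that layout, not real Gemini responses.

```python
from types import SimpleNamespace

def safe_text(response, fallback="No answer generated."):
    """Return the first candidate's text, or `fallback` if there is none."""
    candidates = getattr(response, "candidates", None) or []
    if not candidates:
        return fallback
    parts = getattr(getattr(candidates[0], "content", None), "parts", None) or []
    text = "".join(getattr(p, "text", "") or "" for p in parts)
    return text or fallback

# Stand-in objects that only mimic the attribute layout used above.
ok = SimpleNamespace(candidates=[SimpleNamespace(
    content=SimpleNamespace(parts=[SimpleNamespace(text="Hello")]))])
blocked = SimpleNamespace(candidates=[])
empty = SimpleNamespace(candidates=[SimpleNamespace(content=SimpleNamespace(parts=[]))])

print(safe_text(ok))
print(safe_text(blocked))
print(safe_text(empty))
```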


In [74]:
class GeminiNode(Node):
    def prep(self, shared):
        return shared.get('generated_prompt')

    def exec(self, prompt):
        if prompt is None:
            return "exit_flow"
        try:
            response = model.generate_content(prompt)
            generated_text = ""
            if response.candidates:
                if hasattr(response.candidates[0].content, 'parts') and response.candidates[0].content.parts:
                    generated_text = response.text
                else:
                    print("Warning: Gemini API generated no text content (possibly blocked by safety filters).")
                    generated_text = "No answer generated due to content policy or other issues."
            else:
                print("Warning: Gemini API returned no candidates.")
                generated_text = "No answer generated."

            self.gemini_output = generated_text
            return "answer_ready"
        except Exception as e:
            print(f"Error during Gemini API call: {e}")
            self.gemini_output = f"Error: Could not generate answer: {e}"
            return "answer_ready"

    def post(self, shared, prep_res, exec_res):
        if exec_res == "answer_ready":
            shared['generated_answer'] = self.gemini_output
        elif 'generated_answer' in shared:
            del shared['generated_answer']
        return exec_res

### 🖥️ Display Node

This node:
- Prints the final answer to the user
- Clears previous state to avoid data leakage
- Loops control back to the input node

This ensures each question is handled cleanly.


In [75]:
class DisplayNode(Node):
    def prep(self, shared):
        return shared.get('generated_answer')

    def exec(self, answer):
        if answer is None: # Propagate exit if no answer was generated upstream
            return "exit_flow"
        print("\nAnswer:\n", answer, "\n")
        return "continue_conversation" # Loop back to input

    def post(self, shared, prep_res, exec_res):
        # Clear previous answers/prompts/contexts to avoid showing old data on next loop
        if 'generated_answer' in shared:
            del shared['generated_answer']
        if 'generated_prompt' in shared:
            del shared['generated_prompt']
        if 'retrieved_data' in shared:
            del shared['retrieved_data']
        if 'current_question' in shared:
            del shared['current_question']
        return exec_res

### 🔗 Flow Wiring (RAG Graph)

This cell defines the execution graph:

Input → Retrieve → Prompt → Gemini → Display → Input

Conditional transitions allow:
- Looping
- Clean exits
- Controlled execution paths

This is **explicit agent orchestration**, not magic chaining.
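The `node - "action" >> target` syntax is ordinary operator overloading. A minimal sketch of the mechanism, using illustrative classes (`Step`, `Transition`) rather than the engine's own:

```python
class Transition:
    """Half-built edge: remembers the source node and the action label."""
    def __init__(self, src, action):
        self.src, self.action = src, action

    def __rshift__(self, target):
        # `transition >> target` completes the edge.
        self.src.successors[self.action] = target
        return target

class Step:
    def __init__(self, name):
        self.name, self.successors = name, {}

    def __sub__(self, action):
        # `step - "action"` starts an edge labelled with that action.
        return Transition(self, action)

ask, answer = Step("ask"), Step("answer")

# `-` binds tighter than `>>`, so this parses as (ask - "question") >> answer.
ask - "question" >> answer
answer - "continue" >> ask

print({action: node.name for action, node in ask.successors.items()})
print({action: node.name for action, node in answer.successors.items()})
```

Each wiring line therefore just adds one entry to the source node's successor dictionary.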


In [76]:
input_node = InputNode()
retrieve_node = RetrieveNode()
prompt_node = PromptNode()
gemini_node = GeminiNode()
display_node = DisplayNode()

flow = Flow(input_node)

input_node - "question_available" >> retrieve_node
retrieve_node - "context_available" >> prompt_node
prompt_node - "prompt_ready" >> gemini_node
gemini_node - "answer_ready" >> display_node

# Loop back for continuation
display_node - "continue_conversation" >> input_node

# Handle exit from any point in the flow.
# Mapping an action to None ends the loop; the engine still emits its "Flow ends" warning.
input_node - "exit_flow" >> None
retrieve_node - "exit_flow" >> None
prompt_node - "exit_flow" >> None
gemini_node - "exit_flow" >> None
display_node - "exit_flow" >> None

### ▶️ Run the PocketFlow RAG System

This cell:
- Passes an empty shared-state dictionary to the flow built in the previous cell
- Starts execution
- Keeps the chatbot running until the user enters `exit`

You now have a fully working **PocketFlow-driven RAG system**.


In [77]:
flow.run({})  # public entry point; delegates to the flow's internal _run


Answer:
 The RAG-Sequence model uses the same retrieved document to generate the complete sequence. 

Ask a question (or exit): what are  Training setup Details?

Answer:
 The training setup details include:

*   **Trainable Parameters:** A total of 626M trainable parameters, including T-large with 406M parameters.
*   **Knowledge Access:** The ability to access knowledge is present without additional training, achieved by using pre-trained access mechanisms.
*   **Training Type:** "Mixed precision training" is referenced from ICLR 2018 papers.
*   **Datasets and Instances (Train, Development, Test):**
    *   **Natural Questions:** 79169, 8758, 3611
    *   **TriviaQA:** 78786, 8838, 11314
    *   **WebQuestions:** 3418, 362, 2033
    *   **CuratedTrec:** 635, 134, 635
    *   **Jeopardy Question Generation:** 97392, 13714, 2684
    *   *A hidden subset of this data is used for evaluation.* 

Ask a question (or exit): Appendices for Retrieval-Augmented Generation for Knowledge-Intens

KeyboardInterrupt: Interrupted by user