In [1]:
import sys
print(sys.version)


3.12.12 (main, Oct 10 2025, 08:52:57) [GCC 11.4.0]


In [2]:
pip install fastapi uvicorn python-dotenv




In [3]:
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI(title="Modular RAG API")

class HealthResponse(BaseModel):
    status: str

@app.get("/health", response_model=HealthResponse)
def health_check():
    return {"status": "ok"}


In [7]:
%%writefile main.py
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI(title="Modular RAG API")

class HealthResponse(BaseModel):
    status: str

@app.get("/health", response_model=HealthResponse)
def health_check():
    return {"status": "ok"}


Writing main.py


In [9]:
!ls


main.py  __pycache__  sample_data


In [10]:
!pkill -f uvicorn


In [15]:
!uvicorn main:app --host 0.0.0.0 --port 8000


[32mINFO[0m:     Started server process [[36m13735[0m]
[32mINFO[0m:     Waiting for application startup.
[32mINFO[0m:     Application startup complete.
[32mINFO[0m:     Uvicorn running on [1mhttp://0.0.0.0:8000[0m (Press CTRL+C to quit)
[32mINFO[0m:     Shutting down
[32mINFO[0m:     Finished server process [[36m13735[0m]
[31mERROR[0m:    Traceback (most recent call last):
  File "/usr/lib/python3.12/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/base_events.py", line 678, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.12/asyncio/base_events.py", line 645, in run_forever
    self._run_once()
  File "/usr/lib/python3.12/asyncio/base_events.py", line 1999, in _run_once
    handle._run()
  File "/usr/lib/pyt

In [19]:
import requests

response = requests.get("http://127.0.0.1:8000/health")
print(response.json())


{'status': 'ok'}


In [23]:
pip install openai




In [26]:
%%writefile llm_factory.py
from typing import Any

class BaseLLM:
    """Abstract base class for all LLMs"""
    def generate(self, prompt: str) -> str:
        raise NotImplementedError("LLM must implement generate method")


class GPTLLM(BaseLLM):
    def __init__(self, api_key: str):
        import openai
        openai.api_key = api_key
        self.client = openai

    def generate(self, prompt: str) -> str:
        response = self.client.ChatCompletion.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content


class DeepSeekLLM(BaseLLM):
    def generate(self, prompt: str) -> str:
        return f"[DeepSeek] simulated response for: {prompt}"


class GeminiLLM(BaseLLM):
    def generate(self, prompt: str) -> str:
        return f"[Gemini] simulated response for: {prompt}"


class LLMFactory:
    @staticmethod
    def create(model_provider: str, **kwargs) -> BaseLLM:
        provider = model_provider.lower()

        if provider == "openai":
            return GPTLLM(api_key=kwargs.get("api_key", ""))
        elif provider == "deepseek":
            return DeepSeekLLM()
        elif provider == "gemini":
            return GeminiLLM()
        else:
            raise ValueError(f"Unknown LLM provider: {model_provider}")


Writing llm_factory.py


In [27]:
!ls


llm_factory.py	main.py  __pycache__  sample_data


In [28]:
%%writefile main.py
from fastapi import FastAPI, Query
from pydantic import BaseModel
from llm_factory import LLMFactory

app = FastAPI(title="Modular RAG API")

class HealthResponse(BaseModel):
    status: str

@app.get("/health", response_model=HealthResponse)
def health_check():
    return {"status": "ok"}

@app.get("/test-llm")
def test_llm(model: str = Query(...), prompt: str = Query(...)):
    llm = LLMFactory.create(
        model_provider=model,
        api_key="DUMMY_KEY"
    )
    response = llm.generate(prompt)
    return {"model": model, "response": response}


Overwriting main.py


In [31]:
!pkill -f uvicorn

In [32]:
import requests

response = requests.get(
    "http://127.0.0.1:8000/test-llm",
    params={"model": "deepseek", "prompt": "Hello RAG"}
)

print(response.json())


{'model': 'deepseek', 'response': '[DeepSeek] simulated response for: Hello RAG'}


In [41]:
%%writefile data_loader.py
... (کد loader)


Writing data_loader.py


In [42]:
!ls



data  data_loader.py  llm_factory.py  main.py  __pycache__  sample_data


In [34]:
%%writefile data/sample.txt
This is a sample text file.
It contains internal company policy information.
The termination clause is explained in section 4.
Employees must follow the guidelines strictly.


Writing data/sample.txt


In [39]:
from fpdf import FPDF

pdf = FPDF()
pdf.add_page()
pdf.set_font("Arial", size=12)
pdf.multi_cell(
    0,
    8,
    "This PDF document describes a contract agreement.\n"
    "Termination is allowed with a 30 days written notice.\n"
    "This agreement is governed by internal regulations."
)
pdf.output("data/sample.pdf")


''

In [43]:
!ls


data  data_loader.py  llm_factory.py  main.py  __pycache__  sample_data


In [47]:
ls data


sample.pdf  sample.txt


In [49]:
%%writefile data_loader.py
from pathlib import Path
from typing import List, Dict
from pypdf import PdfReader


def load_txt(file_path: Path) -> Dict:
    text = file_path.read_text(encoding="utf-8")
    return {
        "content": text,
        "source": file_path.name,
        "type": "txt"
    }


def load_pdf(file_path: Path) -> Dict:
    reader = PdfReader(str(file_path))
    pages = [page.extract_text() or "" for page in reader.pages]
    text = "\n".join(pages)
    return {
        "content": text,
        "source": file_path.name,
        "type": "pdf"
    }


def load_documents(data_dir: str) -> List[Dict]:
    documents = []
    path = Path(data_dir)

    for file in path.iterdir():
        if file.suffix.lower() == ".txt":
            documents.append(load_txt(file))
        elif file.suffix.lower() == ".pdf":
            documents.append(load_pdf(file))

    return documents


Overwriting data_loader.py


In [51]:
!sed -n '1,20p' data_loader.py


from pathlib import Path
from typing import List, Dict
from pypdf import PdfReader


def load_txt(file_path: Path) -> Dict:
    text = file_path.read_text(encoding="utf-8")
    return {
        "content": text,
        "source": file_path.name,
        "type": "txt"
    }


def load_pdf(file_path: Path) -> Dict:
    reader = PdfReader(str(file_path))
    pages = [page.extract_text() or "" for page in reader.pages]
    text = "\n".join(pages)
    return {
        "content": text,


In [52]:
from data_loader import load_documents

documents = load_documents("data")

for doc in documents:
    print("SOURCE:", doc["source"])
    print("TYPE:", doc["type"])
    print("CONTENT PREVIEW:")
    print(doc["content"][:200])
    print("-" * 50)


SOURCE: sample.txt
TYPE: txt
CONTENT PREVIEW:
This is a sample text file.
It contains internal company policy information.
The termination clause is explained in section 4.
Employees must follow the guidelines strictly.

--------------------------------------------------
SOURCE: sample.pdf
TYPE: pdf
CONTENT PREVIEW:
This PDF document describes a contract agreement.
Termination is allowed with a 30 days written notice.
This agreement is governed by internal regulations.
--------------------------------------------------


In [55]:
%%writefile chunker.py
from typing import List, Dict


def chunk_text(
    text: str,
    chunk_size: int = 500,
    overlap: int = 100
) -> List[str]:
    chunks = []
    start = 0
    text_length = len(text)

    while start < text_length:
        end = start + chunk_size
        chunk = text[start:end]
        chunks.append(chunk)
        start = end - overlap

        if start < 0:
            start = 0

    return chunks


def chunk_documents(documents: List[Dict]) -> List[Dict]:
    all_chunks = []

    for doc in documents:
        chunks = chunk_text(doc["content"])

        for idx, chunk in enumerate(chunks):
            all_chunks.append({
                "content": chunk,
                "source": doc["source"],
                "type": doc["type"],
                "chunk_id": idx
            })

    return all_chunks


Overwriting chunker.py


In [56]:
from data_loader import load_documents
from chunker import chunk_documents

documents = load_documents("data")
chunks = chunk_documents(documents)

print(f"Total chunks: {len(chunks)}\n")

for c in chunks[:5]:
    print("SOURCE:", c["source"])
    print("CHUNK ID:", c["chunk_id"])
    print("CONTENT PREVIEW:")
    print(c["content"][:200])
    print("-" * 50)


Total chunks: 2

SOURCE: sample.txt
CHUNK ID: 0
CONTENT PREVIEW:
This is a sample text file.
It contains internal company policy information.
The termination clause is explained in section 4.
Employees must follow the guidelines strictly.

--------------------------------------------------
SOURCE: sample.pdf
CHUNK ID: 0
CONTENT PREVIEW:
This PDF document describes a contract agreement.
Termination is allowed with a 30 days written notice.
This agreement is governed by internal regulations.
--------------------------------------------------


In [69]:
%%writefile embedding_factory.py
from typing import List


class BaseEmbedding:
    def embed(self, texts: List[str]) -> List[List[float]]:
        raise NotImplementedError


# ---- OpenAI Embedding (paid) ----
class OpenAIEmbedding(BaseEmbedding):
    def __init__(self, api_key: str, model: str):
        from openai import OpenAI
        self.client = OpenAI(api_key=api_key)
        self.model = model

    def embed(self, texts: List[str]) -> List[List[float]]:
        response = self.client.embeddings.create(
            model=self.model,
            input=texts
        )
        return [item.embedding for item in response.data]


# ---- Mock Embedding (free, dev/testing) ----
class MockEmbedding(BaseEmbedding):
    def embed(self, texts: List[str]) -> List[List[float]]:
        vectors = []
        for text in texts:
            # simple deterministic fake embedding
            vectors.append([float(len(text))] * 10)
        return vectors


# ---- Factory ----
class EmbeddingFactory:
    @staticmethod
    def create(provider: str, **kwargs) -> BaseEmbedding:
        provider = provider.lower()

        if provider == "openai":
            return OpenAIEmbedding(
                api_key=kwargs["api_key"],
                model=kwargs.get("model", "text-embedding-3-small")
            )

        if provider == "mock":
            return MockEmbedding()

        raise ValueError(f"Unknown embedding provider: {provider}")


Overwriting embedding_factory.py


In [75]:
%%writefile embedding_factory.py
from typing import List


class BaseEmbedding:
    def embed(self, texts: List[str]) -> List[List[float]]:
        raise NotImplementedError


class MockEmbedding(BaseEmbedding):
    def embed(self, texts: List[str]) -> List[List[float]]:
        vectors = []
        for text in texts:
            vectors.append([float(len(text))] * 10)
        return vectors


class EmbeddingFactory:
    @staticmethod
    def create(provider: str, **kwargs) -> BaseEmbedding:
        provider = provider.lower()

        if provider == "mock":
            return MockEmbedding()

        raise ValueError(f"Unknown embedding provider: {provider}")


Overwriting embedding_factory.py


In [1]:
%%writefile embedding_factory.py
from typing import List


class BaseEmbedding:
    def embed(self, texts: List[str]) -> List[List[float]]:
        raise NotImplementedError


class MockEmbedding(BaseEmbedding):
    def embed(self, texts: List[str]) -> List[List[float]]:
        return [[float(len(text))] * 10 for text in texts]


class EmbeddingFactory:
    @staticmethod
    def create(provider: str, **kwargs) -> BaseEmbedding:
        provider = provider.lower()

        if provider == "mock":
            return MockEmbedding()

        raise ValueError(f"Unknown embedding provider: {provider}")


Overwriting embedding_factory.py


In [1]:
from embedding_factory import EmbeddingFactory

embedder = EmbeddingFactory.create("mock")
print(type(embedder))


<class 'embedding_factory.MockEmbedding'>


In [65]:
%%writefile vector_store.py
from typing import List, Dict
import math


def cosine_similarity(vec1: List[float], vec2: List[float]) -> float:
    dot = sum(a * b for a, b in zip(vec1, vec2))
    norm1 = math.sqrt(sum(a * a for a in vec1))
    norm2 = math.sqrt(sum(b * b for b in vec2))
    return dot / (norm1 * norm2 + 1e-10)


class InMemoryVectorStore:
    def __init__(self):
        self.vectors = []
        self.metadata = []

    def add(self, vectors: List[List[float]], metadatas: List[Dict]):
        for v, m in zip(vectors, metadatas):
            self.vectors.append(v)
            self.metadata.append(m)

    def search(self, query_vector: List[float], top_k: int = 3):
        scores = []

        for idx, vector in enumerate(self.vectors):
            score = cosine_similarity(query_vector, vector)
            scores.append((score, self.metadata[idx]))

        scores.sort(key=lambda x: x[0], reverse=True)
        return scores[:top_k]


Writing vector_store.py


In [2]:
from data_loader import load_documents
from chunker import chunk_documents
from embedding_factory import EmbeddingFactory
from vector_store import InMemoryVectorStore

documents = load_documents("data")
chunks = chunk_documents(documents)

texts = [c["content"] for c in chunks]
metadata = [
    {
        "source": c["source"],
        "chunk_id": c["chunk_id"],
        "preview": c["content"][:100]
    }
    for c in chunks
]

embedder = EmbeddingFactory.create(provider="mock")
vectors = embedder.embed(texts)

store = InMemoryVectorStore()
store.add(vectors, metadata)

query = "How can a contract be terminated?"
query_vector = embedder.embed([query])[0]

results = store.search(query_vector, top_k=3)

for score, meta in results:
    print("Score:", score)
    print("Source:", meta["source"])
    print("Chunk ID:", meta["chunk_id"])
    print("Preview:", meta["preview"])
    print("-" * 40)


Score: 0.9999999999999982
Source: sample.txt
Chunk ID: 0
Preview: This is a sample text file.
It contains internal company policy information.
The termination clause 
----------------------------------------
Score: 0.999999999999998
Source: sample.pdf
Chunk ID: 0
Preview: This PDF document describes a contract agreement.
Termination is allowed with a 30 days written noti
----------------------------------------


In [3]:
%%writefile llm_factory.py
class BaseLLM:
    def generate(self, question: str, context: str) -> str:
        raise NotImplementedError


class MockLLM(BaseLLM):
    def generate(self, question: str, context: str) -> str:
        return (
            "ANSWER (mock)\n\n"
            "QUESTION:\n"
            f"{question}\n\n"
            "CONTEXT USED:\n"
            f"{context}\n\n"
            "NOTE: This answer is generated by a mock LLM."
        )


class LLMFactory:
    @staticmethod
    def create(provider: str) -> BaseLLM:
        provider = provider.lower()

        if provider == "mock":
            return MockLLM()

        raise ValueError(f"Unknown LLM provider: {provider}")


Overwriting llm_factory.py


In [4]:
%%writefile rag_pipeline.py
from embedding_factory import EmbeddingFactory
from vector_store import InMemoryVectorStore
from llm_factory import LLMFactory


class RAGPipeline:
    def __init__(self, store: InMemoryVectorStore):
        self.store = store
        self.embedder = EmbeddingFactory.create(provider="mock")
        self.llm = LLMFactory.create(provider="mock")

    def answer(self, question: str, top_k: int = 3) -> str:
        query_vector = self.embedder.embed([question])[0]
        results = self.store.search(query_vector, top_k=top_k)

        context_parts = []
        for score, meta in results:
            context_parts.append(meta["preview"])

        context = "\n---\n".join(context_parts)

        return self.llm.generate(question=question, context=context)


Writing rag_pipeline.py


In [5]:
%%writefile main.py
from fastapi import FastAPI
from pydantic import BaseModel

from data_loader import load_documents
from chunker import chunk_documents
from embedding_factory import EmbeddingFactory
from vector_store import InMemoryVectorStore
from rag_pipeline import RAGPipeline


app = FastAPI(title="RAG API (Mock)")

# ---- Build knowledge base at startup ----
documents = load_documents("data")
chunks = chunk_documents(documents)

texts = [c["content"] for c in chunks]
metadata = [
    {
        "source": c["source"],
        "chunk_id": c["chunk_id"],
        "preview": c["content"][:200]
    }
    for c in chunks
]

embedder = EmbeddingFactory.create(provider="mock")
vectors = embedder.embed(texts)

store = InMemoryVectorStore()
store.add(vectors, metadata)

rag = RAGPipeline(store)


# ---- API schema ----
class QuestionRequest(BaseModel):
    question: str


class AnswerResponse(BaseModel):
    answer: str


@app.post("/ask", response_model=AnswerResponse)
def ask_question(req: QuestionRequest):
    answer = rag.answer(req.question)
    return {"answer": answer}


Overwriting main.py


In [6]:
import requests

response = requests.post(
    "http://127.0.0.1:8000/ask",
    json={"question": "How can a contract be terminated?"}
)

print(response.json())


{'answer': 'ANSWER (mock)\n\nQUESTION:\nHow can a contract be terminated?\n\nCONTEXT USED:\nThis is a sample text file.\nIt contains internal company policy information.\nThe termination clause is explained in section 4.\nEmployees must follow the guidelines strictly.\n\n---\nThis PDF document describes a contract agreement.\nTermination is allowed with a 30 days written notice.\nThis agreement is governed by internal regulations.\n\nNOTE: This answer is generated by a mock LLM.'}


In [8]:
ls


chunker.py  data_loader.py        llm_factory.py  rag_pipeline.py
[0m[01;34mdata[0m/       embedding_factory.py  main.py         vector_store.py


In [9]:
%%writefile requirements.txt
fastapi
uvicorn
pydantic
pypdf
fpdf


Writing requirements.txt


In [10]:
%%writefile README.md
# RAG From Scratch (Vendor-Agnostic)

This project implements a complete Retrieval-Augmented Generation (RAG) pipeline from scratch.

## Features
- PDF and TXT ingestion
- Deterministic chunking with overlap
- Pluggable embedding providers (mock)
- In-memory vector store with cosine similarity
- Retrieval-based context injection
- Pluggable LLM providers (mock)
- FastAPI backend

## Architecture
Documents
→ Chunking
→ Embeddings
→ Vector Store
→ Retrieval
→ LLM

## Run locally
```bash
pip install -r requirements.txt
uvicorn main:app --reload


Writing README.md


In [12]:
import requests

response = requests.post(
    "http://127.0.0.1:8000/ask",
    json={
        "question": "How can a contract be terminated?"
    }
)

print(response.json())



{'answer': 'ANSWER (mock)\n\nQUESTION:\nHow can a contract be terminated?\n\nCONTEXT USED:\nThis is a sample text file.\nIt contains internal company policy information.\nThe termination clause is explained in section 4.\nEmployees must follow the guidelines strictly.\n\n---\nThis PDF document describes a contract agreement.\nTermination is allowed with a 30 days written notice.\nThis agreement is governed by internal regulations.\n\nNOTE: This answer is generated by a mock LLM.'}
