# Retrieval-Augmented Generation (RAG) System

### Dependencies installation

In [None]:
!pip install accelerate>=1.8.1
!pip install docling>=2.41.0
!pip install lancedb
!pip install pandas
!pip install sentence-transformers

### Step 1: Parsing & Chunking

In [None]:
import os

from docling import chunking
from docling.document_converter import DocumentConverter
from docling_core.transforms.chunker.tokenizer.huggingface import HuggingFaceTokenizer
from transformers import AutoTokenizer


class Chunker:
    def __init__(self, embedding_model: str, max_tokens: int = 1024):
        tokenizer = HuggingFaceTokenizer(
            tokenizer=AutoTokenizer.from_pretrained(embedding_model),
            max_tokens=max_tokens,
        )
        self.__chunker = chunking.HybridChunker(tokenizer=tokenizer, merge_peers=True)

    def chunk(self, source: str):
        doc = DocumentConverter().convert(source=source).document
        chunk_iter = self.__chunker.chunk(dl_doc=doc)
        chunks = list(chunk_iter)
        chunks_dicts = []
        for chunk in chunks:
            chunks_dicts.append(
                {
                    "content": chunk.text,
                    "page_number": chunk.meta.doc_items[0].prov[0].page_no,
                    "pdf_name": os.path.basename(source),
                }
            )
        return chunks_dicts


### Step 2: Embedding with SentenceTransformer


In [None]:
from typing import List

import torch
from sentence_transformers import SentenceTransformer


class CustomEmbeddings:

    def __init__(
            self,
            model_name: str,
            trust_remote_code: bool = True,
            device: str = torch.device("cuda" if torch.cuda.is_available() else "cpu"),
            normalize_embeddings: bool = True,
    ):
        self.model_name = model_name
        self.normalize_embeddings = normalize_embeddings
        self.model = SentenceTransformer(
            model_name,
            trust_remote_code=trust_remote_code,
            device=device,
        )

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        embeddings = self.model.encode(
            texts,
            normalize_embeddings=self.normalize_embeddings,
            convert_to_tensor=False
        )
        return embeddings.tolist()

    def embed_query(self, text: str) -> List[float]:
        embedding = self.model.encode(
            text,
            normalize_embeddings=self.normalize_embeddings,
            convert_to_tensor=False
        )
        return embedding.tolist()

    @property
    def embedding_dimension(self) -> int:
        """Get the dimension of the embeddings."""
        return self.model.get_sentence_embedding_dimension()






### Step 3: Indexing & Semantic Search with LanceDB

In [None]:
from datetime import timedelta

import lancedb
from lancedb.table import Table
from pandas import DataFrame


class LanceDB:

    def __init__(self,
                 vector_storage_path: str = "./lancedb/vector_storage",
                 table_name: str = "knowledge_base"):
        db = lancedb.connect(uri=vector_storage_path)
        import pyarrow as pa
        schema = pa.schema([
            pa.field("content", pa.string()),
            pa.field("page_number", pa.int32()),
            pa.field("pdf_name", pa.string()),
            pa.field("embeddings", pa.list_(pa.float32(), 1024)),
        ])
        try:
            db.create_table(table_name, schema=schema)
            print(f"Table {table_name} created successfully.")
        except Exception as e:
            print(f"Table {table_name} already exists. {e}")
        self.__table: Table = db.open_table(name=table_name)

    def semantic_search(self, vector_query: list[float], n: int = 10, distance_threshold=0.50) -> DataFrame:
        search_results = self.__table.search(vector_query, vector_column_name="embeddings").distance_type(
            "cosine").limit(n).to_pandas()
        print(f"search_results\n\n {search_results}")
        return search_results.loc[search_results["_distance"] <= distance_threshold]

    def get_count(self) -> int:
        return self.__table.count_rows()

    def save(self, df: DataFrame):
        self.__table.add(df)
        print(f"total records in lancedb : {self.__table.count_rows()}")

    def create_index(self):
        try:
            self.__table.create_index(metric="cosine", vector_column_name="embeddings")
        except Exception as e:
            print(f"Seems index already exist {e}")


### Step 4: Prompt Template


In [None]:
class PromptTemplate:
    @staticmethod
    def build(context: str, question: str, max_token: int = 512) -> str:
        prompt = f"""You are a Climate Science Assistant using IPCC research to explain climate change clearly and compassionately.

**Your Approach:**
- Use solid IPCC scientific evidence
- Explain concepts accessibly for all audiences
- Be honest about uncertainties while providing clear guidance
- Support responses with specific data and findings
- Remain helpful, accurate, and encouraging
- **Keep responses under {max_token} tokens**

**Available Scientific Context (IPCC 2023 Synthesis Report):**
{context}

**Question:**
{question}

**Your Response (max {max_token} tokens):**

        """
        return prompt


### Step 5: LLM Inference Using Qwen


In [None]:
from typing import List, Dict, Tuple

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer


class QwenLLM:

    def __init__(self, model_name: str = "Qwen/Qwen3-1.7B"):
        self.model_name = model_name
        self.tokenizer = None
        self.model = None
        self.device = None
        self._load_model()

    def _load_model(self) -> None:
        print(f"Loading model: {self.model_name}")
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            torch_dtype=torch.float16,
            device_map="auto"
        )
        self.device = self.model.device
        print(f"Model loaded successfully on device: {self.device}")

    def _prepare_messages(self, prompt: str) -> List[Dict[str, str]]:
        return [{"role": "user", "content": prompt}]

    def _parse_thinking_content(self, output_ids: List[int]) -> Tuple[str, str]:
        try:
            # Find the index of </think> token (151668)
            index = len(output_ids) - output_ids[::-1].index(151668)
        except ValueError:
            # If </think> token not found, no thinking content
            index = 0
        thinking_content = self.tokenizer.decode(output_ids[:index], skip_special_tokens=True).strip("\n")
        main_content = self.tokenizer.decode(output_ids[index:], skip_special_tokens=True).strip("\n")

        return thinking_content, main_content

    def invoke(self,
               prompt: str,
               max_new_tokens: int = 1024,
               enable_thinking: bool = True,
               return_thinking: bool = True,
               **generation_kwargs) -> Dict[str, str]:
        messages = self._prepare_messages(prompt)
        text = self.tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
            enable_thinking=enable_thinking
        )
        model_inputs = self.tokenizer([text], return_tensors="pt").to(self.device)
        with torch.no_grad():
            generated_ids = self.model.generate(
                **model_inputs,
                max_new_tokens=max_new_tokens,
                **generation_kwargs
            )
        output_ids = generated_ids[0][len(model_inputs.input_ids[0]):].tolist()
        if enable_thinking and return_thinking:
            thinking_content, main_content = self._parse_thinking_content(output_ids)
            return {
                "response": main_content,
                "thinking": thinking_content
            }
        else:
            content = self.tokenizer.decode(output_ids, skip_special_tokens=True).strip("\n")
            return {
                "response": content,
                "thinking": ""
            }


### Step 6: Putting It All Together


In [None]:
import pandas as pd

from src.chunker.chunker import Chunker
from src.embedding.custom_embedding import CustomEmbeddings
from src.llm.qwen_llm import QwenLLM
from src.prompt.prompt_template import PromptTemplate
from src.storage.lancedb import LanceDB

pdf_data = "https://www.ipcc.ch/report/ar6/syr/downloads/report/IPCC_AR6_SYR_LongerReport.pdf"

EMBEDDING_MODEL = "Qwen/Qwen3-Embedding-0.6B"
LLM_MODEL = "Qwen/Qwen3-1.7B"


# initialize the embedding model
embeddings = CustomEmbeddings(model_name=EMBEDDING_MODEL)

# initialize the LLM
llm = QwenLLM(model_name=LLM_MODEL)

# initialize the Chunker
chunker = Chunker(embedding_model=EMBEDDING_MODEL)

# initialize the Vector DB
lancedb = LanceDB(table_name="rag_table")
# Run document Indexing
print("Start Chunking ....")
documents = chunker.chunk(pdf_data)
print("Chunking done....")
df = pd.DataFrame(documents, columns=["content", "page_number", "pdf_name"])
print("Start Embedding ....")
df["embeddings"] = df["content"].apply(embeddings.embed_query)
print("Embedding  done....")
print(df)
print("Start saving ....")
lancedb.save(df)

# RAG
query = "How is climate change affecting biodiversity?"

vector_query = embeddings.embed_query(query)
result_df = lancedb.semantic_search(vector_query=vector_query, n=2)
context = "\n\n".join(result_df["content"].tolist())
formatted_prompt = PromptTemplate.build(context=context, question=query)
print("\nFormatted Prompt:" + "\n" + formatted_prompt)
final_response = llm.invoke(formatted_prompt, enable_thinking=True, return_thinking=True)
print("\nFinal RAG Response:")
print(final_response["response"])
