# RAG Pipeline + PostgreSQL + SQLAlchemy

Test RAG 


## Connect PostgreSQL through SQLAlchemy

In [1]:


from sqlalchemy import create_engine, MetaData, Table, Column, Integer, JSON, select
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import sessionmaker

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, JSON
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+psycopg2://postgres:abc123@/tourismdb?host=/tmp"

engine = create_engine(DATABASE_URL, echo=True)

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

metadata = MetaData()
print("success")


success


## Define rag table

In [2]:
from sqlalchemy import JSON

rag_table = Table(
    "rag_documents",
    metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("content", JSON, nullable=False),
)

metadata.create_all(engine)
print("Table 'rag_documents' created successfully!")

2025-12-03 10:52:53,889 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2025-12-03 10:52:53,890 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-12-03 10:52:53,891 INFO sqlalchemy.engine.Engine select current_schema()
2025-12-03 10:52:53,891 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-12-03 10:52:53,894 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2025-12-03 10:52:53,895 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-12-03 10:52:53,898 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-12-03 10:52:53,905 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname

Testing sample

## Loader: Input Data from SQL

In [3]:
from sqlalchemy import Table, MetaData
from typing import List, Dict, Any

def load_postgres_data_dynamic(engine, table_names: List[str]) -> List[Dict[str, Any]]:
    metadata = MetaData()
    results = []

    with engine.begin() as conn:
        for tbl_name in table_names:
            table = Table(tbl_name, metadata, autoload_with=engine)

            rows = conn.execute(table.select()).fetchall()
            for r in rows:
                row_dict = dict(r._mapping)
                # ID có thể là id hoặc table_name:id
                rid = row_dict.get("id", None)

                results.append({
                    "id": f"{tbl_name}:{rid}",
                    "record": row_dict  # giữ full JSON để embed
                })

    return results


## Splitter : Split data into chunks

In [4]:
def split_documents(data):
    chunks = []
    for item in data:
        chunks.append({
            "id": item["id"],
            "record": item["record"]
        })
    return chunks

## Embedder : Data -> npArray

In [5]:
!"/opt/homebrew/Cellar/jupyterlab/4.5.0_1/libexec/bin/python" -m pip install --upgrade pip
!"/opt/homebrew/Cellar/jupyterlab/4.5.0_1/libexec/bin/python" -m pip install google-generativeai numpy
!"/opt/homebrew/Cellar/jupyterlab/4.5.0_1/libexec/bin/python" -m pip install faiss-cpu
!"/opt/homebrew/Cellar/jupyterlab/4.5.0_1/libexec/bin/python" -m pip install python-dotenv
!"/opt/homebrew/Cellar/jupyterlab/4.5.0_1/libexec/bin/python" -m pip install sentence-transformers



In [6]:
from sentence_transformers import SentenceTransformer
import numpy as np

# Load model 1 lần
_e5_model = SentenceTransformer("intfloat/multilingual-e5-small")

def _record_to_text(record):
    if isinstance(record, dict):
        return " ".join(f"{k}: {v}" for k, v in record.items() if v)
    elif isinstance(record, str):
        return record
    else:
        raise TypeError(f"Unsupported record type: {type(record)}")


def get_embeddings(records):
    """
    records: danh sách dạng [{"record": {...}}, ...]
    return: numpy array (float32) emb
    """
    texts = [_record_to_text(r["record"]) for r in records]

    embeddings = _e5_model.encode(
        texts,
        convert_to_numpy=True,
        show_progress_bar=True
    )

    return embeddings.astype("float32")


  from .autonotebook import tqdm as notebook_tqdm


## VectorStore : Store the embedded data

In [7]:
import faiss
import numpy as np
import os
import pickle
from typing import List, Dict, Optional, Any

"""
    VectorStore dùng FAISS IndexFlatL2 (mặc định). Lưu embeddings + records (metadata).
    - vectors: np.ndarray shape (n, d)
    - records: list[dict] tương ứng
    - persist_path: nếu truyền sẽ ghi faiss.index và records.pkl
"""
class VectorStore:

    def __init__(self, vectors: np.ndarray, records: List[Dict[str, Any]], persist_path: Optional[str] = None):
        assert len(vectors) == len(records), "vectors and records must have same length"
        self.records = records
        self.d = vectors.shape[1]

        self.index = faiss.IndexFlatL2(self.d)
        self.index.add(vectors.astype("float32"))

        self.persist_path = persist_path
        if persist_path:
            os.makedirs(persist_path, exist_ok=True)
            faiss.write_index(self.index, os.path.join(persist_path, "faiss.index"))
            with open(os.path.join(persist_path, "records.pkl"), "wb") as f:
                pickle.dump(self.records, f)


    @classmethod
    def load(cls, persist_path: str) -> "VectorStore":
        """
        Tạo VectorStore từ persist_path (faiss.index + records.pkl).
        """
        idx_path = os.path.join(persist_path, "faiss.index")
        rec_path = os.path.join(persist_path, "records.pkl")
        if not os.path.exists(idx_path) or not os.path.exists(rec_path):
            raise FileNotFoundError("Persist files not found in persist_path")

        index = faiss.read_index(idx_path)
        with open(rec_path, "rb") as f:
            records = pickle.load(f)

        d = index.d
        placeholder = np.zeros((0, d), dtype="float32")
        inst = cls.__new__(cls)
        inst.records = records
        inst.d = d
        inst.index = index
        inst.persist_path = persist_path
        return inst
    

    def search(self, query_vector: np.ndarray, top_k: int = 3) -> List[Dict[str, Any]]:
        """
        Tìm top_k nearest records theo L2. 
        - query_vector shape: (1, d) hoặc (n, d) nhưng we assume (1, d) here.
        Trả về list các record dict theo thứ tự gần nhất -> xa.
        """
        if query_vector.ndim == 1:
            query_vector = query_vector.reshape(1, -1)
        distances, indices = self.index.search(query_vector.astype("float32"), top_k)
        inds = indices[0].tolist()
        result = []
        for i in inds:
            if i < 0 or i >= len(self.records):
                continue
            result.append(self.records[i])
        return result

    def save(self) -> None:
        """
        Explicit persist nếu muốn.
        """
        if not self.persist_path:
            raise RuntimeError("persist_path not configured")
        faiss.write_index(self.index, os.path.join(self.persist_path, "faiss.index"))
        with open(os.path.join(self.persist_path, "records.pkl"), "wb") as f:
            pickle.dump(self.records, f)


## Retriever : Retrieve from embedded data

In [8]:
def retrieve_relevant_docs(query: str, store: VectorStore, top_k: int = 3, embed_model: str = None) -> List[Dict[str, str]]:
    # 1. Embed query đúng kiểu string
    q_vec = get_embeddings([{"record": query}])  # chỉ dùng string, như khi build index

    # 2. Lấy top_k records từ vector store
    records = store.search(q_vec, top_k=top_k)  # giả sử trả về list[str] hoặc list[record]

    # 3. Map về dict cho RAGPipelinePG
    contexts = [{"title": f"Doc {i}", "body": r} for i, r in enumerate(records)]
    return contexts


## Generator : Generate answer from user's queries and retrieved data

In [9]:
from typing import List, Dict
import sys
import os

# Lấy đường dẫn tuyệt đối tới backend
backend_path = os.path.abspath(os.path.join(".."))  # từ notebooks lên backend
print("Backend path:", backend_path)

# Thêm vào sys.path nếu chưa có
if backend_path not in sys.path:
    sys.path.insert(0, backend_path)

# Bây giờ import được
from app.api.llm_module import ask_gemini

import json

"""
    Tạo prompt kết hợp giữa contexts (list record dicts) và câu hỏi.
    retrieved_contexts: list of dict (record).
    Mỗi record được serialize đầy đủ sang JSON để gửi cho LLM.
"""
def build_prompt(user_query: str, retrieved_contexts: List[Dict]) -> str:
    context_texts = []
    for r in retrieved_contexts:
        # Serialize toàn bộ record sang JSON, giữ UTF-8 và readable
        record_json = json.dumps(r, ensure_ascii=False, indent=2)
        context_texts.append(record_json)

    # Nối các record với separator để LLM phân biệt
    context_text = "\n\n---\n\n".join(context_texts)

    prompt = f"""
    Dưới đây là các thông tin tham khảo:
    
    {context_text}
    
    ---
    
    Câu hỏi của người dùng: {user_query}
    
    Hướng dẫn cho hệ thống:
    Bạn là một hướng dẫn viên du lịch chuyên nghiệp, trả lời bằng tiếng Việt, với phong cách thân thiện – gãy gọn – dễ hiểu – giàu cảm xúc như khi đang trò chuyện với du khách.
    YÊU CẦU BẮT BUỘC KHI TRẢ LỜI:
    Phân tích và tổng hợp đầy đủ tất cả thông tin xuất hiện trong phần tham khảo (context).
    Không được bỏ sót bất kỳ chi tiết nào.
    Nếu có nhiều nguồn cung cấp thông tin về cùng một chủ đề, phải gom lại và viết thành một đoạn liền mạch.
    Câu trả lời phải mượt mà, tự nhiên như người thật nói chuyện.
    Tuyệt đối không được nhắc đến “tài liệu”, “doc”, “nguồn số X”, “ngữ cảnh số Y”…
    Chỉ viết nội dung đã tổng hợp, trình bày liền mạch trong một đoạn hoặc nhiều đoạn.
    Không tự bịa thêm thông tin ngoài context.
    Nếu tổng hợp xong mà vẫn không đủ dữ liệu để trả lời câu hỏi, phải trả lời:
    “Xin lỗi, tôi chưa có thông tin về điều đó.”
    Giọng văn phải mang sắc thái của một hướng dẫn viên du lịch:
    nhiệt tình
    am hiểu
    gợi ý thêm hoạt động phù hợp
    đưa lời khuyên hữu ích
    Định dạng trả lời:
    → Trả lời bằng một đoạn văn mô tả tự nhiên (có thể chia đoạn), thân thiện, liền mạch, đầy đủ thông tin đã tổng hợp từ context."""
    return prompt.strip()


async def generate_answer(user_query: str, retrieved_contexts: List[Dict]) -> str:
    try:
        prompt = build_prompt(user_query, retrieved_contexts)
        return await ask_gemini(prompt)
    except Exception as e:
        return f"Lỗi trong generator: {e}"

Backend path: /Users/phungquochuy/smart-tourism-system/backend


# Rag_pipeline

In [10]:
class RAGPipelinePG:
    def __init__(
        self,
        tables: List[str],
        splitter_fn=split_documents,
        embedder_fn=get_embeddings,
        vectorstore_cls=None,
        retriever_fn=None,
        generator_fn=None,
        persist_path=None,
        rebuild_on_init=True,
        engine=engine
    ):
        self.tables = tables
        self.splitter_fn = splitter_fn
        self.embedder_fn = embedder_fn
        self.vectorstore_cls = vectorstore_cls or VectorStore
        self.retriever_fn = retriever_fn or retrieve_relevant_docs
        self.generator_fn = generator_fn or generate_answer
        self.persist_path = persist_path
        self.rebuild_on_init = rebuild_on_init
        self.engine = engine

        self._store = None
        self._chunks = []
        self._vectors = None

        self.build_index()

    def build_index(self):
        """
        - Nếu đã có index trên disk và rebuild_on_init == False -> load index.
        - Ngược lại -> build mới từ loader_fn.
        """
        index_path = None
        if self.persist_path:
            index_path = os.path.join(self.persist_path, "faiss.index")

        # Nếu có index trên disk và caller không muốn rebuild -> load
        if index_path and os.path.exists(index_path) and not self.rebuild_on_init:
            print("Loading existing index from persist_path...")
            self._store = self.vectorstore_cls.load(self.persist_path)
            return

        # Nếu không có index hoặc caller muốn rebuild -> build mới
        print(f"Building new index from tables: {self.tables}")

        data = load_postgres_data_dynamic(self.engine, self.tables)
        chunks = self.splitter_fn(data)
        self._chunks = chunks

        records_for_embed = [{"record": c["record"]} for c in chunks]
        vectors = self.embedder_fn(records_for_embed)

        records = []
        for c in chunks:
            raw = c["record"]
            text = _record_to_text(raw)
            records.append({"raw": raw, "text": text})

        self._store = self.vectorstore_cls(
            vectors=vectors,
            records=records,
            persist_path=self.persist_path
        )

        print("Index built and saved.")


    def retrieve_context(self, query, top_k=3):
        return self.retriever_fn(query, self._store, top_k=top_k)

    async def generate_final_answer(self, question, contexts):
        if not contexts:
            return "Xin lỗi, tôi chưa có thông tin về điều đó."
        return await self.generator_fn(question, contexts)

    async def answer(self, question: str, top_k=3):
        ctx = self.retrieve_context(question, top_k)
        ans = await self.generate_final_answer(question, ctx)
        return {"question": question, "answer": ans, "contexts": ctx}


## Test rag_pipeline

In [11]:
rag = RAGPipelinePG(
    tables=[
        "tourism_places",
        "vietnam_hotels",
        "vietnam_foods"
    ],
    persist_path="../app/rag_store",
    rebuild_on_init=False
)

print("1. RAG initialized")

result = await rag.answer("tỉnh : An Giang, rừng", top_k=3)

print("\n2. KẾT QUẢ TRẢ VỀ:")

if not result:
    print("RAG trả về NONE hoặc RỖNG")
else:
    # In câu trả lời
    print("\n--- ANSWER ---")
    print(result.get("answer", "Không có answer"))

    # In các đoạn retrieve
    print("\n--- RETRIEVED CONTEXTS ---")
    contexts = result.get("contexts", [])
    if len(contexts) == 0:
        print("Không retrieve được context nào")
    else:
        for i, ctx in enumerate(result["contexts"], 1):
            print(f"\nContext #{i}:")
            print(json.dumps(ctx, ensure_ascii=False, indent=2))


print("\n3. DONE")


Loading existing index from persist_path...
1. RAG initialized


Batches: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00,  3.57it/s]



2. KẾT QUẢ TRẢ VỀ:

--- ANSWER ---
Chào bạn! An Giang có hai điểm đến nổi tiếng liên quan đến rừng mà bạn có thể quan tâm đó là Rừng Tràm Trà Sư và Hồ Soài So:

⭐ **Rừng Tràm Trà Sư**: Đây là khu rừng tràm đẹp và nổi tiếng nhất Việt Nam, nơi sinh sống của nhiều loài chim nước và động vật hoang dã. Đến đây, bạn có thể đi tắc ráng xuyên rừng, khám phá cầu tre vạn bước và ngắm sân chim. Giá vé tham quan khoảng 100.000 - 200.000 VNĐ, mở cửa từ 7:00 đến 17:00, và bạn nên dành khoảng 2-3 giờ để khám phá hết vẻ đẹp nơi đây. Đặc biệt, mùa nước nổi (tháng 9-11) là thời điểm đẹp nhất để ghé thăm rừng tràm.

⭐ **Hồ Soài So**: Hồ nước nhân tạo lớn nhất An Giang, nằm dưới chân núi Cô Tô, với khung cảnh hữu tình và yên bình. Bạn có thể cắm trại và check-in suối Vàng tại đây. Hồ mở cửa cả ngày và miễn phí vé vào cổng, bạn nên dành khoảng 1-2 giờ để tham quan. Lưu ý tránh đi vào những ngày mưa để có trải nghiệm tốt nhất nhé!

Ngoài ra, khi đến An Giang, bạn đừng quên thưởng thức các món đặc sản như l

## Print result