<H3>Install libs</H3>

In [None]:
# C√†i ƒë·∫∑t c√°c th∆∞ vi·ªán ch√≠nh cho m√¥ h√¨nh ng√¥n ng·ªØ, embedding, RAG v√† giao di·ªán web
!pip install -q \
  "torch>=2.0.0" \
  "transformers>=4.40.0" \
  "accelerate>=0.30.0" \
  "huggingface-hub>=0.23.0" \
  "sentence-transformers>=2.7.0" \
  "langchain>=0.2.0" \
  "langchain-core>=0.2.0" \
  "langchain-community>=0.1.0" \
  "langchain-text-splitters>=0.2.0" \
  "chromadb>=0.5.0" \
  "langchain-chroma>=0.2.0" \
  "pypdf>=4.2.0" \
  "gradio>=5.0.0" \
  "langchain-huggingface" \
  "wget" \
  "tqdm" \
  "ipywidgets"

<H3>Setup project + tạo cấu trúc thư mục</H3>

In [None]:
import os, sys

# Project root: the "rag_langchain" folder resolved against the current working directory.
PROJECT_ROOT = os.path.abspath("rag_langchain")

_data_source_dir = os.path.join(PROJECT_ROOT, "data_source")
DATA_DIR = os.path.join(_data_source_dir, "generative_ai")  # copy the PDFs here
CUSTOM_DIR = os.path.join(_data_source_dir, "custom")       # optional extra PDFs
CHROMA_DIR = os.path.join(PROJECT_ROOT, "chroma_data")      # persisted vector DB

# Source packages (optional, mirrors the documented project layout).
_src_dir = os.path.join(PROJECT_ROOT, "src")
_package_dirs = [os.path.join(_src_dir, "base"), os.path.join(_src_dir, "rag")]

# Create every folder; exist_ok keeps the cell safe to re-run.
for _directory in (DATA_DIR, CUSTOM_DIR, CHROMA_DIR, *_package_dirs):
    os.makedirs(_directory, exist_ok=True)

# Create an empty __init__.py in each package folder that does not have one yet.
for _package in (_src_dir, *_package_dirs):
    _init_file = os.path.join(_package, "__init__.py")
    if not os.path.exists(_init_file):
        with open(_init_file, "w", encoding="utf-8"):
            pass

# Make PROJECT_ROOT importable (useful once code is split into modules).
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Report the resolved locations.
for _label, _location in (
    ("‚úÖ PROJECT_ROOT:", PROJECT_ROOT),
    ("‚úÖ Copy PDF v√†o:", DATA_DIR),
    ("‚úÖ (Optional) Copy PDF kh√°c v√†o:", CUSTOM_DIR),
    ("‚úÖ Chroma DB l∆∞u ·ªü:", CHROMA_DIR),
):
    print(_label, _location)


<h3>Check dữ liệu PDF đã có chưa</h3>

In [None]:
import glob

# Collect the PDFs in DATA_DIR (sorted for a stable listing) and preview up to 20 names.
pdf_files = glob.glob(os.path.join(DATA_DIR, "*.pdf"))
pdf_files.sort()
print("üìÑ S·ªë PDF trong generative_ai:", len(pdf_files))
for pdf_path in pdf_files[:20]:
    print(" -", os.path.basename(pdf_path))

# Stop early, with instructions, while the folder is still empty.
if not pdf_files:
    missing_pdf_message = (
        "‚ùå Ch∆∞a c√≥ PDF!\n"
        f"H√£y copy v√†i file .pdf v√†o folder:\n{DATA_DIR}\n"
        "R·ªìi ch·∫°y l·∫°i cell n√†y."
    )
    raise ValueError(missing_pdf_message)


<H3>Clean text + Loader + Chunking</H3>

In [None]:
import re
import unicodedata
from typing import List
from tqdm import tqdm

from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

def clean_vietnamese_text(text: str) -> str:
    """Normalize extracted text before chunking.

    Steps, in order:
      1. Unicode NFC normalization, so decomposed Vietnamese diacritics become
         precomposed characters.
      2. Carriage returns, form feeds and vertical tabs are turned into a
         newline. They are control characters, so without this step they would
         be deleted in step 3 and the words on either side would be fused.
      3. Every remaining character whose Unicode category starts with "C"
         is dropped, except newline and tab.
      4. Runs of spaces/tabs collapse to one space; blank lines collapse to a
         single newline.

    Args:
        text: Raw text, e.g. the content of one PDF page.

    Returns:
        The cleaned text with leading/trailing whitespace stripped.
    """
    # Normalize Vietnamese Unicode.
    text = unicodedata.normalize("NFC", text)

    # Keep line structure: map CR, CRLF, FF and VT to a plain newline first.
    text = re.sub(r"\r\n?|[\f\v]", "\n", text)

    # Remove control characters (newline and tab are kept).
    text = "".join(
        ch for ch in text
        if ch in "\n\t" or not unicodedata.category(ch).startswith("C")
    )

    # Collapse redundant whitespace.
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n\s*\n", "\n", text)

    return text.strip()

class SimpleLoader:
    """Load PDF files into documents whose page text has been cleaned.

    Args:
        extract_images: Forwarded to ``PyPDFLoader``. Defaults to True, which
            is what this loader always requested. Image extraction needs an
            optional OCR backend that the install cell of this notebook does
            not list (NOTE(review): ``rapidocr-onnxruntime`` in
            langchain-community; confirm for the installed version). When that
            backend cannot be imported the loader falls back to text-only
            extraction instead of failing the whole file.
    """

    def __init__(self, extract_images: bool = True):
        self.extract_images = extract_images

    def _read_pages(self, pdf_file: str):
        """Run PyPDFLoader, retrying without image extraction on ImportError."""
        try:
            return PyPDFLoader(pdf_file, extract_images=self.extract_images).load()
        except ImportError as err:
            if not self.extract_images:
                # Image extraction was already off, so the retry cannot help.
                raise
            print("Image extraction unavailable, loading text only |", err)
            # Remember the outcome so later files do not repeat the failed attempt.
            self.extract_images = False
            return PyPDFLoader(pdf_file, extract_images=False).load()

    def load_pdf(self, pdf_file: str):
        """Load one PDF and return its documents.

        Each document's ``page_content`` is passed through
        ``clean_vietnamese_text`` and ``metadata["source_file"]`` is set to the
        file's base name (useful when debugging retrieved chunks).
        """
        docs = self._read_pages(pdf_file)
        source_name = os.path.basename(pdf_file)
        for doc in docs:
            doc.page_content = clean_vietnamese_text(doc.page_content)
            doc.metadata["source_file"] = source_name
        return docs

    def load_dir(self, dir_path: str) -> List:
        """Load every ``*.pdf`` directly inside ``dir_path``, in sorted order.

        Loading is best effort: a file that raises is reported with a
        ``Skip:`` line and the remaining files are still processed.

        Raises:
            ValueError: If the directory contains no ``*.pdf`` file.
        """
        pdfs = sorted(glob.glob(os.path.join(dir_path, "*.pdf")))
        if not pdfs:
            raise ValueError(f"No PDF files found in: {dir_path}")

        all_docs = []
        for pdf in tqdm(pdfs, desc="Loading PDFs"):
            try:
                all_docs.extend(self.load_pdf(pdf))
            except Exception as e:
                print("Skip:", pdf, "|", e)
        return all_docs

class TextSplitter:
    """Thin wrapper around ``RecursiveCharacterTextSplitter``.

    Args:
        chunk_size: Maximum chunk length, measured with ``len``.
        chunk_overlap: Overlap between consecutive chunks, measured with ``len``.
    """

    def __init__(self, chunk_size: int = 400, chunk_overlap: int = 120):
        # Separators are tried in this order: paragraph, line, word, character.
        splitter_options = {
            "separators": ["\n\n", "\n", " ", ""],
            "chunk_size": chunk_size,
            "chunk_overlap": chunk_overlap,
            "length_function": len,
        }
        self.splitter = RecursiveCharacterTextSplitter(**splitter_options)

    def split(self, documents):
        """Split ``documents`` into chunks and return the splitter's result."""
        chunks = self.splitter.split_documents(documents)
        return chunks


<H3>Vector DB (Chroma + Embeddings)</H3>

In [None]:
from langchain_chroma import Chroma
from langchain_huggingface import HuggingFaceEmbeddings

class VectorDB:
    """Chroma vector store backed by a HuggingFace embedding model.

    With ``documents`` the collection is built (or refreshed) from them;
    without documents the collection persisted in ``persist_dir`` is opened.

    Args:
        documents: Chunked documents to index, or None / empty to only open
            the existing collection.
        embedding_model: Model name passed to ``HuggingFaceEmbeddings``.
        collection_name: Name of the Chroma collection.
        persist_dir: Directory of the persisted DB; falls back to the
            notebook-level ``CHROMA_DIR`` when not given.
    """

    def __init__(
        self,
        documents=None,
        embedding_model: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
        collection_name: str = "vietnamese_docs",
        persist_dir: str = None,
    ):
        self.persist_dir = persist_dir or CHROMA_DIR
        self.collection_name = collection_name

        self.embedding = HuggingFaceEmbeddings(model_name=embedding_model)
        self.db = self._build_db(documents)

    @staticmethod
    def _stable_ids(documents):
        """Return one deterministic, unique id per document.

        The id is a SHA-256 of the source file name, page and chunk text, plus
        an occurrence counter so identical chunks still get distinct ids.
        """
        import hashlib

        occurrences = {}
        ids = []
        for doc in documents:
            meta = getattr(doc, "metadata", None) or {}
            source = meta.get("source_file", meta.get("source", ""))
            fingerprint = "\x1f".join(
                [str(source), str(meta.get("page", "")), doc.page_content or ""]
            )
            digest = hashlib.sha256(fingerprint.encode("utf-8", "replace")).hexdigest()
            count = occurrences.get(digest, 0)
            occurrences[digest] = count + 1
            ids.append(f"{digest}-{count}")
        return ids

    def _build_db(self, documents):
        """Open the persisted collection, or (re)build it from ``documents``."""
        if documents is None or len(documents) == 0:
            # Load the existing collection.
            return Chroma(
                collection_name=self.collection_name,
                embedding_function=self.embedding,
                persist_directory=self.persist_dir,
            )
        else:
            # Build from documents. Without explicit ids every re-run of the
            # notebook would add the same chunks again to the persisted
            # collection. Stable ids make the rebuild idempotent
            # (NOTE(review): relies on langchain-chroma writing with upsert
            # semantics; confirm against the installed version).
            return Chroma.from_documents(
                documents=documents,
                embedding=self.embedding,
                ids=self._stable_ids(documents),
                collection_name=self.collection_name,
                persist_directory=self.persist_dir,
            )

    def get_retriever(self, k: int = 4):
        """Return a similarity-search retriever that yields the top ``k`` chunks."""
        return self.db.as_retriever(
            search_type="similarity",
            search_kwargs={"k": k},
        )


<H3>LLM (Qwen) + fallback model nhỏ cho máy yếu</H3>

In [None]:
import torch
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
from langchain_huggingface import HuggingFacePipeline

def get_hf_llm(
    model_name: str = "Qwen/Qwen2.5-3B-Instruct",
    temperature: float = 0.2,
    max_new_tokens: int = 450,
):
    """Build a LangChain LLM around a local Hugging Face text-generation pipeline.

    Args:
        model_name: Hub id of the causal LM. When no CUDA device is available
            and the default 3B model is requested, the 0.5B variant is used
            instead.
        temperature: Sampling temperature. A positive value enables sampling
            with ``top_p=0.75``; ``0`` (or a negative value / None) selects
            greedy decoding, because sampling cannot run with a non-positive
            temperature.
        max_new_tokens: Maximum number of generated tokens per call.

    Returns:
        A ``HuggingFacePipeline`` wrapping the generation pipeline.
    """
    has_gpu = torch.cuda.is_available()

    # Weak machine / no GPU -> fall back to the small model to be safe.
    if not has_gpu and model_name == "Qwen/Qwen2.5-3B-Instruct":
        model_name = "Qwen/Qwen2.5-0.5B-Instruct"
        print("‚ö†Ô∏è Kh√¥ng th·∫•y GPU -> auto d√πng model nh·ªè:", model_name)

    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        # Half precision only on GPU; full precision on CPU.
        torch_dtype=torch.float16 if has_gpu else torch.float32,
        device_map="auto",
        low_cpu_mem_usage=True,
    )
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    # Sampling parameters are only passed when sampling is actually enabled.
    if temperature is not None and temperature > 0:
        sampling_kwargs = {"do_sample": True, "temperature": temperature, "top_p": 0.75}
    else:
        sampling_kwargs = {"do_sample": False}

    gen_pipe = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=max_new_tokens,
        # The EOS token id is reused as the padding id.
        pad_token_id=tokenizer.eos_token_id,
        **sampling_kwargs,
    )

    return HuggingFacePipeline(pipeline=gen_pipe)


<H3>Prompt + Parser + RAG chain (kèm hiển thị context)</H3>

In [None]:
import re
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

class FocusedAnswerParser(StrOutputParser):
    """Output parser that keeps only the answer part of the model output.

    The text after the last answer marker is kept (the whole text when the
    marker is absent), newlines are flattened to spaces, and the result is
    capped at five sentences.
    """

    def parse(self, text: str) -> str:
        """Return the trimmed, single-line answer extracted from ``text``."""
        cleaned = (text or "").strip()

        # rpartition gives the tail after the last marker, or the whole string
        # when the marker does not occur.
        answer = cleaned.rpartition("[TR·∫¢ L·ªúI]:")[2].strip()
        answer = re.sub(r"\n+", " ", answer).strip()

        # Split after sentence-ending punctuation followed by whitespace.
        sentences = [
            piece.strip()
            for piece in re.split(r"(?<=[\.\!\?])\s+", answer)
            if piece.strip()
        ]
        if len(sentences) <= 5:
            return answer
        # Keep the first five sentences and mark the truncation.
        return " ".join(sentences[:5]) + " ..."

class OfflineRAG:
    """Builds the retrieval-augmented chains around a local LLM.

    Args:
        llm: The language model placed after the prompt in the answer chain.
    """

    def __init__(self, llm):
        self.llm = llm
        template = """
B·∫°n l√† tr·ª£ l√Ω AI ph√¢n t√≠ch t√†i li·ªáu ti·∫øng Vi·ªát.

[T√ÄI LI·ªÜU]:
{context}

[C√ÇU H·ªéI]:
{question}

H√£y tr·∫£ l·ªùi d·ª±a tr√™n t√†i li·ªáu. N·∫øu t√†i li·ªáu kh√¥ng c√≥ th√¥ng tin, n√≥i r√µ "Kh√¥ng c√≥ th√¥ng tin".
Tr·∫£ l·ªùi ƒë·∫ßy ƒë·ªß th√¥ng tin (3-5 c√¢u chi ti·∫øt), kh√¥ng th√™m b·∫•t k·ª≥ chi ti·∫øt n√†o ngo√†i t√†i li·ªáu.

[TR·∫¢ L·ªúI]:
""".strip()
        self.prompt = PromptTemplate.from_template(template)
        self.answer_parser = FocusedAnswerParser()

    @staticmethod
    def _render_block(doc, text):
        """Format one chunk as a ``[file | page N]`` header followed by its text."""
        source_name = doc.metadata.get("source_file", "unknown")
        page_no = doc.metadata.get("page", "?")
        return f"[{source_name} | page {page_no}]\n{text}"

    def get_chain(self, retriever):
        """Return the chain: retrieve -> prompt -> LLM -> answer parser.

        The context fed to the prompt skips chunks shorter than 40 characters
        and chunks whose first 200 characters were already seen.
        """
        def format_docs(docs):
            # Context text with source metadata, for the demo.
            rendered = []
            seen_prefixes = set()
            for doc in docs:
                text = (doc.page_content or "").strip()
                prefix = text[:200]
                if len(text) < 40 or prefix in seen_prefixes:
                    continue
                seen_prefixes.add(prefix)
                rendered.append(self._render_block(doc, text))
            return "\n\n---\n\n".join(rendered)

        chain_inputs = {
            "context": retriever | format_docs,
            "question": RunnablePassthrough(),
        }
        return chain_inputs | self.prompt | self.llm | self.answer_parser

    def get_context_only(self, retriever):
        """Return a chain that only retrieves and formats the chunks.

        Unlike ``get_chain`` nothing is filtered, which makes it convenient for
        showing the raw retrieved context in the UI.
        """
        def format_docs(docs):
            return "\n\n---\n\n".join(
                self._render_block(doc, (doc.page_content or "").strip())
                for doc in docs
            )

        return retriever | format_docs


<H3>Build pipeline: load → chunk → vector → chain</H3>

In [None]:
# Tunable pipeline parameters, kept in one place.
CHUNK_SIZE = 400
CHUNK_OVERLAP = 120
TOP_K = 4

# 1) Load + chunk
loader = SimpleLoader()
splitter = TextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)

raw_docs = loader.load_dir(DATA_DIR)
split_docs = splitter.split(raw_docs)

print("‚úÖ Raw docs:", len(raw_docs))
print("‚úÖ Chunks:", len(split_docs))

# load_dir skips files that fail to load, so it can return nothing at all.
# VectorDB treats an empty document list as "open the existing collection",
# which would silently answer from stale data. Fail loudly instead.
if not split_docs:
    raise ValueError(
        f"No text chunks were produced from the PDFs in: {DATA_DIR}. "
        "Check the 'Skip:' messages above."
    )

# 2) Vector DB + retriever
vdb = VectorDB(documents=split_docs)
retriever = vdb.get_retriever(k=TOP_K)

# 3) LLM + RAG chains
llm = get_hf_llm()
rag = OfflineRAG(llm)
rag_chain = rag.get_chain(retriever)
ctx_chain = rag.get_context_only(retriever)

def answer_question(question: str) -> str:
    """Run the RAG chain for ``question``.

    Any exception is converted into an ``"Error: ..."`` string so the UI
    always receives text.
    """
    try:
        reply = rag_chain.invoke(question)
    except Exception as err:
        return f"Error: {err!s}"
    return reply

def get_context(question: str) -> str:
    """Return the formatted retrieved context for ``question``.

    Any exception is converted into an ``"Error: ..."`` string so the UI
    always receives text.
    """
    try:
        retrieved = ctx_chain.invoke(question)
    except Exception as err:
        return f"Error: {err!s}"
    return retrieved

# Quick smoke test: ask one sample question and print the answer.
sample_question = "T√†i li·ªáu c√≥ n√≥i v√™ÃÄ lu√¢Ã£t giao th√¥ng kh√¥ng?"
print(answer_question(sample_question))


<H3>Gradio UI</H3>

In [None]:
import gradio as gr

def qa_with_ctx(q):
    """Return the generated answer and the retrieved context for one question."""
    reply = answer_question(q)
    retrieved = get_context(q)
    return reply, retrieved


with gr.Blocks(title="RAG: H√äÃ£ TH√îÃÅNG HOÃâI ƒêAÃÅP LU√ÇÃ£T GIAO TH√îNG") as demo:
    gr.Markdown("# üìå RAG ‚Äì H√äÃ£ TH√îÃÅNG HOÃâI ƒêAÃÅP LU√ÇÃ£T GIAO TH√îNG")

    # Top row: question input with its button on the left, answer on the right.
    with gr.Row():
        with gr.Column(scale=1):
            question = gr.Textbox(
                label="C√¢u h·ªèi",
                placeholder="Nh·∫≠p c√¢u h·ªèi v·ªÅ n·ªôi dung trong PDF...",
                lines=3,
            )
            btn = gr.Button("G·ª≠i", variant="primary")

        with gr.Column(scale=1):
            answer = gr.Textbox(label="C√¢u tr·∫£ l·ªùi", lines=6, interactive=False)

    # Below the row: the retrieved chunks.
    gr.Markdown("## üîé Context (Top-k chunks h·ªá th·ªëng l·∫•y ra)")
    context = gr.Textbox(label="Top-k Context", lines=10, interactive=False)

    # One click fills both the answer box and the context box.
    btn.click(fn=qa_with_ctx, inputs=question, outputs=[answer, context])

# Start the app; share=True is forwarded to launch().
demo.launch(share=True)