*Code ch·∫°y tr√™n Google Colab*

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# =========================
# 0) C√ÄI TH∆Ø VI·ªÜN
# =========================
!pip -q install --upgrade pip
!pip -q install "sentence-transformers>=3.0.0" faiss-cpu "rank_bm25>=0.2.2" datasets
!pip -q install "transformers>=4.41.0" "accelerate>=0.30.0"
!pip -q install fastapi uvicorn nest_asyncio
!pip install ijson



In [3]:
# =========================
# 1) C·∫§U H√åNH & IMPORT
# =========================
import os, json, math, gzip, pickle, textwrap, re, uuid, time
from typing import List, Dict, Tuple, Any

import numpy as np
import faiss

from sentence_transformers import SentenceTransformer, CrossEncoder
from rank_bm25 import BM25Okapi

from collections import defaultdict
import re

# --- ƒê∆∞·ªùng d·∫´n l√†m vi·ªác tr√™n Colab ---
WORK_DIR = "/content/drive/MyDrive/eGov-Bot/ITB"
os.makedirs(WORK_DIR, exist_ok=True)

RAW_JSON_PATH   = f"{WORK_DIR}/toan_bo_du_lieu.json"      # d·ªØ li·ªáu g·ªëc (list c√°c th·ªß t·ª•c)
CHUNKS_JSONL    = f"{WORK_DIR}/chunks.jsonl"  # d·ªØ li·ªáu ƒë√£ chunk (m·ªói d√≤ng 1 chunk)
FAISS_INDEX     = f"{WORK_DIR}/index.faiss"   # file ch·ªâ m·ª•c FAISS
METAS_PKL_GZ    = f"{WORK_DIR}/metas.pkl.gz"  # metadata k√®m text
BM25_PICKLE     = f"{WORK_DIR}/bm25.pkl.gz"   # bm25 corpus (tu·ª≥ ch·ªçn)

# --- Model embedding & reranker ---
EMBED_MODEL_NAME   = "AITeamVN/Vietnamese_Embedding"


In [None]:
# =========================
# 3) H√ÄM TI·ªÄN X·ª¨ L√ù & CHUNKING (√≠t RAM)
# =========================
import json
import re
import uuid
import gc
import os
import sys
from typing import Generator, Dict, Any, List
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing

FIELD_ORDER = [
    "ten_thu_tuc", "yeu_cau_dieu_kien", "thanh_phan_ho_so",
    "trinh_tu_thuc_hien", "cach_thuc_thuc_hien",
    "co_quan_thuc_hien", "thu_tuc_lien_quan"
]

FIELD_VN_NAME = {
    "ten_thu_tuc": "T√™n th·ªß t·ª•c",
    "yeu_cau_dieu_kien": "Y√™u c·∫ßu, ƒëi·ªÅu ki·ªán",
    "thanh_phan_ho_so": "Th√†nh ph·∫ßn h·ªì s∆°",
    "trinh_tu_thuc_hien": "Tr√¨nh t·ª± th·ª±c hi·ªán",
    "cach_thuc_thuc_hien": "C√°ch th·ª©c th·ª±c hi·ªán",
    "co_quan_thuc_hien": "C∆° quan th·ª±c hi·ªán",
    "thu_tuc_lien_quan": "Th·ªß t·ª•c li√™n quan"
}


def normalize_text(s: str) -> str:
    """Chu·∫©n h√≥a text"""
    if not s:
        return ""
    s = str(s).replace("\r\n", "\n").replace("\t", " ").strip()
    s = re.sub(r"[ \u00A0]+", " ", s)
    s = re.sub(r"\n{3,}", "\n\n", s)
    return s

def split_text_rcts(
    text: str,
    max_chars: int = 800,
    overlap: int = 50,
    separators: List[str] = None
) -> Generator[str, None, None]:
    """Recursive Text Character Split (RCTS)"""
    text = normalize_text(text)
    if not text:
        return

    if separators is None:
        separators = ["\n\n", "\n", ". ", ", ", " "]

    def _split_recursive(txt: str) -> List[str]:
        if len(txt) <= max_chars:
            return [txt]

        # t√¨m separator ph√π h·ª£p nh·∫•t
        for sep in separators:
            idx = txt.rfind(sep, 0, max_chars)
            if idx != -1 and idx > max_chars * 0.3:  # tr√°nh c·∫Øt qu√° s·ªõm
                left = txt[:idx + len(sep)].strip()
                right = txt[idx + len(sep):].strip()
                return _split_recursive(left) + _split_recursive(right)

        # n·∫øu kh√¥ng t√¨m ƒë∆∞·ª£c -> c·∫Øt c·ª©ng
        left = txt[:max_chars]
        right = txt[max_chars:]
        return [left] + _split_recursive(right)

    chunks = _split_recursive(text)

    # th√™m overlap gi·ªØa c√°c chunk
    final_chunks = []
    for i, chunk in enumerate(chunks):
        if i > 0 and overlap > 0:
            prefix = chunks[i-1][-overlap:]
            chunk = prefix + " " + chunk
        final_chunks.append(chunk.strip())

    return (c for c in final_chunks if c.strip())

def create_chunk(record: Dict[str, Any], field: str, piece: str, piece_idx: int) -> str:
    """T·∫°o m·ªôt chunk JSON"""
    parent_id = str(record.get("nguon", str(uuid.uuid4())[:8]))
    title = normalize_text(record.get("ten_thu_tuc", ""))
    source = str(record.get("nguon", ""))

    field_name = FIELD_VN_NAME.get(field, field)
    text_content = f"Th·ªß t·ª•c: {title}\nM·ª•c: {field_name}\nN·ªôi dung: {piece}"

    chunk = {
        "id": f"{parent_id}#{field}#{piece_idx}",
        "parent_id": parent_id,
        "ten_thu_tuc": title,
        "field": field,
        "text": text_content,
        "raw": piece,
        "nguon": source
    }

    return json.dumps(chunk, ensure_ascii=False)

def process_record_streaming(record: Dict[str, Any], outfile) -> int:
    """X·ª≠ l√Ω m·ªôt record v√† ghi tr·ª±c ti·∫øp ra file"""
    chunk_count = 0

    for field in FIELD_ORDER:
        raw_value = record.get(field)
        if not raw_value:
            continue

        piece_idx = 0
        for piece in split_text_rcts(str(raw_value)):
            if not piece.strip():
                continue

            try:
                chunk_json = create_chunk(record, field, piece, piece_idx)
                outfile.write(chunk_json + "\n")
                chunk_count += 1
                piece_idx += 1
            except Exception as e:
                print(f"‚ùå L·ªói t·∫°o chunk cho field {field}: {e}")
                continue

    return chunk_count

def safe_json_stream_parser(filepath: str) -> Generator[Dict[str, Any], None, None]:
    """Parser JSON streaming an to√†n h∆°n"""

    print("üîÑ ƒêang parse JSON stream...")

    try:
        with open(filepath, 'r', encoding='utf-8', buffering=8192) as f:
            # ƒê·ªçc v√† ki·ªÉm tra k√Ω t·ª± ƒë·∫ßu
            first_char = f.read(1)
            if first_char != '[':
                print(f"‚ùå File kh√¥ng b·∫Øt ƒë·∫ßu b·∫±ng '[', m√† l√† '{first_char}'")
                return

            # Reset v·ªÅ ƒë·∫ßu v√† b·ªè qua '['
            f.seek(1)

            buffer = ""
            brace_count = 0
            bracket_count = 0
            in_string = False
            escape_next = False
            char_count = 0

            while True:
                char = f.read(1)
                if not char:
                    break

                char_count += 1

                # X·ª≠ l√Ω escape characters
                if escape_next:
                    buffer += char
                    escape_next = False
                    continue

                if char == '\\' and in_string:
                    buffer += char
                    escape_next = True
                    continue

                # X·ª≠ l√Ω strings
                if char == '"':
                    in_string = not in_string
                    buffer += char
                    continue

                if in_string:
                    buffer += char
                    continue

                # X·ª≠ l√Ω c·∫•u tr√∫c JSON
                if char == '{':
                    brace_count += 1
                    buffer += char
                elif char == '}':
                    brace_count -= 1
                    buffer += char

                    # Ho√†n th√†nh m·ªôt object
                    if brace_count == 0 and bracket_count == 0:
                        try:
                            # Clean buffer tr∆∞·ªõc khi parse
                            clean_buffer = buffer.strip().rstrip(',')
                            if clean_buffer:
                                obj = json.loads(clean_buffer)
                                yield obj
                                del obj
                        except json.JSONDecodeError as e:
                            print(f"‚ùå JSON decode error t·∫°i k√Ω t·ª± {char_count}: {e}")
                            print(f"   Buffer: {buffer[:100]}...")

                        # Reset buffer
                        buffer = ""
                        gc.collect()

                elif char == '[':
                    bracket_count += 1
                    buffer += char
                elif char == ']':
                    if bracket_count > 0:
                        bracket_count -= 1
                        buffer += char
                    else:
                        # K·∫øt th√∫c main array
                        break
                elif char == ',':
                    if brace_count == 0 and bracket_count == 0:
                        # D·∫•u ph·∫©y gi·ªØa c√°c objects ch√≠nh
                        continue
                    else:
                        buffer += char
                elif char in ' \t\n\r':
                    # Whitespace
                    if buffer.strip():  # Ch·ªâ th√™m n·∫øu buffer kh√¥ng r·ªóng
                        buffer += ' '
                else:
                    buffer += char

                # Progress indicator
                if char_count % 100000 == 0:
                    print(f"üìñ ƒê√£ ƒë·ªçc {char_count} k√Ω t·ª±...")

    except Exception as e:
        print(f"‚ùå L·ªói ƒë·ªçc file: {e}")

def process_one_record(record: Dict[str, Any]) -> List[str]:
    """X·ª≠ l√Ω 1 record v√† tr·∫£ v·ªÅ danh s√°ch JSON string (chunk)."""
    chunks = []
    for field in FIELD_ORDER:
        raw_value = record.get(field)
        if not raw_value:
            continue

        piece_idx = 0
        for piece in split_text_rcts(str(raw_value)):
            if not piece.strip():
                continue
            try:
                chunk_json = create_chunk(record, field, piece, piece_idx)
                chunks.append(chunk_json)
                piece_idx += 1
            except Exception as e:
                print(f"‚ùå L·ªói t·∫°o chunk cho field {field}: {e}")
                continue
    return chunks


def chunking_main():
    """Chunking ch√≠nh ‚Äì ch·∫°y song song b·∫±ng to√†n b·ªô CPU core"""
    print("=== B·∫ÆT ƒê·∫¶U CHUNKING (MULTICORE) ===")

    if not os.path.exists(RAW_JSON_PATH):
        print(f"‚ùå File kh√¥ng t·ªìn t·∫°i: {RAW_JSON_PATH}")
        return

    file_size_mb = os.path.getsize(RAW_JSON_PATH) / (1024 * 1024)
    print(f"üìÅ K√≠ch th∆∞·ªõc file: {file_size_mb:.2f} MB")

    total_chunks = 0
    processed_records = 0
    num_cores = multiprocessing.cpu_count()
    print(f"üñ•Ô∏è Ph√°t hi·ªán {num_cores} CPU core ‚Äì d√πng t·ªëi ƒëa")

    try:
        with open(CHUNKS_JSONL, 'w', encoding='utf-8', buffering=1024) as outfile, \
             ProcessPoolExecutor(max_workers=num_cores) as executor:

            futures = []
            for record in safe_json_stream_parser(RAW_JSON_PATH):
                futures.append(executor.submit(process_one_record, record))
                processed_records += 1

                # ƒë·ªÉ tr√°nh d·ªìn qu√° nhi·ªÅu future -> flush d·∫ßn
                if len(futures) >= 100:
                    for f in as_completed(futures):
                        chunks = f.result()
                        for c in chunks:
                            outfile.write(c + "\n")
                        total_chunks += len(chunks)
                    outfile.flush()
                    futures = []
                    gc.collect()

                    if processed_records % 50 == 0:
                        print(f"‚úÖ {processed_records} records ‚Üí {total_chunks} chunks")

            # X·ª≠ l√Ω n·ªët c√°c future c√≤n l·∫°i
            for f in as_completed(futures):
                chunks = f.result()
                for c in chunks:
                    outfile.write(c + "\n")
                total_chunks += len(chunks)

    except Exception as e:
        print(f"‚ùå L·ªói fatal: {e}")
        return

    print(f"\nüéâ HO√ÄN TH√ÄNH!")
    print(f"üìä T·ªïng k·∫øt: {processed_records} records ‚Üí {total_chunks} chunks")
    print(f"üíæ Output: {CHUNKS_JSONL}")

def fallback_simple_load():
    """Ph∆∞∆°ng √°n d·ª± ph√≤ng: load tr·ª±c ti·∫øp (cho file nh·ªè)"""
    print("üîÑ Th·ª≠ ph∆∞∆°ng √°n load tr·ª±c ti·∫øp...")

    try:
        with open(RAW_JSON_PATH, 'r', encoding='utf-8') as f:
            data = json.load(f)

        print(f"‚úÖ Load th√†nh c√¥ng! T·ªïng records: {len(data)}")

        total_chunks = 0

        with open(CHUNKS_JSONL, 'w', encoding='utf-8') as outfile:
            for i, record in enumerate(data):
                try:
                    chunks_created = process_record_streaming(record, outfile)
                    total_chunks += chunks_created

                    if (i + 1) % 100 == 0:
                        print(f"‚úÖ {i + 1}/{len(data)} ‚Üí {total_chunks} chunks")
                        outfile.flush()
                        gc.collect()

                except Exception as e:
                    print(f"‚ùå L·ªói record {i}: {e}")
                    continue

        print(f"üéâ Fallback ho√†n th√†nh: {len(data)} records ‚Üí {total_chunks} chunks")

    except Exception as e:
        print(f"‚ùå Fallback th·∫•t b·∫°i: {e}")

def main():
    """Main function"""
    print("=== ROBUST JSON CHUNKING ===")

    try:
        chunking_main()
    except Exception as e:
        print(f"‚ùå Ph∆∞∆°ng ph√°p ch√≠nh th·∫•t b·∫°i: {e}")
        print("üîÑ Th·ª≠ ph∆∞∆°ng √°n d·ª± phong...")
        fallback_simple_load()

if __name__ == "__main__":
    main()

=== ROBUST JSON CHUNKING ===
=== B·∫ÆT ƒê·∫¶U CHUNKING (MULTICORE) ===
üìÅ K√≠ch th∆∞·ªõc file: 70.24 MB
üñ•Ô∏è Ph√°t hi·ªán 2 CPU core ‚Äì d√πng t·ªëi ƒëa
üîÑ ƒêang parse JSON stream...
üìñ ƒê√£ ƒë·ªçc 400000 k√Ω t·ª±...
‚úÖ 100 records ‚Üí 1023 chunks
üìñ ƒê√£ ƒë·ªçc 600000 k√Ω t·ª±...
‚úÖ 200 records ‚Üí 2003 chunks
‚úÖ 300 records ‚Üí 2970 chunks
‚úÖ 400 records ‚Üí 3939 chunks
‚úÖ 500 records ‚Üí 5151 chunks
‚úÖ 600 records ‚Üí 6190 chunks
‚úÖ 700 records ‚Üí 7233 chunks
‚úÖ 800 records ‚Üí 8299 chunks
‚úÖ 900 records ‚Üí 9375 chunks
‚úÖ 1000 records ‚Üí 10358 chunks
‚úÖ 1100 records ‚Üí 11580 chunks
‚úÖ 1200 records ‚Üí 12596 chunks
üìñ ƒê√£ ƒë·ªçc 5300000 k√Ω t·ª±...
‚úÖ 1300 records ‚Üí 13613 chunks
‚úÖ 1400 records ‚Üí 14614 chunks
‚úÖ 1500 records ‚Üí 15551 chunks
üìñ ƒê√£ ƒë·ªçc 6600000 k√Ω t·ª±...
‚úÖ 1600 records ‚Üí 16597 chunks
‚úÖ 1700 records ‚Üí 17643 chunks
‚úÖ 1800 records ‚Üí 18674 chunks
üìñ ƒê√£ ƒë·ªçc 7800000 k√Ω t·ª±...
‚úÖ 1900 records ‚Üí 19661 chunks