## 1. LOADING

In [None]:
from llama_index.core import SimpleDirectoryReader
import re
from llama_index.core.schema import TextNode

reader = SimpleDirectoryReader('data/corpus',required_exts=[".docx"])
documents = reader.load_data()
len(documents)

Get metadata info

In [6]:
import re
from llama_index.core import Document
def make_doc(doc: Document, id: int) -> Document:
    #NOTE: Xoá header của doc
    text = doc.text
    pattern = r"(NGHỊ QUYẾT|QUYẾT ĐỊNH|HƯỚNG DẪN|NGHỊ ĐỊNH|THÔNG TƯ|PHÁP LỆNH|LUẬT|KẾ HOẠCH|CHỈ THỊ).*"
    match = re.search(pattern, text, re.DOTALL)

    if match:
        #NOTE: Lấy nội dung sau header
        doc_content = text[match.start():]

        #NOTE: Lấy nội dung trước header
        doc_header = text[:match.start()]
        
        #NOTE: Trích xuất thông tin
        number_pattern = r"(?:số|Số):\s*([^\s]+)"
        number_match = re.search(number_pattern, doc_header)

        
        #NOTE: Trích xuất tiêu đề
        title_pattern = r"(NGHỊ QUYẾT|QUYẾT ĐỊNH|HƯỚNG DẪN|NGHỊ ĐỊNH|THÔNG TƯ|PHÁP LỆNH|LUẬT|KẾ HOẠCH|CHỈ THỊ)\s+(.*?)\s+_{2,}"
        title_match = re.search(title_pattern, doc_content, re.DOTALL)
        
        return Document(
            text=doc_content.strip(),
            doc_id = f"doc_{id}",
            metadata={
                "file_path": doc.metadata["file_path"],
                "type" : match.group(1).strip() if match else None,
                "number": number_match.group(1).strip() if number_match else None,
                "title": title_match.group(2).strip() if title_match else None,  
            },
            
        )
    else:
        raise ValueError("Không hỗ trợ.")

In [None]:
extracted_meta_docs = []
for id,doc in enumerate(documents):
    try:
        new_doc = make_doc(doc=doc, id = id)
        new_doc.excluded_llm_metadata_keys = ["file_path"]
        new_doc.excluded_embed_metadata_keys = ["file_path"]
        extracted_meta_docs.append(new_doc)
    except ValueError as e:
        print(f"Document {id} không hỗ trợ: {e}")

len(extracted_meta_docs)

## CHUNKING BY REGEX

In [None]:
import re
from llama_index.core.node_parser import TextSplitter
from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo
from llama_index.core.node_parser import SentenceSplitter
from transformers import AutoTokenizer

class HierarchicalRegexNodeParser(TextSplitter):
    def __init__(self, chunk_size=512, chunk_overlap=50, max_token_limit=512):
        super().__init__(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        self._max_token_limit = max_token_limit
        self._tokenizer = AutoTokenizer.from_pretrained('bkai-foundation-models/vietnamese-bi-encoder')
        self._sentence_splitter = SentenceSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )

    def count_tokens(self, text):
        """Đếm số token của văn bản."""
        return len(self._tokenizer.encode(text))

    def split_text_by_regex(self, text, regex_pattern):

        # Tìm tất cả các vị trí khớp với pattern
        matches = list(re.finditer(regex_pattern, text))
        
        # Nếu không có pattern nào khớp, trả về text gốc
        if not matches:
            return [text.strip()] if text.strip() else []
        
        chunks = []
        # Xử lý phần đầu tiên trước pattern đầu tiên
        if matches[0].start() > 0:
            chunks.append(text[:matches[0].start()].strip())
        
        # Xử lý các phần giữa các pattern
        for i in range(len(matches)):
            # Xác định vị trí bắt đầu của phần hiện tại
            start_pos = matches[i].start()
            
            # Xác định vị trí kết thúc (là vị trí bắt đầu của pattern tiếp theo hoặc cuối văn bản)
            end_pos = matches[i+1].start() if i < len(matches) - 1 else len(text)
            
            # Trích xuất phần text bao gồm pattern hiện tại và nội dung sau nó đến pattern tiếp theo
            chunk = text[start_pos:end_pos].strip()
            if chunk:
                chunks.append(chunk)
        
        return [chunk for chunk in chunks if chunk.strip()]

    def split_text(self, text: str) -> list:
        """Chia văn bản thành các chunk nhỏ hơn với metadata."""
        # Chia văn bản thành các câu với metadata
        sentences = self._sentence_splitter.split_text(text)
        chunks=[]
        for sentence in sentences:
            chunks.append(sentence.strip())
        return chunks

    def get_base_nodes_from_documents(self, documents):
        """Chuyển Document thành Node với metadata."""
        nodes = []
        i = 0
        for doc in documents:
            if doc.metadata['type'] in ["NGHỊ QUYẾT", "NGHỊ ĐỊNH", "THÔNG TƯ", "PHÁP LỆNH", "LUẬT"]:
                # Chia văn bản thành các chunk với metadata
                chunks = self.split_text_by_regex(doc.text, r"\n\nĐiều\s([1-9][0-9]{0,2})\. ")
                
                for  chunk in chunks:
                    node = TextNode(
                        text=chunk,
                        id_=f"{doc.id_}_node_{i}",
                        metadata=doc.metadata.copy()
                    )
                    nodes.append(node)
                    i+=1
            else: 
                chunks = self.split_text(doc.text)
                
                for chunk in chunks:
                    node = TextNode(
                        text=chunk,
                        id_=f"{doc.id_}_node_{i}",
                        metadata=doc.metadata.copy()
                    )
                    nodes.append(node)
                    i+=1
                
        return nodes
    
    def get_chunks_from_documents(self,documents):
        base_nodes = self.get_base_nodes_from_documents(documents)
       
        for node in base_nodes:
            if len(node.text) > self._max_token_limit:
                chunks = self.split_text(node.text)
                for i, chunk in enumerate(chunks):
                    new_node = TextNode( 
                        text=chunk,
                        id_=f"{node.id_}_chunk_{i}",
                        metadata=node.metadata.copy()
                    )
                    yield new_node
            else:
                yield node
            
    # a) : r"^[a-zA-Z]\) "
    # 1. : , r"^[0-9]+\. ",
node_parser = HierarchicalRegexNodeParser()
base_nodes = node_parser.get_base_nodes_from_documents(extracted_meta_docs)
print('Sô lượng base node:' ,len(base_nodes))
    

In [None]:
nodes = list(node_parser.get_chunks_from_documents(extracted_meta_docs))
print("Số lượng nodes: ",len(nodes))
print("Ví dụ chunk:")
print(nodes[1])

## Data Presentation

In [10]:
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

def plot_distribution(lens):
    # Thiết lập style
    plt.style.use('seaborn-v0_8-darkgrid')
    sns.set_context("paper")

    # Tạo figure với kích thước phù hợp
    plt.figure(figsize=(10, 6))

    # Vẽ histogram
    n, bins, patches = plt.hist(lens, bins='auto', alpha=0.7, color='steelblue', 
                            edgecolor='black', linewidth=1.2)

    # Thêm KDE (Kernel Density Estimation) để thấy rõ hơn phân phối
    sns.kdeplot(lens, color='darkred', linewidth=2)

    # Tính toán các thông số thống kê
    mean_value = np.mean(lens)
    median_value = np.median(lens)
    min_value = np.min(lens)
    max_value = np.max(lens)

    # Vẽ các đường thẳng đứng cho giá trị trung bình và trung vị
    plt.axvline(mean_value, color='red', linestyle='--', linewidth=1.5, 
                label=f'Mean: {mean_value:.2f}')
    plt.axvline(median_value, color='green', linestyle='-.', linewidth=1.5, 
                label=f'Median: {median_value:.2f}')

    # Thêm các chú thích và tiêu đề
    plt.title('Phân phối độ dài chunks', fontsize=16)
    plt.xlabel('Độ dài (số ký tự/token)', fontsize=12)
    plt.ylabel('Tần suất', fontsize=12)
    plt.legend()

    # Thêm textbox thông tin
    info_text = f'Min: {min_value}\nMax: {max_value}\nMean: {mean_value:.2f}\nMedian: {median_value:.2f}'
    props = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
    plt.text(0.05, 0.95, info_text, transform=plt.gca().transAxes, fontsize=9,
            verticalalignment='top', bbox=props)

    # Hiển thị biểu đồ
    plt.tight_layout()
    plt.show()

    #plt.savefig(f'_distribution.png', dpi=300, bbox_inches='tight')


In [None]:
len_base_nodes = []
for node in base_nodes:
    len_base_nodes.append(len(node.text))
len_base_nodes.sort(reverse=True)
print('10 base node dài nhất',len_base_nodes[:10])
plot_distribution(len_base_nodes)

In [None]:
len_nodes = []

for node in nodes:
    len_nodes.append(len(node.text))
len_nodes.sort(reverse=True)
len_nodes[:10]
plot_distribution(len_nodes)

## Xử lý ngoại lệ

In [13]:
# NODE: XÓA FILE dạng điền form
# long_nodes = []
# for node in nodes:
#     if len(node.text) >1200 and "……" in node.text:
#         long_nodes.append(node)
# print(len(long_nodes))
# long_nodes[:10]
# import os
# for node in long_nodes:
#     if os.path.exists(node.metadata["file_path"]):
#         # Xóa file nếu tồn tại
#         os.remove(node.metadata["file_path"])

## Save Nodes

Save into Huggingface

In [None]:
from datasets import Dataset

chunk_data = []
i = 1
for node in nodes:
    chunk_data.append({
        "id": i,
        "text": node.text,
        "metadata": {k: v for k, v in node.metadata.items() if k != "file_path"}
    })
    
    i+=1

dataset = Dataset.from_list(chunk_data)

from huggingface_hub import login

#login(token="your_token")
#dataset.push_to_hub("khanglt0004/vietnamese_legal_chunks")

Load Dataset

In [None]:
from datasets import load_dataset
from llama_index.core.schema import TextNode
nodes = []
def _load_dataset(path="khanglt0004/vietnamese_legal_chunks"):
    dataset = load_dataset(path)

    
    for item in dataset['train']:
        new_node = TextNode(
            text=item['text'],
            id_=str(item['id']),
            metadata=item['metadata']
        )
        nodes.append(new_node)
    print("Đã tải dữ liệu các chunks, số lượng: ", len(nodes))
_load_dataset()

In [18]:
sizes = [len(node.text) for node in nodes]

In [None]:
import pandas as pd
import matplotlib.pyplot as plt


# Phân loại theo các mốc
bins = [0, 500, 750, 1000, float('inf')]
labels = ['<500', '500-750', '750-1000', '>1000']
categories = pd.cut(sizes, bins=bins, labels=labels, right=True)

# Tính toán số lượng và phần trăm
summary = categories.value_counts().sort_index()
percentages = (summary / len(sizes) * 100).round(2)

# Tạo DataFrame thống kê
df = pd.DataFrame({
    'Khoảng': labels,
    'Số lượng': summary.values,
    'Phần trăm': percentages.values
})

print(df)

# Vẽ biểu đồ
plt.figure(figsize=(8, 5))
plt.bar(df['Khoảng'], df['Số lượng'], color='skyblue')
plt.title('Phân bố độ dài trong các mốc')
plt.xlabel('Khoảng độ dài')
plt.ylabel('Số lượng')
plt.grid(axis='y', linestyle='--', alpha=0.7)

# Hiển thị phần trăm trên cột
for i, (count, pct) in enumerate(zip(df['Số lượng'], df['Phần trăm'])):
    plt.text(i, count + 0.5, f'{pct}%', ha='center')

plt.tight_layout()
plt.show()


In [None]:
plt.savefig('chunk_sizes_distribution.png', dpi=300, bbox_inches='tight')

In [None]:
# from sentence_transformers import SentenceTransformer

# # INPUT TEXT MUST BE ALREADY WORD-SEGMENTED!


# model = SentenceTransformer('bkai-foundation-models/vietnamese-bi-encoder')
# for node in nodes:
#     if len(node.text) > 1000:
#         embeddings = model.encode(nodes[1].text)
#         print('embedding:',embeddings)
#         break


