In [2]:
from src.preprocessor.utils.dataset_level import read_pickle

read_pickle('./data/processed/queries.pkl')

{'161615': 'Người học ngành quản lý khai thác công trình thủy lợi trình độ cao đẳng phải có khả năng học tập và nâng cao trình độ như thế nào?',
 '80037': 'Nội dung lồng ghép vấn đề bình đẳng giới trong xây dựng văn bản quy phạm pháp luật được quy định thế nào?',
 '124074': 'Sản phẩm phần mềm có được hưởng ưu đãi về thời gian miễn thuế, giảm thuế hay không? Nếu được thì trong vòng bao nhiêu năm?',
 '146841': 'Điều kiện để giáo viên trong cơ sở giáo dục mầm non, tiểu học ngoài công lập bị ảnh hưởng bởi Covid-19 được hưởng chính sách hỗ trợ là gì?',
 '6176': 'Nguyên tắc áp dụng phụ cấp ưu đãi nghề y tế thế nào?',
 '88965': 'Chi phí hoạt động dịch vụ của Ngân hàng Hợp tác xã Việt Nam bao gồm những khoản chi nào?',
 '65283': 'Người học ngành điện tàu thủy trình độ trung cấp sau khi tốt nghiệp có thể làm được những công việc nào?',
 '60201': 'Hương ước có nội dung trái với phong tục tập quán của địa phương thì xử lý như thế nào?',
 '103523': 'Luật sư nước ngoài đã được cấp Giấy phép hành ng

In [None]:
from typing import Dict, List, Tuple, Set
import re
from dataclasses import dataclass
import spacy
from transformers import AutoTokenizer, AutoModelForTokenClassification
import torch

@dataclass
class LegalEntity:
    text: str
    label: str
    start: int
    end: int

class LegalNER:
    def __init__(self):
        # Load mô hình NER tiếng Việt (VnCoreNLP hoặc PhoBERT-NER)
        self.tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base")
        self.model = AutoModelForTokenClassification.from_pretrained("vinai/phobert-base")
        
        # Định nghĩa các label cho legal domain
        self.legal_labels = {
            'LAW': ['luật', 'nghị định', 'thông tư', 'quyết định'],
            'ORGANIZATION': ['bộ', 'sở', 'ủy ban', 'tòa án'],
            'POSITION': ['chủ tịch', 'giám đốc', 'trưởng phòng'],
            'MONEY': ['đồng', 'triệu', 'tỷ', 'USD'],
            'TIME': ['ngày', 'tháng', 'năm', 'kỳ hạn'],
            'PENALTY': ['phạt', 'xử phạt', 'chế tài'],
        }
        
        # Compile regex patterns
        self.patterns = self._compile_patterns()

    def _compile_patterns(self) -> Dict[str, List[re.Pattern]]:
        """Compile regex patterns cho mỗi loại entity"""
        patterns = {}
        for label, keywords in self.legal_labels.items():
            patterns[label] = [
                re.compile(rf'\b{keyword}\b[\s\w]+', re.IGNORECASE) 
                for keyword in keywords
            ]
        return patterns

    def extract_entities(self, text: str) -> List[LegalEntity]:
        """Trích xuất legal entities từ text sử dụng kết hợp rule-based và model-based"""
        entities = []
        
        # 1. Rule-based extraction using patterns
        for label, patterns in self.patterns.items():
            for pattern in patterns:
                for match in pattern.finditer(text):
                    entities.append(LegalEntity(
                        text=match.group().strip(),
                        label=label,
                        start=match.start(),
                        end=match.end()
                    ))
        
        # 2. Model-based extraction using PhoBERT-NER
        inputs = self.tokenizer(text, return_tensors="pt", truncation=True)
        outputs = self.model(**inputs)
        predictions = torch.argmax(outputs.logits, dim=2)
        
        tokens = self.tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])
        current_entity = None
        
        for idx, (token, pred) in enumerate(zip(tokens, predictions[0])):
            if pred in self.model.config.label2id:
                label = self.model.config.id2label[pred.item()]
                if label.startswith("B-"):  # Beginning of entity
                    if current_entity:
                        entities.append(current_entity)
                    current_entity = LegalEntity(
                        text=token,
                        label=label[2:],
                        start=idx,
                        end=idx
                    )
                elif label.startswith("I-") and current_entity:  # Inside entity
                    current_entity.text += f" {token}"
                    current_entity.end = idx
        
        if current_entity:
            entities.append(current_entity)
        
        return self._merge_overlapping_entities(entities)

    def _merge_overlapping_entities(self, entities: List[LegalEntity]) -> List[LegalEntity]:
        """Gộp các entity chồng chéo"""
        if not entities:
            return []
            
        entities.sort(key=lambda x: (x.start, -x.end))
        merged = [entities[0]]
        
        for current in entities[1:]:
            previous = merged[-1]
            
            if current.start <= previous.end:
                # Nếu cùng label và có overlap, merge lại
                if current.label == previous.label:
                    previous.end = max(previous.end, current.end)
                    previous.text = previous.text + " " + current.text
            else:
                merged.append(current)
                
        return merged

class LegalQASystem:
    def __init__(self):
        self.ner = LegalNER()
        
    def process_query(self, query: str) -> Dict[str, List[LegalEntity]]:
        """Xử lý câu query và trích xuất entities"""
        query_entities = self.ner.extract_entities(query)
        return self._categorize_entities(query_entities)
    
    def process_document(self, doc_text: str) -> Dict[str, List[LegalEntity]]:
        """Xử lý văn bản pháp luật và trích xuất entities"""
        doc_entities = self.ner.extract_entities(doc_text)
        return self._categorize_entities(doc_entities)
    
    def _categorize_entities(self, entities: List[LegalEntity]) -> Dict[str, List[LegalEntity]]:
        """Phân loại entities theo label"""
        categorized = {}
        for entity in entities:
            if entity.label not in categorized:
                categorized[entity.label] = []
            categorized[entity.label].append(entity)
        return categorized
    
    def calculate_entity_similarity(
        self,
        query_entities: Dict[str, List[LegalEntity]],
        doc_entities: Dict[str, List[LegalEntity]]
    ) -> float:
        """Tính độ tương đồng giữa entities của query và document"""
        total_score = 0
        total_weight = 0
        
        # Định nghĩa trọng số cho từng loại entity
        weights = {
            'LAW': 1.0,
            'ORGANIZATION': 0.8,
            'POSITION': 0.6,
            'MONEY': 0.7,
            'TIME': 0.5,
            'PENALTY': 0.9
        }
        
        for label in set(query_entities.keys()) & set(doc_entities.keys()):
            query_texts = {e.text.lower() for e in query_entities[label]}
            doc_texts = {e.text.lower() for e in doc_entities[label]}
            
            # Tính Jaccard similarity cho mỗi loại entity
            intersection = len(query_texts & doc_texts)
            union = len(query_texts | doc_texts)
            if union > 0:
                score = intersection / union
                total_score += score * weights[label]
                total_weight += weights[label]
        
        return total_score / total_weight if total_weight > 0 else 0.0
    
    def find_relevant_documents(
        self,
        query: str,
        documents: Dict[str, str],
        top_k: int = 3
    ) -> List[Tuple[str, float]]:
        """Tìm văn bản liên quan dựa trên entity matching"""
        query_entities = self.process_query(query)
        
        scores = []
        for doc_id, doc_text in documents.items():
            doc_entities = self.process_document(doc_text)
            similarity = self.calculate_entity_similarity(query_entities, doc_entities)
            scores.append((doc_id, similarity))
        
        # Sort by similarity score
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[:top_k]

    def answer_query(
        self,
        query: str,
        documents: Dict[str, str]
    ) -> Dict[str, any]:
        """Trả lời câu hỏi dựa trên entity matching"""
        # Xử lý query
        query_entities = self.process_query(query)
        
        # Tìm documents liên quan
        relevant_docs = self.find_relevant_documents(query, documents)
        
        # Lấy document có score cao nhất
        best_doc_id, score = relevant_docs[0]
        
        return {
            'query_entities': query_entities,
            'relevant_document': documents[best_doc_id],
            'confidence_score': score,
            'matched_entities': self.process_document(documents[best_doc_id])
        }

# Example usage
if __name__ == "__main__":
    qa_system = LegalQASystem()
    
    # Sample data
    documents = {
        '14201': """
        Bảo hiểm xã hội là chế độ bảo đảm thay thế hoặc bù đắp một phần thu nhập của 
        người lao động khi họ bị giảm hoặc mất thu nhập do ốm đau, thai sản, tai nạn
        lao động, bệnh nghề nghiệp, hết tuổi lao động hoặc chết, trên cơ sở đóng vào 
        quỹ bảo hiểm xã hội.
        """,
        '59633': """
        Theo Điều 8 Nghị định số 115/2015/NĐ-CP ngày 11/11/2015, người lao động làm việc 
        theo hợp đồng lao động xác định thời hạn được hưởng bảo hiểm xã hội một lần với 
        mức hưởng bằng 1,5 tháng mức bình quân tiền lương tháng đóng bảo hiểm xã hội cho 
        những năm đóng trước năm 2014.
        """
    }
    
    # Process a query
    query = "Thế nào là bảo hiểm xã hội?"
    result = qa_system.answer_query(query, documents)
    
    print("Query entities:", result['query_entities'])
    print("Matched entities:", result['matched_entities'])
    print("Confidence score:", result['confidence_score'])