## retrieval test

In [1]:
import os
import yaml
import torch

from pathlib import Path
from langchain_community.vectorstores import Chroma
from langchain.chains import create_qa_with_sources_chain
from langchain_core.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory
from langchain.chains.conversational_retrieval.base import ConversationalRetrievalChain
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.llms import HuggingFacePipeline
from langchain.schema import Document

from transformers import AutoTokenizer, AutoModelForCausalLM
from transformers import pipeline

from load_embedding import load_embedding
from set_db import create_chroma_db
from load_llm import load_model

# 캐시 디렉토리 설정
cache_dir = './weights'
os.makedirs(cache_dir, exist_ok=True)
os.environ['TRANSFORMERS_CACHE'] = cache_dir
os.environ['HF_HOME'] = cache_dir
os.environ['HF_DATASETS_CACHE'] = os.path.join(cache_dir, 'datasets')
os.environ['HUGGINGFACE_HUB_CACHE'] = cache_dir
os.environ['TORCH_HOME'] = os.path.join(cache_dir, 'torch')


In [71]:
import yaml
from typing import List, Dict, Any, Optional
from pathlib import Path
import numpy as np

class LegalQASystem:
    def __init__(
        self,
        custom_llm,
        custom_embeddings,
        custom_db,
        template_path: str = "templates/qa_prompt.yaml"
    ):
        self.llm = custom_llm
        self.embeddings = custom_embeddings
        self.db = custom_db
        self.prompt_template = self._load_template(template_path)
    
    def _load_template(self, template_path: str) -> dict:
        """YAML 파일에서 프롬프트 템플릿을 로드합니다."""
        try:
            with open(template_path, 'r', encoding='utf-8') as f:
                template_data = yaml.safe_load(f)
            return template_data
        except FileNotFoundError:
            raise FileNotFoundError(f"템플릿 파일을 찾을 수 없습니다: {template_path}")
        except yaml.YAMLError as e:
            raise ValueError(f"YAML 파일 파싱 중 오류 발생: {str(e)}")
    
    def _format_prompt(self, context: str, question: str) -> str:
        """템플릿을 사용하여 프롬프트를 포맷팅합니다."""
        try:
            template = self.prompt_template['template']
            return template.format(
                context=context,
                question=question
            )
        except KeyError:
            raise KeyError("템플릿에 'template' 키가 없습니다.")
        except Exception as e:
            raise ValueError(f"프롬프트 포맷팅 중 오류 발생: {str(e)}")
    
    def _search_relevant_laws(self, question: str) -> List[Dict[str, Any]]:
        """다중 쿼리로 관련 법령을 검색합니다."""
        try:
            # 검색 키워드 확장
            base_keywords = question.split()[:3]  # 원본 질문의 주요 키워드
            search_queries = [
                question,  # 원래 질문
                f"{base_keywords[0]} 대상자",  # 대상자 관점
                f"{base_keywords[0]} 자격 요건",  # 자격 관점
                f"{base_keywords[0]} 신청 자격"  # 신청 자격 관점
            ]
            
            all_results = []
            for query in search_queries:
                results = self.db.similarity_search(
                    query=query,
                    k=2  # 각 쿼리당 상위 2개만 검색
                )
                all_results.extend(results)
            
            # 중복 제거
            unique_results = []
            seen_contents = set()
            for doc in all_results:
                if doc.page_content not in seen_contents:
                    seen_contents.add(doc.page_content)
                    unique_results.append({
                        'content': doc.page_content,
                        'metadata': {
                            'law_title': doc.metadata.get('law_title', ''),
                            'effective_date': doc.metadata.get('effective_date', ''),
                            'article_number': doc.metadata.get('article_number', ''),
                            'article_subject': doc.metadata.get('article_subject', '')
                        }
                    })
            
            return unique_results[:3]  # 상위 3개 결과 반환
            
        except Exception as e:
            raise Exception(f"법령 검색 중 오류 발생: {str(e)}")
    
    def _search_similar_qa(self, question: str) -> Optional[Dict[str, Any]]:
        """유사한 QA 쌍을 검색합니다."""
        try:
            similar_results = self.db.similarity_search(
                query=question,
                k=1
            )
            
            if similar_results:
                return {
                    "question": similar_results[0].page_content,
                    "answer": similar_results[0].metadata.get('answer', '')
                }
            return None
        except Exception as e:
            raise Exception(f"유사 QA 검색 중 오류 발생: {str(e)}")
    
    def _generate_answer(
        self,
        question: str,
        law_info: List[Dict[str, Any]],
        similar_qa: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """답변을 생성합니다."""
        try:
            # 컨텍스트 구성
            context = "\n".join([
                f"""
                    법령명: {law['metadata']['law_title']}
                    시행일자: {law['metadata']['effective_date']}
                    조항번호: {law['metadata']['article_number']}
                    조항제목: {law['metadata']['article_subject']}
                    내용: {law['content']}
                """
                for law in law_info
            ])
            
            # 기본 프롬프트로 첫 답변 생성
            initial_prompt = self._format_prompt(context, question)
            raw_answer = self.llm.invoke(initial_prompt)
            
            # 답변 정제 - '[답변]' 태그가 없는 경우에도 처리
            if '[답변]' in raw_answer:
                current_answer = raw_answer.split('[답변]')[1].strip()
            else:
                current_answer = raw_answer.strip()
                
            # 불필요한 텍스트 제거
            if '다음은 장애인복지 관련 문의에 대한 답변을 생성하기 위한 지침입니다.' in current_answer:
                current_answer = current_answer.split('다음은 장애인복지 관련 문의에 대한 답변을 생성하기 위한 지침입니다.')[1].strip()
                
            if '답변 작성 시 주의사항:' in current_answer:
                current_answer = current_answer.split('답변 작성 시 주의사항:')[0].strip()
                
            # 유사 QA가 있는 경우 답변 보완
            if similar_qa and similar_qa.get('answer'):
                supplementary_prompt = f"""
                    다음 답변을 참고하여 기존 답변을 보완해주세요:
                    기존 질문: {similar_qa['question']}
                    기존 답변: {similar_qa['answer']}
                    현재 답변: {current_answer}
                    
                    답변 형식:
                    1. 신청 자격 요건
                    2. 제외 대상
                    3. 예외사항
                    4. 추가 안내사항
                """
                enhanced_answer = self.llm.invoke(supplementary_prompt)
                
                # 보완된 답변에서도 불필요한 부분 제거
                if '[답변]' in enhanced_answer:
                    final_answer = enhanced_answer.split('[답변]')[1].strip()
                else:
                    final_answer = enhanced_answer.strip()
            else:
                final_answer = current_answer
    
            return {
                "answer": final_answer,
                "referenced_laws": law_info,
                "similar_qa_used": similar_qa is not None
            }
            
        except Exception as e:
            raise Exception(f"답변 생성 중 오류 발생: {str(e)}")
    
    def _extract_answer_points(self, answer: str) -> Dict[str, List[str]]:
        """답변에서 각 항목별 포인트를 추출합니다."""
        categories = {
            "신청 자격 요건": [],
            "제외 대상": [],
            "예외사항": [],
            "추가 안내사항": []
        }
        
        current_category = None
        for line in answer.split('\n'):
            line = line.strip()
            if any(category in line for category in categories.keys()):
                current_category = next(cat for cat in categories.keys() if cat in line)
            elif line.startswith('-') and current_category:
                point = line[1:].strip()
                if point and point not in categories[current_category]:
                    categories[current_category].append(point)
        
        return categories
    
    def _merge_answers(self, current_points: Dict[str, List[str]], similar_points: Dict[str, List[str]]) -> str:
        """두 답변의 포인트를 병합하여 최종 답변을 생성합니다."""
        merged_points = {}
        
        # 각 카테고리별로 포인트 병합
        for category in current_points.keys():
            merged_points[category] = list(set(current_points[category]))
            if category in similar_points:
                # 유사 답변의 포인트 중 현재 답변에 없는 것만 추가
                for point in similar_points[category]:
                    if not any(self._is_similar_point(point, existing) for existing in merged_points[category]):
                        merged_points[category].append(point)
        
        # 병합된 포인트로 답변 생성
        final_answer = []
        for category, points in merged_points.items():
            if points:  # 해당 카테고리에 포인트가 있는 경우만 포함
                final_answer.append(f"{category}")
                for point in points:
                    final_answer.append(f"- {point}")
                final_answer.append("")  # 카테고리 간 빈 줄 추가
        
        return "\n".join(final_answer).strip()
    
    def _is_similar_point(self, point1: str, point2: str) -> bool:
        """두 포인트가 유사한지 확인합니다."""
        # 간단한 유사도 체크 (필요에 따라 더 정교한 방법 사용 가능)
        return (
            point1.lower() in point2.lower() or 
            point2.lower() in point1.lower() or
            self._calculate_similarity(point1, point2) > 0.8
        )
    
    def _calculate_similarity(self, text1: str, text2: str) -> float:
        """두 텍스트의 유사도를 계산합니다."""
        # 여기에 텍스트 유사도 계산 로직 구현
        # 예: 코사인 유사도, 레벤슈타인 거리 등 사용
        # 임시로 간단한 구현
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())
        intersection = words1.intersection(words2)
        union = words1.union(words2)
        return len(intersection) / len(union) if union else 0.0
    
    def answer_question(self, question: str) -> Dict[str, Any]:
        """질문에 대한 답변을 생성합니다."""
        try:
            relevant_laws = self._search_relevant_laws(question)
            similar_qa = self._search_similar_qa(question)
            response = self._generate_answer(question, relevant_laws, similar_qa)
            return response
        except Exception as e:
            raise Exception(f"질문 답변 중 오류 발생: {str(e)}")

    def _search_relevant_laws(self, question: str) -> List[Dict[str, Any]]:
        """메타데이터와 컨텐츠를 결합한 검색을 수행합니다."""
        try:
            # 1. 메타데이터 필터 조건 설정
            metadata_filters = {
                "article_subject": [
                    "신청", "절차", "방법", "서류", "등록", "신고",
                    "제출", "접수", "처리", "기준", "요건"
                ]
            }
            
            # 2. 메타데이터 기반 검색
            metadata_results = self.db.similarity_search(
                query=question,
                k=5,
                filter={"article_subject": {"$in": metadata_filters["article_subject"]}}
            )
            
            # 3. 일반 컨텐츠 기반 검색
            content_results = self.db.similarity_search(
                query=question,
                k=5
            )
            
            # 4. 결과 병합 및 정렬
            all_results = []
            seen_contents = set()
            
            # 메타데이터 결과 처리 (높은 우선순위)
            for doc in metadata_results:
                if doc.page_content not in seen_contents:
                    seen_contents.add(doc.page_content)
                    all_results.append({
                        'content': doc.page_content,
                        'metadata': {
                            'law_title': doc.metadata.get('law_title', ''),
                            'effective_date': doc.metadata.get('effective_date', ''),
                            'paragraph_number': doc.metadata.get('paragraph_number', ''),
                            'paragraph_subject': doc.metadata.get('paragraph_subject', ''),
                            'article_number': doc.metadata.get('article_number', ''),
                            'article_subject': doc.metadata.get('article_subject', ''),
                            'search_type': 'metadata'
                        }
                    })
            
            # 컨텐츠 결과 처리
            for doc in content_results:
                if doc.page_content not in seen_contents:
                    seen_contents.add(doc.page_content)
                    all_results.append({
                        'content': doc.page_content,
                        'metadata': {
                            'law_title': doc.metadata.get('law_title', ''),
                            'effective_date': doc.metadata.get('effective_date', ''),
                            'paragraph_number': doc.metadata.get('paragraph_number', ''),
                            'paragraph_subject': doc.metadata.get('paragraph_subject', ''),
                            'article_number': doc.metadata.get('article_number', ''),
                            'article_subject': doc.metadata.get('article_subject', ''),
                            'search_type': 'content'
                        }
                    })
            
            return all_results[:5]  # 상위 5개 결과 반환
            
        except Exception as e:
            raise Exception(f"법령 검색 중 오류 발생: {str(e)}")
    
    def debug_qa_system(self, question: str):
        """검색 결과를 확인합니다."""
        print("\n=== 메타데이터 활용 검색 결과 확인 ===")
        results = self._search_relevant_laws(question)
        
        for i, result in enumerate(results):
            print(f"\n[검색 결과 {i+1}] (검색 타입: {result['metadata']['search_type']})")
            print("법령명:", result['metadata']['law_title'])
            print("장 번호:", result['metadata']['paragraph_number'])
            print("장 제목:", result['metadata']['paragraph_subject'])
            print("조문번호:", result['metadata']['article_number'])
            print("조문제목:", result['metadata']['article_subject'])
            print("\n내용:")
            print(result['content'])

def setup_qa_system(
    custom_llm,
    custom_embeddings,
    custom_db,
    template_path: str
) -> LegalQASystem:
    """QA 시스템을 초기화하고 반환합니다."""
    return LegalQASystem(
        custom_llm=custom_llm,
        custom_embeddings=custom_embeddings,
        custom_db=custom_db,
        template_path=template_path
    )

In [7]:
device = 'cuda'
cache_dir = './weights'

llm = load_model('llama', cache_dir)

embedding_model = load_embedding(device)

db = create_chroma_db(embedding_model)

# 2. 템플릿 경로 설정
template_path = Path("prompt/qna.yaml")

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

Device set to use cuda:0
  llm = HuggingFacePipeline(pipeline=pipe)
  embeddings = HuggingFaceEmbeddings(


Successfully created Chroma DB with 824 total documents


In [72]:
qa_system = setup_qa_system(
        custom_llm=llm,
        custom_embeddings=embedding_model,
        custom_db=db,
        template_path=template_path
    )

In [73]:
question = "시각장애인은 어떻게 등록해야하나요?"

# 5. 답변 생성
response = qa_system.answer_question(question)

# 6. 결과 출력
print("=== 질문 ===")
print(question)
print("\n=== 답변 ===")
print(response['answer'])

# if response['similar_qa_used']:
#     print(f"유사 질문: {response['similar_qa_used']['question']}")
#     print(f"기존 답변: {response['similar_qa_used']['answer']}")

=== 질문 ===
시각장애인은 어떻게 등록해야하나요?

=== 답변 ===
1. 시각장애인을 등록하려면 먼저 안과 전문의에게 상담을 받아야합니다. 안과 전문의는 시력 및 시야 결손 정도를 평가하여 장애 정도를 판단할 수 있습니다. 이후 안과 전문의는 장애 정도에 따라 시각장애인 등록 여부를 결정하게 됩니다.

2. 시각장애인은 일반적으로 모든 국가에서 등록되어야 하지만, 일부 국가는 특정 조건을 충족하는 경우에는 등록을 제외할 수도 있습니다. 이에 따라 해당 국가의 장애인 등록 정책을 확인하시는 것이 좋습니다.

3. 일부 국가에서는 시각장애인 등록을 위해 추가적인 시험이나 평가 과정을 거치지 않을 수도 있습니다. 이러한 예외 상황은 개개인의 신체적 상태나 국가의 규정에 따라 다를 수 있으므로, 상세한 정보를 얻기 위해서는 해당 국가의 장애인 등록 기관에 문의하시는 것이 바람직합니다.

4. 장애인 등록 절차와 요구되는 증거 등에 대해서는 각 국가의 장애인 등록 기관이나 보건복지상담센터(국번 없이 129)에 문의하여 정확한 정보를 얻으시기를 권장드립니다. 또한, 장애인 등록이 필요한 국가에서는 등록 기간이 정해져 있을 수 있으니, 등록 기한을 준수하여야 하며, 등록이 완료된 후에도 등록증을 소지하고 이를 제출할 준비가 되어있도록 해야합니다. 

추가로, 장애인 등록은 개인의 사생활 보호와 관련되므로, 신뢰성 있는 기관과의 상담 및 등록 절차를 통해 진행하시기 바랍니다.


In [27]:
# "question": "발달장애인 주간활동지원서비스 신청 대상은 어떻게 되나요?",

answer = "18세~65세 미만의 「장애인복지법」상 등록된 지적 및 자폐성 장애인이 신청 대상입니다.\n※ 장애인 당사자 및 가구의 소득수준과 무관하게 신청 가능\n\n단, 다음에 해당하는 경우에는 신청 대상에서 제외됩니다.\n① 「노인장기요양보험법」 제2조제1호에 따른 노인 등*에 해당하는 자\n② 「장애인복지법」 제32조의2(재외동포 및 외국인의 장애인 등록)에 따라 장애등록한 재외동포 및 외국인\n③ 방과후활동서비스 이용자(18세 이상 재학생의 경우 주간활동서비스(기본형에 한함)과 방과후활동서비스 중 선택하여 이용 가능)\n④ 취업자\n⑤ 「장애인복지법」 제58조(장애인 복지시설)에 따른 장애인 거주시설*에 입소한 자\n * 장애유형별 거주시설, 중증장애인 거주시설, 장애인 단기거주시설\n⑥ 「평생교육법」 제20조의2(장애인평생교육시설등의 설치)에 따른 장애인평생 교육시설 등에서 주기적으로 낮 시간 서비스를 이용하는 자\n⑦ 장애인 주간보호시설(센터) 이용자\n⑧ 「장애인 고용촉진 및 직업재활법」 제9조부터 제19조의2까지에 따른 취업지원 등 직업재활 서비스 이용자\n⑨ 「장애인복지법」 제21조(직업), 「장애인복지법시행령 제13조의 2(장애인일자리 사업 실시)에 따른 장애인일자리사업 참여자\n⑩ 그 밖에 국가나 지방자치단체로부터 주간활동서비스와 유사한 낮시간 지원 서비스를 받는 자\n\n※ ④, ⑥, ⑧, ⑨, ⑩번에 해당하는 사람이 주20시간(월80시간)이내 근로 또는 이용하는 경우 기본형 주간활동서비스 이용 가능\n※ 기존 서비스를 중지할 것을 전제로 주간활동서비스를 신청할 수 있음\n\n더 자세한 사항은 보건복지상담센터(국번없이 ☎129)로 연락주시기 바랍니다."
print(answer)

18세~65세 미만의 「장애인복지법」상 등록된 지적 및 자폐성 장애인이 신청 대상입니다.
※ 장애인 당사자 및 가구의 소득수준과 무관하게 신청 가능

단, 다음에 해당하는 경우에는 신청 대상에서 제외됩니다.
① 「노인장기요양보험법」 제2조제1호에 따른 노인 등*에 해당하는 자
② 「장애인복지법」 제32조의2(재외동포 및 외국인의 장애인 등록)에 따라 장애등록한 재외동포 및 외국인
③ 방과후활동서비스 이용자(18세 이상 재학생의 경우 주간활동서비스(기본형에 한함)과 방과후활동서비스 중 선택하여 이용 가능)
④ 취업자
⑤ 「장애인복지법」 제58조(장애인 복지시설)에 따른 장애인 거주시설*에 입소한 자
 * 장애유형별 거주시설, 중증장애인 거주시설, 장애인 단기거주시설
⑥ 「평생교육법」 제20조의2(장애인평생교육시설등의 설치)에 따른 장애인평생 교육시설 등에서 주기적으로 낮 시간 서비스를 이용하는 자
⑦ 장애인 주간보호시설(센터) 이용자
⑧ 「장애인 고용촉진 및 직업재활법」 제9조부터 제19조의2까지에 따른 취업지원 등 직업재활 서비스 이용자
⑨ 「장애인복지법」 제21조(직업), 「장애인복지법시행령 제13조의 2(장애인일자리 사업 실시)에 따른 장애인일자리사업 참여자
⑩ 그 밖에 국가나 지방자치단체로부터 주간활동서비스와 유사한 낮시간 지원 서비스를 받는 자

※ ④, ⑥, ⑧, ⑨, ⑩번에 해당하는 사람이 주20시간(월80시간)이내 근로 또는 이용하는 경우 기본형 주간활동서비스 이용 가능
※ 기존 서비스를 중지할 것을 전제로 주간활동서비스를 신청할 수 있음

더 자세한 사항은 보건복지상담센터(국번없이 ☎129)로 연락주시기 바랍니다.


In [49]:
# 질문 실행
question = "발달장애인 주간활동지원서비스 신청 대상은 어떻게 되나요?"
response = qa_system.answer_question(question)

# 디버깅
qa_system.debug_qa_system(question)


=== 메타데이터 활용 검색 결과 확인 ===

[검색 결과 1] (검색 타입: content)
법령명: 
장 번호: 
장 제목: 
조문번호: 
조문제목: 

내용:
발달장애인 주간활동서비스는 발달장애인의 주민등록상 주소지 읍­·면·­동 행정복지센터 방문신청 또는 복지로 홈페이지(www.bokjiro.go.kr) 를 통해 온라인으로 신청이 가능합니다. 

더 자세한 사항은 보건복지상담센터(국번없이 ☎129)로 연락주시기 바랍니다.

[검색 결과 2] (검색 타입: content)
법령명: 
장 번호: 
장 제목: 
조문번호: 
조문제목: 

내용:
주간활동서비스 제공기관을 설치·운영하기 위해서는 발달장애인 대상 서비스 제공 능력 및 경험이 있는 공공·비영리·민간기관(법인, 단체 등 포함)이 「발달장애인 권리보장 및 지원에 관한 법률 시행규칙」 [별표 4의2] '주간활동 및 방과 후 활동 서비스 제공기관의 지정 기준'에서 정한 시설 및 인력기준을 갖추고, 소재지 관할 시·군·구청장으로부터 지정을 받아야 합니다.

더 자세한 사항은 보건복지상담센터(국번없이 ☎129)로 연락주시기 바랍니다.
