# RAG Sample code

In [82]:
import os
from dotenv import load_dotenv

# 환경 변수 로드
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
FAISS_INDEX_PATH = os.getenv("FAISS_INDEX_PATH", "faiss_index")


### Function

In [83]:
# langchain 라이브러리에서 필요한 모듈을 가져옵니다.
from langchain import hub
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_upstage import UpstageEmbeddings
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import PromptTemplate
from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
# ABC 모듈을 가져와서 추상 클래스와 메서드를 정의합니다.
from abc import ABC, abstractmethod
# itemgetter를 가져와서 딕셔너리에서 값을 쉽게 가져올 수 있도록 합니다.
from operator import itemgetter

# RetrievalChain이라는 추상 클래스를 정의합니다.
class RetrievalChain(ABC):
    # 초기화 메서드입니다. source_uri를 인자로 받습니다.
    def __init__(self, source_uri):
        self.source_uri = None  # 문서의 출처를 저장할 변수입니다.
        self.k = 5  # 검색할 문서의 수를 설정합니다.
        self.cached_embeddings = None

    # 문서를 로드하는 추상 메서드입니다. 이 메서드는 하위 클래스에서 구현해야 합니다.
    @abstractmethod
    def load_documents(self, source_uris):
        """loader를 사용하여 문서를 로드합니다."""
        pass

    # 텍스트 분할기를 생성하는 추상 메서드입니다. 하위 클래스에서 구현해야 합니다.
    @abstractmethod
    def create_text_splitter(self):
        """text splitter를 생성합니다."""
        pass

    # 문서를 분할하는 메서드입니다. 주어진 문서와 텍스트 분할기를 사용합니다.
    def split_documents(self, docs, text_splitter):
        """text splitter를 사용하여 문서를 분할합니다."""
        return text_splitter.split_documents(docs)

    # 임베딩을 생성하는 메서드입니다.
    def create_embedding(self):
        #upstage 임베딩 사용하는 경우 - 이게 토큰당 비용이 더 싸고 무료 크레딧 제공하긴 합니다
        #embedding = UpstageEmbeddings(model="solar-embedding-1-large")
        #openai 임베딩 사용하는 경우
        embedding = OpenAIEmbeddings(model="text-embedding-3-large")
        fs = LocalFileStore("./cached_embeddings/")
        self.cached_embeddings = CacheBackedEmbeddings.from_bytes_store(
            embedding, fs, namespace=embedding.model
        )
        return self.cached_embeddings

    # 벡터 저장소를 생성하는 메서드입니다.
    def create_vectorstore(self, split_docs,cache=True):
        if cache:
            if os.path.exists(FAISS_INDEX_PATH):
                print("기존 FAISS 인덱스를 로드합니다.")
                return FAISS.load_local(FAISS_INDEX_PATH, self.cached_embeddings, allow_dangerous_deserialization=True)
            else:
                print("새로운 FAISS 벡터 저장소를 생성합니다.")
                vector_store = FAISS.from_texts(split_docs, self.cached_embeddings)
                vector_store.save_local(FAISS_INDEX_PATH)
                return vector_store
        return FAISS.from_documents(
            documents=split_docs, embedding=self.create_embedding()
        )

    # 검색을 수행하는 retriever를 생성하는 메서드입니다.
    def create_retriever(self, vectorstore):
        # MMR을 사용하여 검색을 수행하는 retriever를 생성합니다.
        dense_retriever = vectorstore.as_retriever(
            search_type="mmr", search_kwargs={"k": self.k}
        )
        return dense_retriever

    # 모델을 생성하는 메서드입니다.
    def create_model(self):
        return ChatOpenAI(model_name="gpt-4o-mini", temperature=0)

    # 프롬프트를 생성하는 메서드입니다.
    def create_prompt(self):
        # 프롬프트 템플릿 생성 ! 이부분을 적절히 수정하여 사용하시면 됩니다.
        prompt = PromptTemplate.from_template(
    """You are an assistant for question-answering tasks. 
Use the following pieces of retrieved context to answer the question. 
If you don't know the answer, just say that you don't know. 
Answer in Korean.

#Context: 
{context}

#Question:
{question}

#Answer:"""
)
        return prompt

    # 문서를 포맷하는 정적 메서드입니다.
    @staticmethod
    def format_docs(docs):
        return "\n".join(docs)
    # format_docs 함수 정의
    def format_docs(docs):
        return "\n".join(
            [
                f"<document><content>{doc.page_content}</content><source>{doc.metadata.get('source', '정보 없음')}</source><page>{int(doc.metadata.get('page', 0)) + 1}</page></document>"
                for doc in docs
            ]
        )
    # 전체 체인을 생성하는 메서드입니다.
    def create_chain(self):
        docs = self.load_documents(self.source_uri)  # 문서를 로드합니다.
        text_splitter = self.create_text_splitter()  # 텍스트 분할기를 생성합니다.
        split_docs = self.split_documents(docs, text_splitter)  # 문서를 분할합니다.
        self.vectorstore = self.create_vectorstore(split_docs)  # 벡터 저장소를 생성합니다.
        self.retriever = self.create_retriever(self.vectorstore)  # retriever를 생성합니다.
        model = self.create_model()  # 모델을 생성합니다.
        prompt = self.create_prompt()  # 프롬프트를 생성합니다.
        # 체인을 구성합니다.
        self.chain = (
        {"context":itemgetter("context"), "question":itemgetter("question")} # 컨텍스트와 질문을 연결합니다.
            | prompt  # 프롬프트와 연결합니다.
            | model  # 모델과 연결합니다.
            | StrOutputParser()  # 출력 파서를 연결합니다.
        )
        return self  # 현재 인스턴스를 반환합니다.


In [93]:
from langchain_text_splitters import RecursiveCharacterTextSplitter


#해야할 것
# 1. sql 쿼리 받아온 데이터를 json 형태로 변환해서 디렉토리에 저장하기
# 2. 데이터 전처리 하기 -> 문서 전처리 함수 만들기 (DB 데이터 형식에 맞춰서 변환 바랍니다)
# 3. 벡터 저장소 만들기 -> 벡터 저장소 함수 만들기
# 4. 벡터 저장소 업로드 하기 -> 벡터 저장소 업로드 함수 만들기

# JSONRetrievalChain 클래스는 RetrievalChain 클래스를 상속받아 JSON 파일에서 문서를 로드하는 기능을 제공합니다.
class JSONRetrievalChain(RetrievalChain):
    def __init__(self, source_uri):
        self.source_uri = source_uri  # 데이터 소스 URI를 초기화합니다.
        self.k = 5  # 검색할 문서의 수를 설정합니다.
        self.cached_embeddings = super().create_embedding()
    def sql_query(self):
        # 이 부분에서 sql로 받아온 데이터를 변환해서 디렉토리에 json형태로 저장하셔야합니다
        # 아래는 gpt 코드라 안해봐서 참고만 하십셔

        # import sqlite3  # SQLite 데이터베이스를 다루기 위해 sqlite3 모듈을 임포트합니다.
        # import json  # JSON 파일을 다루기 위해 json 모듈을 임포트합니다.

        # # 데이터베이스에 연결합니다.
        # conn = sqlite3.connect('your_database.db')  # 데이터베이스 파일 경로를 지정합니다.
        # cursor = conn.cursor()  # 커서를 생성합니다.

        # # SQL 쿼리를 실행하여 데이터를 가져옵니다.
        # cursor.execute("SELECT * FROM documents WHERE category = '법률'")  # 법률 문서를 검색하는 SQL 쿼리
        # rows = cursor.fetchall()  # 쿼리 결과를 가져옵니다.

        # # 결과를 JSON 형식으로 변환합니다.
        # documents = []
        # for row in rows:
        #     document = {
        #         'book_id': row[0],  # 첫 번째 열을 book_id로 사용합니다.
        #         'text': row[1],  # 두 번째 열을 텍스트로 사용합니다.
        #         'category': row[2],  # 세 번째 열을 카테고리로 사용합니다.
        #         'popularity': row[3],  # 네 번째 열을 인기도로 사용합니다.
        #         'keyword': row[4],  # 다섯 번째 열을 키워드로 사용합니다.
        #         'word_segment': row[5],  # 여섯 번째 열을 단어 분할로 사용합니다.
        #         'publication_ymd': row[6]  # 일곱 번째 열을 출판 날짜로 사용합니다.
        #     }
        #     documents.append(document)  # 문서를 리스트에 추가합니다.

        # # JSON 파일로 저장합니다.
        # with open('output_documents.json', 'w', encoding='utf-8') as json_file:
        #     json.dump({'data': documents}, json_file, ensure_ascii=False, indent=4)  # JSON 파일로 저장합니다.

        # cursor.close()  # 커서를 닫습니다.
        # conn.close()  # 데이터베이스 연결을 닫습니다.

        return "SELECT * FROM documents WHERE category = '법률'"  # 법률 문서를 검색하는 SQL 쿼리를 반환합니다.
    
    # load_documents 메서드는 주어진 JSON 파일에서 문서를 로드합니다.
    def load_documents(self, source_uris):
        # JSON 파일의 경로를 설정합니다.
        data_dir = "data/154.의료, 법률 전문 서적 말뭉치/01-1.정식개방데이터/Training/02.라벨링데이터/Training_legal.json"  # data 폴더에 json 파일들을 저장해 주세요
        import json  # JSON 파일을 다루기 위해 json 모듈을 임포트합니다.
        split_docs = []  # 문서를 저장할 리스트를 초기화합니다.

        # JSON 파일을 열고 데이터를 읽습니다.
        with open(data_dir, 'r', encoding='utf-8') as f:
            try:
                json_data = json.load(f)  # JSON 파일을 파싱하여 데이터를 로드합니다.
                            
                # 'data' 키가 있는지 확인하고, 해당 배열에서 문서를 추출합니다.
                if isinstance(json_data, dict) and 'data' in json_data:
                    for index, item in enumerate(json_data['data']):  # JSON 데이터에서 문서를 순회합니다.
                        if index >= 10:  # 20번째 문서까지만 로드
                            break
                        if isinstance(item, dict):  # 각 항목이 딕셔너리인지 확인합니다.
                            # Document 객체를 생성하여 문서 내용을 저장합니다.
                            from langchain.schema import Document
                            doc = Document(
                                page_content=item.get('text', ''),  # 문서의 텍스트를 가져옵니다.
                                metadata={  # 문서의 메타데이터를 설정합니다.
                                    'book_id': item.get('book_id'),  # 책 ID
                                    'category': item.get('category'),  # 카테고리
                                    'popularity': item.get('popularity'),  # 인기
                                    'keyword': item.get('keyword', []),  # 키워드
                                    'word_segment': item.get('word_segment', []),  # 단어 분할
                                    'publication_ymd': item.get('publication_ymd')  # 출판 날짜
                                }
                            )
                            split_docs.append(doc)  # 생성한 Document 객체를 리스트에 추가합니다.
                        else:
                            print("JSON 데이터가 예상된 형식이 아닙니다.")  # 데이터 형식 오류 메시지
                            print("데이터 구조:", json_data.keys() if isinstance(json_data, dict) else type(json_data))  # 데이터 구조 출력
                    
                    print(f"전체 {len(json_data['data'])}개 중 {len(split_docs)}개의 문서를 로드했습니다.")  # 로드된 문서 수 출력
                                
            except json.JSONDecodeError as e:  # JSON 파싱 중 오류가 발생한 경우
                print(f"JSON 파일 파싱 중 오류가 발생했습니다: {e}")  # 오류 메시지 출력
        return split_docs  # 로드된 문서 리스트를 반환합니다.

    # create_text_splitter 메서드는 텍스트 분할기를 생성합니다.
    def create_text_splitter(self):
        return RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)  # 문서를 500자씩 분할하고 50자 중첩하여 분할합니다.


In [94]:
# 체인 생성
rag = JSONRetrievalChain(source_uri="").create_chain()


전체 63704개 중 10개의 문서를 로드했습니다.
새로운 FAISS 벡터 저장소를 생성합니다.


AttributeError: 'Document' object has no attribute 'encode'

In [77]:
rag_chain=rag.chain
rag_retriever=rag.retriever
retrieved_docs=rag_retriever.invoke("이행강제금부과처분취소 ")
response = rag_chain.invoke({
    "context": retrieved_docs,  # 필요한 경우 적절한 컨텍스트를 추가
    "question": "이행강제금부과처분취소에 대해 알려줘"
})
print(response)

이행강제금부과처분취소에 대한 판결은 대법원에서 2013년 12월 12일 선고된 2012두20397 판결에 해당합니다. 이 판결에서는 개발제한구역의 지정 및 관리에 관한 특별조치법에 따라 이행강제금을 부과·징수할 때마다 시정명령 절차를 다시 거쳐야 하는지 여부에 대해 소극적으로 판단하였습니다. 즉, 이행강제금을 부과하기 전에 반드시 시정명령을 다시 발부할 필요는 없다는 것입니다. 또한, 이행강제금 부과의 근거가 되는 시정명령은 법률 시행일인 2010년 2월 7일 이후에 이루어져야 한다고 명시하였습니다.
