In [8]:
import os
import re
import time
import pickle
import asyncio
import nest_asyncio
import pandas as pd
import json
from tqdm import tqdm
from collections import defaultdict
from IPython.display import clear_output
from typing import List

from dotenv import load_dotenv
from langchain.docstore.document import Document
from langchain.chains import RetrievalQA, LLMChain
from langchain.prompts import PromptTemplate
from langchain_community.embeddings import ClovaXEmbeddings
from langchain_community.chat_models import ChatClovaX
from pymilvus import connections, utility
from langchain_community.vectorstores.milvus import Milvus
from langchain.schema import BaseRetriever
from sklearn.metrics.pairwise import cosine_similarity
from pydantic import BaseModel

In [2]:
class ISBNMergingRetriever(BaseRetriever):
    """
    BaseRetriever를 감싸서, 검색 결과를 ISBN 기준으로 그룹화하고
    각 그룹의 청크를 하나의 Document로 병합하여 반환한다.
    """

    base_retriever: BaseRetriever  # Pydantic 필드 선언

    def _get_relevant_documents(self, query: str) -> List[Document]:
        docs = self.base_retriever.get_relevant_documents(query)
        grouped = {}
        for doc in docs:
            isbn = doc.metadata.get("ISBN", "NO_ISBN")
            grouped.setdefault(isbn, []).append(doc)
        merged_docs = []
        for isbn, doc_list in grouped.items():
            combined_text = "\n".join(d.page_content for d in doc_list)
            merged_meta = dict(doc_list[0].metadata)
            print(
                f"[디버그] ISBN: {isbn}, 병합된 청크 수: {len(doc_list)}, 병합 텍스트 길이: {len(combined_text)}"
            )
            merged_docs.append(
                Document(page_content=combined_text, metadata=merged_meta)
            )
        return merged_docs

    async def _aget_relevant_documents(self, query: str) -> List[Document]:
        docs = await self.base_retriever.aget_relevant_documents(query)
        grouped = {}
        for doc in docs:
            isbn = doc.metadata.get("ISBN", "NO_ISBN")
            grouped.setdefault(isbn, []).append(doc)
        merged_docs = []
        for isbn, doc_list in grouped.items():
            combined_text = "\n".join(d.page_content for d in doc_list)
            merged_meta = dict(doc_list[0].metadata)
            merged_docs.append(
                Document(page_content=combined_text, metadata=merged_meta)
            )
        return merged_docs

    def add_documents(self, documents: List[Document], **kwargs) -> None:
        pass

    def get_type(self) -> str:
        return "isbn_merging_retriever"

In [None]:
# 환경설정, 임베딩, 문서 구성 등

previous_additional_question_embeddings = []


def is_similar_question(new_emb, prev_embeds, threshold=0.65):
    if not prev_embeds:
        return False
    sim_scores = cosine_similarity([new_emb], prev_embeds)[0]
    max_score = max(sim_scores)
    print(f"[중복 유사도 판단] Max = {max_score:.3f}")
    return max_score > threshold


load_dotenv(dotenv_path=r"")

os.environ["NCP_CLOVASTUDIO_API_KEY"] = os.getenv("NCP_CLOVASTUDIO_API_KEY")
os.environ["NCP_CLOVASTUDIO_API_URL"] = os.getenv(
    "NCP_CLOVASTUDIO_API_URL", "https://clovastudio.stream.ntruss.com/"
)

connections.connect(
    alias="default",
    host=os.getenv("MILVUS_HOST", "localhost"),
    port=os.getenv("MILVUS_PORT", "19530"),
)

ncp_embeddings = ClovaXEmbeddings(model="bge-m3")
llm_clova = ChatClovaX(model="HCX-003", max_tokens=2048)

In [None]:
embedding_file = r""
if os.path.exists(embedding_file):
    with open(embedding_file, "rb") as f:
        saved_data = pickle.load(f)
    all_text_embedding_pairs = saved_data["embeddings"]
    all_metadata_list = saved_data["metadata"]
    print("임베딩 데이터 불러오기")
else:
    raise FileNotFoundError(f"임베딩 파일을 찾을 수 없습니다: {embedding_file}")

metadata_mapping = {
    "ISBN": "ISBN",
    "페이지": "page",
    "가격": "price",
    "제목": "title",
    "저자": "author",
    "분류": "category",
    "저자소개": "author_intro",
    "책소개": "book_intro",
    "목차": "table_of_contents",
    "출판사리뷰": "publisher_review",
    "추천사": "recommendation",
}

all_metadata_list_mapped = []
for meta in all_metadata_list:
    mapped_meta = {metadata_mapping.get(key, key): value for key, value in meta.items()}
    all_metadata_list_mapped.append(mapped_meta)

documents = [
    Document(page_content=pair[0], metadata=meta)
    for pair, meta in zip(all_text_embedding_pairs, all_metadata_list_mapped)
]

임베딩 데이터 불러오기


In [12]:
collection_name = "book_rag_db"
if utility.has_collection(collection_name):
    utility.drop_collection(collection_name)

vectorstore = Milvus(
    embedding_function=ncp_embeddings,
    collection_name=collection_name,
    connection_args={"host": "localhost", "port": "19530"},
    auto_id=True,
)

texts = [pair[0] for pair in all_text_embedding_pairs]
embeds = [pair[1] for pair in all_text_embedding_pairs]


def precomputed_embed_documents(cls, input_texts):
    if input_texts != texts:
        raise ValueError(
            "ERROR : 입력 텍스트 순서가 사전 계산된 임베딩과 일치하지 않음"
        )
    return embeds


ClovaXEmbeddings.embed_documents = classmethod(precomputed_embed_documents)
vectorstore.add_texts(
    texts=texts, metadatas=all_metadata_list_mapped, embeddings=embeds
)

  vectorstore = Milvus(


[457017515479502109,
 457017515479502110,
 457017515479502111,
 457017515479502112,
 457017515479502113,
 457017515479502114,
 457017515479502115,
 457017515479502116,
 457017515479502117,
 457017515479502118,
 457017515479502119,
 457017515479502120,
 457017515479502121,
 457017515479502122,
 457017515479502123,
 457017515479502124,
 457017515479502125,
 457017515479502126,
 457017515479502127,
 457017515479502128,
 457017515479502129,
 457017515479502130,
 457017515479502131,
 457017515479502132,
 457017515479502133,
 457017515479502134,
 457017515479502135,
 457017515479502136,
 457017515479502137,
 457017515479502138,
 457017515479502139,
 457017515479502140,
 457017515479502141,
 457017515479502142,
 457017515479502143,
 457017515479502144,
 457017515479502145,
 457017515479502146,
 457017515479502147,
 457017515479502148,
 457017515479502149,
 457017515479502150,
 457017515479502151,
 457017515479502152,
 457017515479502153,
 457017515479502154,
 457017515479502155,
 457017515479

In [None]:
# 기존 dense_retriever 생성 → custom retriever로 감싸기

dense_retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
merged_retriever = ISBNMergingRetriever(base_retriever=dense_retriever)

dpr_qa_chain = RetrievalQA.from_chain_type(
    llm=llm_clova, retriever=merged_retriever, return_source_documents=True
)


def extract_field(text, field_name):
    pattern = rf"{re.escape(field_name)}\s*:\s*(.*)"
    match = re.search(pattern, text)
    return match.group(1).strip() if match else ""


MIN_INFO_LENGTH = 10

In [44]:
decision_prompt_template = PromptTemplate(
    template="""
[대화 맥락]
사용자 대화 내역:
{{ history }}
사용자의 최신 질문: "{{ query }}"

[역할 및 목표]
{{ role_instructions }}
현재 대화 상황과 질문의 맥락을 분석하여 아래 중 하나의 행동만을 출력해라.
- "추천": 사용자가 책 추천을 명확히 요청하거나, 선호도 정보(예: 선호하는 카테고리, 선호하는 작가, 책을 찾는 목적, 사전지식 등)가 **구체적**이고 충분하다고 판단되는 경우에만.
- "추가 질문": 위 조건을 충족하지 못해, 추가로 선호도 정보를 더 알아내야 할 경우, 새로운 선호도 정보를 얻을 수 있는 구체적인 추가 질문을 생성할 것.
출력 형식 (반드시 아래 내용만 출력):
행동: "<추천 또는 추가 질문>"
추가 질문: "<추가 질문인 경우 구체적인 추가 질문을, 추천인 경우 빈 문자열>"
""",
    input_variables=["history", "query", "role_instructions"],
    template_format="jinja2",
)

final_query_generation_template = PromptTemplate(
    template="""
[대화 요약]
{{ history }}

[사용자 요청]
{{ query }}

[페르소나 정보]
{{ persona_info }}

[사용자 선호도]
{{ preferences }}

위 정보를 바탕으로, 사용자가 제공한 선호도 정보(카테고리, 작가, 목적 등)를 모두 반영하여, 책 추천에 유용한 최종 검색 쿼리만을 생성해라.
출력 형식:
쿼리: <최종 검색 쿼리>
- 반드시 위 형식만을 사용하고, 추가 설명이나 안내 문구는 포함하지 말라.
- "최종 검색 쿼리"는 질문 형태가 아니어야 하며, '추가 질문'이라는 표현은 사용하지 말라.
""",
    input_variables=["history", "query", "persona_info", "preferences"],
    template_format="jinja2",
)

# 페르소나별 Pipeline 클래스

literature_role = "너는 감성적이고 문학적인 도서 추천 챗봇이다. 사용자의 감정과 취향을 섬세하게 파악하여, 감성적인 어투를 활용해 책을 추천해라."
science_role = "너는 정확하고 논리적인 과학/기술 도서 추천 챗봇이다. 사용자의 관심 분야와 요구를 분석하여, 명확하고 구체적인 정보를 활용하여 책을 추천해라."
general_role = "너는 친절하고 신뢰할 수 있는 범용 도서 추천 챗봇이다. 사용자의 관심사와 목적을 파악하여, 다양한 분야의 책을 적절하게 추천해라. 문학, 과학 외에도 자기계발, 역사, 에세이, 경제경영 등 모든 장르에 유연하게 대응해라."

In [45]:
import contextlib
from langchain_core.runnables import RunnableSequence


class SummarizeFinalQueryChain:
    async def __call__(self, input_data: dict) -> dict:
        query = input_data.get("final_query") or input_data.get("text", "")
        prompt = f"다음 내용을 하나의 자연스러운 문장으로 정제해줘:\n{query}\n정제된 검색 쿼리:"
        result_text = await async_invoke_llm(prompt, "최종 쿼리 정제")
        return {"text": result_text.strip()}


async def async_invoke(chain: LLMChain, vars_dict: dict, step_name: str) -> dict:
    try:
        print(f"\n[디버그] {step_name} 호출 전 변수: {vars_dict}")
        result = await asyncio.to_thread(chain.invoke, vars_dict)
        print(f"\n[디버그] {step_name} 결과: {result}")
        return result
    except Exception as e:
        print(f"[에러] {step_name}에서 예외 발생: {str(e)}")
        return {"text": ""}


async def async_invoke_llm(prompt: str, step_name: str) -> str:
    try:
        print(f"\n[디버그] {step_name} 프롬프트 호출:\n{prompt}")
        response = await asyncio.to_thread(llm_clova.invoke, prompt)
        result_text = response.text().strip()
        print(f"\n[디버그] {step_name} 응답: {result_text}")
        return result_text
    except Exception as e:
        print(f"[에러] {step_name}에서 예외 발생: {str(e)}")
        return ""

In [None]:
class BaseRAGPipeline:
    def __init__(self, config, llm, retriever, qa_chain, documents):
        self.config = config
        self.llm = llm
        self.retriever = retriever
        self.qa_chain = qa_chain
        self.documents = documents

        # 사용자와 LLM 히스토리 분리
        self.user_history = []
        self.llm_history = []

        self.user_preferences = defaultdict(list)
        self.preferences_text = ""
        self.preference_update_count = 0

        # 마지막 추천 결과 저장 (병합된 Document 리스트)
        self.last_recommendations = []
        # 마지막 행동 추적 (예: "추천", "추가 질문" 등)
        self.last_action = None

        self.decision_chain = LLMChain(llm=self.llm, prompt=decision_prompt_template)
        self.final_query_generation_chain = LLMChain(
            llm=self.llm, prompt=final_query_generation_template
        )

    def robust_parse_decision_response(self, response_text):
        action_match = re.search(r'행동\s*[:：]\s*"?([^"\n]+)"?', response_text)
        action = action_match.group(1).strip() if action_match else None

        book_info_match = re.search(
            r'추천\s*책\s*정보\s*[:：]\s*"?([^"\n]+)"?', response_text
        )
        book_info = book_info_match.group(1).strip() if book_info_match else ""

        follow_match = re.search(r'추가\s*질문\s*[:：]\s*"?([^"\n]+)"?', response_text)
        additional_question = follow_match.group(1).strip() if follow_match else ""
        return action, book_info, additional_question

    async def get_final_query(self, final_query_vars):
        composite_final_chain = RunnableSequence(
            *[self.final_query_generation_chain, SummarizeFinalQueryChain()]
        )
        composite_result = await composite_final_chain.ainvoke(final_query_vars)
        final_query = composite_result.get("text", "").strip()
        return final_query

    async def summarize_user_preferences(self, existing_preferences, new_input):
        prompt = (
            f"다음 사용자 선호도 내용들을 하나의 자연스러운 문장으로 요약해줘:\n"
            f"기존 선호도: {existing_preferences}\n"
            f"새로운 입력: {new_input}\n"
            f"요약된 선호도:"
        )
        return await async_invoke_llm(prompt, "사용자 선호도 요약")

    async def generate_answer(self, query):
        # 일반 QA 시나리오: RetrievalQA 체인을 사용
        author_match = re.search(r"(?:저자|작가)\s*[:：]\s*(\S+)", query)
        if author_match:
            author_name = author_match.group(1).strip().lower()
            dense_results = self.qa_chain.invoke(query)["source_documents"]
            keyword_results = [
                doc
                for doc in self.documents
                if doc.metadata.get("author", "").strip().lower() == author_name
            ]
            source_docs = list(
                {
                    doc.metadata.get("ISBN"): doc
                    for doc in (dense_results + keyword_results)
                    if doc.metadata.get("ISBN")
                }.values()
            )
        else:
            result = self.qa_chain.invoke(query)
            source_docs = result["source_documents"]

        retrieved_isbns = set()
        for doc in source_docs:
            isbn = doc.metadata.get("ISBN")
            if isbn:
                retrieved_isbns.add(isbn)

        aggregated_docs = []
        for isbn in retrieved_isbns:
            book_docs = [
                doc for doc in self.documents if doc.metadata.get("ISBN") == isbn
            ]
            if not book_docs:
                continue
            aggregated_text = "\n".join(doc.page_content for doc in book_docs)
            aggregated_docs.append(
                Document(page_content=aggregated_text, metadata=book_docs[0].metadata)
            )

        formatted_answers = []
        for doc in aggregated_docs:
            metadata = doc.metadata
            title = metadata.get("title") or extract_field(doc.page_content, "제목")
            author = metadata.get("author") or extract_field(doc.page_content, "저자")
            aggregated_text = doc.page_content
            book_intro = extract_field(aggregated_text, "책소개")
            publisher_review = extract_field(aggregated_text, "출판사리뷰")
            recommendation_field = extract_field(aggregated_text, "추천사")

            if book_intro and len(book_intro.strip()) >= MIN_INFO_LENGTH:
                selected_info = book_intro
            elif publisher_review and len(publisher_review.strip()) >= MIN_INFO_LENGTH:
                selected_info = publisher_review
            elif (
                recommendation_field
                and len(recommendation_field.strip()) >= MIN_INFO_LENGTH
            ):
                selected_info = recommendation_field
            else:
                selected_info = ""

            if not selected_info:
                reason = "추천 정보 생성 불가"
            else:
                reason_prompt = (
                    f"다음 정보와 검색에 활용된 쿼리를 참고하여, "
                    f"이 책이 추천되는 이유를 명확하게 요약해라.\n"
                    f"정보:\n{selected_info}"
                )
                generated_reason = await async_invoke_llm(
                    reason_prompt, "추천 이유 생성"
                )
                if (
                    not generated_reason
                    or len(generated_reason) < 10
                    or "추천 정보 생성 불가" in generated_reason
                ):
                    reason = "추천 정보 생성 불가"
                else:
                    reason = generated_reason

            formatted = f"책 제목: {title}\n저자: {author}\n추천 이유: {reason}"
            formatted_answers.append(formatted)

        answer = "\n\n".join(formatted_answers)
        refined_answer = await async_invoke_llm(
            f"아래 원본 추천 결과를 읽고, 각 책의 정보를 다음 형식에 맞춰 재작성해라.\n\n"
            f"형식:\n책 제목: <책 제목>\n저자: <저자>\n추천 이유: <추천 이유>\n\n"
            f"원본 추천 결과:\n{answer}\n\n"
            f"출력 시, 반드시 위 형식만을 사용하고 불필요한 안내 문구는 포함하지 말아라.",
            "추천 결과 재정제",
        )
        return refined_answer, None

    async def _summarize_chunk_with_llm(self, text: str) -> str:
        prompt = f"다음 책 정보를 200자 이내로 요약해줘:\n{text}\n\n요약:"
        summary = await async_invoke_llm(prompt, "chunk summary")
        if not summary or len(summary) < 10:
            summary = "별도의 상세 정보가 충분치 않습니다."
        return summary

    def _merge_documents_by_isbn(self, isbn: str) -> Document:
        docs = [doc for doc in self.documents if doc.metadata.get("ISBN") == isbn]
        if not docs:
            return None
        combined_text = "\n".join(doc.page_content for doc in docs)
        merged_meta = docs[0].metadata
        return Document(page_content=combined_text, metadata=merged_meta)

    async def _some_simple_recommendation(self, final_query):
        source_docs = self.retriever.get_relevant_documents(final_query)
        top_docs = source_docs[:3]
        merged_top_docs = []
        for doc in top_docs:
            isbn = doc.metadata.get("ISBN", "NO_ISBN")
            merged_doc = self._merge_documents_by_isbn(isbn)
            if merged_doc:
                merged_top_docs.append(merged_doc)
                print(
                    f"[추천 디버그] ISBN: {isbn}, 병합된 전체 청크 길이: {len(merged_doc.page_content)}"
                )
        recommendations = []
        for doc in merged_top_docs:
            metadata = doc.metadata
            title = metadata.get("title", "제목 정보 없음")
            author = metadata.get("author", "저자 정보 없음")
            book_intro = extract_field(doc.page_content, "책소개")
            publisher_review = extract_field(doc.page_content, "출판사리뷰")
            recommendation_field = extract_field(doc.page_content, "추천사")
            if book_intro and len(book_intro.strip()) >= MIN_INFO_LENGTH:
                selected_info = book_intro
            elif publisher_review and len(publisher_review.strip()) >= MIN_INFO_LENGTH:
                selected_info = publisher_review
            elif (
                recommendation_field
                and len(recommendation_field.strip()) >= MIN_INFO_LENGTH
            ):
                selected_info = recommendation_field
            else:
                selected_info = ""
            if selected_info:
                summary = await self._summarize_chunk_with_llm(selected_info)
            else:
                summary = "별도의 상세 정보가 충분치 않습니다."
            if len(summary) > 200:
                summary = summary[:200] + "..."
            recommendation_text = (
                f"책 제목: {title}\n저자: {author}\n추천 이유: {summary}"
            )
            recommendations.append(recommendation_text)
        self.last_recommendations = merged_top_docs
        if not recommendations:
            return "해당 검색 쿼리에 대한 직접 추천 결과가 없습니다."
        return "\n\n".join(recommendations)

    # 후속 질문 처리 함수 (LLM을 통해 사용자의 의도를 분석)
    async def handle_followup_query(self, followup_query: str) -> (bool, str):
        rec_info = []
        for doc in self.last_recommendations:
            title = doc.metadata.get("title", "제목 정보 없음")
            isbn = doc.metadata.get("ISBN", "NO_ISBN")
            # 책 전문 중 1000자까지만 보게
            snippet = doc.page_content[:1000].replace("\n", " ")
            rec_info.append(f"제목: {title}, ISBN: {isbn}, 내용 일부: {snippet}")
        rec_info_str = "\n".join(rec_info)

        prompt = f"""
        다음 후속 질문: "{followup_query}"
        추천된 책 목록:
        {rec_info_str}

        위 책 목록을 참고하여, 사용자의 후속 질문 의도를 분석해라.
        - 만약 사용자가 특정 책에 대해 더 자세한 정보를 원한다면, "action"을 "상세"로, "ISBN"에 해당 책의 ISBN을, "query"는 빈 문자열로 출력해라.
        - 만약 사용자가 특정 책과 유사한 책을 추천받고자 한다면, "action"을 "유사"로, "ISBN"에 해당 책의 ISBN을, "query"에 해당 책 전문을 요약한 내용을 출력해라.
        출력 형식은 반드시 JSON 형식으로, 예:
        {"{"} "action": "상세", "ISBN": "1234567890", "query": "" {"}"}
        또는
        {"{"} "action": "유사", "ISBN": "1234567890", "query": "요약문" {"}"}
        단, 불필요한 설명이나 추가 문구 없이 오직 JSON 형식만 출력해라.
        """
        result_text = await async_invoke_llm(prompt, "후속 질문 의도 분석")
        try:
            result = json.loads(result_text)
            action = result.get("action", "")
            isbn = result.get("ISBN", "")
            query_part = result.get("query", "")
        except Exception as e:
            print(f"[에러] 후속 질문 의도 분석 결과 파싱 실패: {e}")
            return (False, "후속 질문 처리 중 오류가 발생했습니다.")

        if action == "상세":
            target_doc = None
            for doc in self.last_recommendations:
                if doc.metadata.get("ISBN", "") == isbn:
                    target_doc = doc
                    break
            if target_doc:
                detail_prompt = f"다음 책 정보에 대해 더 자세한 설명을 해줘:\n\n{target_doc.page_content}\n\n자세한 설명:"
                detailed_info = await async_invoke_llm(detail_prompt, "후속 상세 설명")
                return (True, detailed_info)
            else:
                return (True, "해당 ISBN의 책 정보를 찾을 수 없습니다.")
        elif action == "유사":
            if query_part:
                # 기존 단순 제목 나열 대신, _some_simple_recommendation을 호출하여
                # 제목, 저자, 추천이유 형식의 재추천 결과를 생성함.
                recommendation_result = await self._some_simple_recommendation(
                    query_part
                )
                return (True, recommendation_result)
            else:
                return (True, "요약된 쿼리 정보가 부족합니다.")
        else:
            return (False, "후속 질문 의도 분석 결과 알 수 없는 행동입니다.")

    async def search_and_generate_answer(self, user_query, force_recommendation=False):
        self.preferences_text = " ".join(self.user_preferences["preferences"])
        query_summary = "\n".join(self.user_history[-5:])

        if self.config.get("persona") == "Literature":
            persona_info = "감성, 현재 기분, 선호하는 문학 장르 및 작가 정보"
        elif self.config.get("persona") == "Science":
            persona_info = "초심자 여부, 관심 분야, 구체적인 기술 정보"
        elif self.config.get("persona") == "General":
            persona_info = "장르, 책을 찾는 이유, 독서 취향 정보"
        else:
            persona_info = ""

        if force_recommendation:
            action = "추천"
            print("[디버그] force_recommendation 적용: 추천 행동으로 전환합니다.")
            additional_question = ""
        else:
            prompt_vars = {
                "history": query_summary,
                "query": user_query,
                "role_instructions": self.config["role_instructions"],
            }
            decision_result = await async_invoke(
                self.decision_chain, prompt_vars, "행동 결정"
            )
            decision_text = decision_result.get("text", "").strip()
            print(f"\n[디버그] Decision 응답: {decision_text}")
            action, _, additional_question = self.robust_parse_decision_response(
                decision_text
            )
            print(f"[디버그] 행동: {action}")

        if action == "추가 질문":
            try:
                add_q_emb = ncp_embeddings.embed_query(additional_question)
            except Exception as e:
                print(f"[에러] 추가 질문 임베딩 생성 중 문제 발생: {str(e)}")
                add_q_emb = None

            if add_q_emb is not None and is_similar_question(
                add_q_emb, previous_additional_question_embeddings
            ):
                print("[정보] 동일 추가 질문이 재입력되어 추천 행동으로 전환합니다.")
                return await self.search_and_generate_answer(
                    user_query, force_recommendation=True
                )
            else:
                if add_q_emb is not None:
                    previous_additional_question_embeddings.append(add_q_emb)

            if additional_question:
                self.llm_history.append(f"챗봇: {additional_question}")
                print("-" * 50)
                print(f"[챗봇] {additional_question}")
                print("-" * 50)

            raw_user_input = input("[사용자] ")
            self.user_history.append(raw_user_input)

            if self.preferences_text:
                updated_pref = await self.summarize_user_preferences(
                    self.preferences_text, raw_user_input
                )
            else:
                updated_pref = raw_user_input

            self.preferences_text = updated_pref
            self.preference_update_count += 1
            print(f"\n[디버그] 업데이트된 사용자 선호도: {self.preferences_text}")
            print(f"[디버그] 선호도 업데이트 횟수: {self.preference_update_count}")

            if self.preference_update_count >= 3:
                print("[디버그] 선호도 업데이트가 3회가 되어 추천 행동으로 전환합니다.")
                action = "추천"
            else:
                updated_prompt_vars = {
                    "history": "\n".join(self.user_history[-5:]),
                    "query": user_query,
                    "role_instructions": self.config["role_instructions"],
                }
                decision_result = await async_invoke(
                    self.decision_chain, updated_prompt_vars, "재결정"
                )
                decision_text = decision_result.get("text", "").strip()
                print(f"\n[디버그] 재결정 응답: {decision_text}")
                action, _, additional_question = self.robust_parse_decision_response(
                    decision_text
                )
                print(f"[디버그] 재결정 행동: {action}")
                if action == "추가 질문":
                    if additional_question:
                        self.llm_history.append(f"챗봇: {additional_question}")
                        print("-" * 50)
                        print(f"[챗봇] {additional_question}")
                        print("-" * 50)
                    return additional_question

        final_query_vars = {
            "history": "\n".join(self.user_history[-5:]),
            "query": user_query,
            "persona_info": persona_info,
            "preferences": self.preferences_text,
        }
        final_query = await self.get_final_query(final_query_vars)
        print(f"\n[디버그] 최종 검색 쿼리: {final_query}")

        if action == "추천":
            self.last_action = "추천"
            answer = await self._some_simple_recommendation(final_query)
            return answer
        else:
            self.last_action = action
            answer, _ = await self.generate_answer(final_query)
            return answer

    async def interactive_multi_turn_qa(self):
        if self.config["persona"] == "Literature":
            greeting = "안녕하세요~ 감성적이고 문학적인 도서 추천 챗봇입니다. 어떤 책을 추천해드릴까요?"
        elif self.config["persona"] == "Science":
            greeting = "안녕하십니까. 정확하고 논리적인 과학/기술 도서 추천 챗봇입니다. 관심 있는 기술 분야에 대해 편하게 이야기해 주세요."
        else:
            greeting = "안녕하세요! 범용/일반 도서 추천 챗봇입니다. 관심 있는 분야에 대해 편하게 이야기해 주세요."

        self.llm_history.append(f"챗봇: {greeting}")
        print("-" * 50)
        print(f"[챗봇] {greeting}")
        print("-" * 50)

        while True:
            try:
                user_query = input("[사용자] ")
            except (KeyboardInterrupt, EOFError):
                print("\n[대화 종료]")
                break

            if user_query.lower() == "quit":
                print("\n[대화 저장 중...]")
                print("대화 저장 완료")
                import sys

                sys.exit()

            # 만약 직전 행동이 "추천"이었다면 후속 질문으로 처리 시도
            if self.last_action == "추천" and self.last_recommendations:
                handled, followup_output = await self.handle_followup_query(user_query)
                if handled:
                    answer = followup_output
                    self.last_action = None
                    self.last_recommendations = []
                else:
                    self.user_history.append(user_query)
                    answer = await self.search_and_generate_answer(user_query)
            else:
                self.user_history.append(user_query)
                answer = await self.search_and_generate_answer(user_query)

            if answer is not None:
                self.llm_history.append(f"챗봇: {answer}")
                print("-" * 50)
                print(f"[챗봇] {answer}")
                print("-" * 50)

In [53]:
class LiteratureRAGPipeline(BaseRAGPipeline):
    def __init__(self, llm, retriever, qa_chain, documents):
        config = {"persona": "Literature", "role_instructions": literature_role}
        super().__init__(config, llm, retriever, qa_chain, documents)


class ScienceRAGPipeline(BaseRAGPipeline):
    def __init__(self, llm, retriever, qa_chain, documents):
        config = {"persona": "Science", "role_instructions": science_role}
        super().__init__(config, llm, retriever, qa_chain, documents)


class GeneralRAGPipeline(BaseRAGPipeline):
    def __init__(self, llm, retriever, qa_chain, documents):
        config = {"persona": "General", "role_instructions": general_role}
        super().__init__(config, llm, retriever, qa_chain, documents)

In [None]:
nest_asyncio.apply()


def main():
    previous_additional_question_embeddings.clear()
    print("페르소나 선택:")
    print("1. 예술/문학")
    print("2. 과학/기술")
    print("3. 범용/일반")
    choice = input("원하는 페르소나 번호를 입력하세요 (1, 2, 3): ").strip()
    if choice == "1":
        pipeline = LiteratureRAGPipeline(
            llm_clova, merged_retriever, dpr_qa_chain, documents
        )
    elif choice == "2":
        pipeline = ScienceRAGPipeline(
            llm_clova, merged_retriever, dpr_qa_chain, documents
        )
    elif choice == "3":
        pipeline = GeneralRAGPipeline(
            llm_clova, merged_retriever, dpr_qa_chain, documents
        )
    else:
        print("잘못된 선택입니다. 기본 범용/일반 페르소나로 실행합니다.")
        pipeline = GeneralRAGPipeline(
            llm_clova, merged_retriever, dpr_qa_chain, documents
        )

    asyncio.run(pipeline.interactive_multi_turn_qa())


if __name__ == "__main__":
    main()

페르소나 선택:
1. 예술/문학
2. 과학/기술
3. 범용/일반
--------------------------------------------------
[챗봇] 안녕하세요~ 감성적이고 문학적인 도서 추천 챗봇입니다. 어떤 책을 추천해드릴까요?
--------------------------------------------------

[디버그] 행동 결정 호출 전 변수: {'history': '소설책.', 'query': '소설책.', 'role_instructions': '너는 감성적이고 문학적인 도서 추천 챗봇이다. 사용자의 감정과 취향을 섬세하게 파악하여, 감성적인 어투를 활용해 책을 추천해라.'}

[디버그] 행동 결정 결과: {'history': '소설책.', 'query': '소설책.', 'role_instructions': '너는 감성적이고 문학적인 도서 추천 챗봇이다. 사용자의 감정과 취향을 섬세하게 파악하여, 감성적인 어투를 활용해 책을 추천해라.', 'text': '행동: "추가 질문"\n추가 질문: 어떤 장르의 소설책을 좋아하시나요?'}

[디버그] Decision 응답: 행동: "추가 질문"
추가 질문: 어떤 장르의 소설책을 좋아하시나요?
[디버그] 행동: 추가 질문
--------------------------------------------------
[챗봇] 어떤 장르의 소설책을 좋아하시나요?
--------------------------------------------------

[디버그] 업데이트된 사용자 선호도: 
[디버그] 선호도 업데이트 횟수: 1

[디버그] 재결정 호출 전 변수: {'history': '소설책.\n', 'query': '소설책.', 'role_instructions': '너는 감성적이고 문학적인 도서 추천 챗봇이다. 사용자의 감정과 취향을 섬세하게 파악하여, 감성적인 어투를 활용해 책을 추천해라.'}

[디버그] 재결정 결과: {'history': '소설책.\n', 'query': '소설책.', 

SystemExit: 

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
