In [15]:
# ingestion.py

import getpass
import os
from dotenv import load_dotenv
from langchain_openai import OpenAIEmbeddings
from pinecone import Pinecone
from langchain_pinecone import PineconeVectorStore
from langchain.retrievers import BM25Retriever,EnsembleRetriever
from kiwipiepy import Kiwi
import cohere
import dill

load_dotenv(override=True) # 강제 다시 로드

if not os.environ.get("OPENAI_API_KEY"):
  os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

if not os.environ.get("PINECONE_API_KEY"):
  os.environ["PINECONE_API_KEY"] = getpass.getpass("Enter Pinecone API key: ")

pinecone_api = os.environ["PINECONE_API_KEY"]
cohere_api = os.environ["COHERE_API_KEY"]

# vectorstore load
pc = Pinecone(api_key=pinecone_api)

index_name = "canon"
index = pc.Index(index_name)

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

vector_store = PineconeVectorStore(embedding=embeddings, index=index)


retriever = vector_store.as_retriever(
  search_type="similarity", search_kwargs={"k": 10},
)

# 새로 초기화된 Kiwi와 동일한 토크나이징 함수 적용
kiwi = Kiwi()
def kiwi_tokenize(text):
    return [token.form for token in kiwi.tokenize(text)]

# === 1. BM25Retriever와 Kiwi 로드 ===
with open("../chunk_result/bm25_retriever_r50.pkl", "rb") as f:
    bm25_retriever = dill.load(f)


bm25_retriever.preprocess_func = kiwi_tokenize

# === 3. Ensemble Retriever 생성 ===
ensemble_retriever = EnsembleRetriever(
    retrievers=[retriever, bm25_retriever],
    weights=[0.5, 0.5]  # Dense와 BM25 각각 50% 가중치
)

cohere_client = cohere.Client(cohere_api)

In [16]:
# state.py
from typing import Annotated, TypedDict
from langgraph.graph.message import add_messages

# GraphState 상태 정의
class GraphState(TypedDict):
    question: Annotated[str, "Question"]
    # transform_question: Annotated[list, "Transformed queries generated by LLM"]
    ensemble_context: Annotated[str, "Ensemble Retrieve"]
    multi_context: Annotated[str, "Multi Query"]
    merge_context: Annotated[str, "Merge Context"]
    # context: Annotated[str, "Context"]
    rerank_context : Annotated[str, "Context"]
    answer: Annotated[str, "Answer"]
    message: Annotated[list, add_messages]

In [17]:
# retrieve.py
# from state import GraphState
# from ingestion import retriever, ensemble_retriever, bm25_retriever

# def retrieve_document(state: GraphState) -> GraphState:
#     print("---RETRIEVE---")
#     questions = state["question"]
#     documents = retriever.invoke(questions)
#     print(documents)
#     return {"context": documents, "question": questions}

# Ensemble retriever 정의
def ensemble_document(state: GraphState) -> GraphState:
    print("---ENSEMBLE RETRIEVE---")
    questions = state["question"]
    documents = ensemble_retriever.invoke(questions)
    # print(documents)
    return {"ensemble_context": documents}

In [28]:
# multiquery.py
# from state import GraphState

from typing import List
from langchain_core.output_parsers import BaseOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.messages import AIMessage
from dotenv import load_dotenv
import json

load_dotenv()

# output 정의
class LineListOutputParser(BaseOutputParser[List[str]]):
    def parse(self, text: str) -> List[str]:
        if isinstance(text, AIMessage):
            text = text.content
        
        try:
            parsed_json = json.loads(text)
            return {"lines": parsed_json}
        except:
            lines = text.strip().split("\n")
            return {"lines":list(filter(None, lines))}
        # return list(filter(None, lines))  # Remove empty lines

output_parser = LineListOutputParser()


QUERY_PROMPT = PromptTemplate(
    input_variables=["question"],
    template="""You are an AI language model assistant. Your task is to generate five 
    different versions of the given user question to retrieve relevant documents from a vector 
    database. By generating multiple perspectives on the user question, your goal is to help
    the user overcome some of the limitations of the distance-based similarity search. 
    Provide these alternative questions in a JSON array format, separated by commas.
    Do not include any additional explanations.
    Original question: {question}
    Output format: ["question1", "question2", "question3", "question4", "question5"]""",
)

llm = ChatOpenAI(temperature=0, model="gpt-4o-mini")

llm_chain = QUERY_PROMPT | llm | output_parser

# # Node for generating transformed queries
# def generate_transformed_queries(state: GraphState) -> GraphState:
#     print("---QUERY GENERTATE---")
#     query = state["question"]
#     transformed_queries = llm_chain.invoke({'question':query})
#     # print(transformed_queries)
#     return {"transform_question": transformed_queries}

In [29]:
# multiqueryretrieve.py
# from ingestion import retriever
# from state import GraphState
# from node.multiquery import llm_chain

from langchain.retrievers.multi_query import MultiQueryRetriever

# Initialize the retriever
multiquery_retriever = MultiQueryRetriever(
    retriever=retriever, llm_chain=llm_chain
)

# Node for retrieving documents
def multiquery_retrieve(state: GraphState) -> GraphState:
    print("---QUERY RETRIEVE---")
    question = state["question"]
    # print(transformed_queries)
    unique_docs = multiquery_retriever.invoke(question)
    print(unique_docs)
    return {"multi_context": unique_docs}

In [30]:
# querymerge.py
# from state import GraphState


def merge_results(state: GraphState) -> GraphState:
    print("---MERGE---")

    multi_query_result = state['multi_context']
    ensemble_result = state['ensemble_context']
    # print(multi_query_result)
    # print(ensemble_result)

    # 중복 제거 (예: 문서 ID 기준)
    seen_ids = set()
    merged_result = []

    for item in multi_query_result + ensemble_result:
        if item.id not in seen_ids:
            merged_result.append(item)
            seen_ids.add(item.id)
    # print(state)

    return {'merge_context': merged_result}


In [31]:
# rerank.py
# from state import GraphState
# from ingestion import cohere_client


def rerank_with_cohere(query, retrieved_docs, top_n=5):
    # Cohere에 전달할 문서 형식
    documents = [doc.page_content for doc in retrieved_docs]
    
    # Reranker 호출
    response = cohere_client.rerank(
        query=query,
        documents=documents,
        top_n=top_n,  # 상위 N개 문서 선택
        model="rerank-v3.5"  # 사용할 Cohere Reranker 모델
    )
    
    # 상위 문서만 반환
    reranked_docs = [retrieved_docs[result.index] for result in response.results]
    return reranked_docs

# Reranker Node
def rerank_docs(state: GraphState) -> GraphState:
    print("---RERANK---")
    # print(state)
    questions = state['question']
    documents = state['merge_context']
    reranked_docs = rerank_with_cohere(questions, documents)
    return {"rerank_context": reranked_docs}

In [32]:
# generation.py
# 답변 생성 체인
from dotenv import load_dotenv
# from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate

load_dotenv()

llm = ChatOpenAI(temperature=0, model="gpt-4o-mini")
# prompt = hub.pull("rlm/rag-prompt")

ANSWER_PROMPT = PromptTemplate(
    input_variables=["question","context"],
    template="""
당신은 카메라 사용자 메뉴얼에 대한 정보를 제공하는 AI 어시스턴트입니다. 사용자가 질문을 하면, 제공된 Document 형식의 context를 활용하여 답변을 생성하세요. 각 Document에는 이미지 경로가 포함된 metadata가 있습니다. 답변을 생성할 때, 관련된 이미지가 있는 경우 [image: metadata 내 이미지 경로] 형식으로 답변에 포함시켜 주세요. 

예시:
사용자 질문: "카메라의 ISO 설정 방법을 알려주세요."
답변: "카메라의 ISO 설정은 메뉴에서 '설정'을 선택한 후 'ISO' 옵션을 선택하여 조정할 수 있습니다. [image: /path/to/iso_setting_image]"

이와 같은 형식으로 질문에 대한 답변을 생성하세요.

컨텍스트와 질문을 기반으로 답변을 생성하세요:
- 컨텍스트: {context}
- 질문: {question}
"""
)

generation_chain = ANSWER_PROMPT | llm | StrOutputParser()

In [33]:
# generate.py
# 답변 실행 역할

from typing import Any, Dict

# from generation import generation_chain
# from state import GraphState

def generate(state: GraphState) -> Dict[str, Any]:
    print("---GENERATE---")
    question = state["question"]
    context = state["rerank_context"]

    generation = generation_chain.invoke({"context": context, "question": question})

    message = [{"role": "user", "content": question},{"role":"assistant", "content":generation}]
    return {"question": question, "answer": generation, "message": message}

In [34]:
# consts.py
RETRIEVE = "retrieve"
GRADE_DOCUMENTS = "grade_documents"
GENERATE = "generate"
WEBSEARCH = "websearch"

# graph.py
from dotenv import load_dotenv
from langgraph.graph import START, END, StateGraph
from langgraph.types import Send
# from langchain.schema.runnable import RunnableParallel


# from consts import RETRIEVE, GRADE_DOCUMENTS, GENERATE, WEBSEARCH

# from nodes import generate, grade_documents, retrieve, web_search
# from state import GraphState

load_dotenv()

workflow = StateGraph(GraphState)


workflow.add_node("ensemble retrieve", ensemble_document)
# workflow.add_node("multi query generate", generate_transformed_queries)
workflow.add_node("multi query retrieve", multiquery_retrieve)
workflow.add_node("merge retrieve", merge_results)
workflow.add_node("rerank", rerank_docs)
workflow.add_node("generate", generate)

# workflow.set_entry_point("retrieve")
workflow.add_edge(START, 'ensemble retrieve')
# workflow.add_edge(START, 'multi query generate')
workflow.add_edge(START, "multi query retrieve")

# workflow.add_edge('multi query generate', 'multi query retrieve')

# 조건부 전환 함수 정의
def check_conditions(state: GraphState):
    # 두 결과가 모두 존재하는지 확인
    if "ensemble_context" in state and "multi_context" in state:
        return [Send("merge retrieve", state)]  # 조건 충족 시 다음 노드로 전달
    return []  # 조건 미충족 시 빈 리스트 반환 (대기 상태)

# 조건부 전환 추가
workflow.add_conditional_edges(
    "ensemble retrieve",
    check_conditions,
    ["merge retrieve"]
)

workflow.add_conditional_edges(
    "multi query retrieve",
    check_conditions,
    ["merge retrieve"]
)


workflow.add_edge("merge retrieve", "rerank")
workflow.add_edge("rerank", "generate")
workflow.add_edge("generate", END)

app = workflow.compile()

In [35]:
app.get_graph().draw_mermaid_png(output_file_path='./graph_node_0202_1.png')

b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x01\x92\x00\x00\x02\x13\x08\x02\x00\x00\x00\xa2\xe4\rM\x00\x00\x00\x01sRGB\x00\xae\xce\x1c\xe9\x00\x00 \x00IDATx\x9c\xed\xddw\\\x13\xe7\xe3\x07\xf0\'d@H\x18\t{\x8a"\x8a\x8a\xd6\x81{T\xdc\x02\x02*\xee=j\xadhk\xabmm\xb5\xad\x1d\xda\xbaZk\xb5\xd6Qk\xdd{\xe2\xc0m\xc5\xbd\xb0Z\xb5\x8a[\xf6\x9e\xd9\xc9\xef\x8f\xf8\xa3|Y\xa2^xr\x97\xcf\xfb\x0f_\xe1r\xb9|b\x8e\x0f\xcf].w<\x83\xc1@\x00\x00\xd8\xc3\x8av\x00\x00\x80W\x83\xda\x02\x00\x96Am\x01\x00\xcb\xa0\xb6\x00\x80eP[\x00\xc02\xa8-\x00`\x19\x01\xed\x00P\xa32\x93\x94E\xf9\xba\xe2\x02\x9dZ\xa9W)\xf4\xb4\xe3T\x8b\xc8\xc6J \xe0\xd9\xda\xf3m\xed\xf8\xee~b\xdaq\x80>\x1e\x8e\xdb\xb2\x04O\xef\x16?\xbcY\xf8\xf0V\x91w\x80XY\xa4\xb7\xb5\xe3;\xba\x88\xf4:v\xbc\xf5"\xb1UN\xba\xba8_g0\x18\x9e\xdc.\xae\x1d$\xa9\x1d$i\xd0\xca\x9ev.\xa0\x06\xb5\xc5q\xcf\xee\x15\x9f\xdb\x97\xe5\xec%r\xf5\xb5\xa9\x13$\x918\xb0{|\xad\xd7\x1b\x1e\xdd*zt\xab\xe8\xf1\xed\xa2\x96=\xe4our\xa4\x9d\x08(@mq\xd9\xb1Mi\x85\xb9\xdav\x11N\xae\xde6\

In [36]:
print(app.invoke(input={"question": "iso 설정 방법에 대해 알려줘"}))

---ENSEMBLE RETRIEVE---
---QUERY RETRIEVE---
[Document(id='chunk-m50-644', metadata={'brand': 'Canon', 'image_path': [], 'index': '사용자 정의 기능 (C.Fn)', 'main_index': '설정', 'model': 'M50', 'page': 636.0, 'pdf_path': '/Users/yoeun/Library/Mobile Documents/com~apple~CloudDocs/github/FINAL Project/parse&chunk/data/pdf/split_pdf_image/m50/m50_page_636.jpg', 'sub_index': 'N/A'}, page_content='```\n● 조작'), Document(id='chunk-r50-479', metadata={'brand': 'Canon', 'image_path': ['/Users/yoeun/Library/Mobile Documents/com~apple~CloudDocs/github/FINAL Project/parse&chunk/data/image/r50/r50_page_467_1.png'], 'index': '인덱스 디스플레이 (멀티 이미지 디스플레이)', 'main_index': '재생', 'model': 'R50', 'page': 467.0, 'pdf_path': '/Users/yoeun/Library/Mobile Documents/com~apple~CloudDocs/github/FINAL Project/parse&chunk/data/pdf/split_pdf_image/r50/r50_page_467.jpg', 'sub_index': 'N/A'}, page_content='![그림 자리(이미지 확대 단계를 보여주는 다이어그램)]'), Document(id='chunk-r6-590', metadata={'brand': 'Canon', 'image_path': ['/Users/yoeun/Lib

In [None]:
inputs = {"question": "iso 설정 방법에 대해 알려줘"}


# app.stream() 실행 후 "generate" 노드인 경우만 출력
for chunk_msg, metadata in app.stream(inputs, stream_mode="messages"):
        if metadata.get("langgraph_node") == "generate":
                print(chunk_msg.content, end="", flush=True)

---ENSEMBLE RETRIEVE------QUERY RETRIEVE---

["iso 설정하는 방법을 설명해줘", "iso 설정 절차에 대해 알고 싶어", "iso 설정을 어떻게 해야 하는지 알려줘", "iso 설정 방법에 대한 정보를 제공해줘", "iso 설정 관련 팁이나 가이드를 줄 수 있어?"][Document(id='chunk-m50-644', metadata={'brand': 'Canon', 'image_path': [], 'index': '사용자 정의 기능 (C.Fn)', 'main_index': '설정', 'model': 'M50', 'page': 636.0, 'pdf_path': '/Users/yoeun/Library/Mobile Documents/com~apple~CloudDocs/github/FINAL Project/parse&chunk/data/pdf/split_pdf_image/m50/m50_page_636.jpg', 'sub_index': 'N/A'}, page_content='```\n● 조작'), Document(id='chunk-r6-590', metadata={'brand': 'Canon', 'image_path': ['/Users/yoeun/Library/Mobile Documents/com~apple~CloudDocs/github/FINAL Project/parse&chunk/data/image/r6/r6_page_566_1.png'], 'index': '인덱스 디스플레이 (멀티 이미지 디스플레이)', 'main_index': '재생', 'model': 'R6', 'page': 566.0, 'pdf_path': '/Users/yoeun/Library/Mobile Documents/com~apple~CloudDocs/github/FINAL Project/parse&chunk/data/pdf/split_pdf_image/r6/r6_page_566.jpg', 'sub_index': 'N/A'}, page_content='![그

### 추후 참고

In [None]:
# 추후 사용
# retrieval_grader.py
# 문서가 실제로 질몬과 관련이 있는지 판단하는 내용

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

llm = ChatOpenAI(temperature=0)


class GradeDocuments(BaseModel):
    """Binary score for relevance score on retrieved documents."""

    binary_score: str = Field(
        description="Documents are relevant to the question, 'yes' or 'no'"
    )


structured_llm_grader = llm.with_structured_output(GradeDocuments)

system = """You are a grade accessing relevance of a retrieved document to a user question. \n
If the document contains keywors(s) or semantic meaning related to the question, grade it as relevant. \n
Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""

grade_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
    ]
)

retrieval_grader = grade_prompt | structured_llm_grader


In [None]:
## grade_documents.py
# 모든 문서를평가하는 노드

from typing import Any, Dict

from graphs.chains.retrieval_grader import retrieval_grader
from graphs.state import GraphState


def grade_documents(state: GraphState) -> Dict[str, Any]:
    """
    Determines whether the retrieved documents are relevant to the question
    If any document is not relevant, we will set a flag to run web search

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Filtered out irrelevant documents and updated web_search state
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    filtered_docs = []
    web_search = False
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        grade = score.binary_score
        if grade.lower() == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")

    if len(filtered_docs) == 0:
        web_search = True

    return {"documents": filtered_docs, "question": question, "web_search": web_search}
