# 3. 멀티모달 RAG

PDF 분할 작업을 위해 `unstructured`를 사용합니다. `unstructured` 를 위해 다음과 도구의 설치가 필요합니다:

- `tesseract` : 광학 문자 인식(OCR)을 위해 사용
- `poppler` : PDF 렌더링 및 처리

[poppler 설치 방법](https://pdf2image.readthedocs.io/en/latest/installation.html)과 [tesseract 설치 방법](https://tesseract-ocr.github.io/tessdoc/Installation.html)을 참고하여 설치해주세요.

In [1]:
%%capture --no-stderr
!sudo apt install tesseract-ocr
!sudo apt install libtesseract-dev
!sudo apt-get install poppler-utils

In [None]:
%%capture --no-stderr
! pip install -U langchain openai chromadb langchain-experimental langchain_openai nltk "unstructured[all-docs]" pillow pydantic lxml pillow matplotlib chromadb tiktoken

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from dotenv import load_dotenv

# .env 파일에서 환경 변수 로드
load_dotenv("/content/drive/MyDrive/Colab Notebooks/.env")

In [None]:
# 파일 경로
fpath = 'multimodal_rag/'
fname = "sample.pdf"

In [None]:
from unstructured.partition.pdf import partition_pdf


# PDF에서 요소 추출
raw_pdf_elements = partition_pdf(
    filename=os.path.join(fpath, fname),
    extract_images_in_pdf=True,
    infer_table_structure=True,
    chunking_strategy="by_title",
    max_characters=4000,
    new_after_n_chars=3800,
    combine_text_under_n_chars=2000,
    extract_image_block_output_dir=fpath,
)

In [None]:
# 텍스트, 테이블 추출
tables = []
texts = []
for element in raw_pdf_elements:
    if "unstructured.documents.elements.Table" in str(type(element)):
        tables.append(str(element))  # 테이블 요소 추가
    elif "unstructured.documents.elements.CompositeElement" in str(type(element)):
        texts.append(str(element))  # 텍스트 요소 추가

In [None]:
tables[0]

In [None]:
texts[0]

In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain.chat_models import ChatOpenAI

# 프롬프트 설정
prompt_text = """당신은 표와 텍스트를 요약하여 검색할 수 있도록 돕는 역할을 맡은 어시스턴트입니다.
이 요약은 임베딩되어 원본 텍스트나 표 요소를 검색하는 데 사용될 것입니다.
표 또는 텍스트에 대한 간결한 요약을 제공하여 검색에 최적화된 형태로 만들어 주세요. 표 또는 텍스트: {element} """
prompt = ChatPromptTemplate.from_template(prompt_text)

# 텍스트 요약 체인
model = ChatOpenAI(temperature=0, model="gpt-4")
summarize_chain = {"element": lambda x: x} | prompt | model | StrOutputParser()

# 제공된 텍스트에 대해 요약을 할 경우
text_summaries = summarize_chain.batch(texts, {"max_concurrency": 5})
# 요약을 원치 않을 경우
# text_summaries = texts

# 제공된 테이블에 적용
table_summaries = summarize_chain.batch(tables, {"max_concurrency": 5})

In [None]:
table_summaries[0]

In [None]:
text_summaries[0]

In [None]:
import base64


def encode_image(image_path) -> str:
    # 이미지 base64 인코딩
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')


# 이미지의 base64 인코딩을 저장하는 리스트
img_base64_list = []

# 이미지를 읽어 base64 인코딩 후 저장
for img_file in sorted(os.listdir(fpath)):
    if img_file.endswith('.jpg'):
        img_path = os.path.join(fpath, img_file)
        base64_image = encode_image(img_path)
        img_base64_list.append(base64_image)

In [None]:
len(img_base64_list)

In [None]:
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI


def image_summarize(img_base64: str) -> str:
    # 이미지 요약
    chat = ChatOpenAI(model="gpt-4o", max_tokens=1024)
    prompt = """
    당신은 이미지를 요약하여 검색을 위해 사용할 수 있도록 돕는 어시스턴트입니다.
    이 요약은 임베딩되어 원본 이미지를 검색하는 데 사용됩니다.
    이미지 검색에 최적화된 간결한 요약을 작성하세요.
    """
    msg = chat.invoke(
        [
            HumanMessage(
                content=[
                    {"type": "text", "text": prompt},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{img_base64}"
                        },
                    },
                ]
            )
        ]
    )
    return msg.content


# 이미지 요약을 저장하는 리스트
image_summaries = []

for img_base64 in img_base64_list:
    image_summary = image_summarize(img_base64)
    image_summaries.append(image_summary)


In [None]:
image_summaries[0]

In [None]:
from langchain.retrievers import MultiVectorRetriever
from langchain_core.stores import InMemoryStore
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

# 분할한 텍스트들을 색인할 벡터 저장소
vectorstore = Chroma(collection_name="multi_modal_rag",
                     embedding_function=OpenAIEmbeddings())

# 원본문서 저장을 위한 저장소 선언
docstore = InMemoryStore()
id_key = "doc_id"

# 검색기
retriever = MultiVectorRetriever(
    vectorstore=vectorstore,
    docstore=docstore,
    id_key=id_key,
)

In [None]:
import uuid

# 원본 텍스트 데이터 저장
doc_ids = [str(uuid.uuid4()) for _ in texts]
retriever.docstore.mset(list(zip(doc_ids, texts)))

# 원본 테이블 데이터 저장
table_ids = [str(uuid.uuid4()) for _ in tables]
retriever.docstore.mset(list(zip(table_ids, tables)))

# 원본 이미지(base64) 데이터 저장
img_ids = [str(uuid.uuid4()) for _ in img_base64_list]
retriever.docstore.mset(list(zip(img_ids, img_base64_list)))

In [None]:
from langchain.schema.document import Document

# 텍스트 요약 벡터 저장
summary_texts = [
    Document(page_content=s, metadata={id_key: doc_ids[i]})
    for i, s in enumerate(text_summaries)
]
retriever.vectorstore.add_documents(summary_texts)

# 테이블 요약 벡터 저장
summary_tables = [
    Document(page_content=s, metadata={id_key: table_ids[i]})
    for i, s in enumerate(table_summaries)
]
retriever.vectorstore.add_documents(summary_tables)

# 이미지 요약 벡터 저장

summary_img = [
    Document(page_content=s, metadata={id_key: img_ids[i]})
    for i, s in enumerate(image_summaries)
]
retriever.vectorstore.add_documents(summary_img)

In [None]:
docs = retriever.invoke(
    "말라리아 군집 사례는 어떤가요? "
)

In [None]:
len(docs)

In [None]:
from base64 import b64decode

def split_image_text_types(docs):
    # 이미지와 텍스트 데이터를 분리
    b64 = []
    text = []
    for doc in docs:
        try:
            b64decode(doc)
            b64.append(doc)
        except Exception as e:
            text.append(doc)
    return {
        "images": b64,
        "texts": text
    }

docs_by_type = split_image_text_types(docs)

In [None]:
len(docs_by_type["images"])

In [None]:
len(docs_by_type["texts"])

In [None]:
docs_by_type["images"]

In [None]:
docs_by_type["texts"]

In [None]:

from IPython.display import display, HTML

def plt_img_base64(img_base64):
    # base64 이미지로 html 태그를 작성합니다
    image_html = f'<img src="data:image/jpeg;base64,{img_base64}" />'

    # html 태그를 기반으로 이미지를 표기합니다
    display(HTML(image_html))

plt_img_base64(docs_by_type["images"][0])

In [None]:
docs_by_type["texts"][0]

In [None]:
from operator import itemgetter
from langchain.schema.runnable import RunnablePassthrough, RunnableLambda

def prompt_func(dict):
    format_texts = "\n".join(dict["context"]["texts"])
    text = f"""
    다음 문맥에만 기반하여 질문에 답하세요. 문맥에는 텍스트, 표, 그리고 아래 이미지가 포함될 수 있습니다:
    질문: {dict["question"]}

    텍스트와 표:
    {format_texts}
    """

    prompt = [
        HumanMessage(
            content=[
                {"type": "text", "text": text},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{dict['context']['images'][0]}"}},
            ]
        )
    ]

    return prompt


model = ChatOpenAI(temperature=0, model="gpt-4o", max_tokens=1024)

# RAG 파이프라인
chain = (
        {"context": retriever | RunnableLambda(split_image_text_types), "question": RunnablePassthrough()}
        | RunnableLambda(prompt_func)
        | model
        | StrOutputParser()
)

In [None]:
chain.invoke(
    "말라리아 군집 사례는 어떤가요?"
)