# Build with AI United 2024 Hands-on - Multimodal Retrieval Augmented Generation (RAG)

## Overview

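Retrieval augmented generation (RAG) is a mechanism for mitigating hallucinations by giving an LLM access to external data, and it has become a widely used paradigm.

This hands-on session shows how to build a multimodal RAG pipeline that performs Q&A over a financial document containing both text and images.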

*#vertex_ai #gemini #vertex_ai_gemini_api #text_embedding #multimodal_embedding #document_search_engine #RAG*

### Objectives

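This notebook walks through building a document search engine with multimodal retrieval augmented generation (RAG), step by step:

1. Extract and store metadata from documents containing both text and images, and generate document embeddings.
2. Search the metadata with a text query to find similar text or images.
3. Search the metadata with an image query to find similar images.
4. Given a text query, retrieve contextual answers using both text and images.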

### Costs

This tutorial uses Vertex AI on Google Cloud, which is a billable service.

Learn about [Vertex AI pricing](https://cloud.google.com/vertex-ai/pricing) and use the [Pricing Calculator](https://cloud.google.com/products/calculator/) to generate a cost estimate based on your projected usage.

If you are attending the Build with AI United 2024 hands-on session, credits are provided, so be sure to register for them before you start.

### Reference

This notebook is based on [Multimodal Retrieval Augmented Generation (RAG) using Vertex AI Gemini API](https://github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/use-cases/retrieval-augmented-generation/intro_multimodal_rag.ipynb) by [Lavi Nigam](https://github.com/lavinigam-gcp).

## Getting Started

### Install Vertex AI SDK for Python and other dependencies

In [1]:
! pip3 install --upgrade --user google-cloud-aiplatform pymupdf



### Restart current runtime

To use the newly installed packages in this Jupyter runtime, you must restart the runtime. You can do this by running the cell below, which will restart the current kernel.

In [2]:
# Restart kernel after installs so that your environment can access the new packages
import IPython
import time

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

{'status': 'ok', 'restart': True}

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Please wait until it is finished before continuing to the next step. ⚠️</b>
</div>

In [1]:
# Define project information

import sys

PROJECT_ID = ""  # @param {type:"string"}
#LOCATION = "asia-northeast3"  # @param {type:"string"}
LOCATION = "us-central1"

# if not running on colab, try to get the PROJECT_ID automatically
if "google.colab" not in sys.modules:
    import subprocess

    PROJECT_ID = subprocess.check_output(
        ["gcloud", "config", "get-value", "project"], text=True
    ).strip()

print(f"Your project ID is: {PROJECT_ID}")

Your project ID is: bwai-handson-lab


In [2]:
import sys

# Initialize Vertex AI
import vertexai

vertexai.init(project=PROJECT_ID, location=LOCATION)

### Import libraries

In [3]:
from IPython.display import Markdown, display
from vertexai.generative_models import (
    Content,
    GenerationConfig,
    GenerationResponse,
    GenerativeModel,
    HarmCategory,
    HarmBlockThreshold,
    Image,
    Part,
)

### Load the Gemini 1.0 Pro and Gemini 1.0 Pro Vision models

In [4]:
text_model = GenerativeModel("gemini-1.0-pro")
multimodal_model = GenerativeModel("gemini-1.0-pro-vision")

### Get documents and images from GCS

In [5]:
# download documents and images used in this notebook
!gsutil -m rsync -r gs://bwai-handson-bucket .
print("Download completed")

Building synchronization state...
Starting synchronization...
Copying gs://bwai-handson-bucket/documents2/google-10k-sample-kr-part2.pdf...
Copying gs://bwai-handson-bucket/documents/google-10k-sample-part1.pdf...       
Copying gs://bwai-handson-bucket/documents/google-10k-sample-part2.pdf...       
Copying gs://bwai-handson-bucket/documents2/google-10k-sample-kr-part1.pdf...   
/ [4/4 files][  1.8 MiB/  1.8 MiB] 100% Done                                    
Operation completed over 4 objects/1.8 MiB.                                      
Download completed


## Building metadata of documents containing text and images

### The data

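The source data for this hands-on session is a modified version of the Google-10K, which provides a comprehensive overview of the company's financial performance, operations, management, and risk factors. Because the original document is lengthy, you will use a shortened 14-page version instead. Although condensed, the sample document still contains text alongside images such as tables, charts, and graphs.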

### Extract and store metadata of text and images from a document

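Before building a multimodal RAG system, it is important to collect metadata for all the text and images in the document. For referencing and citation, the metadata should include essential elements such as page number, file name, and image count. As the next step, you generate embeddings from this metadata; they are needed to run a similarity search when querying the data.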

In [6]:
import glob
import os
import time
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

from IPython.display import display
import PIL
import fitz
import numpy as np
import pandas as pd
import requests
from vertexai.generative_models import (
    GenerationConfig,
    HarmBlockThreshold,
    HarmCategory,
    Image,
)
from vertexai.language_models import TextEmbeddingModel
from vertexai.vision_models import Image as vision_model_Image
from vertexai.vision_models import MultiModalEmbeddingModel

The `get_gemini_response()` function runs multimodal inference on the given input, joins the generated output into a single string, and returns that string.

This hands-on session uses `gemini 1.0 pro vision` as the multimodal model.

In [7]:
def get_gemini_response(
    generative_multimodal_model,
    model_input: List[str],
    stream: bool = True,
    generation_config: Optional[GenerationConfig] = GenerationConfig(
        temperature=0.2, max_output_tokens=2048
    ),
    safety_settings: Optional[dict] = {
        HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
        HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
        HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
        HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
    },
) -> str:

    response = generative_multimodal_model.generate_content(
        model_input,
        generation_config=generation_config,
        stream=stream,
        safety_settings=safety_settings,
    )
    response_list = []

    for chunk in response:
        try:
            response_list.append(chunk.text)
        except Exception as e:
            print(
                "Exception occurred while calling gemini. Something is wrong. Lower the safety thresholds [safety_settings: BLOCK_NONE ] if not already done. -----",
                e,
            )
            response_list.append("Exception occurred")
            continue
    response = "".join(response_list)

    return response

The following embedding models are used to embed text and images.

- text embedding model: [textembedding-gecko@latest](https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/text-embeddings?hl=ko)
- multimodal embedding model: [multimodalembedding@001](https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/multimodal-embeddings?hl=ko)

The code below defines functions that convert text and image data into high-dimensional embedding vectors.

- `get_text_embedding_from_text_embedding_model()`: takes a text string and generates a text embedding with `text_embedding_model`
- `get_image_embedding_from_multimodal_embedding_model()`: takes an image path and optional text, and generates an image embedding with `multimodal_embedding_model`

In [8]:
text_embedding_model = TextEmbeddingModel.from_pretrained("textembedding-gecko@latest")
multimodal_embedding_model = MultiModalEmbeddingModel.from_pretrained(
    "multimodalembedding@001"
)

# Functions for getting text and image embeddings

def get_text_embedding_from_text_embedding_model(
    text: str,
    return_array: Optional[bool] = False,
) -> list:
    """
    Generates a numerical text embedding from a provided text input using a text embedding model.

    Args:
        text: The input text string to be embedded.
        return_array: If True, returns the embedding as a NumPy array.
                      If False, returns the embedding as a list. (Default: False)

    Returns:
        list or numpy.ndarray: A 768-dimensional vector representation of the input text.
                               The format (list or NumPy array) depends on the
                               value of the 'return_array' parameter.
    """
    embeddings = text_embedding_model.get_embeddings([text])
    text_embedding = [embedding.values for embedding in embeddings][0]

    if return_array:
        text_embedding = np.fromiter(text_embedding, dtype=float)

    # returns 768 dimensional array
    return text_embedding


def get_image_embedding_from_multimodal_embedding_model(
    image_uri: str,
    embedding_size: int = 512,
    text: Optional[str] = None,
    return_array: Optional[bool] = False,
) -> list:
    """Extracts an image embedding from a multimodal embedding model.
    The function can optionally utilize contextual text to refine the embedding.

    Args:
        image_uri (str): The URI (Uniform Resource Identifier) of the image to process.
        text (Optional[str]): Optional contextual text to guide the embedding generation. Defaults to None.
        embedding_size (int): The desired dimensionality of the output embedding. Defaults to 512.
        return_array (Optional[bool]): If True, returns the embedding as a NumPy array.
        Otherwise, returns a list. Defaults to False.

    Returns:
        list: A list containing the image embedding values. If `return_array` is True, returns a NumPy array instead.
    """
    # image = Image.load_from_file(image_uri)
    image = vision_model_Image.load_from_file(image_uri)
    embeddings = multimodal_embedding_model.get_embeddings(
        image=image, contextual_text=text, dimension=embedding_size
    )  # 128, 256, 512, 1408
    image_embedding = embeddings.image_embedding

    if return_array:
        image_embedding = np.fromiter(image_embedding, dtype=float)

    return image_embedding

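The code below extracts text and images from the PDFs used for RAG. The text is split into chunks of a given size, and an embedding is generated for each chunk.

- `get_pdf_doc_object()`: opens a PDF supplied as a RAG document and returns its contents and page count
- `get_text_overlapping_chunk()`: splits text into chunks of a given size, overlapping adjacent chunks to preserve context
- `get_page_text_embedding()`: generates an embedding for each text chunk using the embedding model
- `get_chunk_text_metadata()`: extracts text from a given page object, splits it into chunks, and generates an embedding for each chunk
- `get_image_for_gemini()`: extracts an image from the PDF document, saves it as a JPEG in the given directory, and loads it as an Image object for Gemini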
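
To make the overlap concrete, here is a minimal standalone sketch of fixed-size chunking with overlap. It is an illustration rather than the notebook's function: `overlapping_chunks` is a hypothetical name, the tiny `character_limit` and `overlap` values are chosen only so the result is easy to read, and the ASCII re-encoding step is left out.

```python
from typing import Dict


def overlapping_chunks(text: str, character_limit: int = 10, overlap: int = 3) -> Dict[int, str]:
    """Split text into chunks whose start positions are (character_limit - overlap) apart."""
    if overlap >= character_limit:
        # A step of zero or less would never advance through the text.
        raise ValueError("overlap must be smaller than character_limit")

    step = character_limit - overlap
    chunks: Dict[int, str] = {}
    # Chunk numbers start at 1, matching the numbering used for chunk metadata.
    for number, start in enumerate(range(0, len(text), step), start=1):
        chunks[number] = text[start:start + character_limit]
    return chunks


sample = "abcdefghijklmnopqrstuvwxyz"
chunks = overlapping_chunks(sample, character_limit=10, overlap=3)
for number, chunk in chunks.items():
    print(number, chunk)
```

Each chunk begins with the last three characters of the previous one, so a sentence that straddles a chunk boundary still appears intact in at least one chunk whenever it is shorter than the overlap.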

In [9]:
def get_pdf_doc_object(pdf_path: str) -> tuple[fitz.Document, int]:
    
    # Open the PDF file
    doc: fitz.Document = fitz.open(pdf_path)

    # Get the number of pages in the PDF file
    num_pages: int = len(doc)

    return doc, num_pages


def get_text_overlapping_chunk(
    text: str, character_limit: int = 1000, overlap: int = 100
) -> dict:

    # overlap == character_limit would give range() a step of 0, so reject it too
    if overlap >= character_limit:
        raise ValueError("Overlap must be smaller than character limit.")

    # Initialize variables
    chunk_number = 1
    chunked_text_dict = {}

    # Iterate over text with the given limit and overlap
    for i in range(0, len(text), character_limit - overlap):
        end_index = min(i + character_limit, len(text))
        chunk = text[i:end_index]

        # Encode and decode for consistent encoding
        chunked_text_dict[chunk_number] = chunk.encode("ascii", "ignore").decode("utf-8", "ignore")
        # chunked_text_dict[chunk_number] = chunk.encode("utf-8", "ignore").decode("utf-8", "ignore")  # for korean

        # Increment chunk number
        chunk_number += 1

    return chunked_text_dict


def get_page_text_embedding(text_data: Union[dict, str]) -> dict:

    embeddings_dict = {}

    if isinstance(text_data, dict):
        # Process each chunk
        # print(text_data)
        for chunk_number, chunk_value in text_data.items():
            text_embd = get_text_embedding_from_text_embedding_model(text=chunk_value)
            embeddings_dict[chunk_number] = text_embd
    else:
        # Embed the text as a single string (for example, the full page text)
        text_embd = get_text_embedding_from_text_embedding_model(text=text_data)
        embeddings_dict["text_embedding"] = text_embd

    return embeddings_dict


def get_chunk_text_metadata(
    page: fitz.Page,
    character_limit: int = 1000,
    overlap: int = 100,
    embedding_size: int = 128,
) -> tuple[str, dict, dict, dict]:

    # overlap == character_limit would make the chunking step 0, so reject it too
    if overlap >= character_limit:
        raise ValueError("Overlap must be smaller than character limit.")

    # Extract text from the page
    text: str = page.get_text().encode("ascii", "ignore").decode("utf-8", "ignore")
    # text: str = page.get_text().encode("utf-8", "ignore").decode("utf-8", "ignore")  # for korean

    # Get whole-page text embeddings
    page_text_embeddings_dict: dict = get_page_text_embedding(text)

    # Chunk the text with the given limit and overlap
    chunked_text_dict: dict = get_text_overlapping_chunk(text, character_limit, overlap)
    # print(chunked_text_dict)

    # Get embeddings for the chunks
    chunk_embeddings_dict: dict = get_page_text_embedding(chunked_text_dict)
    # print(chunk_embeddings_dict)

    # Return all extracted data
    return text, page_text_embeddings_dict, chunked_text_dict, chunk_embeddings_dict


def get_image_for_gemini(
    doc: fitz.Document,
    image: tuple,
    image_no: int,
    image_save_dir: str,
    file_name: str,
    page_num: int,
) -> Tuple[Image, str]:

    # Extract the image from the document
    xref = image[0]
    pix = fitz.Pixmap(doc, xref)

    # No separate conversion step is needed: pix.save() below chooses the
    # output format (JPEG) from the ".jpeg" file extension

    # Create the image file name
    image_name = f"{image_save_dir}/{file_name}_image_{page_num}_{image_no}_{xref}.jpeg"

    # Create the image save directory if it doesn't exist
    os.makedirs(image_save_dir, exist_ok=True)

    # Save the image to the specified location
    pix.save(image_name)

    # Load the saved image as a Gemini Image Object
    image_for_gemini = Image.load_from_file(image_name)

    return image_for_gemini, image_name

The `get_document_metadata()` function extracts text and image metadata from the documents and returns two DataFrames: one for text metadata and one for image metadata. Both kinds of metadata are extracted and stored because either one alone is often not enough to produce a good answer. For example, the relevant answer may exist in the document only in visual form, which a text-only RAG system cannot take into account.

- text metadata: contains the page text, chunked text dictionaries, and chunk embedding dictionaries extracted from each page of the PDF
- image metadata: contains the image path, image description, image embeddings (with and without context), and image description text embedding extracted from each image in the PDF
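
The flattening from nested per-page metadata to one row per chunk can be sketched as follows. This is a simplified, dependency-free illustration: `toy_text_metadata` and `flatten_text_metadata` are hypothetical names, the embedding columns are omitted, and the rows are plain dictionaries (passing them to `pd.DataFrame` would give a table similar in shape to the text metadata DataFrame).

```python
from typing import Any, Dict, List

# Hypothetical toy input: zero-based page index -> metadata collected for that page.
toy_text_metadata: Dict[int, Dict[str, Any]] = {
    0: {"text": "alpha beta", "chunked_text_dict": {1: "alpha", 2: "beta"}},
    1: {"text": "gamma", "chunked_text_dict": {1: "gamma"}},
}


def flatten_text_metadata(
    file_name: str, text_metadata: Dict[int, Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Produce one row per (page, chunk) pair so each chunk can be cited individually."""
    rows: List[Dict[str, Any]] = []
    for page_index, values in text_metadata.items():
        for chunk_number, chunk_text in values["chunked_text_dict"].items():
            rows.append(
                {
                    "file_name": file_name,
                    "page_num": page_index + 1,  # report one-based page numbers
                    "text": values["text"],
                    "chunk_number": chunk_number,
                    "chunk_text": chunk_text,
                }
            )
    return rows


rows = flatten_text_metadata("toy.pdf", toy_text_metadata)
for row in rows:
    print(row)
```

Keeping `file_name`, `page_num`, and `chunk_number` on every row is what later allows a retrieved chunk to be traced back to its source for citation.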

In [10]:
def get_text_metadata_df(
    filename: str, text_metadata: Dict[Union[int, str], Dict]
) -> pd.DataFrame:

    final_data_text: List[Dict] = []

    for key, values in text_metadata.items():
        for chunk_number, chunk_text in values["chunked_text_dict"].items():
            data: Dict = {}
            data["file_name"] = filename
            data["page_num"] = int(key) + 1
            data["text"] = values["text"]
            data["text_embedding_page"] = values["page_text_embeddings"][
                "text_embedding"
            ]
            data["chunk_number"] = chunk_number
            data["chunk_text"] = chunk_text
            data["text_embedding_chunk"] = values["chunk_embeddings_dict"][chunk_number]

            final_data_text.append(data)

    return_df = pd.DataFrame(final_data_text)
    return_df = return_df.reset_index(drop=True)
    return return_df


def get_image_metadata_df(
    filename: str, image_metadata: Dict[Union[int, str], Dict]
) -> pd.DataFrame:

    final_data_image: List[Dict] = []
    for key, values in image_metadata.items():
        for _, image_values in values.items():
            data: Dict = {}
            data["file_name"] = filename
            data["page_num"] = int(key) + 1
            data["img_num"] = int(image_values["img_num"])
            data["img_path"] = image_values["img_path"]
            data["img_desc"] = image_values["img_desc"]
            # data["mm_embedding_from_text_desc_and_img"] = image_values[
            #     "mm_embedding_from_text_desc_and_img"
            # ]
            data["mm_embedding_from_img_only"] = image_values[
                "mm_embedding_from_img_only"
            ]
            data["text_embedding_from_image_description"] = image_values[
                "text_embedding_from_image_description"
            ]
            final_data_image.append(data)

    return_df = pd.DataFrame(final_data_image).dropna()
    return_df = return_df.reset_index(drop=True)
    return return_df


def get_document_metadata(
    generative_multimodal_model,
    pdf_folder_path: str,
    image_save_dir: str,
    image_description_prompt: str,
    embedding_size: int = 128,
    generation_config: Optional[GenerationConfig] = GenerationConfig(
        temperature=0.2, max_output_tokens=2048
    ),
    safety_settings: Optional[dict] = {
        HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
        HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
        HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
        HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
    },
    add_sleep_after_page: bool = False,
    sleep_time_after_page: int = 2,
) -> Tuple[pd.DataFrame, pd.DataFrame]:

    text_metadata_df_final, image_metadata_df_final = pd.DataFrame(), pd.DataFrame()

    for pdf_path in glob.glob(pdf_folder_path + "/*.pdf"):
        print(
            "\n\n",
            "Processing the file: ---------------------------------",
            pdf_path,
            "\n\n",
        )

        doc, num_pages = get_pdf_doc_object(pdf_path)

        file_name = pdf_path.split("/")[-1]

        text_metadata: Dict[Union[int, str], Dict] = {}
        image_metadata: Dict[Union[int, str], Dict] = {}

        for page_num in range(num_pages):
            print(f"Processing page: {page_num + 1}")

            page = doc[page_num]

            text = page.get_text()
            (
                text,
                page_text_embeddings_dict,
                chunked_text_dict,
                chunk_embeddings_dict,
            ) = get_chunk_text_metadata(page, embedding_size=embedding_size)

            text_metadata[page_num] = {
                "text": text,
                "page_text_embeddings": page_text_embeddings_dict,
                "chunked_text_dict": chunked_text_dict,
                "chunk_embeddings_dict": chunk_embeddings_dict,
            }

            images = page.get_images()
            image_metadata[page_num] = {}

            for image_no, image in enumerate(images):
                image_number = int(image_no + 1)
                image_metadata[page_num][image_number] = {}

                image_for_gemini, image_name = get_image_for_gemini(
                    doc, image, image_no, image_save_dir, file_name, page_num
                )

                print(
                    f"Extracting image from page: {page_num + 1}, saved as: {image_name}"
                )

                response = get_gemini_response(
                    generative_multimodal_model,
                    model_input=[image_description_prompt, image_for_gemini],
                    generation_config=generation_config,
                    safety_settings=safety_settings,
                    stream=True,
                )

                image_embedding = get_image_embedding_from_multimodal_embedding_model(
                    image_uri=image_name,
                    embedding_size=embedding_size,
                )

                image_description_text_embedding = (
                    get_text_embedding_from_text_embedding_model(text=response)
                )

                image_metadata[page_num][image_number] = {
                    "img_num": image_number,
                    "img_path": image_name,
                    "img_desc": response,
                    # "mm_embedding_from_text_desc_and_img": image_embedding_with_description,
                    "mm_embedding_from_img_only": image_embedding,
                    "text_embedding_from_image_description": image_description_text_embedding,
                }

            # Add sleep to reduce issues with Quota error on API
            if add_sleep_after_page:
                time.sleep(sleep_time_after_page)
                print(
                    "Sleeping for ",
                    sleep_time_after_page,
                    """ sec before processing the next page to avoid quota issues. You can disable it: "add_sleep_after_page = False"  """,
                )

        text_metadata_df = get_text_metadata_df(file_name, text_metadata)
        image_metadata_df = get_image_metadata_df(file_name, image_metadata)

        text_metadata_df_final = pd.concat(
            [text_metadata_df_final, text_metadata_df], axis=0
        )
        image_metadata_df_final = pd.concat(
            [
                image_metadata_df_final,
                image_metadata_df.drop_duplicates(subset=["img_desc"]),
            ],
            axis=0,
        )

        text_metadata_df_final = text_metadata_df_final.reset_index(drop=True)
        image_metadata_df_final = image_metadata_df_final.reset_index(drop=True)

    return text_metadata_df_final, image_metadata_df_final

In the next step, you use this function to extract and store the text and image metadata from the documents. The following cell may take a few minutes to complete.
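
The function offers a fixed pause between pages (`add_sleep_after_page`) to reduce quota errors. As an alternative, a generic retry with exponential backoff could wrap individual API calls. The sketch below is not part of the notebook: `call_with_backoff` is a hypothetical helper, and it catches every `Exception` only to stay self-contained; in practice you would catch the specific quota error raised by the client library.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def call_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on failure and doubling the wait after each failed attempt."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(max_attempts):
        try:
            return func()
        except Exception:
            # Give up after the final attempt and surface the original error.
            if attempt == max_attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")


# Simulated flaky call: fails twice, then succeeds.
attempts = {"count": 0}


def flaky() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("simulated quota error")
    return "ok"


# Record the requested delays instead of actually sleeping.
delays: List[float] = []
result = call_with_backoff(flaky, base_delay=1.0, sleep=delays.append)
print(result, delays)
```

Injecting `sleep` as a parameter keeps the helper testable without real waiting; with the default `time.sleep`, the same call would pause for 1 and then 2 seconds before succeeding.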

In [None]:
# Specify the PDF folder with multiple PDF

# pdf_folder_path = "/content/data/" # if running in Google Colab/Colab Enterprise
pdf_folder_path = "documents/"  # if running in Vertex AI Workbench.

# Specify the image description prompt. Adjust it to suit your documents.
image_description_prompt = """Explain what is going on in the image.
If it's a table, extract all elements of the table.
If it's a graph, explain the findings in the graph.
Do not include any numbers that are not mentioned in the image.
"""

# Extract text and image metadata from the PDF document
text_metadata_df, image_metadata_df = get_document_metadata(
    multimodal_model,  # we are passing gemini 1.0 pro vision model
    pdf_folder_path,
    image_save_dir="images",
    image_description_prompt=image_description_prompt,
    embedding_size=1408,
    # add_sleep_after_page = True, # Uncomment this if you are running into API quota issues
    # sleep_time_after_page = 5,
    # generation_config = # see next cell
    # safety_settings =  # see next cell
)

print("\n\n --- Completed processing. ---")



 Processing the file: --------------------------------- documents/google-10k-sample-part1.pdf 


Processing page: 1
Processing page: 2
Extracting image from page: 2, saved as: images/google-10k-sample-part1.pdf_image_1_0_11.jpeg
Processing page: 3
Extracting image from page: 3, saved as: images/google-10k-sample-part1.pdf_image_2_0_15.jpeg
Processing page: 4
Extracting image from page: 4, saved as: images/google-10k-sample-part1.pdf_image_3_0_18.jpeg
Processing page: 5
Extracting image from page: 5, saved as: images/google-10k-sample-part1.pdf_image_4_0_21.jpeg
Processing page: 6
Processing page: 7


 Processing the file: --------------------------------- documents/google-10k-sample-part2.pdf 


Processing page: 1
Extracting image from page: 1, saved as: images/google-10k-sample-part2.pdf_image_0_0_6.jpeg
Extracting image from page: 1, saved as: images/google-10k-sample-part2.pdf_image_0_1_8.jpeg
Processing page: 2
Extracting image from page: 2, saved as: images/google-10k-sample-par

#### Inspect the processed text metadata

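The text metadata contains the following fields.

- text: the original text of the page
- text_embedding_page: the embedding vector of the page's original text
- chunk_text: the original text split into smaller chunks
- chunk_number: the index of each text chunk
- text_embedding_chunk: the embedding vector of each text chunk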

In [None]:
text_metadata_df.head()

#### Inspect the processed image metadata

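The image metadata contains the following fields.

- img_desc: the description of the image generated by Gemini
- mm_embedding_from_img_only: the embedding vector of the image alone, without its description, for comparison with description-based analysis
- text_embedding_from_image_description: the text embedding of the generated description, which enables text-based analysis and comparison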
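
The two embedding columns come from different models, so a query vector is only comparable with the column produced by the same model. The sketch below shows a simple length check that catches the most obvious mismatch. `check_same_length` is a hypothetical helper, and the sizes are assumptions taken from this notebook: 768 from the text embedding docstring and 1408 from the `embedding_size` passed to `get_document_metadata()`. Equal length is necessary but not sufficient, since two different models can share a dimension.

```python
from typing import Sequence


def check_same_length(
    query_embedding: Sequence[float],
    stored_embedding: Sequence[float],
    column_name: str,
) -> None:
    """Raise if the query vector cannot be compared with the stored column."""
    if len(query_embedding) != len(stored_embedding):
        raise ValueError(
            f"Query has {len(query_embedding)} dimensions but column "
            f"'{column_name}' has {len(stored_embedding)}"
        )


# Placeholder vectors; only their lengths matter for this check.
text_query_embedding = [0.0] * 768
description_text_embedding = [0.0] * 768
image_only_embedding = [0.0] * 1408

# A text query can be compared with the description text embedding.
check_same_length(
    text_query_embedding,
    description_text_embedding,
    "text_embedding_from_image_description",
)

# Comparing the same text query with the image-only embedding is rejected.
mismatch_message = ""
try:
    check_same_length(
        text_query_embedding, image_only_embedding, "mm_embedding_from_img_only"
    )
except ValueError as err:
    mismatch_message = str(err)

print(mismatch_message)
```

A check like this fails fast with a readable message instead of letting a shape error surface deep inside the similarity computation.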

In [None]:
image_metadata_df.head()

### Implement RAG

It is now time to implement RAG using the text and image metadata extracted from the documents above.

- `get_similar_text_from_query()`: given a text query, uses cosine similarity to retrieve the top N most relevant texts from the documents. Scores are computed against the text embeddings in the metadata, and results can be filtered by top score, page/chunk number, and embedding size
- `print_text_to_text_citation()`: prints the source (citation) and details of the text retrieved by `get_similar_text_from_query()`
- `get_similar_image_from_query()`: given an image (or a text query compared against the image descriptions), uses the embeddings in the image metadata to retrieve the top N most relevant images from the documents
- `print_text_to_image_citation()`: prints the source (citation) and details of the images retrieved by `get_similar_image_from_query()`
- `get_gemini_response()`: uses a Gemini model to answer a question based on a combination of text and images
- `display_images()`: displays, one after another, images supplied as paths or PIL Image objects
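
The retrieval step behind these functions is a cosine-similarity ranking. The body of `get_cosine_score()` is not shown in this section, so the following is an independent, dependency-free sketch of the technique rather than the notebook's implementation; `cosine_similarity`, `top_n_matches`, and the two-dimensional toy vectors are all hypothetical.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Return the cosine of the angle between two vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        # A zero vector has no direction; treat it as unrelated to everything.
        return 0.0
    return dot / (norm_a * norm_b)


def top_n_matches(
    query_vector: Sequence[float],
    candidates: Dict[str, Sequence[float]],
    top_n: int = 2,
) -> List[Tuple[str, float]]:
    """Score every candidate against the query and keep the top_n highest scores."""
    scored = [
        (name, cosine_similarity(query_vector, vector))
        for name, vector in candidates.items()
    ]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:top_n]


# Toy "chunk embeddings": real ones would come from the embedding model.
toy_chunks = {
    "chunk_a": [1.0, 0.0],
    "chunk_b": [0.0, 1.0],
    "chunk_c": [1.0, 1.0],
}

matches = top_n_matches([1.0, 0.0], toy_chunks, top_n=2)
for name, score in matches:
    print(name, round(score, 4))
```

Because cosine similarity depends only on direction, `chunk_c` still ranks second even though its vector is longer than the query; the orthogonal `chunk_b` scores zero and is dropped.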

In [None]:
def get_similar_text_from_query(
    query: str,
    text_metadata_df: pd.DataFrame,
    column_name: str = "",
    top_n: int = 3,
    chunk_text: bool = True,
    print_citation: bool = False,
) -> Dict[int, Dict[str, Any]]:

    if column_name not in text_metadata_df.columns:
        raise KeyError(f"Column '{column_name}' not found in the 'text_metadata_df'")

    query_vector = get_user_query_text_embeddings(query)

    # Calculate cosine similarity between query text and metadata text
    cosine_scores = text_metadata_df.apply(
        lambda row: get_cosine_score(
            row,
            column_name,
            query_vector,
        ),
        axis=1,
    )

    # Get top N cosine scores and their indices
    top_n_indices = cosine_scores.nlargest(top_n).index.tolist()
    top_n_scores = cosine_scores.nlargest(top_n).values.tolist()

    # Create a dictionary to store matched text and their information
    final_text: Dict[int, Dict[str, Any]] = {}

    for matched_textno, index in enumerate(top_n_indices):
        # Create a sub-dictionary for each matched text
        final_text[matched_textno] = {}

        # Store file name
        final_text[matched_textno]["file_name"] = text_metadata_df.iloc[index][
            "file_name"
        ]

        # Store page number
        final_text[matched_textno]["page_num"] = text_metadata_df.iloc[index][
            "page_num"
        ]

        # Store cosine score
        final_text[matched_textno]["cosine_score"] = top_n_scores[matched_textno]

        if chunk_text:
            # Store chunk number
            final_text[matched_textno]["chunk_number"] = text_metadata_df.iloc[index][
                "chunk_number"
            ]

            # Store chunk text
            final_text[matched_textno]["chunk_text"] = text_metadata_df["chunk_text"][
                index
            ]
        else:
            # Store page text
            final_text[matched_textno]["text"] = text_metadata_df["text"][index]

    # Optionally print citations immediately
    if print_citation:
        print_text_to_text_citation(final_text, chunk_text=chunk_text)

    return final_text

def print_text_to_text_citation(
    final_text: Dict[int, Dict[str, Any]],
    print_top: bool = True,
    chunk_text: bool = True,
) -> None:

    color = Color()

    # Iterate through the matched text citations
    for textno, text_dict in final_text.items():
        # Print the citation header
        print(color.RED + f"Citation {textno + 1}:", "Matched text: \n" + color.END)

        # Print the cosine similarity score
        print(color.BLUE + "score: " + color.END, text_dict["cosine_score"])

        # Print the file_name
        print(color.BLUE + "file_name: " + color.END, text_dict["file_name"])

        # Print the page number
        print(color.BLUE + "page_number: " + color.END, text_dict["page_num"])

        # Print the matched text based on the chunk_text argument
        if chunk_text:
            # Print chunk number and chunk text
            print(color.BLUE + "chunk_number: " + color.END, text_dict["chunk_number"])
            print(color.BLUE + "chunk_text: " + color.END, text_dict["chunk_text"])
        else:
            # Print page text
            # get_similar_text_from_query() stores page text under the "text" key
            print(color.BLUE + "page text: " + color.END, text_dict["text"])

        # Only print the first citation if print_top is True
        if print_top and textno == 0:
            break
            
def get_similar_image_from_query(
    text_metadata_df: pd.DataFrame,
    image_metadata_df: pd.DataFrame,
    query: str = "",
    image_query_path: str = "",
    column_name: str = "",
    image_emb: bool = True,
    top_n: int = 3,
    embedding_size: int = 128,
) -> Dict[int, Dict[str, Any]]:

    # Check if image embedding is used
    if image_emb:
        # Calculate cosine similarity between query image and metadata images
        user_query_image_embedding = get_user_query_image_embeddings(
            image_query_path, embedding_size
        )
        cosine_scores = image_metadata_df.apply(
            lambda x: get_cosine_score(x, column_name, user_query_image_embedding),
            axis=1,
        )
    else:
        # Calculate cosine similarity between query text and metadata image captions
        user_query_text_embedding = get_user_query_text_embeddings(query)
        cosine_scores = image_metadata_df.apply(
            lambda x: get_cosine_score(x, column_name, user_query_text_embedding),
            axis=1,
        )

    # Remove same image comparison score when user image is matched exactly with metadata image
    cosine_scores = cosine_scores[cosine_scores < 1.0]

    # Get top N cosine scores and their indices
    top_n_cosine_scores = cosine_scores.nlargest(top_n).index.tolist()
    top_n_cosine_values = cosine_scores.nlargest(top_n).values.tolist()

    # Create a dictionary to store matched images and their information
    final_images: Dict[int, Dict[str, Any]] = {}

    for matched_imageno, indexvalue in enumerate(top_n_cosine_scores):
        # Create a sub-dictionary for each matched image
        final_images[matched_imageno] = {}

        # Store cosine score
        final_images[matched_imageno]["cosine_score"] = top_n_cosine_values[
            matched_imageno
        ]

        # Load image from file
        final_images[matched_imageno]["image_object"] = Image.load_from_file(
            image_metadata_df.iloc[indexvalue]["img_path"]
        )

        # Add file name
        final_images[matched_imageno]["file_name"] = image_metadata_df.iloc[indexvalue][
            "file_name"
        ]

        # Store image path
        final_images[matched_imageno]["img_path"] = image_metadata_df.iloc[indexvalue][
            "img_path"
        ]

        # Store page number
        final_images[matched_imageno]["page_num"] = image_metadata_df.iloc[indexvalue][
            "page_num"
        ]

        final_images[matched_imageno]["page_text"] = np.unique(
            text_metadata_df[
                (
                    text_metadata_df["page_num"].isin(
                        [final_images[matched_imageno]["page_num"]]
                    )
                )
                & (
                    text_metadata_df["file_name"].isin(
                        [final_images[matched_imageno]["file_name"]]
                    )
                )
            ]["text"].values
        )

        # Store image description
        final_images[matched_imageno]["image_description"] = image_metadata_df.iloc[
            indexvalue
        ]["img_desc"]

    return final_images

def print_text_to_image_citation(
    final_images: Dict[int, Dict[str, Any]], print_top: bool = True
) -> None:

    color = Color()

    # Iterate through the matched image citations
    for imageno, image_dict in final_images.items():
        # Print the citation header
        print(
            color.RED + f"Citation {imageno + 1}:",
            "Matched image path, page number and page text: \n" + color.END,
        )

        # Print the cosine similarity score
        print(color.BLUE + "score: " + color.END, image_dict["cosine_score"])

        # Print the file_name
        print(color.BLUE + "file_name: " + color.END, image_dict["file_name"])

        # Print the image path
        print(color.BLUE + "path: " + color.END, image_dict["img_path"])

        # Print the page number
        print(color.BLUE + "page number: " + color.END, image_dict["page_num"])

        # Print the page text
        print(
            color.BLUE + "page text: " + color.END, "\n".join(image_dict["page_text"])
        )

        # Print the image description
        print(
            color.BLUE + "image description: " + color.END,
            image_dict["image_description"],
        )

        # Only print the first citation if print_top is True
        if print_top and imageno == 0:
            break


# Add colors to the print
class Color:
    """
    This class defines a set of color codes that can be used to print text in different colors.
    This will be used later to print citations and results to make outputs more readable.
    """

    PURPLE: str = "\033[95m"
    CYAN: str = "\033[96m"
    DARKCYAN: str = "\033[36m"
    BLUE: str = "\033[94m"
    GREEN: str = "\033[92m"
    YELLOW: str = "\033[93m"
    RED: str = "\033[91m"
    BOLD: str = "\033[1m"
    UNDERLINE: str = "\033[4m"
    END: str = "\033[0m"


# Extracts text embeddings for the user query using a text embedding model.
def get_user_query_text_embeddings(user_query: str) -> np.ndarray:
    return get_text_embedding_from_text_embedding_model(user_query)

# Extracts image embeddings for the user query image using a multimodal embedding model.
def get_user_query_image_embeddings(image_query_path: str, embedding_size: int) -> np.ndarray:
    return get_image_embedding_from_multimodal_embedding_model(image_uri=image_query_path, embedding_size=embedding_size)

# Scores the user query embedding against the embedding stored in one dataframe column.
# Note: this is a plain dot product, which equals cosine similarity only when both embeddings are unit-normalized.
def get_cosine_score(dataframe: pd.DataFrame, column_name: str, input_text_embd: np.ndarray) -> float:
    text_cosine_score = round(np.dot(dataframe[column_name], input_text_embd), 2)
    return text_cosine_score
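
`get_cosine_score` relies on a dot product, which coincides with cosine similarity only when both vectors have unit length. The following dependency-free sketch illustrates the difference; the helper names `dot` and `cosine_similarity` and the sample vectors are illustrative assumptions and are not part of this notebook.

```python
import math
from typing import Sequence


def dot(a: Sequence[float], b: Sequence[float]) -> float:
    """Plain dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Dot product divided by the product of the two vector norms."""
    norm_a = math.sqrt(dot(a, a))
    norm_b = math.sqrt(dot(b, b))
    if norm_a == 0.0 or norm_b == 0.0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot(a, b) / (norm_a * norm_b)


unit_a = [1.0, 0.0]    # length 1
unit_b = [0.6, 0.8]    # length 1
scaled_b = [3.0, 4.0]  # same direction as unit_b, but length 5

# For unit vectors the two scores agree.
print(round(dot(unit_a, unit_b), 2), round(cosine_similarity(unit_a, unit_b), 2))

# Scaling a vector changes the dot product but not the cosine similarity.
print(round(dot(unit_a, scaled_b), 2), round(cosine_similarity(unit_a, scaled_b), 2))
```

If the embeddings in use are not guaranteed to be normalized, dividing by the norms as in `cosine_similarity` keeps scores comparable across rows.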

## Text Search

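Let's start with a simple question.

- Question: Provide details on the basic and diluted net income per share for Google's Class A, Class B, and Class C stock.
- Expected answer: The values of Google's basic and diluted net income per share for each class of stock.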

In [None]:
query = "I need details for basic and diluted net income per share of Class A, Class B, and Class C share for google?"

### Search similar text with text query

In [None]:
# Match the user text query against the "text_embedding_chunk" column to find relevant chunks.
matching_results_text = get_similar_text_from_query(
    query,
    text_metadata_df,
    column_name="text_embedding_chunk",
    top_n=3,
    chunk_text=True,
)

# Print the matched text citations
print_text_to_text_citation(matching_results_text, print_top=False, chunk_text=True)

### Search similar images with text query

In [None]:
matching_results_image = get_similar_image_from_query(
    text_metadata_df,
    image_metadata_df,
    query=query,
    column_name="text_embedding_from_image_description",  # Use image description text embedding
    image_emb=False,  # Use text embedding instead of image embedding
    top_n=3,
    embedding_size=1408,
)

# print_text_to_image_citation(matching_results_image, print_top=True)  # prints directly and returns None
print("\n **** Result: ***** \n")

# Display the top matching image
display(matching_results_image[0]["image_object"])

## Multimodal retrieval augmented generation (RAG)

- Step 1: Submit the user query.
- Step 2: Use text embeddings to retrieve all relevant text chunks across every page of the document.
- Step 3: Use text embeddings of the image descriptions to retrieve all images whose descriptions match the query.
- Step 4: Combine the text and images found in Steps 2 and 3 into context_text and context_images.
- Step 5: Pass context_text and context_images from Step 4 to the Gemini model together with the query. (You can also add specific instructions for the model to keep in mind.)
- Step 6: Review Gemini's answer, along with the relevant text and images that were used to handle the query.
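
The flow of these steps can be sketched without any cloud calls. The example below is only a toy illustration: the hand-written 2-D vectors stand in for real embeddings, and `cosine`, `top_n_by_similarity`, and the sample data are hypothetical names that do not appear in this notebook.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity between two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.hypot(*a) * math.hypot(*b))


def top_n_by_similarity(
    query_vec: Sequence[float],
    items: Dict[str, Sequence[float]],
    top_n: int,
) -> List[Tuple[str, float]]:
    """Rank items by similarity to the query and keep the best top_n."""
    scored = [(name, cosine(query_vec, vec)) for name, vec in items.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:top_n]


# Step 1: the user query, already embedded (toy vector).
query_vec = [1.0, 0.2]

# Toy "embeddings" for text chunks and for image descriptions.
chunk_vecs = {
    "Revenue by region table": [0.9, 0.1],
    "Stock repurchase note": [0.1, 0.9],
    "APAC revenue commentary": [0.8, 0.3],
}
image_desc_vecs = {
    "Bar chart of revenue by geography": [1.0, 0.3],
    "Photo of company campus": [0.0, 1.0],
}

# Steps 2 and 3: retrieve the closest chunks and image descriptions.
top_chunks = top_n_by_similarity(query_vec, chunk_vecs, top_n=2)
top_images = top_n_by_similarity(query_vec, image_desc_vecs, top_n=1)

# Step 4: combine the retrieved items into context.
context_text = "\n".join(name for name, _ in top_chunks)
context_images = [name for name, _ in top_images]

# Step 5: this context, plus the question, is what would be sent to the model.
print(context_text)
print(context_images)
```

In the notebook itself, the ranking is performed by `get_similar_text_from_query` and `get_similar_image_from_query` over real embeddings; the sketch only mirrors the order of operations.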

### Step 1: User query

In [None]:
query = "What are the total revenues for APAC and USA for 2021?"
# query = "What is deferred income taxes?"
# query = "How do you compute net income per share?"
# query = "What drove percentage change in the consolidated revenue and cost of revenue for the year 2021 and was there any effect of Covid?"
# query = "What is the cause of 41% increase in revenue from 2020 to 2021 and how much is dollar change?"

### Step 2: Get all relevant text chunks

In [None]:
# Retrieve relevant chunks of text based on the query
matching_results_chunks_data = get_similar_text_from_query(
    query,
    text_metadata_df,
    column_name="text_embedding_chunk",
    top_n=10,
    chunk_text=True,
)

# print(matching_results_chunks_data)

### Step 3: Get all relevant images

In [None]:
# Get all relevant images based on user query
matching_results_image_fromdescription_data = get_similar_image_from_query(
    text_metadata_df,
    image_metadata_df,
    query=query,
    column_name="text_embedding_from_image_description",
    image_emb=False,
    top_n=10,
    embedding_size=1408,
)

# display(matching_results_image_fromdescription_data[0]["image_object"])

### Step 4: Create context_text and context_images

In [None]:
# combine all the selected relevant text chunks
context_text = []
for key, value in matching_results_chunks_data.items():
    context_text.append(value["chunk_text"])
final_context_text = "\n".join(context_text)

# combine all the relevant images and their description generated by Gemini
context_images = []
for key, value in matching_results_image_fromdescription_data.items():
    context_images.extend(
        ["Image: ", value["image_object"], "Caption: ", value["image_description"]]
    )

# print(final_context_text)
# print(context_images)

### Step 5: Pass context to Gemini

In [None]:
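# Instructions and text context are plain text, so they can be formatted into one string.
prompt = f""" Instructions: Compare the images and the text provided as Context: to answer Question:
Make sure to think thoroughly before answering the question and put the necessary steps to arrive at the answer in bullet points for easy explainability.
If unsure, respond, "Not enough context to answer".

Context:
 - Text Context:
 {final_context_text}
 - Image Context:
"""

print(query)

# Generate Gemini response with streaming output.
# context_images is unpacked into separate parts: formatting it into the f-string
# would only embed the string representation of each image object, not the image.
# Assumption: get_gemini_response forwards model_input unchanged to the model.
Markdown(
    get_gemini_response(
        multimodal_model,
        model_input=[prompt, *context_images, f"\nQuestion: {query}\n\nAnswer:\n"],
        stream=True,
        generation_config=GenerationConfig(temperature=0.4, max_output_tokens=2048),
    )
)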
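
When assembling multimodal input, note that formatting a list into an f-string inserts the `repr` of each element, so a non-text object ends up as a text placeholder rather than as the object itself. The sketch below illustrates this with a hypothetical `FakeImage` stand-in class (not the Vertex AI `Image` type) and a hypothetical `build_parts` helper that keeps objects as separate list items.

```python
from typing import Any, List


class FakeImage:
    """Hypothetical stand-in for an image object; it only remembers a path."""

    def __init__(self, path: str) -> None:
        self.path = path

    def __repr__(self) -> str:
        return f"<FakeImage {self.path}>"


def build_parts(
    instructions: str,
    context_text: str,
    context_images: List[Any],
    question: str,
) -> List[Any]:
    """Interleave text and objects in one flat list, leaving objects intact."""
    return [
        instructions,
        "Text Context:\n" + context_text,
        "Image Context:",
        *context_images,
        "Question: " + question,
    ]


image = FakeImage("images/page_3.png")
context_images = ["Image: ", image, "Caption: ", "Revenue by region"]

# Interpolation turns the whole list, including the object, into plain text.
as_text = f"Image Context: {context_images}"
print(as_text)

# A flat list of parts still contains the original object.
parts = build_parts(
    "Answer from the context.",
    "Some chunk text.",
    context_images,
    "What is the total revenue?",
)
print(any(part is image for part in parts))
```

Whether a flat list of mixed parts is accepted depends on how the model wrapper forwards its input, so this should be checked against the helper actually in use.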

### Step 6: Print citations and references

In [None]:
# Image citations. You can check how Gemini generated metadata helped in grounding the answer.
print_text_to_image_citation(
    matching_results_image_fromdescription_data, print_top=False
)

In [None]:
# Text citations
print_text_to_text_citation(
    matching_results_chunks_data,
    print_top=False,
    chunk_text=True,
)

## Conclusions

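Multimodal RAG can be very powerful, but keep in mind that it can run into several limitations.

- Data dependency: It requires high-quality text and visuals.
- Computationally demanding: Processing multimodal data is resource-intensive.
- Domain specific: Models trained on general data may not perform well in specialized fields such as medicine.
- Black box: Understanding how these models work is difficult, which can hinder trust and adoption.

Despite these challenges, multimodal RAG represents an important step toward search systems that can handle diverse kinds of data.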

## Reference

- https://github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/use-cases/retrieval-augmented-generation/intro_multimodal_rag.ipynb