# ハンズオン：Multimodal RAG on AWS を構築してみよう

## 0. 事前準備

In [None]:
!pip install -U pip
%pip install opensearch-py boto3 opencv-python PyMuPDF

In [None]:
import boto3
import json

bedrock_runtime_client = boto3.client('bedrock-runtime',region_name='us-east-1')
s3_client = boto3.client('s3')

## 1. Amazon Bedrock の API を叩いてみる

- 2024 年5月時点で、Amazon Bedrock では 30 以上の基盤モデルを API 経由で利用することができます。テキスト生成モデルや画像生成モデル、テキスト & ビジョンモデルなど様々な種類があります。
- 様々な基盤モデルを　InvokeModel API という1つの API を通して利用することができます。使用したいモデルの ModelId を指定し、プロンプトやその他パラメータを指定します。
- 利用可能なモデルと ModelID の一覧は[こちらのドキュメント](https://docs.aws.amazon.com/bedrock/latest/userguide/model-ids.html#model-ids-arns) をご覧ください。

#### テキスト生成モデル
今回はテキスト生成モデルである Anthropic Claude 3 Sonnet を使います。後述するように、Claude 3 は画像も同時に入力することができます。

In [None]:
# テキスト & ビジョンモデルである Anthropic Claude 3 Sonnet を使います。
def invoke_llm_model(input):
    user_message = {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": input,
            }
        ],
    }
    messages = [user_message]
    body = json.dumps(
        {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "messages": messages,
            "temperature": 0,
            "top_p": 1,
            "top_k": 250,
        }
    )
    
    modelId="anthropic.claude-3-sonnet-20240229-v1:0"
    accept = "application/json"
    contentType = "application/json"
    
    response = bedrock_runtime_client.invoke_model(
            body=body, modelId=modelId, accept=accept, contentType=contentType
    )
    response_body = json.loads(response.get("body").read().decode())
    return response_body.get("content")[0]["text"]

In [None]:
invoke_llm_model("大阪の有名な観光地をコテコテの関西弁で教えて")

#### 埋め込みモデル
Embedding Model として Cohere 社の Multilingual Embed モデルを使用します。
その他の基盤モデルには Amazon Titan Embedding などがあります。

In [None]:
def invoke_embedding_model(input):
    response = bedrock_runtime_client.invoke_model(
        body=json.dumps({
            'texts': input,
            'input_type': 'search_document'
        }),
        modelId = 'cohere.embed-multilingual-v3',
        accept="application/json",
        contentType="application/json",
    )
    response_body = json.loads(response.get("body").read())
    return response_body.get("embeddings")

In [None]:
invoke_embedding_model(["こんにちは"])

#### Image Captioning
Anthropic Claude 3 は テキスト & ビジョンモデルであり、画像とテキストを同時に入力することができます。それにより画像に描かれている内容の記述や、画像内に含まれるテキスト・数値の抽出などの用途に用いることができます。

In [None]:
import base64

# Bedrock の API に渡すために画像を Base64 文字列にエンコードする関数
def encode_image(img_file):
    with open(img_file, "rb") as image_file:
        img_str = base64.b64encode(image_file.read())
        base64_string = img_str.decode("latin1")
    return base64_string

# Claude 3 を用いて画像からキャプションを生成させる関数
def generate_image_captions(img_base64, prompt="あなたは画像を分析するタスクを担うアシスタントです。以下に与える画像データについて、画像中に存在する文字列や数値をできるだけ多く使いながら、詳細に分析してください。"): 

    max_tokens = 4096
    user_message = {
            "role": "user",
            "content": [
                {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": "image/png",
                        "data": img_base64
                    }
                },
                {
                    "type": "text",
                    "text": prompt
                }
            ]
        }
    
    
    messages = [user_message]
    model_id="anthropic.claude-3-sonnet-20240229-v1:0"
    body = json.dumps(
        {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "messages": messages,
            "temperature": 0
        }
    )
    response = bedrock_runtime_client.invoke_model(body=body, modelId=model_id)
    response_body = json.loads(response.get('body').read())
    return response_body['content'][0]['text']

In [None]:
from PIL import Image

img_path = "claude3-family-comparison.png"
fig = Image.open(img_path)
fig.show()
print(generate_image_captions(encode_image(img_path)))

### 参考：LangChain を用いる場合

In [None]:
from langchain_community.chat_models import BedrockChat

def invoke_llm_model_langchain(prompt):
    llm = BedrockChat(
        model_id="anthropic.claude-3-sonnet-20240229-v1:0", 
        client=bedrock_runtime_client,
        model_kwargs={'temperature': 0.5, "top_p": 1, "top_k": 250}
    )
    answer = llm.invoke(prompt)
    return answer

In [None]:
invoke_llm_model_langchain("味噌汁の作り方を詳しく説明してください。")

In [None]:
from langchain_community.embeddings import BedrockEmbeddings

def invoke_embedding_model_langchain(prompt):
    embedding_model = BedrockEmbeddings(
        client=bedrock_runtime_client,
        model_id = 'cohere.embed-multilingual-v3'
    )
    answer = embedding_model.embed_query(prompt)
    return answer

In [None]:
invoke_embedding_model_langchain("こんにちは")

## 2. RAG 用データの準備
ここから Multimodal RAG の実装に向かっていきます。まずはナレッジベースとしてベクトル DB にIngest するドキュメントを準備します。

今回は PDF ファイルを対象としたマルチモーダル RAG を実装していきます。

`./documents`フォルダに PDF ファイルを格納してください。

In [None]:
import os
data_dir = './documents'
target_files = sorted([os.path.join(data_dir,file_name) for file_name in os.listdir(data_dir)])

## 3. Extract elements via PyMuPDF and Process PDF files
PyMuPDF (fitz) というライブラリは、PDF ファイルをテキスト、表、画像等の要素に分解することができるライブラリです。
`./documents`フォルダ下の PDF ファイルを PyMuPDF で解析していき、各要素をRAG で検索できるような形に変換していきます。

補足： そのほかのライブラリとして、`unstructured`([GitHub](https://github.com/Unstructured-IO/unstructured))を用いることで Excel ファイルや Powerpoint ファイルを扱うことができます。

In [None]:
image_output_dir = "./image_output/"
os.makedirs(image_output_dir, exist_ok=True) 

bucket = "<Input yout bucket name>"
common_prefix = "multimodal-rag-workshop"

In [None]:
import fitz
from pprint import pprint
import re, io
from PIL import Image

table_llm_prompt = """
<instruction>あなたは表を分析するタスクを担うアシスタントです。以下に与える表データについて、下記に記載されている出力項目に着目して読み取れることを出力してください。</instruction>
<content>
- 何がまとめられている表なのか
- 表に記載されているキーワード
- 表から読み取ることができる分析結果
</content>
<table> {table} </table>
"""

image_llm_prompt = """
あなたは画像を分析するタスクを担うアシスタントです。以下に与える画像データについて、画像中に存在する文字列や数値をできるだけ多く使いながら、詳細に分析してください。
"""

# 簡単のため1000文字ごとにチャンクを区切ることとします
chunk_interval =  1000

# 解析した結果を格納するリスト
extracted_elements_list = []

for target_file in target_files:
    
    filename = target_file.split('/')[-1]
    filename_base = filename.split('.')[0]
    src_doc = "multimodal-rag-workshop/documents/"+filename
    s3_client.upload_file(target_file, bucket, src_doc)
    
    doc = fitz.open(target_file)
    
    tables = []
    texts = []
    image_captions = []
    for page_index, page in enumerate(doc):
        
        text = page.get_text()
        text = re.sub(r'(?<! )\n(?! )', ' ', text)

        split_text = [text[x:x+chunk_interval] for x in range(0, len(text), chunk_interval)]
        texts.extend(split_text)

        
        tabs = page.find_tables()
        if tabs.tables:
            for table_index, table in enumerate(tabs):
                df = table.to_pandas()
                rows, columns = df.shape
                print(df)
                null_percentage = (df[(df=='')].count().sum() + df.isnull().sum().sum())/df.size
                if rows < 2 or columns < 2 or null_percentage >= 0.3:
                    continue
                tables.append({'raw':str(table.extract()), 'summary':invoke_llm_model(table_llm_prompt.format(table=table.extract()))})

                
        images = page.get_images()
        
        if images:
            for image_index, img in enumerate(images):
                xref = img[0]
                img_info = doc.extract_image(xref)
                if img_info['width'] < 256 or img_info['height'] < 256:
                    continue
                image_data = img_info["image"]
                image_ext = img_info["ext"]

                pil_img = Image.open(io.BytesIO(image_data))
                image_output_filename = filename_base + "_page" + f'{page_index:03d}' + "_" + f'{image_index:03d}' + '.png'
                image_output_path = image_output_dir + image_output_filename
                pil_img.save(image_output_path, format = "PNG")
                
                # バイナリストリームに変換
                buffered = io.BytesIO()
                pil_img.save(buffered, format="PNG")
                img_str = base64.b64encode(buffered.getvalue()).decode()

                try:
                    caption = generate_image_captions(img_str, prompt = image_llm_prompt)
                except:
                    continue

                # 抽出された画像は Amazon S3 にアップロード
                output_img_s3_path = "multimodal-rag-workshop/images/"+image_output_filename
                s3_client.upload_file(image_output_path, bucket, output_img_s3_path)
                
                image_captions.append({'s3_path': output_img_s3_path, 'caption': caption})
        
    extracted_elements_list.append({
        'source': src_doc,
        'tables': tables,
        'texts': texts,
        'images': image_captions
    })

## 4. ベクトル DB として OpenSearch Serverless を構築する
ベクトル DB の実装方式として、ChromaDB等を用いてアプリと同居する形で構築する方式もありますが、実用的には、アプリと切り出した外部DBを用いることが望ましいです。
今回は、　ベクトルデータベースとして Amazon OpenSearch Serverless を使用します。ユーザー側でのインフラ管理不要、処理増減に合わせて自動でスケールするマネージドサービスです。

ここでは、先ほど抽出した各要素を Embedding モデルを用いてベクトル化し、Amazon OpenSearch Serverless (aoss) に格納していきます。
ドキュメントの数が大量になった場合は Amazon Bedrock の [バッチ推論機能](https://docs.aws.amazon.com/bedrock/latest/userguide/batch-inference.html) を用いる方法もあります。ただ、今回はドキュメントの数がそこまで多くないため、利用せずシンプルな推論方式で実行します。

In [None]:
####
#### 準備：マネージメントコンソール経由で Amazon OpenSearch Serverless コレクションを作成してください。
#### 

In [None]:
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
session = boto3.Session()
host = '<input your aoss endpoint>' # https は抜く
region = 'ap-northeast-1'
service = 'aoss'
credentials = session.get_credentials()
auth = AWSV4SignerAuth(credentials, region, service)

os_client = OpenSearch(
    hosts = [{'host': host, 'port': 443}],
    http_auth = auth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection,
    pool_maxsize = 20
)

In [None]:
def prep_document(embedding,raw_element,processed_element,doc_type,src_doc, s3_bucket, image_s3_path):
    document = { 
        # "_id": str(hash(raw_element)),
        "processed_element_embedding": embedding,
        "processed_element": processed_element,
        "raw_element_type": doc_type,
        "raw_element": raw_element,
        "src_doc": src_doc,
        "s3_bucket": s3_bucket,
        "image_s3_path": image_s3_path
    }
    return document

In [None]:
index_name = "rag_index"

index_body = """
{
  "settings": {
    "index.knn": true
  },
  "mappings": {
    "properties": {
      "processed_element_embedding": {
        "type": "knn_vector",
        "dimension": 1024,
        "method": {
          "name": "hnsw",
          "engine": "faiss",
          "parameters": {}
        }
      },
      "src_doc": {
        "type": "text"
      },
      "raw_element": {
        "type": "text"
      },
      "raw_element_type": {
        "type": "text"
      },
      "processed_element": {
        "type": "text"
      },
      "s3_bucket": {
        "type": "text"
      },
      "image_s3_path": {
        "type": "text"
      }
    }
  }
}
"""

index_body = json.loads(index_body)

response = os_client.indices.create(index_name, body=index_body)

In [None]:
documents = []
for extracted_element in extracted_elements_list:
    texts = extracted_element['texts']
    tables = extracted_element['tables']
    image_captions = extracted_element['images']
    src_doc = extracted_element['source']

    for i,text in enumerate(texts):
        embedding = invoke_embedding_model([text])[0]
        document = prep_document(embedding,text,text,'text',src_doc, bucket, "")
        documents.append(document)
    
    for table in tables:
        table_raw = table['raw']
        table_summary = table['summary']
        embedding = invoke_embedding_model([table_summary])[0]
        document = prep_document(embedding,table_raw,table_summary,'table',src_doc, bucket, "")
        documents.append(document)
        
    for image_caption in image_captions:
        embedding = invoke_embedding_model([image_caption['caption']])[0]
        document = prep_document(embedding,image_caption['caption'],image_caption['caption'],'image',src_doc, bucket, image_caption['s3_path'])
        documents.append(document)

In [None]:
for doc in documents:
    response = os_client.index(
        index = index_name,
        body = doc,
    )

In [None]:
question = 'Claude 3 はいつ発表されましたか？'
embedding = invoke_embedding_model([question])[0]
k = 3 # number of neighbours, size and k are the same to return k results in total. If size is not specified, k results will be returned per shard.
query = {
    "size": k,
    "query": {
        "knn": {
            "processed_element_embedding": {
                "vector": embedding, 
                "k": k}
            },
    }
}

response = os_client.search(
    body = query,
    index = index_name
)

hits = response['hits']['hits']
prompt_template = """
    The following is a friendly conversation between a human and an AI. 
    The AI is talkative and provides lots of specific details from its context.
    If the AI does not know the answer to a question, it truthfully says it 
    does not know.
    {context}
    Instruction: Based on the above documents, provide a detailed answer for, {question} Answer "don't know" 
    if not present in the document. 出力は日本語でわかりやすく回答してください。
    Solution:"""
context = []
for hit in hits:
    context.append(hit['_source']['processed_element'])

llm_prompt = prompt_template.format(context='\n'.join(context),question=question)
output = invoke_llm_model(llm_prompt)

In [None]:
print(output)
print(context)