# 【中間課題: 解答用】本格的なRAG/AIエージェントのシステム開発にトライしよう

## 事前準備

### 【1. OpenAI APIキーの設定】


In [None]:
import os
from google.colab import userdata

os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")


### 【2. 必要なライブラリのインストール】

コース共通


In [None]:
!pip install langchain==0.3.26 openai==1.91.0 langchain-community==0.3.26 httpx==0.28.1


中間課題用


In [None]:
!pip install python-docx==1.2.0 docx2txt==0.9 chromadb==1.0.13 tiktoken==0.9.0


### 【3. Googleドライブとの接続（マウント）】


In [None]:
from google.colab import drive
drive.mount('/content/drive')


### 【4. 外部参照先である「オンラインMTG議事録」フォルダのパス設定】


In [None]:
dir_path = "/content/drive/MyDrive/オンラインMTG議事録"


## 【中間課題①】オンラインMTG議事録を外部参照するRAGシステムの開発

### リセット処理（動作検証用）

「データベース化済み」フォルダと「.db」フォルダ内のファイルを全て削除する処理


In [None]:
import shutil
import os

def clear_complete_dir(dir_path):
    print(dir_path)
    if os.path.isdir(dir_path):
        if dir_path.split("/")[-1] == ".db":
            shutil.rmtree(dir_path)
            os.mkdir(dir_path)
            print(f"作成済みの全てのベクターストアを削除しました。")
            return
    
    if os.path.isdir(dir_path):
        files = os.listdir(dir_path)
        if dir_path.split("/")[-1] == "データベース化済み":
            for file in files:
                os.remove(os.path.join(dir_path, file))
            print(f"対象テーマの「データベース化済み」フォルダを空にしました: {dir_path}")
            return
        
        for file in files:
            clear_complete_dir(os.path.join(dir_path, file))

# リセット処理の実行
clear_complete_dir(dir_path)


### ファイル読み込み処理

再帰処理を使って「データベース化前」フォルダ内のファイルを読み込み、「データベース化済み」フォルダにコピーする


In [None]:
from docx import Document
from langchain_community.document_loaders import Docx2txtLoader

def file_load(path, theme_docs):
    # パスを「/」で分割してリスト作成
    splitted_path = path.split("/")
    
    # パスからテーマ名を取り出し
    dir_path = "/content/drive/MyDrive/オンラインMTG議事録"
    theme_name = ""
    for path_elem in splitted_path:
        if path_elem in os.listdir(dir_path):
            theme_name = path_elem
            break
    
    # 「データベース化前」フォルダ内のファイルのパスであるため、「2つ階層を上げたところまでのパス」と「データベース化済み/ファイル名」のパスを連結
    save_filename = os.path.join("/".join(path.split("/")[:-2]), f"データベース化済み/{os.path.basename(path)}")
    
    # 「データベース化前」フォルダ内にはあるが「データベース化済み」フォルダ内にはない議事録ファイルの場合
    if not os.path.isfile(save_filename):
        # 拡張子「.docx」のファイルを読み込み
        loader = Docx2txtLoader(path)
        doc = loader.load()
        
        # 議事録ファイルを「データベース化済み」フォルダ内に保存
        new_doc = Document()
        new_doc.add_paragraph(doc[0].page_content)
        new_doc.save(save_filename)
        
        # テーマ名をキー、取得したドキュメントを値として辞書に追加
        if theme_name in theme_docs:
            theme_docs[theme_name] += doc
        else:
            theme_docs[theme_name] = doc

def recursive_file_check(path, theme_docs):
    # 受け取ったパスがフォルダかどうかで条件分岐
    if os.path.isdir(path):
        # 「データベース化済み」もしくは「.db」フォルダの場合はファイル読み込みの処理が不要であるため、後続の処理をストップ
        if path.split("/")[-1] == "データベース化済み" or path.split("/")[-1] == ".db":
            return
        
        # フォルダ内のファイル/フォルダ名の一覧を取得
        files = os.listdir(path)
        
        # 一覧をループ処理
        for file in files:
            # フォルダ内のファイルもしくはフォルダのパスを渡し、再度関数を呼び出す（関数の中で同じ関数を呼び出す「再帰処理」）
            recursive_file_check(os.path.join(path, file), theme_docs)
    else:
        # 受け取ったパスがファイルの場合のみ、「読み込み」と「コピー」の処理を実行
        file_load(path, theme_docs)

# 辞書「theme_docs」に、各テーマ名をキーとしてドキュメント一覧を格納する
theme_docs = {}

# 「オンラインMTG議事録」フォルダ内のファイル読み込みと「データベース化済み」フォルダへのコピーの処理を実行
recursive_file_check(dir_path, theme_docs)

# 辞書の中身を確認
theme_docs


### ベクターストア作成の事前準備


In [None]:
from langchain.text_splitter import CharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings

# CharacterTextSplitterクラスのインスタンスを作成
text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=50)

# OpenAIEmbeddingsクラスのインスタンスを作成
embeddings = OpenAIEmbeddings()

# 7つのテーマ名一覧（「営業」など）が格納されたリストを作成
theme_list = []
for filename in os.listdir(dir_path):
    if not filename.startswith('.'):
        theme_list.append(filename)

# リストの中身を確認
theme_list


### ベクターストアの作成とRetrieverの準備


In [None]:
from langchain.vectorstores import Chroma

# テーマ名をキー、Retrieverを値として追加する用の空の辞書を用意
theme_retriever = {}

# テーマ名（「営業」など）の一覧が格納されたリストに対してfor文を回す
for theme_name in theme_list:
    # 新たにベクターストアに追加するデータが該当テーマにおいて存在する場合のみ、ベクターストアの作成やデータ追加の処理を実行
    if theme_name in theme_docs:
        # 「オンラインMTG議事録」フォルダから読み込んだデータの中で、テーマ名に該当するデータを取り出し、チャンク分割を行う
        splitted_docs = text_splitter.split_documents(theme_docs[theme_name])
        
        # テーマに対応するベクターストアが「.db」フォルダ内に存在するかどうかで条件分岐
        if os.path.isdir(f"{dir_path}/.db/.{theme_name}_chromadb"):
            # 該当テーマのベクターストアを読み込む
            db = Chroma(persist_directory=f"{dir_path}/.db/.{theme_name}_chromadb", embedding_function=embeddings)
            # 該当テーマのベクターストアにチャンク分割したドキュメントを追加する
            db.add_documents(documents=splitted_docs)
            
            # 全テーマ横断のベクターストアを読み込む
            all_db = Chroma(persist_directory=f"{dir_path}/.db/.all_chromadb", embedding_function=embeddings)
            # 全テーマ横断のベクターストアにチャンク分割したドキュメントを追加する
            all_db.add_documents(documents=splitted_docs)
        else:
            # チャンク分割したドキュメントを使い、該当テーマのベクターストアを作成
            db = Chroma.from_documents(splitted_docs, embeddings, persist_directory=f"{dir_path}/.db/.{theme_name}_chromadb")
            
            # 全テーマ横断のベクターストアが「.db」フォルダ内に存在するかどうかで条件分岐
            if os.path.isdir(f"{dir_path}/.db/.all_chromadb"):
                # 全テーマ横断のベクターストアを読み込む
                all_db = Chroma(persist_directory=f"{dir_path}/.db/.all_chromadb", embedding_function=embeddings)
                # 全テーマ横断のベクターストアにチャンク分割したドキュメントを追加する
                all_db.add_documents(documents=splitted_docs)
            else:
                # チャンク分割したドキュメントを使い、全テーマ横断のベクターストアを作成
                all_db = Chroma.from_documents(splitted_docs, embeddings, persist_directory=f"{dir_path}/.db/.all_chromadb")
        
        # ベクターストアからRetrieverを作成し、テーマ名をキー、Retrieverを値として辞書に追加
        theme_retriever[theme_name] = db.as_retriever()

# テーマ名をキー、Retrieverを値として追加した辞書の中身を確認
theme_retriever


### RAGシステムの構築（ヒント④の解答）

LLMに回答を生成させるまでのロジックを実装します


In [None]:
from langchain.chat_models import ChatOpenAI
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# 全テーマ横断のベクターストア「.all_chromadb」を読み込む
all_db = Chroma(persist_directory=f"{dir_path}/.db/.all_chromadb", embedding_function=embeddings)

# Retrieverを作成する
retriever = all_db.as_retriever()

# ChatOpenAIクラスのインスタンスを、ストリーミング出力のオプション付きで作成する
llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    temperature=0,
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)

# メモリの初期化（会話履歴を格納するため）
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True,
    output_key="answer"
)

# 会話履歴の記憶機能が搭載されたRAGのChainを作成する
qa_chain = ConversationalRetrievalChain.from_llm(
    llm=llm,
    retriever=retriever,
    memory=memory,
    return_source_documents=True
)

print("RAGシステムの構築が完了しました!")


### 最初の質問を実行


In [None]:
# ユーザー入力値を変数に格納する
user_input = "営業チームの最新の課題について教えてください"

# ユーザー入力値と会話履歴をもとに回答を生成する
result = qa_chain({"question": user_input})

# 回答の詳細を表示
print(f"\n\n質問: {user_input}")
print(f"\n回答: {result['answer']}")
print(f"\n参照したドキュメント数: {len(result['source_documents'])}")


### フォローアップ質問（会話履歴の記憶機能の確認）

会話履歴が記憶されていることを確認するため、前の質問に関連する質問をします


In [None]:
# 再度ユーザー入力値をLLMに与え、会話履歴の記憶機能が搭載されていることを確認する
user_input_2 = "その課題に対してどのような対策が提案されていますか?"

# 会話履歴をもとに回答を生成（前の質問の文脈を理解しているはず）
result_2 = qa_chain({"question": user_input_2})

# 回答の詳細を表示
print(f"\n\n質問: {user_input_2}")
print(f"\n回答: {result_2['answer']}")
print(f"\n参照したドキュメント数: {len(result_2['source_documents'])}")


### インタラクティブな対話機能（オプション）

継続的に質問できるループを実装します


In [None]:
# インタラクティブな対話ループ（オプション）
print("\n" + "="*80)
print("RAGシステムと対話を開始します。")
print("終了するには 'quit'、'exit'、または '終了' と入力してください。")
print("="*80 + "\n")

while True:
    user_question = input("\n質問を入力してください: ")
    
    # 終了条件
    if user_question.lower() in ['quit', 'exit', '終了']:
        print("対話を終了します。")
        break
    
    # 空の入力をスキップ
    if user_question.strip() == "":
        print("質問を入力してください。")
        continue
    
    # 質問を実行
    try:
        result = qa_chain({"question": user_question})
        print(f"\n回答: {result['answer']}")
        print(f"参照したドキュメント数: {len(result['source_documents'])}")
        print("\n" + "-"*80)
    except Exception as e:
        print(f"エラーが発生しました: {e}")


### 参照元ドキュメントの確認（オプション）

どのドキュメントから情報を取得したかを確認できます


In [None]:
# 最後の質問で参照されたドキュメントの内容を確認
print("最後の質問で参照されたドキュメント:\n")
for i, doc in enumerate(result_2['source_documents'], 1):
    print(f"--- ドキュメント {i} ---")
    print(f"内容: {doc.page_content[:200]}...")  # 最初の200文字のみ表示
    print(f"メタデータ: {doc.metadata}")
    print()
