<a href="https://colab.research.google.com/github/martians-sheep/pl_task_recomended_csd/blob/main/GraphRAG_sample.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

GraphRAG クイックスタート

前提条件

ノートブックを実行するために、Python標準ライブラリに含まれないサードパーティパッケージをインストールしてください。

In [1]:
! pip install devtools python-magic requests tqdm

Collecting devtools
  Downloading devtools-0.12.2-py3-none-any.whl.metadata (4.8 kB)
Collecting python-magic
  Downloading python_magic-0.4.27-py2.py3-none-any.whl.metadata (5.8 kB)
Collecting asttokens<3.0.0,>=2.0.0 (from devtools)
  Downloading asttokens-2.4.1-py2.py3-none-any.whl.metadata (5.2 kB)
Collecting executing>=1.1.1 (from devtools)
  Downloading executing-2.0.1-py2.py3-none-any.whl.metadata (9.0 kB)
Downloading devtools-0.12.2-py3-none-any.whl (19 kB)
Downloading python_magic-0.4.27-py2.py3-none-any.whl (13 kB)
Downloading asttokens-2.4.1-py2.py3-none-any.whl (27 kB)
Downloading executing-2.0.1-py2.py3-none-any.whl (24 kB)
Installing collected packages: python-magic, executing, asttokens, devtools
Successfully installed asttokens-2.4.1 devtools-0.12.2 executing-2.0.1 python-magic-0.4.27


In [None]:
import getpass
import json
import time
from pathlib import Path

import magic
import requests
from devtools import pprint
from tqdm import tqdm

## (必須) ユーザー設定

APIサブスクリプションキー、APIベースエンドポイント、および後でノートブックで参照されるいくつかのファイルディレクトリ名を設定します。

APIサブスクリプションキー

APIM（API管理）は、複数の形式の認証とアクセス制御（例えば、管理対象ID）をサポートします。このノートブックのデモでは、サブスクリプションキーを使用します。このキーを見つけるには、Azureポータルにアクセスします。サブスクリプションキーは、<my_resource_group> –>  –>  –>  –>  プライマリキーの下にあります。複数のAPIユーザーに対しては、個別のサブスクリプションキーを生成することができます。

In [None]:
ocp_apim_subscription_key = getpass.getpass(
    "Enter the subscription key to the GraphRag APIM:"
)

"""
"Ocp-Apim-Subscription-Key":
    This is a custom HTTP header used by Azure API Management service (APIM) to
    authenticate API requests. The value for this key should be set to the subscription
    key provided by the Azure APIM instance in your GraphRAG resource group.
"""
headers = {"Ocp-Apim-Subscription-Key": ocp_apim_subscription_key}

## ディレクトリとAPIエンドポイントの設定

デモンストレーションの目的で、提供されたget-wiki-articles.pyスクリプトを使用して少数のWikipedia記事をダウンロードするか、自分のデータを提供してください（GraphRAGはUTF-8エンコードされたTXTファイルを必要とします）。

In [None]:
"""
These parameters must be defined by the notebook user:

- file_directory: a local directory of text files. The file structure should be flat,
                  with no nested directories. (i.e. file_directory/file1.txt, file_directory/file2.txt, etc.)
- storage_name:   a unique name to identify a blob storage container in Azure where files
                  from `file_directory` will be uploaded.
- index_name:     a unique name to identify a single graphrag knowledge graph index.
                  Note: Multiple indexes may be created from the same `storage_name` blob storage container.
- endpoint:       the base/endpoint URL for the GraphRAG API (this is the Gateway URL found in the APIM resource).
"""

file_directory = ""
storage_name = ""
index_name = ""
endpoint = ""

In [None]:
assert (
    file_directory != "" and storage_name != "" and index_name != "" and endpoint != ""
)

## ファイルのアップロード

GraphRAGでデータをインデックス化する方法をデモンストレーションするために、最初にいくつかのファイルをGraphRAGに取り込む必要があります。

In [None]:
def upload_files(
    file_directory: str,
    storage_name: str,
    batch_size: int = 100,
    overwrite: bool = True,
    max_retries: int = 5,
) -> requests.Response | list[Path]:
    """
    Upload files to a blob storage container.

    Args:
    file_directory - a local directory of .txt files to upload. All files must have utf-8 encoding.
    storage_name - a unique name for the Azure storage blob container.
    batch_size - the number of files to upload in a single batch.
    overwrite - whether or not to overwrite files if they already exist in the storage blob container.
    max_retries - the maximum number of times to retry uploading a batch of files if the API is busy.

    NOTE: Uploading files may sometimes fail if the blob container was recently deleted
    (i.e. a few seconds before. The solution "in practice" is to sleep a few seconds and try again.
    """
    url = endpoint + "/data"

    def upload_batch(
        files: list, storage_name: str, overwrite: bool, max_retries: int
    ) -> requests.Response:
        for _ in range(max_retries):
            response = requests.post(
                url=url,
                files=files,
                params={"storage_name": storage_name, "overwrite": overwrite},
                headers=headers,
            )
            # API may be busy, retry
            if response.status_code == 500:
                print("API busy. Sleeping and will try again.")
                time.sleep(10)
                continue
            return response
        return response

    batch_files = []
    accepted_file_types = ["text/plain"]
    filepaths = list(Path(file_directory).iterdir())
    for file in tqdm(filepaths):
        # validate that file is a file, has acceptable file type, has a .txt extension, and has utf-8 encoding
        if (
            not file.is_file()
            or file.suffix != ".txt"
            or magic.from_file(str(file), mime=True) not in accepted_file_types
        ):
            print(f"Skipping invalid file: {file}")
            continue
        # open and decode file as utf-8, ignore bad characters
        batch_files.append(
            ("files", open(file=file, mode="r", encoding="utf-8", errors="ignore"))
        )
        # upload batch of files
        if len(batch_files) == batch_size:
            response = upload_batch(batch_files, storage_name, overwrite, max_retries)
            # if response is not ok, return early
            if not response.ok:
                return response
            batch_files.clear()
    # upload remaining files
    if len(batch_files) > 0:
        response = upload_batch(batch_files, storage_name, overwrite, max_retries)
    return response

In [None]:
response = upload_files(
    file_directory=file_directory,
    storage_name=storage_name,
    batch_size=100,
    overwrite=True,
)
if not response.ok:
    print(response.text)
else:
    print(response)

## インデックスの構築

データファイルがアップロードされた後、検索インデックスを構築してナレッジグラフを作成することができます。

In [None]:
def build_index(
    storage_name: str,
    index_name: str,
) -> requests.Response:
    """Create a search index.
    This function kicks off a job that builds a knowledge graph index from files located in a blob storage container.
    """
    url = endpoint + "/index"
    request = {"storage_name": storage_name, "index_name": index_name}
    return requests.post(url, params=request, headers=headers)

In [None]:
response = build_index(storage_name=storage_name, index_name=index_name)
print(response)
if response.ok:
    print(response.text)
else:
    print(f"Failed to submit job.\nStatus: {response.text}")

## インデックス作成ジョブのステータス確認

インデックスが100％完了するまで、次のセクション（クエリの実行）に進むのを待ってください。ステータスを監視するために、次のセルを複数回再実行することができます。注意：GraphRAGのインデックス作成速度は、使用しているAzure OpenAIモデルのTPMクォータに直接関連しています。

In [None]:
def index_status(index_name: str) -> requests.Response:
    url = endpoint + f"/index/status/{index_name}"
    return requests.get(url, headers=headers)


response = index_status(index_name)
pprint(response.json())

## クエリ

インデックス作成ジョブが完了すると、ナレッジグラフはクエリの準備が整います。現在、2種類のクエリ（グローバルとローカル）がサポートされています。両方のクエリを試して、その応答の違いを体験してください。クエリの応答時間も、使用しているAzure OpenAIモデルのTPMクォータに関連しています。

In [None]:
# a helper function to parse out the result from a query response
def parse_query_response(
    response: requests.Response, return_context_data: bool = False
) -> requests.Response | dict[list[dict]]:
    """
    Prints response['result'] value and optionally
    returns associated context data.
    """
    if response.ok:
        print(json.loads(response.text)["result"])
        if return_context_data:
            return json.loads(response.text)["context_data"]
        return response
    else:
        print(response.reason)
        print(response.content)
        return response

## グローバルクエリ

グローバルクエリはリソースを多く消費しますが、データセット全体の理解を必要とする質問に対して良い応答を提供します。

In [None]:
%%time


def global_search(index_name: str | list[str], query: str) -> requests.Response:
    """Run a global query over the knowledge graph(s) associated with one or more indexes"""
    url = endpoint + "/query/global"
    request = {"index_name": index_name, "query": query}
    return requests.post(url, json=request, headers=headers)


global_response = global_search(
    index_name=index_name, query="Summarize the main topics of this data"
)
global_response_data = parse_query_response(global_response, return_context_data=True)
global_response_data

## ローカルクエリ

ローカル検索クエリは、ドキュメントに記載されている特定のエンティティに関する理解を必要とする、狭い焦点の質問に最適です（例：カモミールの治癒特性は何ですか？）。

In [None]:
%%time


def local_search(index_name: str | list[str], query: str) -> requests.Response:
    """Run a local query over the knowledge graph(s) associated with one or more indexes"""
    url = endpoint + "/query/local"
    request = {"index_name": index_name, "query": query}
    return requests.post(url, json=request, headers=headers)


# perform a local query
local_response = local_search(
    index_name=index_name, query="Who are the primary actors in these communities?"
)
local_response_data = parse_query_response(local_response, return_context_data=True)
local_response_data