# Data Ingest in Python (Azure AI Search)
Creates the Azure AI Search index required to run the sample notebooks provided in this series. This notebook performs the following steps:

- Create an Azure AI Search index
- Upload PDFs to Azure Blob Storage
- Extract and structure the PDF contents with Azure AI Document Intelligence
- Split the extracted text into chunks
- Convert the chunks into embeddings
- Register the chunks in the Azure AI Search index



# Prerequisites
To run this Python sample, you need the following:
- An [Azure AI Search resource](https://learn.microsoft.com/azure/search/search-create-service-portal). You need its endpoint and admin API key. This notebook works only with `azure-search-documents==11.4.0`.
- An [Azure AI Document Intelligence resource](https://learn.microsoft.com/azure/ai-services/document-intelligence/create-document-intelligence-resource). You need its endpoint and key.
- An [Azure Blob Storage resource](https://learn.microsoft.com/azure/storage/common/storage-account-create?tabs=azure-portal). You need its [connection string](https://learn.microsoft.com/azure/storage/common/storage-account-get-info?tabs=portal#get-a-connection-string-for-the-storage-account).
- An approved Azure subscription with access to [Azure OpenAI Service](https://learn.microsoft.com/azure/ai-services/openai/how-to/create-resource?pivots=web-portal)
- A deployment of the `text-embedding-ada-002` [embeddings model](https://learn.microsoft.com/azure/ai-services/openai/tutorials/embeddings?tabs=python%2Ccommand-line&pivots=programming-language-python) in Azure OpenAI Service. This demo uses API version `2023-05-15`.
- Azure OpenAI Service connection and model information:
  - Azure OpenAI API key
  - Azure OpenAI endpoint
  - Deployment name of the embeddings model
  - API version
- [Python](https://www.python.org/downloads/release/python-31011/) (these steps were tested with version 3.10.x)

You can run these demos with [Visual Studio Code](https://code.visualstudio.com/download) and the [Jupyter extension](https://marketplace.visualstudio.com/items?itemName=ms-toolsai.jupyter). This notebook is based on https://github.com/Azure-Samples/azure-search-openai-demo.

## Install packages

In [None]:
!pip install azure-search-documents==11.4.0
!pip install azure-identity==1.15.0
!pip install azure-ai-formrecognizer==3.3.2
!pip install azure-storage-blob==12.14.1
!pip install openai[datalib]==1.3.9
!pip install pypdf==3.17.0
!pip install jsonpickle
!pip install tenacity

In [None]:
import azure.search.documents
print("azure.search.documents", azure.search.documents.__version__)
import azure.ai.formrecognizer
print("azure.ai.formrecognizer", azure.ai.formrecognizer.__VERSION__)
import azure.storage.blob
print("azure.storage.blob", azure.storage.blob.__version__)
import openai
print("openai", openai.__version__)
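Because the later cells depend on the pinned versions installed above (in particular `azure-search-documents==11.4.0`), a check like the following can flag a mismatched environment before any cell talks to Azure. This is a minimal sketch using only the standard library's `importlib.metadata`; the `PINNED` dictionary and the `find_version_mismatches` helper are hypothetical names introduced here, not part of the original notebook.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Callable, Dict

# Hypothetical: distribution names and versions copied from the pip cell above
PINNED: Dict[str, str] = {
    "azure-search-documents": "11.4.0",
    "azure-identity": "1.15.0",
    "azure-ai-formrecognizer": "3.3.2",
    "azure-storage-blob": "12.14.1",
    "openai": "1.3.9",
    "pypdf": "3.17.0",
}

def find_version_mismatches(
    pinned: Dict[str, str],
    lookup: Callable[[str], str] = version,
) -> Dict[str, str]:
    """Return {package: installed version or 'not installed'} for every pin that does not match."""
    mismatches = {}
    for package, expected in pinned.items():
        try:
            installed = lookup(package)
        except PackageNotFoundError:
            installed = "not installed"
        if installed != expected:
            mismatches[package] = installed
    return mismatches

# Print any package whose installed version differs from its pin
for package, installed in find_version_mismatches(PINNED).items():
    print(f"{package}: expected {PINNED[package]}, found {installed}")
```

The `lookup` parameter exists only so the function can be exercised with a fake version table; in the notebook the default `importlib.metadata.version` is used.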

## Import the required libraries

In [None]:
import os
import io
import time
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient  
from azure.search.documents.indexes.models import (  
    ExhaustiveKnnAlgorithmConfiguration,
    ExhaustiveKnnParameters,
    SearchIndex,
    SearchField,
    SearchFieldDataType,
    SimpleField,
    SearchableField,
    SemanticConfiguration,
    SemanticPrioritizedFields,
    SemanticField,
    SemanticSearch,
    VectorSearch,
    HnswAlgorithmConfiguration,
    HnswParameters,
    VectorSearchAlgorithmConfiguration,
    VectorSearchAlgorithmKind,
    VectorSearchAlgorithmMetric,
    VectorSearchProfile,
)  

## Connection settings

In [None]:
# Azure Blob Storage
azure_storage_container: str = "content"
azure_blob_connection_string: str = "<Your blob connection string>"
# Azure AI Search
search_service_endpoint: str = "<Your search service endpoint>"
search_service_admin_key: str = "<Your search service admin key>"
index_name: str = "gptkbindex"
search_analyzer_name: str = "ja.lucene"
credential = AzureKeyCredential(search_service_admin_key)
# Azure AI Document Intelligence
document_intelligence_key: str = "<Your document intelligence key>"
document_intelligence_endpoint: str = "<Your document intelligence endpoint>"
document_intelligence_creds = AzureKeyCredential(document_intelligence_key)
# Azure OpenAI Service
AZURE_OPENAI_API_KEY = "<Your OpenAI API key>"
AZURE_OPENAI_ENDPOINT = "https://<Your OpenAI Service>.openai.azure.com/"
model: str = "embedding"  # Embeddings deployment name (default used by the automated setup)
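Hard-coding keys in a notebook makes them easy to leak. As an alternative, the same settings could be read from environment variables. A minimal sketch, assuming hypothetical variable names such as `AZURE_SEARCH_ENDPOINT` (they are not defined anywhere in the original sample):

```python
import os
from typing import Dict, Mapping, Optional

# Hypothetical environment variable names; adjust them to your own setup
REQUIRED_SETTINGS = [
    "AZURE_BLOB_CONNECTION_STRING",
    "AZURE_SEARCH_ENDPOINT",
    "AZURE_SEARCH_ADMIN_KEY",
    "AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT",
    "AZURE_DOCUMENT_INTELLIGENCE_KEY",
    "AZURE_OPENAI_API_KEY",
    "AZURE_OPENAI_ENDPOINT",
]

def load_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Read every required setting, raising one error that lists all missing variables."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_SETTINGS if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in REQUIRED_SETTINGS}
```

Usage would look like `settings = load_settings()` followed by `search_service_endpoint = settings["AZURE_SEARCH_ENDPOINT"]`. Collecting all missing names before failing saves a round of trial and error when several variables are unset.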


## Define the search index
Create the search index schema and the vector search configuration.
Note that the syntax below works only with `azure-search-documents==11.4.0`.
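The index stores 1536-dimensional vectors (the output size of `text-embedding-ada-002`) and configures HNSW with `VectorSearchAlgorithmMetric.COSINE`. To make the metric concrete, here is a plain-Python cosine similarity. It illustrates the metric itself only: Azure AI Search uses the HNSW graph to find approximate nearest neighbors under this metric, and the `@search.score` it returns is not necessarily the raw cosine value.

```python
import math
from typing import Sequence

def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors: dot(a, b) / (|a| * |b|)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same number of dimensions")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for zero vectors")
    return dot / (norm_a * norm_b)

# Same direction gives about 1.0 regardless of length; orthogonal vectors give about 0.0
print(cosine_similarity([1.0, 2.0], [2.0, 4.0]))
print(cosine_similarity([1.0, 0.0], [0.0, 3.0]))
```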

In [None]:
def create_search_index():
    # Create a search index
    fields = [
        SimpleField(name="id", type="Edm.String", key=True),
        SearchableField(
            name="content", type="Edm.String", analyzer_name=search_analyzer_name
        ),
        SearchField(
            name="embedding",
            type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
            hidden=False,
            searchable=True,
            filterable=False,
            sortable=False,
            facetable=False,
            vector_search_dimensions=1536,
            vector_search_profile_name="embedding_config",
        ),
        SimpleField(name="category", type="Edm.String", filterable=True, facetable=True),
        SimpleField(name="sourcepage", type="Edm.String", filterable=True, facetable=True),
        SimpleField(name="sourcefile", type="Edm.String", filterable=True, facetable=True),
        SimpleField(name="metadata", type="Edm.String", filterable=True, facetable=True),
    ]

    semantic_config = SemanticConfiguration(
        name="default",
        prioritized_fields=SemanticPrioritizedFields(
            title_field=None,
            keywords_fields=None,
            content_fields=[SemanticField(field_name="content")]
        )
    )

    # Create the semantic settings with the configuration
    semantic_search = SemanticSearch(configurations=[semantic_config])

    # Configure the vector search configuration
    vector_search = VectorSearch(
        algorithms=[
            HnswAlgorithmConfiguration(
                name="hnsw_config",
                kind=VectorSearchAlgorithmKind.HNSW,
                parameters=HnswParameters(
                    m=4,
                    ef_construction=400,
                    ef_search=500,
                    metric=VectorSearchAlgorithmMetric.COSINE,
                ),
            ),
        ],
        profiles=[
            VectorSearchProfile(
                name="embedding_config",
                algorithm_configuration_name="hnsw_config",
            ),
        ],
    )

    index_client = SearchIndexClient(endpoint=search_service_endpoint, credential=credential)
    if index_name not in index_client.list_index_names():
        index = SearchIndex(
            name=index_name,
            fields=fields,
            vector_search=vector_search,
            semantic_search=semantic_search,
        )
        print(f"Creating {index_name} search index")
        result = index_client.create_or_update_index(index) 
        print(f' {result.name} created')
    else:
        print(f"Search index {index_name} already exists")

def remove_from_index(filename):
    print(f"Removing sections of '{filename or '<all>'}' from search index '{index_name}'")
    search_client = SearchClient(endpoint=search_service_endpoint,
                                 index_name=index_name,
                                 credential=credential)
    if filename is None:
        filter_expr = None
    else:
        # OData string literals escape a single quote by doubling it
        escaped_name = os.path.basename(filename).replace("'", "''")
        filter_expr = f"sourcefile eq '{escaped_name}'"
    while True:
        r = search_client.search("", filter=filter_expr, top=1000, include_total_count=True)
        if r.get_count() == 0:
            break
        r = search_client.delete_documents(documents=[{ "id": d["id"] } for d in r])
        print(f"\tRemoved {len(r)} sections from index")
        # It can take a few seconds for search results to reflect changes, so wait a bit
        time.sleep(2)
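Because `remove_from_index` embeds the file name in an OData `$filter` string, any single quote in the name has to be escaped by doubling it; otherwise a name such as `O'Brien.pdf` terminates the string literal early. The hypothetical helper below isolates that rule so it can be checked on its own:

```python
import os

def sourcefile_filter(filename: str) -> str:
    """Build an OData equality filter on the `sourcefile` field for the file's base name."""
    name = os.path.basename(filename)
    # OData string literals escape a single quote by doubling it
    escaped = name.replace("'", "''")
    return f"sourcefile eq '{escaped}'"

print(sourcefile_filter("../data/O'Brien.pdf"))  # sourcefile eq 'O''Brien.pdf'
```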


# Upload PDF files to Azure Blob Storage
Split each PDF file into individual pages and upload them to Azure Blob Storage. These per-page files are also used for previews in the chat UI.

In [None]:
from azure.storage.blob import BlobServiceClient
from pypdf import PdfReader, PdfWriter

def blob_name_from_file_page(filename, page = 0):
    if os.path.splitext(filename)[1].lower() == ".pdf":
        return os.path.splitext(os.path.basename(filename))[0] + f"-{page}" + ".pdf"
    else:
        return os.path.basename(filename)

def upload_blobs(filename):
    blob_service_client = BlobServiceClient.from_connection_string(azure_blob_connection_string)
    blob_container = blob_service_client.get_container_client(azure_storage_container)
    if not blob_container.exists():
        blob_container.create_container()

    # If the file is a PDF, split it into pages and upload each page as a separate blob.
    if os.path.splitext(filename)[1].lower() == ".pdf":
        reader = PdfReader(filename)
        pages = reader.pages
        for i in range(len(pages)):
            blob_name = blob_name_from_file_page(filename, i)
            
            f = io.BytesIO()
            writer = PdfWriter()
            writer.add_page(pages[i])
            writer.write(f)
            f.seek(0)
            blob_container.upload_blob(blob_name, f, overwrite=True)
    else:
        blob_name = blob_name_from_file_page(filename)
        with open(filename,"rb") as data:
            blob_container.upload_blob(blob_name, data, overwrite=True)
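With this scheme, every page of a PDF becomes its own blob named `<base name>-<page index>.pdf`, where the index is 0-based (the first page ends in `-0`), while non-PDF files keep their base name. The sketch below lists the expected blob names for a file without touching Azure; `expected_blob_names` is a hypothetical helper that mirrors `blob_name_from_file_page`.

```python
import os
from typing import List

def expected_blob_names(filename: str, page_count: int) -> List[str]:
    """Mirror blob_name_from_file_page: one blob per page for PDFs, a single blob otherwise."""
    base, ext = os.path.splitext(os.path.basename(filename))
    if ext.lower() == ".pdf":
        return [f"{base}-{page}.pdf" for page in range(page_count)]
    return [os.path.basename(filename)]

print(expected_blob_names("../data/report.pdf", 3))  # ['report-0.pdf', 'report-1.pdf', 'report-2.pdf']
print(expected_blob_names("../data/notes.txt", 1))   # ['notes.txt']
```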


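# Run OCR on documents with Azure AI Document Intelligence
The [layout model](https://learn.microsoft.com/azure/ai-services/document-intelligence/concept-layout) `prebuilt-layout` used here for PDF OCR is an advanced machine-learning-based document analysis API. It accepts documents in a variety of formats and returns a structured data representation of each document. It combines an enhanced version of Microsoft's optical character recognition (OCR) with deep learning models to extract text, tables, selection marks, and document structure.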

In [None]:
from azure.ai.formrecognizer import DocumentAnalysisClient
import html
import jsonpickle

def table_to_html(table):
    table_html = "<table>"
    rows = [sorted([cell for cell in table.cells if cell.row_index == i], key=lambda cell: cell.column_index) for i in range(table.row_count)]
    for row_cells in rows:
        table_html += "<tr>"
        for cell in row_cells:
            tag = "th" if (cell.kind == "columnHeader" or cell.kind == "rowHeader") else "td"
            cell_spans = ""
            if cell.column_span > 1: cell_spans += f" colSpan={cell.column_span}"
            if cell.row_span > 1: cell_spans += f" rowSpan={cell.row_span}"
            table_html += f"<{tag}{cell_spans}>{html.escape(cell.content)}</{tag}>"
        table_html +="</tr>"
    table_html += "</table>"
    return table_html

def get_document_text(filename):
    offset = 0
    page_map = []

    print(f"Extracting text from '{filename}' using Azure AI Document Intelligence")
    form_recognizer_client = DocumentAnalysisClient(endpoint=document_intelligence_endpoint, credential=document_intelligence_creds, headers={"x-ms-useragent": "azure-search-chat-demo/1.0.0"})
    with open(filename, "rb") as f:
        poller = form_recognizer_client.begin_analyze_document("prebuilt-layout", document = f)
    form_recognizer_results = poller.result()
    
    # For debugging: dump the AnalyzeResult object structure to JSON as-is
    # json_data = jsonpickle.encode(form_recognizer_results)
    # with open('data.json', "w", encoding='utf-8') as f:
    #     f.write(json_data)
    #
    # f = open("data.json")
    # json_str = f.read()
    # form_recognizer_results = jsonpickle.decode(json_str)

    for page_num, page in enumerate(form_recognizer_results.pages):
        tables_on_page = [table for table in form_recognizer_results.tables if table.bounding_regions[0].page_number == page_num + 1]

        # mark all positions of the table spans in the page
        page_offset = page.spans[0].offset
        page_length = page.spans[0].length
        table_chars = [-1]*page_length
        for table_id, table in enumerate(tables_on_page):
            for span in table.spans:
                # replace all table spans with "table_id" in table_chars array
                for i in range(span.length):
                    idx = span.offset - page_offset + i
                    if idx >=0 and idx < page_length:
                        table_chars[idx] = table_id

        # build page text by replacing characters in table spans with table html
        page_text = ""
        added_tables = set()
        for idx, table_id in enumerate(table_chars):
            if table_id == -1:
                page_text += form_recognizer_results.content[page_offset + idx]
            elif table_id not in added_tables:
                page_text += table_to_html(tables_on_page[table_id])
                added_tables.add(table_id)

        page_text += " "
        page_map.append((page_num, offset, page_text))
        offset += len(page_text)

    return page_map
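The core of `get_document_text` is a character-level mask: every character of the page that falls inside a table span is tagged with that table's index, the first tagged character is replaced by the table's HTML, and the remaining tagged characters are skipped. The sketch below reproduces that idea on plain strings, using hypothetical `(offset, length)` tuples in place of Document Intelligence result objects so it runs without the service.

```python
from typing import List, Sequence, Tuple

Span = Tuple[int, int]  # (offset, length) into the full document content

def replace_table_spans(
    content: str,
    page_span: Span,
    table_spans: Sequence[Sequence[Span]],
    table_html: Sequence[str],
) -> str:
    """Return the page text with each table's characters replaced by its HTML exactly once."""
    page_offset, page_length = page_span
    # -1 marks ordinary text; otherwise the index of the table covering that character
    table_chars: List[int] = [-1] * page_length
    for table_id, spans in enumerate(table_spans):
        for offset, length in spans:
            for i in range(length):
                idx = offset - page_offset + i
                if 0 <= idx < page_length:
                    table_chars[idx] = table_id

    page_text = ""
    added_tables = set()
    for idx, table_id in enumerate(table_chars):
        if table_id == -1:
            page_text += content[page_offset + idx]
        elif table_id not in added_tables:
            page_text += table_html[table_id]
            added_tables.add(table_id)
    return page_text

content = "Intro A B 1 2 Outro"
# The table occupies "A B 1 2" (offset 6, length 7)
print(replace_table_spans(content, (0, len(content)), [[(6, 7)]], ["<table>...</table>"]))
```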


# Define the embedding function

Use the tenacity library to retry Embeddings API calls so that rate limits are handled gracefully.

In [None]:
from openai import AzureOpenAI, RateLimitError
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential

client = AzureOpenAI(
    api_key=AZURE_OPENAI_API_KEY,
    api_version="2023-05-15",
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
)

def before_retry_sleep(retry_state):
    print("Rate limited on the OpenAI embeddings API, sleeping before retrying...")

# Retry only on rate-limit errors; other errors (e.g. authentication) propagate immediately
@retry(
    retry=retry_if_exception_type(RateLimitError),
    wait=wait_random_exponential(min=15, max=60),
    stop=stop_after_attempt(15),
    before_sleep=before_retry_sleep,
)
def compute_embedding(text):
    # `model` is the Azure OpenAI deployment name of the embeddings model
    return client.embeddings.create(input=[text], model=model).data[0].embedding
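`wait_random_exponential(min=15, max=60)` waits a random time whose upper bound grows exponentially with each attempt and is clamped to the 15 to 60 second range, which spreads retries out when many calls hit the rate limit at once. The stdlib-only sketch below shows the same retry-with-jittered-backoff pattern. It approximates the idea rather than reproducing tenacity's exact formula, and `call_with_backoff` with its `retry_on` and `sleep` parameters is a hypothetical helper (the `sleep` hook exists so it can be tested without waiting).

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")

def call_with_backoff(
    func: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    max_attempts: int = 15,
    min_wait: float = 15.0,
    max_wait: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on the given exception types with randomized exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # give up and surface the last error
            # Upper bound doubles each attempt, clamped to [min_wait, max_wait]
            upper = min(max_wait, max(min_wait, 2.0 ** (attempt - 1)))
            delay = random.uniform(min_wait, upper)
            print(f"Attempt {attempt} failed, sleeping {delay:.1f}s before retrying...")
            sleep(delay)
    raise RuntimeError("unreachable")  # the loop always returns or raises
```

In the notebook this could wrap a raw call such as `call_with_backoff(lambda: client.embeddings.create(input=[text], model=model), (RateLimitError,))`. Listing the retryable exception types explicitly keeps unrelated failures from being retried for minutes.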


# Split into chunks

In [None]:
MAX_SECTION_LENGTH = 1000
SENTENCE_SEARCH_LIMIT = 100
SECTION_OVERLAP = 100

def split_text(page_map, filename):
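    # Japanese punctuation (。！？ and 、，) is included because the index targets Japanese text (ja.lucene)
    SENTENCE_ENDINGS = [".", "!", "?", "。", "！", "？"]
    WORDS_BREAKS = [",", ";", ":", " ", "(", ")", "[", "]", "{", "}", "\t", "\n", "、", "，"]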
    print(f"Splitting '{filename}' into sections")

    def find_page(offset):
        num_pages = len(page_map)
        for i in range(num_pages - 1):
            if offset >= page_map[i][1] and offset < page_map[i + 1][1]:
                return i
        return num_pages - 1

    all_text = "".join(p[2] for p in page_map)
    length = len(all_text)
    start = 0
    end = length
    while start + SECTION_OVERLAP < length:
        last_word = -1
        end = start + MAX_SECTION_LENGTH

        if end > length:
            end = length
        else:
            # Try to find the end of the sentence
            while end < length and (end - start - MAX_SECTION_LENGTH) < SENTENCE_SEARCH_LIMIT and all_text[end] not in SENTENCE_ENDINGS:
                if all_text[end] in WORDS_BREAKS:
                    last_word = end
                end += 1
            if end < length and all_text[end] not in SENTENCE_ENDINGS and last_word > 0:
                end = last_word # Fall back to at least keeping a whole word
        if end < length:
            end += 1

        # Try to find the start of the sentence or at least a whole word boundary
        last_word = -1
        while start > 0 and start > end - MAX_SECTION_LENGTH - 2 * SENTENCE_SEARCH_LIMIT and all_text[start] not in SENTENCE_ENDINGS:
            if all_text[start] in WORDS_BREAKS:
                last_word = start
            start -= 1
        if all_text[start] not in SENTENCE_ENDINGS and last_word > 0:
            start = last_word
        if start > 0:
            start += 1

        section_text = all_text[start:end]
        yield (section_text, find_page(start))

        last_table_start = section_text.rfind("<table")
        if (last_table_start > 2 * SENTENCE_SEARCH_LIMIT and last_table_start > section_text.rfind("</table")):
            # If the section ends with an unclosed table, we need to start the next section with the table.
            # If table starts inside SENTENCE_SEARCH_LIMIT, we ignore it, as that will cause an infinite loop for tables longer than MAX_SECTION_LENGTH
            # If last table starts inside SECTION_OVERLAP, keep overlapping
            print(f"Section ends with unclosed table, starting next section with the table at page {find_page(start)} offset {start} table start {last_table_start}")
            start = min(end - SECTION_OVERLAP, start + last_table_start)
        else:
            start = end - SECTION_OVERLAP

    if start + SECTION_OVERLAP < end:
        yield (all_text[start:end], find_page(start))
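Stripped of the sentence and word boundary adjustments and the table handling, `split_text` is a sliding window: each section is at most about `MAX_SECTION_LENGTH` characters, the next section starts `SECTION_OVERLAP` characters before the previous one ended, and `find_page` maps a section's start offset back to its page. The simplified sketch below shows only that skeleton, with a `bisect`-based page lookup in place of the linear scan; `sliding_window_chunks` is a hypothetical illustration, not a drop-in replacement for `split_text`.

```python
from bisect import bisect_right
from typing import Iterator, List, Tuple

def sliding_window_chunks(
    page_map: List[Tuple[int, int, str]],
    max_length: int = 1000,
    overlap: int = 100,
) -> Iterator[Tuple[str, int]]:
    """Yield (chunk_text, page_index) windows of max_length chars overlapping by `overlap`."""
    if overlap >= max_length:
        raise ValueError("overlap must be smaller than max_length")
    all_text = "".join(text for _, _, text in page_map)
    page_starts = [offset for _, offset, _ in page_map]  # sorted start offset of each page

    def find_page(offset: int) -> int:
        # Index of the last page whose start offset is <= offset
        return max(bisect_right(page_starts, offset) - 1, 0)

    start = 0
    while start < len(all_text):
        end = min(start + max_length, len(all_text))
        yield all_text[start:end], find_page(start)
        if end == len(all_text):
            break
        start = end - overlap  # step back so consecutive chunks share `overlap` characters
```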


# Create the documents to register in the index

In [None]:
import re
import base64
import json
def filename_to_id(filename):
    filename_ascii = re.sub("[^0-9a-zA-Z_-]", "_", filename)
    filename_hash = base64.b16encode(filename.encode('utf-8')).decode('ascii')
    return f"file-{filename_ascii}-{filename_hash}"

def create_sections(filename, page_map, use_vectors, category):
    file_id = filename_to_id(filename)
    for i, (content, pagenum) in enumerate(split_text(page_map, filename)):
        section = {
            "id": f"{file_id}-page-{i}",
            "content": content,
            "category": category,
            "sourcepage": blob_name_from_file_page(filename, pagenum),
            "sourcefile": filename,
            "metadata": json.dumps({"page": pagenum, "sourcepage": blob_name_from_file_page(filename, pagenum)})
        }
        
        section["embedding"] = compute_embedding(content)
        yield section
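`filename_to_id` replaces every character outside `[0-9a-zA-Z_-]` with `_` and appends the UTF-8 bytes of the original name in hexadecimal, so even Japanese file names yield keys made only of ASCII letters, digits, `_` and `-`. Azure AI Search document keys are limited to letters, digits, underscores, dashes and equal signs; the hypothetical `is_valid_key` check below encodes that rule, and a copy of `filename_to_id` is included so the block runs on its own.

```python
import base64
import re

def filename_to_id(filename: str) -> str:
    # Same logic as the notebook cell above
    filename_ascii = re.sub("[^0-9a-zA-Z_-]", "_", filename)
    filename_hash = base64.b16encode(filename.encode("utf-8")).decode("ascii")
    return f"file-{filename_ascii}-{filename_hash}"

# Characters allowed in an Azure AI Search document key: letters, digits, _, - and =
KEY_PATTERN = re.compile(r"[A-Za-z0-9_\-=]+")

def is_valid_key(key: str) -> bool:
    return KEY_PATTERN.fullmatch(key) is not None

doc_id = filename_to_id("報告書.pdf")
print(doc_id)
print(is_valid_key(doc_id))  # True
```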


# Index the chunks

In [None]:
def index_sections(filename, sections):
    search_client = SearchClient(
        endpoint=search_service_endpoint, index_name=index_name, credential=credential
    )
    i = 0
    batch = []
    for s in sections:
        batch.append(s)
        i += 1
        if i % 1000 == 0:
            results = search_client.upload_documents(documents=batch)
            succeeded = sum([1 for r in results if r.succeeded])
            print(f"\tIndexed {len(results)} sections, {succeeded} succeeded")
            batch = []

    if len(batch) > 0:
        results = search_client.upload_documents(documents=batch)
        succeeded = sum([1 for r in results if r.succeeded])
        print(f"\tIndexed {len(results)} sections, {succeeded} succeeded")
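`index_sections` accumulates sections and uploads them 1000 at a time, which matches the maximum number of documents the Azure AI Search indexing API accepts per request. The same batching can be written as a small generic generator with `itertools.islice`; `batched` is a hypothetical helper name here (Python 3.12 adds an `itertools.batched` that yields tuples, but this notebook targets 3.10).

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of at most `size` items, consuming `items` lazily (works with generators)."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch
```

With it, the loop body reduces to `for batch in batched(sections, 1000): results = search_client.upload_documents(documents=batch)`, and the separate upload for the final partial batch is no longer needed.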

# Main processing

In [None]:
import glob

print("Create Search Index...")
create_search_index()
print("Processing files...")

path_pattern = "../data/*.pdf"
for filename in glob.glob(path_pattern):
    print(f"Processing '{filename}'")
    try:
        upload_blobs(filename)
        remove_from_index(filename)
        page_map = get_document_text(filename)
        category = os.path.basename(os.path.dirname(filename))
        sections = create_sections(
            os.path.basename(filename), page_map, use_vectors=True, category=category
        )
        index_sections(os.path.basename(filename), sections)

    except Exception as e:
        print(f"\tGot an error while reading {filename} -> {e} --> skipping file")