In [0]:
# 必要なライブラリのインストール
%pip install -U databricks-sdk databricks-vectorsearch
dbutils.library.restartPython()


# Databricks RAGアプリチュートリアル - 1. PDFのパースとベクトルインデックスの作成

このノートブックでは、PDFをパースしてVector Search Indexに登録し、検索機能をテストするまでの一連の流れを学習します。

## このワークショップで学習する内容
1. パラメータの設定
2. PDFのパース、前処理、Unity Catalogテーブル登録 
3. Vector Search Indexの作成
4. Vector Searchのテスト

## 前提条件

ワークショップ共通の動作条件はREADME.mdをご確認ください。

- 動作確認済の環境：Serverless Notebook（環境バージョン：3）
- `ai_parse_document`関数のプレビューが有効化されていること。実行前に、ワークスペース管理者より `Mosaic AI Agent Bricks Preview` が有効化されているかを確認してください。

---


## 1-1. パラメータの設定

まず、このワークショップで使用するカタログ、スキーマ、その他の設定パラメータを定義します。

### カタログとスキーマについて
- **カタログ**: Unity Catalogの最上位レベルのコンテナ。データベース、テーブル、関数などを管理
- **スキーマ**: カタログ内のデータベース。テーブルやビューを格納

既存のカタログを使用し、スキーマは既存のものを利用するか新規作成が可能です。


In [0]:
# パラメータ設定
CATALOG_NAME = "skato"  
SCHEMA_NAME = "rag_workshop"  
VOLUME_NAME = "pdf_files"  

# Vector Search関連パラメータ
VECTOR_SEARCH_ENDPOINT = "one-env-shared-endpoint-1"  
VECTOR_INDEX_NAME = "chunked_document_vs_index"  
EMBEDDING_MODEL_ENDPOINT = "skato-plamo-embedding"  

print(f"カタログ: {CATALOG_NAME}")
print(f"スキーマ: {SCHEMA_NAME}")
print(f"Vector Search Endpoint: {VECTOR_SEARCH_ENDPOINT}")
print(f"Embeddingエンドポイント: {EMBEDDING_MODEL_ENDPOINT}")


### Unity Catalogオブジェクトの設定

USE CATALOG と USE SCHEMA を使用して、作業対象のカタログとスキーマを設定します。またrawデータを格納するボリュームを作成します


In [0]:
# カタログとスキーマの設定
spark.sql(f"""USE CATALOG {CATALOG_NAME}""")
spark.sql(f"""DROP SCHEMA IF EXISTS {SCHEMA_NAME} CASCADE""")
spark.sql(f"""CREATE SCHEMA {SCHEMA_NAME}""")
spark.sql(f"""USE SCHEMA {SCHEMA_NAME}""")
spark.sql(f"""CREATE VOLUME {VOLUME_NAME}""")


### ファイルをボリュームに配置

ワークショップで扱うpdfファイルをボリュームに移管します。

In [0]:
import os

os.environ["CATALOG_NAME"] = CATALOG_NAME
os.environ["SCHEMA_NAME"] = SCHEMA_NAME
os.environ["VOLUME_NAME"] = VOLUME_NAME

In [0]:
%sh cp input/* /Volumes/$CATALOG_NAME/$SCHEMA_NAME/$VOLUME_NAME/

## 1-2. PDFのパース、前処理、Unity Catalogテーブル登録

### ai_parse_document関数について

`ai_parse_document`は、DatabricksのAI関数の一つで、構造化されていないドキュメント（PDF、Word、PowerPointなど）を解析してテキストやメタデータを抽出する関数です。

⚠️：2025年7月時点でベータ版の機能となります。実行前に、ワークスペース管理者より `Mosaic AI Agent Bricks Preview` が有効化されているかを確認してください

**主な特徴:**
- PDFからテキスト、テーブル、画像を抽出
- ページ番号、座標などのメタデータも取得可能
- SQLから直接実行可能
- Unity Catalogと連携してファイルを処理

**構文:**
```sql
ai_parse_document(file_path, [options])
```

### ai_parse_document関数を使用したPDFのパース

既にVolumeに格納されているPDFファイルをパースし、Unity Catalogテーブルに保存します。


In [0]:
%sql
-- 複数PDFをパースしてテーブルに保存
CREATE OR REPLACE TABLE parsed_documents AS
SELECT 
  path,
  ai_parse_document(content) as content -- ai_parse_document: PDF, 画像の内容をLLMでパースする
FROM READ_FILES('/Volumes/skato/rag_workshop/pdf_files/*.pdf', format => 'binaryFile');


### パース結果の確認


In [0]:
%sql
SELECT 
  path, 
  content
FROM parsed_documents

In [0]:
%sql
-- 要素別にドキュメントを整理（pathも保存）
CREATE OR REPLACE TABLE parsed_documents_by_element AS (
SELECT 
  path,
  CAST(value:id AS INT) AS element_id,
  CAST(value:page_id AS INT) AS page_id,
  CAST(value:type AS STRING) AS content_type,
  CAST(value:content AS STRING) AS content,
  value
FROM parsed_documents,
LATERAL variant_explode(content:document.elements)
);
SELECT * FROM parsed_documents_by_element;

### 参考：ai_queryによるバッチ推論

`ai_query`は、DatabricksのAI関数の一つで、自然言語の質問や指示をSQLから直接AIモデルに投げて、テキスト生成や要約、分類など様々なタスクを実行できる関数です。

**主な特徴:**
- SQLから直接AIモデルを呼び出し、自然言語処理タスクを実行可能
- テキスト生成、要約、分類、翻訳など幅広い用途に対応
- 入力プロンプトや追加パラメータを柔軟に指定できる
- Unity Catalogテーブルのデータと組み合わせて活用可能

**例:**

```sql
SELECT ai_query('次のレビューを要約してください: ' || review_text) AS summary FROM reviews
```

In [0]:
%sql
SELECT 
  content
  , ai_query('databricks-claude-sonnet-4', '以下のドキュメントを2-3文の日本語で要約して: ' || content ) as summarized_content -- ai_queryでエンドポイント（Claude Sonnet 4）とプロンプトを指定し各レコードに対して推論
FROM
  parsed_documents

### チャンキング処理

抽出されたテキストを適切なサイズのチャンクに分割します。
Vector Searchでの検索精度を向上させるため、テキストを適切なサイズに分割し、オーバーラップを設定します。

### チャンキング戦略について
長いドキュメントは、検索とRAGの品質向上のために適切なサイズのチャンクに分割する必要があります。
このワークショップでは、一定の要素を重複（オーバーラップ）させながら要素をチャンクさせるカスタム関数を使用します


In [0]:
df = spark.table("parsed_documents_by_element")
display(df)

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

def chunk_content(df: DataFrame, 
                 chunk_size: int = 5, 
                 overlap: int = 1,
                 key_col: str = "element_id",
                 content_col: str = "content",
                 path_col: str = "path",
                 page_col: str = "page_id",
                 min_content_length: int = 0,
                 show_stats: bool = True) -> DataFrame:
    """
    複数ファイル対応：Spark DataFrameのcontentカラムをpathごとに分けてキー列の順序に基づいてチャンキングする関数
    
    Parameters:
    -----------
    df : DataFrame
        入力のSpark DataFrame（path, element_id, page_id, contentカラムを含む）
    chunk_size : int, default=5
        1つのチャンクに含める行数
    overlap : int, default=1
        チャンク間でオーバーラップする行数
    key_col : str, default="element_id"
        チャンキングのキーとなる列名
    content_col : str, default="content"
        チャンキング対象のコンテンツカラム名
    path_col : str, default="path"
        ファイルパス列名
    page_col : str, default="page_id"
        ページID列名
    min_content_length : int, default=0
        対象列の最小文字数しきい値（これ以下の行は除外）
    show_stats : bool, default=True
        統計情報を表示するかどうか
    
    Returns:
    --------
    DataFrame
        チャンキングされたDataFrame
        カラム: path, chunk_id, chunk_content, key_ids, chunk_start_key, chunk_end_key, 
               chunk_length, chunk_start_page, chunk_end_page
    """
    
    # 入力データの行数をカウント
    original_count = df.count()
    
    # 文字数フィルタリング（指定したしきい値より大きい行のみ保持）
    df_filtered = df.filter(length(col(content_col)) > min_content_length)
    
    # フィルタリング後の行数をカウント
    filtered_count = df_filtered.count()
    removed_count = original_count - filtered_count
    
    # pathごとに処理を実行
    paths = [row[path_col] for row in df_filtered.select(path_col).distinct().collect()]
    
    result_dfs = []
    total_chunks = 0
    
    for path in paths:
        # pathでフィルタリング
        path_df = df_filtered.filter(col(path_col) == path)
        
        # キー列でソートして位置番号を振る（1から開始）
        window_spec = Window.orderBy(key_col)
        df_sorted = path_df.withColumn("position", row_number().over(window_spec))
        
        # step_sizeを計算
        step_size = chunk_size - overlap
        
        # 各行が属するチャンクIDの範囲を計算
        df_with_chunk_range = df_sorted.withColumn(
            "min_chunk_id",
            greatest(lit(0), ceil((col("position") - chunk_size) / lit(step_size)).cast("int"))
        ).withColumn(
            "max_chunk_id", 
            floor((col("position") - 1) / lit(step_size)).cast("int")
        )
        
        # チャンクIDのシーケンスを生成して展開
        df_expanded = df_with_chunk_range.withColumn(
            "local_chunk_id",
            explode(sequence(col("min_chunk_id"), col("max_chunk_id")))
        )
        
        # 実際にその行がそのチャンクに含まれるかをチェック
        df_filtered_chunks = df_expanded.filter(
            (col("position") >= col("local_chunk_id") * lit(step_size) + 1) &
            (col("position") <= col("local_chunk_id") * lit(step_size) + lit(chunk_size))
        )
        
        # チャンクごとにcontentを結合
        path_result_df = df_filtered_chunks.groupBy("local_chunk_id").agg(
            # path情報を保持
            first(col(path_col)).alias(path_col),
            
            # キー列でソートしてからcontentを結合
            concat_ws("\n", 
                array_sort(
                    collect_list(
                        struct(col(key_col).alias("sort_key"), col(content_col).alias("content"))
                    )
                ).getField("content")
            ).alias("chunk_content"),
            
            # キー列のリスト（ソート済み）
            array_sort(collect_list(col(key_col))).alias("key_ids"),
                        
            # チャンクの範囲情報
            min(col(key_col)).alias("chunk_start_key"),
            max(col(key_col)).alias("chunk_end_key"),
            min(col(page_col)).alias("chunk_start_page"),
            max(col(page_col)).alias("chunk_end_page")
        ).withColumn(
            "chunk_length", length(col("chunk_content"))
        ).withColumn(
            # グローバルなchunk_idを生成（path名をハッシュ化 + ローカルchunk_id）
            "chunk_id", 
            concat(
                lit(f"{paths.index(path):03d}_"),
                lpad(col("local_chunk_id").cast("string"), 4, "0")
            )
        ).drop("local_chunk_id").orderBy("chunk_id")
        
        result_dfs.append(path_result_df)
        total_chunks += path_result_df.count()
    
    # 全てのpathの結果を結合
    if result_dfs:
        result_df = result_dfs[0]
        for df in result_dfs[1:]:
            result_df = result_df.unionByName(df)
        result_df = result_df.orderBy("chunk_id")
    else:
        # 空のDataFrameを作成
        schema = StructType([
            StructField(path_col, StringType(), True),
            StructField("chunk_id", StringType(), True),
            StructField("chunk_content", StringType(), True),
            StructField("key_ids", ArrayType(IntegerType()), True),
            StructField("chunk_start_key", IntegerType(), True),
            StructField("chunk_end_key", IntegerType(), True),
            StructField("chunk_start_page", IntegerType(), True),
            StructField("chunk_end_page", IntegerType(), True),
            StructField("chunk_length", IntegerType(), True)
        ])
        result_df = spark.createDataFrame([], schema)
    
    # 統計情報の表示
    if show_stats:
        print("=" * 50)
        print("複数ファイル対応チャンキング統計情報")
        print("=" * 50)
        
        # 基本情報
        print(f"処理対象ファイル数: {len(paths)}")
        print(f"処理ファイル:")
        for i, path in enumerate(paths):
            print(f"  {i+1:2d}. {path}")
        print(f"キー列名: {key_col}")
        print(f"対象列名: {content_col}")
        print(f"チャンクサイズ: {chunk_size}")
        print(f"オーバーラップサイズ: {overlap}")
        print(f"ステップ数: {chunk_size - overlap}")
        print(f"期待重複率: {overlap / chunk_size * 100:.1f}%")
        print(f"最小文字数しきい値: {min_content_length}")
        
        # データフィルタリング情報
        print(f"処理前総行数: {original_count:,}")
        print(f"処理後総行数: {filtered_count:,}")
        print(f"取り除いた行数: {removed_count:,}")
        if original_count > 0:
            print(f"除外率: {removed_count / original_count * 100:.1f}%")
        
        # チャンク文字数統計を計算
        if result_df.count() > 0:
            stats = result_df.agg(
                max("chunk_length").alias("max_length"),
                min("chunk_length").alias("min_length"),
                avg("chunk_length").alias("avg_length"),
                count("chunk_id").alias("total_chunks")
            ).collect()[0]
            
            print(f"最大チャンク文字数: {stats['max_length']:,}")
            print(f"最小チャンク文字数: {stats['min_length']:,}")
            print(f"平均チャンク文字数: {stats['avg_length']:,.1f}")
            print(f"総チャンク数: {stats['total_chunks']}")
            
            # ファイル別統計
            print("\nファイル別チャンク数:")
            file_stats = result_df.groupBy(path_col).agg(
                count("chunk_id").alias("chunk_count")
            ).orderBy(path_col).collect()
            
            for stat in file_stats:
                print(f"  {stat[path_col]}: {stat['chunk_count']} チャンク")
        else:
            print("最大チャンク文字数: 0")
            print("最小チャンク文字数: 0")
            print("平均チャンク文字数: 0.0")
            print("総チャンク数: 0")
        
        print("=" * 50)
    
    return result_df

In [0]:
chunked_df = chunk_content(df, chunk_size=5, overlap=1, key_col="element_id", content_col="content", min_content_length=0)

chunked_df.write.mode("overwrite").saveAsTable("chunked_documents")
display(chunked_df)

In [0]:
%sql
-- Vector Search Indexを作成するためにchange data feedを有効化
ALTER TABLE chunked_documents SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

## 1-3. Vector Search Indexの作成

### Mosaic AI Vector Searchについて

Mosaic AI Vector Searchは、Databricksの提供するベクトル検索サービスです。テキストや画像などの非構造化データを高次元ベクトルに変換し、類似度検索を高速に実行できます。

**主な特徴:**
- Unity Catalogとの完全統合
- 自動的なベクトル化（埋め込み生成）
- リアルタイムでの増分更新
- スケーラブルな検索性能

### 参考：UIでのVector Search Index作成手順

#### 1. エンドポイントの作成

1. 左サイドバーで**「Compute」**をクリック。
2. 上部の**「Vector Search」**タブを選択し、**「Create」**ボタンを押す。
3. 「Create endpoint」フォームが開くので以下を入力・設定する
   - **エンドポイント名**を入力
   - **Type**フィールドで「Standard」または「Storage Optimized」を選択
   - （必要な場合は）Advanced settings で予算ポリシーを設定
4. **「Confirm」**をクリックして作成完了

#### 2. インデックスの作成

1. 左サイドバーで**「Catalog」**をクリックしてCatalog Explorerを開く。
2. 対象とする**Deltaテーブル**を選択。
3. 右上の**「作成」**ボタン、もしくはケバブ（三点）メニューから**「Vector search index」**を選択。
4. 「Create vector index」フォームで以下を設定
   - **名前**: `<catalog>.<schema>.<name>` 形式で入力（英数字＋アンダースコアのみ）
   - **主キー**: 主キーとなるテーブルのカラムを指定
   - **エンドポイント**: 先ほど作成したエンドポイントを選択
   - **同期する列**: 標準エンドポイントの場合のみ必要、同期対象の列を選択（空欄なら全カラム同期）
   - **埋め込みソース（Embedding source）**:
     - Databricksにテキスト列から埋め込み計算させる場合は「Compute embeddings」を選択
     - 既存の埋め込みベクトルカラムを使う場合は「Use existing embedding column」を選択
   - **埋め込みモデルエンドポイント**（必要に応じて）を選択
5. 設定後、**「Confirm」**で作成

### Python SDKでのIndex実装

以下では、Python SDKを使用してプログラマティックにVector Search Indexを作成します。

In [0]:
# 必要なライブラリのインポート
from databricks.vector_search.client import VectorSearchClient
import time

# Vector Search Clientの初期化
vsc = VectorSearchClient()

print("Vector Search Clientを初期化しました")


### Vector Search Endpointの作成または確認

Vector Search Endpointは、Vector Searchサービスにアクセスするためのエンドポイントです。


In [0]:
try:
    # 既存のエンドポイントを確認
    endpoint = vsc.get_endpoint(VECTOR_SEARCH_ENDPOINT)
    print(f"既存のVector Search Endpoint '{VECTOR_SEARCH_ENDPOINT}' を使用します")
    print(f"エンドポイントステータス: {endpoint['endpoint_status']['state']}")
except Exception as e:
    print(f"エンドポイント '{VECTOR_SEARCH_ENDPOINT}' が見つかりません。新規作成します...")
    
    # 新しいエンドポイントを作成
    endpoint = vsc.create_endpoint(
        name=VECTOR_SEARCH_ENDPOINT,
        endpoint_type="STANDARD"
    )
    
    # エンドポイントの準備完了を待機
    while endpoint.endpoint_status.state == "PROVISIONING":
        print("エンドポイントの準備中...")
        time.sleep(30)
        endpoint = vsc.get_endpoint(VECTOR_SEARCH_ENDPOINT)
    
    print(f"Vector Search Endpoint '{VECTOR_SEARCH_ENDPOINT}' を作成しました")
    print(f"エンドポイントステータス: {endpoint.endpoint_status.state}")


### Vector Search Indexの作成

チャンキング済みのドキュメントテーブルに対してVector Search Indexを作成します。


In [0]:
# 完全なテーブル名とインデックス名を定義
source_table_fullname = f"{CATALOG_NAME}.{SCHEMA_NAME}.chunked_documents"
vs_index_fullname = f"{CATALOG_NAME}.{SCHEMA_NAME}.{VECTOR_INDEX_NAME}"

print(f"ソーステーブル: {source_table_fullname}")
print(f"インデックス名: {vs_index_fullname}")
print(f"Embeddingエンドポイント: {EMBEDDING_MODEL_ENDPOINT}")


In [0]:
try:
    # 既存のインデックスを確認
    index = vsc.get_index(vs_index_fullname)
    print(f"既存のVector Search Index '{vs_index_fullname}' が見つかりました")
    print(f"インデックスステータス: {index.status.ready}")
except Exception as e:
    print(f"インデックス '{vs_index_fullname}' が見つかりません。新規作成します...")
    
    # 新しいインデックスを作成
    index = vsc.create_delta_sync_index(
        endpoint_name=VECTOR_SEARCH_ENDPOINT,
        index_name=vs_index_fullname,
        source_table_name=source_table_fullname,
        primary_key="chunk_id",
        pipeline_type="TRIGGERED",
        embedding_source_column="chunk_content",
        embedding_model_endpoint_name=EMBEDDING_MODEL_ENDPOINT
    )
   
    print(f"Vector Search Index '{vs_index_fullname}' の作成を開始しました")
    print("インデックスの構築には数分かかる場合があります...")


### インデックス作成状況の確認


In [0]:
index.wait_until_ready()

## 1-4. Vector Searchのテスト

作成したVector Search Indexを使用して、実際に類似度検索を実行してみます。

### Python SDKによる検索テスト


In [0]:
# テスト用のクエリ
test_query = "生成AIの開発ワークフローについて教えてください"

print(f"検索クエリ: {test_query}")
print("=" * 50)

# Vector Searchによる類似度検索
results = vsc.get_index(index_name=vs_index_fullname).similarity_search(
    query_text=test_query,
    columns=["chunk_id", "chunk_content"],
    num_results=5
)

print(f"検索結果: {len(results['result']['data_array'])} 件")
print("\n--- 検索結果 ---")

for i, result in enumerate(results['result']['data_array'], 1):
    chunk_id = result[0]
    content = result[1]
    
    print(f"\n{i}. チャンクID: {chunk_id}")
    print(f"   内容: {content[:200]}...")


### SQLのvector_search関数による検索テスト

DatabricksのSQL環境では、`vector_search`関数を使用してVector Searchを実行できます。これにより、SQLクエリ内で直接ベクトル検索を組み込むことができます。


In [0]:
%sql
-- SQLのvector_search関数を使用した検索
SELECT 
  *
FROM vector_search(
  index => 'skato.rag_workshop.chunked_document_vs_index',
  query => '生成AIの開発ワークフローについて教えてください',
  num_results => 5
)


## まとめ

このワークショップでは、以下の内容を学習しました：

1. **パラメータ設定**: Unity Catalogのカタログとスキーマの設定
2. **PDFパース**: `ai_parse_document`関数を使用したPDFの解析とチャンキング
3. **Vector Search Index作成**: Mosaic AI Vector Searchのセットアップ
4. **検索テスト**: Python SDKとSQLによる類似度検索の実行

### 次のステップ
次は、このVector Search Indexを活用してRAG（Retrieval-Augmented Generation）アプリケーションを構築します。

### 参考リンク
- [Databricks AI Functions](https://docs.databricks.com/aws/ja/sql/language-manual/functions/ai_parse_document)
- [Mosaic AI Vector Search](https://docs.databricks.com/aws/en/generative-ai/create-query-vector-search)
- [RAGデータパイプラインのベストプラクティス](https://docs.databricks.com/aws/en/generative-ai/tutorials/ai-cookbook/quality-data-pipeline-rag)
