In [0]:
%load_ext autoreload
%autoreload 2
# Enables autoreload; learn more at https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload; run %autoreload 0

# Import data
UnityCatalog に参照させるドキュメントを取り込む。ここでは簡素に作る。本格的な構成にする場合は以下の構成を想定。

- ドキュメントを S3 に配置
- パイプライン1: ETL パイプライン内で UnityCatalog へデータ取り込み (増分)
  - ドキュメントテーブル
- パイプライン2_1: ETL パイプライン内で UnityCatalog へデータ取り込み (増分。開発段階では洗い替え)
  - チャンキングテーブル (チャンキング戦略 1)
- パイプライン2_n: ETL パイプライン内で UnityCatalog へデータ取り込み (増分。開発段階では洗い替え)
  - チャンキングテーブル (チャンキング戦略 n)

In [0]:
import pyspark.sql.functions as F

S3BUCKET_NAME = "S3BUCKET_NAME"
(
    spark.read
    .text(f"/Volumes/workspace/default/{S3BUCKET_NAME}/news_summary/", wholetext=True)
    .select(
        F.url_decode(F.col("_metadata.file_name")).alias("file_name"),
        F.col("value").alias("content")
    )
    .write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable("workspace.default.doc_sample")
)

In [0]:
doc_df = spark.table("workspace.default.doc_sample")
doc_df.display()

In [0]:
%sql
CREATE OR REPLACE TABLE workspace.default.doc_sample_embed (
  file_name string,
  content string,
  embedding array<float>
)
TBLPROPERTIES (delta.enableChangeDataFeed = true)

In [0]:
import pyspark.sql.functions as F
from langchain_google_genai import GoogleGenerativeAIEmbeddings

import libs.chunkstore.databricks as chunkstore

# 埋め込み表現生成用の UDF を作成
api_key = dbutils.secrets.get(scope="agent", key="GEMINI_API_KEY")
embr = chunkstore.emb_udf_factory(
    lambda: GoogleGenerativeAIEmbeddings(
        model="gemini-embedding-001", api_key=api_key))

# 埋め込み表現を生成
embed_df = doc_df
embed_df = embed_df.withColumn("embedding", embr(F.col("content")))
embed_df.write.insertInto("workspace.default.doc_sample_embed")

In [0]:
spark.table("workspace.default.doc_sample_embed").display()

# Create Vector Search Index

In [0]:
def create_vsi(
    index_name: str,
    index_col: str,
    emb_col: str,
    emb_dimsize: int,
    source_name: str,
    endpoint_name: str = "vsi_endpoint",
):
    from databricks.vector_search.client import VectorSearchClient

    client = VectorSearchClient()
    # ---------------------------------------------
    # Vector Search Index 用のエンドポイントを取得
    # ---------------------------------------------
    try:
        res_enp = client.get_endpoint(endpoint_name)
    # 無い場合は作成
    except Exception as ex:
        res_enp = client.create_endpoint_and_wait(
            name=endpoint_name, endpoint_type="STANDARD"
        )

    # ---------------------------------------------
    # Vector Search Index を取得
    # ---------------------------------------------
    try:
        res_vsi = client.get_index(endpoint_name, index_name)
    # 無い場合は作成
    except Exception as ex:
        res_vsi = client.create_delta_sync_index_and_wait(
            endpoint_name,
            index_name,
            index_col,
            source_name,
            "TRIGGERED",
            emb_dimsize,
            emb_col,
            # 一旦全カラムをインデックスへ収録
            # columns_to_sync=["file_name", "content"]
        )
    return res_enp, res_vsi


In [0]:
VECTOR_SEARCH_ENDPOINT = "vsi_endpoint"
VECTOR_SEARCH_INDEX = "workspace.default.doc_sample_vsi"
VECTOR_SEARCH_SOURCE = "workspace.default.doc_sample_embed"

In [0]:
create_vsi(VECTOR_SEARCH_INDEX, "file_name", "embedding", 3072, VECTOR_SEARCH_SOURCE)

# Playground

In [0]:
from libs.retriever import DbxDocSampleChunkStore
from libs.documentstore.markdown import MarkdownDocumentStore

api_key = dbutils.secrets.get(scope="agent", key="GEMINI_API_KEY")
doc_cs = DbxDocSampleChunkStore(VECTOR_SEARCH_ENDPOINT, api_key)
doc_ds = MarkdownDocumentStore(VECTOR_SEARCH_INDEX, doc_cs)
doc_ds.connect()

In [0]:
results = doc_ds.search_documents("databricks", top_k=5)

# results
tmpl = """
----------------------------
{id}
----------------------------
{content}
"""

for item in results:
    print(tmpl.format(id=item.metadata["file_name"], content=item.page_content[:500]))