In [0]:
%pip install -U -qqqq databricks-sql-connector databricks-sdk langchain langchain-community databricks-langchain langchain_core langchain_community langgraph databricks-agents mlflow mlflow-skinny python-docx openpyxl pillow transformers torch uv langgraph==0.3.4 googlemaps pypdf unstructured databricks-vectorsearch python-docx openpyxl googlemaps google-cloud-vision

In [0]:
# Install sentence-transformers and pin transformers to 4.49.0 (the same pin
# used in the logged model's pip_requirements).
%pip install -U sentence-transformers transformers==4.49.0
# Extra packages installed without -U.
# NOTE(review): presumably tokenizer dependencies of the Japanese embedding model - confirm.
%pip install fugashi ipadic unidic-lite

In [0]:
# Restart the Python process after the %pip cells above so the freshly
# installed packages are the ones imported by the following cells.
dbutils.library.restartPython() 

In [0]:

import os
from databricks.sdk.core import DatabricksError
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.vectorsearch import EndpointStatusState, EndpointType
from databricks.sdk.service.serving import EndpointCoreConfigInput, EndpointStateReady
from databricks.sdk.errors import ResourceDoesNotExist, NotFound, PermissionDenied

# Working directory of the Python process when this cell runs.
CURRENT_FOLDER = os.getcwd()
# Databricks SDK workspace client, created with default arguments.
w = WorkspaceClient()

# Unity Catalog catalog and schema used by this notebook.
# NOTE(review): w, UC_CATALOG, UC_SCHEMA and UC_MODEL_NAME are reassigned by the
# deployment cell at the end of the notebook; re-running earlier cells after
# that cell will pick up the reassigned values.
UC_CATALOG = 'hhhd_demo_itec'
UC_SCHEMA = 'commuting_allowance'

# Fully qualified (catalog.schema.name) model name
UC_MODEL_NAME = f"{UC_CATALOG}.{UC_SCHEMA}.commuting_allowance_model"

# Name of the Vector Search endpoint
VECTOR_SEARCH_ENDPOINT = 'commuting_allowance_vector_search'

# Make sure the Unity Catalog catalog exists, creating it when it is missing.
# Raises ValueError (chained to the PermissionDenied) when the catalog does not
# exist and the current user is not allowed to create it.
try:
    _ = w.catalogs.get(UC_CATALOG)
    print(f"PASS: UC catalog `{UC_CATALOG}` exists")
except NotFound:
    print(f"`{UC_CATALOG}` does not exist, trying to create...")
    try:
        _ = w.catalogs.create(name=UC_CATALOG)
        # Report success the same way the schema check below does.
        print(f"PASS: UC catalog `{UC_CATALOG}` created")
    except PermissionDenied as e:
        print(f"FAIL: `{UC_CATALOG}` does not exist, and no permissions to create.  Please provide an existing UC Catalog.")
        raise ValueError(f"Unity Catalog `{UC_CATALOG}` does not exist.") from e
        
# Make sure the Unity Catalog schema exists, creating it when it is missing.
# Raises ValueError (chained to the PermissionDenied) when the schema does not
# exist and the current user is not allowed to create it.
try:
    _ = w.schemas.get(full_name=f"{UC_CATALOG}.{UC_SCHEMA}")
    print(f"PASS: UC schema `{UC_CATALOG}.{UC_SCHEMA}` exists")
except NotFound:
    print(f"`{UC_CATALOG}.{UC_SCHEMA}` does not exist, trying to create...")
    try:
        _ = w.schemas.create(name=UC_SCHEMA, catalog_name=UC_CATALOG)
        print(f"PASS: UC schema `{UC_CATALOG}.{UC_SCHEMA}` created")
    except PermissionDenied as e:
        print(f"FAIL: `{UC_CATALOG}.{UC_SCHEMA}` does not exist, and no permissions to create.  Please provide an existing UC Schema.")
        # f-string so the message names the actual schema instead of the raw placeholders.
        raise ValueError(f"Unity Catalog Schema `{UC_CATALOG}.{UC_SCHEMA}` does not exist.") from e

In [0]:
# Create the Vector Search endpoint when no endpoint with that name is listed.
vector_search_endpoints = w.vector_search_endpoints.list_endpoints()
endpoint_missing = all(ve.name != VECTOR_SEARCH_ENDPOINT for ve in vector_search_endpoints)
if endpoint_missing:
    print(f"Please wait, creating Vector Search endpoint `{VECTOR_SEARCH_ENDPOINT}`.  This can take up to 10 minutes...")
    w.vector_search_endpoints.create_endpoint_and_wait(
        VECTOR_SEARCH_ENDPOINT,
        endpoint_type=EndpointType.STANDARD,
    )

# Wait until the endpoint reports that it is online before continuing.
w.vector_search_endpoints.wait_get_endpoint_vector_search_endpoint_online(VECTOR_SEARCH_ENDPOINT)

print(f"PASS: Vector Search endpoint `{VECTOR_SEARCH_ENDPOINT}` exists")

In [0]:
from pypdf import PdfReader
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from databricks.vector_search.client import VectorSearchClient
import re
from sentence_transformers import SentenceTransformer
from pyspark.sql.functions import col, expr

# Get (or create) the SparkSession
spark = SparkSession.builder.getOrCreate()

# Workspace URL read from the active session's Spark conf (None when the key
# is not set); used further down to print links to the table and the index.
workspace_url = SparkSession.getActiveSession().conf.get(
    "spark.databricks.workspaceUrl", None
)

def extract_sections(text):
    """
    Split a document into sections separated by blank lines.

    A section whose first line starts with an article heading ("第X条 ...") or
    the supplementary-provisions heading ("附則") is returned with that first
    line as its title and the remaining lines as its content. Any other
    section is returned with an empty title and the whole section as content.

    Args:
        text: Full document text.

    Returns:
        A list of ``{"title": str, "content": str}`` dicts, one per non-empty
        section, in document order. Empty or whitespace-only input yields ``[]``.
    """
    sections = []

    # One or more blank (or whitespace-only) lines mark a section boundary.
    raw_sections = re.split(r'\n\s*\n+', text.strip())

    for raw_section in raw_sections:
        # Strip first: the heading match below is anchored at the start of the
        # section, so leading indentation would otherwise hide the heading.
        section = raw_section.strip()
        if not section:
            continue

        # Heading = first line beginning with "第X条" or "附則".
        title_match = re.match(r'^((?:第\s*\d+\s*条|附\s*則).*?)$', section, re.MULTILINE)

        if title_match:
            title = title_match.group(1).strip()
            # Everything after the heading line is the section body.
            content = section[title_match.end(1):].strip()
            sections.append({"title": title, "content": content})
        else:
            # Sections without a heading are kept as content only.
            sections.append({"title": "", "content": section})

    return sections

def chunk_text(text):
    """
    Build one text chunk per blank-line-delimited section of the document.

    The text is handed to ``extract_sections`` unchanged so the blank lines it
    splits on are still present. Each section becomes one chunk:

    - title and content -> ``"<title>:\\n<content>"``
    - content only      -> the content
    - title only        -> the title
    - neither           -> skipped

    Args:
        text: Full document text.

    Returns:
        A list of chunk strings in document order (``[]`` for empty input).
    """
    chunks = []
    for sec in extract_sections(text):
        title, content = sec["title"], sec["content"]
        if title and content:
            chunks.append(f"{title}:\n{content}")
        elif title or content:
            # Exactly one of the two is non-empty; use whichever it is.
            chunks.append(title or content)

    return chunks

# PDF containing the commuting-allowance rules; the relative path is resolved
# against the current working directory.
pdf_path = "通勤手当支給規程.pdf"

# Read the PDF and extract the text of every page exactly once. Extraction
# stays inside the `with` block, as in the original, so the file is still open.
with open(pdf_path, "rb") as f:
    reader = PdfReader(f)
    page_texts = [page.extract_text() for page in reader.pages]

# Pages that yielded no text are dropped; the rest are joined with newlines.
text = "\n".join(page_text for page_text in page_texts if page_text)

# Split the document text into chunks.
chunked_texts = chunk_text(text)
if not chunked_texts:
    # Fail here with a clear message instead of later with an unrelated
    # error when an empty list reaches the embedding / DataFrame steps.
    raise ValueError(f"No text chunks could be extracted from `{pdf_path}`.")

# One record per chunk; chunk_id is the chunk's position in the document.
pdf_texts = [{"chunk_id": i, "chunked_text": chunk} for i, chunk in enumerate(chunked_texts)]

# Location of the SentenceTransformer model directory.
# NOTE(review): hard-coded, user-specific workspace path - adjust to wherever the model is stored.
model_path = "/Workspace/Users/wang-b2@itec.hankyu-hanshin.co.jp/ruri-base-v2"
model = SentenceTransformer(model_path)

# Encode all chunk texts in one call.
embeddings = model.encode(
    [record["chunked_text"] for record in pdf_texts],
    show_progress_bar=True,
)

# Store each embedding on its chunk record as a plain Python list.
for record, vector in zip(pdf_texts, embeddings):
    record["embedding"] = vector.tolist()

# Delta table that stores the chunks together with their embeddings.
DELTA_TABLE_NAME = f"{UC_CATALOG}.{UC_SCHEMA}.commuting_allowance_rules"

# Build the DataFrame and cast every element of the embedding array to float.
df = spark.createDataFrame(pdf_texts).withColumn(
    "embedding",
    expr("transform(embedding, x -> cast(x as float))"),
)

# Save as a Delta table, replacing both the data and the schema.
(
    df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(DELTA_TABLE_NAME)
)

# Enable the change data feed table property on the saved table.
spark.sql(
    f"ALTER TABLE {DELTA_TABLE_NAME} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)

# Print a link to the table in the Databricks UI; open it after the cell has
# run to confirm that the Delta table was created correctly.
print(
    f"View Delta Table at: https://{workspace_url}/explore/data/{UC_CATALOG}/{UC_SCHEMA}/{DELTA_TABLE_NAME.split('.')[-1]}"
)

# Fully qualified name of the vector search index.
CHUNKS_VECTOR_INDEX = f"{UC_CATALOG}.{UC_SCHEMA}.commuting_allowance_index"

# Vector Search client.
vsc = VectorSearchClient()

# Print a link to the index in the Databricks UI; open it to follow the
# creation / sync status.
print(
    f"Embedding docs & creating Vector Search Index, this will take ~5 - 10 minutes.\nView Index Status at: https://{workspace_url}/explore/data/{UC_CATALOG}/{UC_SCHEMA}/{CHUNKS_VECTOR_INDEX.split('.')[-1]}"
)

# Look up whether the index already exists on the endpoint instead of trying to
# create it and swallowing whatever exception comes back.
existing_indexes = (vsc.list_indexes(VECTOR_SEARCH_ENDPOINT) or {}).get("vector_indexes", [])
index_already_exists = any(ix.get("name") == CHUNKS_VECTOR_INDEX for ix in existing_indexes)

if index_already_exists:
    print(f"Index {CHUNKS_VECTOR_INDEX} already exists. Skipping index creation.")
    index = vsc.get_index(endpoint_name=VECTOR_SEARCH_ENDPOINT, index_name=CHUNKS_VECTOR_INDEX)
    # The source table was just overwritten and the pipeline type is TRIGGERED,
    # so request a sync to bring the existing index up to date.
    index.sync()
else:
    # Create the index and wait for it. Errors are no longer caught here, so a
    # failed creation stops the notebook instead of being printed and ignored.
    index = vsc.create_delta_sync_index_and_wait(
        endpoint_name=VECTOR_SEARCH_ENDPOINT,
        source_table_name=DELTA_TABLE_NAME,
        index_name=CHUNKS_VECTOR_INDEX,
        pipeline_type="TRIGGERED",
        primary_key="chunk_id",
        embedding_dimension=768,  # NOTE(review): must equal the embedding model's output size - confirm
        embedding_vector_column="embedding"
    )
    print(f"Index {CHUNKS_VECTOR_INDEX} created.")

In [0]:
# Names of the candidate LLM serving endpoints.
# Only MODEL_NAME_1 is referenced below; MODEL_NAME_2 is not used in this cell.
MODEL_NAME_1 = "databricks-meta-llama-3-3-70b-instruct"
MODEL_NAME_2 = "databricks-claude-3-7-sonnet"

# Agent configuration; the endpoint name is read again when the model's
# resources are declared in the logging cell.
config = { 
    "llm_model_serving_endpoint_name": MODEL_NAME_1,
}

# Example request (a single user chat message) passed to log_model as input_example.
input_example = { 
    "messages": [
        {
            "role": "user",
            "content": "AIエージェントとは？",
        }
    ]
}

In [0]:
import mlflow
from databricks_langchain import VectorSearchRetrieverTool
from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint
from unitycatalog.ai.langchain.toolkit import UnityCatalogTool

# Resources the logged agent depends on. Only the LLM serving endpoint is
# declared here.
# TODO: if agent.py uses VectorSearchRetrieverTool or UnityCatalogTool tools,
# add their resources as well (tool.resources for retriever tools,
# DatabricksFunction(function_name=tool.uc_function_name) for UC function tools).
resources = [DatabricksServingEndpoint(endpoint_name=config["llm_model_serving_endpoint_name"])]

# Python packages recorded with the logged model.
agent_pip_requirements = [
    "mlflow",
    "langgraph==0.3.4",
    "databricks-langchain>=0.4.0",
    "langchain-core",
    "databricks-sql-connector",
    "pypdf",
    "python-docx",
    "pandas",
    "openpyxl",
    "pyspark",
    "googlemaps",
    "requests",
    "langchain",
    "langchain-community",
    "databricks-vectorsearch",
    "sentence-transformers",
    "transformers==4.49.0",
    "google-cloud-vision",
]

# Log agent.py as a pyfunc model under the "agent" artifact path of a new run.
with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        artifact_path="agent",
        python_model="agent.py",
        input_example=input_example,
        pip_requirements=agent_pip_requirements,
        resources=resources,
    )

In [0]:
import pandas as pd

# Prompt used for the single evaluation example.
content = "'ユーザーID': 10099992, '保険区分': '住宅ローン', '保険内連番': null　こちらの年末調整情報を探して　住宅ローン控除.png　こちらの画像情報を探して、そしてを探した年末調整情報の結果と画像情報の結果を比較してください。"

# Alternative prompts; uncomment one of these to evaluate a different scenario.
# content = "AIエージェントとは？"
# content = "通勤手当申請書1.docx　こちらの通勤手当申請内容を取得して、会社規定に適合しているか判断してください。"
# content = "通勤手当申請書1.docx　こちらの通勤手当申請書の内容を取得して、申請者が過去申請したことあるかどうかを検索して、そして今回の申請は会社規定に適合しているか判断してください。"

# One chat-style request with no expected response.
eval_request = {"messages": [{"role": "user", "content": content}]}
eval_examples = [{"request": eval_request, "expected_response": None}]

# Evaluation dataset: one row per example.
eval_dataset = pd.DataFrame(eval_examples)

In [0]:
import mlflow

# Evaluate the logged agent inside the run that logged it.
agent_run_id = logged_agent_info.run_id
agent_model_uri = f"runs:/{agent_run_id}/agent"

with mlflow.start_run(run_id=agent_run_id):
    eval_results = mlflow.evaluate(
        agent_model_uri,
        data=eval_dataset,  # evaluation dataset built in the previous cell
        model_type="databricks-agent",  # Mosaic AI Agent Evaluation
    )

# Results can be reviewed in the MLflow UI (see the console output) or inline:
# display(eval_results.tables['eval_results'])

In [0]:
from databricks import agents
import time
from databricks.sdk.service.serving import EndpointStateReady, EndpointStateConfigUpdate
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Register models in Unity Catalog.
mlflow.set_registry_uri("databricks-uc")

# Target location of the registered model.
# NOTE(review): this reassigns UC_CATALOG / UC_SCHEMA / UC_MODEL_NAME, which the
# first cells set to the `commuting_allowance` schema. Confirm that the
# `tax_adjustment` schema is the intended target; re-running earlier cells
# after this one will use the values assigned here.
UC_CATALOG = 'hhhd_demo_itec'
UC_SCHEMA = 'tax_adjustment'
UC_MODEL_NAME = f"{UC_CATALOG}.{UC_SCHEMA}.tax_adjustment_model"

# Register the logged agent as a new version of the UC model.
uc_registered_model_info = mlflow.register_model(
    model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME
)

# Deploy the registered version (enables the Review App and creates an API endpoint).
deployment_info = agents.deploy(UC_MODEL_NAME, uc_registered_model_info.version, tags = {"endpointSource": "docs"})

# Poll the endpoint until it is ready. The state is fetched once per iteration,
# a failed config update raises instead of looping forever, and the wait is
# bounded by a generous timeout.
DEPLOY_POLL_SECONDS = 30
DEPLOY_TIMEOUT_SECONDS = 60 * 60

print("\nWaiting for endpoint to deploy.  This can take 9 - 10 minutes.", end="")
deploy_deadline = time.monotonic() + DEPLOY_TIMEOUT_SECONDS
while True:
    endpoint_state = w.serving_endpoints.get(deployment_info.endpoint_name).state

    if endpoint_state.config_update == EndpointStateConfigUpdate.UPDATE_FAILED:
        raise RuntimeError(
            f"Deployment of endpoint {deployment_info.endpoint_name} failed (config update state: UPDATE_FAILED)."
        )

    still_deploying = (
        endpoint_state.ready == EndpointStateReady.NOT_READY
        or endpoint_state.config_update == EndpointStateConfigUpdate.IN_PROGRESS
    )
    if not still_deploying:
        break

    if time.monotonic() >= deploy_deadline:
        raise TimeoutError(
            f"Endpoint {deployment_info.endpoint_name} was not ready after {DEPLOY_TIMEOUT_SECONDS} seconds."
        )

    print(".", end="")
    time.sleep(DEPLOY_POLL_SECONDS)

# Finish the line of progress dots before the final message.
print()
print(f"Endpoint {deployment_info.endpoint_name} is now ready!")