In [0]:
%pip install opencv-python moviepy mlflow langchain sentence-transformers openai gradio pandas databricks-vectorsearch
# Restart the Python process so that the packages installed above become importable in later cells.
# NOTE(review): package versions are not pinned; consider pinning them for reproducible runs.
dbutils.library.restartPython()

In [0]:
# Create the notebook input widgets.
# Text widgets as (name, default value, label). The schema default is a
# placeholder the user must replace with their own schema, e.g. handson_{name}.
for _widget_name, _widget_default, _widget_label in (
    ("catalog", "fall_detection_demo_catalog", "カタログ"),
    ("schema", "{ご自身のスキーマ名を入力}", "スキーマ"),
    ("suffix", "{ご自身のSuffixを指定}", "ENDPOINT用の接尾辞"),
):
    dbutils.widgets.text(_widget_name, _widget_default, _widget_label)
dbutils.widgets.dropdown("recreate_schema", "False", ["True", "False"], "スキーマを再作成")


# Read the widget values back into module-level constants.
CATALOG, SCHEMA, SUFFIX = (
    dbutils.widgets.get(_key) for _key in ("catalog", "schema", "suffix")
)
RECREATE_SCHEMA = dbutils.widgets.get("recreate_schema") == "True"


# COMMAND ----------

# DBTITLE 1,パラメーターのチェック
print(f"catalog: {CATALOG}")
print(f"schema: {SCHEMA}")
print(f"suffix: {SUFFIX}")
print(f"recreate_schema: {RECREATE_SCHEMA}")


def _is_placeholder(value: str) -> bool:
    """Return True if ``value`` is still an unedited ``{...}`` widget placeholder."""
    stripped = value.strip()
    return stripped.startswith("{") and stripped.endswith("}")


# Validate the widget values before any resource is created. Besides empty
# values, reject the "{...}" placeholder defaults the schema/suffix widgets are
# created with; otherwise they would flow verbatim into the SQL statements and
# the vector search endpoint name.
if not CATALOG.strip():
    raise ValueError("存在するカタログ名を入力してください")
if not SCHEMA.strip() or _is_placeholder(SCHEMA):
    raise ValueError("スキーマ名を入力してください")
if not SUFFIX.strip() or _is_placeholder(SUFFIX):
    raise ValueError("ENDPOINT用の接尾辞を入力してください")

In [0]:
# Create the working schema. When the "recreate_schema" widget is "True", the
# existing schema is dropped first (CASCADE drops it together with the objects
# it contains) so that the notebook starts from a clean state.
if RECREATE_SCHEMA:
    spark.sql(f"DROP SCHEMA IF EXISTS {CATALOG}.{SCHEMA} CASCADE")

create_schema_sql = f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA};"
spark.sql(create_schema_sql)

In [0]:
# Volumes for the source video and for the frame images extracted from it.
# IF NOT EXISTS keeps this cell re-runnable against an already populated schema.
create_video_volume = f"""
CREATE VOLUME IF NOT EXISTS {CATALOG}.{SCHEMA}.video_volume COMMENT '動画ファイルを格納するためのボリューム';
"""
spark.sql(create_video_volume)

create_frame_volume = f"""
CREATE VOLUME IF NOT EXISTS {CATALOG}.{SCHEMA}.frame_volume COMMENT '動画から抽出したフレーム画像を格納するためのボリューム';
"""
spark.sql(create_frame_volume)

In [0]:
import cv2, os
import requests
from datetime import datetime

# Volume paths for the input video and the extracted frames.
VIDEO_VOL = f"/Volumes/{CATALOG}/{SCHEMA}/video_volume"
FRAME_VOL = f"/Volumes/{CATALOG}/{SCHEMA}/frame_volume"

# Presigned download URL of the demo video (replace with the actual URL).
presigned_url = 'https://x.gd/Q65MV'

volume_path = f"{VIDEO_VOL}/FallDetection.mp4"

# Seconds requests waits to connect / between received bytes before giving up,
# so a stalled connection cannot hang the cell forever.
DOWNLOAD_TIMEOUT_SECONDS = 60

# Stream the video into the volume. If the transfer fails after the output
# file was opened, the partial file is removed so that the frame-extraction
# step never reads a truncated video; the error is then re-raised.
_download_file_opened = False
try:
    with requests.get(presigned_url, stream=True, timeout=DOWNLOAD_TIMEOUT_SECONDS) as response:
        response.raise_for_status()  # raise on an HTTP error status
        with open(volume_path, 'wb') as f:
            _download_file_opened = True
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)
except BaseException:
    if _download_file_opened and os.path.exists(volume_path):
        os.remove(volume_path)
    raise

print(f'Downloaded video saved to {volume_path}')

def extract_frames_from_video(video_filename, interval_seconds=1):
    """
    Read a video from the video volume and save sampled frames as JPEGs in the frame volume.

    Frames are written to ``{FRAME_VOL}/{video stem}_frames/`` and named
    ``frame_{index:06d}_{timestamp:.1f}s.jpg``.

    Args:
        video_filename: File name of the video inside ``VIDEO_VOL``.
        interval_seconds: Sampling interval in seconds. One frame is kept every
            ``int(fps * interval_seconds)`` frames, but never less often than
            every frame.

    Returns:
        Number of frame images written.

    Raises:
        RuntimeError: If the video cannot be opened, reports no usable FPS,
            or a frame image cannot be written.
    """
    input_path = f"{VIDEO_VOL}/{video_filename}"
    output_dir = f"{FRAME_VOL}/{os.path.splitext(video_filename)[0]}_frames"
    os.makedirs(output_dir, exist_ok=True)

    cap = cv2.VideoCapture(input_path)
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Cannot open video: {input_path}")
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            # fps is used both as a divisor and to compute the frame step below.
            raise RuntimeError(f"Video reports no valid FPS: {input_path}")
        # Clamp to at least 1: a step of 0 (e.g. interval_seconds < 1 / fps)
        # would make the modulo below raise ZeroDivisionError.
        skip = max(1, int(fps * interval_seconds))
        frame_count = 0
        saved = 0

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame_count % skip == 0:
                ts = frame_count / fps
                fname = f"{output_dir}/frame_{saved:06d}_{ts:.1f}s.jpg"
                # cv2.imwrite reports failure through its return value.
                if not cv2.imwrite(fname, frame):
                    raise RuntimeError(f"Failed to write frame image: {fname}")
                saved += 1
            frame_count += 1
    finally:
        # Release the capture even when reading or writing fails.
        cap.release()
    return saved

# Example run: keep one frame per second of the downloaded video.
num = extract_frames_from_video(
    "FallDetection.mp4",
    interval_seconds=1,
)
print(f"抽出したフレーム数: {num}")


In [0]:
import os
import re
import base64
import json
import mlflow
import pandas as pd
from datetime import datetime
from openai import OpenAI
from pyspark.sql import SparkSession

# ──────────────────────────────────────────────────────────
# Databricks・MLflow 初期設定
# ──────────────────────────────────────────────────────────
# Track runs in the Databricks workspace, register models in Unity Catalog,
# and trace OpenAI client calls automatically.
mlflow.set_tracking_uri("databricks")
mlflow.set_registry_uri("databricks-uc")
mlflow.openai.autolog()

# Experiment path: absolute path in the current user's workspace folder,
# made unique with a timestamp and the endpoint suffix.
user = spark.sql("SELECT current_user()").first()[0]
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
EXP_PATH = f"/Users/{user}/{timestamp}_{SUFFIX}"
mlflow.set_experiment(EXP_PATH)

# ──────────────────────────────────────────────────────────
# モデル定義
# ──────────────────────────────────────────────────────────
# Models to compare: display key -> model name passed to the serving-endpoints
# chat API. Each model is addressed by the same name it is listed under.
MULTIMODAL_MODELS = {
    endpoint: endpoint
    for endpoint in (
        "databricks-llama-4-maverick",
        "databricks-claude-3-7-sonnet",
    )
}

def get_openai_client() -> OpenAI:
    """Build an OpenAI client for this workspace's Databricks Foundation Model API.

    The client authenticates with the notebook context's API token and sends
    requests to ``https://<workspace-url>/serving-endpoints``.
    """
    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    token = notebook_context.apiToken().getOrElse(None)
    workspace_url = spark.conf.get('spark.databricks.workspaceUrl')
    return OpenAI(api_key=token, base_url=f"https://{workspace_url}/serving-endpoints")

# ──────────────────────────────────────────────────────────
# 推論プロンプト
# ──────────────────────────────────────────────────────────
# Prompt sent together with every frame image. Each of the three requested
# items (fall, danger level, emergency response needed) has its own line in the
# answer format, so the answers can be parsed line by line; analyze_image reads
# the "転倒検知" line.
FULL_PROMPT = """
以下を満たすように画像を分析してください。以下の形式で必ず日本語で答えてください。説明については、自由記述して構いません。：
1. 人が転倒しているか
2. 危険度
3. 緊急対応要否
回答形式：
転倒検知: はい/いいえ
危険度: 高/中/低
緊急対応: 要/不要
説明: ...
"""

# Matches the fall-detection line of a model answer such as "転倒検知: はい".
# Asterisks/whitespace around the colon are tolerated in case the model
# answers with markdown emphasis ("**転倒検知**: はい", "転倒検知：**はい**").
_FALL_ANSWER_RE = re.compile(r"転倒検知[*\s]*[:：][*\s]*(はい|yes)", re.IGNORECASE)

# Data-URL MIME type per image extension accepted when scanning the frame volume.
_IMAGE_MIME_TYPES = {".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png"}


def analyze_image(image_path: str, model_name: str, client: OpenAI):
    """Analyze one image with the given model and detect whether it reports a fall.

    Args:
        image_path: Path to a .jpg/.jpeg/.png image.
        model_name: Key of ``MULTIMODAL_MODELS``.
        client: OpenAI-compatible client, e.g. from ``get_openai_client()``.

    Returns:
        ``(text, is_fall)``: the raw model answer (``""`` if the model returned
        no content) and True when the answer's "転倒検知" line says はい/yes.
    """
    with open(image_path, "rb") as f:
        img_b64 = base64.b64encode(f.read()).decode()
    # Label the data URL with the file's actual image type so PNG files are
    # not sent as image/jpeg; unknown extensions fall back to image/jpeg.
    extension = os.path.splitext(image_path)[1].lower()
    mime_type = _IMAGE_MIME_TYPES.get(extension, "image/jpeg")
    resp = client.chat.completions.create(
        model=MULTIMODAL_MODELS[model_name],
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": FULL_PROMPT},
                {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{img_b64}"}},
            ],
        }],
        max_tokens=400,
        temperature=0.1,
    )
    text = resp.choices[0].message.content or ""
    is_fall = bool(_FALL_ANSWER_RE.search(text))
    return text, is_fall

# ──────────────────────────────────────────────────────────
# Volume 内全画像を評価し、Run 毎に image_path をパラメータとして記録
# ──────────────────────────────────────────────────────────
def evaluate_volume_images():
    """Evaluate every image under FRAME_VOL with every model in MULTIMODAL_MODELS.

    Each (image, model) pair gets its own MLflow run. The run records the
    ``image_path`` and ``model`` params, the raw answer as the
    ``response.txt`` artifact, and ``is_fall`` (0/1) as a metric.

    Returns:
        A list of dicts with keys ``image_path``, ``model``, ``is_fall``,
        ``response_artifact`` and ``run_id``; the artifact path is relative
        to the run identified by ``run_id``. The list is ordered by image
        path, then by the order of MULTIMODAL_MODELS.

    Raises:
        FileNotFoundError: If FRAME_VOL does not exist.
        RuntimeError: If FRAME_VOL contains no .jpg/.jpeg/.png image.
    """
    if not os.path.exists(FRAME_VOL):
        raise FileNotFoundError(f"Volume パスが存在しません: {FRAME_VOL}")

    # os.walk order depends on the filesystem; sort so that runs are created
    # (and results returned) in a reproducible order.
    img_files = sorted(
        os.path.join(r, f)
        for r, _, fs in os.walk(FRAME_VOL)
        for f in fs if f.lower().endswith((".jpg", ".jpeg", ".png"))
    )
    if not img_files:
        raise RuntimeError(f"{FRAME_VOL} に画像が見つかりません。")

    client = get_openai_client()
    results = []

    # One separate run per image x model combination.
    for image_path in img_files:
        image_name = os.path.basename(image_path)
        for model in MULTIMODAL_MODELS:
            run_name = f"{image_name}_{model}"
            with mlflow.start_run(run_name=run_name, nested=True) as run:
                # Record image_path and model as run parameters.
                mlflow.log_param("image_path", image_path)
                mlflow.log_param("model", model)

                text, is_fall = analyze_image(image_path, model, client)

                # Raw answer as an artifact, fall detection as a 0/1 metric.
                mlflow.log_text(text, "response.txt")
                mlflow.log_metric("is_fall", int(is_fall))

                results.append({
                    "image_path": image_path,
                    "model": model,
                    "is_fall": is_fall,
                    "response_artifact": "response.txt",
                    # Needed to locate response.txt, which is stored per run.
                    "run_id": run.info.run_id,
                })

    return results

if __name__ == "__main__":
    # Explain what the evaluation below produces, then run it.
    for _message in (
        "Unity Catalog Volume 内の全画像について、モデル別に個別 Run を作成し、",
        "image_path パラメータを表示して MLflow UI で比較可能にします。",
    ):
        print(_message)
    evaluate_volume_images()


In [0]:
import os
import pandas as pd
from sentence_transformers import SentenceTransformer
from PIL import Image
import numpy as np

# CLIP embedding model. The same model name is used by ImageSearchAgent to
# encode text queries against these image embeddings.
model = SentenceTransformer('clip-ViT-B-32')  # e.g. a CLIP model


def image_to_vec(img_path):
    """Return the embedding of the image at ``img_path`` (converted to RGB first).

    The file is opened in a ``with`` block so that its handle is closed once
    the pixels have been converted. Otherwise each of the many frames would
    keep a handle open until garbage collection.
    """
    with Image.open(img_path) as image:
        rgb_image = image.convert("RGB")
    return model.encode(rgb_image)

# Embed every frame image under FRAME_VOL. The extension filter matches the one
# used by evaluate_volume_images (.jpg/.jpeg/.png, case-insensitive), and the
# paths are sorted so that the table row order is reproducible.
IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png")
image_paths = sorted(
    os.path.join(root, f)
    for root, _, files in os.walk(FRAME_VOL)
    for f in files
    if f.lower().endswith(IMAGE_EXTENSIONS)
)

data = [
    {"image_path": path, "embedding": image_to_vec(path).tolist()}
    for path in image_paths
]

# Explicit columns keep the expected layout even when no image was found.
df = pd.DataFrame(data, columns=["image_path", "embedding"])

In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T

spark = SparkSession.getActiveSession()

# Schema of the embeddings table: one row per frame image with its vector.
schema = T.StructType(
    [
        T.StructField("image_path", T.StringType(), nullable=False),
        T.StructField("embedding", T.ArrayType(T.FloatType()), nullable=False),
    ]
)

DELTA_TABLE_NAME = f"{CATALOG}.{SCHEMA}.frame_embeddings"
spark_df = spark.createDataFrame(df, schema=schema)
# Replace the table contents on every run.
(
    spark_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(DELTA_TABLE_NAME)
)

In [0]:
# Enable the Delta Change Data Feed on the embeddings table; presumably needed
# by the Delta Sync vector search index that is built from this table next.
enable_cdf_sql = f"ALTER TABLE {DELTA_TABLE_NAME} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
spark.sql(enable_cdf_sql)

In [0]:
import time
import json
from databricks.vector_search.client import VectorSearchClient


client = VectorSearchClient()
endpoint_name = f"fall_detection_vector_search_{SUFFIX}"
index_name = f"{CATALOG}.{SCHEMA}.fall_detection_index"
embedding_dimension = 512  # must match the embedding model (change together with it)

# How long to wait for a (new) endpoint to come online, and how often to poll.
ENDPOINT_READY_TIMEOUT_SECONDS = 20 * 60
ENDPOINT_POLL_INTERVAL_SECONDS = 10


def _endpoint_exists(vs_client, name):
    """Return True if a vector search endpoint called ``name`` already exists."""
    endpoints = vs_client.list_endpoints().get("endpoints", [])
    return any(endpoint.get("name") == name for endpoint in endpoints)


def _wait_for_endpoint_online(vs_client, name,
                              timeout_s=ENDPOINT_READY_TIMEOUT_SECONDS,
                              poll_s=ENDPOINT_POLL_INTERVAL_SECONDS):
    """Poll the endpoint until it reports ONLINE; raise TimeoutError otherwise."""
    deadline = time.monotonic() + timeout_s
    state = ""
    while time.monotonic() < deadline:
        endpoint = vs_client.get_endpoint(name)
        status = endpoint.get("endpoint_status") or endpoint.get("status") or {}
        state = str(status.get("state", "")).upper()
        if "ONLINE" in state:
            return endpoint
        time.sleep(poll_s)
    raise TimeoutError(
        f"Vector search endpoint {name} is not ONLINE after {timeout_s}s "
        f"(last state: {state or 'unknown'})"
    )


def _index_exists(vs_client, endpoint, index):
    """Return True if ``index`` exists on ``endpoint``; re-raise unrelated errors."""
    try:
        vs_client.get_index(endpoint_name=endpoint, index_name=index).describe()
        return True
    except Exception as e:
        if "RESOURCE_DOES_NOT_EXIST" not in str(e):
            raise
        return False


# 1. Endpoint: create it only when missing (a second create fails on re-runs),
#    then wait until it reports ONLINE before creating an index on it.
if not _endpoint_exists(client, endpoint_name):
    client.create_endpoint(
        name=endpoint_name,
        endpoint_type="STANDARD"  # or "STORAGE_OPTIMIZED"
    )
_wait_for_endpoint_online(client, endpoint_name)

# 2. Index: create it on the first run. On later runs, trigger a sync so that
#    the TRIGGERED pipeline picks up the rewritten source table.
if not _index_exists(client, endpoint_name, index_name):
    client.create_delta_sync_index(
        endpoint_name=endpoint_name,
        index_name=index_name,
        source_table_name=DELTA_TABLE_NAME,  # fully qualified Delta table name
        pipeline_type="TRIGGERED",           # or "CONTINUOUS"
        primary_key="image_path",
        embedding_dimension=embedding_dimension,
        embedding_vector_column="embedding"
    )
else:
    client.get_index(endpoint_name=endpoint_name, index_name=index_name).sync()


In [0]:
import mlflow.pyfunc
from sentence_transformers import SentenceTransformer
from databricks.vector_search.client import VectorSearchClient

class ImageSearchAgent(mlflow.pyfunc.PythonModel):
    """Text-to-image search over a Databricks Vector Search index (single-query form).

    A text query is encoded with a SentenceTransformer (CLIP by default). The
    ``image_path`` values of the closest rows in the index are returned.
    """

    def __init__(self, index_name, embedding_model_name="clip-ViT-B-32",
                 endpoint_name=None, num_results=5):
        self.index_name = index_name
        self.embedding_model_name = embedding_model_name
        # Vector search endpoint hosting the index; only passed to get_index when set.
        self.endpoint_name = endpoint_name
        # Number of matches returned per query.
        self.num_results = num_results

    def load_context(self, context):
        self.model = SentenceTransformer(self.embedding_model_name)
        self.vs_client = VectorSearchClient()

    def _get_index(self):
        """Return the index handle for ``self.index_name`` (and endpoint, if given)."""
        kwargs = {"index_name": self.index_name}
        if self.endpoint_name:
            kwargs["endpoint_name"] = self.endpoint_name
        return self.vs_client.get_index(**kwargs)

    def predict(self, context, model_input):
        """Return the image paths of the top matches for ``model_input["query"]``.

        Args:
            model_input: dict such as {"query": "転倒している人の画像"}.

        Returns:
            list of up to ``num_results`` image paths, in the order returned by the index.
        """
        query = model_input["query"]
        query_vec = self.model.encode([query])[0]
        # Similarity search is a method of the index object, not of the client.
        response = self._get_index().similarity_search(
            query_vector=query_vec.tolist(),
            columns=["image_path"],
            num_results=self.num_results,
        )
        rows = (response.get("result") or {}).get("data_array") or []
        # image_path is the only requested column, so it is the first value of each row.
        return [row[0] for row in rows]


In [0]:
import mlflow
from mlflow.models.signature import infer_signature
from mlflow.models import ModelSignature
from mlflow.models.resources import DatabricksVectorSearchIndex
import pandas as pd
from sentence_transformers import SentenceTransformer
from databricks.vector_search.client import VectorSearchClient

class ImageSearchAgent(mlflow.pyfunc.PythonModel):
    """Text-to-image search over a Databricks Vector Search index (batch form).

    Each text query is encoded with a SentenceTransformer (CLIP by default),
    and the ``image_path`` values of the closest rows in the index are returned.
    """

    def __init__(self, index_name, embedding_model_name="clip-ViT-B-32",
                 endpoint_name=None, num_results=5):
        self.index_name = index_name
        self.embedding_model_name = embedding_model_name
        # Vector search endpoint hosting the index; only passed to get_index when set.
        self.endpoint_name = endpoint_name
        # Number of matches returned per query.
        self.num_results = num_results

    def load_context(self, context):
        self.model = SentenceTransformer(self.embedding_model_name)
        self.vs_client = VectorSearchClient()

    def _get_index(self):
        """Return the index handle for ``self.index_name`` (and endpoint, if given)."""
        kwargs = {"index_name": self.index_name}
        if self.endpoint_name:
            kwargs["endpoint_name"] = self.endpoint_name
        return self.vs_client.get_index(**kwargs)

    def predict(self, context, model_input):
        """Search images for each query.

        Args:
            model_input: pandas.DataFrame with a "query" column, or a dict
                with a single "query" string.

        Returns:
            One list of image paths (up to ``num_results``, in index order) per query.

        Raises:
            ValueError: If the input is neither a DataFrame nor a dict.
        """
        if isinstance(model_input, pd.DataFrame):
            queries = model_input["query"].tolist()
        elif isinstance(model_input, dict):
            queries = [model_input["query"]]
        else:
            raise ValueError("Input must be a DataFrame or dict with 'query' key.")
        if not queries:
            return []

        index = self._get_index()
        # Encode all queries in one call instead of one model call per query.
        query_vecs = self.model.encode(queries)
        results = []
        for query_vec in query_vecs:
            # Similarity search is a method of the index object, not of the client.
            response = index.similarity_search(
                query_vector=query_vec.tolist(),
                columns=["image_path"],
                num_results=self.num_results,
            )
            rows = (response.get("result") or {}).get("data_array") or []
            # image_path is the only requested column, so it is the first value of each row.
            results.append([row[0] for row in rows])
        return results

# 入力例と出力例
input_example = pd.DataFrame({"query": ["転倒している人の画像"]})
# 仮の出力例（実際はagent.predictで取得）
output_example = [["/Volumes/twatanabe_demo/fall_catalog_detection/frame_volume/xxx.jpg"]]

# signature
signature = infer_signature(input_example, output_example)

index_name = "twatanabe_demo.fall_catalog_detection.fall_detection_index"
registered_model_name = f"{CATALOG}.{SCHEMA}.image_search_agent"

mlflow.set_registry_uri("databricks-uc")

with mlflow.start_run() as run:
    mlflow.pyfunc.log_model(
        name="image_search_agent",  # MLflow 3.xではnameを推奨
        python_model=ImageSearchAgent(index_name=index_name),
        input_example=input_example,
        signature=signature,
        resources=[DatabricksVectorSearchIndex(index_name=index_name)]
    )
    model_uri = f"runs:/{run.info.run_id}/image_search_agent"

mlflow.register_model(model_uri, registered_model_name)
