# 文書情報抽出モデルの構築 - MLflow Tutorial

## 概要
このノートブックでは、MLflowを使用して日本語のビジネス文書から情報を抽出するAIモデルを構築し、管理する方法を学びます。

### 学習内容
1. MLflow Prompt Registryの使い方
2. Models from Codeパターンの実装
3. カスタムPyFuncモデルの作成
4. モデルのバージョン管理とエイリアス設定
5. モデルのサービング

## ステップ1: 環境セットアップ

### 必要なライブラリのインストール
- `mlflow[genai]`: MLflowの生成AI機能（Prompt Registryなど）
- `openai`: OpenAI APIクライアント
- `pandas`: データフレーム操作用

**注意**: インストール後、Pythonランタイムを再起動して変更を反映させます。

In [None]:
%pip install -U mlflow[genai] openai pandas

## ステップ2: 認証情報の設定

OpenAI APIを使用するため、APIキーを環境変数に設定します。

**重要**: 
- `YOUR_API_KEY`を実際のAPIキーに置き換えてください
- 本番環境では、Databricks Secretsを使用することを推奨します
  ```python
  os.environ["OPENAI_API_KEY"] = dbutils.secrets.get(scope="my-scope", key="openai-api-key")
  ```

In [None]:
import os

os.environ["OPENAI_API_KEY"] = "YOUR_API_KEY"

## ステップ3: MLflow実験の設定とプロンプト登録

### MLflow Trackingとは？
機械学習実験の記録・管理システムです。パラメータ、メトリクス、モデルを一元管理できます。

### Prompt Registryとは？
プロンプトをバージョン管理し、複数のモデルで再利用できる仕組みです。
プロンプトを変更した際の影響追跡や、A/Bテストが容易になります。

In [None]:
import mlflow

# MLflow Trackingの出力先を設定（ローカルサーバーの例）
# Databricks環境では自動的にワークスペースのMLflowを使用します
mlflow.set_tracking_uri("http://localhost:5000")

# 実験名を設定 - 関連する実行をグループ化します
mlflow.set_experiment("document_extraction_minimal")

# プロンプトの内容を文字列として用意
# このプロンプトは、LLMに対して構造化されたJSON形式で情報を抽出するよう指示します
template_text = """
あなたは日本語のビジネス文書から情報を抽出する補助者です。
入力として任意のテキストを与えます。
以下のJSON形式に従って、必ず有効なJSONだけを返してください。

- company_name: 会社名（不明な場合はnull）
- contract_start_date: 契約開始日（YYYY-MM-DD形式。不明な場合はnull）
- contract_end_date: 契約終了日（YYYY-MM-DD形式。不明な場合はnull）
- monthly_fee_jpy: 月額料金（数値。不明な場合はnull）
- plan_name: プラン名（不明な場合はnull）

制約:
- 回答はJSONオブジェクト1つだけを返してください。
- 余計な文章やコメント、日本語の説明は一切書かないでください。
- nullを使う場合は、小文字のnullを使ってください。

入力テキスト:
{{text}}

出力形式の例:
{
  "company_name": "株式会社サンプル",
  "contract_start_date": "2025-01-01",
  "contract_end_date": "2025-12-31",
  "monthly_fee_jpy": 120000,
  "plan_name": "プレミアム"
}
"""

# プロンプトをMLflow Prompt Registryに登録
# これにより、プロンプトのバージョン管理と再利用が可能になります
mlflow.genai.register_prompt(
    name="document-extraction-system",
    template=template_text,
)

## ステップ4: カスタムモデルクラスの定義

### Models from Codeパターン
コードを直接MLflowモデルとして登録する手法です。以下のメリットがあります：
- モデルとコードが一体化し、再現性が向上
- 依存関係が明示的に管理される
- デプロイが容易

### このセルの動作
`%%writefile`マジックコマンドで、セルの内容を外部ファイルとして保存します。

### 主要コンポーネントの説明

#### 1. DocumentExtractionModelクラス
- MLflowの`PythonModel`を継承したカスタムモデル
- `load_context()`: モデルロード時の初期化処理
- `predict()`: 推論処理のメインロジック

#### 2. トレーシング機能
- `@mlflow.trace()`: 各関数の実行を記録し、デバッグや性能分析に活用
- `SpanType.TOOL`, `SpanType.LLM`, `SpanType.CHAIN`で処理の種類を分類

#### 3. エラー対策
- OpenAIのJSON mode使用時、systemメッセージに"json"を含めることでBAD_REQUESTエラーを回避

In [None]:
%%writefile ./document_extraction_model.py
import json
from typing import Any, Dict, Optional

import pandas as pd
import mlflow
from mlflow.pyfunc import PythonModel
from mlflow.models import set_model
from mlflow.entities import SpanType

from openai import OpenAI

# デフォルト設定
DEFAULT_PROMPT_URI = "prompts:/document-extraction-system/1"
DEFAULT_LLM_MODEL = "gpt-3.5-turbo"

# OpenAI APIの呼び出しを自動的にMLflowに記録
mlflow.openai.autolog()

def _load_prompt(prompt_uri: str):
    """
    Prompt Registryからプロンプトをロードする関数
    URI形式: prompts://<プロンプト名>/<バージョン>
    """
    return mlflow.genai.load_prompt(prompt_uri)

@mlflow.trace(span_type=SpanType.TOOL)
def _render_prompt(prompt, text: str) -> str:
    """
    プロンプトテンプレートに実際のテキストを埋め込む関数
    {{text}}プレースホルダーを入力テキストで置換します
    """
    try:
        return prompt.format(text=text)
    except Exception:
        # フォールバック: format()が使えない場合は文字列置換
        tmpl = getattr(prompt, "template", None)
        if isinstance(tmpl, str):
            return tmpl.replace("{{text}}", text)
        return str(prompt).replace("{{text}}", text)

@mlflow.trace(span_type=SpanType.LLM)
def _call_llm_return_json(*, client, prompt_text: str, model: str, max_tokens: int, temperature: float) -> Dict[str, Any]:
    """
    OpenAI APIを呼び出し、JSON形式で結果を返す関数
    
    Args:
        client: OpenAIクライアント
        prompt_text: 生成済みのプロンプト全文
        model: 使用するLLMモデル名
        max_tokens: 最大トークン数
        temperature: 生成のランダム性（0=決定的、1=創造的）
    
    Returns:
        抽出された情報を含む辞書
    """
    res = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "Return ONLY valid JSON (json_object)."},
            {"role": "user", "content": prompt_text},
        ],
        temperature=temperature,
        max_completion_tokens=max_tokens,
        response_format={"type": "json_object"},  # JSON形式を強制
    )
    content = res.choices[0].message.content
    return json.loads(content)


class DocumentExtractionModel(PythonModel):
    """
    ビジネス文書から情報を抽出するカスタムMLflowモデル
    
    入力形式: pandas.DataFrame
        - 必須カラム: 'text' (抽出対象のテキスト)
        - オプションカラム: 'model' (使用するLLMモデル名)
    
    出力形式: pandas.DataFrame
        - 抽出されたJSON項目が各カラムとして返される
    """
    
    def load_context(self, context):
        """
        モデルロード時に1回だけ実行される初期化メソッド
        設定の読み込み、クライアントの初期化、プロンプトのロードを行います
        """
        # model_configから設定を取得
        self.cfg = getattr(context, "model_config", {}) or {}
        
        # OpenAIクライアントを初期化（環境変数からAPIキーを取得）
        self.client = OpenAI()
        
        # 設定値を取得（デフォルト値を指定）
        self.prompt_uri = self.cfg.get("prompt_uri", DEFAULT_PROMPT_URI)
        self.default_model = self.cfg.get("default_model", DEFAULT_LLM_MODEL)
        self.max_tokens = int(self.cfg.get("max_tokens", 1024))
        self.temperature = float(self.cfg.get("temperature", 0.0))
        
        # Prompt Registryからプロンプトをロード
        self.prompt = _load_prompt(self.prompt_uri)
    
    @mlflow.trace(span_type=SpanType.CHAIN)
    def predict(self, context, model_input, params=None):
        """
        推論メソッド - 入力テキストから情報を抽出します
        
        処理フロー:
        1. 入力の検証とDataFrame化
        2. 各行に対してループ処理
        3. プロンプトのレンダリング
        4. LLM呼び出し
        5. 結果の収集とDataFrame化
        """
        # 入力がDataFrameでない場合は変換
        if not isinstance(model_input, pd.DataFrame):
            model_input = pd.DataFrame(model_input)
        
        # 'text'カラムの存在を確認
        if "text" not in model_input.columns:
            raise ValueError("Input must contain column 'text'.")
        
        rows = []
        # 各行を処理
        for _, row in model_input.iterrows():
            text = str(row.get("text", ""))
            model = str(row.get("model", self.default_model))
            
            # プロンプトに実際のテキストを埋め込み
            prompt_text = _render_prompt(self.prompt, text)
            
            # LLMを呼び出して情報抽出
            extracted = _call_llm_return_json(
                client=self.client,
                prompt_text=prompt_text,
                model=model,
                max_tokens=self.max_tokens,
                temperature=self.temperature,
            )
            rows.append(extracted)
        
        return pd.DataFrame(rows)


# ★Models from Codeの重要なポイント★
# set_model()を呼び出して、このファイル全体をMLflowモデルとして認識させます
app = DocumentExtractionModel()
set_model(app)

## ステップ5: モデルの記録と登録

### このステップで行うこと
1. **入力例の作成**: モデルの入力スキーマを自動推論するためのサンプルデータ
2. **MLflow実行の開始**: 実験の記録を開始
3. **モデルのログ記録**: モデルコード、依存関係、設定をMLflowに保存
4. **モデルレジストリへの登録**: 本番環境へのデプロイ準備

### 重要な設定項目
- `python_model`: モデルコードのファイルパス
- `pip_requirements`: 依存パッケージ（デプロイ時に自動インストール）
- `model_config`: モデルに渡す設定（load_context()で参照）
- `registered_model_name`: モデルレジストリでの名前

In [None]:
import pandas as pd
import mlflow

# 入力例の作成
# これはモデルの入力スキーマ推論とテストに使用されます
input_example = pd.DataFrame({
    "text": ["契約者は株式会社サンプルで、契約期間は2025年1月1日から同じ年の末までです。サービスプランはプレミアムをご契約いただいたので、月額120,000円になります。"],
    "model": ["gpt-3.5-turbo"],
})

# MLflow実行を開始（実験の1回の実行を表す）
with mlflow.start_run(run_name="doc-extraction-model-from-code"):
    # PyFuncモデルとして記録
    model_info = mlflow.pyfunc.log_model(
        artifact_path="model",  # モデルの保存先（実行内のパス）
        python_model="./document_extraction_model.py",  # モデルコードのファイル
        registered_model_name="document-extraction-model",  # レジストリでの名前
        input_example=input_example,  # 入力スキーマ推論用
        pip_requirements=[  # 依存パッケージリスト
            "mlflow[genai]>=2.9.0",
            "openai",
            "pandas",
        ],
        model_config={  # モデルに渡す設定（context.model_configで参照可能）
            "prompt_uri": "prompts:/document-extraction-system/1",
            "default_model": "gpt-3.5-turbo",
            "max_tokens": 1024,
            "temperature": 0.0,  # 決定的な出力のため0に設定
        },
    )

    print("Model URI:", model_info.model_uri)

## ステップ6: モデルのロードとテスト

記録したモデルを実際にロードし、推論が正しく動作するかテストします。

### model_uriの形式
- `runs:/<run_id>/model`: 特定の実行からロード
- `models:/<model_name>/<version>`: モデルレジストリからロード
- `models:/<model_name>@<alias>`: エイリアス経由でロード

In [None]:
# モデルをロード
loaded = mlflow.pyfunc.load_model(model_info.model_uri)

# テスト推論を実行
result = loaded.predict(input_example)
print("推論結果:")
print(result)

## ステップ7: モデルエイリアスの設定

### モデルエイリアスとは？
モデルバージョンに人間が理解しやすい名前を付ける機能です。

### 一般的なエイリアス名
- `champion`: 本番環境で使用中のベストモデル
- `challenger`: 評価中の候補モデル
- `staging`: ステージング環境用

### メリット
- バージョン番号を直接指定せずにモデルを参照できる
- モデル切り替えがエイリアスの付け替えだけで完結
- A/Bテストやカナリアリリースが容易

In [None]:
from mlflow import MlflowClient

client = MlflowClient()

# バージョン1に"champion"エイリアスを付与
# これにより models:/document-extraction-model@champion でアクセス可能
client.set_registered_model_alias(
    name="document-extraction-model",
    alias="champion",
    version="1"
)

print("エイリアス 'champion' をバージョン1に設定しました")

## ステップ8: モデルのサービング（デプロイ）

### MLflow Models Serve
モデルをREST APIエンドポイントとして公開する機能です。

### コマンドの説明
- `-m models:/document-extraction-model@champion`: エイリアス経由でモデルを指定
- `-p 5000`: ポート番号
- `--host 0.0.0.0`: すべてのネットワークインターフェースでリッスン

### 使用方法
サービング開始後、以下のようにAPIを呼び出せます:
```bash
curl -X POST http://localhost:5000/invocations \
  -H 'Content-Type: application/json' \
  -d '{
    "dataframe_split": {
      "columns": ["text", "model"],
      "data": [["契約書のテキスト...", "gpt-3.5-turbo"]]
    }
  }'
```

**注意**: 本番環境では、Databricks Model ServingやKubernetes等を使用することを推奨します。

In [None]:
%%sh

mlflow models serve \
  -m models:/document-extraction-model@champion \
  -p 5000 \
  --host 0.0.0.0

## まとめ

### このノートブックで学んだこと

1. **Prompt Registry**: プロンプトのバージョン管理と再利用
2. **Models from Code**: コードベースでのモデル定義と登録
3. **カスタムPyFuncモデル**: 柔軟なモデル実装パターン
4. **MLflowトレーシング**: LLM呼び出しの可観測性向上
5. **モデルエイリアス**: バージョン管理のベストプラクティス
6. **モデルサービング**: REST APIとしてのデプロイ

### 次のステップ

- 異なるLLMモデル（GPT-4、Claude等）でのテスト
- プロンプトの改良とバージョン比較
- 本番環境へのデプロイ
- モニタリングとA/Bテストの実装

### 参考リソース

- [MLflow Documentation](https://mlflow.org/docs/latest/index.html)
- [MLflow Prompt Engineering](https://mlflow.org/docs/latest/llms/prompt-engineering/index.html)
- [MLflow Model Registry](https://mlflow.org/docs/latest/model-registry.html)