# 金融データ活用チャレンジ: Databricks Free Editionで取り組む提案書自動生成

このノートブックでは、第4回金融データ活用チャレンジに対して **Databricks Free Edition** の機能を活用し、
生成AIによる提案書自動生成パイプラインを構築するデモンストレーションを行います。

## アーキテクチャ概要

| レイヤー | 使用機能 | 内容 |
|---|---|---|
| データ準備 | Unity Catalog, Volumes | 財務CSV・有価証券報告書PDFの一元管理 |
| テキスト抽出 | Serverless Notebook | PDFからの定性情報抽出 |
| 財務分析 | PySpark, pandas | 定量データの分析・指標算出 |
| 提案書生成 | Foundation Model APIs, MLflow | マルチステップLLMパイプライン |
| 評価 | mlflow.genai.evaluate() | カスタムScorerによるLLM-as-a-Judge品質評価 |
| 出力 | python-docx | .docx形式での提案書出力 |

## 0. セットアップ

In [0]:
%pip install --upgrade "mlflow[databricks]>=3.1" python-docx pymupdf --quiet
dbutils.library.restartPython()

In [0]:
import pandas as pd
import json
import os
import re
import fitz  # PyMuPDF
from docx import Document
from docx.shared import Pt, Inches, Cm, RGBColor
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml import OxmlElement
from docx.oxml.ns import qn

import mlflow
from mlflow.tracking import MlflowClient

# MLflow実験の設定
EXPERIMENT_NAME = "/Users/{}/signate-findata-challenge".format(
    spark.sql("SELECT current_user()").collect()[0][0]
)
mlflow.set_experiment(EXPERIMENT_NAME)

print(f"MLflow Experiment: {EXPERIMENT_NAME}")

## 1. データ準備 (Unity Catalog)

アップロードしたデータをUnity Catalogのテーブルおよびボリュームに格納します。
Free Editionではデフォルトカタログ(`main`)が利用可能です。

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS workspace.signate_findata;
CREATE VOLUME IF NOT EXISTS workspace.signate_findata.raw_data;

In [0]:
import zipfile

VOLUME_PATH = "/Volumes/workspace/signate_findata/raw_data"

zip_path = f"{VOLUME_PATH}/securities_report.zip"
extract_dir = f"{VOLUME_PATH}/securities_report"

if os.path.exists(zip_path):
    os.makedirs(extract_dir, exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as zf:
        for info in zf.infolist():
            # 日本語ファイル名の文字化け対策:
            # ZIPがcp437でエンコードされたファイル名を持つ場合、
            # cp437 → bytes → utf-8 (or shift_jis) で正しく復元する
            try:
                decoded_name = info.filename.encode("cp437").decode("utf-8")
            except (UnicodeDecodeError, UnicodeEncodeError):
                try:
                    decoded_name = info.filename.encode("cp437").decode("shift_jis")
                except (UnicodeDecodeError, UnicodeEncodeError):
                    decoded_name = info.filename  # フォールバック

            # ディレクトリエントリはスキップ
            if decoded_name.endswith("/"):
                os.makedirs(os.path.join(VOLUME_PATH, decoded_name), exist_ok=True)
                continue

            target_path = os.path.join(VOLUME_PATH, decoded_name)
            os.makedirs(os.path.dirname(target_path), exist_ok=True)
            with zf.open(info) as src, open(target_path, "wb") as dst:
                dst.write(src.read())

    pdf_count = len([f for f in os.listdir(extract_dir) if f.endswith(".pdf")])
    print(f"解凍完了: {extract_dir} ({pdf_count} ファイル)")
else:
    print(f"ZIPファイルが見つかりません: {zip_path}")
    print("Volume UIからsecurities_report.zipをアップロードしてください")

In [0]:

# financial_data.csvの読み込み
df_financial = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(f"{VOLUME_PATH}/financial_data.csv")
)

# Delta Tableとして保存
df_financial.write.mode("overwrite").saveAsTable("workspace.signate_findata.financial_data")

display(df_financial.select("コード", "本社所在地", "業種分類", "YEAR", "売上高", "営業利益", "当期純利益", "総資産", "純資産", "従業員数（連結）"))

### 対象企業一覧

| コード | 所在地 | 業種分類 | 特徴 |
|---|---|---|---|
| 12044 | 茨城 | 総合建設・土木 | 急成長、従業員増 |
| 71768 | 高知県 | ホールディングス・多角化 | 安定収益 |
| 73617 | 岡山県 | 住宅・ハウスメーカー | V字回復 |
| 99702 | 滋賀県 | 道路・基礎・インフラ | 堅調成長 |
| 141634 | 和歌山県 | 道路・基礎・インフラ | 赤字→黒字転換 |
| 184226 | 岩手県 | 専門工事 | 直近赤字転落 |
| 244359 | 静岡県 | 総合建設・土木 | 売上縮小傾向 |
| 292640 | 北海道 | 専門工事 | 安定だが利益減少 |
| 308582 | 宮崎県 | 環境・エネルギー関連 | 直近大幅赤字 |
| 325042 | 新潟県 | 専門工事 | 高利益率 |

## 2. 有価証券報告書のテキスト抽出

In [0]:
def extract_pdf_text(pdf_path: str) -> str:
    """PDFからテキストを抽出する"""
    doc = fitz.open(pdf_path)
    text_parts = []
    for page in doc:
        text_parts.append(page.get_text())
    doc.close()
    return "\n".join(text_parts)


def extract_securities_reports(volume_path: str) -> dict:
    """全社の有価証券報告書を抽出してコード別辞書として返す"""
    reports = {}
    pdf_dir = f"{volume_path}/securities_report"

    for filename in os.listdir(pdf_dir):
        if not filename.endswith(".pdf"):
            continue
        # ファイル名から企業コードを抽出
        match = re.search(r"(\d+)", filename)
        if match:
            code = int(match.group(1))
            filepath = os.path.join(pdf_dir, filename)
            text = extract_pdf_text(filepath)
            reports[code] = text
            print(f"  コード {code}: {len(text):,} 文字抽出")

    return reports


# 全社の有価証券報告書を抽出
print("有価証券報告書のテキスト抽出中...")
securities_texts = extract_securities_reports(VOLUME_PATH)
print(f"\n合計 {len(securities_texts)} 社分の報告書を抽出完了")

In [0]:
# テキストデータをテーブルとして保存(リネージ追跡のため)
reports_data = [
    {"コード": code, "report_text": text, "char_count": len(text)}
    for code, text in securities_texts.items()
]
df_reports = spark.createDataFrame(reports_data)
df_reports.write.mode("overwrite").saveAsTable(
    "workspace.signate_findata.securities_reports"
)

display(df_reports.select("コード", "char_count"))

## 3. 財務分析 (定量データ)

各企業の3年分の財務データから主要な指標を算出し、分析サマリーを生成します。

In [0]:
pdf = spark.table("workspace.signate_findata.financial_data").toPandas()

def compute_financial_metrics(company_df: pd.DataFrame) -> dict:
    """企業の3年分データから主要財務指標を算出"""
    company_df = company_df.sort_values("YEAR")
    latest = company_df.iloc[-1]
    prev = company_df.iloc[-2] if len(company_df) > 1 else latest
    oldest = company_df.iloc[0]

    # 基本情報
    metrics = {
        "コード": int(latest["コード"]),
        "本社所在地": latest["本社所在地"],
        "業種分類": latest["業種分類"],
        "従業員数": int(latest["従業員数（連結）"]),
        "資本金_億円": float(latest["資本金（億円）"]),
    }

    # 収益性指標
    metrics["売上高_最新_億円"] = round(latest["売上高"] / 1e8, 1)
    metrics["営業利益_最新_億円"] = round(latest["営業利益"] / 1e8, 1)
    metrics["営業利益率_最新"] = round(latest["営業利益"] / latest["売上高"] * 100, 2) if latest["売上高"] != 0 else 0
    metrics["当期純利益_最新_億円"] = round(latest["当期純利益"] / 1e8, 1)

    # 成長性指標
    if oldest["売上高"] != 0:
        metrics["売上高成長率_3年"] = round((latest["売上高"] / oldest["売上高"] - 1) * 100, 1)
    else:
        metrics["売上高成長率_3年"] = None

    if prev["売上高"] != 0:
        metrics["売上高成長率_YoY"] = round((latest["売上高"] / prev["売上高"] - 1) * 100, 1)
    else:
        metrics["売上高成長率_YoY"] = None

    # 安全性指標
    metrics["自己資本比率"] = round(latest["純資産"] / latest["総資産"] * 100, 1) if latest["総資産"] != 0 else 0
    metrics["総資産_億円"] = round(latest["総資産"] / 1e8, 1)

    # キャッシュフロー
    metrics["営業CF_億円"] = round(latest["営業活動によるキャッシュ・フロー"] / 1e8, 1)
    metrics["投資CF_億円"] = round(latest["投資活動によるキャッシュ・フロー"] / 1e8, 1)
    metrics["財務CF_億円"] = round(latest["財務活動によるキャッシュ・フロー"] / 1e8, 1)

    # 3年分の推移データ
    metrics["売上高推移"] = company_df[["YEAR", "売上高"]].to_dict("records")
    metrics["営業利益推移"] = company_df[["YEAR", "営業利益"]].to_dict("records")
    metrics["当期純利益推移"] = company_df[["YEAR", "当期純利益"]].to_dict("records")

    return metrics


# 全社の指標を算出
all_metrics = {}
for code in pdf["コード"].unique():
    company_df = pdf[pdf["コード"] == code]
    all_metrics[int(code)] = compute_financial_metrics(company_df)

# 確認
for code, m in all_metrics.items():
    print(f"コード {code} ({m['本社所在地']}, {m['業種分類']}): "
          f"売上 {m['売上高_最新_億円']}億円, 営業利益率 {m['営業利益率_最新']}%, "
          f"自己資本比率 {m['自己資本比率']}%")

## 4. LLMパイプラインの構築

Foundation Model APIsを使って、マルチステップで提案書を生成します。

### パイプラインの流れ
1. **Step 1**: 財務データ分析 → 経営課題の抽出
2. **Step 2**: 有価証券報告書の要約 → 事業特性・リスクの把握
3. **Step 3**: 地域特性の分析
4. **Step 4**: 統合提案書の生成

In [0]:
import mlflow.deployments


def get_llm_client():
    """Databricks Foundation Model APIsクライアントを取得"""
    return mlflow.deployments.get_deploy_client("databricks")


def call_llm(
    prompt: str,
    system_prompt: str = "",
    model: str = "databricks-gpt-oss-120b",
    max_tokens: int = 4096,
    temperature: float = 0.3,
) -> str:
    """
    Foundation Model APIs経由でLLMを呼び出す。

    Free Editionではpay-per-tokenの以下のモデルが利用可能(時期により変動):
    - databricks-gpt-oss-120b (OpenAI GPT OSS 120B, 推論モデル, 128Kコンテキスト)
    - databricks-gpt-oss-20b (OpenAI GPT OSS 20B, 軽量推論モデル)
    - databricks-meta-llama-3-3-70b-instruct (Meta Llama 3.3 70B)
    - databricks-qwen3-next-80b-a3b-instruct (Qwen3-Next 80B)
    """
    client = get_llm_client()

    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": prompt})

    response = client.predict(
        endpoint=model,
        inputs={
            "messages": messages,
            "max_tokens": max_tokens,
            "temperature": temperature,
        },
    )
    content = response.choices[0]["message"]["content"]

    # 推論モデル(GPT OSS 120B等)はcontentがlist形式で返る場合がある
    # 例: [{"type": "thinking", "thinking": "..."}, {"type": "text", "text": "..."}]
    if isinstance(content, list):
        text_parts = []
        for block in content:
            if isinstance(block, dict) and block.get("type") == "text":
                text_parts.append(block.get("text", ""))
            elif isinstance(block, str):
                text_parts.append(block)
        return "\n".join(text_parts)

    return content

### 4.1 プロンプトテンプレート (MLflow Prompt Registry)

プロンプトをMLflowで管理し、バージョニング・再現性を確保します。

In [0]:
import mlflow.genai

# --- テンプレート定義 (Jinja2形式: {{ variable }}) ---
# MLflow Prompt Registry はテンプレート変数に {{ }} を使用

SYSTEM_PROMPT_TEMPLATE = """あなたは地域金融機関の熟練バンカーであり、取引先企業の成長支援を専門としています。
建設業界の事業環境に精通し、以下の視点で企業分析と提案を行います:
- 財務分析(PL/BS/CFの3年推移)
- 地域特性(商圏、人口動態、地域の産業構造)
- 業界動向(2024年問題、GX/DX、人材不足)
- 販路特性(官公庁/民間、元請/下請)

提案は具体的かつ実行可能なものとし、数値的な根拠を示してください。"""

FINANCIAL_ANALYSIS_TEMPLATE = """以下の企業の財務データを分析し、経営課題を抽出してください。

## 企業情報
- コード: {{ code }}
- 所在地: {{ location }}
- 業種: {{ industry }}
- 従業員数: {{ employees }}名
- 資本金: {{ capital }}億円

## 財務指標
- 売上高(最新): {{ revenue }}億円 (3年成長率: {{ revenue_growth_3y }}%)
- 営業利益(最新): {{ operating_income }}億円 (営業利益率: {{ operating_margin }}%)
- 当期純利益(最新): {{ net_income }}億円
- 自己資本比率: {{ equity_ratio }}%
- 営業CF: {{ operating_cf }}億円 / 投資CF: {{ investing_cf }}億円 / 財務CF: {{ financing_cf }}億円

## 売上高推移
{{ revenue_trend }}

## 営業利益推移
{{ operating_income_trend }}

## 分析項目
1. 収益性分析: 売上高・利益の推移から見える収益構造の特徴と課題
2. 安全性分析: 自己資本比率、キャッシュフローの健全性
3. 成長性分析: 売上・利益の成長トレンドと持続可能性
4. 主要な経営課題(3つ以上)

各項目について、具体的な数値を引用しながら分析してください。"""

SECURITIES_ANALYSIS_TEMPLATE = """以下は{{ company_name }}(コード: {{ code }}, 所在地: {{ location }}, 業種: {{ industry }})の有価証券報告書から抽出した情報です。

## 有価証券報告書(抜粋)
{{ report_text }}

## 分析してください
1. 事業の概要と主力事業
2. セグメント構成と売上構成比
3. 経営方針・中期戦略
4. 主要なリスク要因
5. 販路特性(官公庁/民間、元請/下請)
6. 競争優位性・強み
7. 技術力・設備投資の方向性

建設業界の専門家の視点で、事業構造の特徴を簡潔にまとめてください。"""

REGIONAL_ANALYSIS_TEMPLATE = """以下の建設関連企業の地域特性を分析してください。

## 企業情報
- 企業コード: {{ code }}
- 所在地: {{ location }}
- 業種: {{ industry }}

## 分析項目
1. 地域の建設需要の特徴(公共工事/民間工事の比率、主要プロジェクト)
2. 人口動態と将来予測(人口減少率、高齢化率)
3. 地域の産業構造と建設業の位置づけ
4. 行政施策(国土強靭化、インフラ更新、地方創生)
5. 地域固有のリスクと機会(自然災害リスク、再開発計画など)
6. 地域内の競合環境

{{ location }}という立地を踏まえた、地域密着型の視点で分析してください。"""

PROPOSAL_GENERATION_TEMPLATE = """以下の分析結果を統合し、提案書を作成してください。

## 対象企業
- コード: {{ code }}
- 企業名(有報より): {{ company_name }}
- 所在地: {{ location }}
- 業種: {{ industry }}

## Step 1: 財務分析結果
{{ financial_analysis }}

## Step 2: 有価証券報告書分析結果
{{ securities_analysis }}

## Step 3: 地域特性分析結果
{{ regional_analysis }}

## 提案書の構成(以下の順に記述してください)

### 1. エグゼクティブサマリー
全体の要約を3-5行で記述。

### 2. 企業概要・分析
#### 2.1 事業概要
#### 2.2 財務分析(過去3年の推移と特徴)
#### 2.3 外部環境分析(地域特性・業界動向)

### 3. 経営課題の抽出
財務面・事業面・人材面の課題を整理。

### 4. 成長戦略の提案
#### 4.1 短期施策(1年以内)
#### 4.2 中期施策(1-3年)
#### 4.3 長期ビジョン(3-5年)

各施策には以下を含めること:
- 施策の具体的内容
- 期待される効果(定量的に)
- 必要な投資額の概算
- GX/DXの観点を含む提案を最低1つ
- 人材確保・育成に関する提案を最低1つ

### 5. 効果試算
提案施策の実行による3年後の財務インパクト(売上高、営業利益率の改善見込み)。

### 6. ロードマップ
四半期単位の実行スケジュール。

提案は{{ location }}の地域特性を反映した具体的な内容とし、
建設業界の2024年問題、GX、DXトレンドを踏まえたものにしてください。
文字数は10,000〜15,000字を目標としてください。"""


# --- MLflow Prompt Registry に登録 ---
# register_prompt はべき等: 同一テンプレートなら既存バージョンを返す
PROMPT_PREFIX = "workspace.signate_findata"

PROMPT_DEFS = {
    f"{PROMPT_PREFIX}.signate_system_prompt": SYSTEM_PROMPT_TEMPLATE,
    f"{PROMPT_PREFIX}.signate_financial_analysis": FINANCIAL_ANALYSIS_TEMPLATE,
    f"{PROMPT_PREFIX}.signate_securities_analysis": SECURITIES_ANALYSIS_TEMPLATE,
    f"{PROMPT_PREFIX}.signate_regional_analysis": REGIONAL_ANALYSIS_TEMPLATE,
    f"{PROMPT_PREFIX}.signate_proposal_generation": PROPOSAL_GENERATION_TEMPLATE,
}

# Prompt Registry に登録し、latest エイリアスを付与
for name, template in PROMPT_DEFS.items():
    pv = mlflow.genai.register_prompt(name=name, template=template)
    mlflow.genai.set_prompt_alias(name=name, alias="latest", version=pv.version)
    print(f"  登録: {name} (version={pv.version} → @latest)")

print("\n✅ 全プロンプトを MLflow Prompt Registry に登録完了")
print("   MLflow UI > Prompt Registry で確認できます")

### 4.2 パイプライン実行

In [0]:
def format_trend(records: list, col: str) -> str:
    """推移データを文字列にフォーマット"""
    lines = []
    for r in records:
        val = r[col] / 1e8
        lines.append(f"  - {int(r['YEAR'])}年: {val:,.1f} 億円")
    return "\n".join(lines)


def generate_proposal_for_company(
    code: int,
    metrics: dict,
    report_text: str,
    model: str = "databricks-gpt-oss-120b",
) -> dict:
    """
    1社分の提案書を生成する完全パイプライン。
    各ステップの入出力をMLflowに記録する。
    プロンプトは MLflow Prompt Registry から load して使用。
    """
    results = {"code": code, "steps": {}}

    # --- Prompt Registry から @latest でロード ---
    system_pv = mlflow.genai.load_prompt(f"{PROMPT_PREFIX}.signate_system_prompt")
    financial_pv = mlflow.genai.load_prompt(f"{PROMPT_PREFIX}.signate_financial_analysis")
    securities_pv = mlflow.genai.load_prompt(f"{PROMPT_PREFIX}.signate_securities_analysis")
    regional_pv = mlflow.genai.load_prompt(f"{PROMPT_PREFIX}.signate_regional_analysis")
    proposal_pv = mlflow.genai.load_prompt(f"{PROMPT_PREFIX}.signate_proposal_generation")

    system_prompt_text = system_pv.template  # 変数なしテンプレート

    with mlflow.start_run(run_name=f"proposal_{code}") as run:
        mlflow.log_param("company_code", code)
        mlflow.log_param("location", metrics["本社所在地"])
        mlflow.log_param("industry", metrics["業種分類"])
        mlflow.log_param("model", model)
        # プロンプトバージョンを記録 (再現性)
        mlflow.log_param("prompt_version_system", system_pv.version)
        mlflow.log_param("prompt_version_financial", financial_pv.version)
        mlflow.log_param("prompt_version_securities", securities_pv.version)
        mlflow.log_param("prompt_version_regional", regional_pv.version)
        mlflow.log_param("prompt_version_proposal", proposal_pv.version)

        # ============================
        # Step 1: 財務分析
        # ============================
        print(f"  [Step 1] 財務分析...")
        financial_prompt = financial_pv.format(
            code=code,
            location=metrics["本社所在地"],
            industry=metrics["業種分類"],
            employees=metrics["従業員数"],
            capital=metrics["資本金_億円"],
            revenue=metrics["売上高_最新_億円"],
            revenue_growth_3y=metrics["売上高成長率_3年"],
            operating_income=metrics["営業利益_最新_億円"],
            operating_margin=metrics["営業利益率_最新"],
            net_income=metrics["当期純利益_最新_億円"],
            equity_ratio=metrics["自己資本比率"],
            operating_cf=metrics["営業CF_億円"],
            investing_cf=metrics["投資CF_億円"],
            financing_cf=metrics["財務CF_億円"],
            revenue_trend=format_trend(metrics["売上高推移"], "売上高"),
            operating_income_trend=format_trend(metrics["営業利益推移"], "営業利益"),
        )

        financial_analysis = call_llm(
            financial_prompt, system_prompt=system_prompt_text, model=model
        )
        results["steps"]["financial_analysis"] = financial_analysis
        mlflow.log_text(financial_prompt, "prompts/step1_financial_input.txt")
        mlflow.log_text(financial_analysis, "outputs/step1_financial_output.txt")

        # ============================
        # Step 2: 有価証券報告書分析
        # ============================
        print(f"  [Step 2] 有価証券報告書分析...")
        # 報告書テキストはトークン数制限のため先頭部分を使用
        truncated_report = report_text[:15000]

        securities_prompt = securities_pv.format(
            company_name=f"コード{code}の企業",
            code=code,
            location=metrics["本社所在地"],
            industry=metrics["業種分類"],
            report_text=truncated_report,
        )

        securities_analysis = call_llm(
            securities_prompt, system_prompt=system_prompt_text, model=model
        )
        results["steps"]["securities_analysis"] = securities_analysis
        mlflow.log_text(securities_prompt, "prompts/step2_securities_input.txt")
        mlflow.log_text(securities_analysis, "outputs/step2_securities_output.txt")

        # ============================
        # Step 3: 地域分析
        # ============================
        print(f"  [Step 3] 地域特性分析...")
        regional_prompt = regional_pv.format(
            code=code,
            location=metrics["本社所在地"],
            industry=metrics["業種分類"],
        )

        regional_analysis = call_llm(
            regional_prompt, system_prompt=system_prompt_text, model=model
        )
        results["steps"]["regional_analysis"] = regional_analysis
        mlflow.log_text(regional_prompt, "prompts/step3_regional_input.txt")
        mlflow.log_text(regional_analysis, "outputs/step3_regional_output.txt")

        # ============================
        # Step 4: 提案書統合生成
        # ============================
        print(f"  [Step 4] 提案書生成...")
        proposal_prompt = proposal_pv.format(
            code=code,
            company_name=f"コード{code}の企業",
            location=metrics["本社所在地"],
            industry=metrics["業種分類"],
            financial_analysis=financial_analysis,
            securities_analysis=securities_analysis,
            regional_analysis=regional_analysis,
        )

        proposal = call_llm(
            proposal_prompt,
            system_prompt=system_prompt_text,
            model=model,
            max_tokens=8192,
            temperature=0.4,
        )
        results["proposal"] = proposal
        mlflow.log_text(proposal_prompt, "prompts/step4_proposal_input.txt")
        mlflow.log_text(proposal, "outputs/step4_proposal_output.txt")

        # メトリクス記録
        mlflow.log_metric("proposal_length", len(proposal))
        mlflow.log_metric("total_steps", 4)

        print(f"  完了: 提案書 {len(proposal):,} 文字 (Run ID: {run.info.run_id})")

    return results

In [0]:
# デモ: 1社分を実行
# 全社実行する場合は TARGET_CODES = list(all_metrics.keys())
TARGET_CODES = [12044]  # 茨城あずま建設(総合建設・土木)

all_proposals = {}
for code in TARGET_CODES:
    print(f"\n{'='*60}")
    print(f"企業コード {code} の提案書生成開始")
    print(f"{'='*60}")

    result = generate_proposal_for_company(
        code=code,
        metrics=all_metrics[code],
        report_text=securities_texts.get(code, ""),
    )
    all_proposals[code] = result

print(f"\n全 {len(all_proposals)} 社分の提案書生成完了")

## 5. mlflow.genai.evaluate() による品質評価

MLflow 3の`mlflow.genai.evaluate()`とカスタムScorerを使い、
コンペ公式の5軸評価基準でLLM-as-a-Judgeを実行します。
評価結果はMLflow実験UIのTracesタブで確認できます。

In [0]:
from mlflow.genai.scorers import scorer
from mlflow.genai.judges import make_judge
from mlflow.entities import Feedback

# 5つの評価軸それぞれにLLM Judgeを作成
# {{ inputs }}で企業情報(コード・所在地・業種)、{{ outputs }}で提案書本文を参照

JUDGE_PREAMBLE = (
    "あなたは大手銀行の支店長であり熟練の経営コンサルタントです。\n"
    "以下の企業情報と提案書を評価してください。\n\n"
    "## 企業情報\n{{ inputs }}\n\n"
    "## 提案書\n{{ outputs }}\n\n"
)

overall_structure_judge = make_judge(
    name="overall_structure",
    instructions=(
        JUDGE_PREAMBLE
        + "## 評価基準\n"
        "過去3年の財務・事業分析と未来への成長戦略が論理的に接続されているか。"
        "過去の実績データに基づく現状把握と、具体的な成長戦略が一貫した論理で結ばれているかを評価する。"
    ),
    feedback_value_type=int,
    model="databricks:/databricks-gpt-oss-120b",
)

regional_relevance_judge = make_judge(
    name="regional_relevance",
    instructions=(
        JUDGE_PREAMBLE
        + "## 評価基準\n"
        "企業の所在地の商圏・人口動態・行政施策を踏まえた地域密着型の提案か。"
        "地域固有の課題や機会を具体的に反映した提案になっているかを評価する。"
    ),
    feedback_value_type=int,
    model="databricks:/databricks-gpt-oss-120b",
)

industry_understanding_judge = make_judge(
    name="industry_understanding",
    instructions=(
        JUDGE_PREAMBLE
        + "## 評価基準\n"
        "官公庁/民間、元請/下請などの販路特性を把握した分析・提案か。"
        "建設業界特有のビジネス構造を理解した上での提案になっているかを評価する。"
    ),
    feedback_value_type=int,
    model="databricks:/databricks-gpt-oss-120b",
)

gx_dx_response_judge = make_judge(
    name="gx_dx_response",
    instructions=(
        JUDGE_PREAMBLE
        + "## 評価基準\n"
        "環境技術(GX)・省力化技術(DX)など技術トレンドへの対応策が提案されているか。"
        "具体的な技術導入や投資計画が示されているかを評価する。"
    ),
    feedback_value_type=int,
    model="databricks:/databricks-gpt-oss-120b",
)

workforce_demand_judge = make_judge(
    name="workforce_demand",
    instructions=(
        JUDGE_PREAMBLE
        + "## 評価基準\n"
        "人手不足・需要変化に対する実効性のある解決策が示されているか。"
        "採用・育成・外注戦略や需要変動への対応が具体的に提案されているかを評価する。"
    ),
    feedback_value_type=int,
    model="databricks:/databricks-gpt-oss-120b",
)

# 提案書の文字数チェック(コンペ上限: 15,000文字)
@scorer
def proposal_length_check(outputs: str) -> Feedback:
    char_count = len(outputs)
    is_valid = char_count <= 15000
    return Feedback(
        value="pass" if is_valid else "fail",
        rationale=f"文字数: {char_count}/15,000 ({'OK' if is_valid else '超過'})",
    )

In [0]:
# 提案書生成結果を評価データセットに変換
eval_data = []
for code, result in all_proposals.items():
    eval_data.append({
        "inputs": {
            "company_code": str(code),
            "location": all_metrics[code]["本社所在地"],
            "industry": all_metrics[code]["業種分類"],
        },
        "outputs": result["proposal"],
    })

# mlflow.genai.evaluate() で5軸評価を一括実行
eval_results = mlflow.genai.evaluate(
    data=eval_data,
    scorers=[
        overall_structure_judge,
        regional_relevance_judge,
        industry_understanding_judge,
        gx_dx_response_judge,
        workforce_demand_judge,
        proposal_length_check,
    ],
)

print("評価完了 - MLflow実験UIのTracesタブで詳細を確認できます")
print(f"評価Run ID: {eval_results.run_id}")
# assessments列はネストされたオブジェクトでArrow変換不可のため除外
df_eval = eval_results.tables["eval_results"]
display(df_eval.drop(columns=[c for c in df_eval.columns if "assessments" in c], errors="ignore"))

## 6. 提案書のdocx出力

生成した提案書をWord形式で出力します。

In [0]:
def _add_formatted_runs(paragraph, text):
    """マークダウンのインライン記法(**太字**, *斜体*)をWord書式に変換してRunを追加。
    <br>タグは改行に変換。"""
    # <br>タグを改行マーカーに変換
    text = re.sub(r"<br\s*/?>", "\n", text)

    for segment in text.split("\n"):
        if paragraph.runs and paragraph.runs[-1].text != "":
            # 改行を挿入(最初のセグメント以外)
            run = paragraph.add_run()
            run._element.append(OxmlElement("w:br"))

        pattern = r"(\*\*(.+?)\*\*|\*(.+?)\*)"
        last_end = 0
        for m in re.finditer(pattern, segment):
            if m.start() > last_end:
                paragraph.add_run(segment[last_end : m.start()])
            if m.group(2):  # **太字**
                run = paragraph.add_run(m.group(2))
                run.bold = True
            elif m.group(3):  # *斜体*
                run = paragraph.add_run(m.group(3))
                run.italic = True
            last_end = m.end()
        if last_end < len(segment):
            paragraph.add_run(segment[last_end:])


def _add_formatted_paragraph(doc, text, style=None):
    """マークダウンのインライン記法をWord書式に変換してパラグラフ追加"""
    p = doc.add_paragraph(style=style)
    _add_formatted_runs(p, text)
    return p


def _parse_markdown_table(table_lines):
    """Markdownテーブル行のリストをパースして(header_row, data_rows)を返す。
    区切り行(|---|---|)は自動除外。"""
    rows = []
    for line in table_lines:
        line = line.strip()
        if not line.startswith("|") or not line.endswith("|"):
            continue
        # 区切り行(|---|---|)をスキップ
        inner = line[1:-1]
        if re.match(r"^[\s\-:|]+$", inner):
            continue
        cells = [c.strip() for c in inner.split("|")]
        rows.append(cells)
    if len(rows) < 2:
        return rows[0] if rows else [], []
    return rows[0], rows[1:]


def _add_word_table(doc, header, data_rows):
    """python-docxでWordテーブルを作成"""
    num_cols = len(header)
    table = doc.add_table(rows=1, cols=num_cols)
    table.style = "Light Grid Accent 1"
    table.autofit = True

    # ヘッダー行
    hdr_cells = table.rows[0].cells
    for i, h in enumerate(header):
        p = hdr_cells[i].paragraphs[0]
        _add_formatted_runs(p, h)
        for run in p.runs:
            run.bold = True
            run.font.size = Pt(9)

    # データ行
    for row_data in data_rows:
        row_cells = table.add_row().cells
        for i, cell_text in enumerate(row_data):
            if i < num_cols:
                p = row_cells[i].paragraphs[0]
                _add_formatted_runs(p, cell_text)
                for run in p.runs:
                    run.font.size = Pt(9)

    # テーブル幅をページ幅いっぱいに設定
    tbl = table._tbl
    tbl_pr = tbl.tblPr if tbl.tblPr is not None else OxmlElement("w:tblPr")
    tbl_w = OxmlElement("w:tblW")
    tbl_w.set(qn("w:type"), "pct")
    tbl_w.set(qn("w:w"), "5000")  # 100% = 5000
    tbl_pr.append(tbl_w)

    doc.add_paragraph("")  # テーブル後に空行


def create_proposal_docx(
    code: int, proposal_text: str, metrics: dict, output_dir: str
) -> str:
    """提案書をdocx形式で出力"""
    doc = Document()

    # スタイル設定
    style = doc.styles["Normal"]
    font = style.font
    font.name = "游明朝"
    font.size = Pt(10.5)

    # タイトルページ
    title = doc.add_heading(level=1)
    title.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = title.add_run("事業成長支援 提案書")
    run.font.size = Pt(24)

    subtitle = doc.add_paragraph()
    subtitle.alignment = WD_ALIGN_PARAGRAPH.CENTER
    subtitle.add_run(
        f"\n対象企業: コード {code}\n"
        f"所在地: {metrics['本社所在地']}\n"
        f"業種: {metrics['業種分類']}\n"
        f"\n作成日: 2026年2月"
    )

    doc.add_page_break()

    # 本文: マークダウンをdocxに変換
    # テーブル行をグループ化するため、行単位ではなくブロック単位で処理
    lines = proposal_text.split("\n")
    i = 0
    while i < len(lines):
        stripped = lines[i].strip()

        if not stripped:
            doc.add_paragraph("")
            i += 1
            continue

        # 見出しレベルの判定
        if stripped.startswith("#### "):
            doc.add_heading(stripped[5:], level=4)
        elif stripped.startswith("### "):
            doc.add_heading(stripped[4:], level=3)
        elif stripped.startswith("## "):
            doc.add_heading(stripped[3:], level=2)
        elif stripped.startswith("# "):
            doc.add_heading(stripped[2:], level=1)
        elif stripped.startswith("- ") or stripped.startswith("* "):
            _add_formatted_paragraph(doc, stripped[2:], style="List Bullet")
        elif re.match(r"^\d+\.\s", stripped):
            text = re.sub(r"^\d+\.\s*", "", stripped)
            _add_formatted_paragraph(doc, text, style="List Number")
        elif stripped.startswith("|") and stripped.endswith("|"):
            # テーブルブロック: 連続する|で始まり|で終わる行をまとめて収集
            table_lines = []
            while i < len(lines):
                s = lines[i].strip()
                if s.startswith("|") and s.endswith("|"):
                    table_lines.append(s)
                    i += 1
                elif re.match(r"^[\s\-:|]+$", s):
                    # 区切り行も含める
                    table_lines.append(s)
                    i += 1
                else:
                    break
            header, data_rows = _parse_markdown_table(table_lines)
            if header:
                _add_word_table(doc, header, data_rows)
            continue  # iは既にインクリメント済み
        elif stripped == "---" or re.match(r"^[-|:]+$", stripped):
            i += 1
            continue
        else:
            _add_formatted_paragraph(doc, stripped)

        i += 1

    # 保存
    output_path = os.path.join(output_dir, f"{code}.docx")
    doc.save(output_path)
    return output_path


# Serverless環境ではVolumesへの直接書き込みでI/Oエラーが発生するため
# ローカルtempに書いてからVolumesにコピーする
import tempfile
import shutil

LOCAL_OUTPUT_DIR = tempfile.mkdtemp(prefix="proposals_")
VOLUME_OUTPUT_DIR = f"{VOLUME_PATH}/proposals"
os.makedirs(VOLUME_OUTPUT_DIR, exist_ok=True)

# docx生成(ローカルtempに書き出し)
for code, result in all_proposals.items():
    path = create_proposal_docx(code, result["proposal"], all_metrics[code], LOCAL_OUTPUT_DIR)
    # Volumesにコピー
    shutil.copy2(path, VOLUME_OUTPUT_DIR)
    print(f"  コード {code}: {VOLUME_OUTPUT_DIR}/{code}.docx")

print("\n提案書のdocx出力完了")

## 7. プロンプトログ・検証プロセスレポートの生成

In [0]:
def create_prompt_log(all_proposals: dict, output_dir: str) -> str:
    """MLflowから取得した全ステップの入出力をプロンプトログとして出力"""
    doc = Document()
    doc.add_heading("プロンプトログ", level=1)
    doc.add_paragraph(
        "本ドキュメントは、提案書生成パイプラインの各ステップにおける "
        "生成AIへの入力(プロンプト)と出力結果を記録したものです。"
    )

    prompt_no = 1
    for code, result in all_proposals.items():
        doc.add_heading(f"対象企業: コード {code}", level=2)

        step_names = {
            "financial_analysis": "Step 1: 財務分析",
            "securities_analysis": "Step 2: 有価証券報告書分析",
            "regional_analysis": "Step 3: 地域特性分析",
        }

        for step_key, step_label in step_names.items():
            doc.add_heading(f"プロンプト #{prompt_no}: {step_label}", level=3)

            doc.add_paragraph("【入力プロンプト(概要)】")
            # プロンプトテンプレート名を記載
            doc.add_paragraph(
                f"テンプレート: {step_key.upper()}_PROMPT を企業コード {code} のデータで埋め込み",
                style="List Bullet",
            )

            doc.add_paragraph("【出力結果】")
            output_text = result["steps"].get(step_key, "(未実行)")
            # 長すぎる場合は先頭を抜粋
            if len(output_text) > 2000:
                doc.add_paragraph(output_text[:2000] + "\n...(以下省略)")
            else:
                doc.add_paragraph(output_text)

            prompt_no += 1

        # Step 4: 提案書生成
        doc.add_heading(f"プロンプト #{prompt_no}: Step 4: 提案書統合生成", level=3)
        doc.add_paragraph("【入力プロンプト(概要)】")
        doc.add_paragraph(
            f"Step 1-3の分析結果を統合し、PROPOSAL_GENERATION_PROMPT テンプレートで提案書を生成",
            style="List Bullet",
        )
        doc.add_paragraph("【出力結果】")
        doc.add_paragraph(f"提案書本文({len(result['proposal']):,}文字) → {code}.docx として出力")
        prompt_no += 1

    output_path = os.path.join(output_dir, "prompt_log.docx")
    doc.save(output_path)
    return output_path


prompt_log_path = create_prompt_log(all_proposals, LOCAL_OUTPUT_DIR)
shutil.copy2(prompt_log_path, VOLUME_OUTPUT_DIR)
print(f"プロンプトログ: {VOLUME_OUTPUT_DIR}/prompt_log.docx")

In [0]:
def create_verification_report(all_proposals: dict, output_dir: str) -> str:
    """検証プロセスレポートを生成"""
    doc = Document()
    doc.add_heading("検証プロセスレポート", level=1)

    doc.add_paragraph(
        "本ドキュメントは、生成AIの出力を人間がどのように検証・修正・採用したかを記録したものです。"
    )

    # テーブル形式で記録
    doc.add_heading("パイプライン設計における検証プロセス", level=2)

    table = doc.add_table(rows=1, cols=3)
    table.style = "Table Grid"
    hdr = table.rows[0].cells
    hdr[0].text = "プロンプトNo."
    hdr[1].text = "成果物に対する評価"
    hdr[2].text = "ネクストアクション"

    verification_steps = [
        (
            "1",
            "単一プロンプトで提案書全体を生成。財務分析が表面的で、地域特性への言及が不足。",
            "マルチステップ化を決定。Step 1(財務分析)を分離し、具体的な数値指標を含む分析を指示。",
        ),
        (
            "2",
            "財務分析の精度が向上。ただし有価証券報告書の定性情報が未活用。",
            "Step 2(有価証券報告書分析)を追加。事業構造・リスク・販路特性の抽出を指示。",
        ),
        (
            "3",
            "定性情報の取り込みにより分析が深化。地域特性の考慮が依然として弱い。",
            "Step 3(地域分析)を追加。所在地の人口動態・行政施策・建設需要を分析する専用ステップを設計。",
        ),
        (
            "4",
            "4ステップ構成で提案書を生成。全体の構成は改善されたが、GX/DXへの言及が一般論に留まる。",
            "提案書生成プロンプトにGX/DX・人材不足対応の具体的要件を追加。効果試算の定量性を強化。",
        ),
        (
            "5",
            "LLM-as-a-Judgeでの評価: 合計スコアの確認。10社間での品質のばらつきを確認。",
            "スコアの低い企業の提案書を重点的にレビューし、プロンプトの改善ポイントを特定。",
        ),
    ]

    for no, evaluation, action in verification_steps:
        row = table.add_row().cells
        row[0].text = no
        row[1].text = evaluation
        row[2].text = action

    doc.add_heading("品質保証の仕組み", level=2)
    doc.add_paragraph(
        "本パイプラインでは以下の仕組みで品質を担保しています:"
    )
    items = [
        "MLflowによる全プロンプト・出力のバージョン管理と追跡",
        "LLM-as-a-Judge(mlflow.genai.evaluate)による5軸自動評価",
        "10社間の評価スコア比較によるばらつき検出",
        "プロンプトテンプレートのイテレーティブな改善サイクル",
    ]
    for item in items:
        doc.add_paragraph(item, style="List Bullet")

    output_path = os.path.join(output_dir, "verification_process_report.docx")
    doc.save(output_path)
    return output_path


verification_path = create_verification_report(all_proposals, LOCAL_OUTPUT_DIR)
shutil.copy2(verification_path, VOLUME_OUTPUT_DIR)
print(f"検証プロセスレポート: {VOLUME_OUTPUT_DIR}/verification_process_report.docx")

## 8. 全社一括実行とスコア比較

以下のセルでは全10社分の提案書を一括生成し、評価スコアを比較します。

> **注意**: 全社実行にはFoundation Model APIsの呼び出し回数が多くなるため、
> Free Editionのクォータに注意してください。

In [0]:
# ※ デモでは上で1社分を実行済み。全社実行する場合は以下を実行:
#
# TARGET_CODES = list(all_metrics.keys())
#
# for code in TARGET_CODES:
#     if code in all_proposals:
#         continue  # 実行済みはスキップ
#     print(f"\n{'='*60}")
#     print(f"企業コード {code} の提案書生成開始")
#     print(f"{'='*60}")
#     result = generate_proposal_for_company(
#         code=code,
#         metrics=all_metrics[code],
#         report_text=securities_texts.get(code, ""),
#     )
#     all_proposals[code] = result
#
#     # docx出力
#     create_proposal_docx(code, result["proposal"], all_metrics[code], LOCAL_OUTPUT_DIR)
#     shutil.copy2(f"{LOCAL_OUTPUT_DIR}/{code}.docx", VOLUME_OUTPUT_DIR)
#
# # 全社分の評価を一括実行(mlflow.genai.evaluate())
# eval_data = [
#     {"inputs": {"company_code": str(c), "location": all_metrics[c]["本社所在地"],
#                 "industry": all_metrics[c]["業種分類"]}, "outputs": r["proposal"]}
#     for c, r in all_proposals.items()
# ]
# eval_results = mlflow.genai.evaluate(
#     data=eval_data,
#     scorers=[overall_structure_judge, regional_relevance_judge,
#              industry_understanding_judge, gx_dx_response_judge,
#              workforce_demand_judge, proposal_length_check],
# )

In [0]:
# mlflow.genai.evaluate()の結果を表示
# eval_results.tables["eval_results"] にスコアが格納されている
df_eval = eval_results.tables["eval_results"]
display(df_eval.drop(columns=[c for c in df_eval.columns if "assessments" in c], errors="ignore"))

## 9. まとめ: LLMOpsとしての構成

### 本デモで活用したDatabricks Free Editionの機能

| 機能 | 用途 |
|---|---|
| Unity Catalog | 財務データ・有価証券報告書テキストのテーブル管理、リネージ追跡 |
| Foundation Model APIs | GPT OSS 120B等のLLM呼び出し(分析・提案書生成・評価) |
| MLflow Experiment Tracking | 全プロンプト・出力のバージョン管理、評価メトリクスの記録 |
| mlflow.genai.evaluate() | カスタムScorer + make_judgeによる5軸LLM-as-a-Judge品質評価 |
| Serverless Notebook | パイプライン実行環境 |
| Jobs | 全10社の一括実行・スケジュール実行(発展) |

### 頑健性・再現性を担保する工夫

1. **プロンプトのテンプレート化**: 企業コードをパラメータとして渡すだけで全社実行可能
2. **マルチステップ分離**: 財務分析→有報分析→地域分析→統合生成の4段構成で各ステップが独立
3. **MLflowによるフルトレーサビリティ**: 入力・出力・メトリクスすべてを記録
4. **自動評価サイクル**: LLM-as-a-Judgeで品質のばらつきを検出し、改善ループを回せる
5. **Unity Catalogによるデータガバナンス**: データリネージの自動追跡

### 発展: Databricks Appsでの対話的UI化

Free Editionでは1つのDatabricks Appをデプロイ可能です。
Streamlit/Gradioベースで、営業担当者が企業を選択して提案書を生成・プレビューするUIを
構築することで、実運用イメージを示すことができます。