サーバレスコンピュートで実行してください

### 0. 初期設定

In [0]:
# カタログ、スキーマ、ボリューム
MY_CATALOG = "komae_demo_v4"        # 使用したいカタログ名に変更してください
MY_SCHEMA = "pdf_parse"             # 使用したいスキーマ名に変更してください
DATA_VOLUME = "raw_data"            # データ置き場
CHECKPOINT_VOLUME = "_checkpoints"  # チェックポイント置き場

In [0]:
# カタログ、スキーマ、ボリューム作成
spark.sql(f"CREATE CATALOG IF NOT EXISTS {MY_CATALOG}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {MY_CATALOG}.{MY_SCHEMA}")

spark.sql(f"USE CATALOG {MY_CATALOG}")
spark.sql(f"USE SCHEMA  {MY_SCHEMA}")

# データ用ボリューム作成
spark.sql(f"CREATE VOLUME IF NOT EXISTS {MY_CATALOG}.{MY_SCHEMA}.{DATA_VOLUME}")

# チェックポイント用ボリューム作成
spark.sql(f"CREATE VOLUME IF NOT EXISTS {MY_CATALOG}.{MY_SCHEMA}.{CHECKPOINT_VOLUME}")

print(f"MY_CATALOG: {MY_CATALOG}")
print(f"MY_SCHEMA: {MY_SCHEMA}")
print(f"DATA_VOLUME: {DATA_VOLUME}")
print(f"CHECKPOINT_VOLUME: {CHECKPOINT_VOLUME}")

In [0]:
# テーブルパスを指定
BRONZE_TABLE = f"{MY_CATALOG}.{MY_SCHEMA}.settlement_bronze"
SILVER_TABLE = f"{MY_CATALOG}.{MY_SCHEMA}.settlement_silver"
GOLD_TABLE   = f"{MY_CATALOG}.{MY_SCHEMA}.settlement_gold"

### 1. bronze

In [0]:
from pyspark.sql import functions as F

# Auto LoaderでPDFをストリーミング読み込み
bronze_stream_df = (
    spark.readStream
        .format("cloudFiles")                                           # Auto Loader を使う指定
        .option("cloudFiles.format", "binaryFile")                      # バイナリファイル（PDFなど）として読み込む
        .option("cloudFiles.includeExistingFiles", "true")              # 初回に既存ファイルも取り込む
        .option("cloudFiles.maxFilesPerTrigger", "50")                  # 1マイクロバッチあたりの最大ファイル数
        .load(f"/Volumes/{MY_CATALOG}/{MY_SCHEMA}/{DATA_VOLUME}")       # PDFファイル置き場（UCボリュームパス）
)

# 必要列だけ抽出（binaryFileソースのうち、path と content だけ使う）
bronze_stream_df = bronze_stream_df.select("path", "content")

# ai_parse_document(content) でPDFバイナリをテキストに変換
bronze_stream_df = bronze_stream_df.select("path", F.expr("ai_parse_document(content)").alias("content"))

# AvailableNow で「このジョブ起動時点までに到着済みの未処理PDF」をまとめて Bronze テーブルに書き込み
bronze_query = (
    bronze_stream_df.writeStream
        .format("delta")                                                           # Delta テーブルとして書き込む
        .option(
            "checkpointLocation",
            f"/Volumes/{MY_CATALOG}/{MY_SCHEMA}/{CHECKPOINT_VOLUME}/settlement_bronze")   # Auto Loaderが「どのファイルをいつ処理したか」という状態を保存するチェックポイント
        .outputMode("append")                                                      # 新規ファイル分を追記
        .trigger(availableNow=True)                                                # 今回の起動時点までの分だけを一気に処理するトリガー
        .toTable(BRONZE_TABLE)                                                     # 書き込み先テーブル
)

# ストリーミング処理（AvailableNow のマイクロバッチ群）が完了するまで、このセルの実行をブロック
bronze_query.awaitTermination()

In [0]:
# Auto Loader適用しない場合、こちらのシンプルなコードでOKです
# spark.sql(f"""
# -- 決算書ボリュームのファイルの文字を抽出し、bronzeテーブルを作成
# CREATE OR REPLACE TABLE {MY_CATALOG}.{MY_SCHEMA}.settlement_bronze AS
# SELECT
#   path,
#   ai_parse_document(content) AS content
# FROM
#   READ_FILES(
#     '/Volumes/{MY_CATALOG}/{MY_SCHEMA}/{MY_VOLUME}/pdf',
#     format => 'binaryFile'
#   );
# """)

### 2. silver

In [0]:
prompt = (
    '貴方はテキスト整理のプロフェッショナルです。決算についてJSON形式でまとめてください。parse_jsonできるように余計な文字列は入れないでください。'
    'それぞれのセグメントごとに売上高、営業利益、対前年同期売上増減率、概況状況をまとめて一つのテキストに整形してください。'
    'Keyはそれぞれ\"セグメント\"、\"売上高\"、\"営業利益\"、\"対前年同期売上増減率\"、\"概況\"でお願いします。'
    '[で始まり、]で終わるJSONで返してください。```json```という文字列は抜いてください。'
)

# 1回目だけテーブル定義を作成
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {SILVER_TABLE} (
  path    STRING,
  summary STRING
)
USING DELTA
""")

# Bronze にある path のうち、まだ Silver に無いものだけ LLM 要約して追加
spark.sql(f"""
MERGE INTO {SILVER_TABLE} s
USING (
  SELECT
    path,
    ai_query(
      'databricks-claude-sonnet-4',
      '{prompt}' || content
    ) AS summary
  FROM {BRONZE_TABLE}
) b
ON s.path = b.path
WHEN NOT MATCHED THEN
  INSERT (path, summary) VALUES (b.path, b.summary)
""")

### 3. gold

In [0]:
# 1回目だけテーブル定義を作成（なければ作る）
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {GOLD_TABLE} (
  id          STRING,
  company     STRING,
  Industry    STRING,
  segment     STRING,
  Sales       DOUBLE,
  profit      DOUBLE,
  yoy_change  STRING,
  summary     STRING
)
USING delta
""")

# まだGoldに入っていない (path, segment) だけを抽出してINSERT
spark.sql(f"""
INSERT INTO {GOLD_TABLE}
SELECT
  uuid() AS id,
  regexp_extract(
    split(s.path, '/')[size(split(s.path, '/'))-1],
    '^(.*)\.',
    1
  ) AS company,
  split(s.path, '/')[5] AS Industry,
  exploded_case.`セグメント` AS segment,
  try_cast(
    regexp_replace(
      regexp_extract(exploded_case.`売上高`, '([0-9,.]+)', 1),
      ',',
      ''
    ) AS DOUBLE
  ) * CASE WHEN exploded_case.`売上高` LIKE '%億%' THEN 100 ELSE 1 END AS Sales,
  try_cast(
    regexp_replace(
      regexp_extract(exploded_case.`営業利益`, '([0-9,.]+)', 1),
      ',',
      ''
    ) AS DOUBLE
  ) * CASE WHEN exploded_case.`営業利益` LIKE '%億%' THEN 100 ELSE 1 END AS profit,
  exploded_case.`対前年同期売上増減率` AS yoy_change,
  exploded_case.`概況` AS summary
FROM (
  SELECT
    path,
    EXPLODE(
      from_json(
        summary,
        'ARRAY<STRUCT<`セグメント`:STRING,`売上高`:STRING,`営業利益`:STRING,`対前年同期売上増減率`:STRING,`概況`:STRING>>'
      )
    ) AS exploded_case
  FROM {SILVER_TABLE}
) s
LEFT ANTI JOIN {GOLD_TABLE} g
ON
  g.company = regexp_extract(
    split(s.path, '/')[size(split(s.path, '/'))-1],
    '^(.*)\.',
    1
  )
  AND g.segment = s.exploded_case.`セグメント`
""")

### 4. 初回実行のみ。以降は再実行不要です。

In [0]:
# 変数定義
TABLE_PATH = f'{MY_CATALOG}.{MY_SCHEMA}.settlement_gold'                 # テーブルパス
PK_CONSTRAINT_NAME = f'pk_settlement_gold'                               # 主キー

# NOT NULL制約の追加
columns_to_set_not_null = [
    'id']

for column in columns_to_set_not_null:
    spark.sql(f"""
    ALTER TABLE {TABLE_PATH}
    ALTER COLUMN {column} SET NOT NULL;
    """)

# 主キー設定
spark.sql(f'''
ALTER TABLE {TABLE_PATH} DROP CONSTRAINT IF EXISTS {PK_CONSTRAINT_NAME};
''')

spark.sql(f'''
ALTER TABLE {TABLE_PATH}
ADD CONSTRAINT {PK_CONSTRAINT_NAME} PRIMARY KEY (id);
''')

# # チェック
# display(
#     spark.sql(f'''
#     DESCRIBE EXTENDED {TABLE_PATH}
#     '''))

In [0]:
# テーブル名
table_name = f'{MY_CATALOG}.{MY_SCHEMA}.settlement_gold'

# テーブルコメント
table_comment = """
テーブル名: settlement_gold / 決算書（ゴールド）
説明: 決算書PDFから抽出した各セグメントの売上高・営業利益・対前年同期比などを構造化した分析用テーブル
"""
spark.sql(f'COMMENT ON TABLE {table_name} IS "{table_comment}"')

# カラムコメント
column_comments = {
    "id": "UUIDで採番した一意のID",
    "company": "決算書ファイル名から抽出した企業名",
    "Industry": "パスから抽出した業種（業界）",
    "segment": "決算書内のセグメント名",
    "Sales": "セグメントの売上高（数値、単位調整済み）",
    "profit": "セグメントの営業利益（数値、単位調整済み）",
    "yoy_change": "対前年同期売上増減率（文字列）",
    "summary": "セグメントの概況説明テキスト"
}

for column, comment in column_comments.items():
    # シングルクォートをエスケープ
    escaped_comment = comment.replace("'", "\\'")
    sql_query = f"ALTER TABLE {table_name} ALTER COLUMN {column} COMMENT '{escaped_comment}'"
    spark.sql(sql_query)