[ai_parse_document 関数](https://learn.microsoft.com/ja-jp/azure/databricks/sql/language-manual/functions/ai_parse_document)<br>
[ai_query 関数](https://learn.microsoft.com/ja-jp/azure/databricks/sql/language-manual/functions/ai_query)

In [0]:
%run ./00_config

### 1. bronze

In [0]:
spark.sql(f"""
-- ボリュームのファイルの文字を抽出し、bronzeテーブルを作成
CREATE OR REPLACE TABLE {MY_CATALOG}.{MY_SCHEMA}.sql_receipt_bronze AS
SELECT
  path,
  ai_parse_document(content) AS content
FROM
  READ_FILES(
    '/Volumes/{MY_CATALOG}/{MY_SCHEMA}/{MY_VOLUME}/binary/pdf',
    format => 'binaryFile'
  );
""")

### 2. silver

In [0]:
prompt = """
貴方はテキスト整理のプロフェッショナルです。請求書についてJSON形式でまとめてください。parse_jsonできるように余計な文字列は入れないでください。
発行日（yyyy-MM-dd形式）、請求先名、請求元（郵便番号、住所、企業名、部署名）、明細（商品名、数量、単価、金額）、小計、消費税、合計金額、備考をまとめて一つのテキストに整形してください。
Keyはそれぞれ"発行日(issue_date)"、"請求先名(billing_name)"、"請求元(supplier)"、"明細(details)"、"小計(subtotal)"、"消費税(consumption_tax)"、"合計金額(total_amount)"、"備考(remarks)"、でお願いします。
{で始まり、}で終わるJSONで返してください。``````という文字列は抜いてください。
数値（数量、単価、金額、小計、消費税金額、合計金額）は数字型で扱えるように単位は抜いてください。
不明な場合は空白("") or nullにしてください。
フォーマットサンプル：
{
  "issue_date": "1900-01-01",
  "billing_name": "",
  "supplier": {
    "zip_code": "",
    "address": "",
    "company_name": "",
    "department_name": ""
  },
  "details": [
    {
      "item_name": "",
      "item_quantity": 0,
      "item_unit_price": 0,
      "item_amount": 0
    }
  ],
  "subtotal": 0,
  "consumption_tax": 0,
  "total_amount": 0,
  "remarks": ""
}
"""

spark.sql(f"""
-- bronzeテーブルから必要項目を切り出して、Silverをテーブルを作成
CREATE OR REPLACE TABLE {MY_CATALOG}.{MY_SCHEMA}.sql_receipt_silver AS
SELECT
  path,
  ai_query(
    'databricks-claude-sonnet-4',
    '{prompt}' || content
  ) AS summary
FROM
  {MY_CATALOG}.{MY_SCHEMA}.sql_receipt_bronze
""")

In [0]:
# prompt = """
# You are a professional at organizing text. Please summarize the invoice in JSON format. Do not include any extra strings so that it can be parsed with parse_json.
# Please extract and format the following fields into a single JSON text: Issue Date (yyyy-MM-dd format), Billing Name, Supplier (Zip Code, Address, Company Name, Department Name), Details (Item Name, Quantity, Unit Price, Amount), Subtotal, Consumption Tax, Total Amount, and Remarks.
# Use the following keys: "issue_date", "billing_name", "supplier", "details", "subtotal", "consumption_tax", "total_amount", and "remarks".
# Return the result as a JSON starting with { and ending with }. Do not include the string .
# For numeric fields (quantity, unit price, amount, subtotal, consumption tax, total amount), remove any units so they can be handled as numbers.
# If any value is unknown, set it as "" (empty string) or null.
# Format sample:
# {
#   "issue_date": "1900-01-01",
#   "billing_name": "",
#   "supplier": {
#     "zip_code": "",
#     "address": "",
#     "company_name": "",
#     "department_name": ""
#   },
#   "details": [
#     {
#       "item_name": "",
#       "item_quantity": 0,
#       "item_unit_price": 0,
#       "item_amount": 0
#     }
#   ],
#   "subtotal": 0,
#   "consumption_tax": 0,
#   "total_amount": 0,
#   "remarks": ""
# }
# """

# spark.sql(f"""
# -- Extract required fields from the bronze table and create the silver table
# CREATE OR REPLACE TABLE {MY_CATALOG}.{MY_SCHEMA}.sql_receipt_silver AS
# SELECT
#   path,
#   ai_query(
#     'databricks-claude-sonnet-4',
#     '{prompt}' || content
#   ) AS summary
# FROM
#   {MY_CATALOG}.{MY_SCHEMA}.sql_receipt_bronze
# """)

In [0]:
%sql
select * from sql_receipt_silver

### 2. gold

In [0]:
# spark.sql(f"""
# -- json形式に格納されたSilverテーブルを展開して、Goldテーブルを作成
# CREATE OR REPLACE TABLE {MY_CATALOG}.{MY_SCHEMA}.receipt_gold AS
# SELECT
#   uuid() AS id,
#   regexp_replace(
#     split(path, '/')[size(split(path, '/'))-1],
#     '\\.pdf$',
#     ''
#   ) AS file_name,
#   parsed.`発行日` AS issue_date,
#   parsed.`請求先名` AS billing_name,
#   parsed.`請求元`.`郵便番号` AS supplier_zip,
#   parsed.`請求元`.`住所` AS supplier_address,
#   parsed.`請求元`.`企業名` AS supplier_company,
#   parsed.`請求元`.`部署名` AS supplier_department,
#   detail.`商品名` AS item_name,
#   CAST(detail.`数量` AS BIGINT) AS item_qty,
#   CAST(detail.`単価` AS BIGINT) AS item_unit_price,
#   CAST(detail.`金額` AS BIGINT) AS item_amount,
#   CAST(parsed.`小計` AS BIGINT) AS subtotal,
#   CAST(parsed.`消費税` AS BIGINT) AS consumption_tax,
#   CAST(parsed.`小計` AS BIGINT) + CAST(parsed.`消費税` AS BIGINT) AS total_amount_with_tax,
#   parsed.`備考` AS remarks
# FROM
#   (
#     SELECT
#       *,
#       from_json(
#         summary,
#         'STRUCT<`発行日`:STRING,`請求先名`:STRING,`請求元`:STRUCT<`郵便番号`:STRING,`住所`:STRING,`企業名`:STRING,`部署名`:STRING>,`明細`:ARRAY<STRUCT<`商品名`:STRING,`数量`:DOUBLE,`単価`:DOUBLE,`金額`:DOUBLE>>,`小計`:DOUBLE,`消費税`:DOUBLE,`備考`:STRING>'
#       ) AS parsed
#     FROM
#       {MY_CATALOG}.{MY_SCHEMA}.receipt_silver
#   )
# LATERAL VIEW
#   EXPLODE(parsed.`明細`) details AS detail
# ;
# """)

In [0]:
spark.sql(f"""
-- json形式に格納されたSilverテーブルを展開して、Goldテーブルを作成
CREATE OR REPLACE TABLE {MY_CATALOG}.{MY_SCHEMA}.sql_receipt_gold AS
SELECT
  uuid() AS id,
  regexp_replace(
    split(path, '/')[size(split(path, '/'))-1],
    '\\.pdf$',
    ''
  ) AS file_name,
  parsed.issue_date AS issue_date,                              -- 発行日
  parsed.billing_name AS billing_name,                          -- 請求先名
  parsed.supplier.zip_code AS supplier_zip,                     -- 請求元.郵便番号
  parsed.supplier.address AS supplier_address,                  -- 請求元.住所
  parsed.supplier.company_name AS supplier_company,             -- 請求元.企業名
  parsed.supplier.department_name AS supplier_department,       -- 請求元.部署名
  detail.item_name AS item_name,                                -- 商品名
  CAST(detail.item_quantity AS BIGINT) AS item_qty,             -- 詳細.数量
  CAST(detail.item_unit_price AS BIGINT) AS item_unit_price,    -- 詳細.単価
  CAST(detail.item_amount AS BIGINT) AS item_amount,            -- 詳細.金額
  CAST(parsed.subtotal AS BIGINT) AS subtotal,                  -- 小計（税抜）
  CAST(parsed.consumption_tax AS BIGINT) AS consumption_tax,    -- 消費税金額
  CAST(parsed.total_amount AS BIGINT) AS total_amount_with_tax, -- 合計金額（税込）
  parsed.remarks AS remarks                                     -- 備考
FROM (
  SELECT
    *,
    from_json(
      summary,
      'STRUCT<
         issue_date:STRING,
         billing_name:STRING,
         supplier:STRUCT<
           zip_code:STRING,
           address:STRING,
           company_name:STRING,
           department_name:STRING
         >,
         details:ARRAY<
           STRUCT<
             item_name:STRING,
             item_quantity:DOUBLE,
             item_unit_price:DOUBLE,
             item_amount:DOUBLE
           >
         >,
         subtotal:DOUBLE,
         consumption_tax:DOUBLE,
         total_amount:DOUBLE,
         remarks:STRING
       >'
    ) AS parsed
  FROM {MY_CATALOG}.{MY_SCHEMA}.sql_receipt_silver
)
LATERAL VIEW
  EXPLODE(parsed.details) details AS detail
;
""")

In [0]:
%sql
select * from sql_receipt_gold

In [0]:
# spark.sql(f"""
# -- Goldテーブルに対してCDFを有効化
# ALTER TABLE {MY_CATALOG}.{MY_SCHEMA}.sql_receipt_gold
# SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
# """)

In [0]:
# 変数定義
TABLE_PATH = f'{MY_CATALOG}.{MY_SCHEMA}.sql_receipt_gold'                 # テーブルパス
PK_CONSTRAINT_NAME = f'pk_sql_receipt_gold'                               # 主キー

# NOT NULL制約の追加
columns_to_set_not_null = [
    'id']

for column in columns_to_set_not_null:
    spark.sql(f"""
    ALTER TABLE {TABLE_PATH}
    ALTER COLUMN {column} SET NOT NULL;
    """)

# 主キー設定
spark.sql(f'''
ALTER TABLE {TABLE_PATH} DROP CONSTRAINT IF EXISTS {PK_CONSTRAINT_NAME};
''')

spark.sql(f'''
ALTER TABLE {TABLE_PATH}
ADD CONSTRAINT {PK_CONSTRAINT_NAME} PRIMARY KEY (id);
''')

# # チェック
# display(
#     spark.sql(f'''
#     DESCRIBE EXTENDED {TABLE_PATH}
#     '''))

In [0]:
certified_tag = 'system.Certified'

try:
    spark.sql(f"ALTER TABLE sql_receipt_gold SET TAGS ('{certified_tag}')")
    print(f"認定済みタグ '{certified_tag}' の追加が完了しました。")

except Exception as e:
    print(f"認定済みタグ '{certified_tag}' の追加中にエラーが発生しました: {str(e)}")
    print("このエラーはタグ機能に対応していないワークスペースで実行した場合に発生する可能性があります。")

In [0]:
# テーブル名
table_name = f'{MY_CATALOG}.{MY_SCHEMA}.sql_receipt_gold'

# テーブルコメント
comment = """
テーブル名：`sql_receipt_gold / 領収書（ゴールド）`
説明：領収書データをパースして構造化したテーブルです。分析用に使います。
"""
spark.sql(f'COMMENT ON TABLE {table_name} IS "{comment}"')

# カラムコメント
column_comments = {
    "id": "自動採番したユニークID",
    "file_name": "アップロードされたPDFのファイル名",
    "issue_date": "発行日",
    "billing_name": "請求先名",
    "supplier_zip": "請求元 郵便番号",
    "supplier_address": "請求元 住所",
    "supplier_company": "請求元 企業名",
    "supplier_department": "請求元 部署名",
    "item_name": "明細 商品名",
    "item_qty": "明細 数量",
    "item_unit_price": "明細 単価",
    "item_amount": "明細 金額",
    "subtotal": "小計（税抜）",
    "consumption_tax": "消費税金額",
    "total_amount_with_tax": "合計金額（税込）",
    "remarks": "備考"
}

for column, comment in column_comments.items():
    escaped_comment = comment.replace("'", "\\'")
    sql_query = f"ALTER TABLE {table_name} ALTER COLUMN {column} COMMENT '{escaped_comment}'"
    spark.sql(sql_query)
