# Databricks Excel ファイル読み込み機能テスト

このノートブックでは、Databricks Runtime 17.1以上で利用可能な組み込みExcel読み込み機能をテストします。

**前提条件**
- Databricks Runtime 17.1 以上
- ワークスペース管理者がプレビュー機能を有効化済み

**参考ドキュメント**: https://docs.databricks.com/aws/ja/query/formats/excel

## 0. セットアップ: サンプルファイルのアップロード

事前に `databricks_excel_sample.xlsx` をVolume または DBFS にアップロードしてください。

In [0]:
# ファイルパスを設定（環境に合わせて変更してください）
# Unity Catalog Volume を使う場合
EXCEL_FILE_PATH = "/Volumes/takaakiyayoi_catalog/ai_functions/unstructured/input/databricks_excel_sample.xlsx"

# DBFS を使う場合（レガシー）
# EXCEL_FILE_PATH = "dbfs:/FileStore/databricks_excel_sample.xlsx"

## 1. 基本的な読み込み（最初のシート全体）

In [0]:
# デフォルト: 最初のシートの左上から右下の空でないセルまで読み込み
df_basic = spark.read.excel(EXCEL_FILE_PATH)

display(df_basic)

In [0]:
# スキーマを確認
df_basic.printSchema()

## 2. ヘッダー行の指定（headerRows）

`headerRows` オプションで1行目をカラム名として認識させます。

In [0]:
# headerRows=1 でヘッダー行を列名として使用
df_with_header = (
    spark.read
    .option("headerRows", 1)
    .excel(EXCEL_FILE_PATH)
)

display(df_with_header)

In [0]:
# 列名がヘッダーから取得されていることを確認
print("列名:", df_with_header.columns)

## 3. 特定シートの読み込み（dataAddress）

`dataAddress` オプションでシート名やセル範囲を指定できます。

In [0]:
# 「顧客マスタ」シートを読み込み
df_customers = (
    spark.read
    .option("headerRows", 1)
    .option("dataAddress", "顧客マスタ")
    .excel(EXCEL_FILE_PATH)
)

display(df_customers)

## 4. セル範囲の指定（dataAddress）

特定の範囲だけを読み込むことができます。

In [0]:
# 「カテゴリ別集計」シートのA3:D7範囲を読み込み（タイトル行をスキップ）
df_summary = (
    spark.read
    .option("headerRows", 1)
    .option("dataAddress", "カテゴリ別集計!A3:D7")
    .excel(EXCEL_FILE_PATH)
)

display(df_summary)

In [0]:
# 売上データシートの特定範囲（B2:F5）を読み込み
df_partial = (
    spark.read
    .option("headerRows", 0)
    .option("dataAddress", "売上データ!B2:F5")
    .excel(EXCEL_FILE_PATH)
)

display(df_partial)

## 5. シート一覧の取得（listSheets）

`operation` オプションで Excel ファイル内のシート一覧を取得できます。

In [0]:
# シート一覧を取得
df_sheets = (
    spark.read
    .option("operation", "listSheets")
    .excel(EXCEL_FILE_PATH)
)

display(df_sheets)

## 6. SQL での読み込み（read_files関数）

### 6.1 基本的なSQL読み込み

In [0]:
# SQLで読み込み（変数を使うためにPythonから実行）
df_sql = spark.sql(f"""
SELECT * FROM read_files(
    '{EXCEL_FILE_PATH}',
    format => 'excel',
    headerRows => 1,
    schemaEvolutionMode => 'none'
)
""")

display(df_sql)

### 6.2 特定シート・範囲をSQLで読み込み

In [0]:
df_sql_range = spark.sql(f"""
SELECT * FROM read_files(
    '{EXCEL_FILE_PATH}',
    format => 'excel',
    headerRows => 1,
    dataAddress => '顧客マスタ!A1:E6',
    schemaEvolutionMode => 'none'
)
""")

display(df_sql_range)

### 6.3 SQLでシート一覧を取得

In [0]:
df_sql_sheets = spark.sql(f"""
SELECT * FROM read_files(
    '{EXCEL_FILE_PATH}',
    operation => 'listSheets',
    schemaEvolutionMode => 'none'
)
""")

display(df_sql_sheets)

## 7. テーブルへの保存

In [0]:
# 一時ビューとして登録
df_with_header.createOrReplaceTempView("sales_temp")

In [0]:
%sql
-- 一時ビューの内容を確認
SELECT * FROM sales_temp

In [0]:
# Delta テーブルとして保存（カタログ/スキーマは環境に合わせて変更）
# df_with_header.write.mode("overwrite").saveAsTable("your_catalog.your_schema.excel_sales_data")

## 8. COPY INTO でのインポート

In [0]:
%sql
-- テーブルが存在しない場合は作成（環境に合わせて変更）
-- CREATE TABLE IF NOT EXISTS your_catalog.your_schema.excel_demo_table;

-- COPY INTO your_catalog.your_schema.excel_demo_table
-- FROM '/Volumes/your_catalog/your_schema/your_volume/databricks_excel_sample.xlsx'
-- FILEFORMAT = EXCEL
-- FORMAT_OPTIONS ('headerRows' = '1', 'mergeSchema' = 'true')
-- COPY_OPTIONS ('mergeSchema' = 'true');

## 9. Auto Loader でのストリーミング読み込み

継続的にExcelファイルを監視・読み込みする場合に使用します。

In [0]:
# ストリーミング読み込み（実行する場合はコメントを外してください）
# checkpoint_path = "/Volumes/your_catalog/your_schema/your_volume/checkpoints/excel_stream"
# schema_path = "/Volumes/your_catalog/your_schema/your_volume/schemas/excel_stream"
# source_path = "/Volumes/your_catalog/your_schema/your_volume/excel_files/"

# df_stream = (
#     spark.readStream
#     .format("cloudFiles")
#     .option("cloudFiles.format", "excel")
#     .option("cloudFiles.inferColumnTypes", True)
#     .option("headerRows", 1)
#     .option("cloudFiles.schemaLocation", schema_path)
#     .option("cloudFiles.schemaEvolutionMode", "none")
#     .load(source_path)
# )

# # Delta テーブルに書き込み
# (
#     df_stream.writeStream
#     .format("delta")
#     .option("mergeSchema", "true")
#     .option("checkpointLocation", checkpoint_path)
#     .table("your_catalog.your_schema.excel_streaming_table")
# )

## 10. 数式の評価確認

Excelの数式は計算済みの値として取り込まれます。

In [0]:
# 売上金額列（数式 =D*E で計算）が正しく取り込まれているか確認
df_formula_check = (
    spark.read
    .option("headerRows", 1)
    .option("dataAddress", "売上データ!A1:F9")
    .excel(EXCEL_FILE_PATH)
)

display(df_formula_check)

In [0]:
# 数量 × 単価 = 売上金額 の検証
from pyspark.sql.functions import col

df_verify = df_formula_check.withColumn(
    "計算検証",
    col("数量") * col("単価") == col("売上金額")
)

display(df_verify.select("商品名", "数量", "単価", "売上金額", "計算検証"))

## 11. 複数シートの一括処理

`listSheets` で取得したシート名を使って、全シートを処理できます。

In [0]:
# シート一覧を取得
sheets_df = (
    spark.read
    .option("operation", "listSheets")
    .excel(EXCEL_FILE_PATH)
)

sheet_names = [row.sheetName for row in sheets_df.collect()]
print("シート一覧:", sheet_names)

In [0]:
# 各シートを読み込んで表示
for sheet_name in sheet_names:
    print(f"\n=== {sheet_name} ===")
    df = (
        spark.read
        .option("headerRows", 1)
        .option("dataAddress", sheet_name)
        .excel(EXCEL_FILE_PATH)
    )
    df.show()

## まとめ

| 機能 | オプション | 例 |
|------|-----------|-----|
| ヘッダー行指定 | `headerRows` | `1` (1行目をヘッダーとして使用) |
| シート/範囲指定 | `dataAddress` | `"Sheet1!A1:E10"` |
| シート一覧取得 | `operation` | `"listSheets"` |
| ストリーミング | `cloudFiles.format` | `"excel"` |

**制限事項**
- パスワード保護ファイルは非対応
- ヘッダー行は1行のみ対応
- 結合セルは左上のみ値が入る
- スキーマ進化は非対応（`schemaEvolutionMode="none"` が必要）