In [0]:
!pip install openpyxl

In [0]:
%restart_python

In [0]:
# ============================================================
# テスト用データ作成スクリプト
# ECサイト系のスキーマ想定：顧客・商品・注文の3テーブル構成
# ============================================================

spark.sql("COMMENT ON SCHEMA aaa.bbb IS 'ECサイト業務データ'")

# ------------------------------------------------------------
# テーブル: customers（顧客マスタ）
# ------------------------------------------------------------
spark.sql("DROP TABLE IF EXISTS aaa.bbb.customers")
spark.sql("""
    CREATE TABLE IF NOT EXISTS aaa.bbb.customers (
        customer_id BIGINT    NOT NULL COMMENT '顧客ID（PK）',
        name        STRING             COMMENT '顧客氏名',
        email       STRING             COMMENT 'メールアドレス',
        created     TIMESTAMP          COMMENT '登録日時'
    )
    COMMENT '顧客マスタ'
""")
spark.sql("INSERT INTO aaa.bbb.customers VALUES (1, '山田太郎', 'yamada@example.com', current_timestamp())")

# ------------------------------------------------------------
# テーブル: products（商品マスタ）
# ------------------------------------------------------------
spark.sql("DROP TABLE IF EXISTS aaa.bbb.products")
spark.sql("""
    CREATE TABLE IF NOT EXISTS aaa.bbb.products (
        product_id  BIGINT    NOT NULL COMMENT '商品ID（PK）',
        name        STRING             COMMENT '商品名',
        price       DOUBLE             COMMENT '販売価格（税抜）',
        stock       INT                COMMENT '在庫数'
    )
    COMMENT '商品マスタ'
""")
spark.sql("INSERT INTO aaa.bbb.products VALUES (1, 'ノートPC', 89800.0, 50)")

# ------------------------------------------------------------
# テーブル: orders（注文トランザクション）
# ------------------------------------------------------------
spark.sql("DROP TABLE IF EXISTS aaa.bbb.orders")
spark.sql("""
    CREATE TABLE IF NOT EXISTS aaa.bbb.orders (
        order_id    BIGINT    NOT NULL COMMENT '注文ID（PK）',
        customer_id BIGINT             COMMENT '顧客ID（FK: customers）',
        product_id  BIGINT             COMMENT '商品ID（FK: products）',
        quantity    INT                COMMENT '注文数量',
        ordered_at  TIMESTAMP          COMMENT '注文日時'
    )
    COMMENT '注文トランザクション'
""")
spark.sql("INSERT INTO aaa.bbb.orders VALUES (1, 1, 1, 2, current_timestamp())")

In [0]:
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
from datetime import datetime
from collections import defaultdict

SYSTEM_NAME  = "Databricks"
AUTHOR       = "rif8"
catalog_name = "aaa"
schema_name  = "bbb"
_now         = datetime.now()
created_date = _now.strftime("%Y/%m/%d")
yyyymmdd     = _now.strftime("%Y%m%d")

def fetch_table_columns(catalog, schema):
    df = spark.sql(f"""
        SELECT
            TABLE_NAME,
            COLUMN_NAME,
            COLUMN_DEFAULT AS DEFAULT_VALUE,
            IS_NULLABLE,
            CASE
                WHEN DATA_TYPE = 'STRING'
                    THEN 'STRING'
                WHEN DATA_TYPE IN ('VARCHAR','CHAR','CHARACTER','TEXT')
                    THEN CONCAT(DATA_TYPE, '(', COALESCE(CAST(CHARACTER_MAXIMUM_LENGTH AS STRING), ''), ')')
                WHEN DATA_TYPE IN ('BINARY','VARBINARY')
                    THEN CONCAT(DATA_TYPE, '(', COALESCE(CAST(CHARACTER_OCTET_LENGTH AS STRING), ''), ')')
                WHEN DATA_TYPE IN ('NUMBER','DECIMAL','NUMERIC')
                    THEN CONCAT(DATA_TYPE, '(', COALESCE(CAST(NUMERIC_PRECISION AS STRING), ''), ',', COALESCE(CAST(NUMERIC_SCALE AS STRING), ''), ')')
                ELSE DATA_TYPE
            END AS DATA_TYPE,
            COMMENT
        FROM {catalog}.information_schema.columns
        WHERE table_schema = '{schema}'
        ORDER BY TABLE_NAME, ordinal_position
    """)
    result = defaultdict(list)
    for row in df.collect():
        result[row["TABLE_NAME"].lower()].append(row)
    return result


def fetch_table_info(catalog, schema):
    """テーブルのコメントと種別（TABLE / VIEW）を取得"""
    df = spark.sql(f"""
        SELECT
            table_name,
            UPPER(table_type) AS table_type,
            comment
        FROM {catalog}.information_schema.tables
        WHERE table_schema = '{schema}'
        ORDER BY table_name
    """)
    comments     = {}
    table_types  = {}
    for r in df.collect():
        name               = r["table_name"].lower()
        comments[name]     = r["comment"] or ""
        raw_type           = r["table_type"] or "TABLE"
        table_types[name]  = "VIEW" if "VIEW" in raw_type else "TABLE"
    return comments, table_types

# =============================================
# スタイル定義
# =============================================
PRIM_COLOR      = "0031d8"   # ヘッダー紫
WHITE      = "FFFFFF"
GRAY       = "F2F2F2"   # 縞模様（偶数行）
DARK       = "333333"   # 文字色
VIEW_COLOR = "d9e6ff"   # ビュー行の背景色（薄紫）

def make_fill(color):
    return PatternFill("solid", fgColor=color)

def make_font(bold, color, size=10):
    return Font(name="メイリオ", bold=bold, color=color, size=size)

prim_fill = make_fill(PRIM_COLOR)
gray_fill  = make_fill(GRAY)
white_fill = make_fill(WHITE)
view_fill  = make_fill(VIEW_COLOR)

def bold_white(size=10):  return make_font(True,  WHITE, size)
def bold_dark(size=10):   return make_font(True,  DARK,  size)
def normal_dark(size=10): return make_font(False, DARK,  size)

center    = Alignment(horizontal="center", vertical="center", wrap_text=False)
left      = Alignment(horizontal="left",   vertical="center", wrap_text=False)
left_wrap = Alignment(horizontal="left",   vertical="center", wrap_text=True)

thin = Side(style="thin", color="AAAAAA")
def border_all():
    return Border(left=thin, right=thin, top=thin, bottom=thin)

def apply_merged_border(ws, start_row, end_row, start_col, end_col):
    for row in range(start_row, end_row + 1):
        for col in range(start_col, end_col + 1):
            cell = ws.cell(row=row, column=col)
            left_side   = thin if col == start_col else Side(style=None)
            right_side  = thin if col == end_col   else Side(style=None)
            top_side    = thin if row == start_row else Side(style=None)
            bottom_side = thin if row == end_row   else Side(style=None)
            cell.border = Border(left=left_side, right=right_side,
                                 top=top_side, bottom=bottom_side)

def apply_cell(ws, row, col, value, font, fill, align, border=None, height=None):
    cell = ws.cell(row=row, column=col, value=value)
    cell.font      = font
    cell.fill      = fill
    cell.alignment = align
    if border:
        cell.border = border
    if height:
        ws.row_dimensions[row].height = height
    return cell

def header_cell(ws, row, col, value, height=None):
    return apply_cell(ws, row, col, value, bold_white(), prim_fill, center, border_all(), height)

def data_cell(ws, row, col, value, fill, align=left):
    return apply_cell(ws, row, col, value, normal_dark(), fill, align, border_all())

def row_fill(i, is_view=False):
    if is_view:
        return view_fill
    return gray_fill if i % 2 == 0 else white_fill

def build_cover(wb, system_name, author, created):
    ws = wb.active
    ws.title = "表紙"
    ws.sheet_view.showGridLines = False

    for i, w in enumerate([3, 18, 20, 12, 25, 3], start=1):
        ws.column_dimensions[get_column_letter(i)].width = w
    for r in range(1, 40):
        ws.row_dimensions[r].height = 18

    ws.row_dimensions[2].height = 40
    cell = ws.cell(row=2, column=2, value=f"{system_name}　テーブル定義書")
    cell.font      = make_font(True, DARK, 20)
    cell.alignment = left

    info_rows = [
        (4, "プラットフォーム", system_name, "作成者", author),
        (5, "作成日",          created,      None,     None),
    ]
    for row_num, l1, v1, l2, v2 in info_rows:
        ws.row_dimensions[row_num].height = 22
        apply_cell(ws, row_num, 2, l1, bold_white(),  prim_fill, center, border_all())
        apply_cell(ws, row_num, 3, v1, normal_dark(), white_fill, center, border_all())
        if l2:
            apply_cell(ws, row_num, 4, l2, bold_white(),  prim_fill, center, border_all())
            apply_cell(ws, row_num, 5, v2, normal_dark(), white_fill, center, border_all())

def build_table_list(wb, table_list, table_comments, table_types):
    ws = wb.create_sheet("テーブル一覧")
    ws.sheet_view.showGridLines = False

    for i, w in enumerate([3, 8, 10, 30, 40, 3], start=1):
        ws.column_dimensions[get_column_letter(i)].width = w

    ws.row_dimensions[2].height = 28
    cell = ws.cell(row=2, column=2, value="テーブル一覧")
    cell.font      = make_font(True, DARK, 14)
    cell.alignment = center

    for col, val in [(2, "No."), (3, "種別"), (4, "テーブル名"), (5, "備考")]:
        header_cell(ws, 4, col, val, height=22)

    for i, tname in enumerate(table_list, start=1):
        r       = i + 4
        ws.row_dimensions[r].height = 18
        t_type  = table_types.get(tname, "TABLE")
        is_view = (t_type == "VIEW")
        fill    = row_fill(i, is_view)
        data_cell(ws, r, 2, i,                     fill, center)
        data_cell(ws, r, 3, t_type,                fill, center)
        data_cell(ws, r, 4, tname,                 fill)
        data_cell(ws, r, 5, table_comments[tname], fill)

def build_table_sheet(wb, tname, catalog, schema, author, created,
                      table_comments, table_types, table_columns):
    ws = wb.create_sheet(tname[:31])
    ws.sheet_view.showGridLines = False

    for i, w in enumerate([3, 12, 22, 14, 12, 14, 30, 3], start=1):
        ws.column_dimensions[get_column_letter(i)].width = w

    r      = 2
    t_type = table_types.get(tname, "TABLE")

    cell = ws.cell(row=r, column=2, value="テーブル情報")
    cell.font = bold_dark()
    r += 1

    meta_rows = [
        ("カタログ名", catalog, "作成者", author),
        ("スキーマ名", schema,  "作成日", created),
        ("テーブル名", tname,   "種別",   t_type),
    ]
    for label, value, r_label, r_value in meta_rows:
        ws.row_dimensions[r].height = 20
        apply_cell(ws, r, 2, label,   bold_white(),  prim_fill, center, border_all())
        apply_cell(ws, r, 3, value,   normal_dark(), white_fill, left,   border_all())
        if r_label:
            apply_cell(ws, r, 4, r_label, bold_white(),  prim_fill, center, border_all())
            apply_cell(ws, r, 5, r_value, normal_dark(), white_fill, left,   border_all())
        r += 1

    ws.row_dimensions[r].height = 20
    apply_cell(ws, r, 2, "備考", bold_white(), prim_fill, center, border_all())
    ws.merge_cells(start_row=r, start_column=3, end_row=r, end_column=7)
    apply_cell(ws, r, 3, "", normal_dark(), white_fill, left, border_all())
    apply_merged_border(ws, r, r, 3, 7)
    r += 1

    ws.row_dimensions[r].height = 45
    ws.merge_cells(start_row=r, start_column=2, end_row=r, end_column=7)
    apply_cell(ws, r, 2, table_comments.get(tname, ""), normal_dark(), white_fill, left_wrap, border_all())
    apply_merged_border(ws, r, r, 2, 7)
    r += 2

    cell = ws.cell(row=r, column=2, value="カラム情報")
    cell.font = bold_dark()
    r += 1

    for col, val in [(2,"No."),(3,"カラム名"),(4,"データ型"),(5,"Not Null"),(6,"デフォルト"),(7,"備考")]:
        header_cell(ws, r, col, val, height=22)
    r += 1

    for i, col_row in enumerate(table_columns[tname], start=1):
        ws.row_dimensions[r].height = 18
        fill     = row_fill(i)
        not_null = "Y" if col_row["IS_NULLABLE"] == "NO" else ""
        data_cell(ws, r, 2, i,                              fill, center)
        data_cell(ws, r, 3, col_row["COLUMN_NAME"],         fill)
        data_cell(ws, r, 4, col_row["DATA_TYPE"],           fill, center)
        data_cell(ws, r, 5, not_null,                       fill, center)
        data_cell(ws, r, 6, col_row["DEFAULT_VALUE"] or "", fill, center)
        data_cell(ws, r, 7, col_row["COMMENT"] or "",       fill)
        r += 1
def main():
    table_columns               = fetch_table_columns(catalog_name, schema_name)
    table_comments, table_types = fetch_table_info(catalog_name, schema_name)
    table_list                  = list(table_comments.keys())

    wb = Workbook()
    build_cover(wb, SYSTEM_NAME, AUTHOR, created_date)
    build_table_list(wb, table_list, table_comments, table_types)
    for tname in table_list:
        build_table_sheet(
            wb, tname, catalog_name, schema_name,
            AUTHOR, created_date,
            table_comments, table_types, table_columns
        )

    output_path = f"./{yyyymmdd}_table_definition_{schema_name}.xlsx"
    wb.save(output_path)
    print(f"出力完了: {output_path}")

main()