# Unity Catalog メタデータ抽出ノートブック

このノートブックはUnity Catalogから既存テーブルのメタデータを効率的に抽出し、TABLE_DDL_INFO と COLUMN_DDL_INFO の形式で整理します。

## 取得対象
- ◎: 完全自動取得可能項目
- ○: 条件付き自動取得可能項目  
- △: 推測可能項目
- ×: 手動管理必須項目（空値で初期化）

## 新機能 (v1.1)
- PK/FK制約情報の自動取得
- 並列処理による高速化
- 統合設定管理

## 1. 初期設定・パラメータ

In [0]:
# ライブラリのインポート
import re
import json
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import Row
from typing import Dict, Any, List, Tuple, Set

In [0]:
# Catalog whose table metadata will be extracted.
catalog = "samples"

# Make it the current catalog for the session.
spark.sql(f"USE CATALOG {catalog}")

# List the schemas under the catalog; information_schema itself is filtered out.
schemas_sql = f"""
SELECT 
    catalog_name, 
    schema_name, 
    schema_owner, 
    created, 
    last_altered 
FROM {catalog}.information_schema.schemata
WHERE 1=1
  AND schema_name <> 'information_schema'
ORDER BY catalog_name, schema_name
"""

# schemas_df is reused by the next cell to build the list of schema names.
schemas_df = spark.sql(schemas_sql)
display(schemas_df)

In [0]:
# Collect the schema names to the driver as a plain Python list and echo it
# as the cell output for a quick visual check.
schema_list = [row.schema_name for row in schemas_df.select("schema_name").collect()]
schema_list

In [0]:
# Consolidated settings: one dict read by the rest of the notebook so that
# values are not duplicated across cells.
CONFIG = {
    "target_catalog": catalog,                      # catalog queried via information_schema
    "include_schemas": schema_list,                 # schemas collected in the previous cell
    "output_catalog": "ops",                        # catalog the result tables are saved into
    "output_path": "",                              # NOTE(review): not read by any visible cell
    "exclude_patterns": ['^__', '^event_log_'],     # regexes applied as table_name NOT RLIKE
    "table_types": ['MANAGED', 'EXTERNAL', 'VIEW'], # table_type values kept by the table query
    "retention_days": 180,                          # written into the output tables' retention properties
    "max_parallel_workers": 4,                      # default thread count for collect_details
    "describe_detail_timeout": 30                   # NOTE(review): not read by any visible cell; unit presumably seconds
}

# Echo the effective settings; the timestamp is the driver's local (naive) time.
print(f"対象カタログ: {CONFIG['target_catalog']}")
print(f"対象スキーマ: {CONFIG['include_schemas']}")
print(f"実行時刻: {datetime.now()}")

## 2. 共通処理・ユーティリティ

In [0]:
def build_filter_conditions(config):
    """Build the SQL fragments shared by the information_schema queries.

    Args:
        config: Settings dict. ``include_schemas`` and ``table_types`` are
            required iterables of strings; ``exclude_patterns`` is an optional
            iterable of regex strings matched against ``table_name``.

    Returns:
        A ``(schema_filter, exclude_where, table_types)`` tuple. The first and
        third items are ``', '``-joined values that the caller wraps as
        ``IN ('...')``; ``exclude_where`` is a boolean SQL expression that is
        always safe to append after ``AND``.
    """
    schema_filter = "', '".join(config["include_schemas"])
    table_types = "', '".join(config["table_types"])

    patterns = config.get("exclude_patterns") or []
    # Without patterns the joined string would be empty and the callers'
    # "AND {exclude_where}" would be invalid SQL, so fall back to a no-op
    # predicate in the same "1=1" style the queries already use.
    exclude_where = " AND ".join(f"table_name NOT RLIKE '{p}'" for p in patterns) or "1=1"

    return schema_filter, exclude_where, table_types

def q(identifier: str) -> str:
    """Wrap an identifier in backticks for Spark SQL, doubling any embedded backtick."""
    escaped = identifier.replace("`", "``")
    return "`" + escaped + "`"

def fqname(catalog: str, schema: str, table: str) -> str:
    """Return the three-level name with each part backtick-quoted via q()."""
    return ".".join(q(part) for part in (catalog, schema, table))

## 3. テーブル基本情報取得

In [0]:
# Reuse the shared filter fragments built from CONFIG.
schema_filter, exclude_where, table_types = build_filter_conditions(CONFIG)

# List the tables of the target catalog restricted to the configured schemas
# and table types, minus names matching the exclude patterns.
tables_sql = f"""
SELECT 
    table_catalog,
    table_schema,
    table_name,
    table_type,
    created,
    last_altered
FROM {CONFIG['target_catalog']}.information_schema.tables 
WHERE 1=1
  AND table_schema IN ('{schema_filter}')
  AND table_type IN ('{table_types}')
  AND {exclude_where}
ORDER BY table_catalog, table_schema, table_name
"""

tables_df = spark.sql(tables_sql)
# Registered as a temp view because later cells query it by name.
tables_df.createOrReplaceTempView("base_tables")

table_count = tables_df.count()
print(f"取得テーブル数: {table_count}")

# Show a small preview only when something matched.
if table_count > 0:
    display(tables_df.limit(5))
else:
    print("対象テーブルはありません")

## 4. カラム基本情報の取得

In [0]:
# Fetch column metadata and PK/FK flags in one query.
#
# The column comment is selected as well: the COLUMN_DDL_INFO builders read
# `comment` from these rows, and without it column_comment is always None.
#
# NOTE(review): ordinal_position is shifted by +1, which presumably assumes
# the catalog reports 0-based positions — confirm against the workspace.

# Same exclusion predicate, but qualified for the constraint CTE where
# table_name must refer to the table_constraints alias.
constraint_exclude_where = exclude_where.replace('table_name', 'tc.table_name')

integrated_sql = f"""
WITH base_columns AS (
    SELECT
        table_catalog,
        table_schema,
        table_name,
        column_name,
        ordinal_position + 1 as ordinal_position,
        data_type,
        is_nullable,
        column_default,
        numeric_precision,
        numeric_scale,
        comment
    FROM {CONFIG['target_catalog']}.information_schema.columns
    WHERE table_schema IN ('{schema_filter}')
      AND {exclude_where}
),
constraint_flags AS (
    SELECT
        kcu.table_schema,
        kcu.table_name,
        kcu.column_name,
        MAX(CASE WHEN tc.constraint_type = 'PRIMARY KEY' THEN 1 ELSE 0 END) as is_pk,
        MAX(CASE WHEN tc.constraint_type = 'FOREIGN KEY' THEN 1 ELSE 0 END) as is_fk
    FROM {CONFIG['target_catalog']}.information_schema.table_constraints tc
    JOIN {CONFIG['target_catalog']}.information_schema.key_column_usage kcu
        ON tc.constraint_catalog = kcu.constraint_catalog
        AND tc.constraint_schema = kcu.constraint_schema
        AND tc.constraint_name = kcu.constraint_name
    WHERE tc.constraint_type IN ('PRIMARY KEY', 'FOREIGN KEY')
      AND tc.constraint_schema IN ('{schema_filter}')
      AND {constraint_exclude_where}
    GROUP BY kcu.table_schema, kcu.table_name, kcu.column_name
)
SELECT
    c.*,
    COALESCE(cf.is_pk, 0) as is_pk,
    COALESCE(cf.is_fk, 0) as is_fk
FROM base_columns c
LEFT JOIN constraint_flags cf
    ON c.table_schema = cf.table_schema
    AND c.table_name = cf.table_name
    AND LOWER(c.column_name) = LOWER(cf.column_name)
ORDER BY c.table_catalog, c.table_schema, c.table_name, c.ordinal_position
"""

integrated_df = spark.sql(integrated_sql)
# Registered as a temp view because the COLUMN_DDL_INFO builder reads it by name.
integrated_df.createOrReplaceTempView("columns_with_constraints")

column_count = integrated_df.count()
print(f"取得カラム数: {column_count}")

if column_count > 0:
    display(integrated_df.limit(5))
    # Spot-check a few DECIMAL columns (precision/scale are the interesting fields).
    display(integrated_df.filter(F.col('data_type') == 'DECIMAL').limit(3))
else:
    print("対象カラムはありません")

## 5. テーブル詳細(DESCRIBE DETAIL)情報を取得

In [0]:
def describe_detail_one(catalog: str, schema: str, table: str) -> dict:
    """Run DESCRIBE DETAIL once for one table and return its first row as a dict.

    The returned dict always carries ``catalog_name``, ``schema_name``,
    ``table_name`` and the unquoted ``full_table_name``. When the statement
    yields no row, only those keys plus an ``error`` message are returned.
    Otherwise the row's fields are included, together with
    ``properties_json``: the ``properties`` map serialized as compact JSON,
    or None when ``properties`` is not a dict.
    """
    identity = {
        "catalog_name": catalog,
        "schema_name": schema,
        "table_name": table,
        "full_table_name": f"{catalog}.{schema}.{table}",
    }

    first_row = spark.sql(f"DESCRIBE DETAIL {fqname(catalog, schema, table)}").first()
    if first_row is None:
        return {**identity, "error": "DESCRIBE DETAIL returned no rows"}

    detail = first_row.asDict(recursive=True)

    # Keep a JSON rendering of the properties map next to the raw value.
    properties = detail.get("properties")
    detail["properties_json"] = (
        json.dumps(properties, ensure_ascii=False, separators=(',', ':'))
        if isinstance(properties, dict)
        else None
    )

    # Identification keys are written last so they win over same-named row fields.
    detail.update(identity)
    return detail

def extract_clustering_info(table_details: Dict[str, Any]) -> Tuple[str, List[str]]:
    """Derive the clustering strategy and columns from a DESCRIBE DETAIL dict.

    Args:
        table_details: Dict built from a DESCRIBE DETAIL row. The keys read
            are ``clusteringColumns``, ``tableFeatures`` and ``properties``;
            each may be missing or None.

    Returns:
        ``(strategy, columns)`` where strategy is ``'LIQUID'``, ``'ZORDER'``
        or ``'NONE'``. ``columns`` may be empty even for a non-NONE strategy
        when no column list can be found.
    """
    def _split_columns(value: Any) -> List[str]:
        # Comma-separated property value -> trimmed names, blanks dropped.
        if not isinstance(value, str):
            return []
        return [name.strip() for name in value.split(',') if name.strip()]

    features_u = {str(f).upper() for f in (table_details.get('tableFeatures') or [])}
    props: Dict[str, Any] = table_details.get('properties') or {}

    # Liquid clustering. The dedicated clusteringColumns field is the most
    # direct signal; the feature names and properties are kept as fallbacks.
    # NOTE(review): 'CLUSTERING' is assumed to be the table-feature name used
    # for liquid clustering — confirm against the runtime in use.
    liquid_cols = [str(c) for c in (table_details.get('clusteringColumns') or [])]
    if liquid_cols or features_u & {'LIQUID_CLUSTERING', 'CLUSTERING'}:
        if not liquid_cols:
            for key in ('delta.clusterBy', 'delta.liquidClustering.columns', 'delta.clusteredColumns'):
                liquid_cols = _split_columns(props.get(key))
                if liquid_cols:
                    break
        return 'LIQUID', liquid_cols

    # Z-Order: inferred from any property whose name looks Z-Order related.
    z_keys = [k for k in props
              if k.lower().startswith('delta.zorder') or k.lower().endswith('zorderby')]
    if z_keys:
        for zk in z_keys:
            z_cols = _split_columns(props.get(zk))
            if z_cols:
                return 'ZORDER', z_cols
        return 'ZORDER', []

    return 'NONE', []

def extract_partition_info(table_details: Dict[str, Any]) -> Tuple[str, List[str]]:
    """Return the partition strategy and columns from a DESCRIBE DETAIL dict.

    Reads ``partitionColumns``; a missing, None or empty value gives
    ``('NONE', [])``, anything else gives ``('BY_COLUMNS', [...])`` with each
    entry converted to ``str``.
    """
    raw_columns = table_details.get('partitionColumns', []) or []
    names = [str(column) for column in raw_columns]
    if raw_columns:
        return 'BY_COLUMNS', names
    return 'NONE', names

def _to_bool(s: Any) -> bool:
    return str(s).strip().lower() in ('true', '1', 'yes')

def get_delta_properties(table_details: Dict[str, Any]) -> Dict[str, Any]:
    """
    DESCRIBE DETAIL の properties/map と tableFeatures を用いて主な設定を抽出。
    """
    props: Dict[str, Any] = table_details.get('properties', {}) or {}
    features = table_details.get('tableFeatures', []) or []
    features_u = [str(f).upper() for f in features]

    out = {
        'auto_optimize_write':   _to_bool(props.get('delta.autoOptimize.optimizeWrite', 'false')),
        'auto_optimize_compact': _to_bool(props.get('delta.autoOptimize.autoCompact', 'false')),
        'cdf_enabled':           _to_bool(props.get('delta.enableChangeDataFeed', 'false')) or
                                 ('CHANGE_DATA_FEED' in features_u),
        'stats_column_limit':    int(props.get('delta.dataSkippingNumIndexedCols', 32) or 32),
        'stats_custom_columns':  None,
        'vacuum_retention_hours': 168,   # default
        'time_travel_retention_days': 30 # default
    }

    # dataSkipping のカスタム列
    scols = props.get('delta.dataSkippingStatsColumns')
    if isinstance(scols, str) and scols.strip():
        out['stats_custom_columns'] = [c.strip() for c in scols.split(',')]

    # vacuum の保持（例: "interval 168 hours" / "168 hours"）
    vstr = props.get('delta.deletedFileRetentionDuration')
    if isinstance(vstr, str) and vstr:
        m = re.search(r'(\d+)\s*hour', vstr, re.I)
        if m:
            out['vacuum_retention_hours'] = int(m.group(1))

    # time travel の保持（例: "interval 30 days" / "30 days"）
    lstr = props.get('delta.logRetentionDuration')
    if isinstance(lstr, str) and lstr:
        m = re.search(r'(\d+)\s*day', lstr, re.I)
        if m:
            out['time_travel_retention_days'] = int(m.group(1))

    return out

def list_target_tables():
    """Return one Row per distinct table in the base_tables view.

    Each Row carries table_catalog, table_schema, table_name and
    full_table_name_quoted (the backtick-quoted three-level name), ordered by
    catalog, schema and table name.
    """
    distinct_tables = spark.sql("""
        SELECT DISTINCT table_catalog, table_schema, table_name
        FROM base_tables
        ORDER BY table_catalog, table_schema, table_name
    """).collect()

    return [
        Row(
            table_catalog=t.table_catalog,
            table_schema=t.table_schema,
            table_name=t.table_name,
            full_table_name_quoted=fqname(t.table_catalog, t.table_schema, t.table_name),
        )
        for t in distinct_tables
    ]

In [0]:
def collect_details(table_rows, max_workers=None, limit=None):
    """Run describe_detail_one for each table on a thread pool.

    Args:
        table_rows: Sequence of objects exposing table_catalog, table_schema
            and table_name.
        max_workers: Thread count; when None, CONFIG["max_parallel_workers"]
            is used (4 if that key is absent).
        limit: When given, only the first ``limit`` rows are processed.

    Returns:
        List of result dicts sorted by catalog, schema and table name. A table
        whose lookup raised is represented by a dict with an ``error`` key
        instead of aborting the run.
    """
    workers = CONFIG.get("max_parallel_workers", 4) if max_workers is None else max_workers
    targets = table_rows[:limit] if limit is not None else table_rows
    total = len(targets)
    print(f"並列実行開始: {total}テーブル, {workers}並列")

    def _describe_safely(row):
        # Convert any failure into an error record so one bad table does not
        # stop the others.
        try:
            return describe_detail_one(row.table_catalog, row.table_schema, row.table_name)
        except Exception as exc:
            return {
                "catalog_name": row.table_catalog,
                "schema_name": row.table_schema,
                "table_name": row.table_name,
                "full_table_name": f"{row.table_catalog}.{row.table_schema}.{row.table_name}",
                "error": str(exc)
            }

    collected = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        pending = [pool.submit(_describe_safely, row) for row in targets]

        # Report progress in completion order, which need not match input order.
        for finished in as_completed(pending):
            outcome = finished.result()
            collected.append(outcome)

            status = "ERR" if 'error' in outcome else "OK"
            print(f"{status} ({len(collected)}/{total}): {outcome['full_table_name']}")

    # Restore a deterministic order after the unordered completion loop.
    collected.sort(key=lambda item: (item.get('catalog_name', ''),
                                     item.get('schema_name', ''),
                                     item.get('table_name', '')))

    success_count = sum(1 for item in collected if 'error' not in item)
    print(f"並列実行完了: 成功 {success_count}/{len(collected)}")

    return collected

In [0]:
# 対象のリストの確認
tables = list_target_tables()

# 並列実行（推奨）- シリアル版より大幅に高速化
table_details = collect_details(tables, limit=None)

## 6. メタデータ統合処理

In [0]:
def _human_bytes(n: Any) -> str | None:
    """バイト数に単位をつけて返す"""
    if n is None:
        return None
    try:
        n = int(n)
    except Exception:
        return None
    units = ["B","KB","MB","GB","TB","PB"]
    i = 0
    x = float(n)
    while x >= 1024 and i < len(units)-1:
        x /= 1024.0
        i += 1
    return f"{x:.2f} {units[i]}"

def get_pk_fk_flags():
    """Return PK/FK flags for the constrained columns.

    The flags are read from the ``columns_with_constraints`` temp view.
    ``constraint_flags`` exists only as a CTE name inside the integrated
    query, so selecting from it directly cannot succeed.

    Returns:
        Dict keyed by ``(table_schema, table_name, lower-cased column_name)``
        with values ``{'is_pk': bool, 'is_fk': bool}``. Only columns with at
        least one flag set are included. An empty dict is returned, after
        printing the reason, when the view cannot be queried.
    """
    try:
        rows = spark.sql("""
            SELECT table_schema, table_name, column_name, is_pk, is_fk
            FROM columns_with_constraints
            WHERE is_pk = 1 OR is_fk = 1
        """).collect()
    except Exception as e:
        # Best effort: callers treat a missing entry as "no constraint", but
        # the failure is reported instead of being silently swallowed.
        print(f"PK/FKフラグの取得に失敗しました: {e}")
        return {}

    return {
        (r.table_schema, r.table_name, r.column_name.lower()):
            {'is_pk': bool(r.is_pk), 'is_fk': bool(r.is_fk)}
        for r in rows
    }

def build_table_ddl_info(table_details: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Build the TABLE_DDL_INFO records, one per row of the base_tables view.

    Args:
        table_details: DESCRIBE DETAIL dicts; entries lacking any of
            catalog_name / schema_name / table_name are ignored.

    Returns:
        One dict per table combining the base_tables row with the matching
        detail entry. Tables without a detail entry get default Delta
        settings; a detail entry carrying ``error`` adds ``detail_error``.
    """
    # Index the details by (catalog, schema, table); a later duplicate wins.
    details_by_table: Dict[Tuple[str, str, str], Dict[str, Any]] = {}
    for entry in table_details:
        table_key = (entry.get('catalog_name'), entry.get('schema_name'), entry.get('table_name'))
        if not all(table_key):
            continue
        details_by_table[table_key] = entry

    catalog_rows = spark.sql("""
        SELECT table_catalog, table_schema, table_name, table_type, created, last_altered
        FROM base_tables
        ORDER BY table_catalog, table_schema, table_name
    """).collect()

    records: List[Dict[str, Any]] = []
    for row in catalog_rows:
        detail = details_by_table.get((row.table_catalog, row.table_schema, row.table_name), {})

        cluster_strategy, cluster_cols = extract_clustering_info(detail)
        partition_strategy, partition_cols = extract_partition_info(detail)
        if detail:
            delta_props = get_delta_properties(detail)
        else:
            # No detail entry for this table: fall back to fixed defaults.
            delta_props = {
                'auto_optimize_write': False, 'auto_optimize_compact': False, 'cdf_enabled': False,
                'stats_column_limit': 32, 'stats_custom_columns': None,
                'vacuum_retention_hours': 168, 'time_travel_retention_days': 30,
            }

        location = detail.get('location')
        size_in_bytes = detail.get('sizeInBytes')

        record = {
            'catalog_name': row.table_catalog,
            'schema_name': row.table_schema,
            'table_name': row.table_name,
            'table_type': row.table_type,
            'storage_format': detail.get('format', 'DELTA'),
            'storage_location': location,
            'external_location': location if row.table_type == 'EXTERNAL' else None,
            'partition_strategy': partition_strategy,
            'partition_columns': partition_cols,
            'clustering_strategy': cluster_strategy,
            'clustering_columns': cluster_cols,
            'auto_optimize_write': delta_props['auto_optimize_write'],
            'auto_optimize_compact': delta_props['auto_optimize_compact'],
            'vacuum_retention_hours': delta_props['vacuum_retention_hours'],
            'stats_column_limit': delta_props['stats_column_limit'],
            'stats_custom_columns': delta_props['stats_custom_columns'],
            'cdf_enabled': delta_props['cdf_enabled'],
            'time_travel_retention_days': delta_props['time_travel_retention_days'],
            'num_files': detail.get('numFiles'),
            'size_in_bytes': size_in_bytes,
            'size_pretty': _human_bytes(size_in_bytes),
            'table_features': [str(f) for f in (detail.get('tableFeatures') or [])],
            'table_id': detail.get('id'),
            'created_at': detail.get('createdAt', row.created),
            'last_altered': detail.get('lastModified', row.last_altered),
            'extracted_at': datetime.now(timezone.utc).isoformat(),
            'extraction_method': 'detail_python',
        }

        if 'error' in detail:
            record['detail_error'] = detail['error']

        records.append(record)

    return records

def build_column_ddl_info(table_details: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Build the COLUMN_DDL_INFO records, one per column.

    Column rows and their PK/FK flags are read from the
    ``columns_with_constraints`` temp view. ``base_columns`` is only a CTE
    name inside the integrated query and cannot be selected from directly.

    Args:
        table_details: DESCRIBE DETAIL dicts, used to mark partition and
            clustering columns; entries lacking any of catalog_name /
            schema_name / table_name are ignored.

    Returns:
        One dict per column, ordered by catalog, schema, table and ordinal
        position.
    """
    # Per-table sets of lower-cased partition / clustering column names.
    part_cols_map: Dict[Tuple[str, str, str], Set[str]] = {}
    clus_cols_map: Dict[Tuple[str, str, str], Set[str]] = {}

    for d in table_details:
        key = (d.get('catalog_name'), d.get('schema_name'), d.get('table_name'))
        if not all(key):
            continue
        _, part_cols = extract_partition_info(d)
        part_cols_map[key] = {c.lower() for c in part_cols}
        _, clus_cols = extract_clustering_info(d)
        clus_cols_map[key] = {c.lower() for c in clus_cols}

    # Columns together with the is_pk / is_fk flags computed by the integrated query.
    rows = spark.sql("""
        SELECT * FROM columns_with_constraints
        ORDER BY table_catalog, table_schema, table_name, ordinal_position
    """).collect()

    out: List[Dict[str, Any]] = []
    # One timestamp for the whole extraction so all rows share the same value.
    now_utc = datetime.now(timezone.utc).isoformat()

    for r in rows:
        key = (r.table_catalog, r.table_schema, r.table_name)
        pset = part_cols_map.get(key, set())
        cset = clus_cols_map.get(key, set())

        col_name_lower = r.column_name.lower()

        rec = {
            "catalog_name": r.table_catalog, "schema_name": r.table_schema, "table_name": r.table_name, "column_name": r.column_name,
            "ordinal_position": int(r.ordinal_position), "data_type": r.data_type, "numeric_precision": r.numeric_precision, "numeric_scale": r.numeric_scale,
            "is_nullable": (str(r.is_nullable).upper() == "YES"), "default_value": r.column_default,
            # The comment field may be absent from the row; getattr keeps that case at None.
            "column_comment": getattr(r, "comment", None) or getattr(r, "column_comment", None),
            "is_partition_column": col_name_lower in pset, "is_clustering_column": col_name_lower in cset,
            "is_primary_key": bool(r.is_pk), "foreign_key_reference": None if not r.is_fk else "FK参照あり",
            "extracted_at": now_utc, "extraction_method": "detail_python",
        }

        out.append(rec)

    return out

In [0]:
def _human_bytes(n: Any) -> str | None:
    """バイト数に単位をつけて返す"""
    if n is None:
        return None
    try:
        n = int(n)
    except Exception:
        return None
    units = ["B","KB","MB","GB","TB","PB"]
    i = 0
    x = float(n)
    while x >= 1024 and i < len(units)-1:
        x /= 1024.0
        i += 1
    return f"{x:.2f} {units[i]}"

def build_table_ddl_info(table_details: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Assemble the TABLE_DDL_INFO records from base_tables and the detail dicts.

    Args:
        table_details: DESCRIBE DETAIL dicts; entries without a complete
            catalog_name / schema_name / table_name triple are skipped.

    Returns:
        One dict per base_tables row. Missing details fall back to default
        Delta settings, and a detail entry with ``error`` contributes a
        ``detail_error`` field.
    """
    # Lookup of detail dicts keyed by (catalog, schema, table); last entry wins.
    detail_lookup: Dict[Tuple[str, str, str], Dict[str, Any]] = {}
    for item in table_details:
        item_key = (item.get('catalog_name'), item.get('schema_name'), item.get('table_name'))
        if not all(item_key):
            continue
        detail_lookup[item_key] = item

    table_rows = spark.sql("""
        SELECT table_catalog, table_schema, table_name, table_type, created, last_altered
        FROM base_tables
        ORDER BY table_catalog, table_schema, table_name
    """).collect()

    result: List[Dict[str, Any]] = []
    for tbl in table_rows:
        detail = detail_lookup.get((tbl.table_catalog, tbl.table_schema, tbl.table_name), {})

        cluster_strategy, cluster_cols = extract_clustering_info(detail)
        partition_strategy, partition_cols = extract_partition_info(detail)
        if detail:
            delta_props = get_delta_properties(detail)
        else:
            # Nothing was collected for this table: use fixed defaults.
            delta_props = {
                'auto_optimize_write': False, 'auto_optimize_compact': False, 'cdf_enabled': False,
                'stats_column_limit': 32, 'stats_custom_columns': None,
                'vacuum_retention_hours': 168, 'time_travel_retention_days': 30,
            }

        location = detail.get('location')
        size_in_bytes = detail.get('sizeInBytes')

        entry = {
            'catalog_name': tbl.table_catalog,
            'schema_name': tbl.table_schema,
            'table_name': tbl.table_name,
            'table_type': tbl.table_type,
            'storage_format': detail.get('format', 'DELTA'),
            'storage_location': location,
            'external_location': location if tbl.table_type == 'EXTERNAL' else None,
            'partition_strategy': partition_strategy,
            'partition_columns': partition_cols,
            'clustering_strategy': cluster_strategy,
            'clustering_columns': cluster_cols,
            'auto_optimize_write': delta_props['auto_optimize_write'],
            'auto_optimize_compact': delta_props['auto_optimize_compact'],
            'vacuum_retention_hours': delta_props['vacuum_retention_hours'],
            'stats_column_limit': delta_props['stats_column_limit'],
            'stats_custom_columns': delta_props['stats_custom_columns'],
            'cdf_enabled': delta_props['cdf_enabled'],
            'time_travel_retention_days': delta_props['time_travel_retention_days'],
            'num_files': detail.get('numFiles'),
            'size_in_bytes': size_in_bytes,
            'size_pretty': _human_bytes(size_in_bytes),
            'table_features': [str(f) for f in (detail.get('tableFeatures') or [])],
            'table_id': detail.get('id'),
            'created_at': detail.get('createdAt', tbl.created),
            'last_altered': detail.get('lastModified', tbl.last_altered),
            'extracted_at': datetime.now(timezone.utc).isoformat(),
            'extraction_method': 'detail_python',
        }

        if 'error' in detail:
            entry['detail_error'] = detail['error']

        result.append(entry)

    return result

def build_column_ddl_info(table_details: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Assemble the COLUMN_DDL_INFO records (integrated version).

    Column rows, including the is_pk / is_fk flags, come from the
    ``columns_with_constraints`` view. ``table_details`` is only used to flag
    partition and clustering columns; entries without a complete
    catalog_name / schema_name / table_name triple are skipped.

    Returns:
        One dict per column, ordered by catalog, schema, table and ordinal
        position.
    """
    # Lower-cased partition / clustering column names per table.
    partition_lookup: Dict[Tuple[str, str, str], Set[str]] = {}
    clustering_lookup: Dict[Tuple[str, str, str], Set[str]] = {}

    for detail in table_details:
        table_key = (detail.get('catalog_name'), detail.get('schema_name'), detail.get('table_name'))
        if all(table_key):
            partition_lookup[table_key] = {name.lower() for name in extract_partition_info(detail)[1]}
            clustering_lookup[table_key] = {name.lower() for name in extract_clustering_info(detail)[1]}

    column_rows = spark.sql("""
        SELECT * FROM columns_with_constraints
        ORDER BY table_catalog, table_schema, table_name, ordinal_position
    """).collect()

    # One timestamp shared by every record of this extraction.
    extracted_at = datetime.now(timezone.utc).isoformat()

    records: List[Dict[str, Any]] = []
    for col in column_rows:
        table_key = (col.table_catalog, col.table_schema, col.table_name)
        lowered = col.column_name.lower()

        records.append({
            "catalog_name": col.table_catalog,
            "schema_name": col.table_schema,
            "table_name": col.table_name,
            "column_name": col.column_name,
            "ordinal_position": int(col.ordinal_position),
            "data_type": col.data_type,
            "numeric_precision": col.numeric_precision,
            "numeric_scale": col.numeric_scale,
            "is_nullable": (str(col.is_nullable).upper() == "YES"),
            "default_value": col.column_default,
            # The comment field may be absent from the row; getattr keeps that case at None.
            "column_comment": getattr(col, "comment", None) or getattr(col, "column_comment", None),
            "is_partition_column": lowered in partition_lookup.get(table_key, set()),
            "is_clustering_column": lowered in clustering_lookup.get(table_key, set()),
            "is_primary_key": bool(col.is_pk),
            "foreign_key_reference": "FK参照あり" if col.is_fk else None,
            "extracted_at": extracted_at,
            "extraction_method": "detail_python",
        })

    return records

## 9. メタデータテーブル保存

In [0]:
# Persist the extracted metadata using the consolidated CONFIG settings.
# NOTE(review): this cell reads the temp views table_ddl_info / column_ddl_info
# and the lists table_ddl_data / column_ddl_data, none of which are created in
# the cells visible here — presumably an earlier cell builds them; confirm.
meta_catalog = CONFIG["output_catalog"]
meta_schema  = CONFIG["target_catalog"]  # the output schema is named after the source catalog

# Create the destination catalog and schema when they do not exist yet.
spark.sql(f"CREATE CATALOG IF NOT EXISTS `{meta_catalog}`")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS `{meta_catalog}`.`{meta_schema}`")

table_df  = spark.sql("SELECT * FROM table_ddl_info")
column_df = spark.sql("SELECT * FROM column_ddl_info")

# Stamp both data sets with the same UTC snapshot time.
snap_ts = datetime.now(timezone.utc)
table_df  = table_df.withColumn("snapshot_at", F.lit(snap_ts))
column_df = column_df.withColumn("snapshot_at", F.lit(snap_ts))

# Overwrite the latest-version tables, allowing the schema to be replaced.
(table_df
 .write.mode("overwrite")
 .option("overwriteSchema", "true")
 .saveAsTable(f"`{meta_catalog}`.`{meta_schema}`.`{meta_schema}_table_ddl_info`"))

(column_df
 .write.mode("overwrite")
 .option("overwriteSchema", "true")
 .saveAsTable(f"`{meta_catalog}`.`{meta_schema}`.`{meta_schema}_column_ddl_info`"))

# Apply the configured retention period (in days) to both output tables,
# for the transaction log and for deleted data files alike.
retention_days = CONFIG["retention_days"]

spark.sql(f"""  
        ALTER TABLE `{meta_catalog}`.`{meta_schema}`.`{meta_schema}_table_ddl_info`  SET TBLPROPERTIES
        ('delta.logRetentionDuration'='interval {retention_days} days',
        'delta.deletedFileRetentionDuration'='interval {retention_days} days')
        """)

spark.sql(f"""
        ALTER TABLE `{meta_catalog}`.`{meta_schema}`.`{meta_schema}_column_ddl_info` SET TBLPROPERTIES
        ('delta.logRetentionDuration'='interval {retention_days} days',
        'delta.deletedFileRetentionDuration'='interval {retention_days} days')
        """)

# Final summary of what was saved.
print(f"メタデータ保存完了: {meta_catalog}.{meta_schema}.*_ddl_info")
print(f"\nUnity Catalog メタデータ抽出が完了しました!")
print(f"TABLE_DDL_INFO: {len(table_ddl_data)} テーブル")
print(f"COLUMN_DDL_INFO: {len(column_ddl_data)} カラム")
print(f"PK/FK制約情報も含まれています")