In [0]:
from datetime import timedelta
from pyspark.sql.functions import col, lit, current_timestamp

# Catalog and schema that hold every table this notebook reads or writes,
# plus the volume directory where the raw hourly JSON files land.
catalog = "dataquality"
schema = "dq_demo"
base_path = "/".join(["/Volumes", catalog, schema, "dq_demo_volume"])

# Identifies this pipeline's checkpoint row in the ingest_state table.
pipeline_name = "bev_hourly_pipeline"

# -----------------------
# 1. Fetch progress
# -----------------------
# Look up this pipeline's checkpoint row. The hour to ingest in this run is
# the one immediately after the last hour recorded as processed.
state_df = (
    spark.table(f"{catalog}.{schema}.ingest_state")
    .where(col("pipeline_name") == pipeline_name)
)

# Exactly one state row per pipeline is required. Fail with a descriptive
# error instead of an opaque IndexError (no row). Refuse to silently pick an
# arbitrary row when the state is duplicated.
state_rows = state_df.select("last_processed_hour").collect()
if len(state_rows) != 1:
    raise ValueError(
        f"expected exactly 1 ingest_state row for pipeline "
        f"{pipeline_name!r}, found {len(state_rows)}"
    )

last_ts = state_rows[0][0]
if last_ts is None:
    # A NULL checkpoint would otherwise surface as a TypeError on the
    # timedelta addition below.
    raise ValueError(
        f"last_processed_hour is NULL for pipeline {pipeline_name!r}; "
        "seed it with the hour before the first one to ingest"
    )
next_hour = last_ts + timedelta(hours=1)

# NOTE(review): the partition path carries only month-day (no year), so the
# same dt=MM-DD directory name recurs every year — confirm this matches the
# landing-zone layout.
mm_dd = next_hour.strftime("%m-%d")
hh = next_hour.strftime("%H")

input_path = f"{base_path}/dt={mm_dd}/hr={hh}"

print("Processing hour:", next_hour, input_path)

# -----------------------
# 2. Read the hour's files
# -----------------------
# Load the JSON files under this hour's partition directory. Tag each row
# with the logical hour being processed and with the load timestamp.
# NOTE(review): the JSON schema is inferred on every run, so schema drift
# between hours may surface as a failure at the append below — confirm
# whether an explicit schema is wanted.
bronze_df = spark.read.json(input_path)
bronze_df = bronze_df.withColumn("ingest_hour", lit(next_hour))
bronze_df = bronze_df.withColumn("ingested_at", current_timestamp())

# -----------------------
# 3. Write to Bronze
# -----------------------
# Append this hour's rows to the Bronze Delta table.
# NOTE(review): this append and the ingest_state UPDATE that follows are
# separate operations. If the run fails between them, a rerun appends the
# same hour again. Consider Delta idempotent writes (txnAppId/txnVersion)
# if duplicates matter downstream.
bronze_df.write.saveAsTable(
    f"{catalog}.{schema}.bronze_bev_events",
    format="delta",
    mode="append",
)

# -----------------------
# 4. Update state
# -----------------------
# Advance this pipeline's checkpoint to the hour just ingested, so the next
# run starts from the hour after it, and record when the update happened.
# NOTE(review): next_hour is spliced into the SQL text via str(). Confirm
# that the Spark session time zone agrees with the zone in which the
# collected timestamp was materialized; otherwise the TIMESTAMP literal may
# be read as a different instant.
spark.sql(
    "\n"
    f"UPDATE {catalog}.{schema}.ingest_state\n"
    f"SET last_processed_hour = TIMESTAMP '{next_hour}',\n"
    "    updated_at = current_timestamp()\n"
    f"WHERE pipeline_name = '{pipeline_name}'\n"
)