In [0]:
from pyspark.sql.functions import col, current_timestamp
from delta.tables import DeltaTable

# ============ Configuration ============
# Environment name; the catalog, schema, and checkpoint container below are all derived from it.
env = "dev"
catalog = f"{env}_catalog"
schema = f"crime_data_{env}"

# Three-part table names (catalog.schema.table) for the streaming source and the merge target.
_table_prefix = ".".join((catalog, schema))
source_table = _table_prefix + ".bronze_crime"
target_table = _table_prefix + ".silver_crime"

# Directory handed to writeStream as its checkpointLocation.
checkpoint_path = f"abfss://{env}@kevintestdatabricks.dfs.core.windows.net/_checkpoints/silver_crime/"

In [0]:
# ============ Create the target table (if it does not exist) ============
# IF NOT EXISTS makes the statement a no-op when the table is already there,
# so this cell can be re-run. The table property turns on Change Data Feed.
silver_ddl = f"""
CREATE TABLE IF NOT EXISTS {target_table} (
    id STRING,
    case_number STRING,
    date STRING,
    block STRING,
    iucr STRING,
    primary_type STRING,
    description STRING,
    location_description STRING,
    arrest BOOLEAN,
    domestic BOOLEAN,
    beat STRING,
    district STRING,
    ward STRING,
    community_area STRING,
    fbi_code STRING,
    x_coordinate FLOAT,
    y_coordinate FLOAT,
    year STRING,
    updated_on STRING,
    latitude FLOAT,
    longitude FLOAT,
    location STRING,
    _ingestion_time TIMESTAMP,
    _source_file STRING,
    _env STRING,
    _silver_processed_time TIMESTAMP
)
USING DELTA
TBLPROPERTIES (
    'delta.enableChangeDataFeed' = 'true'
)
"""
spark.sql(silver_ddl)

print(f"✅ Target table ready: {target_table}")

In [0]:
# ============ MERGE function ============
def upsert_to_silver(batch_df, batch_id):
    """Merge one micro-batch into the Silver table, keyed on ``id``.

    Intended as a ``foreachBatch`` callback. The batch is cast to the Silver
    column types, stamped with ``_silver_processed_time``, reduced to a single
    row per ``id`` (the row with the greatest ``updated_on``; ties broken by
    the greatest ``_ingestion_time``), and then merged into ``target_table``:
    matched rows are updated only when the incoming ``updated_on`` is greater
    than the stored one, and unmatched rows are inserted.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Micro-batch identifier; only used in the log line.

    Note:
        ``updated_on`` is declared STRING in the Silver table, so the dedup
        ordering and the merge condition compare it as text.
        NOTE(review): that is only chronological for sortable formats such as
        ISO-8601 -- confirm the format delivered by Bronze. When either side's
        ``updated_on`` is NULL the update condition is NULL, so that matched
        row is left unchanged.
    """
    # Imported locally so the function does not depend on edits to the
    # notebook's import cell.
    from pyspark.sql import Window
    from pyspark.sql.functions import row_number

    # Rank the rows of each id so that the most recently updated one comes
    # first. dropDuplicates(["id"]) would keep an arbitrary row instead.
    latest_first = Window.partitionBy("id").orderBy(
        col("updated_on").desc_nulls_last(),
        col("_ingestion_time").desc_nulls_last(),
    )

    # Type casts, processing timestamp, and in-batch dedup (keep the latest).
    transformed_df = (batch_df
        .withColumn("latitude", col("latitude").cast("float"))
        .withColumn("longitude", col("longitude").cast("float"))
        .withColumn("x_coordinate", col("x_coordinate").cast("float"))
        .withColumn("y_coordinate", col("y_coordinate").cast("float"))
        .withColumn("_silver_processed_time", current_timestamp())
        .withColumn("_row_rank", row_number().over(latest_first))
        .filter(col("_row_rank") == 1)
        .drop("_row_rank")
    )

    # Every Silver column except the merge key `id`, which already matches.
    update_columns = (
        "case_number", "date", "block", "iucr", "primary_type", "description",
        "location_description", "arrest", "domestic", "beat", "district",
        "ward", "community_area", "fbi_code", "x_coordinate", "y_coordinate",
        "year", "updated_on", "latitude", "longitude", "location",
        "_ingestion_time", "_source_file", "_env", "_silver_processed_time",
    )

    # The batch is consumed more than once (count + MERGE), so cache it:
    # this avoids recomputing the transformations and keeps every consumer
    # looking at the same rows.
    transformed_df.persist()
    try:
        # Counting first also materializes the cache before the MERGE runs.
        record_count = transformed_df.count()

        # MERGE when the target table exists.
        if spark.catalog.tableExists(target_table):
            delta_table = DeltaTable.forName(spark, target_table)

            (delta_table.alias("target")
                .merge(
                    transformed_df.alias("source"),
                    "target.id = source.id"
                )
                .whenMatchedUpdate(
                    # Only overwrite when the incoming row is newer.
                    condition="source.updated_on > target.updated_on",
                    set={name: f"source.{name}" for name in update_columns}
                )
                .whenNotMatchedInsertAll()
                .execute()
            )
        else:
            # Table does not exist yet: write the batch directly.
            transformed_df.write.format("delta").mode("append").saveAsTable(target_table)
    finally:
        # Release the cached batch even if the write fails.
        transformed_df.unpersist()

    print(f"✅ Batch {batch_id}: Processed {record_count} records")

In [0]:
# ============ Incrementally read from Bronze and process ============
# Read the Bronze Delta table as a stream; progress is tracked in the checkpoint.
# NOTE(review): the Delta docs describe ignoreChanges as re-emitting rewritten
# files (changed rows, and possibly unchanged ones) rather than dropping
# updates -- confirm. The id-keyed MERGE downstream is what keeps re-emitted
# rows from duplicating in Silver.
bronze_reader = spark.readStream.format("delta").option("ignoreChanges", "true")
bronze_stream = bronze_reader.table(source_table)

# Hand each micro-batch to upsert_to_silver, which performs the MERGE.
silver_writer = (
    bronze_stream.writeStream
    .foreachBatch(upsert_to_silver)
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)  # process the data available now, then stop
)
query = silver_writer.start()

# Block until the run has finished before reporting success.
query.awaitTermination()
print(f"✅ Silver table updated: {target_table}")

In [0]:
# ============ Validate the result ============
# Row counts of both layers, for a quick side-by-side comparison.
bronze_count = spark.table(source_table).count()
print(f"Bronze count: {bronze_count}")
silver_count = spark.table(target_table).count()
print(f"Silver count: {silver_count}")

# Look for ids that occur more than once in the Silver table.
duplicate_check = spark.sql(f"""
    SELECT id, COUNT(*) as cnt 
    FROM {target_table} 
    GROUP BY id 
    HAVING cnt > 1
""")
duplicate_id_count = duplicate_check.count()
print(f"Duplicate IDs: {duplicate_id_count}")

# Preview a few rows of the Silver table.
silver_preview = spark.table(target_table).limit(10)
display(silver_preview)
