In [0]:
%sql
-- Make catproject_catalog the current catalog for this session.
-- The Python cells below reference their tables with full catalog.schema.table names.
USE CATALOG catproject_catalog;

In [0]:
from pyspark.sql.functions import col, lit, max as spark_max,year, month
from delta.tables import DeltaTable
import logging
import sys
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import current_timestamp


In [0]:
# Request INFO-level logging through basicConfig and create the named logger
# that the table-loading cells use to report failures.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("Upsert Customers Silver Table")

In [0]:
# Fully qualified (catalog.schema.table) names of the tables used by this notebook:
# bronze_path is read as the merge source, silver_path is the merge target.
bronze_path = "catproject_catalog.ecom_bronze.customer_raw"
silver_path = "catproject_catalog.ecom_silver.customer_silver"


In [0]:
# Open the silver table as a DeltaTable handle (despite the df_ prefix it is not
# a DataFrame). If the lookup raises AnalysisException, log it and exit with code 1.
try:
    df_customer_silver = DeltaTable.forName(spark,silver_path)
except AnalysisException as e:
    logger.error(f"Failed to load silver table '{silver_path}': {str(e)}")
    sys.exit(1)  # end the job with an error exit code

In [0]:
def get_latest_ingestion_time(delta_tbl):
    """Return the most recent ``ingestion_time`` stored in a Delta table.

    The lookup narrows down in three steps: the largest ``ingestion_year``,
    then the largest ``ingestion_month`` within that year, then the largest
    ``ingestion_time`` among the rows of that (year, month). The last
    aggregation therefore only considers rows from a single year/month
    (presumably the table's partition columns -- confirm against the DDL).

    Args:
        delta_tbl: A DeltaTable whose rows carry ``ingestion_year``,
            ``ingestion_month`` and ``ingestion_time`` columns.

    Returns:
        The maximum ``ingestion_time`` as returned by Spark, or ``None`` when
        the table is empty or the relevant columns contain only nulls.
    """
    # Local import keeps the helper usable on its own, independent of cell order.
    from pyspark.sql.functions import col, max as spark_max

    df = delta_tbl.toDF()

    # A global aggregate always yields exactly one row; its value is None when
    # there is nothing to aggregate, so each step can bail out early instead of
    # launching further Spark jobs that compare against NULL.
    max_year = df.agg(spark_max("ingestion_year").alias("max_year")).first()["max_year"]
    if max_year is None:
        return None

    max_month = (
        df.filter(col("ingestion_year") == max_year)
        .agg(spark_max("ingestion_month").alias("max_month"))
        .first()["max_month"]
    )
    if max_month is None:
        return None

    latest_rows = df.filter(
        (col("ingestion_year") == max_year) & (col("ingestion_month") == max_month)
    )

    return (
        latest_rows
        .agg(spark_max("ingestion_time").alias("max_ingestion_time"))
        .first()["max_ingestion_time"]
    )

In [0]:

# Watermark for the incremental load: the newest ingestion_time already present
# in the silver table, or a falsy value when none could be determined.
max_ingestion_time = get_latest_ingestion_time(df_customer_silver)
print(
    f"max ingestion_time: {max_ingestion_time}"
    if max_ingestion_time
    else "no max ingestion_time"
)

In [0]:
# Open the bronze table; if it cannot be resolved, log the error and exit with code 1.
try:
    bronze_table = DeltaTable.forName(spark, bronze_path)
except AnalysisException as e:
    logger.error(f"Failed to load bronze table '{bronze_path}': {str(e)}")
    sys.exit(1)  # end the job with an error exit code

# df_customer_bronze is a DataFrame on every path from here on. Previously the
# name first held a DeltaTable, and the no-watermark branch called .toDF() a
# second time on the already-converted DataFrame before displaying it.
df_customer_bronze = bronze_table.toDF()

if max_ingestion_time:
    max_year = max_ingestion_time.year
    max_month = max_ingestion_time.month

    # Incremental load: the year/month predicate restricts the scan to the
    # watermark's month and later; the ingestion_time predicate then keeps only
    # rows strictly newer than the watermark.
    df_customer_bronze = df_customer_bronze.filter(
        (
            (col("ingestion_year") > max_year)
            | ((col("ingestion_year") == max_year) & (col("ingestion_month") >= max_month))
        )
        & (col("ingestion_time") > max_ingestion_time)
    )
# Without a watermark the whole bronze table is taken as the batch.

display(df_customer_bronze)


In [0]:
# Emptiness probe: limit(1) lets Spark stop at the first row instead of counting
# the entire batch just to compare the result with zero.
if df_customer_bronze.limit(1).count() > 0:
    # Keep a single row per customer_id before merging. With several source rows
    # for one key, Delta's MERGE fails when they match the same target row, and
    # inserts every copy when the key is not in the target yet.
    # NOTE(review): "latest" is taken as the newest ingestion_time, with
    # source_updated_at as tie-breaker -- confirm this is the desired ordering.
    df_customer_latest = (
        df_customer_bronze
        .selectExpr(
            "*",
            "row_number() OVER (PARTITION BY customer_id "
            "ORDER BY ingestion_time DESC, source_updated_at DESC) AS _merge_rank",
        )
        .filter(col("_merge_rank") == 1)
        .drop("_merge_rank")
    )

    # Upsert the new batch into the silver table.
    # NOTE(review): ingestion_time / ingestion_year / ingestion_month of inserted
    # rows are stamped with the merge time rather than copied from bronze, and
    # that column is what the next run reads back as its watermark -- confirm
    # this is intended.
    (
        df_customer_silver.alias("target")
        .merge(
            df_customer_latest.alias("source"),
            "target.customer_id = source.customer_id",
        )
        .whenMatchedUpdate(
            # Only rewrite rows whose content hash differs from the source.
            condition="target.record_hash != source.record_hash",
            # NOTE(review): source_updated_at and operation_type are not
            # refreshed on update -- confirm that is deliberate.
            set={
                "full_name": col("source.full_name"),
                "email": col("source.email"),
                "address": col("source.address"),
                "city": col("source.city"),
                "country": col("source.country"),
                "date_of_birth": col("source.date_of_birth"),
                "gender": col("source.gender"),
                "phone_number": col("source.phone_number"),
                "status": col("source.status"),
                "loyalty_level": col("source.loyalty_level"),
                "last_update_time": current_timestamp(),  # record when the row was updated
                "record_hash": col("source.record_hash"),
            },
        )
        .whenNotMatchedInsert(
            values={
                "customer_id": col("source.customer_id"),
                "full_name": col("source.full_name"),
                "email": col("source.email"),
                "address": col("source.address"),
                "city": col("source.city"),
                "country": col("source.country"),
                "date_of_birth": col("source.date_of_birth"),
                "gender": col("source.gender"),
                "phone_number": col("source.phone_number"),
                "registration_date": col("source.registration_date"),
                "status": col("source.status"),
                "loyalty_level": col("source.loyalty_level"),
                "source_system": col("source.source_system"),
                "ingestion_time": current_timestamp(),
                "ingestion_year": year(current_timestamp()),
                "ingestion_month": month(current_timestamp()),
                "last_update_time": current_timestamp(),
                "operation_type": col("source.operation_type"),
                "record_hash": col("source.record_hash"),
                "source_updated_at": col("source.source_updated_at"),
                # New customers start with zeroed aggregates. The previous "0"
                # strings were parsed as SQL expressions, so lit(0) is equivalent.
                "total_spent": lit(0),
                "total_orders": lit(0),
            }
        )
        .execute()
    )
    print("✅ Merge thành công.")
else:
    print("⏳ Không có dữ liệu mới để merge.")
