In [None]:
import json
import requests
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, 
    IntegerType, StringType, FloatType, DoubleType,
    DateType, TimestampType, BooleanType, DecimalType
)

In [None]:
table_name = ""
schema_json = ""
primary_keys = ""

In [None]:
print("=" * 70)
print("INCREMENTAL LOAD WITH MERGE INTO")
print("=" * 70)
print(f"\n Parameters nhận được:")
print(f"  ├─ table_name: {table_name if table_name else '(empty)'}")
print(f"  └─ primary_keys: {primary_keys if primary_keys else '(empty)'}")

# Validate
if not table_name:
    raise ValueError("Thiếu parameter 'table_name'!")

if not schema_json:
    raise ValueError("Thiếu parameter 'schema_json'!")

if not primary_keys:
    raise ValueError("Thiếu parameter 'primary_keys' cho MERGE!")

# Parse primary keys
pk_list = [pk.strip() for pk in primary_keys.split(",")]

In [None]:
def get_supabase_credentials():
    """Load Supabase credentials từ Lakehouse"""
    config_path = "/lakehouse/default/Files/.config/credentials.json"
    
    try:
        with open(config_path, "r") as f:
            creds = json.load(f)
        return creds["supabase"]["url"], creds["supabase"]["key"]
    except FileNotFoundError:
        print("Chưa setup credentials!")
        return None, None
    except Exception as e:
        print(f"Lỗi đọc config: {e}")
        return None, None

In [None]:
def parse_schema_from_json(schema_json_str: str) -> StructType:
    """Parse schema từ JSON string"""
    try:
        schema_dict = json.loads(schema_json_str)
        
        type_mapping = {
            "IntegerType": IntegerType(),
            "StringType": StringType(),
            "FloatType": FloatType(),
            "DoubleType": DoubleType(),
            "DateType": DateType(),
            "TimestampType": TimestampType(),
            "BooleanType": BooleanType(),
        }
        
        fields = []
        for field_dict in schema_dict["fields"]:
            field_name = field_dict["name"]
            field_type_str = field_dict["type"]
            nullable = field_dict.get("nullable", True)
            
            if field_type_str.startswith("DecimalType"):
                import re
                match = re.search(r'DecimalType\((\d+),\s*(\d+)\)', field_type_str)
                if match:
                    precision, scale = int(match.group(1)), int(match.group(2))
                    field_type = DecimalType(precision, scale)
                else:
                    field_type = DecimalType(19, 4)
            else:
                field_type = type_mapping.get(field_type_str, StringType())
            
            fields.append(StructField(field_name, field_type, nullable))
        
        return StructType(fields)
        
    except Exception as e:
        print(f"Lỗi parse schema: {e}")
        raise

In [None]:
def get_last_watermark(spark, table_name: str):
    """Lấy max_modified_date từ watermark table"""
    try:
        watermark_table = "water_mark"
        
        # Kiểm tra xem bảng watermark có tồn tại không
        if not spark.catalog.tableExists(watermark_table):
            print(f"Bảng watermark chưa tồn tại, sẽ load full data lần đầu")
            return None
        
        # Lấy watermark
        result = spark.sql(f"""
            SELECT max_modified_date 
            FROM {watermark_table}
            WHERE table_name = '{table_name}'
        """).collect()
        
        if result and result[0][0] is not None:
            watermark_date = result[0][0]
            print(f"Tìm thấy watermark: {watermark_date}")
            return watermark_date
        else:
            print(f"Chưa có watermark cho bảng {table_name}, sẽ load full data")
            return None
            
    except Exception as e:
        print(f"Lỗi khi lấy watermark: {e}")
        return None

In [None]:
def fetch_incremental_data(base_url: str, api_key: str, table_name: str, 
                          last_modified_date=None, batch_size: int = 1000) -> pd.DataFrame:
    """Lấy dữ liệu incremental từ Supabase (ModifiedDate > watermark)"""
    headers = {
        "apikey": api_key,
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    
    all_data = []
    offset = 0
    
    print(f"\nĐang lấy dữ liệu từ bảng: {table_name}")
    
    if last_modified_date:
        # Convert datetime to ISO format string cho Supabase filter
        if isinstance(last_modified_date, datetime):
            last_modified_str = last_modified_date.isoformat()
        else:
            last_modified_str = str(last_modified_date)
        
        print(f"  ├─ Filter: ModifiedDate > {last_modified_str}")
    else:
        print(f"  ├─ Mode: FULL LOAD (lần đầu)")
    
    while True:
        url = f"{base_url}/rest/v1/{table_name}"
        
        params = {
            "select": "*",
            "limit": batch_size,
            "offset": offset
        }
        
        # Thêm filter nếu có watermark
        if last_modified_date:
            params["ModifiedDate"] = f"gt.{last_modified_str}"
            params["order"] = "ModifiedDate.asc"
        
        try:
            response = requests.get(url, headers=headers, params=params)
            response.raise_for_status()
            
            data = response.json()
            
            if not data:
                break
            
            all_data.extend(data)
            print(f"  ├─ Đã lấy {len(all_data)} records...")
            
            if len(data) < batch_size:
                break
            
            offset += batch_size
            
        except requests.exceptions.RequestException as e:
            print(f"Lỗi request: {e}")
            return pd.DataFrame()
    
    if all_data:
        df = pd.DataFrame(all_data)
        print(f"Hoàn tất: {len(df)} records x {len(df.columns)} columns")
        return df
    else:
        print(f"Không có dữ liệu mới")
        return pd.DataFrame()

In [None]:
def create_watermark_table_if_not_exists(spark):
    """Tạo bảng watermark nếu chưa tồn tại"""
    try:
        watermark_table = "water_mark"
        
        # Kiểm tra bảng có tồn tại không
        tables = [t.name for t in spark.catalog.listTables()]
        
        if watermark_table not in tables:
            print(f"\n Tạo bảng watermark...")
            
            create_query = f"""
            CREATE TABLE IF NOT EXISTS {watermark_table} (
                table_name STRING,
                max_modified_date TIMESTAMP,
                layer_type STRING,
                last_load_time TIMESTAMP,
                row_count BIGINT
            ) USING DELTA
            """
            spark.sql(create_query)
            print(f"Đã tạo bảng {watermark_table}")
            
    except Exception as e:
        print(f"Lỗi khi tạo bảng watermark: {e}")
        raise

In [None]:
def update_watermark(spark, table_name: str, pandas_df: pd.DataFrame, 
                     layer_type: str = "bronze"):
    """Cập nhật watermark cho 1 bảng"""
    try:
        create_watermark_table_if_not_exists(spark)
        
        watermark_table = "water_mark"
        
        # Tìm max_modified_date
        if 'ModifiedDate' in pandas_df.columns:
            pandas_df['ModifiedDate'] = pd.to_datetime(pandas_df['ModifiedDate'])
            max_modified = pandas_df['ModifiedDate'].max()
            
            if pd.notna(max_modified):
                max_modified = max_modified.to_pydatetime()
            else:
                max_modified = datetime.now()
                print(f"ModifiedDate có giá trị NULL, dùng datetime.now()")
        else:
            print(f"Không có cột ModifiedDate, dùng datetime.now()")
            max_modified = datetime.now()
        
        # Tạo record watermark
        watermark_record = [{
            'table_name': table_name,
            'max_modified_date': max_modified,
            'layer_type': layer_type,
            'last_load_time': datetime.now(),
            'row_count': len(pandas_df)
        }]
        
        watermark_df = spark.createDataFrame(watermark_record)
        watermark_df.createOrReplaceTempView("temp_watermark")
        
        merge_query = f"""
        MERGE INTO {watermark_table} AS target
        USING temp_watermark AS source
        ON target.table_name = source.table_name
        WHEN MATCHED THEN
            UPDATE SET 
                max_modified_date = source.max_modified_date,
                last_load_time = source.last_load_time,
                layer_type = source.layer_type,
                row_count = source.row_count
        WHEN NOT MATCHED THEN
            INSERT (table_name, max_modified_date, layer_type, last_load_time, row_count)
            VALUES (source.table_name, source.max_modified_date, source.layer_type, 
                    source.last_load_time, source.row_count)
        """
        
        spark.sql(merge_query)
        
        print(f"\nĐã cập nhật watermark:")
        print(f"  ├─ Table: {table_name}")
        print(f"  ├─ Max Modified Date: {max_modified}")
        print(f"  ├─ Row Count: {len(pandas_df):,}")
        print(f"  └─ Load Time: {datetime.now()}")
        
    except Exception as e:
        print(f"\nLỗi khi cập nhật watermark: {e}")
        raise

In [None]:
def merge_data_into_table(spark, source_df, target_table: str, primary_keys: list):
    """MERGE dữ liệu vào target table"""
    try:
        # Tạo temp view cho source data
        source_df.createOrReplaceTempView("temp_source")
        
        # Build JOIN condition từ primary keys
        join_conditions = " AND ".join([
            f"target.{pk} = source.{pk}" for pk in primary_keys
        ])
        
        # Build UPDATE SET clause (tất cả columns trừ primary keys)
        all_columns = source_df.columns
        update_columns = [col for col in all_columns if col not in primary_keys]
        update_set = ", ".join([f"{col} = source.{col}" for col in update_columns])
        
        # Build INSERT clause
        insert_columns = ", ".join(all_columns)
        insert_values = ", ".join([f"source.{col}" for col in all_columns])
        
        # MERGE query
        merge_query = f"""
        MERGE INTO {target_table} AS target
        USING temp_source AS source
        ON {join_conditions}
        WHEN MATCHED THEN
            UPDATE SET {update_set}
        WHEN NOT MATCHED THEN
            INSERT ({insert_columns})
            VALUES ({insert_values})
        """
        
        print(f"\n Thực hiện MERGE INTO...")
        print(f"  ├─ Target: {target_table}")
        print(f"  ├─ Primary Keys: {', '.join(primary_keys)}")
        print(f"  └─ Source Records: {source_df.count():,}")
        
        spark.sql(merge_query)
        
        print(f"MERGE hoàn tất!")
        
    except Exception as e:
        print(f"Lỗi khi MERGE: {e}")
        raise

In [None]:
print("\n" + "=" * 70)
print(f"INCREMENTAL LOAD TABLE: {table_name}")
print("=" * 70)

try:
    # Bước 1: Parse schema
    print(f"\n Parse schema...")
    schema = parse_schema_from_json(schema_json)
    print(f"Schema có {len(schema.fields)} cột")
    
    # Bước 2: Khởi tạo Spark
    spark = SparkSession.builder.appName(f"IncrementalLoad_{table_name}").getOrCreate()
    
    full_table_name = f"{table_name}"
    
    # Bước 3: Lấy watermark
    last_watermark = get_last_watermark(spark, table_name)
    
    # Bước 4: Lấy dữ liệu incremental từ Supabase
    url, key = get_supabase_credentials()
    if not url or not key:
        raise ValueError("Không thể lấy credentials!")
    
    pandas_df = fetch_incremental_data(url, key, table_name, last_watermark)
    
    # Nếu không có dữ liệu mới
    # Nếu không có dữ liệu mới
    if pandas_df.empty:
        print("\n Không có dữ liệu mới để load!")
        print(f"  └─ Watermark hiện tại: {last_watermark}")
        
        # Không thoát với lỗi nữa — chỉ in thông báo và kết thúc bình thường
        print("\n Bỏ qua tác vụ vì không có dữ liệu mới.")
        
        mssparkutils.notebook.exit("SUCCESS_NO_NEW_DATA")

    
    # Bước 5: Convert sang Spark DataFrame với schema
    print(f"\n Chuyển đổi DataFrame với schema...")
    try:
        # Convert các cột datetime trong pandas trước
        for field in schema.fields:
            if isinstance(field.dataType, TimestampType) and field.name in pandas_df.columns:
                pandas_df[field.name] = pd.to_datetime(pandas_df[field.name], errors='coerce')
        
        spark_df = spark.createDataFrame(pandas_df, schema=schema)
        print(f"Đã áp dụng schema")
        
    except Exception as e:
        print(f"Lỗi khi áp dụng schema: {e}")
        print("Đang thử load mà không ép schema...")
        spark_df = spark.createDataFrame(pandas_df)
    
    # Bước 6: Kiểm tra target table có tồn tại chưa
    if not spark.catalog.tableExists(full_table_name):
        print(f"\n Bảng {full_table_name} chưa tồn tại, tạo mới...")
        
        spark_df.write \
            .mode("overwrite") \
            .format("delta") \
            .saveAsTable(full_table_name)
        
        print(f"Đã tạo bảng {full_table_name}")
    else:
        # Bước 7: MERGE dữ liệu vào target table
        merge_data_into_table(spark, spark_df, full_table_name, pk_list)
    
    # Bước 8: Cập nhật watermark
    print(f"\nCập nhật watermark...")
    update_watermark(spark, table_name, pandas_df)
    
    # Bước 9: Hiển thị kết quả
    print("\n" + "=" * 70)
    print("HOÀN TẤT INCREMENTAL LOAD")
    print("=" * 70)
    
    row_count = spark.sql(f"SELECT COUNT(*) as cnt FROM {full_table_name}").collect()[0][0]
    
    print(f"\nThống kê:")
    print(f"  ├─ Table: {full_table_name}")
    print(f"  ├─ Total Rows: {row_count:,}")
    print(f"  ├─ New/Updated Rows: {len(pandas_df):,}")
    print(f"  └─ Load time: {datetime.now()}")
    
    # Hiển thị watermark
    print(f"\n Watermark hiện tại:")
    spark.sql(f"""
        SELECT * FROM water_mark 
        WHERE table_name = '{table_name}'
    """).show(truncate=False)
    
    # Return success
    mssparkutils.notebook.exit("SUCCESS")
    
except Exception as e:
    error_msg = f"FAILED: {str(e)}"
    print(f"\n LỖI: {error_msg}")
    import traceback
    traceback.print_exc()
    mssparkutils.notebook.exit(error_msg)
    raise
