In [0]:
from typing import Final, Dict, List, Union
from ast import literal_eval
from json import load
from pyspark.sql import DataFrame, Column, Window
from pyspark.sql.functions import (
    col,
    to_date,
    to_timestamp,
    expr,
    row_number,
    max,
    current_timestamp,
    desc,
)
from delta import DeltaTable

In [0]:
# Declare the "path_to_json" text widget. Its default value is the sample
# configuration file below; the widget value is read back with
# dbutils.widgets.get("path_to_json") when the configuration is loaded.
dbutils.widgets.text("path_to_json", "/Volumes/dihpoc/ssot/mara/sample.json")

In [0]:
# Parse the pipeline configuration from the JSON file selected by the
# "path_to_json" widget. The file is opened in a `with` block so the handle is
# closed as soon as parsing finishes (the bare open() call previously left it
# open), and the encoding is pinned to UTF-8 instead of the locale default.
with open(dbutils.widgets.get("path_to_json"), encoding="utf-8") as config_file:
    CONFIGS: Final[Dict[str, Union[str, List[Dict[str, Union[str, List[Dict[str, str]]]]]]]] = (
        load(config_file)
    )

In [0]:
# Make the catalog named by CONFIGS["catalog"] the current catalog of this
# Spark session. The configured value is interpolated into the SQL text as-is
# (no quoting or escaping is applied).
spark.sql(f"USE CATALOG {CONFIGS['catalog']};")

In [0]:
# Make the schema named by CONFIGS["source_schema"] the current schema of this
# Spark session. The configured value is interpolated into the SQL text as-is
# (no quoting or escaping is applied).
spark.sql(f"USE SCHEMA {CONFIGS['source_schema']};")

In [0]:
# Per-target settings taken from the "target_tables_mapping" entry of CONFIGS.
# write_to_sinks iterates over this list and reads these keys from each entry:
# "transformations", "timestamp_column", "schema", "table", "source_pks",
# "mode" and (for mode "update") "target_pks".
MAPPING: Final[List[Dict[str, Union[str, List[Dict[str, str]]]]]] = CONFIGS["target_tables_mapping"]

In [0]:
# Streaming read of the Delta table named by CONFIGS["source_table"].
# NOTE(review): presumably the name is resolved against the catalog/schema
# selected by the USE statements earlier in the notebook - confirm whether the
# configured name is already fully qualified.
SOURCE_DF: Final[DataFrame] = spark.readStream.format("delta").table(CONFIGS["source_table"])

In [0]:
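# Manual reset step - intentionally left commented out. Uncommenting it
# recursively deletes the hard-coded checkpoint directory below.
# NOTE(review): presumably used to make the stream reprocess the source from
# the beginning; confirm the path matches CONFIGS["checkpoint_location"].
# display(
#     dbutils.fs.rm("/Volumes/dihpoc/ssot/mara/checkpoint", True)
# )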

In [0]:
# Ensure the checkpoint directory exists before the stream starts. The path is
# taken from CONFIGS["checkpoint_location"], the same key the writeStream cell
# passes as "checkpointLocation", so the directory created here is the one the
# query uses (the previous hard-coded path could drift from the configuration).
dbutils.fs.mkdirs(CONFIGS["checkpoint_location"])

In [0]:
def write_to_sinks(source_df: DataFrame, BATCH_ID: int) -> None:
    """Fan one micro-batch out to every target table described in ``MAPPING``.

    Used as the ``foreachBatch`` callback of the streaming query. For each
    entry of ``MAPPING`` the batch is:

    1. transformed: every ``{column: expression}`` pair under
       ``"transformations"`` is applied with ``withColumn(column, expr(...))``;
    2. renamed: the column ``CONFIGS["source_timestamp"]`` becomes the entry's
       ``"timestamp_column"``;
    3. filtered: only rows strictly newer than the greatest timestamp already
       stored in the target table are kept (when the target has no timestamp
       yet, every row with a non-NULL timestamp is kept);
    4. de-duplicated: only the newest row per ``"source_pks"`` key is kept;
    5. written: merged into the target when ``"mode"`` is ``"update"``
       (update matched rows, insert the rest), otherwise saved with
       ``saveAsTable`` using ``"mode"`` as the save mode.

    Args:
        source_df: The micro-batch DataFrame handed over by Spark.
        BATCH_ID: The micro-batch id handed over by Spark (not used).

    Raises:
        ValueError: If ``"mode"`` is ``"update"`` and ``"source_pks"`` and
            ``"target_pks"`` do not list the same number of columns.
    """
    session = source_df.sparkSession
    source_ts: str = CONFIGS["source_timestamp"]

    for config in MAPPING:
        batch_df: DataFrame = source_df
        for operations in config.get("transformations", []):
            for column, transformation in operations.items():
                batch_df = batch_df.withColumn(column, expr(transformation))

        ts_col: str = config.get("timestamp_column")
        batch_df = batch_df.withColumnRenamed(source_ts, ts_col)

        target_table: str = ".".join([config.get("schema"), config.get("table")])
        target_df: DataFrame = session.table(target_table)
        target_cols: List[str] = target_df.columns

        # Alias the aggregate so the lookup does not depend on Spark's
        # auto-generated "max(<column>)" result name.
        latest = target_df.agg(max(col(ts_col)).alias("latest")).first()["latest"]

        # The timestamp column was renamed above, so it must be referenced by
        # its new name here; the old name no longer exists in batch_df unless
        # both names happen to be identical.
        if latest is None:
            # Nothing to compare against yet: keep every row that carries a
            # timestamp rather than comparing with a year-0000 string sentinel.
            batch_df = batch_df.filter(col(ts_col).isNotNull())
        else:
            batch_df = batch_df.filter(col(ts_col) > latest)

        source_pks: List[str] = [
            key.strip() for key in config.get("source_pks").split(",")
        ]

        # Keep only the most recent row for each primary-key combination.
        newest_first = Window.partitionBy(*source_pks).orderBy(desc(ts_col))
        batch_df = (
            batch_df.withColumn("rn", row_number().over(newest_first))
            .filter(col("rn") == 1)
            .drop("rn")
        )

        mode: str = config.get("mode")
        if mode == "update":
            target_pks: List[str] = [
                key.strip() for key in config.get("target_pks").split(",")
            ]
            # zip() would silently drop the unmatched keys and produce a merge
            # condition that is too weak, so fail loudly instead.
            if len(source_pks) != len(target_pks):
                raise ValueError(
                    f"source_pks {source_pks} and target_pks {target_pks} "
                    f"for {target_table} must have the same number of columns"
                )

            # Alias "b" is the incoming batch, alias "s" is the target table.
            merge_condition: str = " and ".join(
                f"b.{source} = s.{target}"
                for source, target in zip(source_pks, target_pks)
            )
            (
                DeltaTable.forName(session, target_table)
                .alias("s")
                .merge(batch_df.alias("b"), condition=merge_condition)
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
            )
        else:
            # Project onto the target's columns before writing.
            batch_df.select(*target_cols).write.format("delta").saveAsTable(
                name=target_table,
                mode=mode,
            )

In [0]:
# Start the "ssot" streaming query: every micro-batch of SOURCE_DF is handed to
# write_to_sinks, progress is tracked under CONFIGS["checkpoint_location"], and
# the availableNow trigger is requested. start() returns a StreamingQuery
# handle (not a DataFrame), so the variable is left unannotated.
stream = (
    SOURCE_DF.writeStream.format("delta")
    .option("checkpointLocation", CONFIGS["checkpoint_location"])
    .trigger(availableNow=True)
    .foreachBatch(write_to_sinks)
    .queryName("ssot")
    .start()
)

# Block until the query stops so that an exception raised inside write_to_sinks
# fails this cell instead of going unnoticed in the background.
stream.awaitTermination()