In [0]:
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    TimestampType,
)
from pyspark.sql.functions import lit, current_timestamp, col
from delta.tables import DeltaTable
from datetime import datetime

In [0]:
# Raw taxi-zone lookup from the landing volume. No schema inference is requested,
# so every column arrives as a string and is typed explicitly in the next cell.
zone_csv_path = "/Volumes/nyctaxi/00_landing/data_sources/lookup/taxi_zone_lookup.csv"
df = spark.read.option("header", True).csv(zone_csv_path)


In [0]:
# Shape the raw CSV into the silver SCD2 schema:
#   - cast each column explicitly, then give it its target name (Borough keeps the
#     capitalisation used by the target table's columns in the merges below);
#   - add the validity columns: a version starts "now" and is open-ended (end_date NULL).
# NOTE: this cell rebinds `df`; re-running it without re-running the read cell fails
# because the raw column names (e.g. LocationID) no longer exist.
df = df.select(
    col("LocationID").cast(IntegerType()).alias("location_id"),
    col("Borough").cast(StringType()).alias("Borough"),
    col("Zone").cast(StringType()).alias("zone"),
    col("service_zone").cast(StringType()).alias("service_zone"),
    # current_timestamp() is already a Column; wrapping it in lit() was redundant.
    current_timestamp().alias("effective_date"),
    lit(None).cast(TimestampType()).alias("end_date"),
)

In [0]:
# Single "close" instant for this run: every version retired in Pass 1 receives exactly
# this end_date, and Pass 2 later filters on it to find the keys it must re-insert.
end_timestamp = datetime.now()

# Handle on the SCD2 silver Delta table, looked up by its catalog name.
silver_table_name = "nyctaxi.02_silver.taxi_zone_lookup"
dt = DeltaTable.forName(spark, silver_table_name)


In [0]:
## Pass 1: close the current (open) version of every key whose attributes changed
# Only open versions (end_date IS NULL) can match, so historical rows keep the end_date
# they were closed with. The update fires only when at least one attribute differs;
# `<=>` is Spark's null-safe equality, so a change to or from NULL also counts.
# Keys whose attributes are unchanged keep their open version untouched.
dt.alias("target").merge(
    source=df.alias("source"),
    condition="target.location_id = source.location_id AND target.end_date IS NULL",
).whenMatchedUpdate(
    condition=(
        "NOT (target.Borough <=> source.Borough) "
        "OR NOT (target.zone <=> source.zone) "
        "OR NOT (target.service_zone <=> source.service_zone)"
    ),
    set={
        "end_date": lit(end_timestamp).cast(TimestampType()),
    },
).execute()



In [0]:
## Pass 2: insert a fresh open version for every key closed in Pass 1
# Keys whose version was closed in this run all share end_date == end_timestamp.
# Comparing against the same lit(...).cast(TimestampType()) expression used to write it
# avoids round-tripping the timestamp through a string literal.
id_list = [
    row.location_id
    for row in dt.toDF()
    .filter(col("end_date") == lit(end_timestamp).cast(TimestampType()))
    .select("location_id")
    .distinct()
    .collect()
]

if not id_list:
    # Nothing was closed, so there is no new version to insert. Do not call exit() here:
    # it would stop the notebook and skip Pass 3 (insertion of brand-new keys).
    print("no new records to insert")
else:
    # Restrict the source to the keys just closed and join on key + "still open".
    # Their previous version is now closed, so no open target row matches and the
    # changed source row is inserted. If this cell is re-run, the freshly inserted
    # open row matches and nothing is inserted twice.
    changed_source = df.filter(col("location_id").isin(id_list))
    dt.alias("target").merge(
        source=changed_source.alias("source"),
        condition="target.location_id = source.location_id AND target.end_date IS NULL",
    ).whenNotMatchedInsert(
        values={
            "location_id": "source.location_id",
            "Borough": "source.Borough",
            "zone": "source.zone",
            "service_zone": "source.service_zone",
            "effective_date": current_timestamp(),
            "end_date": lit(None).cast(TimestampType()),
        }
    ).execute()

In [0]:
## Pass 3: insert keys that have never been seen in the SCD2 table
# A source row is inserted only when no target row (open or closed) shares its
# location_id, so this pass adds brand-new zones and never touches existing history.
new_key_values = {
    "location_id": "source.location_id",
    "Borough": "source.Borough",
    "zone": "source.zone",
    "service_zone": "source.service_zone",
    "effective_date": current_timestamp(),
    "end_date": lit(None).cast(TimestampType()),
}
(
    dt.alias("target")
    .merge(source=df.alias("source"), condition="target.location_id = source.location_id")
    .whenNotMatchedInsert(values=new_key_values)
    .execute()
)



In [0]:
# Sanity check after the three passes: the SCD2 table must hold at most one open
# version (end_date IS NULL) per location_id. Fails loudly instead of silently
# leaving duplicated "current" rows behind.
open_dupes = (
    dt.toDF()
    .filter(col("end_date").isNull())
    .groupBy("location_id")
    .count()
    .filter(col("count") > 1)
)
assert open_dupes.first() is None, "some location_id values have more than one open SCD2 version"
# display(spark.sql("select * from nyctaxi.02_silver.taxi_zone_lookup"))