In [0]:
from pyspark.sql.functions import col, regexp_extract
import os
import json
from pytz import timezone
# from pyspark import SparkConf, SparkContext
# from pyspark.sql import SparkSession, SQLContext
from datetime import datetime
from pyspark.sql.functions import *
# from Fileops import FileOps as g
# Set the Spark SQL session time zone to 'EST' before any data is read.
# NOTE(review): 'EST' may resolve to a fixed UTC-05:00 offset without daylight
# saving; confirm a region id such as 'America/New_York' is not what is intended.
spark.sql("SET TIME ZONE 'EST'")
from delta.tables import DeltaTable
from pyspark.sql.functions import col


# Glob matching the Food Lion pickup-order-metrics extracts (*.csv.gz) on the
# read-only mount, and the name of the Delta table used further down the script.
InputPath='dbfs:/mnt/rs05ue2pipadl03_Readonly/FIONA/RDS/Delhaize/InstacartSourceData/FoodLion/*/*food_lion.pickup_order_metrics.*.csv.gz'
delta_table_name = "merchandising_dev.pos.fdln_pickup_order_metrics"

# Read the source files exactly once. With inferSchema enabled the CSV reader
# makes an extra pass over the data when load() is called, so a second,
# discarded read of the same glob would only double the scan cost.
# The hidden `_metadata` column is used to record which file each row came from.
df_in_delta = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(InputPath)
    .withColumn("file_path", col("_metadata.file_path"))
)

# Derive the bare file name from the full path, then the first yyyy-mm-dd date
# embedded in that name. regexp_extract yields an empty string when a pattern
# does not match.
df_in_delta = df_in_delta.withColumn(
    "file_name",
    regexp_extract(col("file_path"), r".*/([^/]+\.csv\.gz)$", 1)
).withColumn(
    "file_date",
    regexp_extract(col("file_name"), r"(\d{4}-\d{2}-\d{2})", 1)
)

# Expose the enriched source rows to Spark SQL so they can be ranked.
df_in_delta.createOrReplaceTempView("vw_rpt_8350_foodlion_source")

# Number the rows of every (order_id, order_delivery_id) pair, highest
# file_date first.
ranking_sql = (
    " SELECT *,row_number() OVER (PARTITION BY order_id,order_delivery_id"
    " ORDER BY file_date desc) as rn from vw_rpt_8350_foodlion_source"
)
df_out_pickup_order_metrics = spark.sql(ranking_sql)

# Rank 1 is the row taken from the most recent file for its key pair. The
# ranking column and the full source path are helper columns, so both are
# removed from the result.
is_latest = col("rn") == 1
df_latest = df_out_pickup_order_metrics.where(is_latest).drop("rn", "file_path")

# Look the target table up by NAME in the catalog. DeltaTable.isDeltaTable()
# expects a storage path, and a path built from the table name
# ("/mnt/delta/<catalog.schema.table>") is not where the catalog table lives,
# so that test would report "missing" and the table would be overwritten on
# every run instead of being upserted into.
if not spark.catalog.tableExists(delta_table_name):
    # First run only: create the table from the current de-duplicated snapshot.
    df_latest.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable(delta_table_name)

# Handle to the (pre-existing or just created) table, used as the merge target.
delta_table = DeltaTable.forName(spark, delta_table_name)

# A target row and a source row are the same record when both key columns agree.
merge_condition = """
  target.order_id = source.order_id AND
  target.order_delivery_id = source.order_delivery_id
"""

# Upsert: rows whose key already exists in the table are updated with every
# source column, rows with a new key are inserted.
upsert_builder = delta_table.alias("target").merge(
    df_latest.alias("source"),
    merge_condition,
)
upsert_builder = upsert_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll()
upsert_builder.execute()

print("Merge completed successfully on keys order_id, order_delivery_id")