Ingest data from the Silver tables (CSV files) and the EndTimeRules workbook

In [0]:
# Read the Silver "item basic" CSV extract: the first line supplies the column
# names and column types are inferred from the data.
df_basic = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/Volumes/postnord/default/data/Silver_item_basic_20251222.csv")
)

# Preview five rows; limit() returns a Spark DataFrame that display() can render.
display(df_basic.limit(5))

In [0]:
# Print the inferred column names and types of df_basic.
df_basic.printSchema()

In [0]:
# Read the Silver "item scans" CSV extract: the first line supplies the column
# names and column types are inferred from the data.
df_scan = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/Volumes/postnord/default/data/Silver_item_scans_20251222.csv")
)

# Preview five rows; limit() returns a Spark DataFrame that display() can render.
display(df_scan.limit(5))

In [0]:
# Print the inferred column names and types of df_scan.
# printSchema() writes the schema to stdout and returns None, so it must not be
# wrapped in display().
df_scan.printSchema()

In [0]:
# monotonically_increasing_id is no longer used below; the import is kept so
# any other cell relying on it still runs.
from pyspark.sql.functions import lit, monotonically_increasing_id

# Load the Excel workbook without treating any row as a header, so the real
# column names arrive as the first data row.
df_rules_raw = (
    spark.read
    .format("excel")
    .option("header", "false")
    .option("inferSchema", "true")
    .load("/Volumes/postnord/default/data/EndTimeRules.xlsx")
)

# The first row holds the actual column names.
header = df_rules_raw.first()
if header is None:
    raise ValueError("EndTimeRules.xlsx contains no rows, so no header row could be read")

# Remove the header row by matching its values (null-safe) instead of relying
# on a generated row id being 0 for exactly the row that first() returned.
is_header_row = lit(True)
for raw_name, header_value in zip(df_rules_raw.columns, header):
    is_header_row = is_header_row & df_rules_raw[raw_name].eqNullSafe(lit(header_value))

# Rename columns by position, so the result does not depend on how the reader
# names header-less columns, and strip stray whitespace from the header cells
# (e.g. "location " becomes "location").
rule_column_names = [str(header_value).strip() for header_value in header]
df_rules = df_rules_raw.filter(~is_header_row).toDF(*rule_column_names)

# limit() returns a Spark DataFrame that display() can render.
display(df_rules.limit(5))



In [0]:
# Print the column names and types of df_rules.
# printSchema() writes the schema to stdout and returns None, so it must not be
# wrapped in display().
df_rules.printSchema()

For each (packageid, systemdato) in df_basic, keep only the row with the latest created_dt, breaking ties by the highest transaction_ref.

In [0]:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Rank the rows of every (packageid, systemdato) group, newest created_dt
# first and highest transaction_ref first among equal created_dt values.
window_basic = (
    Window
    .partitionBy("packageid", "systemdato")
    .orderBy(col("created_dt").desc(), col("transaction_ref").desc())
)

# Keep only the top-ranked row of each group, then drop the helper rank column.
df_basic_dedup = (
    df_basic
    .withColumn("rn", row_number().over(window_basic))
    .where(col("rn") == 1)
    .drop("rn")
)

display(df_basic_dedup.limit(5))

For df_scan, deduplicate on (packageid, systemdato, scan_datetime, scan_type, reason_code, location, transaction_ref), keeping the row with the latest created_dt in each group.

In [0]:
#window_scan = Window.partitionBy(
 #   "packageid", "systemdato", "scan_datetime", "scan_type", "reason_code", "location", "transaction_ref"
#).orderBy(col("scan_datetime").desc())

#df_scan_dedup = df_scan.withColumn("rn", row_number().over(window_scan)).filter(col("rn") == 1).drop("rn")

#display(df_scan_dedup.limit(5))

In [0]:
# Rows sharing all of these columns are treated as the same scan event.
scan_identity_columns = [
    "packageid",
    "systemdato",
    "scan_datetime",
    "scan_type",
    "reason_code",
    "location",
    "transaction_ref",
]

# Within each group of identical scan events, rank the newest created_dt first.
window_scan = Window.partitionBy(*scan_identity_columns).orderBy(col("created_dt").desc())

# Keep only the top-ranked row of each group, then drop the helper rank column.
df_scan_dedup = (
    df_scan
    .withColumn("rn", row_number().over(window_scan))
    .where(col("rn") == 1)
    .drop("rn")
)

display(df_scan_dedup.limit(5))

Use a timestamp column (e.g., created_dt) to process only the records in df_basic and df_scan that are new or updated since the last run.
For batch jobs, keep only the records whose created_dt is later than the last processed created_dt.


In [0]:
# Select the records to process in this run.
#
# Set this flag to True for the first run (there is no previously processed
# data to compare against, so every deduplicated record is processed).
# Set it to False for subsequent runs.
is_first_run = True

# Watermark: the latest created_dt already present in the TARGET table, or
# None when there is nothing to compare against.
last_processed_dt = None
if not is_first_run:
    last_processed_dt = (
        spark.table("postnord.default.basic_processed")
        .agg({"created_dt": "max"})
        .first()[0]
    )
    print(f"Last processed:  {last_processed_dt}")

if last_processed_dt is None:
    # First run, or the target table holds no created_dt yet. Comparing
    # against a NULL watermark would silently filter out every row, so process
    # all deduplicated records instead.
    df_basic_incremental = df_basic_dedup
    df_scan_incremental = df_scan_dedup
else:
    # Subsequent runs - keep only records newer than the watermark.
    df_basic_incremental = df_basic_dedup.filter(col("created_dt") > last_processed_dt)
    df_scan_incremental = df_scan_dedup.filter(col("created_dt") > last_processed_dt)

In [0]:
# Preview five of the df_basic records selected for this run.
display(df_basic_incremental.limit(5))


In [0]:
# Preview five of the df_scan records selected for this run.
display(df_scan_incremental.limit(5))

Join df_basic_incremental and df_scan_incremental on (packageid, systemdato, transaction_ref).

In [0]:
# Pair every scan with its parcel record; the inner join drops scans that have
# no matching row in df_basic_incremental (and vice versa).
scan_basic_keys = ["packageid", "systemdato", "transaction_ref"]
df_joined = df_scan_incremental.join(df_basic_incremental, on=scan_basic_keys, how="inner")

display(df_joined.limit(5))

Join df_joined with df_rules on (scan_type, reason_code, location, product).


In [0]:
# Preview five rule rows before joining them to the scans.
display(df_rules.limit(5))

In [0]:
# Print the df_rules schema to inspect the exact column names.
# printSchema() writes the schema to stdout and returns None, so it must not be
# wrapped in display().
df_rules.printSchema()

In [0]:
# The rules header has a trailing space in "location "; rename it so it lines
# up with the "location" column used as a join key further down.
df_rules = df_rules.withColumnRenamed("location ", "location")

In [0]:
# Print the df_rules schema again to confirm the column rename.
# printSchema() writes the schema to stdout and returns None, so it must not be
# wrapped in display().
df_rules.printSchema()

In [0]:
# Print the df_joined schema to compare its column types with df_rules.
# printSchema() writes the schema to stdout and returns None, so it must not be
# wrapped in display().
df_joined.printSchema()

In [0]:
# Check data types
print("df_joined types:")
df_joined.select("scan_type", "reason_code", "location", "product").dtypes


In [0]:
# Check data types of the four rule-join key columns on the rules side.
# The trailing .dtypes expression is the cell's displayed result.
print("df_rules types:")
df_rules.select("scan_type", "reason_code", "location", "product").dtypes

In [0]:
# Try joining on just one column to isolate the issue
#df_test = df_joined.join(df_rules, on=["scan_type"], how="inner")
#print(f"Match on scan_type only: {df_test.count()}")

#df_test = df_joined.join(df_rules, on=["scan_type", "reason_code"], how="inner")
#print(f"Match on scan_type + reason_code:  {df_test.count()}")

#df_test = df_joined.join(df_rules, on=["scan_type", "reason_code", "location"], how="inner")
#print(f"Match on scan_type + reason_code + location: {df_test. count()}")




In [0]:
# List the distinct location values on the scan side, untruncated, so they can
# be compared by eye with the values in df_rules.
print("df_joined locations:")
df_joined.select("location").distinct().show(truncate=False)

In [0]:
# List the distinct location values on the rules side, untruncated, so they
# can be compared by eye with the values in df_joined.
print("df_rules locations:")
df_rules.select("location").distinct().show(truncate=False)


In [0]:
from pyspark.sql.functions import col, count, lit, trim, upper, when

# Columns shared by the scans and the rules that identify which rule applies.
join_columns = ["scan_type", "reason_code", "location", "product"]


def _normalise_join_keys(df, columns):
    """Return `df` with each column in `columns` trimmed and upper-cased.

    NULL values are replaced by the literal string "NULL" so that a NULL key
    on one side matches a NULL key on the other side of an equi-join.
    """
    for name in columns:
        df = df.withColumn(
            name,
            when(col(name).isNull(), "NULL").otherwise(upper(trim(col(name)))),
        )
    return df


# Step 1 & 2: Clean both dataframes with the same normalisation.
df_joined_clean = _normalise_join_keys(df_joined, join_columns)
df_rules_clean = _normalise_join_keys(df_rules, join_columns)

# Step 3: Join. The left join keeps every scan, including those without a rule.
df_with_rules = df_joined_clean.join(
    df_rules_clean,
    on=join_columns,
    how="left"
)

# Step 4: Verify. One aggregation replaces three separate count() actions,
# each of which would re-run the whole read/dedup/join lineage.
# count(column) counts only non-NULL values, so it equals the number of rows
# whose is_end_event is not NULL.
match_stats = df_with_rules.agg(
    count(lit(1)).alias("total_rows"),
    count(col("is_end_event")).alias("matched_rows"),
).first()
total_rows = match_stats["total_rows"]
matched_rows = match_stats["matched_rows"]

print(f"Total rows: {total_rows}")
print(f"Matched (has rule): {matched_rows}")
print(f"Unmatched (no rule): {total_rows - matched_rows}")

In [0]:
# Preview five scan rows together with their joined rule columns.
display(df_with_rules.limit(5))

In [0]:
from pyspark.sql.functions import col, count, lit

# Check schema - is_end_event should now be present
df_with_rules.printSchema()

# Check for nulls in is_end_event (unmatched rows from left join)
df_with_rules.select("is_end_event").distinct().show()

# Count matched vs unmatched in a single aggregation instead of three separate
# count() actions that would each recompute df_with_rules.
# count(column) counts only non-NULL values of that column.
rule_match_stats = df_with_rules.agg(
    count(lit(1)).alias("total_rows"),
    count(col("is_end_event")).alias("matched_rows"),
).first()
rule_total_rows = rule_match_stats["total_rows"]
rule_matched_rows = rule_match_stats["matched_rows"]

print(f"Total rows: {rule_total_rows}")
print(f"Matched rows: {rule_matched_rows}")
print(f"Unmatched rows: {rule_total_rows - rule_matched_rows}")

Filter End Events \
Filter rows where is_end_event == TRUE

In [0]:
from pyspark.sql.functions import col, lower, trim

# Keep only the scans whose rule marks them as an end event.
# The flag is compared case-insensitively and without surrounding whitespace,
# so "true", "TRUE" and " True " all qualify, as does a boolean true (which
# casts to the string "true"). Rows with a NULL flag (no matching rule) are
# dropped.
df_end_events = df_with_rules.filter(
    lower(trim(col("is_end_event").cast("string"))) == "true"
)

In [0]:
# Preview five of the scans that qualify as end events.
display(df_end_events.limit(5))

For each (packageid, systemdato), select the earliest scan_datetime among end events.


In [0]:
# Rank the end events of each (packageid, systemdato) from earliest to latest.
# asc_nulls_last() is required: a plain ascending sort places NULLs first in
# Spark, so an end event with a missing scan_datetime would be picked as the
# "earliest" one and hide the real timestamps of that parcel.
window_end = Window.partitionBy("packageid", "systemdato").orderBy(
    col("scan_datetime").asc_nulls_last()
)

# Keep the earliest end event per parcel and only the columns needed downstream.
df_end_time = (
    df_end_events
    .withColumn("rn", row_number().over(window_end))
    .filter(col("rn") == 1)
    .select("packageid", "systemdato", "scan_datetime")
)

In [0]:
# Preview five parcels with their earliest end-event timestamp.
display(df_end_time.limit(5))

Classify Delivery Status 

In [0]:
# Joining df_end_time with df_basic_incremental to get ETA
df_status = df_basic_incremental.join(
    df_end_time,
    on=["packageid", "systemdato"],
    how="left"
).withColumnRenamed("scan_datetime", "end_time")

In [0]:
# Preview five parcel records with their end_time column.
display(df_status.limit(5))

Add delivery_status column: \
'On Time': end_time <= ETA \
'Delayed': end_time > ETA \
'En Route': end_time is null


In [0]:
from pyspark.sql.functions import when

# Classification rules, evaluated in this order:
#   no end event yet      -> 'En Route'
#   end_time <= ETA       -> 'On Time'
#   anything else         -> 'Delayed'
# NOTE(review): when end_time is set but ETA is NULL, the comparison yields
# NULL and the row falls through to 'Delayed' - confirm this is intended.
no_end_event_yet = col("end_time").isNull()
ended_by_eta = col("end_time") <= col("ETA")

delivery_status_expr = (
    when(no_end_event_yet, "En Route")
    .when(ended_by_eta, "On Time")
    .otherwise("Delayed")
)

df_status = df_status.withColumn("delivery_status", delivery_status_expr)

In [0]:
# Preview five parcel records including the new delivery_status column.
display(df_status.limit(5))

Fact table upsert

In [0]:
# Switch the session to the postnord catalog, so the unqualified table names
# used in the following cells resolve inside it, then print the active catalog
# as a check.
spark.sql("USE CATALOG postnord")
print(spark.catalog.currentCatalog())

In [0]:
# Expose df_status to SQL under the name "updates".
df_status.createOrReplaceTempView("updates")

# Creates gold_parcel_delivery_quality as an empty Delta table with the schema
# of "updates" (WHERE 1=0 selects no rows); does nothing if it already exists.
create_gold_table_sql = """
CREATE TABLE IF NOT EXISTS gold_parcel_delivery_quality
USING DELTA
AS SELECT * FROM updates WHERE 1=0
"""

# Upserts on (packageid, systemdato): matching rows are overwritten with the
# new values, unmatched rows are inserted.
merge_gold_table_sql = """
MERGE INTO gold_parcel_delivery_quality AS target
USING updates AS source
ON target.packageid = source.packageid AND target.systemdato = source.systemdato
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""

spark.sql(create_gold_table_sql)

# Kept as the last expression so the MERGE result remains the cell's output.
spark.sql(merge_gold_table_sql)

In [0]:
%sql
-- Preview five rows of the upserted gold table.
SELECT *
FROM gold_parcel_delivery_quality
LIMIT 5;