In [1]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from pyspark.sql import Window
from delta.tables import DeltaTable

StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 3, Finished, Available, Finished)

### ORDER 

In [2]:
# Parameters — the pipeline overrides `ingest_date` (the YYYYMMDD Bronze folder
# name); the literal below is only a default for interactive testing.
ingest_date = "20251007"
bronze_folder = "Files/Bronze/" + ingest_date + "/"
silver_order_table = "silver.order"
silver_supplier_table = "silver.supplier"

StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 4, Finished, Available, Finished)

In [3]:
# Business key(s) that identify one source record (an order stop).
business_keys = ["order_id", "stop_id"]

# Canonical, ordered payload columns hashed into row_hash. The
# <event>_time_date / <event>_time_time pairs are derived from the raw
# timestamp columns by the parsing loop in the next cell (In [4]).
payload_cols = [
    "order_id", "stop_id", "service_id", "supplier_id", "partner",
    "actual_city_name", "district", "status", "distance", "stop_status",
    "cancel_comment", "cancel_by_user", "cancel_code",
] + [
    f"{event}_time_{part}"
    for event in ("create", "order", "accept", "board", "pickup", "complete", "cancel")
    for part in ("date", "time")
]

# Single legacy format of the source timestamp strings, e.g. "September 5, 2023, 08:57".
# NOTE(review): the active parser uses its own `formats` list; this name is only
# referenced by commented-out code.
datetime_fmt = "MMMM d, yyyy, HH:mm"

StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 5, Finished, Available, Finished)

In [4]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DateType

# Load order.csv from this ingest's Bronze folder as all-string columns
# (no schema inference); quoted fields may span several lines.
src_raw = (
    spark.read
    .options(
        header="true",
        inferSchema="false",
        multiLine="true",
        escape='"',
        quote='"',
        nullValue="",
        ignoreLeadingWhiteSpace="true",
        ignoreTrailingWhiteSpace="true",
    )
    .csv(f"{bronze_folder}order.csv")
)

# Strip accidental leading/trailing whitespace from header names.
padded_names = [name for name in src_raw.columns if name != name.strip()]
for padded in padded_names:
    src_raw = src_raw.withColumnRenamed(padded, padded.strip())

# Canonicalise supplier_id as a plain string key:
#   "12345" or "12345.0"  -> "12345"
#   "8.3624089559E10"     -> "83624089559" (via double -> long)
#   anything else         -> a leading/trailing quote character stripped
if "supplier_id" in src_raw.columns:
    sid = F.trim(F.col("supplier_id").cast(StringType()))
    is_integer_like = sid.rlike(r'^[0-9]+(\.0+)?$')
    is_scientific = sid.rlike(r'^[0-9]+(\.[0-9]+)?[eE][+-]?[0-9]+$')

    src_raw = src_raw.withColumn(
        "supplier_id",
        F.when(sid.isNull(), None)
         .when(is_integer_like, F.regexp_replace(sid, r'\.0+$', ''))
         .when(is_scientific, F.col("supplier_id").cast("double").cast("long").cast(StringType()))
         .otherwise(F.regexp_replace(sid, r'(^"|"$)|(^\'|\'$)', '')),
    )

# Map textual flags onto a nullable boolean: true/1/1.0 -> True,
# false/0/0.0 -> False (case-insensitive); anything else becomes NULL.
if "cancel_by_user" in src_raw.columns:
    flag_text = F.trim(F.col("cancel_by_user").cast(StringType()))
    src_raw = src_raw.withColumn(
        "cancel_by_user",
        F.when(flag_text.isNull(), None)
         .when(flag_text.rlike("^(?i:true|1|1\\.0)$"), F.lit(True))
         .when(flag_text.rlike("^(?i:false|0|0\\.0)$"), F.lit(False))
         .otherwise(None)
         .cast("boolean"),
    )

# Raw timestamp columns expected in order.csv; each is parsed below into
# <name>_date (DateType) and <name>_time ("HH:mm" string).
time_columns = [
    f"{event}_time"
    for event in ("create", "order", "accept", "board", "pickup", "complete", "cancel")
]

# helper: sanitise a raw text column before timestamp parsing
def clean_str_col(column_name):
    """Return `column_name` cast to string and cleaned for date parsing.

    - BOM (U+FEFF) characters and stray single/double quotes are removed.
    - Runs of whitespace, including non-breaking spaces (U+00A0), become a
      single ordinary space. NBSPs therefore still separate tokens such as
      "September 5" instead of being deleted, which would give "September5".
    - The result is trimmed last, so spaces exposed by the removals are trimmed too.

    NULL input stays NULL, because regexp_replace/trim propagate NULL.
    """
    text = F.col(column_name).cast(StringType())
    text = F.regexp_replace(text, r"[\ufeff'\"]", "")
    text = F.regexp_replace(text, r"[\s\u00A0]+", " ")
    return F.trim(text)

# Candidate timestamp formats, tried in order (first successful parse wins).
# Single-letter `H` accepts both 1- and 2-digit hours ("8:57" and "08:57"), so
# separate fixed-width `HH` variants are unnecessary. Avoiding `HH` also
# sidesteps a SparkUpgradeException that Spark 3's default
# timeParserPolicy=EXCEPTION can raise when the new parser rejects a 1-digit
# hour that the legacy parser would accept.
formats = [
    "yyyy-MM-dd H:mm:ss",       # 2023-10-23 18:17:14
    "yyyy-MM-dd H:mm",          # 2023-10-23 18:17
    "MMMM d, yyyy, H:mm:ss",    # January 1, 2023, 00:12:05
    "MMMM d, yyyy, H:mm",       # January 1, 2023, 00:12
]

for col_name in time_columns:
    date_col = f"{col_name}_date"
    time_col = f"{col_name}_time"

    if col_name in src_raw.columns:
        # Stage the sanitised text in a temporary column. Parse it with each
        # candidate format (first non-null wins), derive the date and the
        # "HH:mm" time, then drop the temporary column.
        tmp_col = f"{col_name}_clean"
        src_raw = src_raw.withColumn(tmp_col, clean_str_col(col_name))
        parsed = F.coalesce(*[F.to_timestamp(F.col(tmp_col), fmt) for fmt in formats])
        hhmm = F.when(parsed.isNotNull(), F.date_format(parsed, "HH:mm")).otherwise(None)
        src_raw = (
            src_raw
            .withColumn(date_col, F.to_date(parsed))
            .withColumn(time_col, hhmm)
            .drop(tmp_col)
        )
    else:
        # Source column missing: add typed NULL columns so the schema stays stable.
        src_raw = (
            src_raw
            .withColumn(date_col, F.lit(None).cast(DateType()))
            .withColumn(time_col, F.lit(None).cast(StringType()))
        )


StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 6, Finished, Available, Finished)

In [5]:
# # Read CSVs in the folder.
# src_raw = spark.read \
#     .option("header", "true") \
#     .option("inferSchema", "false") \
#     .option("multiLine", "false") \
#     .option("escape", "\"") \
#     .option("quote", "\"") \
#     .option("nullValue", "") \
#     .csv(bronze_folder + "order.csv")

# # Trim/normalize column names (remove accidental whitespace)
# for c in src_raw.columns:
#     if c != c.strip():
#         src_raw = src_raw.withColumnRenamed(c, c.strip())

# # Normalize the supplier supplier_id column ("supplier_id")
# #    - remove trailing ".0" for integer-like strings
# #    - convert scientific notation to integer string when possible
# #    - keep original otherwise
# if "supplier_id" in src_raw.columns:
#     supplier_id_trim = F.trim(F.col("supplier_id").cast(StringType()))

#     supplier_id_norm = (
#         F.when(supplier_id_trim.isNull(), None)
#          # case: plain integer or integer with .0 suffix (e.g. 12345 or 12345.0)
#          .when(supplier_id_trim.rlike(r'^[0-9]+(\.0+)?$'), F.regexp_replace(supplier_id_trim, r'\.0+$', ''))
#          # case: scientific notation e.g. 8.3624089559E10 -> cast double then long then string
#          .when(supplier_id_trim.rlike(r'^[0-9]+(\.[0-9]+)?[eE][+-]?[0-9]+$'),
#                F.col("supplier_id").cast("double").cast("long").cast(StringType()))
#          # fallback: remove any accidental surrounding quotes and trim
#          .otherwise(F.regexp_replace(supplier_id_trim, r'(^"|"$)|(^\'|\'$)', ''))
#     )

#     src_raw = src_raw.withColumn("supplier_id", supplier_id_norm)

# if "cancel_by_user" in src_raw.columns:
#     cancel_trim = F.trim(F.col("cancel_by_user").cast(StringType()))
    
#     cancel_bool = (
#         F.when(cancel_trim.isNull(), None)
#          .when(cancel_trim.rlike("^(?i:true|1|1\\.0)$"), F.lit(True))
#          .when(cancel_trim.rlike("^(?i:false|0|0\\.0)$"), F.lit(False))
#          .otherwise(None)
#     )
    
#     src_raw = src_raw.withColumn("cancel_by_user", cancel_bool.cast("boolean"))


# # List of original timestamp columns present in source
# time_columns = [
#     "create_time","order_time","accept_time","board_time",
#     "pickup_time","complete_time","cancel_time"
# ]
# datetime_fmt = "MMMM d, yyyy, HH:mm"
# # Convert each textual time column to a timestamp, then split into date and time parts
# for col_name in time_columns:
#     ts_expr = F.to_timestamp(F.col(col_name), datetime_fmt)
#     date_col = f"{col_name}_date"
#     time_col = f"{col_name}_time"
#     src_raw = src_raw.withColumn(date_col, F.to_date(ts_expr)) \
#                      .withColumn(time_col, F.date_format(ts_expr, "HH:mm"))

# # Preview a few rows so you can confirm parsing worked
# print("Sample rows after parsing (showing order_date and pickup_time splits):")
# display(src_raw.limit(5))

StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 7, Finished, Available, Finished)

In [6]:
# Guarantee every canonical payload column exists (typed NULL string if absent).
for missing_col in [c for c in payload_cols if c not in src_raw.columns]:
    src_raw = src_raw.withColumn(missing_col, F.lit(None).cast(StringType()))

# SHA-256 over the "||"-joined payload values (NULL -> ""), plus ingest metadata.
hash_input = F.concat_ws(
    "||", *(F.coalesce(F.col(c).cast("string"), F.lit("")) for c in payload_cols)
)
stg = (
    src_raw
    .withColumn("row_hash", F.sha2(hash_input, 256))
    .withColumn("ingest_ts", F.current_timestamp())
    .withColumn("source_folder", F.lit(ingest_date))
)

# Keep business keys, payload and metadata only; dict.fromkeys drops repeated
# names while preserving first-occurrence order.
src = stg.select(*dict.fromkeys(business_keys + payload_cols + ["row_hash", "ingest_ts", "source_folder"]))

# Temp view for SQL-based MERGE usage.
src.createOrReplaceTempView("tmp_src")

print("Prepared source with row_hash and metadata. Columns:")
print(src.columns)
display(src.limit(5))

StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 8, Finished, Available, Finished)

Prepared source with row_hash and metadata. Columns:
['order_id', 'stop_id', 'service_id', 'supplier_id', 'partner', 'actual_city_name', 'district', 'status', 'distance', 'stop_status', 'cancel_comment', 'cancel_by_user', 'cancel_code', 'create_time_date', 'create_time_time', 'order_time_date', 'order_time_time', 'accept_time_date', 'accept_time_time', 'board_time_date', 'board_time_time', 'pickup_time_date', 'pickup_time_time', 'complete_time_date', 'complete_time_time', 'cancel_time_date', 'cancel_time_time', 'row_hash', 'ingest_ts', 'source_folder']


SynapseWidget(Synapse.DataFrame, 36f4a5e4-265a-4d5c-a84a-7a1285c60368)

In [7]:
# Target/merge settings for the ORDER section.
silver_table_fq = silver_order_table
business_keys = ["order_id", "stop_id"]
partition_col = "source_folder"
tmp_merge_view = "tmp_src_dedup"

# Keep one row per business key: Delta MERGE fails when several source rows
# match the same target row. `ingest_ts` comes from current_timestamp(), which
# has the same value for every row of a query, so it cannot choose between
# duplicates from one file. `row_hash` is added as a deterministic tie-breaker,
# so the kept row does not change when Spark re-evaluates this plan (temp view,
# skeleton write, MERGE).
w = (Window.partitionBy(*business_keys)
           .orderBy(F.col("ingest_ts").desc(), F.col("row_hash").desc()))
src_dedup = (src.withColumn("_rn", F.row_number().over(w))
                .filter(F.col("_rn") == 1)
                .drop("_rn"))
src_dedup.createOrReplaceTempView(tmp_merge_view)

# Create an empty Delta table from the source schema, partitioned by
# partition_col, if it does not exist yet.
# NOTE: `_jsparkSession` is a private handle; PySpark >= 3.3 exposes
# spark.catalog.tableExists() for the same check.
if not spark._jsparkSession.catalog().tableExists(silver_table_fq):
    print(f"Silver table {silver_table_fq} does not exist. Creating skeleton...")
    skeleton = src_dedup.limit(0)
    # Optionally cast types here (e.g. order_id -> string/int) for stronger typing.
    skeleton.write.format("delta").partitionBy(partition_col).mode("overwrite").saveAsTable(silver_table_fq)
    print("Created silver table:", silver_table_fq)
else:
    print("Silver table already exists:", silver_table_fq)


StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 9, Finished, Available, Finished)

Silver table already exists: silver.order


In [8]:
# MERGE the deduplicated batch into silver (upsert keyed on business_keys).
# `<=>` is null-safe equality: a row with a NULL key part (e.g. no stop_id)
# matches its existing target row instead of being inserted again on every run.
# This is consistent with the dedup window, which also groups NULL keys together.
join_condition = " AND ".join(f"t.{k} <=> s.{k}" for k in business_keys)

target = DeltaTable.forName(spark, silver_table_fq)

print("Starting MERGE (upsert) into", silver_table_fq)
(target.alias("t")
       .merge(src_dedup.alias("s"), join_condition)
       # Rewrite a matched row only when its payload hash changed. Unchanged rows
       # keep their previous ingest_ts/source_folder, so their data files (and
       # partition) are not rewritten on every run.
       .whenMatchedUpdateAll(condition="NOT (t.row_hash <=> s.row_hash)")
       .whenNotMatchedInsertAll()
       .execute())
print("MERGE completed into", silver_table_fq)

# Quick verification: rows inserted or changed by this ingest.
print("Rows from silver table inserted/updated by this ingest (source_folder = ingest_date):")
display(spark.table(silver_table_fq).where(F.col(partition_col) == ingest_date).limit(10))

StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 10, Finished, Available, Finished)

Starting MERGE (upsert) into silver.order
MERGE completed into silver.order
Top rows from silver table (latest ingest_folder):


SynapseWidget(Synapse.DataFrame, 07385ad1-9cdb-4caa-a819-2887c34a96ad)

In [9]:
%%sql
SELECT *
FROM silver.`order`

StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 11, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 30 fields>

### SUPPLIER

In [10]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from pyspark.sql import Window
from delta.tables import DeltaTable
import io, pandas as pd

StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 12, Finished, Available, Finished)

In [11]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from pyspark.sql import Window
from delta.tables import DeltaTable

# Config / names for the SUPPLIER section.
# NOTE: `silver_table_fq` and `partition_col` rebind names that the ORDER section also uses.
supplier_file_path = f"{bronze_folder}supplier.csv"
silver_table_fq = silver_supplier_table   # e.g. "silver.supplier"
partition_col = "source_folder"
business_key = "supplier_id"
# NOTE(review): not referenced by any cell visible here; supplier timestamps are
# currently stored as the raw CSV strings.
supplier_time_cols = [
    "activate_time", "create_time", "first_complete_time", "first_activate_time",
]

StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 13, Finished, Available, Finished)

In [12]:
import re

# Read supplier.csv as all-string columns; types are applied explicitly below.
sup_raw = spark.read \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .option("multiLine", "false") \
    .option("escape", "\"") \
    .option("quote", "\"") \
    .option("nullValue", "") \
    .csv(supplier_file_path)

# normalize column names (trim)
for c in sup_raw.columns:
    if c != c.strip():
        sup_raw = sup_raw.withColumnRenamed(c, c.strip())

# Drop columns that Spark named automatically because their header cell was
# empty (`_c0`, `_c1`, ...). The run output shows `_c0`, presumably an exported
# index. Matching the exact `_c<digits>` pattern keeps real columns that merely
# start with "_c".
invalid_cols = [c for c in sup_raw.columns
                if c.strip() == "" or re.fullmatch(r"_c\d+", c, flags=re.IGNORECASE)]
if invalid_cols:
    print(f"Dropping invalid columns: {invalid_cols}")
    sup_raw = sup_raw.drop(*invalid_cols)

# Drop rows where all remaining columns are null. This runs after removing the
# junk columns, so a row whose only value sits in such a column is dropped too.
sup_raw = sup_raw.na.drop(how="all")

# Convert age to int and keep only ages below 100.
# NOTE(review): `age < 100` is NULL for missing or non-numeric ages, so those
# rows are filtered out as well, while negative ages are kept. Confirm that this
# is the intended rule.
if "age" in sup_raw.columns:
    sup_raw = sup_raw.withColumn("age", F.col("age").cast("int"))
    sup_raw = sup_raw.filter(F.col("age") < 100)
else:
    print("Warning: Column 'age' not found — skipping age filter.")

print("supplier raw columns:", sup_raw.columns)

StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 14, Finished, Available, Finished)

Dropping invalid columns: ['_c0']
supplier raw columns: ['supplier_id', 'age', 'activate_time', 'create_time', 'first_activate_time', 'first_complete_time']


In [13]:
# Build the supplier staging frame: the business key first, then every other
# CSV column in file order, plus row_hash and ingest metadata.
# The `sup_`-prefixed names avoid overwriting the ORDER section's
# `payload_cols` / `concat_exprs` / `ordered_cols`. Otherwise, re-running an
# ORDER cell after this one would silently pick up the supplier column list.
if business_key not in sup_raw.columns:
    raise ValueError(f"Business key column '{business_key}' not found in supplier CSV columns: {sup_raw.columns}")

# dict.fromkeys keeps the first occurrence, so the business key leads and is not repeated.
# Every name here comes from sup_raw.columns, so no missing-column fill-in is needed.
sup_payload_cols = list(dict.fromkeys([business_key] + sup_raw.columns))

# SHA-256 over the "||"-joined payload values (NULL -> "") for change detection.
sup_concat_exprs = [F.coalesce(F.col(c).cast("string"), F.lit("")) for c in sup_payload_cols]
src = (sup_raw.select(*sup_payload_cols)
              .withColumn("row_hash", F.sha2(F.concat_ws("||", *sup_concat_exprs), 256))
              .withColumn("ingest_ts", F.current_timestamp())
              .withColumn("source_folder", F.lit(ingest_date)))

print("Prepared supplier src columns:", src.columns)

StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 15, Finished, Available, Finished)

Prepared supplier src columns: ['supplier_id', 'age', 'activate_time', 'create_time', 'first_activate_time', 'first_complete_time', 'row_hash', 'ingest_ts', 'source_folder']


In [14]:
# Keep one row per supplier_id: Delta MERGE fails when several source rows
# match the same target row. `ingest_ts` is current_timestamp() and therefore
# has the same value for every row of this batch, so `row_hash` is added as a
# deterministic tie-breaker. The same input then always keeps the same row, even
# when Spark re-evaluates the plan (temp view, skeleton write, MERGE).
w = (Window.partitionBy(business_key)
           .orderBy(F.col("ingest_ts").desc(), F.col("row_hash").desc()))
src_dedup = (src.withColumn("_rn", F.row_number().over(w))
                .filter(F.col("_rn") == 1)
                .drop("_rn"))
src_dedup.createOrReplaceTempView("tmp_supplier_dedup")

# 7) Create the silver supplier table skeleton if it doesn't exist.
# NOTE: `_jsparkSession` is a private handle; PySpark >= 3.3 exposes
# spark.catalog.tableExists() for the same check.
if not spark._jsparkSession.catalog().tableExists(silver_table_fq):
    print(f"Silver table {silver_table_fq} does not exist. Creating skeleton...")
    skeleton = src_dedup.limit(0)
    # Optionally cast types here for strong typing (e.g., id -> LongType/StringType)
    skeleton.write.format("delta").partitionBy(partition_col).mode("overwrite").saveAsTable(silver_table_fq)
    print("Created silver table:", silver_table_fq)
else:
    print("Silver table already exists:", silver_table_fq)


StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 16, Finished, Available, Finished)

Silver table already exists: silver.supplier


In [15]:
# 8) MERGE (upsert) deduped supplier rows into silver.
# `<=>` is null-safe equality: a NULL supplier_id matches the existing NULL-key
# row instead of inserting another copy on every run.
join_condition = f"t.{business_key} <=> s.{business_key}"
target = DeltaTable.forName(spark, silver_table_fq)

print("Starting MERGE into", silver_table_fq)
(target.alias("t")
       .merge(src_dedup.alias("s"), join_condition)
       # Rewrite a matched row only when its payload hash changed. Unchanged rows
       # keep their previous ingest_ts/source_folder, and their files are not rewritten.
       .whenMatchedUpdateAll(condition="NOT (t.row_hash <=> s.row_hash)")
       .whenNotMatchedInsertAll()
       .execute())
print("MERGE completed into", silver_table_fq)

# 9) Quick verification: rows inserted or changed by this ingest.
display(spark.table(silver_table_fq).where(F.col(partition_col) == ingest_date).limit(10))

StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 17, Finished, Available, Finished)

Starting MERGE into silver.supplier
MERGE completed into silver.supplier


SynapseWidget(Synapse.DataFrame, e859b215-e01b-444d-9956-39d48526a80f)

In [16]:
%%sql
SELECT age
FROM silver.supplier
WHERE age >= 50

StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 18, Finished, Available, Finished)

<Spark SQL result set with 623 rows and 1 fields>

In [17]:
%%sql
SELECT COUNT(*)
FROM silver.`order`

StatementMeta(, f545a473-0db4-4cbc-a20a-901bb1af1cc1, 19, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>