# Sales Orders Processing (NAV → EXT)

This notebook consolidates **current** and **archived** NAV sales data into clean, partitioned EXT tables for downstream BI consumption.  
It performs the following steps:
- Reads **sales header** and **sales line** data from NAV (current + archive).  
- Merges rows so that **current records take priority**, and archived records are included only when `archive_reason = 3`.  
- Flags each row with `is_archived` (Boolean).  
- Filters to include only orders with `order_date` on or after **1 January two calendar years back** (for example, a run in 2025 keeps orders from 2023-01-01 onward).  
- Maintains lightweight **meta tracker tables** to track missing or inactive IDs.  
- Writes partitioned outputs into the `ext.sales_header` and `ext.sales_line` tables, ready for reporting and analysis.  

This process ensures a consistent, deduplicated view of sales orders across both current and historical sources.

# 0 — Config & constants

In [1]:
from pyspark.sql import functions as F, types as T, Window
import datetime

# --- Schemas ---------------------------------------------------------------
# NAV schema that holds all four inputs (header/line, current/archive).
NAV_DB = "nav"
# Schema that receives the consolidated EXT tables.
EXT_SCHEMA = "ext"

# --- Tracker tables (kept separate per entity) -----------------------------
TRACKER_HEADER_TBL = "meta.sales_header"
TRACKER_LINE_TBL = "meta.sales_line"

# --- Keys and column ordering ----------------------------------------------
# Primary-key column name.
ID_COL = "id"
# Columns meant to lead the outputs, in this order.
ORDER_ANCHORS = [ID_COL, "company_id"]

StatementMeta(, ad70c9c2-6571-4cc8-9ed2-474c4983b770, 3, Finished, Available, Finished)

# 1 — Core utilities (I/O, schema align, whitelists, partition helpers)

In [2]:
def table_exists(fully_qualified: str) -> bool:
    """
    Report whether a table can be resolved and read by the active Spark session.

    Args:
        fully_qualified: Table name as accepted by spark.read.table
            (e.g. "schema.table").

    Returns:
        True when a one-row probe read succeeds; False when Spark raises
        AnalysisException for the name.

    Raises:
        Any other error from the probe read. Callers in this notebook create
        or overwrite tables when this returns False, so an unrelated failure
        must not be reported as "table is missing".
    """
    # Function-scope import keeps this cell self-contained.
    from pyspark.sql.utils import AnalysisException

    try:
        spark.read.table(fully_qualified).limit(1).collect()
    except AnalysisException:
        return False
    return True

def safe_read_table(tbl: str, schema: T.StructType = None):
    """
    Read `tbl`, or return an empty DataFrame when table_exists(tbl) is False.

    The empty frame uses `schema` when one is supplied (and truthy); otherwise
    it has a single nullable STRING column named by ID_COL.
    """
    if not table_exists(tbl):
        fallback_schema = schema or T.StructType(
            [T.StructField(ID_COL, T.StringType(), True)]
        )
        return spark.createDataFrame([], fallback_schema)
    return spark.read.table(tbl)

def normalize_id_str(df, id_col=ID_COL):
    """Return `df` with column `id_col` cast to STRING; other columns are untouched."""
    id_as_string = F.col(id_col).cast(T.StringType())
    return df.withColumn(id_col, id_as_string)

def reorder_cols_id_company_first(df, anchors=None):
    """
    Move the anchor columns to the front of `df`, keeping every other column
    in its existing relative order.

    Args:
        df: DataFrame (anything exposing `.columns` and `.select(*names)`).
        anchors: Optional ordered column names to place first. When omitted,
            the module-level ORDER_ANCHORS is used, so the leading columns are
            configured in one place instead of being repeated here.

    Returns:
        The result of `df.select(...)` with the anchors that exist in `df`
        first (in anchor order), followed by all remaining columns.
    """
    wanted = ORDER_ANCHORS if anchors is None else anchors
    available = df.columns
    # dict.fromkeys drops repeated anchor names while keeping their order.
    front = [c for c in dict.fromkeys(wanted) if c in available]
    leading = set(front)
    rest = [c for c in available if c not in leading]
    return df.select(*front, *rest)

# ------- Schema alignment -------
def _add_missing_from(df_target, df_ref):
    """
    Return `df_target` extended with every column that exists only in `df_ref`.

    Each added column is all-NULL and cast to the data type it has in
    `df_ref`; columns already present in `df_target` are left as they are.
    """
    ref_types = {field.name: field.dataType for field in df_ref.schema.fields}
    already_there = set(df_target.columns)
    absent = [(name, dtype) for name, dtype in ref_types.items() if name not in already_there]

    widened = df_target
    for name, dtype in absent:
        widened = widened.withColumn(name, F.lit(None).cast(dtype))
    return widened

def _align_pair(left, right):
    """
    Give two DataFrames the same column set and the same column order.

    Columns missing on either side are added as typed NULLs via
    _add_missing_from. The shared order is `left`'s columns first, followed by
    the columns found only in `right`.
    """
    shared_order = list(dict.fromkeys([*left.columns, *right.columns]))
    left_full = _add_missing_from(left, right)
    right_full = _add_missing_from(right, left)
    return left_full.select(*shared_order), right_full.select(*shared_order)

# ------- Column whitelisting (safe if missing) -------
def _lit_null(dtype):
    """Build a NULL literal column cast to `dtype`, or to STRING when `dtype` is None."""
    if dtype is None:
        dtype = T.StringType()
    return F.lit(None).cast(dtype)

def project_whitelist(df, keep_cols, *, type_hints=None):
    """
    Project `df` onto exactly `keep_cols`, in that order.

    A column that `df` already has is passed through unchanged. A column that
    is missing is created as NULL, typed from `type_hints[name]` when a hint
    exists and as STRING otherwise.
    """
    hints = type_hints or {}
    present = set(df.columns)
    projection = [
        F.col(name) if name in present else _lit_null(hints.get(name)).alias(name)
        for name in keep_cols
    ]
    return df.select(*projection)

# ------- Partition helpers -------
def yymm_as_int(col):
    """Format `col` with the pattern "yyMM" and cast the text to INT (e.g., 2025-08-01 -> 2508)."""
    yymm_text = F.date_format(col, "yyMM")
    return yymm_text.cast(T.IntegerType())

StatementMeta(, ad70c9c2-6571-4cc8-9ed2-474c4983b770, 4, Finished, Available, Finished)

# 2 — NAV loader + preferred rows

In [3]:
# === Cell 2 : NAV loader + preferred rows (Boolean is_archived only) ===
from pyspark.sql import functions as F, types as T, Window

def _load_nav_pair(name: str):
    """
    Load the current and archive NAV tables for one entity.

    name ∈ {'sales_header','sales_line'}
    Reads: nav.<name> and nav.<name>_archive
    Returns: (df_arc, df_cur), aligned to the same columns, with ids as STRING.
    Raises: ValueError when neither table exists.

    A side that does not exist is replaced by an empty DataFrame (see
    safe_read_table); a missing archive borrows the current table's schema.
    """
    cur_tbl = f"{NAV_DB}.{name}"
    arc_tbl = f"{NAV_DB}.{name}_archive"

    have_cur, have_arc = table_exists(cur_tbl), table_exists(arc_tbl)
    if not (have_cur or have_arc):
        raise ValueError(f"No sources found for '{name}' — expected {cur_tbl} and/or {arc_tbl}")

    df_cur = safe_read_table(cur_tbl)
    if have_arc:
        df_arc = safe_read_table(arc_tbl, schema=df_cur.schema)
    else:
        df_arc = spark.createDataFrame([], df_cur.schema)

    # Same columns on both sides first, then ids as STRING.
    aligned_arc, aligned_cur = _align_pair(df_arc, df_cur)
    return normalize_id_str(aligned_arc), normalize_id_str(aligned_cur)

def _with_int_archive_reason(df):
    """Return `df` with an INT `archive_reason` column (all-NULL when the column was absent)."""
    if "archive_reason" in df.columns:
        source = F.col("archive_reason")
    else:
        source = F.lit(None)
    return df.withColumn("archive_reason", source.cast(T.IntegerType()))

def build_preferred_rows_header(df_arc, df_cur, id_col: str):
    """
    HEADERS: pick at most one row per id from the archive and current sources.
      - Prefer a CURRENT row when one exists (is_archived=False)
      - Else keep an ARCHIVE row only when archive_reason = 3 (is_archived=True)
      - Drop ids whose only rows are archive rows with another (or NULL) reason
    archive_reason is normalised to INT on both inputs (all-NULL when absent).

    Args:
        df_arc: archive rows.
        df_cur: current rows.
        id_col: column that identifies a header; one output row per value.

    Returns:
        DataFrame with the union of both inputs' columns plus Boolean
        `is_archived`; the helper columns `_rank` / `_rn` are removed.

    NOTE(review): rows are ordered by rank only, so when several rows of one
    id share the top rank, row_number() keeps an arbitrary one. Add a
    secondary sort key if an id can repeat within a source.
    """
    # Normalise archive_reason and tag the origin of every row.
    df_arc = _with_int_archive_reason(df_arc).withColumn("is_archived", F.lit(True))
    df_cur = _with_int_archive_reason(df_cur).withColumn("is_archived", F.lit(False))

    combined = df_arc.unionByName(df_cur, allowMissingColumns=True)

    # rank: current wins (2), archived & reason 3 next (1), archived other (0 -> excluded)
    rank = (
        F.when(~F.col("is_archived"), F.lit(2))
         .when(F.col("is_archived") & (F.col("archive_reason") == 3), F.lit(1))
         .otherwise(F.lit(0))
    )
    ranked = combined.withColumn("_rank", rank)
    w = Window.partitionBy(id_col).orderBy(F.col("_rank").desc())

    preferred = (ranked
                 .withColumn("_rn", F.row_number().over(w))
                 .filter((F.col("_rn") == 1) & (F.col("_rank") > 0))
                 .drop("_rn", "_rank"))
    return preferred

def build_preferred_rows_line(df_arc, df_cur, id_col: str):
    """
    LINES: keep exactly one row per id, preferring the current source.
      - A CURRENT row wins when one exists (is_archived=False)
      - Otherwise an ARCHIVE row is kept (is_archived=True)
      - archive_reason is not consulted here; the reason = 3 rule is applied
        later through the header lookup in the EXT writer.

    Returns the union of both inputs' columns plus Boolean `is_archived`.
    """
    archived_rows = df_arc.withColumn("is_archived", F.lit(True))
    current_rows = df_cur.withColumn("is_archived", F.lit(False))
    stacked = archived_rows.unionByName(current_rows, allowMissingColumns=True)

    # Priority score: 2 for current rows, 1 for everything else (archive).
    priority = F.when(F.col("is_archived") == False, F.lit(2)).otherwise(F.lit(1))
    by_id_best_first = Window.partitionBy(id_col).orderBy(priority.desc())

    scored = stacked.withColumn("_rank", priority)
    numbered = scored.withColumn("_rn", F.row_number().over(by_id_best_first))
    winners = numbered.filter(F.col("_rn") == 1)
    return winners.drop("_rn", "_rank")

def process_pair(name: str, tracker_tbl: str):
    """
    Load one NAV entity, reduce it to preferred rows and refresh its tracker.

    Returns:
      preferred : unified rows with Boolean is_archived
      tracker   : tracker table after the upsert for the ids seen in preferred

    "sales_header" uses the header builder (which reads archive_reason); any
    other name uses the line builder, which never touches that column.
    """
    df_arc, df_cur = _load_nav_pair(name)

    builder = build_preferred_rows_header if name == "sales_header" else build_preferred_rows_line
    preferred = builder(df_arc=df_arc, df_cur=df_cur, id_col=ID_COL)

    seen = preferred.select(ID_COL, "is_archived")
    return preferred, upsert_tracker(tracker_tbl, seen)

StatementMeta(, ad70c9c2-6571-4cc8-9ed2-474c4983b770, 5, Finished, Available, Finished)

# 3 — Trackers (Boolean is_archived, first‑run safe)

In [4]:
# Canonical tracker layout; every column is nullable.
TRACKER_SCHEMA = T.StructType([
    T.StructField(col_name, col_type, True)
    for col_name, col_type in (
        (ID_COL, T.StringType()),
        ("is_archived", T.BooleanType()),          # True = archived, False = current
        ("missing_since_ts", T.TimestampType()),   # NULL when seen this run
        ("deleted_confirmed", T.BooleanType()),
        ("updated_at", T.TimestampType()),
    )
])

def _read_or_create_tracker(tracker_tbl: str):
    """
    Return the tracker table in the canonical TRACKER_SCHEMA layout.

    Existing table: columns missing from it are added as NULL, every schema
    column is cast to its canonical type, and only the schema columns are
    returned, in schema order.
    Missing table: an empty Delta table with TRACKER_SCHEMA is written to
    `tracker_tbl` and read back.
    """
    column_names = [field.name for field in TRACKER_SCHEMA.fields]

    if not table_exists(tracker_tbl):
        # First run: persist an empty tracker so later steps have a table to read.
        blank = spark.createDataFrame([], schema=TRACKER_SCHEMA)
        writer = (blank.write
                  .format("delta")
                  .mode("overwrite")
                  .option("overwriteSchema", "true"))
        writer.saveAsTable(tracker_tbl)
        return spark.read.table(tracker_tbl)

    stored = spark.read.table(tracker_tbl)
    known = set(stored.columns)
    for field in TRACKER_SCHEMA.fields:
        # Existing columns are re-cast; absent ones start as NULL of the canonical type.
        source = F.col(field.name) if field.name in known else F.lit(None)
        stored = stored.withColumn(field.name, source.cast(field.dataType))
    return stored.select(*column_names)

def upsert_tracker(tracker_tbl: str, seen_df):
    """
    Rebuild the tracker table from the ids seen in this run.

    Input MUST include: id (STRING), is_archived (Boolean).
    The table is replaced in full (overwrite), not patched:
      - seen ids     → missing_since_ts = NULL, deleted_confirmed = False,
                       is_archived and updated_at refreshed
      - not seen ids → missing_since_ts kept, or set to now when it was NULL;
                       updated_at refreshed; other columns carried over
    Returns the tracker table read back after the write.
    """
    tracker_cols = [field.name for field in TRACKER_SCHEMA.fields]
    run_ts = F.current_timestamp()

    # One (id, is_archived) row per id seen this run, in canonical types.
    seen_latest = (seen_df
                   .select(F.col(ID_COL).cast(T.StringType()).alias(ID_COL),
                           F.col("is_archived").cast(T.BooleanType()).alias("is_archived"))
                   .dropDuplicates([ID_COL]))

    previous = _read_or_create_tracker(tracker_tbl)

    # Tracker rows whose id did not show up this run.
    seen_ids = seen_latest.select(ID_COL).dropDuplicates([ID_COL])
    unseen = previous.join(seen_ids, on=ID_COL, how="left_anti")

    refreshed_seen = (seen_latest
                      .withColumn("missing_since_ts", F.lit(None).cast(T.TimestampType()))
                      .withColumn("deleted_confirmed", F.lit(False))
                      .withColumn("updated_at", run_ts))

    # Stamp the first miss only; an earlier timestamp is preserved.
    first_miss_ts = F.when(F.col("missing_since_ts").isNull(), run_ts).otherwise(F.col("missing_since_ts"))
    refreshed_unseen = (unseen
                        .withColumn("missing_since_ts", first_miss_ts)
                        .withColumn("updated_at", run_ts))

    new_tracker = (refreshed_unseen
                   .unionByName(refreshed_seen, allowMissingColumns=True)
                   .select(*tracker_cols))

    writer = (new_tracker.write
              .format("delta")
              .mode("overwrite")
              .option("overwriteSchema", "true"))
    writer.saveAsTable(tracker_tbl)
    return spark.read.table(tracker_tbl)

StatementMeta(, ad70c9c2-6571-4cc8-9ed2-474c4983b770, 6, Finished, Available, Finished)

# 4 — Partitioning & EXT writer (clean line join; no duplicate cols)

In [5]:
def add_partition_header(df_header):
    """Add INT column partition_by: yyMM of order_date for archived rows, 0 for every other row."""
    archived_bucket = yymm_as_int(F.col("order_date"))
    partition_value = F.when(F.col("is_archived") == True, archived_bucket).otherwise(F.lit(0))
    return df_header.withColumn("partition_by", partition_value)

def add_partition_line(df_line, header_df, *, line_header_key="header_id", header_id_col="id"):
    """
    Derive partition_by (INT) for line rows from their header.

    Lines are inner-joined to `header_df` on
    line.<line_header_key> = header.<header_id_col>, so lines without a
    matching header are dropped. partition_by is the header's order_date as
    yyMM when the header is archived, else 0.

    The result keeps the line columns plus partition_by; the header columns
    used for the join are removed to avoid duplicate names, and a column
    named is_archived is removed if one is present.
    """
    helper_cols = ("_header_key", "_h_order_date", "_h_is_archived", "_h_archive_reason")
    header_side = header_df.select(
        F.col(header_id_col).alias("_header_key"),
        F.col("order_date").alias("_h_order_date"),
        F.col("is_archived").alias("_h_is_archived"),
        F.col("archive_reason").alias("_h_archive_reason"),
    )

    on_header = F.col(f"l.{line_header_key}") == F.col("_header_key")
    joined = df_line.alias("l").join(header_side.alias("h"), on_header, "inner")

    partition_value = (
        F.when(F.col("_h_is_archived") == True, yymm_as_int(F.col("_h_order_date")))
         .otherwise(F.lit(0))
    )
    result = joined.withColumn("partition_by", partition_value).drop(*helper_cols)

    # The output must not carry is_archived.
    return result.drop("is_archived") if "is_archived" in result.columns else result

def build_ext_output(
    preferred_df,
    tracker_df,
    ext_out_tbl: str,
    *,
    header_lookup_df=None,          # required for LINE
    line_header_key="header_id",
    header_id_col="id"
):
    """
    Add status flags and partition_by to `preferred_df`, keep only eligible
    rows, and persist the result to the Delta table `ext_out_tbl`.

    Path selection: a frame that has an `order_date` column is handled as
    HEADER data; any other frame is handled as LINE data.

    Eligibility: current rows, and archived rows with archive_reason = 3.
    HEADER rows are tested on their own columns; LINE rows are kept when the
    header they join to is eligible.

    Args:
        preferred_df: rows to publish (header or line).
        tracker_df: tracker rows; those with missing_since_ts set describe
            ids that were not seen and feed the limbo flags.
        ext_out_tbl: fully qualified output table name.
        header_lookup_df: header rows with the id column, order_date,
            is_archived and archive_reason. Required on the LINE path.
            Assumes one row per header id (true for the output of
            build_preferred_rows_header).
        line_header_key: line column that references the header id.
        header_id_col: id column name in `header_lookup_df`.

    Returns:
        The DataFrame read back from `ext_out_tbl` after the write.

    Raises:
        ValueError: LINE data was passed without `header_lookup_df`.
    """
    status_cols = ["in_limbo", "deleted_confirmed", "limbo_started_ts"]
    business_cols = list(preferred_df.columns)

    is_header = "order_date" in preferred_df.columns
    if not is_header and header_lookup_df is None:
        # Fail fast: the LINE path derives partition_by and eligibility from the header.
        raise ValueError(
            "header_lookup_df is required when preferred_df has no 'order_date' column (LINE path)"
        )

    # Present vs missing (flags only)
    present_df = (preferred_df
                  .withColumn("in_limbo", F.lit(False))
                  .withColumn("deleted_confirmed", F.lit(False))
                  .withColumn("limbo_started_ts", F.lit(None).cast(T.TimestampType())))

    missing_ids = (tracker_df
                   .filter(F.col("missing_since_ts").isNotNull())
                   .select(
                       F.col(ID_COL),
                       F.col("missing_since_ts").alias("limbo_started_ts"),
                       F.col("deleted_confirmed"))
                   .withColumn("in_limbo", F.when(F.col("deleted_confirmed") == False, True).otherwise(False)))

    if missing_ids.rdd.isEmpty():
        final_df = present_df
    else:
        # Build a skeleton with all business columns present (NULLs with preferred dtypes).
        # NOTE(review): skeleton rows are NULL in every business column except the id,
        # so the eligibility filter (HEADER) or the inner join on the header key (LINE)
        # below will normally discard them — confirm whether limbo rows are meant to
        # reach the output table.
        pref_types = {f.name: f.dataType for f in preferred_df.schema.fields}
        skeleton = missing_ids
        for c in business_cols:
            if c not in skeleton.columns:
                skeleton = skeleton.withColumn(c, F.lit(None).cast(pref_types.get(c, T.StringType())))

        skeleton = skeleton.select(*business_cols, *status_cols)
        present_df = present_df.select(*business_cols, *status_cols)
        final_df = present_df.unionByName(skeleton, allowMissingColumns=True)

    # Same rule on both paths; evaluated on header columns in either case.
    eligible = (
        ((F.col("is_archived") == True) & (F.col("archive_reason") == 3))
        | (F.col("is_archived") == False)
    )

    # Partition + filter
    if is_header:
        final_df = add_partition_header(final_df).filter(eligible)
    else:
        # Filter the headers first so a single inner join both derives
        # partition_by and removes lines of ineligible headers.
        eligible_headers = header_lookup_df.filter(eligible)
        final_df = add_partition_line(final_df, eligible_headers,
                                      line_header_key=line_header_key, header_id_col=header_id_col)

    # Order columns: id, company_id first; keep partition_by
    final_df = reorder_cols_id_company_first(final_df)

    # Persist
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {EXT_SCHEMA}")
    final_df.createOrReplaceTempView("_final_ext")

    if not table_exists(ext_out_tbl):
        # First write: create the table partitioned by partition_by.
        (final_df
          .repartition("partition_by")
          .write.format("delta").mode("overwrite")
          .option("overwriteSchema","true")
          .partitionBy("partition_by")
          .saveAsTable(ext_out_tbl))
    else:
        # NOTE(review): partition_by is part of the match key. A row whose
        # partition_by changes between runs (e.g. 0 -> yyMM once archived) will not
        # match its earlier copy and is inserted again while the old copy stays —
        # confirm this is intended.
        spark.sql(f"""
            MERGE INTO {ext_out_tbl} AS t
            USING _final_ext AS s
            ON  t.{ID_COL} = s.{ID_COL}
            AND t.partition_by = s.partition_by
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
        """)
    return spark.read.table(ext_out_tbl)

StatementMeta(, ad70c9c2-6571-4cc8-9ed2-474c4983b770, 7, Finished, Available, Finished)

# 5 — Column whitelists & type hints

In [6]:
# Business columns to keep in the EXT outputs (examples — replace with your own).
# One entry per line so additions and removals show up as single-line diffs.
HEADER_WHITELIST = [
    "id",
    "company_id",
    "document_type",
    "no_",
    "doc__no__occurrence",
    "sell_to_customer_no_",
    "order_date",
    "version_no_",
    "ship_to_country_region_code",
    "channel_code",
    "media_code",
    "payment_method_code",
    "currency_factor",
    "inbound_integration_code",
    "origin_datetime",
    "created_datetime",
    "order_created_datetime",
    "sales_order_status",
    "status",
    "on_hold",
    "external_document_no_",
    "subscription_no_",
]

LINE_WHITELIST = [
    "id",
    "company_id",
    "header_id",
    "line_no_",
    "location_code",
    "no_",
    "quantity",
    "delivery_service",
    "promotion_discount_amount",
    "line_discount_amount",
    "amount_including_vat",
    "amount",
    "quantity_shipped",
    "quantity_invoiced",
    "dimension_set_id",
    "type",
    "outstanding_quantity",
    "bom_item_no_",
]

# Optional dtype hints for missing columns.
# project_whitelist consults a hint only for a column that is in its keep-list
# but absent from the DataFrame; that column is then created as NULL with the
# hinted type (STRING when there is no hint).
# NOTE(review): posting_date is not in HEADER_WHITELIST, and unit_price,
# line_amount and shipment_date are not in LINE_WHITELIST, so with the
# keep-lists built in this notebook those hints are never looked up.
HEADER_TYPE_HINTS = {
    "order_date": T.TimestampType(),
    "posting_date": T.TimestampType(),
    "archive_reason": T.IntegerType(),
}
LINE_TYPE_HINTS = {
    "quantity": T.DecimalType(18, 6),
    "unit_price": T.DecimalType(18, 6),
    "line_amount": T.DecimalType(18, 2),
    "shipment_date": T.TimestampType(),
}

StatementMeta(, ad70c9c2-6571-4cc8-9ed2-474c4983b770, 8, Finished, Available, Finished)

# 6 — Orchestration (cutoff, whitelists, write EXT)

In [7]:
# 0) Cutoff: 1 January of (current year - 2), computed from "today" on the driver
today = datetime.date.today()
cutoff = datetime.date(today.year - 2, 1, 1)   # e.g. a run in 2025 gives 2023-01-01

# 1) Build preferred rows + trackers.
#    process_pair upserts each tracker from the full (un-cut) preferred rows, so ids
#    that are merely older than the cutoff are not recorded as "missing".
preferred_header, tracker_header = process_pair("sales_header", TRACKER_HEADER_TBL)
preferred_line,   tracker_line   = process_pair("sales_line",   TRACKER_LINE_TBL)

# 2) Apply cutoff to HEADER only (assumes order_date is a date/timestamp column — confirm).
#    Lines are not filtered by date here; they are restricted later by the inner join
#    to the cut header lookup.
preferred_header_cut = preferred_header.filter(F.col("order_date") >= F.lit(cutoff))

# 3) Project to whitelisted columns (+ technical required)
#    - Keep is_archived (Boolean), archive_reason (INT), order_date for partition/filter
#    - dict.fromkeys removes repeated names while keeping the first position
header_keep = list(dict.fromkeys(
    HEADER_WHITELIST + ["is_archived", "partition_by", "archive_reason"]       # partition_by is only a NULL placeholder here; the EXT writer overwrites it
))
preferred_header_cut = project_whitelist(preferred_header_cut, header_keep, type_hints=HEADER_TYPE_HINTS)

line_keep = list(dict.fromkeys(
    LINE_WHITELIST + ["partition_by", "header_id"]           # partition_by placeholder, overwritten later; header_id is the join key
))
preferred_line = project_whitelist(preferred_line, line_keep, type_hints=LINE_TYPE_HINTS)

# 4) Header lookup for line (derive partition & filter from header) — built from the
#    cut headers, so it carries the cutoff over to the lines
header_lookup = preferred_header_cut.select("id", "order_date", "is_archived", "archive_reason")

# 5) Write EXT tables (partitioned by INT partition_by; current rows are kept,
#    archived rows only when archive_reason = 3)
results_header = build_ext_output(
    preferred_df = preferred_header_cut,
    tracker_df   = tracker_header,
    ext_out_tbl  = f"{EXT_SCHEMA}.sales_header"
)
results_line = build_ext_output(
    preferred_df      = preferred_line,
    tracker_df        = tracker_line,
    ext_out_tbl       = f"{EXT_SCHEMA}.sales_line",
    header_lookup_df  = header_lookup,
    line_header_key   = "header_id",
    header_id_col     = "id"
)

# Row counts are taken from the tables as read back after the write.
print("Done ✅")
print("ext.sales_header rows:", results_header.count())
print("ext.sales_line rows  :", results_line.count())
# Uncomment to preview the first rows of each output:
# display(results_header.limit(10))
# display(results_line.limit(10))

StatementMeta(, ad70c9c2-6571-4cc8-9ed2-474c4983b770, 9, Finished, Available, Finished)

Done ✅
ext.sales_header rows: 5941265
ext.sales_line rows  : 13662745
