In [0]:
spark


<pyspark.sql.connect.session.SparkSession at 0x7f8db50b28a0>

In [0]:
%sql
-- Make `workspace` the current catalog so the unqualified names below resolve in it.
USE CATALOG workspace;

-- Schema for the eligibility pipeline, plus two volumes: the notebook's Python
-- cells write partner files under .../eligibility/ingest and the partner
-- configuration JSON under .../eligibility/config.
CREATE SCHEMA IF NOT EXISTS eligibility;
CREATE VOLUME IF NOT EXISTS eligibility.ingest;
CREATE VOLUME IF NOT EXISTS eligibility.config;

In [0]:
import json

# Locations inside the Unity Catalog volumes created by the SQL cell above.
CONFIG_PATH = "/Volumes/workspace/eligibility/config/partners.json"
BASE = "/Volumes/workspace/eligibility/ingest"

# One entry per partner. "columns" maps the partner's header names (keys) to
# the canonical column names used by the rest of the notebook (values).
config = {
    "acme": {
        "partner_code": "ACME",
        "file": "acme.txt",
        "format": "csv",
        "header": True,
        "delimiter": "|",
        "columns": {
            "MBI": "external_id",
            "FNAME": "first_name",
            "LNAME": "last_name",
            "DOB": "dob",
            "EMAIL": "email",
            "PHONE": "phone",
        },
    },
    "bettercare": {
        "partner_code": "BETTERCARE",
        "file": "bettercare.csv",
        "format": "csv",
        "header": True,
        "delimiter": ",",
        "columns": {
            "subscriber_id": "external_id",
            "first_name": "first_name",
            "last_name": "last_name",
            "date_of_birth": "dob",
            "email": "email",
            "phone": "phone",
        },
    },
}

# Persist the configuration as pretty-printed JSON so later cells load it from
# the volume instead of relying on this in-memory dict.
with open(CONFIG_PATH, "w") as f:
    f.write(json.dumps(config, indent=2))

# Confirm the file landed in the config volume.
display(dbutils.fs.ls("/Volumes/workspace/eligibility/config"))


path,name,size,modificationTime
dbfs:/Volumes/workspace/eligibility/config/partners.json,partners.json,663,1768262757000


In [0]:
# Sample ACME feed: pipe-delimited, MM/dd/yyyy birth dates, unformatted phones.
acme_content = """MBI|FNAME|LNAME|DOB|EMAIL|PHONE
1234567890A|John|Doe|03/15/1955|JOHN.DOE@EMAIL.COM|5551234567
9876543210B|Jane|Smith|07/22/1948|jane.smith@email.com|5559876543
"""

# Sample BetterCare feed: comma-delimited, ISO birth dates, mixed phone formats.
bettercare_content = """subscriber_id,first_name,last_name,date_of_birth,email,phone
BC-001,Alice,Johnson,1965-08-10,alice.j@test.com,555-222-3333
BC-002,Charlie,Brown,1972-03-25,charlie.b@test.com,5554445555
"""

# Write each sample under the ingest volume using the file names the partner
# configuration refers to.
for file_name, content in (
    ("acme.txt", acme_content),
    ("bettercare.csv", bettercare_content),
):
    with open(f"{BASE}/{file_name}", "w") as f:
        f.write(content)

# Confirm both files exist in the ingest volume.
display(dbutils.fs.ls(BASE))


path,name,size,modificationTime
dbfs:/Volumes/workspace/eligibility/ingest/acme.txt,acme.txt,160,1768262775000
dbfs:/Volumes/workspace/eligibility/ingest/bettercare.csv,bettercare.csv,185,1768262775000


In [0]:
import json
from pyspark.sql import functions as F
from functools import reduce

# Re-read the partner configuration from the volume so the pipeline is driven
# by the persisted JSON file.
with open(CONFIG_PATH, "r") as f:
    PARTNER_CONFIG = json.loads(f.read())

# Canonical columns every partner feed must end up with after mapping.
REQUIRED = [
    "external_id",
    "first_name",
    "last_name",
    "dob",
    "email",
    "phone",
]

print("Loaded partners:", list(PARTNER_CONFIG))


Loaded partners: ['acme', 'bettercare']


In [0]:
def read_partner(cfg: dict):
    """Load one partner's raw file from the ingest volume into a DataFrame.

    Parameters
    ----------
    cfg : dict
        One partner entry from the partner configuration. Keys read here:
        ``file`` (required, file name relative to ``BASE``), ``delimiter``
        (required), ``format`` (defaults to ``"csv"``) and ``header``
        (defaults to ``True``).

    Returns
    -------
    DataFrame
        The columns found in the file, plus ``_source_file`` (taken from
        ``_metadata.file_path``) and a string ``_corrupt_record`` column,
        which is added as all-NULL when the reader did not produce one.

    Raises
    ------
    KeyError
        If ``cfg`` has no ``file`` or ``delimiter`` key.
    """
    # BASE is the ingest volume root defined in the notebook's setup cell; it
    # is the only ingest-path constant this notebook defines.
    path = f"{BASE}/{cfg['file']}"

    reader = (
        spark.read.format(cfg.get("format", "csv"))
        # Spark options are strings, so the JSON boolean becomes "true"/"false".
        .option("header", str(cfg.get("header", True)).lower())
        .option("sep", cfg["delimiter"])
        .option("mode", "PERMISSIVE")
        .option("columnNameOfCorruptRecord", "_corrupt_record")
    )

    df = reader.load(path).withColumn("_source_file", F.col("_metadata.file_path"))

    # NOTE(review): no explicit schema is supplied, so the reader likely never
    # emits `_corrupt_record` itself (Spark documents that the field must be
    # part of a user-defined schema to be kept) -- confirm. Either way,
    # downstream code can rely on the column existing.
    if "_corrupt_record" not in df.columns:
        df = df.withColumn("_corrupt_record", F.lit(None).cast("string"))

    return df


In [0]:
# Column mapping
def map_columns(df, col_map: dict):
    """Add canonical columns to ``df`` according to a partner's column map.

    Parameters
    ----------
    df : DataFrame
        Raw partner frame.
    col_map : dict
        Maps source column name -> canonical column name.

    Returns
    -------
    DataFrame
        ``df`` with one column per mapping target and one per name in
        ``REQUIRED``. The original source columns are kept. Any target whose
        source column is absent, and any required column that is still
        missing, is added as a NULL column of type string.
    """
    out = df
    for src, dst in col_map.items():
        # Membership is checked against the evolving frame, so a column created
        # by an earlier mapping can be the source of a later one.
        if src in out.columns:
            value = F.col(src)
        else:
            # Typed NULL: a bare lit(None) would create a NullType (void)
            # column, which leaves the output schema untyped until something
            # downstream happens to cast it.
            value = F.lit(None).cast("string")
        out = out.withColumn(dst, value)

    # Guarantee every canonical column exists even if the map omitted it.
    for c in REQUIRED:
        if c not in out.columns:
            out = out.withColumn(c, F.lit(None).cast("string"))

    return out


In [0]:
def normalize_phone(col):
    """Return a column expression formatting a phone number as ``NNN-NNN-NNNN``.

    Every character other than 0-9 is removed first. If exactly ten digits
    remain they are joined as 3-3-4 with dashes; otherwise the result is NULL.
    """
    only_digits = F.regexp_replace(col, r"[^0-9]", "")

    # F.substring positions are 1-based: (start, length).
    area = F.substring(only_digits, 1, 3)
    exchange = F.substring(only_digits, 4, 3)
    line = F.substring(only_digits, 7, 4)
    formatted = F.concat_ws("-", area, exchange, line)

    has_ten_digits = F.length(only_digits) == 10
    return F.when(has_ten_digits, formatted).otherwise(F.lit(None))

def parse_dob(col):
    """Return a date column parsed from ``col`` using the supported formats.

    Formats are tried in order (``yyyy-MM-dd`` first, then ``MM/dd/yyyy``) and
    the first successful parse wins; the result is NULL when neither matches.
    """
    accepted_formats = ("yyyy-MM-dd", "MM/dd/yyyy")
    attempts = [F.try_to_date(col, fmt) for fmt in accepted_formats]
    return F.coalesce(*attempts)

In [0]:
def standardize(df, partner_code: str):
    """Normalise the canonical columns and tag every row with its partner.

    Parameters
    ----------
    df : DataFrame
        Frame that already contains the canonical columns ``external_id``,
        ``first_name``, ``last_name``, ``dob``, ``email`` and ``phone``.
    partner_code : str
        Literal value stored in the new ``partner_code`` column.

    Returns
    -------
    DataFrame
        ``df`` with the canonical columns replaced by their cleaned form and
        ``partner_code`` appended.
    """
    # Each expression reads only its own input column, so they can all be
    # applied in a single projection.
    cleaned = {
        "external_id": F.col("external_id").cast("string"),
        "first_name": F.initcap(F.col("first_name")),
        "last_name": F.initcap(F.col("last_name")),
        "dob": parse_dob(F.col("dob")),
        "email": F.lower(F.col("email")),
        "phone": normalize_phone(F.col("phone")),
        "partner_code": F.lit(partner_code),
    }
    return df.withColumns(cleaned)


In [0]:
def split_good_bad(df):
    """Split a standardized frame into accepted rows and quarantined rows.

    A row is quarantined with the first matching reason, checked in this
    order: ``MALFORMED_ROW`` (``_corrupt_record`` is set),
    ``MISSING_EXTERNAL_ID`` (NULL or blank after trimming), ``INVALID_DOB``
    (``dob`` is NULL) and ``INVALID_PHONE`` (``phone`` is NULL).

    Returns
    -------
    tuple
        ``(good, bad)``. ``good`` has the canonical columns plus
        ``partner_code``. ``bad`` has ``_dq_reason``, ``_corrupt_record``,
        ``_source_file``, ``partner_code`` and the ``REQUIRED`` columns.
    """
    ext_id = F.col("external_id")

    # Ordered (reason, condition) pairs; earlier entries take precedence.
    rules = [
        ("MALFORMED_ROW", F.col("_corrupt_record").isNotNull()),
        ("MISSING_EXTERNAL_ID", ext_id.isNull() | (F.length(F.trim(ext_id)) == 0)),
        ("INVALID_DOB", F.col("dob").isNull()),
        # strict: a NULL phone means it failed the 10-digit rule upstream
        ("INVALID_PHONE", F.col("phone").isNull()),
    ]

    (first_label, first_cond), *remaining = rules
    reason = F.when(first_cond, F.lit(first_label))
    for label, cond in remaining:
        reason = reason.when(cond, F.lit(label))
    reason = reason.otherwise(F.lit(None))

    tagged = df.withColumn("_dq_reason", reason)
    has_reason = F.col("_dq_reason").isNotNull()

    good = tagged.filter(F.col("_dq_reason").isNull()).select(
        "external_id",
        "first_name",
        "last_name",
        "dob",
        "email",
        "phone",
        "partner_code",
    )

    bad = tagged.filter(has_reason).select(
        "_dq_reason",
        "_corrupt_record",
        "_source_file",
        "partner_code",
        *REQUIRED,
    )

    return good, bad


In [0]:
def process_partner(cfg: dict):
    """Run the full per-partner pipeline and return ``(good, bad)`` frames.

    Steps: read the partner file, map its columns to the canonical names
    (``cfg["columns"]``), standardize values and tag with
    ``cfg["partner_code"]``, then split into accepted and quarantined rows.
    """
    renamed = map_columns(read_partner(cfg), cfg["columns"])
    return split_good_bad(standardize(renamed, cfg["partner_code"]))

# Process every configured partner, collecting accepted and quarantined frames.
goods, bads = [], []
for cfg in PARTNER_CONFIG.values():
    good_df, bad_df = process_partner(cfg)
    goods.append(good_df)
    bads.append(bad_df)

# Stack the per-partner frames; unionByName aligns columns by name, not position.
unified_good = reduce(lambda left, right: left.unionByName(right), goods)
unified_bad = reduce(lambda left, right: left.unionByName(right), bads)

# Sanity check: the accepted frame must expose exactly the canonical schema.
expected_cols = [
    "external_id",
    "first_name",
    "last_name",
    "dob",
    "email",
    "phone",
    "partner_code",
]
assert unified_good.columns == expected_cols, f"Unexpected columns: {unified_good.columns}"

good_count = unified_good.count()
bad_count = unified_bad.count()
print("GOOD rows:", good_count)
print("BAD rows:", bad_count)

# The bundled sample files contain four rows, all of which should be accepted.
assert good_count == 4, f"Expected 4 good rows for sample input, got {good_count}"

display(unified_good)
display(unified_bad)
# Quarantine breakdown by data-quality reason.
display(unified_bad.groupBy("_dq_reason").count())


GOOD rows: 4
BAD rows: 0


external_id,first_name,last_name,dob,email,phone,partner_code
1234567890A,John,Doe,1955-03-15,john.doe@email.com,555-123-4567,ACME
9876543210B,Jane,Smith,1948-07-22,jane.smith@email.com,555-987-6543,ACME
BC-001,Alice,Johnson,1965-08-10,alice.j@test.com,555-222-3333,BETTERCARE
BC-002,Charlie,Brown,1972-03-25,charlie.b@test.com,555-444-5555,BETTERCARE


_dq_reason,_corrupt_record,_source_file,partner_code,external_id,first_name,last_name,dob,email,phone


_dq_reason,count


In [0]:
# Persist accepted rows as a Delta table, replacing any previous contents.
(
    unified_good.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.eligibility.eligibility_unified")
)

# Persist quarantined rows alongside, also replacing previous contents.
(
    unified_bad.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.eligibility.eligibility_quarantine")
)


In [0]:
%sql 

-- Show the column names, data types and extended table details of the unified table.
DESCRIBE EXTENDED workspace.eligibility.eligibility_unified;




col_name,data_type,comment
external_id,string,
first_name,string,
last_name,string,
dob,date,
email,string,
phone,string,
partner_code,string,
,,
# Delta Statistics Columns,,
Column Names,"first_name, email, last_name, external_id, phone, dob, partner_code",
