In [0]:
import dlt
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import current_timestamp, from_utc_timestamp

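Approach 4: cast each bronze table to a typed silver table, validate rows against per-table quality rules, and route failing rows to quarantine tables.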

In [0]:
# Target Spark SQL type for each column, keyed by table name.
# apply_schema_and_validate casts only the columns that exist in the bronze
# frame and prints a warning for any listed column that is missing.
schemas = {
    "address": {
        "AddressID": "int",
        # NOTE(review): postal codes are often alphanumeric (e.g. UK/Canadian
        # formats); casting them to int would turn those values into NULL —
        # confirm against the bronze data.
        "PostalCode": "int",
        "StateProvinceID": "int",
        "ModifiedDate": "date",
        "Year": "int",
    },
    "customer": {
        "CustomerID": "int",
        # NOTE(review): nullable ID columns are typed double here, presumably
        # because the bronze extract stores them as floats — confirm before
        # switching to int.
        "PersonID": "double",
        "StoreID": "double",
        "TerritoryID": "int",
        # NOTE(review): the only ModifiedDate typed as timestamp rather than
        # date — confirm this is intentional.
        "ModifiedDate": "timestamp",
        "Year": "int",
    },
    "addresstype": {
        "AddressTypeID": "int",
        "ModifiedDate": "date",
        "Year": "int",
    },
    "businessentityaddress": {
        "BusinessEntityID": "int",
        "AddressID": "int",
        "AddressTypeID": "int",
        "ModifiedDate": "date",
        "Year": "int",
    },
    "culture": {
        "CultureID": "string",
        "ModifiedDate": "date",
        "Year": "int",
    },
    "emailaddress": {
        "BusinessEntityID": "int",
        "EmailAddressID": "int",
        "ModifiedDate": "date",
        "Year": "int",
    },
    "person": {
        "BusinessEntityID": "int",
        "PersonType": "string",
        "NameStyle": "boolean",
        "EmailPromotion": "int",
        "ModifiedDate": "date",
        "Year": "int",
    },
    "product": {
        "ProductID": "int",
        "MakeFlag": "boolean",
        "FinishedGoodsFlag": "boolean",
        "SafetyStockLevel": "int",
        "ReorderPoint": "int",
        "StandardCost": "double",
        "ListPrice": "double",
        "DaysToManufacture": "int",
        "ProductSubcategoryID": "int",
        "ProductModelID": "int",
        "Weight": "double",
        "SellStartDate": "date",
        "SellEndDate": "date",
        "DiscontinuedDate": "date",
        "Year": "int",
    },
    "productcategory": {
        "ProductCategoryID": "int",
        "ModifiedDate": "date",
        "Year": "int",
    },
    "productdescription": {
        "ProductDescriptionID": "int",
        "ModifiedDate": "date",
        "Year": "int",
    },
    "productmodel": {
        "ProductModelID": "int",
        "ModifiedDate": "date",
        "Year": "int",
    },
    "productmodelproductdescriptionculture": {
        "ProductModelID": "int",
        "ProductDescriptionID": "int",
        "CultureID": "string",
        "ModifiedDate": "date",
        "Year": "int",
    },
    "productsubcategory": {
        "ProductSubcategoryID": "int",
        "ProductCategoryID": "int",
        "ModifiedDate": "date",
        "Year": "int",
    },
    "salesorderdetail": {
        "SalesOrderID": "int",
        "SalesOrderDetailID": "int",
        "OrderQty": "int",
        "ProductID": "int",
        "SpecialOfferID": "int",
        "UnitPrice": "double",
        "UnitPriceDiscount": "double",
        "LineTotal": "double",
        "ModifiedDate": "date",
        "Year": "int",
    },
    "salesorderheader": {
        "SalesOrderID": "int",
        "RevisionNumber": "int",
        "OrderDate": "date",
        "DueDate": "date",
        "ShipDate": "date",
        "Status": "int",
        "OnlineOrderFlag": "boolean",
        "CustomerID": "int",
        "SalesPersonID": "double",
        "TerritoryID": "int",
        # Was "BillToAddress": that name never matches (cf. ShipToAddressID
        # below), so the column was silently left uncast.
        "BillToAddressID": "int",
        "ShipMethodID": "int",
        "ShipToAddressID": "int",
        "CreditCardID": "double",
        "CurrencyRateID": "double",
        "SubTotal": "double",
        "TaxAmt": "double",
        "Freight": "double",
        "TotalDue": "double",
        "ModifiedDate": "date",
        "Year": "int",
    },
    # Add schemas for your other tables here...
}

In [0]:
import dlt
from pyspark.sql.functions import col, lit, expr
 

 
# -------------------------------------------------------------------
# 2. Quality Checks
# -------------------------------------------------------------------
def _not_null_rules(*columns):
    """Return ``{column: "<column> IS NOT NULL"}`` for each column, in argument order."""
    return {column: f"{column} IS NOT NULL" for column in columns}


# Row-level rules per table. Each entry maps a rule name (here, the column it
# checks) to a Spark SQL boolean expression; a row counts as valid only when
# all of its table's rules hold (they are AND-ed together downstream).
quality_checks = {
    "address": _not_null_rules("AddressID", "ModifiedDate"),
    "customer": {
        "CustomerID": "CustomerID IS NOT NULL",
        # NOTE(review): schemas types PersonID as double, which suggests NULLs
        # occur in bronze; this rule rejects those rows — confirm intended.
        # (The leading space is ignored by the SQL parser.)
        "PersonID": " PersonID IS NOT NULL",
    },
    "person": {
        "BusinessEntityID": "BusinessEntityID IS NOT NULL",
        "rowguid": "len(rowguid) = 36",
    },
    # Disabled email-format rule, kept for reference:
    #   "EmailAddress": "EmailAddress REGEXP '^[A-Za-z0-9+_.-]+@[A-Za-z0-9.-]+$'"
    "emailaddress": _not_null_rules("EmailAddressID"),
    "product": _not_null_rules("ProductID"),
    "addresstype": _not_null_rules("AddressTypeID"),
    "businessentityaddress": _not_null_rules("BusinessEntityID", "AddressID", "AddressTypeID"),
    "culture": _not_null_rules("CultureID"),
    "productcategory": _not_null_rules("ProductCategoryID"),
    "productdescription": _not_null_rules("ProductDescriptionID"),
    "productmodel": _not_null_rules("ProductModelID"),
    "productmodelproductdescriptionculture": _not_null_rules(
        "ProductModelID", "ProductDescriptionID", "CultureID"
    ),
    "productsubcategory": _not_null_rules("ProductSubcategoryID", "ProductCategoryID"),
    "salesorderdetail": _not_null_rules("SalesOrderID", "SalesOrderDetailID"),
    "salesorderheader": _not_null_rules("SalesOrderID", "CustomerID"),
}
 
# -------------------------------------------------------------------
# 3. Schema Casting + Validation
# -------------------------------------------------------------------
def apply_schema_and_validate(df, schema_dict, table_rules, table_name):
    """Cast columns to their declared types and attach validation/lineage columns.

    Args:
        df: Input Spark DataFrame (in this pipeline, a bronze stream).
        schema_dict: Mapping of column name -> Spark SQL type name to cast to.
            Columns absent from ``df`` are skipped with a printed warning.
        table_rules: Mapping of rule name -> Spark SQL boolean expression.
            A row is valid only when every rule evaluates to TRUE.
        table_name: Logical table name, used in warnings and written to the
            ``source_table`` column.

    Returns:
        ``df`` with the casts applied plus two columns:
        ``is_valid`` — never NULL; TRUE only when all rules are TRUE (and TRUE
        for every row when there are no rules) — and ``source_table``.
    """
    casted_df = df
    for field, dtype in schema_dict.items():
        if field in df.columns:
            casted_df = casted_df.withColumn(field, col(field).cast(dtype))
        else:
            print(f"Warning: Column {field} not found in table {table_name}")

    rules = [rule.strip() for rule in (table_rules or {}).values() if rule and rule.strip()]
    if rules:
        # Parenthesize each rule so one containing OR cannot change the meaning
        # of the overall conjunction once joined with AND.
        validation_sql = " AND ".join(f"({rule})" for rule in rules)
        # A rule over a NULL value (e.g. len(rowguid) = 36 with a NULL rowguid)
        # evaluates to NULL, not false; a NULL flag is excluded by a
        # `NOT is_valid` quarantine filter, so such rows could vanish.
        # Coalescing to false makes them explicitly invalid.
        casted_df = casted_df.withColumn("is_valid", expr(f"coalesce({validation_sql}, false)"))
    else:
        # No rules means nothing can be violated; previously every row was
        # flagged invalid, which emptied the silver table.
        casted_df = casted_df.withColumn("is_valid", lit(True))

    # Tag each row with the logical table it came from.
    casted_df = casted_df.withColumn("source_table", lit(table_name))
    return casted_df
 
# -------------------------------------------------------------------
# 4. Silver + Quarantine creation
# -------------------------------------------------------------------
def create_silver_and_quarantine(table_name, schema_dict, table_rules,
                                 create_quarantine=False,
                                 source_schema="training.piyush"):
    """Register a DLT silver table, and optionally a quarantine table, for one source.

    Both tables stream from ``{source_schema}.{table_name}_bronze`` and run it
    through ``apply_schema_and_validate``. ``{table_name}_silver`` keeps rows
    whose ``is_valid`` flag is TRUE (then de-duplicates them). When
    ``create_quarantine`` is set, ``{table_name}_quarantine`` receives every
    other row — flag FALSE *or* NULL — so no row is lost from both tables.

    Args:
        table_name: Logical table name; prefix for the source and target tables.
        schema_dict: Column -> Spark SQL type mapping passed to the caster.
        table_rules: Rule name -> SQL boolean expression mapping.
        create_quarantine: Also register the quarantine table when True.
        source_schema: ``catalog.schema`` that holds the bronze tables.

    Returns:
        ``(silver, quarantine)`` as returned by the ``dlt.table`` decorators;
        ``quarantine`` is ``None`` when not requested.
    """
    bronze_table = f"{source_schema}.{table_name}_bronze"
    # NULL-safe validity test shared by both tables so they stay exact complements.
    valid_condition = "coalesce(is_valid, false)"

    # ---------------- Silver ----------------
    @dlt.table(
        name=f"{table_name}_silver",
        comment=f"Clean silver table for {table_name}"
    )
    @dlt.expect_or_drop(f"{table_name}_all_rules", valid_condition)
    def silver():
        validated_df = apply_schema_and_validate(
            dlt.read_stream(bronze_table), schema_dict, table_rules, table_name
        )
        # NOTE(review): streaming dropDuplicates() without a watermark keeps
        # dedup state for every row ever seen — consider withWatermark() on an
        # event-time column, or de-duplicating on a key subset.
        return validated_df.dropDuplicates()

    if not create_quarantine:
        return silver, None

    # ---------------- Quarantine ----------------
    @dlt.table(
        name=f"{table_name}_quarantine",
        comment=f"Quarantine rows for {table_name}"
    )
    def quarantine():
        validated_df = apply_schema_and_validate(
            dlt.read_stream(bronze_table), schema_dict, table_rules, table_name
        )
        # Invalid rows, including those whose rules evaluated to NULL.
        return validated_df.filter(f"NOT {valid_condition}")

    return silver, quarantine
 
# -------------------------------------------------------------------
# 5. Register all tables
# -------------------------------------------------------------------
tables_with_quarantine = ['address', 'person']   # add more as needed

# Only tables that have both a schema and a rule set are registered; report
# the others instead of silently skipping them.
for unmatched_table in sorted(schemas.keys() ^ quality_checks.keys()):
    print(f"Warning: table {unmatched_table} has a schema or quality checks but not both; skipping")

# sorted() gives a stable registration order: iteration order of a set of str
# keys can change between interpreter runs (hash randomization).
for table_name in sorted(schemas.keys() & quality_checks.keys()):
    schema_dict = schemas[table_name]
    table_rules = quality_checks[table_name]
    silver_func, quarantine_func = create_silver_and_quarantine(
        table_name, schema_dict, table_rules,
        create_quarantine=table_name in tables_with_quarantine,
    )