# DBC_10 Quiz: Material Master Data: Data Processing

## Importing Libraries

In [0]:
import dlt
from pyspark import pipelines as dp
from pyspark.sql.functions import *
from pyspark.sql.types import *

## Dataset Rules

- Rule 1: `material_id` should not be null
- Rule 2: `unit_cost` should be greater than 0
- Rule 3: `status` should not be null


In [0]:
# Data-quality expectations used by the pipeline tables in this notebook.
# Each entry maps an expectation name to the SQL constraint a row must satisfy:
#   R1 - material_id must be present
#   R2 - unit_cost must be strictly positive
#   R3 - status must be present
dataset_rules = dict(
    R1="material_id is not null",
    R2="unit_cost > 0",
    R3="status is not null",
)

## Bronze Layer Implementation

Raw files are read and stored as-is from the Databricks `Volume` into the `Source Table`. This cell then streams that table's change data feed into the Bronze Layer table `bronze_mmd_data`.

In [0]:
# Bronze layer: stream the source table's change data feed into `bronze_mmd_data`.
# `expect_all` evaluates every rule in `dataset_rules`; per the DLT expectations
# documentation, violating rows are reported in pipeline metrics but still kept.
@dlt.table
@dlt.expect_all(dataset_rules)
def bronze_mmd_data():
    # NOTE(review): with `readChangeFeed` enabled the stream carries change-feed
    # rows (presumably including pre-update and delete records plus the
    # `_change_type` / `_commit_version` / `_commit_timestamp` metadata columns)
    # - confirm that downstream layers are meant to receive all of them.
    return (
        spark.readStream
        .format("delta")
        .option("readChangeFeed", "true")
        .table("workspace.mmd_schema.source_mmd_data")
    )

## Silver Layer Implementation

Stream data from the bronze layer table `bronze_mmd_data`, cast columns to their appropriate data types, and drop rows that do not meet the `dataset_rules`.

In [0]:
# Silver layer: typed copy of the bronze stream.
# `expect_all_or_drop` removes every row that violates any rule in `dataset_rules`.
@dlt.table
@dlt.expect_all_or_drop(dataset_rules)
def silver_mmd_data():
    # Target Spark type for each column that needs an explicit cast; the
    # columns are rewritten in this order, all other columns pass through as-is.
    target_types = {
        "unit_cost": DoubleType(),
        "last_updated": DateType(),
        "lead_time_days": IntegerType(),
        "safety_stock": IntegerType(),
        "reorder_level": IntegerType(),
    }

    typed_stream = spark.readStream.table("LIVE.bronze_mmd_data")
    for column_name, target_type in target_types.items():
        # Replace the column in place with its cast version.
        typed_stream = typed_stream.withColumn(
            column_name, col(column_name).cast(target_type)
        )
    return typed_stream