1. Data quality expectations can be applied only to tables and views:
@dlt.expect_all(__order_rules) #warn
@dlt.expect_all_or_fail(__order_rules) #fail
@dlt.expect_all_or_drop(__order_rules) #drop

2. Multiple different expectations can be defined in the same place (on a single view or table).

3. If a source table is truncated and new records are inserted, set the "skipChangeCommits" option to "true" so that the streaming read keeps working.

4. To prevent a full refresh of a table, set "pipelines.reset.allowed" : "false" in its table properties.





In [0]:
# DLT works with 3 types of datasets
# Streaming Tables (Permanent/Temporary) - used as Append Data Sources, Incremental data
# Materialized views - used for transformation, aggregations or computations
# Views - used for intermediate transformations, not stored in target schema

import dlt

In [0]:
# Comma-separated list of order statuses read from the pipeline configuration
# key "custom.orderStatus" (default "NA"); it is split on "," further down to
# generate one gold aggregate table per status.
_order_status = spark.conf.get("custom.orderStatus", "NA")

In [0]:
# Rules for Data Quality (warn, drop and fail)
# Expectation name -> SQL constraint checked against each orders row.
__order_rules = {
    "Valid Order Status": "o_orderstatus in ('F', 'O', 'P')",
    "Valid Order Price": "o_totalprice > 0",
}

# Expectation name -> SQL constraint checked against each customer row.
__customer_rules = {"Valid Market Segment": "c_mktsegment is not null"}

In [0]:
# create a streaming table for orders
@dlt.table(
    comment="Streaming table for orders bronze table",
    table_properties={"quality": "bronze"},
)
# Other enforcement modes for the same rule set, kept for reference:
#   @dlt.expect_all(__order_rules)          -> warn
#   @dlt.expect_all_or_fail(__order_rules)  -> fail
@dlt.expect_all_or_drop(__order_rules)  # drop
def orders_bronze():
    """Stream the raw orders table into the bronze layer.

    Reads ``dev.bronze.orders_raw`` as a stream; the ``__order_rules``
    expectations are attached in "drop" mode.
    """
    return spark.readStream.table("dev.bronze.orders_raw")

#@dlt.create_streaming_table(comment = "Streaming table for orders")
#def orders():
#  return spark.readStream.table("orders")*/

In [0]:
# create a streaming table for orders AutoLoader
@dlt.table(
    name="orders_autoloader_bronze",
    comment="Streaming table for orders AutoLoader",
    table_properties={"quality": "bronze"},
)
def func():
    """Ingest order CSV files from the landing volume with Auto Loader.

    Returns a streaming DataFrame over ``/Volumes/dev/etl/landing/files/``
    restricted to ``*.csv`` files. Column types are pinned through schema
    hints and schema evolution is switched off.
    """
    # Type hints for the expected order columns.
    schema_hints = (
        "o_orderkey long, o_custkey long, o_orderstatus string, "
        "o_totalprice decimal(18,2), o_orderdate date, o_orderpriority string, "
        "o_clerk string, o_shippriority integer, o_comment string"
    )

    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaHints", schema_hints)
        .option("cloudFiles.schemaLocation", "/Volumes/dev/etl/landing/autoloader/schemas/1/")
        # The key needs the "cloudFiles." prefix; the previous spelling
        # "cloudFilesSchemaEvolutionMode" is not an Auto Loader option, so the
        # "none" mode was never applied.
        .option("cloudFiles.schemaEvolutionMode", "none")
        .option("pathGlobFilter", "*.csv")
        .load("/Volumes/dev/etl/landing/files/")
    )

#@dlt.create_streaming_table(comment = "Streaming table for orders")
#def orders():
#  return spark.readStream.table("orders")*/

In [0]:
# Target streaming table that both append flows below write into.
dlt.create_streaming_table(name="orders_union_bronze")

#Append flow - First Table
@dlt.append_flow(target="orders_union_bronze")
def order_delta_append():
    """Append the stream of ``LIVE.orders_bronze`` to ``orders_union_bronze``."""
    return spark.readStream.table("LIVE.orders_bronze")

#Append flow  - Second Table
@dlt.append_flow(target="orders_union_bronze")
def order_autoloader_append():
    """Append the stream of ``LIVE.orders_autoloader_bronze`` to ``orders_union_bronze``."""
    return spark.readStream.table("LIVE.orders_autoloader_bronze")


In [0]:
# Disabled alternative: customer bronze as a materialized view (batch read of customer_raw)
#@dlt.table(
#  table_properties = {"quality" : "bronze"},
#  comment = "Materliazed view for customer bronze table",
#  name = "customer_bronze"
#)
#def cust_bronze():
#  df = spark.read.table("dev.bronze.customer_raw")
#  return df

In [0]:
# Streaming view over the raw customer table; it is the source of the customer
# SCD flows defined further down.
@dlt.view(comment="customer bronze view")
@dlt.expect_all(__customer_rules)
def customer_bronze_vw():
    """Stream ``dev.bronze.customer_raw`` with the ``__customer_rules`` expectations attached."""
    return spark.readStream.table("dev.bronze.customer_raw")

In [0]:
from pyspark.sql.functions import expr
# Target streaming table for the SCD type 1 customer flow.
dlt.create_streaming_table(name="customer_scd1_bronze")

# SCD 1 Customer: changes are keyed on c_custkey and ordered by
# __src_insert_dt; __src_action 'D' is applied as a delete and 'T' as a truncate.
dlt.apply_changes(
    source="customer_bronze_vw",
    target="customer_scd1_bronze",
    keys=["c_custkey"],
    sequence_by="__src_insert_dt",
    stored_as_scd_type=1,
    apply_as_deletes=expr("__src_action ='D'"),
    apply_as_truncates=expr("__src_action ='T'"),
)


In [0]:
# Target streaming table for the SCD type 2 customer flow.
dlt.create_streaming_table(name="customer_scd2_bronze")

# SCD 2 Customer: changes are keyed on c_custkey and ordered by
# __src_insert_dt; the two source bookkeeping columns are excluded from the target.
# NOTE(review): unlike the SCD 1 flow, no apply_as_deletes is configured here -
# confirm that is intended.
dlt.apply_changes(
    source="customer_bronze_vw",
    target="customer_scd2_bronze",
    keys=["c_custkey"],
    sequence_by="__src_insert_dt",
    stored_as_scd_type=2,
    except_column_list=["__src_action", "__src_insert_dt"],
)

In [0]:
# create a view to join orders with customers
@dlt.view(comment="Bronze Joined table")
def joined_vw():
    """Left-join the unioned orders to the customer SCD2 rows whose ``__END_AT`` is NULL.

    Every order from ``LIVE.orders_union_bronze`` is kept; customer columns
    are attached where ``o_custkey`` equals ``c_custkey``.
    """
    orders = spark.read.table("LIVE.orders_union_bronze")
    open_customers = spark.read.table("LIVE.customer_scd2_bronze").where("__END_AT IS NULL")

    join_condition = orders.o_custkey == open_customers.c_custkey
    return orders.join(open_customers, on=join_condition, how="left_outer")


In [0]:
# create MV to add new column
from pyspark.sql.functions import current_timestamp

@dlt.table(
    name="orders_silver",
    comment="joined table",
    table_properties={"quality": "silver"},
)
def orders_silver():
    """Return ``LIVE.joined_vw`` with an added ``__insert_date`` timestamp column."""
    joined = spark.read.table("LIVE.joined_vw")
    return joined.withColumn("__insert_date", current_timestamp())

In [0]:
# Aggregate by c_mktsegment: count of orders (o_orderkey) and sum of o_totalprice
from pyspark.sql.functions import current_timestamp, count, sum

@dlt.table(
    comment="orders aggregated able",
    table_properties={"quality": "gold"},
)
def orders_agg_gold():
    """Aggregate ``LIVE.orders_silver`` per market segment.

    Produces one row per ``c_mktsegment`` with the order count, the summed
    order total and an ``__insert_date`` timestamp.
    """
    per_segment = spark.read.table("LIVE.orders_silver").groupBy("c_mktsegment")
    totals = per_segment.agg(
        count("o_orderkey").alias("count_orders"),
        sum("o_totalprice").alias("sum_totalprice"),
    )
    return totals.withColumn("__insert_date", current_timestamp())

In [0]:
def _define_orders_agg_status_gold(status):
    """Register the gold aggregate table ``orders_agg_<status>_gold`` for one order status.

    The status is taken as a parameter so that each registered table function
    closes over its own value. Reading the loop variable directly inside the
    table function would be late-bound: the function body runs after the loop
    has finished, so every table would filter on the last status in the list.
    """
    @dlt.table(
        name=f"orders_agg_{status}_gold",
        comment="orders aggregated able",
        table_properties={"quality": "gold"},
    )
    def orders_agg_gold():
        df = spark.read.table("LIVE.orders_silver")

        # Compare as a column expression instead of splicing the status into SQL text.
        return (
            df.where(df["o_orderstatus"] == status)
            .groupBy("c_mktsegment")
            .agg(
                count("o_orderkey").alias("count_orders"),
                sum("o_totalprice").alias("sum_totalprice"),
            )
            .withColumn("__insert_date", current_timestamp())
        )

    return orders_agg_gold


# One gold table per configured status. Surrounding whitespace is stripped and
# empty entries (e.g. from a trailing comma) are skipped so they cannot produce
# malformed table names.
for _status in _order_status.split(","):
    _status = _status.strip()
    if _status:
        _define_orders_agg_status_gold(_status)
