In [0]:
# DLT works with three types of datasets:
# - Streaming tables (permanent/temporary): used for append-only data sources and incremental processing.
# - Materialized views: used for transformations, aggregations, or computations.
# - Views: used for intermediate transformations; they are not stored in the target schema.
import dlt

In [0]:
# Comma-separated list of order statuses, read from the pipeline configuration
# key "custom.orderStatus"; falls back to "NA" when the key is not set.
_order_status = spark.conf.get(
    "custom.orderStatus",
    "NA",
)

In [0]:
# Data-quality expectations: expectation name -> SQL boolean predicate.
# DLT can enforce a rule set as warn (expect_all), drop (expect_all_or_drop)
# or fail (expect_all_or_fail).
__order_rules = dict([
    ("Valid Order Status", "o_orderstatus in ('O', 'F', 'P')"),
    ("Valid Order Price", "o_totalprice > 0"),
])
__customer_rules = dict([
    ("Valid Market Segment", "c_mktsegment is not null"),
])

In [0]:
# Bronze streaming table fed incrementally from the raw Delta table
# dev1.bronze.order_raw. The table name comes from the function name.
@dlt.table(
    comment="Order bronze table",
    table_properties={
        "quality": "bronze",
        # Disallow full refreshes so already-ingested rows are not discarded.
        "pipelines.reset.allowed": "false",
    },
)
def orders_bronze():
    # skipChangeCommits: ignore source commits that modify or delete existing
    # rows instead of failing the stream.
    return (
        spark.readStream
        .option("skipChangeCommits", "true")
        .table("dev1.bronze.order_raw")
    )

Name,Type
o_orderkey,bigint
o_custkey,bigint
o_orderstatus,string
o_totalprice,"decimal(18,2)"
o_orderdate,date
o_orderpriority,string
o_clerk,string
o_shippriority,int
o_comment,string


In [0]:
# Bronze streaming table that ingests CSV order files from the landing volume
# with Auto Loader (cloudFiles). The schema is pinned via hints and schema
# evolution is disabled.
@dlt.table(
    name="orders_autoloader_bronze",
    comment="Order Autoloader",
    table_properties={"quality": "bronze"},
)
def func():
    reader_options = {
        "cloudFiles.schemaHints": "o_orderkey long, o_custkey long, o_orderstatus string, o_totalprice decimal(18,2), o_orderdate date, o_orderpriority string, o_clerk string, o_shippriority integer, o_comment string",
        "cloudFiles.schemalocation": "/Volumes/dev1/etl/landing/autoloader/schemas/1/",
        "cloudFiles.format": "CSV",
        # Only pick up *.csv files from the landing directory.
        "pathGlobfilter": "*.csv",
        "cloudFiles.schemaEvolutionMode": "none",
    }
    return (
        spark.readStream
        .format("cloudFiles")
        .options(**reader_options)
        .load("/Volumes/dev1/etl/landing/files/")
    )

In [0]:
# One bronze streaming table that unions both order sources; each append flow
# below streams one source table into it.
dlt.create_streaming_table("orders_union_bronze")


@dlt.append_flow(target="orders_union_bronze")
def order_delta_append():
    # Orders arriving through the Delta-sourced bronze table.
    return spark.readStream.table("LIVE.orders_bronze")


@dlt.append_flow(target="orders_union_bronze")
def order_autoloader_append():
    # Orders arriving through the Auto Loader (CSV) bronze table.
    return spark.readStream.table("LIVE.orders_autoloader_bronze")

In [0]:
# @dlt.table(
# table_properties = {"quality": "bronze"},
# comment = "Customer bronze table",
# name = "customer_bronze"
# )
# def cust_bronze():
#  df = spark.read.table("dev1.bronze.customers_raw")
#  return df

Name,Type
c_custkey,bigint
c_name,string
c_address,string
c_nationkey,bigint
c_phone,string
c_acctbal,"decimal(18,2)"
c_mktsegment,string
c_comment,string


In [0]:
# Streaming view (not a materialized view) over the raw customer table. Views
# are not stored in the target schema; this one is the CDC source for the
# SCD1/SCD2 apply_changes flows.
@dlt.view(comment="Customer bronze view")
def customer_bronze_vw():
    return spark.readStream.table("dev1.bronze.customers_raw")

In [0]:
from pyspark.sql.functions import expr

# SCD type 1 customer table: one row per c_custkey, overwritten in place by the
# latest change event, where events are ordered by __src_insert_dt.
dlt.create_streaming_table("customer_scd1_bronze")
dlt.apply_changes(
    source="customer_bronze_vw",
    target="customer_scd1_bronze",
    keys=["c_custkey"],
    sequence_by="__src_insert_dt",
    stored_as_scd_type=1,
    # Events flagged 'D' delete the key; events flagged 'T' truncate the table.
    apply_as_deletes=expr("__src_action = 'D'"),
    apply_as_truncates=expr("__src_action = 'T'"),
)

In [0]:
from pyspark.sql.functions import expr

# SCD type 2 customer table: each change to a c_custkey closes the current row
# and opens a new version. DLT records validity in __START_AT / __END_AT, and
# the current version is the row where __END_AT is null.
# Full refreshes are disallowed so the accumulated history cannot be wiped.
dlt.create_streaming_table("customer_scd2_bronze", table_properties={"pipelines.reset.allowed": "false"})
dlt.apply_changes(
    target="customer_scd2_bronze",
    source="customer_bronze_vw",
    keys=["c_custkey"],
    stored_as_scd_type=2,
    # Treat 'D' events as deletes, which close the current version, matching
    # the SCD1 flow on the same source. Without this, a delete would be
    # upserted as a new "current" row and the customer would stay active.
    apply_as_deletes=expr("__src_action = 'D'"),
    # CDC bookkeeping columns drive sequencing and delete detection only; they
    # are not copied into the target.
    except_column_list=["__src_action", "__src_insert_dt"],
    sequence_by="__src_insert_dt",
)
# NOTE(review): apply_as_truncates is supported only for SCD type 1, so 'T'
# events are not handled here and would be applied as ordinary upserts.
# Confirm the intended handling, e.g. filtering them out of the source.

In [0]:
# Table (materialized view) joining every unioned order with the customer's
# current SCD2 version.
# Expectations: order rows failing any order rule are dropped; customer rule
# violations are only recorded (warn). Orders without a matching customer have
# a null c_mktsegment because of the left join, so they trigger that warning.
@dlt.table(comment="Joined View")
@dlt.expect_all_or_drop(__order_rules)
@dlt.expect_all(__customer_rules)
def joined_vw():
    orders = spark.read.table("LIVE.orders_union_bronze")
    # Open-ended SCD2 rows are the current customer versions.
    current_customers = (
        spark.read.table("LIVE.customer_scd2_bronze")
        .where("__END_AT is null")
    )
    return orders.join(
        current_customers,
        on=current_customers.c_custkey == orders.o_custkey,
        how="left_outer",
    )

Name,Type
o_orderkey,bigint
o_custkey,bigint
o_orderstatus,string
o_totalprice,"decimal(18,2)"
o_orderdate,date
o_orderpriority,string
o_clerk,string
o_shippriority,int
o_comment,string
c_custkey,bigint


In [0]:
# Silver materialized view: the orders/customers join from joined_vw plus an
# _insert_date audit column.
from pyspark.sql.functions import current_timestamp
@dlt.table(
    name="orders_silver",
    comment="Joined table",
    table_properties={"quality": "silver"},
)
def joined_silver():
    joined = spark.read.table("LIVE.joined_vw")
    # current_timestamp() is evaluated when the query runs.
    return joined.withColumn("_insert_date", current_timestamp())

Name,Type
o_orderkey,bigint
o_custkey,bigint
o_orderstatus,string
o_totalprice,"decimal(18,2)"
o_orderdate,date
o_orderpriority,string
o_clerk,string
o_shippriority,int
o_comment,string
c_custkey,bigint


In [0]:
# Gold aggregate per customer market segment (c_mktsegment): number of orders
# (count of o_orderkey) and total order value (sum of o_totalprice).
# NOTE: this import binds pyspark's `sum` at notebook level, shadowing Python's
# built-in sum() for every cell that runs afterwards.
from pyspark.sql.functions import count, sum
@dlt.table(
    comment="orders aggregated table",
    table_properties={"quality": "gold"},
)
def orders_agg_gold():
    aggregations = [
        count("o_orderkey").alias("count_orders"),
        sum("o_totalprice").alias("sum_totalprice"),
    ]
    return (
        spark.read.table("LIVE.orders_silver")
        .groupBy("c_mktsegment")
        .agg(*aggregations)
        .withColumn("_insert_date", current_timestamp())
    )

Name,Type
c_mktsegment,string
sum_orders,bigint
_insert_date,timestamp


In [0]:
def _define_status_agg_table(status):
    """Register a gold table that aggregates orders_silver for one order status.

    This is a factory so that ``status`` is bound per table. DLT evaluates
    dataset functions lazily during pipeline planning, i.e. after the loop
    below has finished. A function that closed over the loop variable would
    therefore make every table filter on the last status.

    Args:
        status: Order status value; used both in the table name and the filter.

    Returns:
        The decorated dataset function (registered with DLT as a side effect).
    """
    # Local import: avoids relying on names imported by earlier cells and does
    # not shadow Python built-ins at notebook level.
    from pyspark.sql import functions as F

    @dlt.table(
        table_properties={"quality": "gold"},
        comment="orders aggregated table",
        name=f"orders_agg_{status}_gold",
    )
    def func():
        return (
            spark.read.table("LIVE.orders_silver")
            # Column comparison instead of an interpolated SQL string, so
            # quotes in the configured status cannot break the expression.
            .where(F.col("o_orderstatus") == status)
            .groupBy("c_mktsegment")
            .agg(
                F.count("o_orderkey").alias("count_orders"),
                F.sum("o_totalprice").alias("sum_totalprice"),
            )
            .withColumn("_insert_date", F.current_timestamp())
        )

    return func


# One gold table per configured status; tolerate spaces ("O, F") and empty
# entries (trailing commas) in the comma-separated configuration value.
for _status in _order_status.split(","):
    _status = _status.strip()
    if _status:
        _define_status_agg_table(_status)