- **Streaming table** - Each record is processed exactly once. This assumes an append-only source.
- **Materialized view** - Records are processed as required to return accurate results for the current data state. Materialized views should be used for data processing tasks such as transformations, aggregations, or pre-computing slow queries and frequently used computations.
- **View** - Records are processed each time the view is queried. Use views for intermediate transformations and data quality checks that should not be published to public datasets.

In [0]:
import dlt


In [0]:
# Order statuses to build per-status aggregates for, read from the Spark
# configuration key "custom.orderStatus" as a comma-separated string.
# Falls back to "NA" when the key is not set.
_order_status = spark.conf.get("custom.orderStatus", "NA")

In [0]:
# Bronze streaming table fed incrementally from the raw orders table.
@dlt.table(
    comment="Order bronze table",
    table_properties={"quality": "bronze"},
)
def orders_bronze():
    """Return a streaming read of ``dev.bronze.orders_raw``."""
    return spark.readStream.table("dev.bronze.orders_raw")

In [0]:
# Bronze streaming table fed by Auto Loader from CSV files in the landing volume.
@dlt.table(
    name="orders_autoloader_bronze",
    comment="Order Autoloader",
    table_properties={"quality": "bronze"},
)
def func():
    """Return an Auto Loader (``cloudFiles``) stream over the landing CSV files.

    The published table name comes from the decorator's ``name`` argument,
    not from this function's name.
    """
    # Column types supplied as hints to Auto Loader's schema inference.
    schema_hints = "o_orderkey long, o_custkey long, o_orderstatus string, o_totalprice decimal(18,2),o_orderdate date, o_orderpriority string, o_clerk string, o_shippriority integer, o_comment string"

    reader = spark.readStream.format("cloudFiles")
    reader = reader.option("cloudFiles.schemaHints", schema_hints)
    reader = reader.option("cloudFiles.schemaLocation", "/Volumes/dev/etl/landing/autoloader/schemas/1")
    reader = reader.option("cloudFiles.format", "CSV")
    # Only pick up files ending in .csv from the landing directory.
    reader = reader.option("pathGlobfilter", "*.csv")
    reader = reader.option("cloudFiles.schemaEvolutionMode", "none")

    return reader.load("/Volumes/dev/etl/landing/files/")

In [0]:
# Declare the target streaming table that unions the two bronze order
# sources; rows are written into it by append flows targeting this name.
dlt.create_streaming_table(name="orders_union_bronze")

# Append flow: stream rows from orders_bronze into orders_union_bronze.
@dlt.append_flow(target="orders_union_bronze")
def order_delta_append():
    """Return a streaming read of the pipeline table ``orders_bronze``."""
    return spark.readStream.table("LIVE.orders_bronze")

# Append flow: stream rows from orders_autoloader_bronze into orders_union_bronze.
@dlt.append_flow(target="orders_union_bronze")
def order_autoloader_append():
    """Return a streaming read of the pipeline table ``orders_autoloader_bronze``."""
    return spark.readStream.table("LIVE.orders_autoloader_bronze")

In [0]:
# # Create a materialized view for customers
# @dlt.table(
#   table_properties = {"quality": "bronze"},
#   comment = "Customer bronze table",
#   name = "customer_bronze"
# )

# def cust_bronze():
#   df = spark.read.table("dev.bronze.customer_raw")
#   return df

In [0]:
# Streaming view (not a materialized view) over the raw customer table.
# It is the source named in the apply_changes calls for the customer SCD tables.
@dlt.view(comment="Customer bronze view")
def customer_bronze_vw():
    """Return a streaming read of ``dev.bronze.customer_raw``."""
    customers_stream = spark.readStream.table("dev.bronze.customer_raw")
    return customers_stream

In [0]:
from pyspark.sql.functions import expr
# SCD Type 1 customer table: changes are applied per key (stored_as_scd_type=1).
dlt.create_streaming_table(name="customer_scd1_bronze")

dlt.apply_changes(
    source="customer_bronze_vw",
    target="customer_scd1_bronze",
    keys=["c_custkey"],
    # Changes are ordered by the source insert timestamp column.
    sequence_by="__src_insert_dt",
    stored_as_scd_type=1,
    # Source rows flagged 'D' are applied as deletes, 'T' as truncates.
    apply_as_deletes=expr("__src_action = 'D'"),
    apply_as_truncates=expr("__src_action = 'T'"),
)

In [0]:
from pyspark.sql.functions import expr
# SCD Type 2 customer table: changes are applied per key (stored_as_scd_type=2).
dlt.create_streaming_table(name="customer_scd2_bronze")

dlt.apply_changes(
    source="customer_bronze_vw",
    target="customer_scd2_bronze",
    keys=["c_custkey"],
    # Changes are ordered by the source insert timestamp column.
    sequence_by="__src_insert_dt",
    stored_as_scd_type=2,
    # Keep the change-feed bookkeeping columns out of the target table.
    except_column_list=["__src_action", "__src_insert_dt"],
)

In [0]:
# Join orders with customers.
# NOTE(review): the name and comment say "view", but this is declared with
# @dlt.table rather than @dlt.view — confirm which is intended.
@dlt.table(comment="Joined View")
def joined_vw():
    """Left-join union orders to customer SCD2 rows where ``__END_AT`` is null.

    Every order row is kept; customer columns are null when no matching
    customer row with a null ``__END_AT`` exists.
    """
    orders = spark.read.table("LIVE.orders_union_bronze")
    open_customers = spark.read.table("LIVE.customer_scd2_bronze").where("__END_AT is null")

    return orders.join(
        open_customers,
        on=open_customers.c_custkey == orders.o_custkey,
        how="left_outer",
    )

In [0]:
# Create MV to add new column
from pyspark.sql.functions import current_timestamp, count, sum

@dlt.table(
    name="joined_silver",
    comment="Joined Table",
    table_properties={"quality": "silver"},
)
def joined_silver():
    """Return ``joined_vw`` with an added ``__insert_date`` timestamp column."""
    joined = spark.read.table("LIVE.joined_vw")
    return joined.withColumn("__insert_date", current_timestamp())

In [0]:
# Aggregate by c_mktsegment: count of orders (o_orderkey) and total order price.
@dlt.table(
    comment="Orders aggregated table",
    table_properties={"quality": "gold"},
)
def orders_agg_gold():
    """Return per-``c_mktsegment`` order count and price total from ``joined_silver``.

    Output columns: ``c_mktsegment``, ``count_orders``, ``sum_totalprice``,
    ``__insert_date``.
    """
    # `count` and `sum` are the pyspark.sql.functions aggregates imported at
    # file level (`sum` is not the Python builtin here).
    order_count = count("o_orderkey").alias("count_orders")
    price_total = sum("o_totalprice").alias("sum_totalprice")

    per_segment = (
        spark.read.table("LIVE.joined_silver")
        .groupBy("c_mktsegment")
        .agg(order_count, price_total)
    )
    return per_segment.withColumn("__insert_date", current_timestamp())

In [0]:
# Build one gold aggregate table per configured order status:
# orders_agg_<status>_gold, aggregated by c_mktsegment.
# Surrounding whitespace in the comma-separated config value is stripped.
for _status in (_s.strip() for _s in _order_status.split(",")):
    if not _status:
        # Skip empty entries (e.g. "O,,F" or a trailing comma) instead of
        # registering a table with an empty status in its name.
        continue

    @dlt.table(
        name=f"orders_agg_{_status}_gold",
        comment="Orders aggregated table",
        table_properties={"quality": "gold"},
    )
    def func(status=_status):
        """Return per-``c_mktsegment`` order count and price total for one status.

        ``status`` is bound as a default argument on purpose: the pipeline
        invokes this function after the loop has finished, so reading the
        loop variable directly would make every generated table filter on
        the last status in the list.
        """
        df = spark.read.table("LIVE.joined_silver")

        # `count` and `sum` are the pyspark.sql.functions aggregates imported
        # at file level (`sum` is not the Python builtin here).
        df_final = (
            df.where(f"o_orderstatus='{status}'")
            .groupBy("c_mktsegment")
            .agg(
                count("o_orderkey").alias("count_orders"),
                sum("o_totalprice").alias("sum_totalprice"),
            )
            .withColumn("__insert_date", current_timestamp())
        )

        return df_final