In [0]:
@dlt.table(name = "customers_raw")
def get_customers_raw():
    """Incrementally ingest raw customer CSV files with Auto Loader.

    Streams every CSV file under the customers volume path, asks Auto Loader
    to infer column types, and stamps each row with the time it was loaded.
    The table is registered as ``customers_raw`` so that the cleaning step's
    ``live.customers_raw`` reference resolves to it.
    """
    return ( spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format","csv")
            .option("cloudFiles.inferColumnTypes","true")
            .load("/Volumes/dev/demo_db/dlt_data/customers/")
            # load_time is the sequencing column used by apply_changes downstream
            .withColumn("load_time",current_timestamp())
            )

In [0]:
@dlt.table(name = "invoices_raw")
def get_invoices_raw():
    """Incrementally ingest raw invoice CSV files with Auto Loader.

    Streams every CSV file under the invoices volume path, asks Auto Loader
    to infer column types, and stamps each row with the time it was loaded.
    Named distinctly from the customer ingest function so neither shadows
    the other at module level.
    """
    return ( spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format","csv")
            .option("cloudFiles.inferColumnTypes","true")
            .load("/Volumes/dev/demo_db/dlt_data/invoices/")
            # load_time is the sequencing column used by apply_changes downstream
            .withColumn("load_time",current_timestamp())
            )

In [0]:
@dlt.table(name = "customers_cleaned")
@dlt.expect_or_drop("valid_customers","customer_id is not null")
def get_customers_cleaned():
    """Rename the raw customer columns to snake_case.

    Streams from ``live.customers_raw`` and keeps the customer id, the
    customer name and the ingest timestamp. The ``valid_customers``
    expectation drops rows whose ``customer_id`` is null.
    """
    projected_columns = [
        "CustomerID as customer_id",
        "CustomerName as customer_name",
        "load_time",
    ]
    raw_customers = (
        spark.readStream
        .format("delta")
        .table("live.customers_raw")
    )
    return raw_customers.selectExpr(*projected_columns)

In [0]:
@dlt.table(name = "invoices_cleaned",partition_cols = ["invoice_year","country"])
@dlt.expect_or_drop("valid_invoice_and_qty","invoice_no is not null and quantity >0")
def get_invoices_cleaned():
    """Normalize raw invoice rows and derive date partitions.

    Streams from ``live.invoices_raw``, renames columns to snake_case, parses
    ``InvoiceDate`` into a date, and derives ``invoice_year`` and
    ``invoice_month`` from it. The table is partitioned by ``invoice_year``
    and ``country``. The ``valid_invoice_and_qty`` expectation drops rows
    with a null invoice number or a non-positive quantity.

    ``load_time`` is carried through because the downstream ``apply_changes``
    into ``invoices`` sequences records by that column.
    """
    # NOTE(review): the 'd-M-y H.m' pattern assumes InvoiceDate strings like
    # '1-12-2010 8.26' — confirm against the source files.
    return(spark.readStream
           .format("delta")
           .table("live.invoices_raw")
           .selectExpr("InvoiceNo as invoice_no","StockCode as stock_code","Description as description",
                       "Quantity as quantity","to_date(InvoiceDate,'d-M-y H.m') as invoice_date",
                       "UnitPrice as unit_price","CustomerID as customer_id","Country as country",
                       "year(to_date(InvoiceDate,'d-M-y H.m')) as invoice_year",
                       "month(to_date(InvoiceDate,'d-M-y H.m')) as invoice_month",
                       "load_time"
                       )
    )

In [0]:
# Target table holding customer history built from the cleaned customer feed.
dlt.create_streaming_table("customers")

# Upsert cleaned customers keyed by customer_id, ordering changes by load_time
# and keeping full history (SCD type 2). The keyword is `source` (singular) and
# must name the table defined as "customers_cleaned".
dlt.apply_changes(
  target="customers",
  source="customers_cleaned",
  keys=["customer_id"],
  sequence_by = col("load_time"),
  stored_as_scd_type = 2
)

In [0]:
# Target table for invoices, partitioned the same way as invoices_cleaned.
dlt.create_streaming_table("invoices",partition_cols = ["invoice_year","country"])

# Upsert cleaned invoices keyed by (invoice_no, stock_code, invoice_date),
# ordering changes by load_time. No stored_as_scd_type is given, so DLT's
# default (SCD type 1) applies. The keyword is `source` (singular); the source
# table must expose a load_time column for sequence_by to resolve.
dlt.apply_changes(
    target="invoices",
    source="invoices_cleaned",
    keys=["invoice_no","stock_code","invoice_date"],
    sequence_by = col("load_time")
)

In [0]:
@dlt.table(name = "daily_sales_uk_2022")
def compute_daily_sales_uk_2022():
    """Aggregate 2022 United Kingdom invoice revenue per day.

    Batch-reads ``live.invoices``, keeps rows for ``invoice_year = 2022`` and
    ``country = 'United Kingdom'``, groups by country, year, month and date,
    and sums ``quantity * unit_price`` rounded to two decimals as
    ``total_sales``.
    """
    return(spark.read
           # the format must be the string "delta"; a bare `delta` is an undefined name
           .format("delta")
           .table("live.invoices")
           .where("invoice_year = 2022 and country = 'United Kingdom'")
           .groupBy("country","invoice_year","invoice_month","invoice_date")
           .agg(expr("round(sum(quantity*unit_price),2)").alias("total_sales"))
    )