<div  style="text-align: center; line-height: 0; padding-top: 9px;">
<img src="https://raw.githubusercontent.com/derar-alhussein/Databricks-Certified-Data-Engineer-Associate/main/Includes/images/bookstore_schema.png" alt="Databricks Learning" style="width: 600">
</div>

In [0]:
%sql
-- set dataset path
SET datasets.path=dbfs:/mnt/demo-datasets/bookstore;

key,value
datasets.path,dbfs:/mnt/demo-datasets/bookstore


In [0]:
%sql
-- Bronze Layer
-- Auto Loader Files ingest increment
CREATE OR REFRESH STREAMING LIVE TABLE orders_raw
COMMENT "The raw books orders, ingested from orders-raw"
AS SELECT * FROM cloud_files("${datasets.path}/orders-raw", "parquet", map("schema", "order_id string, order_timestamp long, customer_id string, quantity long")) 

message
"This Delta Live Tables query is syntactically valid, but you must create a pipeline in order to define and populate your table."


In [0]:
%sql
CREATE OR REFRESH LIVE TABLE customers
COMMENT "The customers lookup table, ingested from customers-json"
AS SELECT * FROM json.`${dataset_path}/customers-json`

message
"This Delta Live Tables query is syntactically valid, but you must create a pipeline in order to define and populate your table."


In [0]:
%sql
-- Silver Layer
CREATE OR REFRESH STREAMING LIVE TABLE orders_cleaned (
  CONSTRAINT valid_order_number EXPECT (order_id IS NOT NULL) ON VIOLATION DROP ROW
)
COMMENT "The cleaned books order with valid order_id"
AS 
  SELECT 
    order_id,
    quantity,
    o.customer_id,
    c.profile:first_name as f_name,
    c.profile:last_name as l_name,
    cast(from_unixtime(order_timestamp, 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) order_timestamp,
    c.profile:address:country as country
  FROM 
    STREAM(LIVE.orders_raw) o LEFT JOIN LIVE.customers c
  ON
    o.customer_id = c.customer_id

message
"This Delta Live Tables query is syntactically valid, but you must create a pipeline in order to define and populate your table."


>> Constraint violation
| **`ON VIOLATION`** | Behavior |
| --- | --- |
| **`DROP ROW`** | Discard records that violate constraints |
| **`FAIL UPDATE`** | Violated constraint causes the pipeline to fail  |
| Omitted | Records violating constraints will be kept, and reported in metrics |

In [0]:
%sql
-- Gold Table
CREATE OR REFRESH LIVE TABLE cn_daily_customer_books
COMMENT "Daily number of books per customer in China"
AS 
  SELECT 
    customer_id,
    f_name,
    l_name,
    DATE_TRUNC("DD", order_timestamp) order_date,
    SUM(quantity) books_counts
  FROM 
    LIVE.orders_cleaned
  WHERE 
    country = "China"
  GROUP BY 
    customer_id,
    f_name,
    l_name,
    DATE_TRUNC("DD", order_timestamp)

message
"This Delta Live Tables query is syntactically valid, but you must create a pipeline in order to define and populate your table."


In [0]:
%sql
-- Gold Table
CREATE OR REFRESH LIVE TABLE fr_daily_customer_books
COMMENT "Daily number of books per customer in France"
AS 
  SELECT 
    customer_id,
    f_name,
    l_name,
    DATE_TRUNC("DD", order_timestamp) order_date,
    SUM(quantity) books_counts
  FROM 
    LIVE.orders_cleaned
  WHERE 
    country = "France"
  GROUP BY 
    customer_id,
    f_name,
    l_name,
    DATE_TRUNC("DD", order_timestamp)

message
"This Delta Live Tables query is syntactically valid, but you must create a pipeline in order to define and populate your table."


In [0]:
# check pipeline results
# autoloader
# checkpoints
# system
# tables
files = dbutils.fs.ls("dbfs:/mnt/demo/dlt/demo_bookstore")
display(files)

In [0]:
# system dir captures all the events associated with the pipeline
files = dbutils.fs.ls("dbfs:/mnt/demo/dlt/demo_bookstore/system/events")
display(files)

In [0]:
%sql
select * from delta.`dbfs:/mnt/demo/dlt/demo_bookstore/system/events`

In [0]:
files = dbutils.fs.ls("dbfs:/mnt/demo/dlt/demo_bookstore/tables")
display(files)

In [0]:
%sql
SELECT * FROM demo_bookstore_dlt_db.cn_daily_customer_books

In [0]:
%sql
SELECT * FROM demo_bookstore_dlt_db.fr_daily_customer_books