
<div  style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://raw.githubusercontent.com/derar-alhussein/Databricks-Certified-Data-Engineer-Associate/main/Includes/images/bookstore_schema.png" alt="Bookstore dataset schema" style="width: 600px">
</div>

In [0]:
%run ../Includes/Copy-Datasets


## Exploring The Source Directory

In [0]:
# List the raw order files in the same directory the Auto Loader cell reads from.
# `dataset_bookstore` is not defined in this notebook — presumably it is set by
# the %run of ../Includes/Copy-Datasets (TODO confirm).
files = dbutils.fs.ls(f"{dataset_bookstore}/orders-raw")
display(files)

path,name,size,modificationTime
dbfs:/mnt/demo-datasets/bookstore/orders-raw/01.parquet,01.parquet,18823,1746436831000
dbfs:/mnt/demo-datasets/bookstore/orders-raw/02.parquet,02.parquet,18814,1746505605000



## Auto Loader

In [0]:
# Auto Loader ("cloudFiles") streaming source over the raw parquet order files,
# registered as the temp view `orders_raw_temp` so the SQL cells can query it.
(
    spark.readStream
    .format("cloudFiles")
    .options(**{
        # Format of the files being ingested.
        "cloudFiles.format": "parquet",
        # Directory in which Auto Loader stores the schema it tracks for this source.
        "cloudFiles.schemaLocation": "dbfs:/mnt/demo/checkpoints/orders_raw",
    })
    .load(f"{dataset_bookstore}/orders-raw")
    .createOrReplaceTempView("orders_raw_temp")
)


## Enriching Raw Data

In [0]:
%sql
-- Enrich the raw stream with ingestion metadata: the time each row was
-- processed (arrival_time) and the file it was read from (source_file).
CREATE OR REPLACE TEMPORARY VIEW orders_tmp AS
SELECT
  *,
  current_timestamp() AS arrival_time,
  input_file_name()   AS source_file
FROM orders_raw_temp

In [0]:
%sql
-- Preview two rows of the enriched view; the result carries the added
-- arrival_time and source_file columns.
-- NOTE(review): orders_tmp sits on a streaming source, so this preview likely
-- runs as its own streaming query until it is stopped — confirm.
SELECT * FROM orders_tmp
LIMIT 2

order_id,order_timestamp,customer_id,quantity,total,books,_rescued_data,arrival_time,source_file
6341,1657520256,C00788,1,41,"List(List(B08, 1, 41))",,2025-05-06T04:41:21.075+0000,dbfs:/mnt/demo-datasets/bookstore/orders-raw/01.parquet
6342,1657520256,C00788,1,41,"List(List(B08, 1, 41))",,2025-05-06T04:41:21.075+0000,dbfs:/mnt/demo-datasets/bookstore/orders-raw/01.parquet


## Creating Bronze Table

In [0]:
# Stream the enriched orders into the Delta table `orders_bronze` in append
# mode. No trigger is set, so the default trigger applies; progress is tracked
# in the checkpoint directory given below.
(
    spark.table("orders_tmp")
    .writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/demo/checkpoints/orders_bronze")
    .table("orders_bronze")
)

Out[14]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f2490af1c70>

In [0]:
%sql
-- Number of rows the bronze stream has written so far.
SELECT count(*) FROM orders_bronze

count(1)
2000


In [0]:
# Helper not defined in this notebook (presumably provided by the %run of
# ../Includes/Copy-Datasets — TODO confirm). It prints which parquet file it
# adds to the bookstore dataset.
load_new_data()

Loading 03.parquet file to the bookstore dataset


In [0]:
%sql
-- Re-count after the new file was loaded: the running bronze stream should
-- have appended the additional rows.
SELECT count(*) FROM orders_bronze

count(1)
3000



## Creating Static Lookup Table

In [0]:
# Static (batch, non-streaming) read of the customers JSON files, registered
# as the temp view `customers_lookup` for the join in the silver layer.
(
    spark.read
    .json(f"{dataset_bookstore}/customers-json")
    .createOrReplaceTempView("customers_lookup")
)

In [0]:
%sql
-- Preview two lookup rows; `profile` holds a JSON string with the customer's
-- name and address fields.
SELECT * FROM customers_lookup
LIMIT 2

customer_id,email,profile,updated
C00301,thomas.lane@gmail.com,"{""first_name"":""Thomas"",""last_name"":""Lane"",""gender"":""Male"",""address"":{""street"":""06 Boulevard Victor Hugo"",""city"":""Paris"",""country"":""France""}}",2021-12-14T23:15:43.375Z
C00302,ocolegatele@blogger.com,"{""first_name"":""Odilia"",""last_name"":""Colegate"",""gender"":""Female"",""address"":{""street"":""07 Sommers Parkway"",""city"":""Lyon"",""country"":""France""}}",2021-12-14T23:15:43.375Z


## Creating Silver Table

In [0]:
# Read the bronze table as a stream and expose it to SQL as the temp view
# `orders_bronze_tmp`.
(spark.readStream
  .table("orders_bronze")
  .createOrReplaceTempView("orders_bronze_tmp"))

In [0]:
%sql
-- Silver-layer shape: join streaming bronze orders to the static customers
-- lookup, pull first/last name out of the JSON `profile` string, convert the
-- epoch-seconds order_timestamp to a timestamp, and keep only orders with a
-- positive quantity.
CREATE OR REPLACE TEMPORARY VIEW orders_enriched_tmp AS
SELECT
  o.order_id,
  o.quantity,
  o.customer_id,
  c.profile:first_name AS f_name,
  c.profile:last_name  AS l_name,
  CAST(from_unixtime(o.order_timestamp, 'yyyy-MM-dd HH:mm:ss') AS timestamp) AS order_timestamp,
  o.books
FROM orders_bronze_tmp AS o
INNER JOIN customers_lookup AS c
  ON o.customer_id = c.customer_id
WHERE o.quantity > 0

In [0]:
# Stream the enriched, filtered orders into the Delta table `orders_silver`
# in append mode, with its own checkpoint directory.
(
    spark.table("orders_enriched_tmp")
    .writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/demo/checkpoints/orders_silver")
    .table("orders_silver")
)

Out[24]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f2490ae37c0>

In [0]:
%sql
-- Preview three silver rows: customer names are now flat columns and
-- order_timestamp is a proper timestamp.
SELECT * FROM orders_silver LIMIT 3

order_id,quantity,customer_id,f_name,l_name,order_timestamp,books
7397,1,C00494,Sherlocke,Fairbard,2022-07-12T17:13:57.000+0000,"List(List(B08, 1, 41))"
7396,1,C00494,Sherlocke,Fairbard,2022-07-12T17:13:57.000+0000,"List(List(B08, 1, 41))"
8397,1,C00494,Sherlocke,Fairbard,2022-07-12T17:13:57.000+0000,"List(List(B08, 1, 41))"


In [0]:
%sql
-- Number of rows the silver stream has written so far.
SELECT COUNT(*) FROM orders_silver

count(1)
3000


In [0]:
# Add one more file to the source dataset (helper defined outside this
# notebook) so the new rows can be seen flowing through bronze into silver.
load_new_data()

Loading 04.parquet file to the bookstore dataset


In [0]:
%sql
-- Re-count after the new file was loaded: the running streams should have
-- propagated the additional rows into silver.
SELECT COUNT(*) FROM orders_silver

count(1)
4000


## Creating Gold Table

In [0]:
# Read the silver table as a stream and expose it to SQL as the temp view
# `orders_silver_tmp`.
(spark.readStream
  .table("orders_silver")
  .createOrReplaceTempView("orders_silver_tmp"))

In [0]:
%sql
-- Gold-layer aggregate: total quantity ordered per customer per calendar day
-- (order_timestamp truncated to the day).
CREATE OR REPLACE TEMPORARY VIEW daily_customer_books_tmp AS
SELECT
  customer_id,
  f_name,
  l_name,
  date_trunc("DD", order_timestamp) AS order_date,
  sum(quantity)                     AS books_counts
FROM orders_silver_tmp
GROUP BY
  customer_id,
  f_name,
  l_name,
  date_trunc("DD", order_timestamp)

In [0]:
# Materialise the daily aggregate into the Delta table `daily_customer_books`.
# "complete" output mode rewrites the full aggregated result on every run, and
# the availableNow trigger processes the data that is currently available and
# then stops the query.
daily_customer_books_query = (
    spark.table("daily_customer_books_tmp")
    .writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", "dbfs:/mnt/demo/checkpoints/daily_customer_books")
    .trigger(availableNow=True)
    .table("daily_customer_books")
)

# Starting the query returns immediately; block until this one-off run has
# finished so the following cell cannot read the table before it is written.
daily_customer_books_query.awaitTermination()

# Keep the StreamingQuery handle as the cell's displayed result.
daily_customer_books_query

Out[33]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f2490af1d90>

In [0]:
%sql
-- Preview two rows of the gold table: one row per customer per day with the
-- summed quantity in books_counts.
SELECT * FROM daily_customer_books LIMIT 2

customer_id,f_name,l_name,order_date,books_counts
C01120,Carrissa,Nairn,2022-07-30T00:00:00.000+0000,4
C01011,Nickey,McBeith,2022-07-16T00:00:00.000+0000,4


In [0]:
# Add another file to the source dataset (helper defined outside this notebook).
# NOTE(review): the gold query above uses trigger(availableNow=True) and stops
# after one run, so daily_customer_books presumably will not reflect this new
# file until that cell is re-run — confirm.
load_new_data()


## Stopping active streams

In [0]:
# Stop every streaming query that is still active in this Spark session and
# wait for each one to finish terminating before moving on to the next.
for query in spark.streams.active:
    print(f"Stopping stream: {query.id}")
    query.stop()
    query.awaitTermination()

Stopping stream: 1b1b3d6a-a004-45c0-9e6e-708ad38ed8fe
Stopping stream: 6e9e9772-926d-42bf-bdc2-2e1decf70bb5
Stopping stream: 7565cbf3-f054-464e-9092-d3a720159c98
