# Multi-hop Architecture

Here is the schema for the bookstore dataset used in this notebook:

![bookstore dataset schema](../Includes/images/image1.png)

In [0]:
%run ../Includes/Copy-Datasets

## Exploring The Source Directory

In [0]:
files = dbutils.fs.ls(f"{dataset_bookstore}/orders-raw")
display(files)

There are 3 parquet files in the source directory.

## Auto Loader

The process starts by creating an Auto Loader stream against the source directory. We configure a `readStream`:
* On the parquet source
* Using Auto Loader with schema inference
* Once configured, it registers a streaming temporary view to enable data transformations in Spark SQL
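The key property of Auto Loader is that each run ingests only files it has not seen before. The sketch below is a toy, stdlib-only model of that idea, not Auto Loader's actual implementation: an in-memory set stands in for the checkpoint state, and the file names are hypothetical.

```python
from typing import Iterable, List, Set


def discover_new_files(listing: Iterable[str], seen: Set[str]) -> List[str]:
    """Return files that have not been ingested yet and record them as seen."""
    new_files = sorted(f for f in listing if f not in seen)
    seen.update(new_files)
    return new_files


# Hypothetical stand-in for the state Auto Loader keeps in its checkpoint
checkpoint: Set[str] = set()

# First run: three parquet files are present in the source directory
print(discover_new_files(["01.parquet", "02.parquet", "03.parquet"], checkpoint))
# Second run: one new file has arrived, and only that file is returned
print(discover_new_files(["01.parquet", "02.parquet", "03.parquet", "04.parquet"], checkpoint))
```

Because the set persists between calls, the second run prints only `['04.parquet']`, mirroring how a restarted stream resumes from its checkpoint instead of re-reading everything.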

In [0]:
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "dbfs:/mnt/demo/checkpoints/orders_raw")
    .load(f"{dataset_bookstore}/orders-raw")
    .createOrReplaceTempView("orders_raw_temp"))

The stream has been defined, but it does not become active until a `display` or `writeStream` operation is executed against it.
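As a loose analogy only (this is plain Python, not Spark), a generator behaves similarly: defining it describes the work, but nothing is read until a consumer pulls data. The `reads` list and file names are hypothetical and exist only to make the side effects visible.

```python
from typing import Iterator, List

reads: List[str] = []  # records which (hypothetical) files were actually read


def read_files(paths: List[str]) -> Iterator[str]:
    for p in paths:
        reads.append(p)  # this side effect only happens when the generator is iterated
        yield p


query = read_files(["01.parquet", "02.parquet"])  # "define" the stream
print(reads)  # [] : defining the query read nothing
first = next(query)  # an "action" (like display or writeStream) starts consumption
print(reads)  # ['01.parquet']
```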

## Enriching Raw Data

In [0]:
%sql
-- Enriching raw data with additional metadata describing the source file and the time it was ingested
CREATE OR REPLACE TEMPORARY VIEW orders_tmp AS (
  SELECT *, current_timestamp() arrival_time, input_file_name() source_file
  FROM orders_raw_temp
)

This metadata is useful for troubleshooting if corrupted data is encountered, since it records which file each row came from and when it was ingested.
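The enrichment step can be sketched in plain Python as adding two fields to every record of a batch. This is an illustration under stated assumptions, not the Spark implementation: records are dicts, the file name is hypothetical, and a single timestamp is shared by the whole batch.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List


def enrich(records: List[Dict[str, Any]], source_file: str) -> List[Dict[str, Any]]:
    """Add ingestion metadata (arrival time and source file) to each record."""
    arrival = datetime.now(timezone.utc)  # one timestamp for the whole batch
    return [{**r, "arrival_time": arrival, "source_file": source_file} for r in records]


# Hypothetical batch read from a hypothetical file
rows = enrich([{"order_id": "O1", "quantity": 2}], "orders-raw/01.parquet")
print(rows[0]["source_file"])  # orders-raw/01.parquet
```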

In [0]:
%sql
SELECT * FROM orders_tmp

The stream is now active, and the data has been successfully enriched with the metadata columns (`arrival_time` and `source_file`).

We interrupt this stream for now.

## Creating Bronze Table

We pass this enriched data back to the PySpark API to perform an incremental write to a Delta Lake table (`orders_bronze`):

In [0]:
(spark.table("orders_tmp")
      .writeStream
      .format("delta")
      .option("checkpointLocation", "dbfs:/mnt/demo/checkpoints/orders_bronze")
      .outputMode("append")
      .table("orders_bronze"))

The write stream is active and it has started processing the data.

In [0]:
%sql
SELECT COUNT(*) FROM orders_bronze

3000 records have been written, corresponding to the three parquet files containing 1000 records each.

In [0]:
# Triggering another file arrival
load_new_data()

New data is immediately detected by the streaming query:

![Streaming query detecting new data](<Screenshot 2025-05-28 154528.png>)

In [0]:
%sql
SELECT COUNT(*) FROM orders_bronze

1000 new records have been added.

### Creating Static Lookup Table

To create the silver table, we first need a **static lookup table** to join with the bronze table.

In [0]:
# Creating a customers static temporary view from JSON files
(spark.read
      .format("json")
      .load(f"{dataset_bookstore}/customers-json")
      .createOrReplaceTempView("customers_lookup"))

In [0]:
%sql
SELECT * FROM customers_lookup

There are three columns:
* `customer_id`: used to join the data
* `email`
* `profile`: a complex JSON object

## Creating Silver Table

To work on the bronze data in the **silver layer**, we create a streaming temporary view against the bronze table.

In [0]:
(spark.readStream
  .table("orders_bronze")
  .createOrReplaceTempView("orders_bronze_tmp"))

At the silver level, several enrichments and checks are applied:
* `JOIN` the order data with the customer information to add customer names
* Convert the order timestamp from a Unix timestamp into a human-readable format
* Exclude any order with no items
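The three silver-level steps can be sketched per record in plain Python. This is a conceptual model, not the Spark query: customers are assumed to be a dict keyed by `customer_id` with `profile` stored as a JSON string, and timestamps are assumed to be rendered in UTC (Spark uses the session time zone).

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, List


def to_silver(orders: List[Dict[str, Any]],
              customers: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Join orders with customers, format timestamps, and drop empty orders."""
    silver = []
    for o in orders:
        c = customers.get(o["customer_id"])
        # INNER JOIN: orders without a matching customer are dropped
        if c is None:
            continue
        # WHERE quantity > 0: orders with no items are dropped
        if o["quantity"] <= 0:
            continue
        # Counterpart of profile:first_name / profile:last_name on a JSON string
        profile = json.loads(c["profile"])
        # Assumption: render the Unix timestamp in UTC
        ts = datetime.fromtimestamp(o["order_timestamp"], tz=timezone.utc)
        silver.append({
            "order_id": o["order_id"],
            "quantity": o["quantity"],
            "customer_id": o["customer_id"],
            "f_name": profile.get("first_name"),
            "l_name": profile.get("last_name"),
            "order_timestamp": ts.strftime("%Y-%m-%d %H:%M:%S"),
            "books": o["books"],
        })
    return silver
```

A dict lookup plays the role of the static side of the join here: each incoming order is matched against a fixed customer table, which is the same shape as the stream-static join in the SQL cell below.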

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW orders_enriched_tmp AS (
  SELECT order_id, quantity, o.customer_id, c.profile:first_name as f_name, c.profile:last_name as l_name,
       cast(from_unixtime(order_timestamp, 'yyyy-MM-dd HH:mm:ss') AS timestamp) order_timestamp, books
  FROM orders_bronze_tmp o
  INNER JOIN customers_lookup c
  ON o.customer_id = c.customer_id
  WHERE quantity > 0)

Using `writeStream`, we write this enriched orders data into a silver table:

In [0]:
(spark.table("orders_enriched_tmp")
      .writeStream
      .format("delta")
      .option("checkpointLocation", "dbfs:/mnt/demo/checkpoints/orders_silver")
      .outputMode("append")
      .table("orders_silver"))

Data has been processed with the stream.

In [0]:
%sql
SELECT * FROM orders_silver

In [0]:
%sql
SELECT COUNT(*) FROM orders_silver

All 4000 records have been successfully processed.

In [0]:
# Triggering a new file
load_new_data()

In [0]:
%sql
SELECT COUNT(*) FROM orders_silver

## Creating Gold Table

To create the gold layer, we first read a stream of data from the silver table into a streaming temporary view.

In [0]:
(spark.readStream
  .table("orders_silver")
  .createOrReplaceTempView("orders_silver_tmp"))

Next, we define an aggregation over this stream for the gold table, computing the daily number of books for each customer:
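In plain Python, the gold aggregation amounts to grouping by customer and calendar day and summing the quantity. The sketch below assumes silver rows are dicts with `order_timestamp` already formatted as `yyyy-MM-dd HH:mm:ss`; it models the `GROUP BY` logic only, not the streaming execution.

```python
from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, List, Tuple

Key = Tuple[str, str, str, datetime]


def daily_customer_books(rows: List[Dict[str, Any]]) -> Dict[Key, int]:
    """Sum quantity per (customer_id, f_name, l_name, order_date)."""
    totals: Dict[Key, int] = defaultdict(int)
    for r in rows:
        ts = datetime.strptime(r["order_timestamp"], "%Y-%m-%d %H:%M:%S")
        # Truncate to midnight, like date_trunc("DD", order_timestamp)
        order_date = ts.replace(hour=0, minute=0, second=0, microsecond=0)
        totals[(r["customer_id"], r["f_name"], r["l_name"], order_date)] += r["quantity"]
    return dict(totals)
```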

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW daily_customer_books_tmp AS (
  SELECT customer_id, f_name, l_name, date_trunc("DD", order_timestamp) order_date, sum(quantity) books_counts
  FROM orders_silver_tmp
  GROUP BY customer_id, f_name, l_name, date_trunc("DD", order_timestamp)
)

We write this aggregated data into a gold table:

In [0]:
(spark.table("daily_customer_books_tmp")
      .writeStream
      .format("delta")
      .outputMode("complete")
      .option("checkpointLocation", "dbfs:/mnt/demo/checkpoints/daily_customer_books")
      .trigger(availableNow=True)
      .table("daily_customer_books"))

The stream stopped on its own after processing all available data in micro-batches, due to the `availableNow` trigger option. This way, streaming and batch workloads can be combined in the same pipeline.
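The `availableNow` behavior can be modeled as: fix the end of the input when the query starts, drain it in bounded micro-batches, then stop. This toy sketch is not Spark's implementation; `max_files_per_batch` is a hypothetical parameter standing in for whatever limits the size of each micro-batch, and `processed` stands in for checkpointed progress.

```python
from typing import List


def run_available_now(source: List[str], max_files_per_batch: int,
                      processed: List[str]) -> int:
    """Process everything available at start in micro-batches; return batch count."""
    start = len(processed)   # resume from previously recorded progress
    end = len(source)        # bound fixed at start: later arrivals wait for the next run
    batches = 0
    for i in range(start, end, max_files_per_batch):
        processed.extend(source[i:min(i + max_files_per_batch, end)])
        batches += 1
    return batches           # the "query" terminates on its own here


source = ["f1", "f2", "f3"]
done: List[str] = []
print(run_available_now(source, 2, done))  # 2 micro-batches, then it stops
source.append("f4")                         # new data arrives after the run
print(run_available_now(source, 2, done))  # rerunning picks up only f4
```

The second call illustrates why a query using this trigger must be rerun to pick up data that arrives later.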

The `complete` output mode rewrites the entire updated aggregation result each time the logic runs. However, Structured Streaming assumes that data is only ever appended to upstream tables. Once a table is updated or overwritten, it is no longer valid as a streaming source.
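The conflict between `complete` mode and append-only streaming sources can be seen with a small stdlib-only model. Here the aggregate is recomputed over all input and replaces the previous result, and `is_append_only` (a hypothetical helper) checks whether every earlier row survives unchanged, which is what an append-only reader relies on.

```python
from collections import Counter
from typing import List, Tuple

Row = Tuple[str, int]


def complete_mode(all_orders: List[str]) -> List[Row]:
    # Recompute the full aggregate over ALL input seen so far; the sink then
    # replaces the previous table contents with this result.
    return sorted(Counter(all_orders).items())


def is_append_only(before: List[Row], after: List[Row]) -> bool:
    """True if every row present before is still present, unchanged, after."""
    return all(row in after for row in before)


orders_seen = ["C1"]
gold_v1 = complete_mode(orders_seen)   # [('C1', 1)]
orders_seen += ["C1", "C2"]
gold_v2 = complete_mode(orders_seen)   # [('C1', 2), ('C2', 1)]
print(is_append_only(gold_v1, gold_v2))  # False: ('C1', 1) was overwritten
```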

Therefore, it is not possible to read a stream from this gold table. Options such as `ignoreChanges` can relax this restriction, but they come with their own limitations.

In [0]:
%sql
SELECT * FROM daily_customer_books

Customers currently have book counts between 5 and 10.

In [0]:
# Adding the remaining files into the source directory
load_new_data(all=True)

The new data will now propagate from the source directory into the bronze and silver layers through the active streams. For the gold layer, however, the final query must be rerun, since it is configured as a batch-style job using the `availableNow` trigger.

In [0]:
%sql
SELECT * FROM daily_customer_books

After processing the latest changes, customers now have higher book counts.

## Stopping Active Streams

In [0]:
for s in spark.streams.active:
    print("Stopping stream: " + s.id)
    s.stop()
    s.awaitTermination()