<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Stream-Static Joins

In this lesson, you'll join streaming heart rate data with the completed workouts table.

We'll be creating the table **`workout_bpm`** in our architectural diagram.

This pattern relies on Delta Lake's guarantee that each query against a table returns the latest version of that table.

<img src="https://files.training.databricks.com/images/ade/ADE_arch_workout_bpm.png" width="60%" />

## Learning Objectives
By the end of this lesson, students will be able to:
- Describe guarantees around versioning and matching for stream-static joins
- Leverage Spark SQL and PySpark to process stream-static joins

## Setup

**NOTE**: The setup script includes logic to define a **`user_lookup`** table required for the join below.

In [0]:
%run ../Includes/Classroom-Setup-4.5

Set up your streaming temp view. Note that we will only be streaming from **one** of our tables. The **`completed_workouts`** table is no longer streamable because it breaks Structured Streaming's requirement of an ever-appending source. However, when performing a stream-static join with a Delta table, each microbatch reads the newest version of the static Delta table.

In [0]:
(spark.readStream
      .table("heart_rate_silver")
      .createOrReplaceTempView("TEMP_heart_rate_silver"))
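The per-batch refresh of the static side can be illustrated without Spark. The following is a minimal pure-Python sketch, not how Delta Lake or Structured Streaming is implemented: **`VersionedTable`** and **`process_microbatch`** are hypothetical names, and the rows are made-up sample data. It only shows the observable behavior, namely that each microbatch joins against whatever snapshot is current when that batch runs.

```python
from dataclasses import dataclass, field


@dataclass
class VersionedTable:
    """Toy stand-in for a Delta table: every write commits a new snapshot."""
    versions: list = field(default_factory=lambda: [[]])

    def write(self, rows):
        # Append rows and commit the result as a new version.
        self.versions.append(self.versions[-1] + list(rows))

    def latest(self):
        # Readers always receive the most recently committed snapshot.
        return self.versions[-1]


def process_microbatch(batch, static_table):
    """Inner-join one microbatch against the snapshot that is current right now."""
    lookup = {row["device_id"]: row["user_id"] for row in static_table.latest()}
    return [
        {**rec, "user_id": lookup[rec["device_id"]]}
        for rec in batch
        if rec["device_id"] in lookup  # inner join: unmatched records are dropped
    ]


user_lookup = VersionedTable()
user_lookup.write([{"device_id": 1, "user_id": 100}])

batch_1 = [{"device_id": 1, "heartrate": 80.0}, {"device_id": 2, "heartrate": 95.0}]
out_1 = process_microbatch(batch_1, user_lookup)

# The static table is updated between microbatches.
user_lookup.write([{"device_id": 2, "user_id": 200}])

batch_2 = [{"device_id": 2, "heartrate": 97.0}]
out_2 = process_microbatch(batch_2, user_lookup)

print(out_1)  # only device 1 matched in the first batch
print(out_2)  # device 2 matches now; its record from batch_1 is not revisited
```

Note that the second batch sees the new row without any restart, while the device 2 record from the first batch stays unmatched.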

## Perform Stream-Static Join to Align Workouts to Heart Rate Recordings

Below we'll configure our query to join our stream to our **`completed_workouts`** table.

Note that our heart rate recordings only have **`device_id`**, while our workouts use **`user_id`** as the unique identifier. We'll need to use our **`user_lookup`** table to match these values. Because all tables are Delta Lake tables, we're guaranteed to get the latest version of each table during each microbatch transaction.

Importantly, our devices occasionally send messages with negative recordings, which indicate a potential error in the recorded values. We'll need a predicate to ensure that only valid recordings are processed; the query below keeps only rows where **`bpm_check`** is **`'OK'`**.

In [0]:
%sql 
CREATE OR REPLACE TEMP VIEW TEMP_workout_bpm AS
  SELECT d.user_id, d.workout_id, d.session_id, time, heartrate
  FROM TEMP_heart_rate_silver c
  INNER JOIN (
    SELECT a.user_id, b.device_id, workout_id, session_id, start_time, end_time
    FROM completed_workouts a
    INNER JOIN user_lookup b
    ON a.user_id = b.user_id) d
  ON c.device_id = d.device_id AND time BETWEEN start_time AND end_time
  WHERE c.bpm_check = 'OK'
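The same join can also be written with the PySpark DataFrame API. This is a sketch under assumptions rather than the lesson's own implementation: **`build_workout_bpm`** is a hypothetical helper name, and it assumes the table and column names used in the SQL above, including that **`workout_id`**, **`session_id`**, **`time`**, **`start_time`**, and **`end_time`** are unambiguous without a qualifier.

```python
def build_workout_bpm(spark):
    """Return a streaming DataFrame intended to mirror TEMP_workout_bpm."""
    # Imported inside the function so the sketch can be defined
    # even in an environment where PySpark is not installed.
    from pyspark.sql import functions as F

    # Streaming side: drives the join, one microbatch at a time.
    heart_rate = spark.readStream.table("heart_rate_silver").alias("c")

    # Static side: plain batch reads of the two Delta tables.
    workouts = spark.table("completed_workouts").alias("a")
    users = spark.table("user_lookup").alias("b")
    static_side = (
        workouts
        .join(users, F.col("a.user_id") == F.col("b.user_id"), "inner")
        .select("a.user_id", "b.device_id", "workout_id",
                "session_id", "start_time", "end_time")
        .alias("d")
    )

    # Match on device and require the recording to fall inside the workout window.
    join_condition = (
        (F.col("c.device_id") == F.col("d.device_id"))
        & F.col("time").between(F.col("start_time"), F.col("end_time"))
    )

    return (
        heart_rate
        .join(static_side, join_condition, "inner")
        .where(F.col("c.bpm_check") == "OK")
        .select("d.user_id", "d.workout_id", "d.session_id", "time", "heartrate")
    )
```

In a notebook, calling **`build_workout_bpm(spark)`** would return a streaming DataFrame that could be written with **`writeStream`** in place of the temp view.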

Note that the streaming side drives this join. As currently implemented, this means that records from the **`heart_rate_silver`** table will only appear in our results table if a matching record has already been written to the **`completed_workouts`** table when the microbatch containing them is processed.

Stream-static joins are not stateful, meaning that we cannot configure our query to wait for records to appear in the right side of the join before calculating the results. When using stream-static joins, be aware of this limitation for unmatched records. (A separate batch job could be configured to find and insert records that were missed during incremental execution.)
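The idea of a reconciling batch job can be sketched in plain Python. This is an illustration only: **`join_records`** and **`find_missed`** are hypothetical helpers, the rows are made-up sample data, and timestamps are simplified to integers. It recomputes the full join against the current static data and keeps only the rows the stream did not already produce.

```python
def join_records(heart_rates, workouts):
    """Inner-join recordings to workouts by device and time window."""
    return [
        {"workout_id": w["workout_id"], "device_id": h["device_id"], "time": h["time"]}
        for h in heart_rates
        for w in workouts
        if h["device_id"] == w["device_id"]
        and w["start_time"] <= h["time"] <= w["end_time"]
    ]


def find_missed(heart_rates, workouts, already_written):
    """Batch reconciliation: the full join minus rows the stream already wrote."""
    written = {(r["workout_id"], r["device_id"], r["time"]) for r in already_written}
    return [
        r for r in join_records(heart_rates, workouts)
        if (r["workout_id"], r["device_id"], r["time"]) not in written
    ]


heart_rates = [
    {"device_id": 1, "time": 5},
    {"device_id": 1, "time": 15},
]

# When the stream processed these recordings, only workout 10 had been written.
workouts_at_stream_time = [
    {"workout_id": 10, "device_id": 1, "start_time": 0, "end_time": 9},
]
streamed = join_records(heart_rates, workouts_at_stream_time)

# Workout 11 arrives later, after the matching recording was already processed.
workouts_now = workouts_at_stream_time + [
    {"workout_id": 11, "device_id": 1, "start_time": 10, "end_time": 20},
]
missed = find_missed(heart_rates, workouts_now, streamed)

print(streamed)
print(missed)
```

In Spark, the same shape would typically be a batch join followed by an anti-join against the results table, with the missed rows appended.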

### Write Stream in Append Mode

Below, we'll use our streaming temp view from above to insert new values into our **`workout_bpm`** table.

In [0]:
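def process_workout_bpm():
    # Incrementally append newly joined records to the workout_bpm Delta table.
    query = (spark.table("TEMP_workout_bpm")
                  .writeStream
                  .format("delta")
                  .outputMode("append")
                  .option("checkpointLocation", f"{DA.paths.checkpoints}/workout_bpm")
                  .trigger(availableNow=True)  # process all available data, then stop
                  .table("workout_bpm"))

    # Block until the triggered run has finished.
    query.awaitTermination()

process_workout_bpm()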

Explore the results table below.

In [0]:
%sql

SELECT COUNT(*)
FROM workout_bpm

count(1)
8170


In [0]:
%sql

SELECT * FROM workout_bpm

user_id,workout_id,session_id,time,heartrate
40872,8,76,2019-12-01T20:39:04.000+0000,89.62777889680326
40872,8,76,2019-12-01T20:40:21.000+0000,91.07456265977248
40872,8,76,2019-12-01T20:45:20.000+0000,86.35837279045256
40872,8,76,2019-12-01T20:52:00.000+0000,101.1583290342824
24018,33,147,2019-12-01T06:58:34.000+0000,111.77176684015244
24018,33,147,2019-12-01T07:06:43.000+0000,95.27407344716973
32018,18,362,2019-12-01T07:11:10.000+0000,78.52475309663265
27306,27,161,2019-12-01T07:12:40.000+0000,98.90368346751713
24018,33,147,2019-12-01T07:16:12.000+0000,111.80430684441323
32018,18,362,2019-12-01T07:21:51.000+0000,83.3097908533361


If desired, process another batch through all tables and update these results.

In [0]:
DA.daily_stream.load()          # Load one new day for DA.paths.source_daily
DA.process_bronze()             # Process through the bronze table
DA.process_heart_rate_silver()  # Process the heart_rate_silver table
DA.process_workouts_silver()    # Process the workouts_silver table
DA.process_completed_workouts() # Process the completed_workouts table

process_workout_bpm()

In [0]:
%sql

SELECT COUNT(*)
FROM workout_bpm

count(1)
18272


Run the following cell to delete the tables and files associated with this lesson.

In [0]:
DA.cleanup()

&copy; 2022 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>