
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Streaming Query

##### Objectives
1. Build streaming DataFrames
1. Display streaming query results
1. Write streaming query results
1. Monitor streaming query

##### Classes
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.html" target="_blank">DataStreamReader</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.html" target="_blank">DataStreamWriter</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.html" target="_blank">StreamingQuery</a>

In [0]:
%run ../Includes/Classroom-Setup

Python interpreter will be restarted.


Resetting the learning environment...
...dropping the database "da_sergio_salgado_4613_asp"...(2 seconds)
...removing the working directory "dbfs:/mnt/dbacademy-users/sergio.salgado@n.world/apache-spark-programming-with-databricks"...(0 seconds)

Skipping install of existing datasets to "dbfs:/mnt/dbacademy-datasets/apache-spark-programming-with-databricks/v03"

Validating the locally installed datasets...(5 seconds)

Predefined tables in "da_sergio_salgado_4613_asp":
  -none-

Predefined paths variables:
  DA.paths.user_db:     dbfs:/mnt/dbacademy-users/sergio.salgado@n.world/apache-spark-programming-with-databricks/database.db
  DA.paths.datasets:    dbfs:/mnt/dbacademy-datasets/apache-spark-programming-with-databricks/v03
  DA.paths.working_dir: dbfs:/mnt/dbacademy-users/sergio.salgado@n.world/apache-spark-programming-with-databricks
  DA.paths.checkpoints: dbfs:/mnt/dbacademy-users/sergio.salgado@n.world/apache-spark-programming-with-databricks/_checkpoints
  DA.paths.sales:       

### Build streaming DataFrames

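Obtain an initial streaming DataFrame from a Delta-format file source. Setting `maxFilesPerTrigger` to 1 caps each micro-batch at one new file, so the stream advances in small, observable steps.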

In [0]:
df = (spark
      .readStream
      .option("maxFilesPerTrigger", 1)
      .format("delta")
      .load(DA.paths.events)
     )

df.isStreaming

Out[4]: True

Apply transformations to the streaming DataFrame. Each transformation returns a new streaming DataFrame.

In [0]:
from pyspark.sql.functions import col

email_traffic_df = (df
                    .filter(col("traffic_source") == "email")
                    .withColumn("mobile", col("device").isin(["iOS", "Android"]))
                    .select("user_id", "event_timestamp", "mobile")
                   )

email_traffic_df.isStreaming

Out[7]: True
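
To make the per-row logic of the cell above concrete, here is a plain-Python mirror of the same filter, derived column, and projection. It is only a sketch: the sample rows are hypothetical, and Spark's handling of null values (a null `device` yields a null `mobile`, not `False`) is not reproduced.

```python
# Hypothetical sample rows; the real events table has more columns.
sample_rows = [
    {"user_id": "u1", "event_timestamp": 1, "traffic_source": "email", "device": "iOS"},
    {"user_id": "u2", "event_timestamp": 2, "traffic_source": "google", "device": "Android"},
    {"user_id": "u3", "event_timestamp": 3, "traffic_source": "email", "device": "Windows"},
]

MOBILE_DEVICES = ("iOS", "Android")


def email_traffic(rows):
    """Keep email-sourced rows and flag whether the device is mobile."""
    result = []
    for row in rows:
        # Mirrors .filter(col("traffic_source") == "email")
        if row["traffic_source"] != "email":
            continue
        # Mirrors .withColumn("mobile", ...) followed by .select(...)
        result.append({
            "user_id": row["user_id"],
            "event_timestamp": row["event_timestamp"],
            "mobile": row["device"] in MOBILE_DEVICES,
        })
    return result


email_rows = email_traffic(sample_rows)
```

On a stream, Spark applies this same logic to every micro-batch as new files arrive.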

### Write streaming query results

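Take the final streaming DataFrame (our result table) and write it to a Delta file sink in "append" mode, which writes only the rows added to the result table since the last trigger. The checkpoint location stores the query's progress so that it can resume where it left off after a restart.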

In [0]:
checkpoint_path = f"{DA.paths.checkpoints}/email_traffic"
output_path = f"{DA.paths.working_dir}/email_traffic/output"

devices_query = (email_traffic_df
                 .writeStream
                 .outputMode("append")
                 .format("delta")
                 .queryName("email_traffic")
                 .trigger(processingTime="1 second")
                 .option("checkpointLocation", checkpoint_path)
                 .start(output_path)
                )
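
If several queries share these writer settings, the chain above could be wrapped in a small helper. This is a minimal sketch: `start_delta_append` and its parameters are hypothetical names, not part of the course code or of PySpark. It uses only the `DataStreamWriter` methods already shown in the cell above.

```python
def start_delta_append(df, query_name, output_path, checkpoint_path, trigger=None):
    """Start an append-mode Delta streaming write and return the query handle.

    `trigger` is a dict of keyword arguments for DataStreamWriter.trigger();
    it defaults to the 1-second processing-time trigger used in this lesson.
    """
    trigger = trigger or {"processingTime": "1 second"}
    return (df.writeStream
            .outputMode("append")
            .format("delta")
            .queryName(query_name)
            .trigger(**trigger)
            .option("checkpointLocation", checkpoint_path)
            .start(output_path))
```

With the names from this lesson, the call would be `start_delta_append(email_traffic_df, "email_traffic", output_path, checkpoint_path)`. Passing `trigger={"once": True}` would instead process the available data in a single batch and then stop.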

### Monitor streaming query

Use the streaming query "handle" to monitor and control it.

In [0]:
devices_query.id

Out[9]: '44c6beac-c9a7-46b4-bb8f-44587510e9b3'

In [0]:
devices_query.status

Out[10]: {'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}
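
The status dictionary can also be polled from code. The sketch below waits until the query reports no active trigger and no available data. `wait_until_idle` is a hypothetical helper, and the check is a heuristic: a query can look idle briefly between triggers, so treat the result as a hint rather than a guarantee.

```python
import time


def wait_until_idle(query, timeout_s=30.0, poll_s=0.5):
    """Poll query.status until the query looks idle or the timeout expires.

    Returns True if an idle status was observed before the timeout, else False.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        status = query.status  # dict with the keys shown in the output above
        if not status["isTriggerActive"] and not status["isDataAvailable"]:
            return True
        time.sleep(poll_s)
    return False
```

For example, `wait_until_idle(devices_query, timeout_s=60)` would block for at most about a minute.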

In [0]:
devices_query.lastProgress
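
`lastProgress` returns a nested dictionary describing the most recent micro-batch, or `None` if no batch has been reported yet. The sketch below pulls out a few commonly inspected fields. `summarize_progress` is a hypothetical helper, and it assumes the dictionary form of the progress report used by PySpark 3.x.

```python
def summarize_progress(progress):
    """Reduce a lastProgress dict to a few headline metrics.

    Returns None when no progress has been reported yet.
    """
    if progress is None:
        return None
    durations = progress.get("durationMs") or {}
    return {
        "batch_id": progress.get("batchId"),
        "input_rows": progress.get("numInputRows"),
        "processed_rows_per_second": progress.get("processedRowsPerSecond"),
        "trigger_ms": durations.get("triggerExecution"),
    }
```

Calling `summarize_progress(devices_query.lastProgress)` gives a compact view that is easier to log or compare across batches than the full report.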

In [0]:
import time

# Let the query run for 10 more seconds, then stop it
time.sleep(10)

devices_query.stop()

In [0]:
# Returns immediately here because the query was already stopped above
devices_query.awaitTermination()
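
The run, stop, and wait steps above can be combined so that the query is stopped even if the waiting code is interrupted. This is a minimal sketch: `run_then_stop` is a hypothetical helper, and the 5-second grace period is an arbitrary choice.

```python
import time


def run_then_stop(query, seconds, grace_s=5, sleep=time.sleep):
    """Let a streaming query run for `seconds`, then stop it.

    Returns the result of awaitTermination(grace_s), which is True when the
    query terminated within the grace period.
    """
    try:
        sleep(seconds)
    finally:
        # Stop the query even if the sleep is interrupted (e.g. cell cancelled)
        query.stop()
    return query.awaitTermination(grace_s)
```

In this notebook the call would be `run_then_stop(devices_query, 10)`.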

### Classroom Cleanup
Run the cell below to clean up resources.

In [0]:
DA.cleanup()

Resetting the learning environment...
...dropping the database "da_sergio_salgado_4613_asp"...(0 seconds)
...removing the working directory "dbfs:/mnt/dbacademy-users/sergio.salgado@n.world/apache-spark-programming-with-databricks"...(1 seconds)

Validating the locally installed datasets...

&copy; 2022 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>