# Streaming

Sensors, IoT devices, social networks, and online transactions all generate data that needs to be monitored constantly and acted upon quickly. As a result, the need for large-scale, real-time stream processing is more evident than ever before. This tutorial module introduces Structured Streaming, the main model for handling streaming datasets in Apache Spark. *In Structured Streaming, a data stream is treated as a table that is continuously appended to*. This leads to a stream processing model that is very similar to a batch processing model. You express your streaming computation the same way you would write a batch query on a static table, but Spark runs it as an incremental query on the unbounded input table.
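
To make the "table that keeps growing" idea concrete, here is a minimal plain-Python sketch (no Spark involved). It only illustrates the concept and is not how Spark implements incremental execution; the names `batch_count`, `IncrementalCount`, and the sample micro-batches are hypothetical.

```python
from collections import Counter


def batch_count(rows):
    """Batch-style query: count actions over the whole table at once."""
    return Counter(row["action"] for row in rows)


class IncrementalCount:
    """Incremental version: fold only the newly appended rows into the previous result."""

    def __init__(self):
        self.result = Counter()

    def append(self, new_rows):
        self.result.update(row["action"] for row in new_rows)
        return self.result


# Hypothetical micro-batches of rows appended to the unbounded input table
batches = [
    [{"time": 1469501675, "action": "Open"}, {"time": 1469501678, "action": "Close"}],
    [{"time": 1469501680, "action": "Open"}],
]

table = []  # the input table as it grows over time
incremental = IncrementalCount()
for batch in batches:
    table.extend(batch)
    # After every append, the incremental result matches a full batch recomputation
    assert incremental.append(batch) == batch_count(table)
```

The point of the sketch is that the query logic (count by action) is written once, while the engine decides whether to recompute from scratch or update the previous result.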


## Load data

Databricks provides sample event data as files in `/databricks-datasets/structured-streaming/events/` that you can use to build a Structured Streaming application. Let's take a look at the contents of this directory.

This would work in Databricks:

    %fs ls /databricks-datasets/structured-streaming/events/

In [3]:
!ls datasets

data_geo.csv  file-18.json  file-28.json  file-38.json	file-48.json
file-0.json   file-19.json  file-29.json  file-39.json	file-49.json
file-1.json   file-2.json   file-3.json   file-4.json	file-5.json
file-10.json  file-20.json  file-30.json  file-40.json	file-6.json
file-11.json  file-21.json  file-31.json  file-41.json	file-7.json
file-12.json  file-22.json  file-32.json  file-42.json	file-8.json
file-13.json  file-23.json  file-33.json  file-43.json	file-9.json
file-14.json  file-24.json  file-34.json  file-44.json	iot_devices.json
file-15.json  file-25.json  file-35.json  file-45.json	people.json
file-16.json  file-26.json  file-36.json  file-46.json
file-17.json  file-27.json  file-37.json  file-47.json


Each line in the file contains a JSON record with two fields: `time` and `action`.

```
{"time":1469501675,"action":"Open"}
{"time":1469501678,"action":"Close"}
{"time":1469501680,"action":"Open"}
{"time":1469501685,"action":"Open"}
{"time":1469501686,"action":"Open"}
{"time":1469501689,"action":"Open"}
{"time":1469501691,"action":"Open"}
{"time":1469501694,"action":"Open"}
{"time":1469501696,"action":"Close"}
{"time":1469501702,"action":"Open"}
{"time":1469501703,"action":"Open"}
{"time":1469501704,"action":"Open"}
```
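
As a quick way to inspect records like these outside Spark, the sketch below parses two of the sample lines with the standard library. It assumes the `time` field is a Unix timestamp in seconds, which the values suggest but the data itself does not state; `parse_event` is a hypothetical helper name.

```python
import json
from datetime import datetime, timezone

# Two records copied from the sample above
sample_lines = [
    '{"time":1469501675,"action":"Open"}',
    '{"time":1469501678,"action":"Close"}',
]


def parse_event(line):
    """Parse one JSON line into (event_time, action).

    Assumption: `time` is Unix epoch seconds, interpreted here as UTC.
    """
    record = json.loads(line)
    event_time = datetime.fromtimestamp(record["time"], tz=timezone.utc)
    return event_time, record["action"]


events = [parse_event(line) for line in sample_lines]
for event_time, action in events:
    print(event_time.isoformat(), action)
```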

## Initialize the stream

Since the sample data is just a static set of files, you can emulate a stream from them by reading one file at a time, in the chronological order in which they were created:

```python
streamingInputDF = (
  spark.readStream
    .schema(jsonSchema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)
```

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import window

In [None]:
inputPath = "/databricks-datasets/structured-streaming/events/"

# Define the schema to speed up processing
jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])

streamingInputDF = (
  spark.readStream
    .schema(jsonSchema)
    .option("maxFilesPerTrigger", 1)
    .json(inputPath)
)

streamingCountsDF = (
  streamingInputDF
    .groupBy(
      streamingInputDF.action,
      window(streamingInputDF.time, "1 hour"))
    .count()
)
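
To see what grouping by `action` and a 1-hour `window` produces, the following plain-Python sketch assigns each event to a fixed (tumbling) one-hour bucket and counts per bucket. It assumes hour windows aligned to the Unix epoch and `time` values in epoch seconds; it illustrates the aggregation only and is not Spark's implementation. `window_start`, `windowed_counts`, and the last sample event are hypothetical.

```python
from collections import Counter

WINDOW_SECONDS = 3600  # corresponds to the "1 hour" window duration


def window_start(epoch_seconds, width=WINDOW_SECONDS):
    """Return the start of the fixed-width window that contains the timestamp."""
    return epoch_seconds - (epoch_seconds % width)


def windowed_counts(events, width=WINDOW_SECONDS):
    """Count events per (action, window start) pair."""
    counts = Counter()
    for event in events:
        key = (event["action"], window_start(event["time"], width))
        counts[key] += 1
    return counts


events = [
    {"time": 1469501675, "action": "Open"},
    {"time": 1469501678, "action": "Close"},
    {"time": 1469501680, "action": "Open"},
    {"time": 1469505300, "action": "Open"},  # hypothetical event in the following hour
]

counts = windowed_counts(events)
for (action, start), n in sorted(counts.items(), key=lambda item: (item[0][1], item[0][0])):
    print(start, action, n)
```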

You start a streaming computation by defining a sink and starting it. In our case, to query the counts interactively, write the complete set of 1-hour counts to an in-memory table. The command below essentially simulates a real-time time series generator, using the files in the directory as its input.

In [None]:
query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = store in-memory table (for testing only)
    .queryName("counts")     # counts = name of the in-memory table
    .outputMode("complete")  # complete = all the counts should be in the table
    .start()
)

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
File <command-5892011683551895>, line 7
      1 query = (
      2   streamingCountsDF
      3     .writeStream
      4     .format("memory")        # memory = store in-memory table (for testing only)
      5     .queryName("counts")     # counts = name of the in-memory table
      6     .outputMode("complete")  # complete = all the counts should be in the table
----> 7     .start()
      8 )

File /databricks/pyth

`query` is a handle to the streaming query named `counts` that is running in the background. This query continuously picks up files and updates the windowed counts. The command window reports the status of the stream.
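
A handle like this can be inspected and stopped from Python. The sketch below reads the `name`, `isActive`, and `status` attributes and calls `stop()`, all of which are members of PySpark's `StreamingQuery`. Because no query is running here, it uses a hypothetical stand-in object (`fake_query`) with made-up status values; with a real stream you would pass `query` instead.

```python
from types import SimpleNamespace


def describe_query(query):
    """Collect a few fields from a StreamingQuery-like handle."""
    return {
        "name": query.name,        # name set via queryName(...)
        "active": query.isActive,  # True while the query is running
        "status": query.status,    # dict describing what the query is currently doing
    }


def stop_if_active(query):
    """Stop the query if it is still running; return whether stop() was called."""
    if query.isActive:
        query.stop()
        return True
    return False


# Hypothetical stand-in for a running query, so the sketch runs without Spark
fake_query = SimpleNamespace(
    name="counts",
    isActive=True,
    status={"message": "Waiting for data to arrive",
            "isDataAvailable": False,
            "isTriggerActive": False},
    stop=lambda: None,
)

summary = describe_query(fake_query)
print(summary["name"], summary["active"], summary["status"]["message"])
```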

Unfortunately, we cannot proceed in Databricks here, because the continuously updating streaming query could not be started. If it had started, we could periodically query the counts aggregation:

```sql
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action
```

The query result would change every time you execute it, reflecting the action counts computed from the input stream so far.