# Structured Streaming using the Python DataFrames API

Apache Spark includes a high-level stream processing API, [Structured Streaming](http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html). In this notebook we take a quick look at how to use the DataFrame API to build Structured Streaming applications. We want to compute real-time metrics like running counts and windowed counts on a stream of timestamped actions (e.g. Open and Close).

To run this notebook, import it and attach it to a Spark cluster.

## Sample Data
We have some sample action data as files in `/databricks-datasets/structured-streaming/events/`, which we are going to use to build this application. Let's take a look at the contents of this directory.

In [0]:
# Look at the content of the following folder: /databricks-datasets/structured-streaming/events/
# What do you see?
display(dbutils.fs.ls("/databricks-datasets/structured-streaming/events/"))



path,name,size,modificationTime
dbfs:/databricks-datasets/structured-streaming/events/file-0.json,file-0.json,72530,1596690605000
dbfs:/databricks-datasets/structured-streaming/events/file-1.json,file-1.json,72961,1596690605000
dbfs:/databricks-datasets/structured-streaming/events/file-10.json,file-10.json,73025,1596690605000
dbfs:/databricks-datasets/structured-streaming/events/file-11.json,file-11.json,72999,1596690605000
dbfs:/databricks-datasets/structured-streaming/events/file-12.json,file-12.json,72987,1596690605000
dbfs:/databricks-datasets/structured-streaming/events/file-13.json,file-13.json,73006,1596690605000
dbfs:/databricks-datasets/structured-streaming/events/file-14.json,file-14.json,73003,1596690605000
dbfs:/databricks-datasets/structured-streaming/events/file-15.json,file-15.json,73007,1596690605000
dbfs:/databricks-datasets/structured-streaming/events/file-16.json,file-16.json,72978,1596690605000
dbfs:/databricks-datasets/structured-streaming/events/file-17.json,file-17.json,73008,1596690605000


There are about 50 JSON files in the directory. Let's see what each JSON file contains.

In [0]:
# Look at the `head` function in dbutils.fs
# Open one file
print(dbutils.fs.head("/databricks-datasets/structured-streaming/events/file-0.json"))


[Truncated to first 65536 bytes]
{"time":1469501107,"action":"Open"}
{"time":1469501147,"action":"Open"}
{"time":1469501202,"action":"Open"}
{"time":1469501219,"action":"Open"}
{"time":1469501225,"action":"Open"}
{"time":1469501234,"action":"Open"}
{"time":1469501245,"action":"Open"}
{"time":1469501246,"action":"Open"}
{"time":1469501248,"action":"Open"}
{"time":1469501256,"action":"Open"}
{"time":1469501264,"action":"Open"}
{"time":1469501266,"action":"Open"}
{"time":1469501267,"action":"Open"}
{"time":1469501269,"action":"Open"}
{"time":1469501271,"action":"Open"}
{"time":1469501282,"action":"Open"}
{"time":1469501285,"action":"Open"}
{"time":1469501291,"action":"Open"}
{"time":1469501297,"action":"Open"}
{"time":1469501303,"action":"Open"}
{"time":1469501322,"action":"Open"}
{"time":1469501335,"action":"Open"}
{"time":1469501344,"action":"Open"}
{"time":1469501346,"action":"Open"}
{"time":1469501349,"action":"Open"}
{"time":1469501357,"action":"Open"}
{"time":1469501366,"action":"Op

Each line in the file contains a JSON record with two fields: `time` and `action`. Let's try to analyze these files interactively.
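
As a rough illustration of the record format, the sketch below parses two of the lines shown above with Python's standard `json` module and converts the `time` field into a datetime. It assumes that `time` holds Unix epoch seconds in UTC; the helper name `parse_event` is hypothetical and not part of the notebook.

```python
import json
from datetime import datetime, timezone

# Two lines copied from the file-0.json output above
raw_lines = [
    '{"time":1469501107,"action":"Open"}',
    '{"time":1469501147,"action":"Open"}',
]

def parse_event(line):
    """Parse one JSON line into (timestamp, action); assumes epoch seconds in UTC."""
    record = json.loads(line)
    ts = datetime.fromtimestamp(record["time"], tz=timezone.utc)
    return ts, record["action"]

events = [parse_event(line) for line in raw_lines]
for ts, action in events:
    print(ts.isoformat(), action)
```

Under that assumption, the first record falls on 2016-07-26 at 02:45:07 UTC.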

## Batch/Interactive Processing
The usual first step in attempting to process the data is to interactively query the data. Let's define a static DataFrame on the files, and give it a table name.

In [0]:
from pyspark.sql.types import *

inputPath = "/databricks-datasets/structured-streaming/events/"

# Since we know the data format already, let's define the schema to speed up processing (no need for Spark to infer schema)
jsonSchema = StructType(
  [ StructField("time", TimestampType(), True),
   StructField("action", StringType(), True) ]
)



In [0]:
# Read all json files, taking into account the defined schema, and display the content 
df = (spark.read
          .schema(jsonSchema)
          .json(inputPath))
display(df) 
df.printSchema()


time,action
2016-07-28T04:19:28.000Z,Close
2016-07-28T04:19:28.000Z,Close
2016-07-28T04:19:29.000Z,Open
2016-07-28T04:19:31.000Z,Close
2016-07-28T04:19:31.000Z,Open
2016-07-28T04:19:31.000Z,Open
2016-07-28T04:19:32.000Z,Close
2016-07-28T04:19:33.000Z,Close
2016-07-28T04:19:35.000Z,Close
2016-07-28T04:19:36.000Z,Open


root
 |-- time: timestamp (nullable = true)
 |-- action: string (nullable = true)



- Compare the dates from the output without schema and with it. 
- Did you notice that inputPath is a folder?

When you read the JSON files without defining a schema, Spark infers the column types from the data.
Because the `time` values are plain integers (Unix epoch seconds), inference types the column as a number rather than as a timestamp.
That makes it harder to do proper date calculations or windowing.

But when you read the files with an explicit schema (for example, defining `time` as `TimestampType`), Spark treats the column as date/time values from the start.

In [0]:
# Calculate the total number of 'Open' and 'Close' actions 
df.groupBy("action").count().show()


+------+-----+
|action|count|
+------+-----+
| Close|50000|
|  Open|50000|
+------+-----+



In [0]:
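# Determine min and max time
# Note: this wildcard import shadows Python's built-in min() and max() with the Spark column functions
from pyspark.sql.functions import *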

df.select(min(df.time), max(df.time)).show()

+-------------------+-------------------+
|          min(time)|          max(time)|
+-------------------+-------------------+
|2016-07-26 02:45:07|2016-07-28 06:48:19|
+-------------------+-------------------+



In [0]:
# Calculate the number of "open" and "close" actions with one hour windows: staticCountsDF
# Look at groupBy(..., window) function
staticCountsDF = df.groupBy(
    window("time", "1 hour"),
    df.action
  ).count()   
staticCountsDF.show()            



+--------------------+------+-----+
|              window|action|count|
+--------------------+------+-----+
|{2016-07-27 03:00...|  Open| 1014|
|{2016-07-27 03:00...| Close| 1025|
|{2016-07-28 04:00...| Close|  960|
|{2016-07-27 23:00...|  Open| 1008|
|{2016-07-27 23:00...| Close| 1011|
|{2016-07-26 13:00...| Close| 1028|
|{2016-07-26 11:00...|  Open|  991|
|{2016-07-27 12:00...| Close| 1035|
|{2016-07-26 14:00...|  Open|  991|
|{2016-07-27 20:00...|  Open| 1005|
|{2016-07-28 00:00...|  Open| 1000|
|{2016-07-28 06:00...| Close|  191|
|{2016-07-26 14:00...| Close|  994|
|{2016-07-27 13:00...| Close|  986|
|{2016-07-27 04:00...|  Open|  995|
|{2016-07-27 20:00...| Close| 1025|
|{2016-07-27 13:00...|  Open| 1008|
|{2016-07-26 13:00...|  Open| 1006|
|{2016-07-28 04:00...|  Open|  825|
|{2016-07-26 11:00...| Close| 1028|
+--------------------+------+-----+
only showing top 20 rows
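
To make the windowing step more concrete, here is a minimal pure-Python sketch of how a timestamp can be assigned to a one-hour tumbling window. It assumes windows are aligned to the Unix epoch (1970-01-01 00:00:00 UTC); the function name `tumbling_window` is hypothetical and only mimics the idea behind `window("time", "1 hour")`.

```python
from datetime import datetime, timedelta, timezone

def tumbling_window(ts, size):
    """Return the [start, end) window of length `size` that contains `ts`.

    Windows are aligned to the Unix epoch, which is an assumption of this sketch.
    """
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    offset = (ts - epoch) % size   # time elapsed since the window started
    start = ts - offset
    return start, start + size

# Earliest event time reported by the min/max query above
first_event = datetime(2016, 7, 26, 2, 45, 7, tzinfo=timezone.utc)
start, end = tumbling_window(first_event, timedelta(hours=1))
print(start.isoformat(), end.isoformat())
```

Each event lands in exactly one window, so the per-window counts add up to the total number of events.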


In [0]:
# Make this window a sliding window (30 minutes overlap): staticCountsSW
staticCountsSW = df.groupBy(
    window("time", "1 hour", "30 minutes"),
    "action"
).count()
staticCountsSW.show()

+--------------------+------+-----+
|              window|action|count|
+--------------------+------+-----+
|{2016-07-26 13:30...| Close| 1042|
|{2016-07-27 20:30...| Close| 1001|
|{2016-07-27 03:30...| Close| 1079|
|{2016-07-27 03:00...|  Open| 1014|
|{2016-07-27 03:00...| Close| 1025|
|{2016-07-28 04:00...| Close|  960|
|{2016-07-27 23:00...|  Open| 1008|
|{2016-07-27 23:00...| Close| 1011|
|{2016-07-28 06:30...| Close|   33|
|{2016-07-26 13:00...| Close| 1028|
|{2016-07-28 05:30...| Close|  422|
|{2016-07-26 11:00...|  Open|  991|
|{2016-07-27 20:30...|  Open| 1008|
|{2016-07-27 12:00...| Close| 1035|
|{2016-07-26 14:00...|  Open|  991|
|{2016-07-27 20:00...|  Open| 1005|
|{2016-07-27 23:30...| Close| 1010|
|{2016-07-28 03:30...| Close|  997|
|{2016-07-26 10:30...| Close| 1040|
|{2016-07-28 00:00...|  Open| 1000|
+--------------------+------+-----+
only showing top 20 rows
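
With a sliding window, one event can belong to several overlapping windows. The sketch below, in plain Python, lists the windows that contain a given timestamp for a 1-hour window sliding every 30 minutes. It assumes window starts are multiples of the slide interval counted from the Unix epoch; `sliding_windows` is a hypothetical helper, not a Spark function.

```python
from datetime import datetime, timedelta, timezone

def sliding_windows(ts, size, slide):
    """Return every [start, end) window of length `size`, starting on a
    multiple of `slide` since the Unix epoch, that contains `ts`."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Latest window start that is not after ts
    start = ts - ((ts - epoch) % slide)
    windows = []
    while start + size > ts:       # the window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

event = datetime(2016, 7, 26, 2, 45, 7, tzinfo=timezone.utc)
windows = sliding_windows(event, timedelta(hours=1), timedelta(minutes=30))
for start, end in windows:
    print(start.strftime("%H:%M"), "-", end.strftime("%H:%M"))
```

Here every event is counted in two windows (window size divided by slide), which is why the sliding counts overlap with the tumbling counts shown earlier.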


In [0]:
# Register staticCountsDF (createOrReplaceTempView) as table 'static_counts'
staticCountsDF.createOrReplaceTempView("static_counts")

spark.sql("SELECT * FROM static_counts").show()


+--------------------+------+-----+
|              window|action|count|
+--------------------+------+-----+
|{2016-07-27 03:00...|  Open| 1014|
|{2016-07-27 03:00...| Close| 1025|
|{2016-07-28 04:00...| Close|  960|
|{2016-07-27 23:00...|  Open| 1008|
|{2016-07-27 23:00...| Close| 1011|
|{2016-07-26 13:00...| Close| 1028|
|{2016-07-26 11:00...|  Open|  991|
|{2016-07-27 12:00...| Close| 1035|
|{2016-07-26 14:00...|  Open|  991|
|{2016-07-27 20:00...|  Open| 1005|
|{2016-07-28 00:00...|  Open| 1000|
|{2016-07-28 06:00...| Close|  191|
|{2016-07-26 14:00...| Close|  994|
|{2016-07-27 13:00...| Close|  986|
|{2016-07-27 04:00...|  Open|  995|
|{2016-07-27 20:00...| Close| 1025|
|{2016-07-27 13:00...|  Open| 1008|
|{2016-07-26 13:00...|  Open| 1006|
|{2016-07-28 04:00...|  Open|  825|
|{2016-07-26 11:00...| Close| 1028|
+--------------------+------+-----+
only showing top 20 rows


Now we can directly use SQL to query the table.

In [0]:
%sql
-- Count all Open and Close actions in the table static_counts  
SELECT
SUM(CASE WHEN action = 'Open' THEN count ELSE 0 END) as nbr_open,
SUM(CASE WHEN action = 'Close' THEN count ELSE 0 END) as nbr_close
FROM static_counts



nbr_open,nbr_close
50000,50000


In [0]:
%sql
-- How many actions (Close and Open separately) are within each time window (in the table static_counts)?
-- Make a plot
SELECT
  window.start AS start_time,
  window.end   AS end_time,
  SUM(CASE WHEN action = 'Open'  THEN count ELSE 0 END) AS open_count,
  SUM(CASE WHEN action = 'Close' THEN count ELSE 0 END) AS close_count
FROM static_counts
GROUP BY window.start, window.end
ORDER BY start_time;


start_time,end_time,open_count,close_count
2016-07-26T02:00:00.000Z,2016-07-26T03:00:00.000Z,179,11
2016-07-26T03:00:00.000Z,2016-07-26T04:00:00.000Z,1001,344
2016-07-26T04:00:00.000Z,2016-07-26T05:00:00.000Z,999,815
2016-07-26T05:00:00.000Z,2016-07-26T06:00:00.000Z,1000,1003
2016-07-26T06:00:00.000Z,2016-07-26T07:00:00.000Z,993,1011
2016-07-26T07:00:00.000Z,2016-07-26T08:00:00.000Z,1008,989
2016-07-26T08:00:00.000Z,2016-07-26T09:00:00.000Z,996,985
2016-07-26T09:00:00.000Z,2016-07-26T10:00:00.000Z,1000,983
2016-07-26T10:00:00.000Z,2016-07-26T11:00:00.000Z,1007,1022
2016-07-26T11:00:00.000Z,2016-07-26T12:00:00.000Z,991,1028


Note the two ends of the graph. The close actions are generated such that they come after the corresponding open actions, so there are more "opens" at the beginning and more "closes" at the end.

**NOTE:** In this Databricks environment, each streaming query needs a checkpoint location, so the cell below creates a temporary Unity Catalog volume to hold the checkpoints. Run it before starting the stream processing, and re-run it to clear out the volume before re-running any streaming DataFrames.

In [0]:
# Name of the catalog to use
catalog = 'workspace'

# Schema and volume that hold the streaming checkpoints
schema = 'default'
volume = 'checkpoints'

# Path for file operations
path_volume = f'/Volumes/{catalog}/{schema}/{volume}'

# Three-part names for SQL operations
path_table = f'{catalog}.{schema}'
volume_name = f'{catalog}.{schema}.{volume}'

# Drop old temp volume (use three-part name, not path)
spark.sql(f"DROP VOLUME IF EXISTS {volume_name}")

# Create new temp volume
spark.sql(f"CREATE VOLUME IF NOT EXISTS {volume_name}")

# Define tmp dir for each stream
tmp_input = f"{path_volume}/input"
tmp_streaming_counts = f"{path_volume}/streaming_counts"
tmp_streaming_counts_filter = f"{path_volume}/streaming_counts_filter"
tmp_streaming_counts_run = f"{path_volume}/streaming_counts_run"

## Demo: Stream Processing 
Now that we have analyzed the data interactively, let's convert this to a streaming query that continuously updates as data arrives. Since we just have a static set of files, we are going to emulate a stream from them by reading one file at a time, in the chronological order they were created. The query we have to write is pretty much the same as the interactive query above.
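
The idea of turning a set of files into a stream can be pictured with a small pure-Python emulation. This is only a conceptual sketch, not how Spark is implemented: the `files` list is a hypothetical stand-in for the JSON files, each inner list plays the role of one file picked up per trigger, and `run_micro_batches` is an invented name. After each batch the sketch emits the complete table of windowed counts.

```python
from collections import Counter

# Hypothetical stand-ins for the JSON files: each inner list is one "file"
files = [
    [("Open", 1469501107), ("Open", 1469501147)],
    [("Close", 1469501300), ("Open", 1469504800)],
]

def run_micro_batches(files, window_seconds=3600):
    """Process one file per trigger and yield the complete running counts."""
    counts = Counter()
    for batch in files:                      # one file per micro-batch
        for action, epoch_seconds in batch:
            window_start = epoch_seconds - epoch_seconds % window_seconds
            counts[(window_start, action)] += 1
        yield dict(counts)                   # snapshot of the full result table

snapshots = list(run_micro_batches(files))
for i, snapshot in enumerate(snapshots):
    print(f"after batch {i}: {snapshot}")
```

Each snapshot contains all counts seen so far, so later snapshots extend or update earlier ones rather than replacing them with only the new rows.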


In [0]:
from pyspark.sql.functions import *

# Read data from the files
# Similar to the definition of the static DataFrame `df` above, just using `readStream` instead of `read`
streamingInputDF = (
  spark
    .readStream                       
    .schema(jsonSchema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

# Do some transformations
# Same query as staticCountsDF above
streamingCountsDF = (                 
  streamingInputDF
    .groupBy(
      streamingInputDF.action, 
      window(streamingInputDF.time, "1 hour"))
    .count()
)

# Is this DF actually a streaming DF?
streamingCountsDF.isStreaming

True

In [0]:
# Display input data
streamingInputDF.display(checkpointLocation=tmp_input)

time,action
2016-07-26T02:45:07.000Z,Open
2016-07-26T02:45:47.000Z,Open
2016-07-26T02:46:42.000Z,Open
2016-07-26T02:46:59.000Z,Open
2016-07-26T02:47:05.000Z,Open
2016-07-26T02:47:14.000Z,Open
2016-07-26T02:47:25.000Z,Open
2016-07-26T02:47:26.000Z,Open
2016-07-26T02:47:28.000Z,Open
2016-07-26T02:47:36.000Z,Open


In [0]:
# Display transformed data
streamingCountsDF.display(checkpointLocation=tmp_streaming_counts)

action,window,count


In [0]:
# Add an additional filter to the transformed DataFrame
streamingCountsDF.filter(streamingCountsDF.action == 'Open').display(checkpointLocation=tmp_streaming_counts_filter)

action,window,count
Open,"List(2016-07-26T18:00:00.000Z, 2016-07-26T19:00:00.000Z)",1004
Open,"List(2016-07-27T04:00:00.000Z, 2016-07-27T05:00:00.000Z)",995
Open,"List(2016-07-27T05:00:00.000Z, 2016-07-27T06:00:00.000Z)",986
Open,"List(2016-07-26T05:00:00.000Z, 2016-07-26T06:00:00.000Z)",1000
Open,"List(2016-07-26T11:00:00.000Z, 2016-07-26T12:00:00.000Z)",991
Open,"List(2016-07-26T10:00:00.000Z, 2016-07-26T11:00:00.000Z)",1007
Open,"List(2016-07-27T07:00:00.000Z, 2016-07-27T08:00:00.000Z)",998
Open,"List(2016-07-27T18:00:00.000Z, 2016-07-27T19:00:00.000Z)",995
Open,"List(2016-07-27T19:00:00.000Z, 2016-07-27T20:00:00.000Z)",1007
Open,"List(2016-07-28T03:00:00.000Z, 2016-07-28T04:00:00.000Z)",996


As you can see, `streamingCountsDF` is a streaming DataFrame (`streamingCountsDF.isStreaming` was `True`). You can start the streaming computation by defining the sink and starting it.
In our case, we want to interactively query the counts (same queries as above), so we will write the complete set of 1-hour counts to an in-memory table.

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = store in-memory table 
    .queryName("counts")     # counts = name of the in-memory table
    .outputMode("complete")  # complete = all the counts should be in the table
    .option("checkpointLocation", tmp_streaming_counts_run)
    .trigger(availableNow=True)
    .start()
)

`query` is a handle to the streaming query that is running in the background. This query picks up the files one at a time and updates the windowed counts. Because it uses `trigger(availableNow=True)`, it stops on its own once all the files available at start time have been processed.

Note the status of the query in the above cell. The progress bar shows that the query is active.
Furthermore, if you expand the `> counts` above, you will find the number of files it has already processed.

Let's wait a bit for a few files to be processed and then interactively query the in-memory `counts` table.

In [0]:
%sql
SELECT *
FROM counts

action,window,count


In [0]:
from time import sleep
sleep(1)  # wait a bit for computation to start

In [0]:
%sql
select action, date_format(window.end, "MMM-dd HH:mm") as time, count
from counts
order by time, action

action,time,count


We see the timeline of windowed counts (similar to the static one earlier) building up. If we keep running this interactive query repeatedly, we will see the latest updated counts which the streaming query is updating in the background.

In [0]:
sleep(1)  # wait a bit more for more data to be computed

In [0]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,323
Open,Jul-26 06:00,328


In [0]:
sleep(1)  # wait a bit more for more data to be computed

In [0]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,323
Open,Jul-26 06:00,328
Close,Jul-26 14:00,699
Open,Jul-26 14:00,656


Also, let's see the total number of "opens" and "closes".

In [0]:
%sql 
select action, sum(count) as total_count 
from counts 
group by action 
order by action

action,total_count
Close,3519
Open,4481


If you keep running the above query repeatedly, you will always find that the number of "opens" is more than the number of "closes", as expected in a data stream where a "close" always appears after the corresponding "open". This shows that Structured Streaming ensures **prefix integrity**. Read the blog posts linked below if you want to know more.
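
The property can be checked on a toy example. The sketch below uses a hypothetical, hand-written stream in which every "Close" follows its "Open", and verifies that no prefix of the stream ever contains more closes than opens. It only illustrates the invariant; it says nothing about how Structured Streaming enforces it.

```python
from itertools import accumulate

# Hypothetical ordered stream in which each "Close" follows its "Open"
stream = ["Open", "Open", "Close", "Open", "Close", "Close"]

def opens_minus_closes(actions):
    """Running difference (opens - closes) after each event in the stream."""
    deltas = (1 if action == "Open" else -1 for action in actions)
    return list(accumulate(deltas))

running = opens_minus_closes(stream)
print(running)

# Every prefix has at least as many opens as closes
prefix_ok = all(diff >= 0 for diff in running)
print(prefix_ok)
```

If results were computed from an arbitrary subset of the input instead of a prefix, this difference could become negative.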

Note that there are only a few files, so after all of them have been consumed there will be no further updates to the counts. Rerun the query if you want to interact with the streaming query again.

Finally, you can stop the query running in the background, either by clicking on the 'Cancel' link in the cell of the query, or by executing `query.stop()`. Either way, when the query is stopped, the status of the corresponding cell above will automatically update to `TERMINATED`.

In [0]:
query.stop()

### IoT data

Develop a streaming example on the `IoT device` dataset:

- inspect the dataset
- ask yourself a couple of questions about the data and try to answer them (e.g. how many steps users take, how many calories they burn...)
- read the data in a streaming fashion (file by file) and keep the data for only one company. Here are some hints:
  - you can find the schema in the README file
  - as above, use this option: `.option("maxFilesPerTrigger", 1)`
  - use `user_id` or `device_id` for grouping
  - use `timestamp` for the window definition
  - you can try streaming joins with the user data (`/databricks-datasets/iot-stream/data-user/userData.csv`). Here is the doc: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#join-operations
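
The last hint, joining the stream with the static user data, can be pictured with a small pure-Python sketch. The column names `user_id` and `userid` come from the README schemas; the user row, the second device event, and the helper `enrich` are hypothetical and serve only as an illustration of an inner stream-static join.

```python
# Static side: hypothetical user rows keyed by `userid` (column name from the README)
users = {
    12: {"gender": "F", "age": 34},
}

# Streaming side: one batch per "file"; the first event reuses values from the README example
device_batches = [
    [{"user_id": 12, "device_id": 10, "num_steps": 9796}],
    [{"user_id": 99, "device_id": 3, "num_steps": 100}],   # hypothetical, no matching user
]

def enrich(batch, users):
    """Inner-join one micro-batch of device events with the static user table."""
    joined = []
    for event in batch:
        user = users.get(event["user_id"])
        if user is not None:                 # inner join drops unmatched events
            joined.append({**event, **user})
    return joined

results = [enrich(batch, users) for batch in device_batches]
print(results)
```

The static table is looked up again for every batch, while the stream side is only ever seen one batch at a time.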

In [0]:
# IoT stream dataset
display(dbutils.fs.ls('/databricks-datasets/iot-stream/'))

path,name,size,modificationTime
dbfs:/databricks-datasets/iot-stream/README.md,README.md,1596,1596565744000
dbfs:/databricks-datasets/iot-stream/data-device/,data-device/,0,1761905122838
dbfs:/databricks-datasets/iot-stream/data-user/,data-user/,0,1761905122838


In [0]:
# Read the README file
print(dbutils.fs.head("/databricks-datasets/iot-stream/README.md", 2000))


IOT Device Data


This dataset was created by Databricks.  
It contains fake generated data in json and csv formats. 
e.g. 
`{"user_id": 12, "calories_burnt": 489.79998779296875, "num_steps": 9796, "miles_walked": 4.8979997634887695, "time_stamp": "2018-07-24 03:54:00.893775", "device_id": 10}`

Data Set Information

Schema for data-device:
```
[StructField(id,LongType,false),  
 StructField(user_id,LongType,true),  
 StructField(device_id,LongType,true),  
 StructField(num_steps,LongType,true),  
 StructField(miles_walked,FloatType,true),  
 StructField(calories_burnt,FloatType,true),  
 StructField(timestamp,StringType,true),  
 StructField(value,StringType,true)]  
```

Schema for data-user:
```
[StructField(userid,IntegerType,true),
 StructField(gender,StringType,true),
 StructField(age,IntegerType,true),
 StructField(height,IntegerType,true),
 StructField(weight,IntegerType,true),
 StructField(smoker,StringType,true),
 StructField(familyhistory,String

In [0]:
# Define the schema for data-device (copy from the README)
# Open one file to see what the data looks like (as a static DataFrame)

iot_path = "/databricks-datasets/iot-stream/data-device/"

# Display a sample file (the files are gzip-compressed, so the raw bytes are not readable)
sample = dbutils.fs.ls(iot_path)[0].path
print("Sample file:", sample)
print(dbutils.fs.head(sample, 500))

# Read the files in batch mode to obtain a schema
iot_static_inferred = spark.read.json(iot_path)
iot_static_inferred.printSchema()
display(iot_static_inferred.limit(10))

Sample file: dbfs:/databricks-datasets/iot-stream/data-device/part-00000.json.gz
[Truncated to first 500 bytes]
(gzip-compressed binary output omitted; not human-readable)
root
 |-- calories_burnt: double (nullable = true)
 |-- device_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- miles_walked: double (nullable = true)
 |-- num_steps: long (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- value: string (nullable = true)



calories_burnt,device_id,id,miles_walked,num_steps,timestamp,user_id,value
250.7,5,950000,2.507,5014,2018-07-22 06:44:25.732267,24,"{""user_id"": 24, ""calories_burnt"": 250.6999969482422, ""num_steps"": 5014, ""miles_walked"": 2.506999969482422, ""time_stamp"": ""2018-07-22 06:44:25.732267"", ""device_id"": 5}"
126.8,13,950001,1.268,2536,2018-07-21 01:18:10.732306,24,"{""user_id"": 24, ""calories_burnt"": 126.80000305175781, ""num_steps"": 2536, ""miles_walked"": 1.2680000066757202, ""time_stamp"": ""2018-07-21 01:18:10.732306"", ""device_id"": 13}"
365.7,5,950002,3.657,7314,2018-07-24 12:42:53.732332,4,"{""user_id"": 4, ""calories_burnt"": 365.70001220703125, ""num_steps"": 7314, ""miles_walked"": 3.6570000648498535, ""time_stamp"": ""2018-07-24 12:42:53.732332"", ""device_id"": 5}"
489.8,10,950003,4.898,9796,2018-07-23 22:56:23.732358,22,"{""user_id"": 22, ""calories_burnt"": 489.79998779296875, ""num_steps"": 9796, ""miles_walked"": 4.8979997634887695, ""time_stamp"": ""2018-07-23 22:56:23.732358"", ""device_id"": 10}"
280.15,13,950004,2.8015,5603,2018-07-21 13:50:39.732385,34,"{""user_id"": 34, ""calories_burnt"": 280.1499938964844, ""num_steps"": 5603, ""miles_walked"": 2.801500082015991, ""time_stamp"": ""2018-07-21 13:50:39.732385"", ""device_id"": 13}"
591.6,1,950005,5.916,11832,2018-07-23 11:05:48.732412,21,"{""user_id"": 21, ""calories_burnt"": 591.5999755859375, ""num_steps"": 11832, ""miles_walked"": 5.915999889373779, ""time_stamp"": ""2018-07-23 11:05:48.732412"", ""device_id"": 1}"
548.1,12,950006,5.481,10962,2018-07-23 02:10:39.732438,7,"{""user_id"": 7, ""calories_burnt"": 548.0999755859375, ""num_steps"": 10962, ""miles_walked"": 5.480999946594238, ""time_stamp"": ""2018-07-23 02:10:39.732438"", ""device_id"": 12}"
272.4,4,950007,2.724,5448,2018-07-22 17:42:18.732465,33,"{""user_id"": 33, ""calories_burnt"": 272.3999938964844, ""num_steps"": 5448, ""miles_walked"": 2.7239999771118164, ""time_stamp"": ""2018-07-22 17:42:18.732465"", ""device_id"": 4}"
381.85,5,950008,3.8185,7637,2018-07-21 12:21:21.732491,16,"{""user_id"": 16, ""calories_burnt"": 381.8500061035156, ""num_steps"": 7637, ""miles_walked"": 3.81850004196167, ""time_stamp"": ""2018-07-21 12:21:21.732491"", ""device_id"": 5}"
585.44995,8,950009,5.8545,11709,2018-07-22 12:40:55.732517,27,"{""user_id"": 27, ""calories_burnt"": 585.449951171875, ""num_steps"": 11709, ""miles_walked"": 5.854499816894531, ""time_stamp"": ""2018-07-22 12:40:55.732517"", ""device_id"": 8}"


In [0]:
# Define your streaming dataframe
iot_schema = iot_static_inferred.schema  
tmp_iot_input   = f"{path_volume}/iot_input"
tmp_iot_counts  = f"{path_volume}/iot_counts"
tmp_iot_alerts  = f"{path_volume}/iot_alerts"
tmp_iot_sink    = f"{path_volume}/iot_sink"

iot_stream = (
    spark.readStream
         .schema(iot_schema)
         .option("maxFilesPerTrigger", 1)
         .json(iot_path)
)

iot_stream.isStreaming


True

In [0]:
iot_prepared = (
    iot_stream
    .withColumn("event_time", col("timestamp").cast("timestamp"))
    .select(
        "event_time",
        "device_id",
        "user_id",
        "num_steps",
        "miles_walked",
        "calories_burnt"
    )
)
iot_prepared.display(checkpointLocation=tmp_iot_input)

event_time,device_id,user_id,num_steps,miles_walked,calories_burnt
2018-07-20T07:34:28.546Z,9,36,3278,1.639,163.90001
2018-07-24T08:13:49.546Z,20,26,10293,5.1465,514.65
2018-07-24T18:41:57.546Z,9,11,5574,2.787,278.69998
2018-07-21T08:40:41.546Z,4,13,5590,2.795,279.5
2018-07-24T08:08:55.546Z,1,32,8373,4.1865,418.65
2018-07-19T22:58:37.546Z,1,8,11052,5.526,552.6
2018-07-20T18:54:52.546Z,9,33,3154,1.577,157.7
2018-07-22T07:16:44.546Z,16,18,1659,0.8295,82.950005
2018-07-23T17:06:48.546Z,10,15,10620,5.31,531.0
2018-07-21T08:45:33.546Z,3,9,8374,4.187,418.69998


In [0]:

from pyspark.sql.functions import col, window, avg, sum

# Sliding window: 10 minutes wide, sliding every 5 minutes
iot_counts = (
    iot_prepared
    .withWatermark("event_time", "5 minutes")
    .groupBy(
        window(col("event_time"), "10 minutes", "5 minutes"),
        col("user_id")
    )
    .agg(
        avg("num_steps").alias("avg_steps"),
        sum("num_steps").alias("total_steps"),
        avg("calories_burnt").alias("avg_calories"),
        sum("miles_walked").alias("total_miles")
    )
)
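
As a simplified picture of what the 5-minute watermark in the cell above means, the sketch below computes a watermark from the maximum event time seen so far and separates incoming events into on-time and late. The event times and the helper `split_by_watermark` are hypothetical. Spark's actual behavior is more involved (the watermark is updated between micro-batches and, for windowed aggregations, compared against window boundaries), and according to the Structured Streaming guide, watermark-based state cleanup applies to the append and update output modes, while complete mode keeps all aggregates.

```python
from datetime import datetime, timedelta

def split_by_watermark(events, max_event_time, delay):
    """Separate events into on-time and late, given the max event time seen so far."""
    watermark = max_event_time - delay
    on_time = [e for e in events if e >= watermark]
    late = [e for e in events if e < watermark]
    return watermark, on_time, late

# Hypothetical event times for illustration
max_seen = datetime(2018, 7, 24, 19, 35, 0)
incoming = [
    datetime(2018, 7, 24, 19, 33, 0),   # 2 minutes behind the max: on time
    datetime(2018, 7, 24, 19, 20, 0),   # 15 minutes behind the max: late
]
watermark, on_time, late = split_by_watermark(incoming, max_seen, timedelta(minutes=5))
print(watermark, len(on_time), len(late))
```

A longer delay tolerates more out-of-order data at the cost of keeping more state around.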


In [0]:
spark.conf.set("spark.sql.shuffle.partitions", "2")

db = "main"          # or "default" / "your_schema"
tbl = "iot_counts"


query = (
  iot_counts
    .writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", f"{path_volume}/chk_iot_counts_run")
    .trigger(availableNow=True)       
    .toTable(f"{db}.{tbl}")              
)


In [0]:
%sql
SELECT * FROM main.iot_counts ORDER BY window.start DESC LIMIT 200;


window,user_id,avg_steps,total_steps,avg_calories,total_miles
"List(2018-07-24T19:30:00.000Z, 2018-07-24T19:40:00.000Z)",13,8157.0,16314,407.85,8.157
"List(2018-07-24T19:30:00.000Z, 2018-07-24T19:40:00.000Z)",34,5049.0,5049,252.44998,2.5245
"List(2018-07-24T19:30:00.000Z, 2018-07-24T19:40:00.000Z)",12,9752.4,48762,487.62002,24.381
"List(2018-07-24T19:30:00.000Z, 2018-07-24T19:40:00.000Z)",26,5426.0,32556,271.30000666666666,16.278000000000002
"List(2018-07-24T19:30:00.000Z, 2018-07-24T19:40:00.000Z)",10,7527.75,30111,376.3875000000001,15.0555
"List(2018-07-24T19:30:00.000Z, 2018-07-24T19:40:00.000Z)",21,10062.0,30186,503.1,15.093
"List(2018-07-24T19:30:00.000Z, 2018-07-24T19:40:00.000Z)",9,1923.0,1923,96.15,0.9615
"List(2018-07-24T19:30:00.000Z, 2018-07-24T19:40:00.000Z)",35,5270.2,26351,263.51000600000003,13.1755
"List(2018-07-24T19:30:00.000Z, 2018-07-24T19:40:00.000Z)",11,7927.5,15855,396.375,7.9275
"List(2018-07-24T19:30:00.000Z, 2018-07-24T19:40:00.000Z)",30,9255.8,46279,462.7899880000001,23.1395
