In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.sql.types import TimestampType

In [0]:
# Directory of JSON event files that the streaming reader below consumes.
inputPath: str = "/databricks-datasets/structured-streaming/events/"

In [0]:
# Explicit schema for the input JSON events.
# Structured Streaming file sources (readStream ... .json) require a user-specified
# schema by default (schema inference is off unless spark.sql.streaming.schemaInference
# is enabled), so the schema is declared here rather than inferred.
# StructType is an ordered collection of StructField objects; each StructField gives
# a column name, its data type, and whether the column is nullable (True here).
jsonSchema = StructType([
    StructField("time", TimestampType(), True),  # timestamp column used for the hourly window below
    StructField("action", StringType(), True),   # string column used as a grouping key below
])

In [0]:
# Build a streaming DataFrame over the JSON files in `inputPath`.
# `readStream` mirrors the batch `read` API but yields an unbounded DataFrame.
streamingInputDF = (
  spark
    .readStream
    .schema(jsonSchema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Read at most one new file per micro-batch, so the files arrive as a stream
    .json(inputPath)
)

# Count rows per `action` in 1-hour windows over the `time` column
# (no slide duration is given, so the windows do not overlap).
streamingCountsDF = (
  streamingInputDF
    .groupBy(
      streamingInputDF.action,
      window(streamingInputDF.time, "1 hour"))
    .count()
)

# Confirm the aggregation is still a streaming DataFrame (displays True).
streamingCountsDF.isStreaming

Out[25]: True

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

# Name of the streaming query; the memory sink also uses it as the in-memory table name.
COUNTS_QUERY_NAME = "counts"

# Spark refuses to start a query whose name is already active in this session,
# so stop any leftover query with the same name first. This makes the cell safe to re-run.
for staleQuery in spark.streams.active:
  if staleQuery.name == COUNTS_QUERY_NAME:
    staleQuery.stop()

query = (
  streamingCountsDF
    .writeStream
    .format("memory")                # memory = store in-memory table
    .queryName(COUNTS_QUERY_NAME)    # name of the in-memory table
    .outputMode("complete")          # complete = all the counts should be in the table
    .start()
)

In [0]:
# Stop the streaming query started above. stop() blocks until the query's execution
# threads terminate (or the stop timeout is hit), so the query should be inactive afterwards.
query.stop()
assert not query.isActive, f"streaming query {query.name!r} is still active after stop()"