In [1]:
#Tutorial https://databricks.com/spark/getting-started-with-apache-spark/streaming

In [2]:
# List the sample event files that the stream below will replay.
# Each line in a file is one JSON record with two fields, `time` and `action`, e.g.:
#   {"time":1469501675,"action":"Open"}
#   {"time":1469501678,"action":"Close"}
#   {"time":1469501680,"action":"Open"}
#   {"time":1469501685,"action":"Open"}
#   {"time":1469501686,"action":"Open"}
#   {"time":1469501689,"action":"Open"}
#   {"time":1469501691,"action":"Open"}
#   {"time":1469501694,"action":"Open"}
#   {"time":1469501696,"action":"Close"}
#   {"time":1469501702,"action":"Open"}
#   {"time":1469501703,"action":"Open"}
#   {"time":1469501704,"action":"Open"}

%fs ls /databricks-datasets/structured-streaming/events/

In [3]:
from pyspark.sql.types import StructType, StructField, TimestampType, StringType
from pyspark.sql.window import Window
from pyspark.sql import window

In [4]:
# Root directory of the sample event files. The data is a static set of files, so a
# live stream is emulated by feeding them to Spark one file at a time (see the
# `maxFilesPerTrigger` option where the streaming DataFrame is defined).
inputPath = "/databricks-datasets/structured-streaming/events/"

In [5]:
# Explicit schema for the event records. Supplying it up front avoids schema inference
# (Spark's file-based streaming sources expect a user-provided schema by default).
jsonSchema = StructType([
    StructField('time', TimestampType(), nullable=True),   # raw JSON value is an integer such as 1469501675
    StructField('action', StringType(), nullable=True),    # e.g. 'Open' or 'Close'
])

In [6]:
# Streaming input DataFrame: read the JSON files under `inputPath` with `jsonSchema`,
# admitting at most one new file per micro-batch so the static directory is replayed
# as if the files were arriving live.
streamingInputDF = (
    spark.readStream
    .format('json')
    .schema(jsonSchema)
    .option('maxFilesPerTrigger', 1)   # one file per trigger -> emulated stream
    .load(inputPath)
)

In [8]:
# Running count of events per action. Each JSON record carries two fields, `time` and
# `action`; `action` takes values such as 'Open' and 'Close', so the result reports how
# many Open and how many Close events the stream has seen so far.
streamingCountsDF = streamingInputDF.groupBy('action').count()

In [9]:
# Start the streaming job: write the running per-action counts into an in-memory table
# named `counts`, which can be queried with SQL while the stream keeps running.
# Spark refuses to start a query whose name is already in use by an active query, so
# stop any previous `counts` query first; this makes re-running this cell safe.
for activeQuery in spark.streams.active:
    if activeQuery.name == 'counts':
        activeQuery.stop()

query = (
    streamingCountsDF            # per-action counts defined in the previous cell
    .writeStream
    .format('memory')            # memory = store results in an in-memory table (for testing only)
    .queryName('counts')         # counts = name of the in-memory table
    .outputMode('complete')      # complete = the whole table of counts is rewritten on every trigger
    .start()
)

In [10]:
# `query` is a handle to the streaming query named `counts`, which runs in the background: it keeps picking up new files and updating the per-action counts.
# The output of the cell above reports the status of the stream; expanding `counts` shows a dashboard with the number of records processed, batch statistics, and the state of the aggregation as graphs.

In [11]:
#Interactively query the stream
#We can periodically query the counts aggregation

In [12]:
%sql
-- First snapshot of the in-memory `counts` table maintained by the running query.
SELECT action, count
FROM counts

action,count
Open,30510
Close,29490


In [13]:
#The query returns a different result each time it is executed: it reads the current contents of the `counts` table, which the stream keeps updating as it processes more input files.

In [14]:
%sql
-- Later snapshot of the `counts` table; the totals grow as more files are processed.
SELECT action, count
FROM counts

action,count
Open,36513
Close,35487


In [15]:
%sql
-- Another snapshot of the `counts` table, taken while the stream is still running.
SELECT action, count
FROM counts

action,count
Open,40495
Close,39505
