In [0]:
# List the files under the facebook_connections mount (same as `%fs ls <path>`).
# The `_delta_log/` entry in the listing indicates this directory holds a Delta table.
# NOTE(review): this mount is not used anywhere in the structured-streaming example
# that follows — confirm this scratch cell is still needed before sharing.
display(dbutils.fs.ls("/mnt/s3_hootsuite/Final/facebook_connections/"))

path,name,size
dbfs:/mnt/s3_hootsuite/Final/facebook_connections/_delta_log/,_delta_log/,0
dbfs:/mnt/s3_hootsuite/Final/facebook_connections/part-00000-45738476-6719-4936-a1b1-52cca629fe21-c000.snappy.parquet,part-00000-45738476-6719-4936-a1b1-52cca629fe21-c000.snappy.parquet,708
dbfs:/mnt/s3_hootsuite/Final/facebook_connections/part-00000-901e11d0-1606-4977-9fdf-8e92f41bd6f6-c000.snappy.parquet,part-00000-901e11d0-1606-4977-9fdf-8e92f41bd6f6-c000.snappy.parquet,711
dbfs:/mnt/s3_hootsuite/Final/facebook_connections/part-00000-967fcbb6-cb17-4c3f-bc27-9bf67852334e-c000.snappy.parquet,part-00000-967fcbb6-cb17-4c3f-bc27-9bf67852334e-c000.snappy.parquet,711
dbfs:/mnt/s3_hootsuite/Final/facebook_connections/part-00000-b0a7d7de-2d8a-44eb-a2e6-2aac8de05f28-c000.snappy.parquet,part-00000-b0a7d7de-2d8a-44eb-a2e6-2aac8de05f28-c000.snappy.parquet,708
dbfs:/mnt/s3_hootsuite/Final/facebook_connections/part-00000-b5fa8588-3380-4a18-84c1-909483cdb642-c000.snappy.parquet,part-00000-b5fa8588-3380-4a18-84c1-909483cdb642-c000.snappy.parquet,711
dbfs:/mnt/s3_hootsuite/Final/facebook_connections/part-00000-f65b5cd3-2985-4d1d-98c3-bfc41d1a9d1d-c000.snappy.parquet,part-00000-f65b5cd3-2985-4d1d-98c3-bfc41d1a9d1d-c000.snappy.parquet,2003


In [0]:
# Peek at the example input (same as `%fs ls <path>`): the events are split across
# numbered JSON files (file-0.json, file-1.json, ...), which the streaming part of
# this notebook later consumes one file at a time.
display(dbutils.fs.ls("/databricks-datasets/structured-streaming/events/"))

path,name,size
dbfs:/databricks-datasets/structured-streaming/events/file-0.json,file-0.json,72530
dbfs:/databricks-datasets/structured-streaming/events/file-1.json,file-1.json,72961
dbfs:/databricks-datasets/structured-streaming/events/file-10.json,file-10.json,73025
dbfs:/databricks-datasets/structured-streaming/events/file-11.json,file-11.json,72999
dbfs:/databricks-datasets/structured-streaming/events/file-12.json,file-12.json,72987
dbfs:/databricks-datasets/structured-streaming/events/file-13.json,file-13.json,73006
dbfs:/databricks-datasets/structured-streaming/events/file-14.json,file-14.json,73003
dbfs:/databricks-datasets/structured-streaming/events/file-15.json,file-15.json,73007
dbfs:/databricks-datasets/structured-streaming/events/file-16.json,file-16.json,72978
dbfs:/databricks-datasets/structured-streaming/events/file-17.json,file-17.json,73008


In [0]:
from pyspark.sql.types import *

# Directory holding the sample event files listed above.
inputPath = "/databricks-datasets/structured-streaming/events/"

# The record layout is known up front, so declare it explicitly instead of
# having Spark infer it from the data. Both fields are nullable.
jsonSchema = (
  StructType()
    .add("time", TimestampType(), True)
    .add("action", StringType(), True)
)

# Batch (static) DataFrame over every JSON file in inputPath.
staticInputDF = spark.read.json(inputPath, schema=jsonSchema)

display(staticInputDF)

time,action
2016-07-28T04:19:28.000+0000,Close
2016-07-28T04:19:28.000+0000,Close
2016-07-28T04:19:29.000+0000,Open
2016-07-28T04:19:31.000+0000,Close
2016-07-28T04:19:31.000+0000,Open
2016-07-28T04:19:31.000+0000,Open
2016-07-28T04:19:32.000+0000,Close
2016-07-28T04:19:33.000+0000,Close
2016-07-28T04:19:35.000+0000,Close
2016-07-28T04:19:36.000+0000,Open


In [0]:
from pyspark.sql.functions import *      # for window() function

# Batch aggregation: number of events per action within each 1-hour window
# (only a window duration is given, so the windows do not overlap).
# Marked for caching so repeated queries over the result can reuse it.
staticCountsDF = (
  staticInputDF
    .groupBy("action", window("time", "1 hour"))
    .count()
    .cache()
)

# Expose the aggregate to %sql cells as the temporary view `static_counts`.
staticCountsDF.createOrReplaceTempView("static_counts")

In [0]:
%sql
-- Total number of events per action, summed over all 1-hour windows.
-- ORDER BY makes the row order deterministic; GROUP BY alone guarantees no order.
select action, sum(count) as total_count
from static_counts
group by action
order by action

action,total_count
Open,50000
Close,50000


In [0]:
from pyspark.sql.functions import *

# Streaming counterpart of staticInputDF: same schema, same directory, but read
# incrementally. maxFilesPerTrigger=1 limits each micro-batch to a single file,
# turning the fixed set of files into a simulated stream.
streamingInputDF = (
  spark
    .readStream
    .format("json")
    .schema(jsonSchema)
    .option("maxFilesPerTrigger", 1)
    .load(inputPath)
)

# Same aggregation as staticCountsDF (events per action per 1-hour window),
# now defined over the stream.
streamingCountsDF = (
  streamingInputDF
    .groupBy(streamingInputDF["action"], window(streamingInputDF["time"], "1 hour"))
    .count()
)

# Sanity check: displays whether this DataFrame is a streaming one.
streamingCountsDF.isStreaming

In [0]:
# Keep shuffles small for this tiny aggregate.
# NOTE: this is a session-wide setting; it also applies to every later query in this notebook.
spark.conf.set("spark.sql.shuffle.partitions", "2")

# A query name must be unique among the session's active streaming queries, so
# re-running this cell while a previous "counts" query is still running would fail.
# Stop any leftover instance first so the cell can be re-run safely.
for activeQuery in spark.streams.active:
  if activeQuery.name == "counts":
    activeQuery.stop()

query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = store results in an in-memory table
    .queryName("counts")     # counts = name of the in-memory table, queryable from %sql
    .outputMode("complete")  # complete = the full set of counts is rewritten on every trigger
    .start()
)

In [0]:
from time import monotonic, sleep

# Wait until the streaming query has reported progress for at least one
# micro-batch, so the `counts` table queried next actually has data.
# A fixed sleep(5) is racy: on a cold or busy cluster the first batch may not be
# done yet. Poll instead, but give up after a bounded timeout rather than hang.
FIRST_BATCH_TIMEOUT_S = 60
POLL_INTERVAL_S = 0.5

deadline = monotonic() + FIRST_BATCH_TIMEOUT_S
while query.isActive and query.lastProgress is None and monotonic() < deadline:
  sleep(POLL_INTERVAL_S)

# If the query died while we were waiting, surface its error here rather than
# letting the next cell silently show empty or stale results.
streamingError = query.exception()
if streamingError is not None:
  raise streamingError

if query.lastProgress is None:
  print(f"Warning: no micro-batch completed within {FIRST_BATCH_TIMEOUT_S}s; the next cell may show no rows.")

In [0]:
%sql
-- Per-action event counts for each 1-hour window, labelled by the window's end time.
-- Sort on the underlying timestamp (window.end), not on the formatted `time` label:
-- "MMM-dd HH:mm" strings sort alphabetically by month name and carry no year.
select action, date_format(window.end, "MMM-dd HH:mm") as time, count
from counts
order by window.end, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,1003
Open,Jul-26 06:00,1000
Close,Jul-26 07:00,1011
Open,Jul-26 07:00,993


In [0]:
# Stop the streaming query started above. It keeps running in the background
# after start() returns, so it must be stopped explicitly once the demo is done.
query.stop()