In [1]:
spark

### Structured Streaming of the Yelp Dataset

Structured Streaming is a stream processing engine that lets you express computation on streaming data in the same way you express a batch computation on static data. The Spark SQL engine performs the computation incrementally and continuously updates the result as streaming data arrives.
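To make "incrementally and continuously updates the result" concrete, here is a minimal plain-Python sketch of the idea. It is not how Spark is implemented; the micro-batches and the function name process_micro_batch are made up for illustration. Each batch of newly arrived star ratings is folded into a running count, and the full updated result is available after every batch.

```python
from collections import Counter

def process_micro_batch(state, batch):
    """Fold one micro-batch of star ratings into the running counts."""
    state.update(batch)   # only the newly arrived rows are processed
    return dict(state)    # snapshot of the complete, updated result

running_counts = Counter()

# Hypothetical micro-batches, standing in for newly arrived input files
batches = [[5, 4, 5], [1, 5], [4, 4, 3]]

snapshots = [process_micro_batch(running_counts, b) for b in batches]
for i, snap in enumerate(snapshots):
    print(f"after batch {i}: {sorted(snap.items())}")
```

The point of the sketch is that earlier rows are never re-read: only the state (the running counts) is carried from one batch to the next.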

In [3]:
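# Import the types and functions needed for the Yelp dataset
# (explicit imports avoid shadowing Python built-ins such as sum and max)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, TimestampType)
from pyspark.sql.functions import window

# Location of the Yelp review JSON files
inputPath = "dbfs:/mnt/yelp-mount"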


Since we already know the data format, let's define the schema explicitly. This speeds up processing because Spark does not need to infer the schema, and streaming file sources expect a schema by default.

In [5]:
jsonSchema = StructType([ 
  StructField("review_id", StringType(), True), 
  StructField("user_id", StringType(), True) , 
  StructField("business_id", StringType(), True) , 
  StructField("stars", IntegerType(), True) , 
  StructField("date", TimestampType(), True) , 
  StructField("text", StringType(), True), 
  StructField("useful", IntegerType(), True), 
  StructField("funny", IntegerType(), True), 
  StructField("cool", IntegerType(), True) ])

In [6]:
# Stream DataFrame representing data in the JSON files
streamInputDF = (
  spark
    .readStream
    .schema(jsonSchema)
    .option("maxFilesPerTrigger", 1)
    .json(inputPath)
)

# Query that counts reviews, grouped by star rating and 1-hour window
streamingCountsDF = (
  streamInputDF
    .groupBy(
      streamInputDF.stars, 
      window(streamInputDF.date, "1 hour"))
    .count()
)

streamingCountsDF.isStreaming
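The grouping above buckets each review into a fixed, non-overlapping 1-hour window based on its date. As a rough plain-Python sketch of that bucketing (assuming naive timestamps and windows aligned to the Unix epoch; the helper name tumbling_window and the sample timestamp are hypothetical):

```python
from datetime import datetime, timedelta

def tumbling_window(ts, size=timedelta(hours=1)):
    """Return the [start, end) window of length `size` that contains `ts`."""
    epoch = datetime(1970, 1, 1)
    offset = (ts - epoch) % size   # time elapsed since the window opened
    start = ts - offset
    return start, start + size

# Hypothetical review timestamp
start, end = tumbling_window(datetime(2017, 3, 5, 14, 37, 12))
print(start, end)
```

A timestamp that falls exactly on a boundary belongs to the window that starts there, since the start is inclusive and the end is exclusive.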

As we can see, streamingCountsDF is a streaming DataFrame (streamingCountsDF.isStreaming returned True). We can start the streaming computation by defining the sink and starting the query. In our case, we want to query the counts interactively, so we will write the complete set of 1-hour counts to an in-memory table (note that this sink is for testing purposes only in Spark 2.0).

In [8]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = store in-memory table (for testing only in Spark 2.0)
    .queryName("counts")     # counts = name of the in-memory table
    .outputMode("complete")  # complete = all the counts should be in the table
    .start()
)

Let's wait a bit for a few files to be processed and then query the in-memory counts table.

In [10]:
from time import sleep
sleep(5)  # wait a bit for computation to start
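A fixed sleep may be too short or too long depending on the cluster. One possible alternative, sketched below, is to poll until some condition holds or a timeout expires. The helper wait_until is hypothetical (not part of Spark); the commented usage line assumes the query object started above and relies on its recentProgress list being non-empty once at least one trigger has reported progress.

```python
import time

def wait_until(predicate, timeout=30.0, interval=0.5):
    """Poll `predicate` until it returns True or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return bool(predicate())  # one last check at the deadline

# Assumed usage in this notebook (requires the running `query`):
# wait_until(lambda: len(query.recentProgress) > 0, timeout=60)
```

The function returns False on timeout instead of raising, so the notebook can still go on and inspect whatever has been computed so far.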

In [11]:
%sql select stars, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by window.end, stars

In [12]:
sleep(5)  # wait a bit more for more data to be computed

Let's also look at the total number of reviews for each star rating.

In [14]:
%sql select stars, sum(count) as total_count from counts group by stars order by stars