Streaming DataFrames are largely the same as static DataFrames. We create them within Spark applications and then perform transformations on them to get our data into the correct format.
- Nearly all of the transformations available in the static Structured APIs also apply to streaming DataFrames.
- One small difference is that Structured Streaming does not perform schema inference unless you explicitly enable it.
- You can enable it by setting the configuration spark.sql.streaming.schemaInference to true.
- Here we instead read the schema from one file (that we know has a valid schema) and pass the dataSchema object from our static DataFrame to our streaming DataFrame.
- Avoid deriving the schema from the data like this in a production scenario, where your data may (accidentally) change out from under you.

In [0]:
static = spark.read.json("/FileStore/tables/activity_data")
dataSchema = static.schema
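
One alternative to borrowing the schema from a static read is to pin it explicitly. The sketch below builds a DDL-style schema string in plain Python. The column names are the ones that appear in this notebook; the types are assumptions for illustration only and should be checked against static.printSchema() before use.

```python
# Column names come from this notebook; the types are ASSUMED, not confirmed.
ASSUMED_COLUMNS = {
    "Arrival_Time": "BIGINT",
    "Creation_Time": "BIGINT",
    "Index": "BIGINT",
    "gt": "STRING",
    "model": "STRING",
    "x": "DOUBLE",
    "y": "DOUBLE",
    "z": "DOUBLE",
}


def to_ddl(columns):
    """Render a {name: type} mapping as a DDL schema string with quoted names."""
    return ", ".join(f"`{name}` {dtype}" for name, dtype in columns.items())


schema_ddl = to_ddl(ASSUMED_COLUMNS)
print(schema_ddl)
```

The resulting string could be passed as spark.readStream.schema(schema_ddl) in place of dataSchema, since the reader's schema method accepts a DDL-formatted string as well as a StructType.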


maxFilesPerTrigger
- This option controls how quickly Spark reads the files in the folder.
- By setting it to a low value (1 here), we artificially limit the flow of the stream to one file per trigger.
- This helps demonstrate how Structured Streaming runs incrementally in our example, but it probably isn’t something you’d use in production.

In [0]:
streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
  .json("/FileStore/tables/activity_data")
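
To make the effect of maxFilesPerTrigger concrete, here is a plain-Python illustration of how a cap on files per trigger spreads a backlog over several triggers. It is a sketch of the idea only, not how Spark's file source is implemented, and the file names are hypothetical.

```python
def plan_triggers(files, max_files_per_trigger):
    """Split the pending files into per-trigger batches of bounded size."""
    return [
        files[i:i + max_files_per_trigger]
        for i in range(0, len(files), max_files_per_trigger)
    ]


# Hypothetical file names standing in for the contents of the input folder.
pending = ["part-0.json", "part-1.json", "part-2.json", "part-3.json", "part-4.json"]

one_per_trigger = plan_triggers(pending, 1)  # five triggers, one file each
two_per_trigger = plan_triggers(pending, 2)  # three triggers: 2 + 2 + 1 files

print(len(one_per_trigger), len(two_per_trigger))
```

With a cap of 1, each trigger sees a single new file, which is why the counts in this notebook grow gradually rather than appearing all at once.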


Streaming DataFrame creation and execution is lazy
- We specify transformations on our streaming DataFrame before finally calling an action to start the stream. In this case, we’ll show one simple transformation: we group and count data by the gt column, which is the activity being performed by the user at that point in time:

In [0]:
activityCounts = streaming.groupBy("gt").count()


Specify an action to start the query
- Specify an output destination, or output sink, for the result of this query.
- Here we write to a memory sink, which keeps an in-memory table of the results.
- Define how Spark will output that data by choosing an output mode.
- Here we use the complete output mode, which rewrites all of the keys along with their counts after every trigger.

In [0]:
activityQuery = activityCounts.writeStream.queryName("activity_counts")\
  .format("memory").outputMode("complete")\
  .start()


In [0]:
from time import sleep
for x in range(5):
    spark.sql("SELECT * FROM activity_counts").show()
    sleep(1)
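
The complete output mode used above can be pictured with a small plain-Python simulation: after each trigger the whole table of keys and counts is emitted again, whether or not a given key changed. The micro-batches below are hypothetical gt values, and the function only mimics the behavior; it does not call Spark.

```python
from collections import Counter


def complete_mode_snapshots(batches):
    """Return the full result table as it would be rewritten after each trigger."""
    totals = Counter()
    snapshots = []
    for batch in batches:
        totals.update(batch)            # fold the new rows into the running counts
        snapshots.append(dict(totals))  # complete mode: emit every key, changed or not
    return snapshots


# Hypothetical gt values arriving in three micro-batches.
batches = [["sit", "stand"], ["sit", "bike"], ["stand"]]
snapshots = complete_mode_snapshots(batches)

for trigger, table in enumerate(snapshots, start=1):
    print(trigger, table)
```

Each printed table plays the role of one result of SELECT * FROM activity_counts in the polling loop above.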


Selections and Filtering
- All select and filter transformations are supported in Structured Streaming, as are all DataFrame functions and individual column manipulations.
- We show a simple example using selections and filtering below.
- Because we are not updating any keys over time, we use the append output mode, so that new results are appended to the output table:

In [0]:
from pyspark.sql.functions import expr
simpleTransform = streaming.withColumn("stairs", expr("gt like '%stairs%'"))\
  .where("stairs")\
  .where("gt is not null")\
  .select("gt", "model", "arrival_time", "creation_time")\
  .writeStream\
  .queryName("simple_transform")\
  .format("memory")\
  .outputMode("append")\
  .start()
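
For contrast with the complete mode, the following plain-Python sketch mimics append mode for a filter like the one above: each trigger contributes only its own new rows that pass the predicate, and earlier output is never rewritten. The rows are hypothetical and the helper names are illustrative, not part of any Spark API.

```python
def is_stairs(row):
    """Mirror the filter above: gt is not null and contains 'stairs'."""
    return row["gt"] is not None and "stairs" in row["gt"]


def append_mode_outputs(batches, keep):
    """Return, per trigger, only the new rows that pass the filter."""
    return [[row for row in batch if keep(row)] for batch in batches]


# Hypothetical rows arriving in two micro-batches.
batches = [
    [{"gt": "stairsup", "model": "nexus4"}, {"gt": "sit", "model": "nexus4"}],
    [{"gt": None, "model": "nexus4"}, {"gt": "stairsdown", "model": "nexus4"}],
]

outputs = append_mode_outputs(batches, is_stairs)
sink_table = [row for output in outputs for row in output]  # the table only grows
print(sink_table)
```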


Aggregations
- You can specify arbitrary aggregations on a stream, as you saw in the Structured APIs.
- For example, you can use a more exotic aggregation, like a cube, on the phone model and activity, computing the average x, y, and z accelerations of our sensor.
- (Jump back to Chapter 7 to see potential aggregations that you can run on your stream.)

In [0]:
deviceModelStats = streaming.cube("gt", "model").avg()\
  .drop("avg(Arrival_time)")\
  .drop("avg(Creation_Time)")\
  .drop("avg(Index)")\
  .writeStream.queryName("device_counts").format("memory")\
  .outputMode("complete")\
  .start()


In [0]:
display(spark.sql("SELECT * FROM device_counts"))

gt,model,avg(x),avg(y),avg(z)
,nexus4,-0.0070243796402783,-0.0005438909838219232,0.0051951586573739
,nexus4,0.0011945763431121,-0.0061174486583486,-0.0085919607111778
,,0.0011945763431121,-0.0061174486583486,-0.0085919607111778
bike,nexus4,0.0238530774859048,-0.0091833028639688,-0.08251574232228
stand,,-0.0004055901934813167,0.0004288678128930655,0.0002234958592930457
sit,nexus4,-0.0005349253688532204,0.000342148544390284,-0.0001266214101180325
stand,nexus4,-0.0004055901934813167,0.0004288678128930655,0.0002234958592930457
stairsdown,,0.0261411122283005,-0.0378194559945025,0.1259837445616258
stairsup,,-0.0268782764901848,-0.0083190020482818,-0.0978803121764713
sit,,-0.0005349253688532204,0.000342148544390284,-0.0001266214101180325
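
A cube computes the aggregate for every combination of the grouping columns, including subtotals in which a column is rolled up and reported as null (blank in the table above). The plain-Python sketch below illustrates that idea on hypothetical rows. It is a simplification: unlike Spark, it does not distinguish a rolled-up column from a genuinely null value in the data.

```python
from collections import defaultdict
from itertools import combinations


def cube_avg(rows, dims, measure):
    """Average `measure` for every subset of `dims`; rolled-up dims become None."""
    sums = defaultdict(lambda: [0.0, 0])  # key -> [running total, row count]
    subsets = [set(c) for r in range(len(dims) + 1) for c in combinations(dims, r)]
    for keep in subsets:
        for row in rows:
            key = tuple(row[d] if d in keep else None for d in dims)
            sums[key][0] += row[measure]
            sums[key][1] += 1
    return {key: total / count for key, (total, count) in sums.items()}


# Hypothetical readings; only the x measure is averaged here.
rows = [
    {"gt": "sit", "model": "nexus4", "x": 1.0},
    {"gt": "sit", "model": "nexus4", "x": 3.0},
    {"gt": "stand", "model": "nexus4", "x": 5.0},
]

result = cube_avg(rows, ["gt", "model"], "x")
for key, avg in sorted(result.items(), key=lambda item: str(item[0])):
    print(key, avg)
```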


In [0]:
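# Join the streaming aggregate to a static (historical) aggregate on gt and model
historicalAgg = static.groupBy("gt", "model").avg()
# Use a new query name: device_counts is taken while the earlier query is still active
deviceModelStats = streaming.drop("Arrival_Time", "Creation_Time", "Index")\
  .cube("gt", "model").avg()\
  .join(historicalAgg, ["gt", "model"])\
  .writeStream.queryName("device_counts_joined").format("memory")\
  .outputMode("complete")\
  .start()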


Reading from the Kafka Source
- To read, you first need to choose one of the following options: assign, subscribe, or subscribePattern.
- Only one of these can be present as an option when you read from Kafka.
- assign is a fine-grained way of specifying not just the topic but also the topic partitions from which you would like to read. It is specified as a JSON string such as {"topicA":[0,1],"topicB":[2,4]}.
- subscribe and subscribePattern are ways of subscribing to one or more topics, either by specifying a list of topics (subscribe) or via a pattern (subscribePattern).
- Second, you need to specify kafka.bootstrap.servers, the list of host:port addresses used to connect to the Kafka cluster.

In [0]:
# Subscribe to 1 topic
df1 = spark.readStream.format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1")\
  .load()
# Subscribe to multiple topics
df2 = spark.readStream.format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1,topic2")\
  .load()
# Subscribe to a pattern
df3 = spark.readStream.format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribePattern", "topic.*")\
  .load()
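
The cell above covers subscribe and subscribePattern. For assign, the option value is a JSON string mapping each topic to its partitions, and building it with the json module avoids quoting mistakes. This is a minimal sketch; assign_option is a hypothetical helper name, and the topics and partitions are the ones from the example string above.

```python
import json


def assign_option(partitions_by_topic):
    """Serialize {topic: [partition, ...]} into the JSON string form used by assign."""
    return json.dumps(
        {topic: sorted(parts) for topic, parts in partitions_by_topic.items()}
    )


assign_value = assign_option({"topicA": [0, 1], "topicB": [2, 4]})
print(assign_value)
```

The string would then be supplied as .option("assign", assign_value) in place of the subscribe or subscribePattern option.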


Writing to the Kafka Sink
- Writing to Kafka is largely the same as reading from it, except that it takes fewer parameters.
- You still need to specify the Kafka bootstrap servers, but the only other option you need to supply is the topic, either as a column in the data or as the topic option.

In [0]:
df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("checkpointLocation", "/to/HDFS-compatible/dir")\
  .start()
df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("checkpointLocation", "/to/HDFS-compatible/dir")\
  .option("topic", "topic1")\
  .start()


Socket source
- The socket source allows you to send data to your streams via TCP sockets.
- To start one, specify a host and port to read data from. 
- Spark will open a new TCP connection to read from that address. 
- The socket source should not be used in production because the socket sits on the driver and does not provide end-to-end fault-tolerance guarantees. 
- Here is a short example of setting up this source to read from localhost:9999:

In [0]:
socketDF = spark.readStream.format("socket")\
  .option("host", "localhost").option("port", 9999).load()
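
Because Spark connects to the given address as a client, something must already be listening there and sending newline-delimited text. For local experiments, a tiny feeder like the following could play that role. It is a sketch using only the Python standard library; serve_lines is a hypothetical helper, and it serves a single client and then stops.

```python
import socket
import threading


def serve_lines(lines, host="127.0.0.1", port=0):
    """Start a one-shot TCP server that sends each line to the first client.

    Returns (thread, bound_port); port=0 lets the OS choose a free port.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    bound_port = server.getsockname()[1]

    def _serve():
        conn, _ = server.accept()  # wait for the single reader to connect
        with conn:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))
        server.close()

    thread = threading.Thread(target=_serve, daemon=True)
    thread.start()
    return thread, bound_port
```

Calling serve_lines(["hello", "world"], port=9999) before starting a query on socketDF would give the source something to read in a local test.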


Console sink 
- The console sink prints some rows of your streaming query to the console.
- This is useful for debugging but is not fault-tolerant.
- The console sink supports the append, update, and complete output modes:

In [0]:
activityCounts.writeStream.trigger(processingTime='5 seconds')\
  .format("console").outputMode("complete").start()


Once trigger
- You can also just run a streaming job once by setting that as the trigger. 
- This might seem like a weird case, but it’s actually extremely useful in both development and production. 
- During development, you can test your application on just one trigger’s worth of data at a time. 
- During production, the Once trigger can be used to run your job manually at a low rate (e.g., import new data into a summary table just occasionally). 
- Because Structured Streaming still fully tracks all the input files processed and the state of the computation, this is easier than writing your own custom logic to track this in a batch job, and saves a lot of resources over running a continuous job 24/7:

In [0]:
activityCounts.writeStream.trigger(once=True)\
  .format("console").outputMode("complete").start()
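
The bookkeeping that makes the Once trigger convenient can be pictured as follows: each run processes only the input that has not been recorded yet, then records it. This plain-Python sketch only illustrates the idea with hypothetical file names. In a real job the record lives in the query's checkpoint location rather than in an in-memory set.

```python
def run_once(available_files, checkpoint):
    """Process only files not yet recorded in the checkpoint, then record them."""
    new_files = sorted(set(available_files) - checkpoint)
    checkpoint.update(new_files)  # remember what this run handled
    return new_files


checkpoint = set()  # stands in for Structured Streaming's durable checkpoint

first_run = run_once(["a.json", "b.json"], checkpoint)             # both files are new
second_run = run_once(["a.json", "b.json", "c.json"], checkpoint)  # only c.json is new
third_run = run_once(["a.json", "b.json", "c.json"], checkpoint)   # nothing left to do

print(first_run, second_run, third_run)
```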
