In [1]:
#https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
#https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
#https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html

#The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended

In [2]:
#3. start writing stream to memory 
#dfNew.writeStream\
#.format("memory")\
#.queryName("test")\
#.outputMode("complete")\
#.start()

In [3]:
#The “Output” is defined as what gets written out to the external storage.
#Complete Mode - The entire updated Result Table will be written to the external storage. 
#Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage.
#Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage 

In [4]:
#Handling Event-time and Late Data


In [5]:
#Input Sources
# Built-in sources: File source, Kafka source, Socket source, Rate source


In [6]:
#Schema inference and partition of streaming DataFrames/Datasets


In [7]:
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

socketDF.isStreaming    # Returns True for DataFrames that have streaming sources

userSchema = socketDF.schema

# Read all the csv files written atomically in a directory
#userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", " ") \
    .schema(userSchema) \
    .csv("/FileStore/tables/")
  # Equivalent to format("csv").load("/path/to/directory")

In [8]:
runningStream = csvDF.writeStream\
.format("memory")\
.queryName("myResult")\
.outputMode("update")\
.start()
#ERROR
#u'Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;\nFileSource[/FileStore/tables/]'.start()

In [9]:
# Stop the streaming query held in runningStream (the memory-sink query named "myResult").
runningStream.stop()

In [10]:
#Schema inference and partition of streaming DataFrames/Datasets


In [11]:
# Window Operations on Event Time
from pyspark.sql.functions import window
#userSchema = StructType().add("name", "string").add("age", "integer")

# groupBy aggregation over sliding windows: each window is 2 minutes long and
# a new window starts every 1 minute. The first column of csvDF is used both as
# the window's time column and as an additional grouping key.
windowedCount = (
    csvDF
    .groupBy(
        window(csvDF[0], "2 minutes", "1 minutes"),
        csvDF[0],
    )
    .count()
)


In [12]:
# Start a streaming query named "myResult" that writes the windowed counts to
# the "memory" sink using the "complete" output mode.
runningStreamWindow = (
    windowedCount.writeStream
    .format("memory")
    .queryName("myResult")
    .outputMode("complete")
    .start()
)

In [13]:
# Stop the windowed-count streaming query held in runningStreamWindow.
runningStreamWindow.stop()

In [14]:
# Handling Late Data and Watermarking
# Consider what happens if one of the events arrives late to the application.
# For example, a word generated at 12:04 (its event time) could be received by
# the application at 12:11. The application should use 12:04 rather than 12:11
# to update the older counts for the window 12:00 - 12:10.
# To enable this, Spark 2.1 introduced watermarking, which lets the engine
# automatically track the current event time in the data and clean up old
# state accordingly.

# The watermark has to be declared on the same column that window() uses as its
# time column, and that column has to be of timestamp type. The original cell
# set the watermark on a column called "timestamp" while windowing on csvDF[0];
# csvDF's schema is copied from the socket stream, so a "timestamp" column is
# presumably absent (TODO confirm against csvDF.printSchema()).
# Fix: cast the first column to timestamp (a no-op if it already is one) and
# use that single column for both the watermark and the window.
runningStreamWindowWatermark = (
    csvDF
    .withColumn(csvDF.columns[0], csvDF[0].cast("timestamp"))
    .withWatermark(csvDF.columns[0], "5 minutes")
    .groupBy(window(csvDF.columns[0], "2 minutes", "1 minutes"))
    .count()
)