## Simple PySpark Structured Streaming example

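- Based on the examples in the [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- The guide's socket example needs a TCP server that is already listening on the port when the query starts (the guide uses `nc -lk 9999`). `telnet localhost 9999` (included in the data-science-python image) is only a client, so it cannot play that role. This notebook uses a file source instead.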

```python
import findspark
findspark.init()  # locate SPARK_HOME and put pyspark on sys.path before importing it

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder \
    .appName('PySpark local structured streaming test') \
    .getOrCreate()
```

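### Create a streaming DataFrame
- `lines` represents the stream of text files arriving in the input path (`streaming-test-input`)
  - To test, put text files into that directory. Spark expects each file to appear atomically, for example by writing it elsewhere and moving it in.
- The socket source failed with 'connection refused'. The most likely cause is that nothing was listening on `localhost:9999`, because Spark's socket source connects as a client.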
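To try the guide's socket source instead, something has to listen on the port before the query starts. The sketch below is a minimal Python stand-in for `nc -lk 9999`. It assumes the Spark driver can reach `localhost:9999`. `serve_lines` is a hypothetical helper and differs from `nc -lk` in three ways:
- it serves a single client;
- it sends a fixed list of lines;
- it closes the connection after the last line.

```python
import socket
import threading


def serve_lines(lines, host="localhost", port=9999, ready=None):
    """Hypothetical helper: listen on host:port, accept ONE client
    (e.g. Spark's socket source) and send each line, newline-terminated."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        if ready is not None:
            ready(server.getsockname()[1])  # report the bound port (useful with port=0)
        conn, _ = server.accept()
        with conn:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))
        # leaving the with-blocks closes the connection and the listening socket
```

To use it, start the server first in a background thread, for example `threading.Thread(target=serve_lines, args=(["hello world", "hello spark"],), daemon=True).start()`. Then create `lines` with `.format("socket").option("host", "localhost").option("port", 9999).load()`. Send everything you want counted in one go, since the connection closes after the last line.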


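```python
# Alternative socket source; it needs a server listening on localhost:9999 first (e.g. `nc -lk 9999`):
# lines = spark.readStream.format("socket") \
#     .option("host", "localhost") \
#     .option("port", 9999) \
#     .load()

# File source: each line of every new file in the directory becomes a row with one "value" column
lines = spark \
    .readStream \
    .format("text") \
    .option("path", "streaming-test-input") \
    .load()

# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

# Generate running word count
# (.withWatermark("timestamp", "10 minutes") would need an event-time column; the text source only has "value")
wordCounts = words \
    .groupBy("word").count()
```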
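It can help to compute by hand what `words` and `wordCounts` should contain, for comparison with the console output. The following plain-Python mirror is a sketch. `explode_words` and `word_counts` are hypothetical helpers, not Spark APIs. They assume each line arrives without its trailing newline, as the text source delivers it.

```python
from collections import Counter


def explode_words(lines):
    """Mirror of select(explode(split(value, " ")).alias("word")): one output row per word."""
    return [word for line in lines for word in line.split(" ")]


def word_counts(lines):
    """Mirror of groupBy("word").count() over all lines seen so far."""
    return Counter(explode_words(lines))


print(word_counts(["hello world", "hello spark"]))
```

Splitting on a single space does not collapse repeated separators. As a result, two consecutive spaces produce an empty-string "word" here, and the same happens with `split(lines.value, " ")` in Spark.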

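### Start the query
- `query.awaitTermination()` blocks the cell, so `query.stop()` cannot be called from the notebook afterwards. The kernel has to be interrupted manually. Passing a timeout in seconds, as in `awaitTermination(timeout)`, makes it return instead of blocking indefinitely.
- The console sink prints to the Spark driver's stdout, not to the notebook. The results therefore appear in the console output, e.g. in the logs of the Docker container.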
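Both limitations can be avoided by using Spark's in-memory sink and a bounded wait. The memory sink keeps the result table inside the driver under the query name, and `awaitTermination(timeout)` returns after at most `timeout` seconds. The sketch below assumes a small test run whose result fits in driver memory. `run_to_memory` and the table name `word_counts` are hypothetical. It takes the DataFrame and the session as arguments, so pyspark is not imported in the block itself.

```python
def run_to_memory(word_counts_df, spark, query_name="word_counts", timeout_s=30):
    """Hypothetical helper: run the aggregation with the in-memory sink for up to
    timeout_s seconds, collect the result table, then stop the query."""
    query = (word_counts_df.writeStream
             .outputMode("complete")   # the memory sink supports append and complete
             .format("memory")
             .queryName(query_name)    # also the name of the in-memory table
             .start())
    try:
        # Returns True only if the query terminated on its own within the timeout
        query.awaitTermination(timeout_s)
        rows = spark.table(query_name).collect()
    finally:
        query.stop()  # always release the streaming query, even if collecting failed
    return rows
```

In the notebook this would be called as `rows = run_to_memory(wordCounts, spark, timeout_s=30)`. For a file source, `query.processAllAvailable()` is another option: it waits until everything currently in the input directory has been processed.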

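```python
# Start running the query that prints the running counts to the console.
# "update" prints only the words whose counts changed in each micro-batch;
# .outputMode("complete") would reprint the whole table every time.
# The console sink writes to the driver's stdout and ignores a "path" option.
# File sinks (.format("csv"), "json", "orc", ...) need .option("path", ...) and
# .option("checkpointLocation", "streaming-test-checkpoint"), but only support
# append mode, which this aggregation cannot use without a watermark.
query = wordCounts \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

# Blocks until the query is stopped or fails
query.awaitTermination()
```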
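The two output modes mentioned in the cell can be compared with a small plain-Python simulation. It assumes each micro-batch is a list of input lines. `simulate_output_modes` is a hypothetical helper, and it ignores the row order Spark would print. For every batch it yields the rows each mode would emit:
- "update" emits only the words seen in that batch, with their new running totals;
- "complete" emits the whole running table.

```python
from collections import Counter


def simulate_output_modes(batches):
    """Yield (update_rows, complete_rows) per micro-batch for a running word count."""
    totals = Counter()
    for lines in batches:
        batch_counts = Counter(word for line in lines for word in line.split(" "))
        totals.update(batch_counts)
        update_rows = {word: totals[word] for word in batch_counts}  # changed rows only
        complete_rows = dict(totals)                                  # entire result table
        yield update_rows, complete_rows


for update_rows, complete_rows in simulate_output_modes([["hello world"], ["hello spark"]]):
    print("update:", update_rows, "| complete:", complete_rows)
```

In the second batch, "update" omits `world` because its count did not change, while "complete" still lists it.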