# Stream Processing - Exercise 8.2 - Neugebauer
## Part I - Reading the CSVs, Simple Streaming

In [2]:
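stream_dir = '/FileStore/tables/baby-data'
# Read the directory once as a static DataFrame to capture its schema.
# Without inferSchema, every column is read as a string.
static = spark.read.csv(stream_dir, header=True)
dataSchema = static.schema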

In [3]:
print(dataSchema)

In [4]:
# maxFilesPerTrigger=1 means each micro-batch reads at most one new file
baby_stream_df = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1).csv(stream_dir)

In [5]:
gender_count = baby_stream_df.groupBy('sex').count()

## Start the Stream

In [7]:
gender_count_query = gender_count.writeStream.queryName('gender_counts').format('memory').outputMode('complete').start()
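
Once the query is running, it can help to check on it before polling the table. The helper below is a minimal sketch: `summarize_query` is a hypothetical name, not part of the exercise, and it only reads the `name`, `isActive`, `status`, and `lastProgress` properties that a PySpark `StreamingQuery` exposes.

```python
def summarize_query(query):
    """Collect a few health indicators from a StreamingQuery-like object."""
    return {
        "name": query.name,                  # the queryName given at start
        "active": query.isActive,            # False once the query is stopped
        "status": query.status,              # current status reported by Spark
        "last_progress": query.lastProgress, # None until a micro-batch finishes
    }

# In the notebook (assumes the query started above is still in scope):
# print(summarize_query(gender_count_query))
```

If `active` is False or `last_progress` stays None, the in-memory table will not change no matter how often it is read.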

## Periodically Read
This version polls the in-memory `gender_counts` table in a loop with a 10-second delay. The table holds the aggregated counts, which are updated as the stream picks up the CSV files one at a time.

In [9]:
# First part
import time
import datetime

for x in range(10):
  st = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
  print("Female count number", x, "at", st)
  spark.sql("SELECT * FROM gender_counts WHERE sex='F'").show()
  time.sleep(10)
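
The same polling pattern can be wrapped in a small reusable function. This is only a sketch: `poll`, `read_fn`, and `sleep_fn` are hypothetical names introduced here, and the Spark call appears only in the trailing comment so the function itself has no Spark dependency.

```python
import datetime
import time

def poll(read_fn, times, delay_seconds, sleep_fn=time.sleep):
    """Call read_fn `times` times, pausing delay_seconds between calls.

    Returns a list of (iteration, timestamp, result) tuples.
    """
    results = []
    for i in range(times):
        stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        results.append((i, stamp, read_fn()))
        if i < times - 1:  # no need to wait after the final read
            sleep_fn(delay_seconds)
    return results

# In the notebook (assumes `spark` and the running gender_counts query):
# rows = poll(lambda: spark.sql(
#     "SELECT * FROM gender_counts WHERE sex='F'").collect(), 10, 10)
```

Collecting the results instead of calling `show()` makes it easier to compare counts across iterations afterwards.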

## Part II - Triggers
This version is almost identical to the previous one, except that the query is started with a 30-second processing-time trigger.

In [11]:
gender_count_query.stop()  # a query name can only be active once, so stop the earlier query first
gender_count_query = gender_count.writeStream.queryName('gender_counts').format("memory").trigger(processingTime='30 seconds').outputMode('complete').start()

In [12]:
# Second part
from time import sleep
import datetime

for x in range(30):
  # datetime.now() avoids relying on `import time` from an earlier cell
  st = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  print("Female count number", x, "at", st)
  spark.sql("SELECT * FROM gender_counts WHERE sex='F'").show()
  sleep(10)

## Conclusions
What's clear is that triggers afford a bit more control over timing. In the first version, I missed quite a bit of data because of timing: on the 10th pass through the loop, the count was only 234k. With the 30-second trigger, the stream waited for data and only added it to the counts at each trigger point.
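
The timing in Part II can be sanity-checked with some back-of-the-envelope arithmetic. The sketch below assumes an idealized model that is not measured from the notebook: the first micro-batch starts at time zero, each batch finishes within the trigger interval, and at most one file is read per batch. The function names are hypothetical.

```python
def max_batches(window_seconds, trigger_seconds):
    """Upper bound on micro-batches that can start within the window,
    assuming the first one starts at t=0 and none overruns the interval."""
    return window_seconds // trigger_seconds + 1

def polls_per_trigger(trigger_seconds, poll_seconds):
    """How many reads of the table fall inside one trigger interval."""
    return trigger_seconds / poll_seconds

loop_seconds = 30 * 10  # 30 polls, 10 seconds apart, as in Part II
batches = max_batches(loop_seconds, 30)
repeats = polls_per_trigger(30, 10)
```

Under these assumptions, `batches` is 11 and `repeats` is 3.0: the loop would see at most about 11 files' worth of data, and each count would appear roughly three times in a row before the next trigger updates it.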