## Step 1: Within one cell, write a streaming application

- load the saved Pipeline Model as `pipelineModel`

As part of the streaming processing:
- load streaming data from the directory `file:/databricks/driver/tweets`
- use the pipeline model to transform the streaming DataFrame
- drop the unneeded intermediate columns from the output, keeping only `text`, `time`, and `prediction`
- write the output to a `memory` sink (`scored_tweets`) in append mode
- trigger output at `2 seconds` intervals

In [None]:
from pyspark.ml import PipelineModel

modelPath = "/FileStore/twitter_nbpipeline"
inputPath = "file:/databricks/driver/tweets"

# Load the previously saved (fitted) pipeline model
pipelineModel = PipelineModel.load(modelPath)

# Input source: stream JSON files from the tweets directory.
# maxFilesPerTrigger=1 limits each micro-batch to at most one new file.
streamingInputDF = (
    spark.readStream
    .schema("time timestamp, text string")
    .option("maxFilesPerTrigger", 1)
    .json(inputPath)
)

# Processing logic: apply the fitted pipeline to the streaming DataFrame
scored_tweets = pipelineModel.transform(streamingInputDF)

# Sink: drop the intermediate pipeline columns so only text, time and
# prediction remain, then append the results to an in-memory table every 2 seconds
query = (
    scored_tweets
    .drop("rawPrediction", "probability", "features", "words", "words_filtered")
    .writeStream
    .trigger(processingTime="2 seconds")
    .format("memory")
    .queryName("scored_tweets")
    .outputMode("append")
    .start()
)
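To see how the `maxFilesPerTrigger` and `processingTime` settings in the cell above interact, here is a minimal pure-Python model of the scheduling. It is a conceptual sketch, not Spark's implementation: it assumes all input files are already present when the query starts and are listed in arrival order (oldest first, the file source's default), and `plan_micro_batches` and `earliest_batch_starts` are hypothetical helpers.

```python
def plan_micro_batches(files, max_files_per_trigger=1):
    """Split files (assumed already in arrival order) into micro-batches of bounded size.

    Conceptual model of the file source's maxFilesPerTrigger option, not Spark code.
    """
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    return [
        files[i:i + max_files_per_trigger]
        for i in range(0, len(files), max_files_per_trigger)
    ]


def earliest_batch_starts(num_batches, interval_seconds=2.0):
    """Earliest start offset (seconds) of each batch under a fixed processing-time trigger.

    A batch that runs longer than the interval delays the next one, so these are lower bounds.
    """
    return [i * interval_seconds for i in range(num_batches)]


# Hypothetical file names, for illustration only
files = ["tweets-0.json", "tweets-1.json", "tweets-2.json"]
batches = plan_micro_batches(files, max_files_per_trigger=1)
print(batches)
print(earliest_batch_starts(len(batches), interval_seconds=2.0))
```

With one file per trigger and a 2-second interval, a backlog of files is drained one file per micro-batch, so results reach the `scored_tweets` table gradually rather than all at once.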

## Step 2: View the stream results

- Query the number of rows in the `scored_tweets` table
- Visualize the counts of positive and negative tweets in 30-second windows

In [None]:
%sql
SELECT COUNT(*) FROM scored_tweets;

count(1)
20


In [None]:
%sql

SELECT 
  SUM(IF (prediction=1, 1, 0)) AS positive, 
  SUM(IF (prediction=0, 1, 0)) AS negative,
  WINDOW(time, "30 seconds")
FROM scored_tweets
-- WHERE time > CURRENT_TIMESTAMP - INTERVAL 10 minutes
GROUP BY WINDOW(time, "30 seconds")

positive,negative,window
1,0,"List(2022-11-05T17:36:00.000+0000, 2022-11-05T17:36:30.000+0000)"
74,88,"List(2022-11-05T17:37:00.000+0000, 2022-11-05T17:37:30.000+0000)"
59,73,"List(2022-11-05T17:36:30.000+0000, 2022-11-05T17:37:00.000+0000)"
55,80,"List(2022-11-05T17:38:00.000+0000, 2022-11-05T17:38:30.000+0000)"
70,77,"List(2022-11-05T17:37:30.000+0000, 2022-11-05T17:38:00.000+0000)"
67,80,"List(2022-11-05T17:38:30.000+0000, 2022-11-05T17:39:00.000+0000)"
67,82,"List(2022-11-05T17:39:00.000+0000, 2022-11-05T17:39:30.000+0000)"
67,73,"List(2022-11-05T17:40:00.000+0000, 2022-11-05T17:40:30.000+0000)"
59,64,"List(2022-11-05T17:39:30.000+0000, 2022-11-05T17:40:00.000+0000)"
60,69,"List(2022-11-05T17:40:30.000+0000, 2022-11-05T17:41:00.000+0000)"
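The windowed query above assigns each tweet to a 30-second tumbling window and counts predictions of 1 (positive) and 0 (negative) per window. The following is a minimal pure-Python sketch of that computation, assuming Spark's default window alignment (windows aligned to the Unix epoch, start inclusive, end exclusive); `window_for` and `count_by_window` are hypothetical helpers, not Spark APIs. Note that the query has no `ORDER BY`, which is why the windows in the output above are not in time order.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(seconds=30)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_for(ts, size=WINDOW):
    """Return the [start, end) tumbling window containing ts, aligned to the epoch."""
    offset = (ts - EPOCH) % size  # time elapsed since the start of the current window
    start = ts - offset
    return start, start + size


def count_by_window(rows, size=WINDOW):
    """rows: iterable of (time, prediction); returns {(start, end): (positive, negative)}.

    Mirrors SUM(IF(prediction=1, 1, 0)) and SUM(IF(prediction=0, 1, 0)) grouped by window.
    """
    counts = defaultdict(lambda: [0, 0])
    for ts, prediction in rows:
        key = window_for(ts, size)
        if prediction == 1:
            counts[key][0] += 1
        elif prediction == 0:
            counts[key][1] += 1
    return {key: tuple(value) for key, value in counts.items()}


# Illustrative rows (not taken from the real stream)
rows = [
    (datetime(2022, 11, 5, 17, 36, 10, tzinfo=timezone.utc), 1.0),
    (datetime(2022, 11, 5, 17, 36, 40, tzinfo=timezone.utc), 0.0),
    (datetime(2022, 11, 5, 17, 36, 50, tzinfo=timezone.utc), 1.0),
]
for (start, end), (positive, negative) in sorted(count_by_window(rows).items()):
    print(start.time(), end.time(), positive, negative)
```

The first row lands in the 17:36:00 to 17:36:30 window, and the other two share the 17:36:30 to 17:37:00 window, just as the SQL `GROUP BY WINDOW(time, "30 seconds")` would bucket them.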

