## Stream Processing Exercise 4 - Consuming from Kafka

Goals:

* Perform different computations on an input stream: reading, aggregation and windowed aggregation
* Additional references
    * [Spark Streaming](https://spark.apache.org/streaming/)
    * [Spark Structured Streaming programming guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
    * [Spark and Kafka integration guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)


Let's inspect the content of the Pageviews topic, printing each 5-second batch:

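```python
# DStream-based Kafka consumer. Note: pyspark.streaming.kafka exists only in
# Spark 2.x (with the spark-streaming-kafka-0-8 package); it was removed in Spark 3.0.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PageViewsConsumer")

# Micro-batch interval of 5 seconds
ssc = StreamingContext(sc, 5)

topics = ['pageviews']
kafkaParams = {'bootstrap.servers': 'broker:29092',
               'group.id': 'test'}

# Direct (receiver-less) stream; each record is a (key, value) tuple
stream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)

# Print the first records of every batch
stream.map(lambda record: (record[0], record[1])).pprint()

ssc.start()
# Run for 60 seconds, then stop streaming but keep the SparkContext
# so that the following cells can reuse it
ssc.awaitTerminationOrTimeout(60)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
```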

Now, also inspect the content of the Users topic.

Here we will consume streaming data from the pageviews Kafka topic to count the number of visits per page.
First, we define the input stream:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *  # col, lit, split, window, ...
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("PageViewsConsumer") \
    .getOrCreate()

# Streaming DataFrame over the pageviews topic (requires the spark-sql-kafka package)
dfPageViewsStream = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", "pageviews")
    .load()
)

# Each value is a comma-separated string: viewtime (epoch ms), userid, pageid
dfPageViews = (
    dfPageViewsStream
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    .withColumn("_tmp", split(col("value"), ","))
    .select((col("_tmp").getItem(0).cast("long") / lit(1000)).cast("timestamp").alias("viewtime"),
            col("_tmp").getItem(1).alias("userid"),
            col("_tmp").getItem(2).alias("pageid"),
            col("timestamp"))  # Kafka record timestamp
)

dfPageViews.printSchema()
```


Now let's store the query output in an in-memory table:



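```python
# Keep a handle on the query so it can be stopped later with queryPageViews.stop()
queryPageViews = (
    dfPageViews.writeStream
    .format("memory")
    .outputMode("append")
    .queryName("PageViews")  # name of the in-memory table
    .start()
)
```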


Here you can see the table structure:

```python
spark.sql("describe pageviews").show()
```

Now, select the events that happened in odd minutes.

Then try ordering the result by userid.


Now count the number of visits to each page:

* use `dfPageViews` as the source stream
* "by page" means grouping by `pageid`
* use `count` as the aggregation operation
* store the output stream as an in-memory table named `CountsByPage`.

Describe the table and show part of its content.

Next, we want the number of visits over the last 10 minutes, updated every 5 minutes:

* 10 minutes is the window duration
* 5 minutes is the slide duration
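With a 10-minute window sliding every 5 minutes, each event falls into 10 / 5 = 2 overlapping windows. The plain-Python sketch below illustrates that assignment; it assumes Spark's default alignment (window starts at multiples of the slide, counted from the epoch) and uses minutes as the unit. `sliding_windows` is a hypothetical helper, not a Spark API.

```python
def sliding_windows(t, duration=10, slide=5):
    """Return the [start, end) windows, in minutes since the epoch, that contain time t.

    Window starts are multiples of `slide`, mirroring Spark's default
    alignment when no start offset is given.
    """
    windows = []
    start = (t // slide) * slide  # latest window start not after t
    while start > t - duration:  # [start, start + duration) still contains t
        windows.append((start, start + duration))
        start -= slide
    return sorted(windows)


print(sliding_windows(7))   # [(0, 10), (5, 15)]
print(sliding_windows(10))  # [(5, 15), (10, 20)]
```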

Additional references for windowing in Spark can be found [here](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time).
