## Stream Processing Exercise 4 - Consuming from Kafka

Goals:

* Perform different computations on an input stream: read, aggregation, windowed aggregation
* Additional references
    * [Spark Streaming](https://spark.apache.org/streaming/)
    * [Structured Spark Streaming documentation](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
    * [Spark and Kafka integration guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)


Let's inspect the content of the pageviews topic, printing it every 5 seconds:

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# KafkaUtils lives in pyspark.streaming.kafka only in Spark 2.x
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PageViewsConsumer")

# Micro-batch interval of 5 seconds
ssc = StreamingContext(sc, 5)

topics = ['pageviews']

kafkaParams = {'bootstrap.servers': 'broker:29092',
               'group.id': 'test'}

stream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)

# Each record is a (key, value) tuple; print a sample of every batch
stream.map(lambda record: (record[0], record[1])).pprint()

ssc.start()
ssc.awaitTermination()

Now inspect the content of the users topic as well:

In [None]:
import sys
import os 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

from pyspark.streaming.kafka import KafkaUtils


sc = SparkContext(appName="UsersConsumer")

ssc = StreamingContext(sc, 5)


topics = ['users']

kafkaParams = {'bootstrap.servers': 'broker:29092', 
               'group.id' : 'test'}

stream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)

stream.map(lambda record : (record[0], record[1])).pprint()


ssc.start()
ssc.awaitTermination()

Here we will consume streaming data from the pageviews Kafka topic to count the number of visits per page.
First, we define the input stream:

In [None]:
from pyspark import sql
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("PageViewsConsumer") \
    .getOrCreate()


dfPageViewsStream = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", "pageviews")
    .load()
)

dfPageViews = (
    dfPageViewsStream
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    .withColumn("_tmp", split(col("value"), "\\,"))
    .select((col("_tmp").getItem(0).cast("long") / lit(1000)).cast("timestamp").alias("viewtime"),
            col("_tmp").getItem(1).alias("userid"),
            col("_tmp").getItem(2).alias("pageid"),
            col("timestamp"))
)

dfPageViews.printSchema()
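
The parsing step above can be checked outside Spark. The following is a minimal sketch in plain Python, assuming each Kafka value is a comma-separated string of the form `viewtime_ms,userid,pageid` (as the `split` and `getItem` calls suggest). The `PageView` type, the `parse_pageview` helper, and the sample record are hypothetical and only serve as illustration.

```python
from datetime import datetime, timezone
from typing import NamedTuple


class PageView(NamedTuple):
    """Hypothetical container mirroring the columns selected in dfPageViews."""
    viewtime: datetime
    userid: str
    pageid: str


def parse_pageview(value: str) -> PageView:
    """Split a 'viewtime_ms,userid,pageid' string into typed fields.

    Raises ValueError if the record has fewer than three fields.
    """
    millis, userid, pageid = value.split(",")[:3]
    # Epoch milliseconds -> seconds, like the cast("long") / 1000 step in Spark.
    # The timezone is pinned to UTC here (an assumption) to keep the sketch deterministic.
    viewtime = datetime.fromtimestamp(int(millis) / 1000, tz=timezone.utc)
    return PageView(viewtime, userid, pageid)


sample = "1611861300000,User_9,Page_42"  # hypothetical record
print(parse_pageview(sample))
```

Running it on a few values copied from the console output of the first cells is a quick way to confirm the field order before building aggregations on top of it.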


Now let's create an in-memory table to store the query output:



In [None]:
dfPageViews.writeStream.format("memory").outputMode("append").queryName("PageViews").start()

Here you can see the table structure:

In [None]:
spark.sql("describe pageviews").show()

Now, select the events that happen in odd minutes.

Instructor's solution:

In [None]:
spark.sql("select * from PageViews where (minute(viewtime)%2) !=0").show()

My solution:

In [None]:
import sys
import os 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

from pyspark.streaming.kafka import KafkaUtils

from datetime import datetime

sc = SparkContext(appName="PageViewsConsumer")

ssc = StreamingContext(sc, 5)


topics = ['pageviews']

kafkaParams = {'bootstrap.servers': 'broker:29092', 
               'group.id' : 'test'}

stream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)

time = stream.map(lambda r: (int(r[1].split(",")[0]), r[1].split(",")[1]))
odd = time.filter(lambda r: (datetime.fromtimestamp(r[0]/1000).minute%2)!= 0)
odd.pprint()

ssc.start()
ssc.awaitTermination()
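
Both solutions rely on the same predicate: convert the epoch-millisecond view time to a timestamp and keep the record when its minute is odd. A small sketch of that predicate in plain Python follows. The helper name `is_odd_minute` and the sample events are hypothetical, and the timezone is pinned to UTC as an assumption (the DStream code above uses the local timezone, and the SQL version depends on the Spark session timezone).

```python
from datetime import datetime, timezone


def is_odd_minute(epoch_millis: int) -> bool:
    """Return True when the minute-of-hour of the timestamp is odd."""
    minute = datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc).minute
    return minute % 2 != 0


# Hypothetical (viewtime_ms, userid) pairs: 19:15:00 and 19:16:00 UTC
events = [(1611861300000, "User_9"), (1611861360000, "User_3")]

odd_events = [event for event in events if is_odd_minute(event[0])]
print(odd_events)  # → [(1611861300000, 'User_9')]
```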

Now try ordering the results by userid.

Instructor's solution:

In [None]:
spark.sql("select * from PageViews where (minute(viewtime)%2) !=0  order by userid").show()

My solution:

In [None]:
import sys
import os 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

from pyspark.streaming.kafka import KafkaUtils

from datetime import datetime

sc = SparkContext(appName="PageViewsConsumer")

ssc = StreamingContext(sc, 5)


topics = ['pageviews']

kafkaParams = {'bootstrap.servers': 'broker:29092', 
               'group.id' : 'test'}

stream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)

time = stream.map(lambda r: (int(r[1].split(",")[0]), r[1].split(",")[1]))
odd = time.filter(lambda r: (datetime.fromtimestamp(r[0]/1000).minute%2)!= 0)
odd.transform(lambda rdd: rdd.sortBy(lambda x: x[1])).pprint()

ssc.start()
ssc.awaitTermination()


Now count the number of visits to each page:

* use the source stream: dfPageViews
* "by page" means grouping by pageid
* use count as the aggregation operation
* store the output stream as an in-memory table: CountsByPage.

Describe the table and show part of its content.

Instructor's solution:

In [None]:
dfCountByPage = dfPageViews.groupBy("pageid").count()
dfCountByPage.printSchema()

In [None]:
dfCountByPage.writeStream.format("memory").outputMode("complete").queryName("CountByPage").start()

In [None]:
spark.sql("describe CountByPage").show()

In [None]:
spark.sql("select * from CountByPage").show()

My solution:

In [None]:
from pyspark import sql
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("PageViewsConsumer") \
    .getOrCreate()


dfPageViewsStream = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", "pageviews")
    .load()
)

dfPageViews = (
    dfPageViewsStream
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    .withColumn("_tmp", split(col("value"), "\\,"))
    .select((col("_tmp").getItem(0).cast("long") / lit(1000)).cast("timestamp").alias("viewtime"),
            col("_tmp").getItem(1).alias("userid"),
            col("_tmp").getItem(2).alias("pageid"),
            col("timestamp"))
    .groupBy(col("pageid"))
    .count()
)

dfPageViews.writeStream.format("memory").outputMode("complete").queryName("CountsByPage").start()

In [None]:
spark.sql("select * from CountsByPage order by count desc").show()
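
The aggregation itself is an ordinary group-by count. As a rough analogy in plain Python (not how Spark executes it), the result table in `complete` output mode corresponds to a running counter over every record seen so far. The sample views below are hypothetical.

```python
from collections import Counter

# Hypothetical (userid, pageid) pairs, as if parsed from the pageviews topic
views = [
    ("User_1", "Page_10"),
    ("User_2", "Page_20"),
    ("User_3", "Page_10"),
    ("User_1", "Page_30"),
    ("User_2", "Page_10"),
]

# Equivalent of groupBy("pageid").count()
counts_by_page = Counter(pageid for _, pageid in views)

# Equivalent of "order by count desc"
for pageid, count in counts_by_page.most_common():
    print(pageid, count)
```

In the streaming query the counter is never reset: each micro-batch updates the same totals, which is why the in-memory table keeps growing counts between successive `select` calls.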

Now we want the number of visits per page over the last 10 minutes, updated every 5 minutes:

* 10 minutes is the window duration
* 5 minutes is the slide duration

Additional references for windowing in Spark can be found [here](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time).

In [None]:
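# Assumes dfPageViews is the parsed (un-aggregated) stream defined earlier.
# The watermark is set on the same event-time column that window() uses.
dfCountByPageWindows = dfPageViews.withWatermark("viewtime", "10 minutes") \
                                .groupBy(window(col("viewtime"), "10 minutes", "5 minutes"), col("pageid")) \
                                .count()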
dfCountByPageWindows.printSchema()

In [None]:
dfCountByPageWindows.writeStream.format("memory").outputMode("complete").queryName("CountsByPageWindows").start()

In [None]:
spark.sql("select * from CountsByPageWindows order by count desc").show()
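
With a slide shorter than the window duration, windows overlap, so every event is counted in more than one window. The sketch below illustrates that assignment in plain Python. It assumes half-open windows `[start, end)` whose start times are multiples of the slide counted from the epoch; `sliding_windows` is a hypothetical helper, not a Spark API, and times are plain seconds to keep the arithmetic readable.

```python
def sliding_windows(ts: int, window: int, slide: int) -> list:
    """Return every [start, end) window that contains the event time ts.

    Assumption: window starts are aligned to multiples of `slide`.
    """
    start = ts - (ts % slide)  # latest window start that is <= ts
    windows = []
    while start > ts - window:  # the window must still cover ts
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


# 10-minute windows sliding every 5 minutes: each event lands in 2 windows
print(sliding_windows(1000, window=600, slide=300))
# → [(600, 1200), (900, 1500)]

# 5-minute windows sliding every 2 minutes: each event lands in 2 or 3 windows
print(len(sliding_windows(1000, window=300, slide=120)))
# → 3
```

This is why the same pageid can show up in several rows of CountsByPageWindows, one per overlapping window.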

## Exercise 5

The purpose of this exercise is to analyze the data published to the Users topic, following the same approach as in Exercise 4.

Finally, we want to obtain how many times each user has accessed a page over the last 5 minutes, updated every 2 minutes.


My solution:

In [1]:
from pyspark import sql
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("PageViewsConsumer") \
    .getOrCreate()


dfPageViewsStream = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", "pageviews")
    .load()
)

dfPageViews = (
    dfPageViewsStream
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    .withColumn("_tmp", split(col("value"), "\\,"))
    .select((col("_tmp").getItem(0).cast("long") / lit(1000)).cast("timestamp").alias("viewtime"),
            col("_tmp").getItem(1).alias("userid"),
            col("_tmp").getItem(2).alias("pageid"),
            col("timestamp"))
    .groupBy(
        window(col("viewtime"), "5 minutes", "2 minutes"),col("userid"))
    .count()
)

dfPageViews.writeStream.format("memory").outputMode("complete").queryName("CountsByUser").start()

<pyspark.sql.streaming.StreamingQuery at 0x7f1ef03d44d0>

In [4]:
spark.sql("select * from CountsByUser order by count desc").show()

+--------------------+------+-----+
|              window|userid|count|
+--------------------+------+-----+
|[2021-01-28 19:14...|User_9|   11|
|[2021-01-28 19:16...|User_9|   11|
|[2021-01-28 19:16...|User_8|    8|
|[2021-01-28 19:14...|User_8|    8|
|[2021-01-28 19:16...|User_3|    8|
|[2021-01-28 19:14...|User_3|    8|
|[2021-01-28 19:16...|User_5|    7|
|[2021-01-28 19:14...|User_5|    7|
|[2021-01-28 19:14...|User_7|    6|
|[2021-01-28 19:16...|User_4|    6|
|[2021-01-28 19:16...|User_7|    6|
|[2021-01-28 19:14...|User_1|    6|
|[2021-01-28 19:16...|User_1|    6|
|[2021-01-28 19:14...|User_4|    6|
|[2021-01-28 19:16...|User_6|    4|
|[2021-01-28 19:16...|User_2|    4|
|[2021-01-28 19:14...|User_6|    4|
|[2021-01-28 19:14...|User_2|    4|
+--------------------+------+-----+



Instructor's solution: the same.