<a href="https://colab.research.google.com/github/smduarte/ps2022/blob/main/lab4/ps2022_lab4_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Processamento de Streams 2022
## Lab 4.1 - Kafka Streaming 

Kafka + Spark Structured Streaming
---
### Colab Setup



```python
#@title Mount Google Drive (Optional)
from google.colab import drive
drive.mount('/content/drive')
```

```python
#@title Install PySpark
!pip install pyspark findspark --quiet
import findspark
findspark.init()
findspark.find()
```


```
#@title Install & Launch Kafka
%%bash
# Note: older Kafka releases are eventually moved from dlcdn.apache.org to https://archive.apache.org/dist/kafka/
KAFKA_VERSION=3.1.0
KAFKA=kafka_2.13-$KAFKA_VERSION
wget -q -O /tmp/$KAFKA.tgz https://dlcdn.apache.org/kafka/$KAFKA_VERSION/$KAFKA.tgz
tar xfz /tmp/$KAFKA.tgz
# Fetch the KRaft-mode (no ZooKeeper) server configuration
wget -q -O $KAFKA/config/server1.properties https://github.com/smduarte/ps2022/raw/main/colab/server1.properties

# Generate a cluster ID, format the log directory, and start the broker in the background
UUID=$($KAFKA/bin/kafka-storage.sh random-uuid)
$KAFKA/bin/kafka-storage.sh format -t $UUID -c $KAFKA/config/server1.properties
$KAFKA/bin/kafka-server-start.sh -daemon $KAFKA/config/server1.properties
```

```
Formatting /tmp/kraft-combined-logs
```


### Weblog Publisher
This is a small Python Kafka client that publishes a continuous stream of text lines, taken from the output log of a web server and encoded as JSON.

* The Kafka server is accessible at `localhost:9092`
* The events are published to the `weblog_json` topic

```
#@title Start Kafka Publisher
%%bash
pip install kafka-python dataclasses --quiet
wget -q -O - https://github.com/smduarte/ps2022/raw/main/colab/kafka-logsender.tgz | tar xfz - 2> /dev/null

# Publish the lines of kafka-logsender/web.log to the weblog_json topic, in the background
nohup python kafka-logsender/publisher.py kafka-logsender/web.log weblog_json > /dev/null 2> /dev/null &
```

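The Python code below shows the basics of processing JSON data from a Kafka source with PySpark Structured Streaming.

The PySpark streaming API reference is available [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.streaming.html). Note that this page covers the legacy DStream API; the Structured Streaming API used below (`readStream`, `writeStream`) lives in `pyspark.sql`.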
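In the example, each Kafka record's `value` arrives as bytes; the query casts it to a string and parses it with `from_json` against a fixed schema. The plain-Python sketch below mirrors those two steps on a single record, using the field names of the schema in the example. The sample payload and its ISO-8601 timestamp format are assumptions for illustration, not taken from the publisher.

```python
import json
from datetime import datetime

# Field names mirror the StructType used in the PySpark example.
SCHEMA = {
    "timestamp": str,   # converted to datetime below
    "ip": str,
    "code": int,
    "method": str,
    "path": str,
    "duration": float,
}

def parse_value(value: bytes) -> dict:
    """Decode a record value (like CAST(value AS STRING)), parse the JSON,
    and keep only the schema fields, using None for missing ones."""
    data = json.loads(value.decode("utf-8"))
    row = {}
    for name, typ in SCHEMA.items():
        raw = data.get(name)
        row[name] = None if raw is None else typ(raw)
    if row["timestamp"] is not None:
        # Hypothetical timestamp format; the real publisher may differ.
        row["timestamp"] = datetime.fromisoformat(row["timestamp"])
    return row

# Hypothetical record value, for illustration only.
sample = (b'{"timestamp": "2022-03-01T10:00:05", "ip": "10.0.0.1", "code": 200, '
          b'"method": "GET", "path": "/index.html", "duration": 0.12}')
print(parse_value(sample))
```

Unlike Spark's `from_json`, this sketch raises an exception on malformed JSON instead of producing nulls.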

---
#### PySpark Kafka Stream Example


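```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.types import (FloatType, IntegerType, StringType,
                               StructField, StructType, TimestampType)


def dumpBatchDF(df, epoch_id):
    # Print up to 20 rows of each micro-batch result, without truncating columns
    df.show(20, False)


# The spark-sql-kafka connector version should match the installed PySpark version
spark = SparkSession \
    .builder \
    .appName('Kafka Spark Structured Streaming Example') \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1') \
    .getOrCreate()

# Kafka records expose key, value, topic, partition, offset, timestamp, ...;
# keep only the value (the JSON payload), cast from bytes to a string
lines = spark \
  .readStream \
  .format('kafka') \
  .option('kafka.bootstrap.servers', 'localhost:9092') \
  .option('subscribe', 'weblog_json') \
  .option('startingOffsets', 'earliest') \
  .load() \
  .selectExpr('CAST(value AS STRING)')

schema = StructType([StructField('timestamp', TimestampType(), True),
                     StructField('ip', StringType(), True),
                     StructField('code', IntegerType(), True),
                     StructField('method', StringType(), True),
                     StructField('path', StringType(), True),
                     StructField('duration', FloatType(), True)])

# Parse the JSON payload and flatten the struct into top-level columns
events = lines.select(from_json(col('value'), schema).alias('data')).select('data.*')

# Requests per IP in 30-second windows sliding every 5 seconds; keep the 5 largest counts.
# The window column is dropped, so an IP may appear once per overlapping window.
top5 = events.groupBy(window(events.timestamp, '30 seconds', '5 seconds'), 'ip') \
       .count() \
       .orderBy('count', ascending=False) \
       .limit(5) \
       .drop('window')

# Sorting a streaming aggregation is only supported in 'complete' output mode
query = top5 \
    .writeStream \
    .outputMode('complete') \
    .foreachBatch(dumpBatchDF) \
    .start()

query.awaitTermination(600)  # wait at most 600 seconds
query.stop()
spark.stop()
```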

```
+-------------+-----+
|ip           |count|
+-------------+-----+
|185.28.193.95|5245 |
|185.28.193.95|4795 |
|185.28.193.95|4582 |
|185.28.193.95|3924 |
|120.52.73.97 |3848 |
+-------------+-----+

+-------------+-----+
|ip           |count|
+-------------+-----+
|185.28.193.95|5350 |
|185.28.193.95|5350 |
|185.28.193.95|5272 |
|120.52.73.97 |4971 |
|120.52.73.97 |4918 |
+-------------+-----+

+-------------+-----+
|ip           |count|
+-------------+-----+
|185.28.193.95|5362 |
|185.28.193.95|5355 |
|185.28.193.95|5272 |
|120.52.73.97 |5015 |
|120.52.73.97 |4971 |
+-------------+-----+
```



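With a 30-second window sliding every 5 seconds, each event belongs to 30 / 5 = 6 overlapping windows. Because the query drops the `window` column, the same IP can appear several times in the top 5, once per window, as `185.28.193.95` does in the output above. The plain-Python sketch below computes which windows contain a given timestamp, assuming windows aligned to time 0 (the default when `window` is called without a `startTime`).

```python
from typing import List, Tuple

def windows_for(ts: float, length: float, slide: float) -> List[Tuple[float, float]]:
    """Return every [start, end) sliding window that contains `ts`,
    assuming window starts are multiples of `slide`."""
    start = (ts // slide) * slide  # latest window start not after ts
    windows = []
    while start > ts - length:     # the window must still cover ts
        windows.append((start, start + length))
        start -= slide
    return sorted(windows)

print(windows_for(12, 30, 5))
```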

---
# Exercises

## Exercise 1

During a denial-of-service attack, it is important to identify the source IPs that may be attacking the system by issuing a large number of requests.

Write a program to find the source IPs that made more than 50 requests in the last 10 seconds; dump this information every 5 seconds.
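Before writing the Spark query, it may help to pin down the expected result with a small plain-Python reference model (not a Spark solution): 10-second windows sliding every 5 seconds, keeping the IPs with more than 50 requests. Integer-second timestamps and windows aligned to time 0 are simplifying assumptions.

```python
from collections import Counter, defaultdict
from typing import Dict, Iterable, List, Tuple

def heavy_hitters(events: Iterable[Tuple[int, str]], length: int = 10,
                  slide: int = 5, threshold: int = 50) -> Dict[int, List[str]]:
    """Map each window start to the IPs with more than `threshold` requests
    in [start, start + length). `events` are (timestamp_seconds, ip) pairs."""
    counts: Dict[int, Counter] = defaultdict(Counter)
    for ts, ip in events:
        start = (ts // slide) * slide
        while start > ts - length:  # every window that contains ts
            counts[start][ip] += 1
            start -= slide
    return {start: sorted(ip for ip, n in c.items() if n > threshold)
            for start, c in sorted(counts.items())}

# 60 requests from one IP and 10 from another, all within [0, 10)
demo = [(t, "1.1.1.1") for t in range(10) for _ in range(6)] + [(3, "2.2.2.2")] * 10
print(heavy_hitters(demo))
```

In Spark, the analogous query would group by `window(timestamp, '10 seconds', '5 seconds')` and `ip`, count, and filter on `count > 50`; a 5-second output interval can be requested with `.trigger(processingTime='5 seconds')` on the stream writer.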


## Exercise 2

#### a)
Write a program to dump the number of requests and the minimum and maximum processing time per request in the last 10 seconds, **for all** source IPs that made more than 100 requests; dump this information every 5 seconds.
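A plain-Python reference for a single window (not a Spark solution) clarifies what each output row should contain: per-IP request count and minimum and maximum `duration`, keeping only IPs above the threshold. Representing a window's contents as (ip, duration) pairs is an assumption of this sketch.

```python
from typing import Dict, Iterable, List, Tuple

Stats = Tuple[int, float, float]  # (count, min duration, max duration)

def ip_stats(requests: Iterable[Tuple[str, float]], min_requests: int = 100) -> Dict[str, Stats]:
    """Per-IP (count, min, max) of request durations within ONE window,
    keeping only IPs with more than `min_requests` requests."""
    acc: Dict[str, List] = {}
    for ip, duration in requests:
        if ip not in acc:
            acc[ip] = [0, duration, duration]
        s = acc[ip]
        s[0] += 1
        s[1] = min(s[1], duration)
        s[2] = max(s[2], duration)
    return {ip: (n, lo, hi) for ip, (n, lo, hi) in acc.items() if n > min_requests}
```

In PySpark, `count`, `min` and `max` from `pyspark.sql.functions` can be combined in a single `.agg(...)`; note that `from pyspark.sql.functions import *` shadows Python's built-in `min` and `max`, so `import pyspark.sql.functions as F` is a safer way to reach them.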

#### b)

Write a program to dump the number of requests and the minimum and maximum processing time per request in the last 10 seconds, **only if at least one** source IP has made more than 100 requests; dump this information every 5 seconds.
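The difference from (a) is that the condition gates the whole window rather than filtering rows. The plain-Python sketch below reads (b) as "overall statistics for the window, emitted only when some IP exceeds the threshold"; that reading is an assumption, and it is a reference model for one window, not a Spark solution.

```python
from collections import Counter
from typing import Iterable, List, Optional, Tuple

def window_stats_if_hot(requests: Iterable[Tuple[str, float]],
                        min_requests: int = 100) -> Optional[Tuple[int, float, float]]:
    """Overall (count, min, max) duration for ONE window, or None unless
    at least one IP made more than `min_requests` requests in it."""
    reqs: List[Tuple[str, float]] = list(requests)
    per_ip = Counter(ip for ip, _ in reqs)
    if not any(n > min_requests for n in per_ip.values()):
        return None  # no IP crossed the threshold: emit nothing
    durations = [d for _, d in reqs]
    return len(durations), min(durations), max(durations)
```

In Spark, one possible approach is to apply the check inside the `foreachBatch` function, where each micro-batch is an ordinary (non-streaming) DataFrame.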

## Exercise 3
Write a program to dump the source IPs whose number of requests in the last 30 seconds deviates most from the average; dump this information every 5 seconds.
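"Deviate most from the average" needs a concrete definition. The plain-Python sketch below assumes the absolute difference between an IP's request count and the mean per-IP count within one window, and returns the top `k` IPs (`k` is a hypothetical parameter). A z-score, dividing by the standard deviation, would be an equally reasonable choice.

```python
from collections import Counter
from typing import Iterable, List, Tuple

def top_deviations(ips: Iterable[str], k: int = 3) -> List[Tuple[str, int, float]]:
    """For ONE window: rank IPs by |count - mean count| and return
    (ip, count, deviation) for the k largest deviations."""
    counts = Counter(ips)
    if not counts:
        return []
    mean = sum(counts.values()) / len(counts)
    ranked = sorted(counts.items(), key=lambda kv: abs(kv[1] - mean), reverse=True)
    return [(ip, n, abs(n - mean)) for ip, n in ranked[:k]]
```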

## Exercise 4

Run additional logsender servers for subsets of the logs (IPv4 and IPv6 logs), using the following commands.

```
!nohup python logsender/server.py logsender/webipv4.log 7778 > /dev/null 2> /dev/null &
!nohup python logsender/server.py logsender/webipv6.log 7779 > /dev/null 2> /dev/null &
```

Write a program that combines the two streams and dumps the number of requests made in the last 15 seconds; dump this information every 5 seconds.
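Combining the streams amounts to a union: every event from either source is counted in each window that contains it. The plain-Python sketch below models this with `itertools.chain` over two lists of (timestamp, ip) pairs, assuming integer-second timestamps and windows aligned to time 0; it is a reference model, not a Spark solution.

```python
from collections import Counter
from itertools import chain
from typing import Dict, Iterable, Tuple

def merged_window_counts(ipv4: Iterable[Tuple[int, str]], ipv6: Iterable[Tuple[int, str]],
                         length: int = 15, slide: int = 5) -> Dict[int, int]:
    """Total requests per sliding window (keyed by window start) over the
    union of two streams of (timestamp_seconds, ip) pairs."""
    counts: Counter = Counter()
    for ts, _ip in chain(ipv4, ipv6):
        start = (ts // slide) * slide
        while start > ts - length:  # every window that contains ts
            counts[start] += 1
            start -= slide
    return dict(sorted(counts.items()))
```

In Structured Streaming, two streaming DataFrames with the same schema can be combined with `DataFrame.union` before the windowed aggregation.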

## Exercise 5

Write a program that combines the two streams from the previous exercise and dumps the proportion of IPv4 vs IPv6 requests in the last 20 seconds - dump this information every 5 seconds.
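For one window, the proportion can be computed by classifying each source address. The plain-Python sketch below uses the standard `ipaddress` module and assumes every `ip` value is a valid address; it is a reference model, not a Spark solution.

```python
import ipaddress
from typing import Iterable, Tuple

def ip_version_shares(ips: Iterable[str]) -> Tuple[float, float]:
    """Fractions of IPv4 and IPv6 requests among the `ips` of one window."""
    v4 = v6 = 0
    for ip in ips:
        if ipaddress.ip_address(ip).version == 4:
            v4 += 1
        else:
            v6 += 1
    total = v4 + v6
    if total == 0:
        return 0.0, 0.0  # empty window
    return v4 / total, v6 / total
```

Since the two streams come from separate sources, another option in Spark is to tag each stream with a constant column (e.g. using `lit`) before the union and aggregate on that tag.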
