<a href="https://colab.research.google.com/github/smduarte/ps2024/blob/main/lab4/ps2024_lab3_sol.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Processamento de Streams 2024
## Lab 3 - Structured Spark Streaming (Solution)
---
### Colab Setup



In [None]:
#@title Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#@title Install PySpark
!pip install pyspark findspark --quiet
import findspark
findspark.init()
findspark.find()

---
### Weblog Sender
The stream server is a small Python TCP server listening on port 7777 (localhost).

The stream consists of text lines taken from the output log of a web server.
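
As a rough illustration of what each line carries, the sketch below splits one line into the fields used in the exercises. The sample line is made up for illustration, and the field positions (0 for the timestamp, 1 for the source IP, 5 for the processing time) are taken from the solution code in this notebook, not from any logsender documentation; `parse_line` is a hypothetical helper.

```python
# Hypothetical sample line; the real web.log lines may differ in detail.
sample = "2016-12-06T08:58:35.318+0000 37.139.9.11 404 GET /codemove/TTCENCUFMH3C 0.026"

def parse_line(line):
    """Split a log line on single spaces and keep the fields used later."""
    fields = line.split(' ')
    return {
        'time': fields[0],        # request timestamp (kept as text here)
        'ip': fields[1],          # source IP address
        'dur': float(fields[5]),  # processing time of the request
    }

record = parse_line(sample)
print(record)
```

The Spark solutions below do the same split with `split(lines.value, ' ')` and `getItem(i)`, casting each field to the type they need.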



In [None]:
!wget -q -O - https://github.com/smduarte/ps2024/raw/main/colab/logsender.tgz | tar xfz - 2> /dev/null

!nohup python logsender/server.py logsender/web.log 7777 > /dev/null 2> /dev/null &

The Python code below shows the basics needed to process data from a socket source using PySpark.

The PySpark Streaming documentation is available [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.streaming.html).

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, count

spark = SparkSession \
    .builder \
    .appName('StructuredWebLogExample') \
    .getOrCreate()

# Create DataFrame representing the stream of input
# lines from connection to logsender 7777

lines = spark.readStream.format('socket') \
    .option('host', 'localhost') \
    .option('port', 7777) \
    .load()

query = lines \
    .writeStream \
    .outputMode('append') \
    .trigger(processingTime='1 seconds') \
    .foreachBatch(lambda df, epoch: df.show(10, False)) \
    .start()

query.awaitTermination(60)
query.stop()


---
# Exercises

## Exercise 1

In a denial-of-service event it is important to identify the IP sources that might be attacking the system, by issuing a large number of requests.

Write a program to find the IP sources that have made more than 50 requests in the last 10 seconds. Dump this information every 5 seconds.
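
"The last 10 seconds, every 5 seconds" maps onto a sliding window of size 10 seconds and slide 5 seconds, so each request falls into two overlapping windows. The sketch below is plain Python, not Spark code. It assumes timestamps given as whole seconds and windows aligned to multiples of the slide, with inclusive start and exclusive end; `sliding_windows` is a hypothetical helper.

```python
def sliding_windows(ts, size=10, slide=5):
    """Return the [start, end) windows, in seconds, that contain timestamp ts."""
    windows = []
    # Latest window start that is <= ts, aligned to a multiple of the slide.
    start = (ts // slide) * slide
    # Walk backwards while the window still covers ts (start + size > ts).
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

print(sliding_windows(12))
```

A request at second 12 is counted in both windows that contain it, which is why the same IP can appear in two consecutive result rows.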


#### Using COMPLETE output mode.

In this mode, the aggregated result can be ordered in the main query.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession \
    .builder \
    .appName('StructuredWebLogExample') \
    .getOrCreate()

# Create DataFrame representing the stream of input
# lines from connection to logsender 7777

lines = spark.readStream.format('socket') \
    .option('host', 'localhost') \
    .option('port', 7777) \
    .load()

sl = split( lines.value, ' ')
results = lines \
      .withColumn('time', sl.getItem(0).cast('timestamp')) \
      .withColumn('ip', sl.getItem(1).cast('string')) \
      .drop('value')

# count() already names its output column 'count', so no alias is needed
results = results \
          .withWatermark('time', '1 seconds') \
          .groupBy( window( results.time, '10 seconds', '5 seconds'), 'ip') \
          .count() \
          .where( 'count > 50') \
          .orderBy(['window', 'count'], ascending=True)

query = results \
    .writeStream \
    .outputMode('complete') \
    .trigger(processingTime='1 seconds') \
    .foreachBatch(lambda df, epoch: df.show(100, False)) \
    .start()

query.awaitTermination(60)
query.stop()


#### Using UPDATE output mode.

In this mode, sorting is not supported on the streaming query itself, so the ordering has to be done on each batch in the output sink.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

def dumpBatchDFOrdered(df, epoch_id):
    df = df.orderBy('window', 'count', ascending=False)
    df.show(truncate=False)


spark = SparkSession \
    .builder \
    .appName('StructuredWebLogExample') \
    .getOrCreate()

# Create DataFrame representing the stream of input
# lines from connection to logsender 7777

lines = spark.readStream.format('socket') \
    .option('host', 'localhost') \
    .option('port', 7777) \
    .load()

sl = split( lines.value, ' ')
results = lines \
      .withColumn('time', sl.getItem(0).cast('timestamp')) \
      .withColumn('ip', sl.getItem(1).cast('string')) \
      .drop('value')

# count() already names its output column 'count', so no alias is needed
results = results \
          .withWatermark('time', '1 seconds') \
          .groupBy( window( results.time, '10 seconds', '5 seconds'), 'ip') \
          .count() \
          .where( 'count > 50')

query = results \
    .writeStream \
    .outputMode('update') \
    .foreachBatch( dumpBatchDFOrdered ) \
    .start()

query.awaitTermination(60)
query.stop()

## Exercise 2

#### a)
Write a program to dump the number of requests, the minimum processing time, and the maximum processing time per request in the last 10 seconds, **for all** source IPs that performed more than 100 requests. Dump this information every 5 seconds.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

def dumpBatchDF(df, epoch_id):
  df.show(20, False)
  
spark = SparkSession \
    .builder \
    .appName('StructuredWebLogExample') \
    .getOrCreate()

# Create DataFrame representing the stream of input
# lines from connection to logsender 7777
lines = spark.readStream.format('socket') \
    .option('host', 'localhost') \
    .option('port', 7777) \
    .load() 

sl = split(lines.value, ' ')
results = lines.withColumn('time', sl.getItem(0).cast('timestamp')) \
    .withColumn('ip', sl.getItem(1).cast('string')) \
    .withColumn('dur', sl.getItem(5).cast('float')) \
    .drop('value')

query = results \
    .withWatermark('time', '1 seconds') \
    .groupBy(window(results.time, '10 seconds','5 seconds'),'ip') \
    .agg(count('*').alias('count'), max('dur').alias('maxdur'), min('dur').alias('mindur')) \
    .where('count > 100')


# Dump the results
query = query \
    .writeStream \
    .outputMode('complete') \
    .foreachBatch(dumpBatchDF) \
    .start()

query.awaitTermination(60)
query.stop()

#### b)

Write a program to dump the number of requests, the minimum processing time, and the maximum processing time per request in the last 10 seconds, **only if at least one** source IP has performed more than 1000 requests. Dump this information every 5 seconds.
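
The "only if at least one" condition works per window: once a single source IP crosses the threshold, every row of that window is reported, not just the heavy one. The plain-Python sketch below illustrates that gating on made-up aggregates. The window labels, IPs, and numbers are hypothetical, and the helper names are not part of the lab code.

```python
# Hypothetical per-(window, ip) aggregates: (window, ip, count, mindur, maxdur)
rows = [
    ('w1', '10.0.0.1', 1200, 0.01, 0.90),
    ('w1', '10.0.0.2',   40, 0.02, 0.30),
    ('w2', '10.0.0.1',  300, 0.01, 0.50),
    ('w2', '10.0.0.2',   25, 0.03, 0.20),
]

def windows_with_heavy_source(rows, threshold=1000):
    """Windows in which at least one IP made more than `threshold` requests."""
    return {w for (w, _ip, count, _mn, _mx) in rows if count > threshold}

def gated_rows(rows, threshold=1000):
    """All rows belonging to a window that passed the gate."""
    keep = windows_with_heavy_source(rows, threshold)
    return [r for r in rows if r[0] in keep]

selected = gated_rows(rows)
for r in selected:
    print(r)
```

Here both rows of `w1` are kept, including the IP with only 40 requests, while `w2` is dropped entirely.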

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

def dumpBatchDF(df, epoch_id):
  # Windows in which at least one source IP made more than 1000 requests
  heavy = df \
    .where('count > 1000') \
    .select('window') \
    .distinct()

  # Keep every row of those windows; joining on the column name avoids
  # ambiguous references, since both sides derive from the same DataFrame
  df3 = df.join(heavy, on='window', how='left_semi')

  df3.show(truncate=False)
  
try: 
  spark = SparkSession \
      .builder \
      .appName('StructuredWebLogExample') \
      .getOrCreate()

  # Create DataFrame representing the stream of input
  # lines from connection to logsender 7777
  lines = spark.readStream.format('socket') \
      .option('host', 'localhost') \
      .option('port', 7777) \
      .load() 

  sl = split(lines.value, ' ')
  results = lines.withColumn('time', sl.getItem(0).cast('timestamp')) \
      .withColumn('ip', sl.getItem(1).cast('string')) \
      .withColumn('dur', sl.getItem(5).cast('float')) \
      .drop('value')

  query = results \
      .withWatermark('time', '1 seconds') \
      .groupBy(window(results.time,'10 seconds','5 seconds'),'ip') \
      .agg(count('*').alias('count'), max('dur').alias('maxdur'), min('dur').alias('mindur'))


  # Dump the results
  query = query \
      .writeStream \
      .outputMode('complete') \
      .foreachBatch(dumpBatchDF) \
      .start()

  query.awaitTermination(120)
  # Stop inside the try block: if an earlier step failed, there is no running query to stop
  query.stop()
except Exception as e:
  print(e)

## Exercise 3
Write a program to dump the IP sources that deviate most from the average in terms of the number of requests made in the last 30 seconds - dump this information every 5 seconds.
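
As a sketch of the computation for a single window, the deviation of each source IP is the absolute difference between its request count and the mean count over all IPs in that window. The example below is plain Python on made-up counts; the IPs and numbers are hypothetical, and `deviations` is not part of the lab code.

```python
from statistics import mean

# Hypothetical request counts per source IP for a single 30-second window.
counts = {'10.0.0.1': 120, '10.0.0.2': 30, '10.0.0.3': 45, '10.0.0.4': 25}

def deviations(counts):
    """Rank IPs by |count - average|, largest deviation first."""
    avg = mean(counts.values())
    devs = [(ip, abs(n - avg)) for ip, n in counts.items()]
    return sorted(devs, key=lambda pair: pair[1], reverse=True)

ranked = deviations(counts)
for ip, dev in ranked:
    print(ip, dev)
```

The Spark solution follows the same two steps, but per window: first the average per window, then a join back to the per-IP counts to compute the difference.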

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# The output function performs the additional computation
def dumpBatchDF(df, epoch_id):
    # Average number of requests per source IP in each window
    df2 = df.groupBy('window') \
          .agg(avg('count').alias('avg'))

    # Join on the column name so the shared 'window' column is kept once
    df3 = df.join(df2, on='window')

    # show() returns None, so its result is not assigned
    df3.selectExpr('window', 'ip', 'abs(count - avg) as deviation') \
          .orderBy('window', 'deviation', ascending=False) \
          .show(20, False)
  
spark = SparkSession \
    .builder \
    .appName('StructuredWebLogExample') \
    .getOrCreate()

# Create DataFrame representing the stream of input
# lines from connection to logsender 7777
lines = spark.readStream.format('socket') \
    .option('host', 'localhost') \
    .option('port', 7777) \
    .load() 

sl = split(lines.value, ' ')
results = lines \
    .withColumn('time', sl.getItem(0).cast('timestamp')) \
    .withColumn('ip', sl.getItem(1).cast('string')) \
    .drop('value')

query = results \
    .withWatermark('time', '10 seconds') \
    .groupBy(window(results.time,'30 seconds','5 seconds'),'ip') \
    .agg(count('*').alias('count'))

query = query \
    .writeStream \
    .outputMode('complete') \
    .foreachBatch(dumpBatchDF) \
    .start()

query.awaitTermination(120)
query.stop()

## Exercise 4

Run additional logsender servers for subsets of the logs (IPv4 and IPv6 logs), using the following commands.

```
!nohup python logsender/server.py logsender/webipv4.log 7778 > /dev/null 2> /dev/null &
!nohup python logsender/server.py logsender/webipv6.log 7779 > /dev/null 2> /dev/null &
```

Write a program that combines the two streams, dumping the number of requests made in the last 15 seconds - dump this information every 5 seconds.

In [None]:
!nohup python logsender/server.py logsender/webipv4.log 7778 > /dev/null 2> /dev/null &
!nohup python logsender/server.py logsender/webipv6.log 7779 > /dev/null 2> /dev/null &

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import *

spark = SparkSession \
    .builder \
    .appName('StructuredWebLogExample') \
    .getOrCreate()

linesV4 = spark.readStream.format('socket') \
        .option('host', 'localhost') \
        .option('port', 7778) \
        .load() 

linesV6 = spark.readStream.format('socket') \
        .option('host', 'localhost') \
        .option('port', 7779) \
        .load() 

slV4 = split(linesV4.value, ' ')
slV6 = split(linesV6.value, ' ')

linesV4 = linesV4.withColumn('time', slV4.getItem(0).cast('timestamp')) \
        .withColumn('ip version', lit('v4')) \
        .drop('value')

linesV6 = linesV6.withColumn('time', slV6.getItem(0).cast('timestamp')) \
        .withColumn('ip version', lit('v6')) \
        .drop('value')

lines = linesV4.union(linesV6)

results = lines \
        .groupBy(window(lines.time, "15 seconds","5 seconds")) \
        .count()

#results = lines \
#        .groupBy(window(lines.time, "15 seconds","5 seconds"), 'ip version') \
#        .count()


query = results \
        .writeStream \
        .outputMode('complete') \
        .foreachBatch(lambda df, _: df.show(5, False)) \
        .start()

query.awaitTermination(120)
query.stop()

## Exercise 5

Write a program that combines the two streams from the previous exercise and dumps the proportion of IPv4 vs IPv6 requests in the last 20 seconds - dump this information every 5 seconds.
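
For one window, the proportion of each IP version is its request count divided by the total number of requests in that window. The plain-Python sketch below illustrates this on made-up counts; the window labels and numbers are hypothetical, and `proportions` is not part of the lab code.

```python
# Hypothetical per-window request counts, keyed by (window, ip_version).
counts = {
    ('w1', 'v4'): 150, ('w1', 'v6'): 50,
    ('w2', 'v4'): 90,  ('w2', 'v6'): 10,
}

def proportions(counts):
    """Share of each IP version within its own window."""
    totals = {}
    for (window, _version), n in counts.items():
        totals[window] = totals.get(window, 0) + n
    return {key: n / totals[key[0]] for key, n in counts.items()}

shares = proportions(counts)
for key, share in shares.items():
    print(key, share)
```

The shares of the two versions add up to 1 within each window, which is a quick sanity check on the Spark output as well.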


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import *

# The output function performs the additional computation
def dumpBatchDF(df, epoch_id):
  # Total number of requests (IPv4 + IPv6) in each window
  df2 = df \
      .groupBy('window') \
      .agg(sum('count').alias('total'))

  # Join on the column name so the shared 'window' column is kept once
  df3 = df.join(df2, on='window')

  df4 = df3 \
          .select('window', 'ip', (df3['count'] / df3['total']).alias('proportion')) \
          .orderBy('window', ascending=False)

  df4.show(20, False)

spark = SparkSession \
    .builder \
    .appName('StructuredWebLogExample') \
    .getOrCreate()

linesV4 = spark.readStream.format('socket') \
        .option('host', 'localhost') \
        .option('port', 7778) \
        .load() 

linesV6 = spark.readStream.format('socket') \
        .option('host', 'localhost') \
        .option('port', 7779) \
        .load() 

slV4 = split(linesV4.value, ' ')
slV6 = split(linesV6.value, ' ')

linesV4 = linesV4.withColumn('time', slV4.getItem(0).cast('timestamp')) \
        .withColumn('ip', lit('v4')) \
        .drop('value')

linesV6 = linesV6.withColumn('time', slV6.getItem(0).cast('timestamp')) \
        .withColumn('ip', lit('v6')) \
        .drop('value')


lines = linesV4.union( linesV6)

results = lines \
        .withWatermark('time', '1 seconds') \
        .groupBy(window(lines.time, '20 seconds','5 seconds'), 'ip') \
        .count()

query = results \
        .writeStream \
        .outputMode('complete') \
        .foreachBatch( dumpBatchDF ) \
        .start()

query.awaitTermination(300)
query.stop()