<a href="https://colab.research.google.com/github/vinagoros/dv_streamprocessing/blob/main/lab2/ps2022_lab2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Processamento de Streams 2022
## Lab 2 - (Unstructured) Spark Streaming
---
### Colab Setup



In [2]:
#@title Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
#@title Install PySpark
!pip install pyspark findspark --quiet
import findspark
findspark.init()
findspark.find()

[K     |████████████████████████████████| 281.4 MB 34 kB/s 
[K     |████████████████████████████████| 198 kB 50.5 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


'/usr/local/lib/python3.7/dist-packages/pyspark'

---
### Weblog Sender
The stream server is a small python TCP server, listening
on port 7777 (localhost). 

The stream will consist of a set of text lines, obtained from the output log of a webserver.



In [4]:
!wget -q -O - https://github.com/smduarte/ps2022/raw/main/colab/logsender.tgz | tar xfz - 2> /dev/null

!nohup python logsender/server.py logsender/web.log 7777 > /dev/null 2> /dev/null &

The python code below shows the basics needed to process data from socket source using PySpark.

Spark Streaming python documentation is found [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.streaming.html)

In [5]:
import pyspark

from pyspark.streaming import StreamingContext

sc = pyspark.SparkContext('local[*]')
try:
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream("localhost", 7777)


    lines.pprint()
    
    ssc.start()
    ssc.awaitTermination(20)
except:
    sc.stop()
    ssc.stop()

-------------------------------------------
Time: 2022-04-18 19:05:36
-------------------------------------------

-------------------------------------------
Time: 2022-04-18 19:05:37
-------------------------------------------

-------------------------------------------
Time: 2022-04-18 19:05:38
-------------------------------------------
2022-04-18T19:05:36.854+0000 37.139.9.11 404 GET /codemove/TTCENCUFMH3C 0.026  
2022-04-18T19:05:36.927+0000 178.22.148.122 404 GET /codemove/PSO83TYKET12 0.088  
2022-04-18T19:05:36.927+0000 178.22.148.122 404 GET /codemove/PSO83TYKET12 0.088  
2022-04-18T19:05:36.927+0000 37.139.9.11 404 GET /codemove/TTCENCUFMH3C 0.057  
2022-04-18T19:05:37.206+0000 37.139.9.11 404 GET /codemove/TTCENCUFMH3C 0.015  
2022-04-18T19:05:37.253+0000 185.28.193.95 404 GET /codemove/1U6HCG3V2S9D 0.056  
2022-04-18T19:05:37.253+0000 185.28.193.95 404 GET /codemove/1U6HCG3V2S9D 0.052  
2022-04-18T19:05:37.253+0000 185.28.193.95 404 GET /codemove/1U6HCG3V2S9D 0.055  
2022

---
# Exercises

## Exercise 1

In a denial-of-service event it is important to identify the IP sources that might be attacking the system, by issuing a large number of requests.

Write a program to find the IP sources that have done more than 50 requests in the last 10 seconds -- dump this information every 5 seconds. 


In [27]:
import pyspark

from pyspark.streaming import StreamingContext

sc = pyspark.SparkContext('local[*]')
try:
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream("localhost", 7777)
    line_window = lines.window(5,5).map(lambda line : (line.split(" ")[1], 1)).reduceByKey(lambda a,b: a+b).filter(lambda tuplo: tuplo[1] > 50)

    line_window.pprint()
    
    ssc.start()
    ssc.awaitTermination(10)
    ssc.stop()
except:
    sc.stop()
    ssc.stop()

-------------------------------------------
Time: 2022-04-18 21:38:17
-------------------------------------------
('185.28.193.95', 93)
('120.52.73.98', 80)
('120.52.73.97', 100)

-------------------------------------------
Time: 2022-04-18 21:38:22
-------------------------------------------
('120.52.73.98', 137)
('97.77.104.22', 92)
('192.241.151.220', 90)
('120.52.73.97', 201)
('178.22.148.122', 160)



## Exercise 2

#### a)
Write a program to dump the number of requests, minimum processing time, maximum processing time for request in the last 10 seconds, **for all** source IPs that performed more than 100 requests -- dump this information every 5 second.  

In [44]:
import pyspark

from pyspark.streaming import StreamingContext

sc = pyspark.SparkContext('local[*]')

try:
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream("localhost", 7777)
    line_window = lines.window(6,1)
    reduced_and_filtered = line_window.map(lambda line : line.split(" ")).map(lambda line : (line[1], [1,line[5],line[5]])).reduceByKey(lambda a,b : [a[0]+b[0],max(a[1],b[1]),min(a[2],b[2])])
    #.filter(lambda value: value[1][0] > 100)

    reduced_and_filtered.pprint()
    
    ssc.start()
    ssc.awaitTermination(10)
    ssc.stop()
except:
    sc.stop()
    ssc.stop()

-------------------------------------------
Time: 2022-04-18 22:50:22
-------------------------------------------
('185.28.193.95', [23, '0.264', '0.013'])
('2002:894a:3a93:d:250:56ff:fe00:88c0', [1, '0.186', '0.186'])
('37.139.9.11', [4, '0.057', '0.015'])
('178.22.148.122', [2, '0.088', '0.088'])

-------------------------------------------
Time: 2022-04-18 22:50:23
-------------------------------------------
('185.28.193.95', [73, '2.636', '0.013'])
('2002:894a:3a93:d:250:56ff:fe00:88c0', [1, '0.186', '0.186'])
('192.241.151.220', [9, '2.271', '0.146'])
('97.77.104.22', [8, '2.402', '0.155'])
('211.140.26.58', [1, '0.429', '0.429'])
('37.139.9.11', [4, '0.057', '0.015'])
('178.22.148.122', [25, '2.214', '0.016'])
('202.47.236.252', [4, '2.258', '0.208'])
('2a02:c207:2008:5497::1', [1, '0.203', '0.203'])
('2a01:488:66:1000:5c33:8503:0:1', [2, '0.230', '0.068'])

-------------------------------------------
Time: 2022-04-18 22:50:24
-------------------------------------------
('185.28.

#### b)

Write a program to dump the number of requests, minimum processing time, maximum processing time for request in the last 10 seconds, **only if at least one** source IP has performed more than 100 requests -- dump this information every 5 second.

## Exercise 3
Write a program to dump the IP sources that deviate most from the average in terms of the number of requests made in the last 30 seconds - dump this information every 5 seconds.

## Exercise 4

Run additional logsender servers for subsets of the logs (IPv4 and IPv6 logs), using the following commands.

```
!nohup python logsender/server.py logsender/webipv4.log 7778 > /dev/null 2> /dev/null &
!nohup python logsender/server.py logsender/webipv6.log 7779 > /dev/null 2> /dev/null &
```

Write a program that combines the two streams, dumping the number of requests made in the last 15 seconds - dump this information every 5 seconds.

## Exercise 5

Write a program that combines the two streams from the previous exercise and dumps the proportion of IPv4 vs IPv6 requests in the last 20 seconds - dump this information every 5 seconds.
