# Spark streaming Lecture 3 examples

These are the examples used in the slides of lecture 3.

The code assumes that the following streams are available:
* web.log at server logsender, port 7777. 

`docker run --name logsender --network psnet -d smduarte/ps2021-logsender`

* simple.log at server logsender, port 7776. This is a simple stream with one event every 5 seconds, except for a few periods in which up to 3 events were produced.

`docker exec -d logsender python ./server.py /data/simple.log 7776`

## Example 1

List the data frames produced while processing the stream.

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

def dumpBatchDF(df, epoch_id):
    """Print one micro-batch of the stream.

    Registered below as the ``foreachBatch`` callback, so it is called
    with the batch data frame ``df`` and the batch identifier
    ``epoch_id`` (unused here). Shows at most 20 rows, untruncated.
    """
    df.show(n=20, truncate=False)
  
# Get the Spark session for this example (created on first use).
spark = (
    SparkSession.builder
    .appName("StructuredWebLogExample")
    .getOrCreate()
)

# Streaming DataFrame with the lines received from the simple.log stream
# (server logsender, port 7776). Each received line is one row with a
# single column named "value".
lines = (
    spark.readStream.format("socket")
    .option("host", "logsender")
    .option("port", 7776)
    .load()
)

# Do no processing: hand every micro-batch to dumpBatchDF, which prints it.
query = (
    lines.writeStream
    .outputMode("append")
    .foreachBatch(dumpBatchDF)
    .start()
)

# Let the query run for at most 20 seconds. The finally clause stops the
# query even when the wait is interrupted or the query fails, so that no
# streaming query is left running in the background of the notebook.
try:
    query.awaitTermination(20)
finally:
    query.stop()


+-----+
|value|
+-----+
+-----+

+--------------------------------------------------------------------+
|value                                                               |
+--------------------------------------------------------------------+
|2020-03-15T10:00:00.000+0000 37.139.9.11 200 GET /date/10h00m00s 0.1|
+--------------------------------------------------------------------+

+---------------------------------------------------------------------+
|value                                                                |
+---------------------------------------------------------------------+
|2020-03-15T10:00:05.000+0000 37.139.9.12 200 GET /date/10h00m05s 0.12|
+---------------------------------------------------------------------+

+--------------------------------------------------------------------+
|value                                                               |
+--------------------------------------------------------------------+
|2020-03-15T10:00:10.000+0000 37.139.

## Example 2

List the top-3 IP sources with the most accesses.


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

def dumpBatchDF(df, epoch_id):
    """Print one micro-batch of the stream.

    Registered below as the ``foreachBatch`` callback, so it is called
    with the batch data frame ``df`` and the batch identifier
    ``epoch_id`` (unused here). Shows at most 20 rows, untruncated.
    """
    df.show(n=20, truncate=False)
  
# Get the Spark session for this example (created on first use).
spark = (
    SparkSession.builder
    .appName("StructuredWebLogExample")
    .getOrCreate()
)

# Streaming DataFrame with the lines received from the web.log stream
# (server logsender, port 7777). Each received line is one row with a
# single column named "value".
lines = (
    spark.readStream.format("socket")
    .option("host", "logsender")
    .option("port", 7777)
    .load()
)

# Split every line on single spaces; the fields are read by position.
split_lines = split(lines['value'], ' ')

# Turn the positional fields into typed, named columns and drop the raw text.
lines = (
    lines
    .withColumn('time', split_lines.getItem(0).cast("timestamp"))
    .withColumn('IP', split_lines.getItem(1).cast("string"))
    .withColumn('code', split_lines.getItem(2).cast("integer"))
    .withColumn('op', split_lines.getItem(3).cast("string"))
    .withColumn('URL', split_lines.getItem(4).cast("string"))
    .withColumn('dur', split_lines.getItem(5).cast("float"))
    .drop('value')
)

# Number of accesses per IP, largest first, keeping only the first three.
top_ips = (
    lines.groupBy('IP')
    .count()
    .orderBy('count', ascending=False)
    .limit(3)
)

# Dump the results. "complete" mode outputs the whole result table on
# every trigger; it is the output mode in which Spark accepts a sort on a
# streaming aggregation.
query = (
    top_ips.writeStream
    .outputMode("complete")
    .foreachBatch(dumpBatchDF)
    .start()
)

# Let the query run for at most 20 seconds. The finally clause stops the
# query even when the wait is interrupted or the query fails, so that no
# streaming query is left running in the background of the notebook.
try:
    query.awaitTermination(20)
finally:
    query.stop()


The same program, computing the count with an explicit `agg(count('*'))` aggregation instead of `count()`.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import count

def dumpBatchDF(df, epoch_id):
    """Print one micro-batch of the stream.

    Registered below as the ``foreachBatch`` callback, so it is called
    with the batch data frame ``df`` and the batch identifier
    ``epoch_id`` (unused here). Shows at most 20 rows, untruncated.
    """
    df.show(n=20, truncate=False)
  
# Get the Spark session for this example (created on first use).
spark = (
    SparkSession.builder
    .appName("StructuredWebLogExample")
    .getOrCreate()
)

# Streaming DataFrame with the lines received from the web.log stream
# (server logsender, port 7777). Each received line is one row with a
# single column named "value".
lines = (
    spark.readStream.format("socket")
    .option("host", "logsender")
    .option("port", 7777)
    .load()
)

# Split every line on single spaces; the fields are read by position.
split_lines = split(lines['value'], ' ')

# Turn the positional fields into typed, named columns and drop the raw text.
lines = (
    lines
    .withColumn('time', split_lines.getItem(0).cast("timestamp"))
    .withColumn('IP', split_lines.getItem(1).cast("string"))
    .withColumn('code', split_lines.getItem(2).cast("integer"))
    .withColumn('op', split_lines.getItem(3).cast("string"))
    .withColumn('URL', split_lines.getItem(4).cast("string"))
    .withColumn('dur', split_lines.getItem(5).cast("float"))
    .drop('value')
)

# Number of accesses per IP, written as an explicit aggregation, largest
# first, keeping only the first three.
top_ips = (
    lines.groupBy('IP')
    .agg(count('*').alias('count'))
    .orderBy('count', ascending=False)
    .limit(3)
)

# Dump the results. "complete" mode outputs the whole result table on
# every trigger; it is the output mode in which Spark accepts a sort on a
# streaming aggregation.
query = (
    top_ips.writeStream
    .outputMode("complete")
    .foreachBatch(dumpBatchDF)
    .start()
)

# Let the query run for at most 20 seconds. The finally clause stops the
# query even when the wait is interrupted or the query fails, so that no
# streaming query is left running in the background of the notebook.
try:
    query.awaitTermination(20)
finally:
    query.stop()


## Example 3

Example 2 using SQL.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

def dumpBatchDF(df, epoch_id):
    """Print one micro-batch of the stream.

    Registered below as the ``foreachBatch`` callback, so it is called
    with the batch data frame ``df`` and the batch identifier
    ``epoch_id`` (unused here). Shows at most 20 rows, untruncated.
    """
    df.show(n=20, truncate=False)
  
# Get the Spark session for this example (created on first use).
spark = (
    SparkSession.builder
    .appName("StructuredWebLogExample")
    .getOrCreate()
)

# Streaming DataFrame with the lines received from the web.log stream
# (server logsender, port 7777). Each received line is one row with a
# single column named "value".
lines = (
    spark.readStream.format("socket")
    .option("host", "logsender")
    .option("port", 7777)
    .load()
)

# Split every line on single spaces; the fields are read by position.
split_lines = split(lines['value'], ' ')

# Turn the positional fields into typed, named columns and drop the raw text.
lines = (
    lines
    .withColumn('time', split_lines.getItem(0).cast("timestamp"))
    .withColumn('IP', split_lines.getItem(1).cast("string"))
    .withColumn('code', split_lines.getItem(2).cast("integer"))
    .withColumn('op', split_lines.getItem(3).cast("string"))
    .withColumn('URL', split_lines.getItem(4).cast("string"))
    .withColumn('dur', split_lines.getItem(5).cast("float"))
    .drop('value')
)

# Expose the parsed stream to SQL under the name "weblog".
lines.createOrReplaceTempView("weblog")

# Same computation as Example 2, expressed as a SQL statement.
top_ips = spark.sql("SELECT IP, COUNT(*) AS count FROM weblog GROUP BY IP ORDER BY count DESC LIMIT 3")

# Dump the results. "complete" mode outputs the whole result table on
# every trigger; it is the output mode in which Spark accepts a sort on a
# streaming aggregation.
query = (
    top_ips.writeStream
    .outputMode("complete")
    .foreachBatch(dumpBatchDF)
    .start()
)

# Let the query run for at most 20 seconds. The finally clause stops the
# query even when the wait is interrupted or the query fails, so that no
# streaming query is left running in the background of the notebook.
try:
    query.awaitTermination(20)
finally:
    query.stop()


## Example 4

List the top-3 IP sources with the most accesses in the last 30 seconds. Update the list every 10 seconds.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

def dumpBatchDF(df, epoch_id):
    """Print one micro-batch of the stream.

    Registered below as the ``foreachBatch`` callback, so it is called
    with the batch data frame ``df`` and the batch identifier
    ``epoch_id`` (unused here). Shows at most 20 rows, untruncated.
    """
    df.show(n=20, truncate=False)
  
# Get the Spark session for this example (created on first use).
spark = (
    SparkSession.builder
    .appName("StructuredWebLogExample")
    .getOrCreate()
)

# Streaming DataFrame with the lines received from the web.log stream
# (server logsender, port 7777). Each received line is one row with a
# single column named "value".
lines = (
    spark.readStream.format("socket")
    .option("host", "logsender")
    .option("port", 7777)
    .load()
)

# Split every line on single spaces; the fields are read by position.
split_lines = split(lines['value'], ' ')

# Turn the positional fields into typed, named columns and drop the raw text.
lines = (
    lines
    .withColumn('time', split_lines.getItem(0).cast("timestamp"))
    .withColumn('IP', split_lines.getItem(1).cast("string"))
    .withColumn('code', split_lines.getItem(2).cast("integer"))
    .withColumn('op', split_lines.getItem(3).cast("string"))
    .withColumn('URL', split_lines.getItem(4).cast("string"))
    .withColumn('dur', split_lines.getItem(5).cast("float"))
    .drop('value')
)

# Count the accesses per IP inside 30-second windows over the 'time'
# column, with a new window starting every 10 seconds. Rows are sorted by
# window and then by count, both descending, so the three rows kept by
# limit(3) are the highest counts of the most recent window (as long as
# that window has at least three IPs).
top_ips = (
    lines.groupBy(window(lines.time, "30 seconds", "10 seconds"), 'IP')
    .agg(count('*').alias('count'))
    .orderBy('window', 'count', ascending=False)
    .limit(3)
)

# Dump the results. "complete" mode outputs the whole result table on
# every trigger; it is the output mode in which Spark accepts a sort on a
# streaming aggregation.
query = (
    top_ips.writeStream
    .outputMode("complete")
    .foreachBatch(dumpBatchDF)
    .start()
)

# Let the query run for at most 60 seconds. The finally clause stops the
# query even when the wait is interrupted or the query fails, so that no
# streaming query is left running in the background of the notebook.
try:
    query.awaitTermination(60)
finally:
    query.stop()


+------+---+-----+
|window|IP |count|
+------+---+-----+
+------+---+-----+

+------------------------------------------+--------------+-----+
|window                                    |IP            |count|
+------------------------------------------+--------------+-----+
|[2016-12-06 08:58:40, 2016-12-06 08:59:10]|120.52.73.97  |152  |
|[2016-12-06 08:58:40, 2016-12-06 08:59:10]|178.22.148.122|120  |
|[2016-12-06 08:58:40, 2016-12-06 08:59:10]|120.52.73.98  |104  |
+------------------------------------------+--------------+-----+

+------------------------------------------+--------------+-----+
|window                                    |IP            |count|
+------------------------------------------+--------------+-----+
|[2016-12-06 08:58:40, 2016-12-06 08:59:10]|120.52.73.97  |588  |
|[2016-12-06 08:58:40, 2016-12-06 08:59:10]|120.52.73.98  |421  |
|[2016-12-06 08:58:40, 2016-12-06 08:59:10]|178.22.148.122|348  |
+------------------------------------------+--------------+-----

## Example 5

List the top-3 IP sources with the most accesses in the last 30 seconds. Update the list every 10 seconds.

Print the country of each IP, assuming there is a CSV file with the country for each IP.

Also print the total number of requests for each IP over time.


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

def dumpBatchDF(df, epoch_id):
    """Print one micro-batch of the stream.

    Registered below as the ``foreachBatch`` callback, so it is called
    with the batch data frame ``df`` and the batch identifier
    ``epoch_id`` (unused here). Shows at most 20 rows, untruncated.
    """
    df.show(n=20, truncate=False)
  
# Get the Spark session for this example (created on first use).
spark = (
    SparkSession.builder
    .appName("StructuredWebLogExample")
    .getOrCreate()
)

# Static (non-streaming) table read from countries.csv with an explicit
# two-column schema: IP and country.
userSchema = StructType().add("IP", "string").add("country", "string")
countries = spark.read.schema(userSchema).csv("countries.csv")

# Streaming DataFrame with the lines received from the web.log stream
# (server logsender, port 7777). Each received line is one row with a
# single column named "value".
lines = (
    spark.readStream.format("socket")
    .option("host", "logsender")
    .option("port", 7777)
    .load()
)

# Split every line on single spaces; the fields are read by position.
split_lines = split(lines['value'], ' ')

# Turn the positional fields into typed, named columns and drop the raw text.
lines = (
    lines
    .withColumn('time', split_lines.getItem(0).cast("timestamp"))
    .withColumn('IP', split_lines.getItem(1).cast("string"))
    .withColumn('code', split_lines.getItem(2).cast("integer"))
    .withColumn('op', split_lines.getItem(3).cast("string"))
    .withColumn('URL', split_lines.getItem(4).cast("string"))
    .withColumn('dur', split_lines.getItem(5).cast("float"))
    .drop('value')
)

# Count the accesses per IP inside 30-second windows over the 'time'
# column, with a new window starting every 10 seconds. Rows are sorted by
# window and then by count, both descending, so the three rows kept by
# limit(3) are the highest counts of the most recent window (as long as
# that window has at least three IPs).
top_ips = (
    lines.groupBy(window(lines.time, "30 seconds", "10 seconds"), 'IP')
    .agg(count('*').alias('count'))
    .orderBy('window', 'count', ascending=False)
    .limit(3)
)

# Attach the country of each IP. Joining on the column name (rather than
# on an equality of the two IP columns) keeps a single IP column in the
# result. The join is an inner join, so IPs that do not appear in
# countries.csv are dropped from the output.
# NOTE(review): the example description also asks for the total number
# of requests per IP over time; this cell does not compute it.
top_ips_with_country = top_ips.join(countries, "IP", "inner")

# Dump the results. "complete" mode outputs the whole result table on
# every trigger; it is the output mode in which Spark accepts a sort on a
# streaming aggregation.
query = (
    top_ips_with_country.writeStream
    .outputMode("complete")
    .foreachBatch(dumpBatchDF)
    .start()
)

# Let the query run for at most 60 seconds. The finally clause stops the
# query even when the wait is interrupted or the query fails, so that no
# streaming query is left running in the background of the notebook.
try:
    query.awaitTermination(60)
finally:
    query.stop()
