<a href="https://colab.research.google.com/github/smduarte/spbd-2425/blob/main/docs/labs/lab9/SPBD_Labs_spark4_exercise_sol.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Python Spark Streaming Exercises


In [None]:
#@title Install Pyspark
!pip install --quiet pyspark

---
# Exercises

**Every 3 seconds**,
1. Dump the number of requests in the last 10 seconds;
2. Dump the number of requests in the last 10 seconds, only if they total more than 100;
3. Dump the number of requests in the last 10 seconds, if there is an IP address with more than 100 requests;
4. Dump the proportion of IPv4 vs IPv6 requests in the last 20 seconds.


# Structured Spark Streaming

The Python code below shows the basics needed to process data from a socket source using PySpark.

The Python documentation for Spark Structured Streaming (Spark SQL Streaming) can be found [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/index.html#).

In [None]:
#@title Start the Structured Source

# Download the json_logsender archive and unpack it into the current
# directory. wget streams the archive to stdout (-O -) and tar reads it from
# stdin; tar's error output is discarded.
!wget -q -O - https://github.com/smduarte/spbd-2425/raw/main/scripts/json_logsender.tgz | tar xfz - 2> /dev/null

# Start the log sender in the background, passing it the bundled web.log file
# and 8888 (the port the streaming cells in this notebook connect to on
# localhost). All of its output is discarded, so start-up failures are silent.
# NOTE(review): re-running this cell presumably launches a second server
# instance - confirm how server.py behaves when the port is already in use.
!nohup python json_logsender/server.py json_logsender/web.log 8888 > /dev/null 2> /dev/null &

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *

# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession \
    .builder \
    .appName("StructuredWebLogExample") \
    .getOrCreate()


# Infer the schema of the incoming events from one representative JSON record.
sample_json = '{"timestamp": "2024-11-13T10:50:59.936+0000", "ip": "37.139.9.11", "code": 404, "cmd": "GET", "url": "/codemove/TTCENCUFMH3C", "time": 0.026}'
inferred_schema = schema_of_json(sample_json)


# Bind `query` before the try block so the cleanup in `finally` never touches
# an unbound name when the stream fails before it is started.
query = None
try:
  # Streaming DataFrame with one row per text line received from the
  # logsender on localhost:8888.
  raw_lines = spark.readStream.format("socket") \
      .option("host", "localhost") \
      .option("port", 8888) \
      .load()

  # Parse the JSON using the inferred schema
  json_lines = raw_lines.withColumn("json_data", from_json(col("value"), inferred_schema)) \
    .select("json_data.*")  # Expand the JSON fields into columns

  # Every second, print up to 10 untruncated rows of the newly arrived events.
  query = json_lines \
    .writeStream \
    .outputMode("append") \
    .trigger(processingTime='1 seconds') \
    .foreachBatch(lambda df, epoch: df.show(10, False)) \
    .start()

  # Let the query run for at most 60 seconds.
  query.awaitTermination(60)
except Exception as err:
  print(err)
finally:
  # Stop the query on timeout as well as on failure; otherwise it keeps
  # running in the background and its output leaks into later cells.
  if query is not None:
    query.stop()


---
# Exercises

Do the following exercises:

**Every 3 seconds**,
1. Dump the number of requests in the last 10 seconds;
2. Dump the number of requests in the last 10 seconds, only if they total more than 100;
3. Dump the number of requests in the last 10 seconds, if there is an IP address with more than 100 requests;
4. Dump the proportion of IPv4 vs IPv6 requests in the last 20 seconds.

In [None]:
#@title Q1

from pyspark.sql import *
from pyspark.sql.functions import *

# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession \
    .builder \
    .appName("StructuredWebLogExample") \
    .getOrCreate()


# Infer the schema of the incoming events from one representative JSON record.
sample_json = '{"timestamp": "2024-11-13T10:50:59.936+0000", "ip": "37.139.9.11", "code": 404, "cmd": "GET", "url": "/codemove/TTCENCUFMH3C", "time": 0.026}'
inferred_schema = schema_of_json(sample_json)

# Bind `query` before the try block so the cleanup in `finally` is always safe.
query = None
try:
  # Streaming DataFrame with one row per text line received from the
  # logsender on localhost:8888.
  raw_lines = spark.readStream.format("socket") \
      .option("host", "localhost") \
      .option("port", 8888) \
      .load()

  # Parse the JSON using the inferred schema
  json_lines = raw_lines.withColumn("json_data", from_json(col("value"), inferred_schema)) \
    .select("json_data.*")  # Expand the JSON fields into columns

  json_lines.printSchema()

  # Count the requests per 10-second event-time window and keep only the most
  # recent window (descending sort on the window, first row).
  requests = (
      json_lines
      .withColumn("date", to_timestamp(col("timestamp")))
      .withWatermark("date", "0 seconds")
      .groupBy(window("date", "10 seconds")).count()
      .withColumnRenamed("count", "#requests")
      .orderBy('window', ascending = False).limit(1)
  )

  # Complete mode re-emits the whole (one row) result on every 3-second trigger.
  query = requests \
    .writeStream \
    .outputMode("complete") \
    .trigger(processingTime='3 seconds') \
    .foreachBatch(lambda df, epoch: df.show(10, False)) \
    .start()

  # Let the query run for at most 160 seconds.
  query.awaitTermination(160)
except Exception as err:
  print(err)
finally:
  # Stop the query on timeout as well as on failure; otherwise it keeps
  # running in the background and its output leaks into later cells.
  if query is not None:
    query.stop()

In [None]:
#@title Q2

from pyspark.sql import *
from pyspark.sql.functions import *

# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession \
    .builder \
    .appName("StructuredWebLogExample") \
    .getOrCreate()


# Infer the schema of the incoming events from one representative JSON record.
sample_json = '{"timestamp": "2024-11-13T10:50:59.936+0000", "ip": "37.139.9.11", "code": 404, "cmd": "GET", "url": "/codemove/TTCENCUFMH3C", "time": 0.026}'
inferred_schema = schema_of_json(sample_json)

# Bind `query` before the try block so the cleanup in `finally` is always safe.
query = None
try:
  # Streaming DataFrame with one row per text line received from the
  # logsender on localhost:8888.
  raw_lines = spark.readStream.format("socket") \
      .option("host", "localhost") \
      .option("port", 8888) \
      .load()

  # Parse the JSON using the inferred schema
  json_lines = raw_lines.withColumn("json_data", from_json(col("value"), inferred_schema)) \
    .select("json_data.*")  # Expand the JSON fields into columns

  json_lines.printSchema()

  # Count the requests per 10-second event-time window, drop the windows with
  # 100 requests or fewer, and keep the most recent of the remaining windows.
  requests = (
      json_lines
      .withColumn("date", to_timestamp(col("timestamp")))
      .withWatermark("date", "0 seconds")
      .groupBy(window("date", "10 seconds")).count()
      .where("count > 100")
      .withColumnRenamed("count", "#requests")
      .orderBy('window', ascending = False).limit(1)
  )

  # Complete mode re-emits the whole result on every 3-second trigger.
  query = requests \
    .writeStream \
    .outputMode("complete") \
    .trigger(processingTime='3 seconds') \
    .foreachBatch(lambda df, epoch: df.show(10, False)) \
    .start()

  # Let the query run for at most 120 seconds.
  query.awaitTermination(120)
except Exception as err:
  print(err)
finally:
  # Stop the query on timeout as well as on failure; otherwise it keeps
  # running in the background and its output leaks into later cells.
  if query is not None:
    query.stop()

In [None]:
#@title Q3

from pyspark.sql import *
from pyspark.sql.functions import *

# Reuse the active Spark session if there is one, otherwise create it. The
# stateful-operator correctness check is switched off because the query below
# chains two aggregations.
spark = SparkSession \
    .builder \
    .appName("StructuredWebLogExample") \
    .config('spark.sql.streaming.statefulOperator.checkCorrectness.enabled','False')\
    .getOrCreate()


# Infer the schema of the incoming events from one representative JSON record.
sample_json = '{"timestamp": "2024-11-13T10:50:59.936+0000", "ip": "37.139.9.11", "code": 404, "cmd": "GET", "url": "/codemove/TTCENCUFMH3C", "time": 0.026}'
inferred_schema = schema_of_json(sample_json)

# Bind `query` before the try block so the cleanup in `finally` is always safe.
query = None
try:
  # Streaming DataFrame with one row per text line received from the
  # logsender on localhost:8888.
  raw_lines = spark.readStream.format("socket") \
      .option("host", "localhost") \
      .option("port", 8888) \
      .load()

  # Parse the JSON using the inferred schema
  json_lines = raw_lines.withColumn("json_data", from_json(col("value"), inferred_schema)) \
    .select("json_data.*")  # Expand the JSON fields into columns

  json_lines.printSchema()

  # Attach an event-time column parsed from the textual timestamp.
  requests = (
      json_lines
      .withColumn("date", to_timestamp(col("timestamp")))
      .withWatermark("date", "0 seconds")
  )

  # 1) count the requests per (10-second window, ip);
  # 2) per window, add the counts up and record the largest per-IP count;
  # 3) keep the windows where some IP made more than 100 requests;
  # 4) keep the most recent of those windows.
  # `sum` and `max` are the pyspark.sql.functions aggregates brought in by the
  # star import, not the Python builtins.
  requests = (
      requests
      .groupBy(window("date", "10 seconds"), "ip").count()
      .groupBy('window').agg(sum('count').alias('#requests'), max('count').alias('max_requests_by_ip'))
      .where("max_requests_by_ip > 100")
      .orderBy('window', ascending = False).limit(1)
  )

  # Complete mode re-emits the whole result on every 3-second trigger.
  query = requests \
    .writeStream \
    .outputMode("complete") \
    .trigger(processingTime='3 seconds') \
    .foreachBatch(lambda df, epoch: df.show(10, False)) \
    .start()

  # Let the query run for at most 300 seconds.
  query.awaitTermination(300)
except Exception as err:
  print(err)
finally:
  # Stop the query on timeout as well as on failure; otherwise it keeps
  # running in the background and its output leaks into later cells.
  if query is not None:
    query.stop()

In [None]:
#@title Q4

def processResult(df, epoch):
    """Print the IPv4 / IPv6 request split of the most recent window.

    Intended as a `foreachBatch` callback.

    Args:
        df: batch DataFrame; the `window`, `ip_version` and `count` columns
            are read from it.
        epoch: batch identifier supplied by `foreachBatch`; not used.
    """
    # Local import so the function does not depend on a star import that is
    # executed later in the cell (and cannot pick up the builtin `sum`).
    from pyspark.sql import functions as F

    # One row per window with the v4 / v6 totals as columns; other ip_version
    # values are ignored by the explicit pivot list, and missing totals are 0.
    latest = df.groupBy('window') \
          .pivot('ip_version', ['v4', 'v6']).agg(F.sum("count").cast('double')) \
          .fillna(0) \
          .orderBy('window', ascending = False).limit(1)

    # nullif() turns a zero total into NULL, so a window without any v4 or v6
    # request yields NULL proportions instead of a division by zero (which is
    # an error when ANSI SQL mode is enabled).
    latest = latest.selectExpr(
        'window',
        'v4', 'v4 / nullif(v4 + v6, 0) as v4pc',
        'v6', 'v6 / nullif(v4 + v6, 0) as v6pc')
    latest.show(1000, truncate=False)


from pyspark.sql import *
from pyspark.sql.functions import *

# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession \
    .builder \
    .appName("StructuredWebLogExample") \
    .getOrCreate()


# Infer the schema of the incoming events from one representative JSON record.
sample_json = '{"timestamp": "2024-11-13T10:50:59.936+0000", "ip": "37.139.9.11", "code": 404, "cmd": "GET", "url": "/codemove/TTCENCUFMH3C", "time": 0.026}'
inferred_schema = schema_of_json(sample_json)

# Bind `query` before the try block so the cleanup in `finally` is always safe.
query = None
try:
  # Streaming DataFrame with one row per text line received from the
  # logsender on localhost:8888.
  raw_lines = spark.readStream.format("socket") \
      .option("host", "localhost") \
      .option("port", 8888) \
      .load()

  # Parse the JSON using the inferred schema
  json_lines = raw_lines.withColumn("json_data", from_json(col("value"), inferred_schema)) \
    .select("json_data.*")  # Expand the JSON fields into columns

  json_lines.printSchema()

  # Classify each address by its separator: '.' -> v4, ':' -> v6, else "?".
  requests = json_lines.withColumn(
      "ip_version",
      when(col("ip").contains('.'), "v4")
      .when(col("ip").contains(':'), "v6").otherwise("?")
  )

  # Count the requests per (20-second event-time window, ip_version).
  requests = (
      requests
      .withColumn("date", to_timestamp(col("timestamp")))
      .withWatermark("date", "0 seconds")
      .groupBy(window("date", "20 seconds"), 'ip_version').count()
  )

  # Every 3 seconds hand the complete result to processResult.
  query = requests \
    .writeStream \
    .outputMode("complete") \
    .trigger(processingTime='3 seconds') \
    .foreachBatch( processResult ) \
    .start()

  # Let the query run for at most 360 seconds.
  query.awaitTermination(360)
except Exception as err:
  print(err)
finally:
  # Stop the query on timeout as well as on failure; otherwise it keeps
  # running in the background and its output leaks into later cells.
  if query is not None:
    query.stop()