# ***Processamento de Streams trabalho 1***

---



---

## Autores

> Ruben Belo 55967


> Andre Matos 55358





---



---



# Setup

In [None]:
#@title Install PySpark
!pip install pyspark findspark --quiet
import findspark
findspark.init()
findspark.find()

'/usr/local/lib/python3.10/dist-packages/pyspark'

In [None]:
#@title Install & Launch Kafka
%%bash
# Download Kafka, format a KRaft storage directory and start a single broker as a daemon.
KAFKA_VERSION=3.4.0
KAFKA=kafka_2.12-$KAFKA_VERSION

# dlcdn.apache.org only mirrors current releases, so a pinned older version eventually
# disappears from it; archive.apache.org keeps every release.
wget -q -O /tmp/$KAFKA.tgz https://archive.apache.org/dist/kafka/$KAFKA_VERSION/$KAFKA.tgz
tar xfz /tmp/$KAFKA.tgz
# Broker configuration provided by the course.
wget -q -O $KAFKA/config/server1.properties https://github.com/smduarte/ps2023/raw/main/colab/server1.properties

UUID=`$KAFKA/bin/kafka-storage.sh random-uuid`
# --ignore-formatted keeps the cell re-runnable: without it, a second run fails with
# "Log directory ... is already formatted".
$KAFKA/bin/kafka-storage.sh format --ignore-formatted -t $UUID -c $KAFKA/config/server1.properties
$KAFKA/bin/kafka-server-start.sh -daemon $KAFKA/config/server1.properties

# Wait (up to ~30 s) until something accepts connections on localhost:9092, the address
# the Spark cells read from, so the following cells do not race the broker startup.
for _ in $(seq 1 30); do
  (exec 3<>/dev/tcp/localhost/9092) 2>/dev/null && break
  sleep 1
done

Log directory /tmp/kraft-combined-logs is already formatted. Use --ignore-formatted to ignore this directory and format the others.


In [None]:
#@title Start Kafka Publisher
%%bash
# Python dependencies of the replay script.
pip install --quiet kafka-python dataclasses

# Fetch the Kafka publisher script and the sorted sensor dataset.
wget -q -O - https://github.com/smduarte/ps2023/raw/main/colab/kafka-tp1-logsender.tgz | tar xfz - 2> /dev/null
wget -q -O sensors-sorted.csv https://github.com/smduarte/ps2023/raw/main/tp1/sensors-sorted.csv

# Replay the dataset into the `air_quality` topic in the background
# (presumably 60x faster than real time); stdout and stderr go to separate logs.
nohup python kafka-tp1-logsender/publisher.py \
    --filename sensors-sorted.csv \
    --topic air_quality \
    --speedup 60 \
    2> kafka-publisher-error.log > kafka-publisher-out.log &

In [None]:
#@title Start Socket-based Publisher
%%bash
# Python dependencies of the replay script.
pip install --quiet kafka-python dataclasses

# Fetch the socket publisher script and the sorted sensor dataset.
wget -q -O - https://github.com/smduarte/ps2023/raw/main/colab/socket-tp1-logsender.tgz | tar xfz - 2> /dev/null
wget -q -O sensors-sorted.csv https://github.com/smduarte/ps2023/raw/main/tp1/sensors-sorted.csv

# Serve the dataset in the background (presumably on localhost:7777, where the
# unstructured streaming cell connects); stdout and stderr go to separate logs.
nohup python socket-tp1-logsender/publisher.py \
    --filename sensors-sorted.csv \
    --speedup 60 \
    2> socket-publisher-error.log > socket-publisher-out.log &

# 1- Compute the cumulative average of P1, updated on an hourly basis.

## Table output

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, StringType, StructField, StructType, TimestampType

# Explicit imports instead of `from pyspark.sql.functions import *`: the star import
# rebinds Python's builtin min/max/sum in the notebook namespace, breaking any later
# cell that calls them on plain numbers.

KAFKA_SERVERS = 'localhost:9092'
KAFKA_TOPIC = 'air_quality'
KAFKA_PACKAGE = 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2'
RUN_SECONDS = 600
# The publisher replays the data with `--speedup 60`; assuming that compresses time
# 60x, one hour of event time passes every 3600 / 60 = 60 wall-clock seconds.
SPEEDUP = 60
HOURLY_TRIGGER = f'{3600 // SPEEDUP} seconds'


def dumpBatchDF(df, epoch_id):
    """Show one micro-batch of per-sensor cumulative P1 averages, sorted by sensor_id.

    Args:
        df: micro-batch with columns `sensor_id` and `avg p1`; in `update` output
            mode it only holds the sensors whose average changed in this batch.
        epoch_id: micro-batch id passed by `foreachBatch` (unused).
    """
    df.orderBy('sensor_id', ascending=True).show(20, False)


spark = (SparkSession.builder
         .appName('Kafka Spark Structured Streaming Example')
         .config('spark.jars.packages', KAFKA_PACKAGE)
         .getOrCreate())

try:
    # Raw Kafka records, reduced to the message payload as a JSON string.
    lines = (spark.readStream
             .format('kafka')
             .option('kafka.bootstrap.servers', KAFKA_SERVERS)
             .option('subscribe', KAFKA_TOPIC)
             .option('startingOffsets', 'earliest')
             .load()
             .selectExpr('CAST(value AS STRING)'))

    schema = StructType([StructField('timestamp', TimestampType(), True),
                         StructField('sensor_id', StringType(), True),
                         StructField('sensor_type', StringType(), True),
                         StructField('location', StringType(), True),
                         StructField('latitude', FloatType(), True),
                         StructField('longitude', FloatType(), True),
                         StructField('p1', FloatType(), True)])

    readings = lines.select(F.from_json(F.col('value'), schema).alias('data')).select('data.*')

    # Running (all-time) average of p1 per sensor.
    results = readings.groupBy('sensor_id').agg(F.avg('p1').alias('avg p1'))

    query = (results.writeStream
             .outputMode('update')
             .trigger(processingTime=HOURLY_TRIGGER)  # roughly one update per event-time hour
             .foreachBatch(dumpBatchDF)
             .start())
    query.awaitTermination(RUN_SECONDS)
finally:
    # Stop the queries and the session even if the query failed, so the next cell's
    # getOrCreate() does not silently reuse a half-broken SparkContext.
    for active_query in spark.streams.active:
        active_query.stop()
    spark.stop()

## Graphical output

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, StringType, StructField, StructType, TimestampType
import matplotlib.pyplot as plt

# Explicit imports instead of `from pyspark.sql.functions import *`, which would
# rebind the builtin min/max/sum in the notebook namespace.

KAFKA_SERVERS = 'localhost:9092'
KAFKA_TOPIC = 'air_quality'
KAFKA_PACKAGE = 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2'
RUN_SECONDS = 600
# Assuming the publisher's `--speedup 60` compresses time 60x, one event-time hour
# passes every 60 wall-clock seconds.
SPEEDUP = 60
HOURLY_TRIGGER = f'{3600 // SPEEDUP} seconds'


def dumpBatchDF(df, epoch_id):
    """Print and bar-plot one micro-batch of per-sensor cumulative P1 averages.

    Args:
        df: micro-batch with columns `sensor_id` and `avg p1` (update mode: only the
            sensors whose average changed in this batch).
        epoch_id: micro-batch id passed by `foreachBatch`; shown in the plot title.
    """
    # The batch is read twice (show + toPandas); cache it so it is computed once.
    batch = df.orderBy('sensor_id', ascending=True).persist()
    try:
        batch.show(10, False)
        averages = batch.toPandas()
    finally:
        batch.unpersist()

    # pandas raises "no numeric data to plot" for an empty or all-null batch.
    if averages['avg p1'].dropna().empty:
        return

    fig, ax = plt.subplots()
    averages.plot(x='sensor_id', y='avg p1', kind='bar', ax=ax)
    ax.set(title=f'Cumulative average of P1 per sensor (batch {epoch_id})',
           xlabel='sensor_id', ylabel='avg p1')
    plt.show()
    plt.close(fig)  # one figure per batch: release it so memory does not grow


spark = (SparkSession.builder
         .appName('Kafka Spark Structured Streaming Example')
         .config('spark.jars.packages', KAFKA_PACKAGE)
         .getOrCreate())

try:
    # Raw Kafka records, reduced to the message payload as a JSON string.
    lines = (spark.readStream
             .format('kafka')
             .option('kafka.bootstrap.servers', KAFKA_SERVERS)
             .option('subscribe', KAFKA_TOPIC)
             .option('startingOffsets', 'earliest')
             .load()
             .selectExpr('CAST(value AS STRING)'))

    schema = StructType([StructField('timestamp', TimestampType(), True),
                         StructField('sensor_id', StringType(), True),
                         StructField('sensor_type', StringType(), True),
                         StructField('location', StringType(), True),
                         StructField('latitude', FloatType(), True),
                         StructField('longitude', FloatType(), True),
                         StructField('p1', FloatType(), True)])

    readings = lines.select(F.from_json(F.col('value'), schema).alias('data')).select('data.*')

    # Running (all-time) average of p1 per sensor.
    results = readings.groupBy('sensor_id').agg(F.avg('p1').alias('avg p1'))

    query = (results.writeStream
             .outputMode('update')
             .trigger(processingTime=HOURLY_TRIGGER)  # roughly one update per event-time hour
             .foreachBatch(dumpBatchDF)
             .start())
    query.awaitTermination(RUN_SECONDS)
finally:
    # Always release the queries and the session, even if the query failed.
    for active_query in spark.streams.active:
        active_query.stop()
    spark.stop()

# 2- Compute the minimum, average and maximum of P1 (particles smaller than 10 µm) over the last two hours, updated every 10 minutes.

## Structured Spark Streaming

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, StringType, StructField, StructType, TimestampType

# Explicit imports instead of `from pyspark.sql.functions import *`: the star import
# rebinds the builtin min/max/sum, which later cells call on plain floats.

KAFKA_SERVERS = 'localhost:9092'
KAFKA_TOPIC = 'air_quality'
KAFKA_PACKAGE = 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2'
RUN_SECONDS = 600
WINDOW_DURATION = '2 hours'
SLIDE_DURATION = '10 minutes'
# Readings arrive roughly in timestamp order (the publisher replays sensors-sorted.csv),
# so a short watermark lets Spark evict finished windows instead of keeping every
# window's state forever. Rows later than this delay are dropped.
WATERMARK_DELAY = '10 minutes'
# Assuming the publisher's `--speedup 60` compresses time 60x, ten event-time
# minutes pass every 10 wall-clock seconds.
SPEEDUP = 60
SLIDE_TRIGGER = f'{600 // SPEEDUP} seconds'


def dumpBatchDF(df, epoch_id):
    """Show one micro-batch of per-sensor, per-window P1 statistics, oldest window first.

    Args:
        df: micro-batch with columns `sensor_id`, `window`, `min p1`, `avg p1`, `max p1`;
            in `update` mode only the windows that changed in this batch.
        epoch_id: micro-batch id passed by `foreachBatch` (unused).
    """
    df.orderBy('window', ascending=True).show(20, False)


spark = (SparkSession.builder
         .appName('Kafka Spark Structured Streaming Example')
         .config('spark.jars.packages', KAFKA_PACKAGE)
         .getOrCreate())

try:
    # Raw Kafka records, reduced to the message payload as a JSON string.
    lines = (spark.readStream
             .format('kafka')
             .option('kafka.bootstrap.servers', KAFKA_SERVERS)
             .option('subscribe', KAFKA_TOPIC)
             .option('startingOffsets', 'earliest')
             .load()
             .selectExpr('CAST(value AS STRING)'))

    schema = StructType([StructField('timestamp', TimestampType(), True),
                         StructField('sensor_id', StringType(), True),
                         StructField('sensor_type', StringType(), True),
                         StructField('location', StringType(), True),
                         StructField('latitude', FloatType(), True),
                         StructField('longitude', FloatType(), True),
                         StructField('p1', FloatType(), True)])

    readings = lines.select(F.from_json(F.col('value'), schema).alias('data')).select('data.*')

    # Min / avg / max of p1 per sensor over 2-hour windows sliding every 10 minutes.
    results = (readings
               .withWatermark('timestamp', WATERMARK_DELAY)
               .groupBy('sensor_id', F.window('timestamp', WINDOW_DURATION, SLIDE_DURATION))
               .agg(F.min('p1').alias('min p1'),
                    F.avg('p1').alias('avg p1'),
                    F.max('p1').alias('max p1')))

    query = (results.writeStream
             .outputMode('update')
             .trigger(processingTime=SLIDE_TRIGGER)  # roughly one update per window slide
             .foreachBatch(dumpBatchDF)
             .start())
    query.awaitTermination(RUN_SECONDS)
finally:
    # Always release the queries and the session, even if the query failed.
    for active_query in spark.streams.active:
        active_query.stop()
    spark.stop()

## Unstructured Spark Streaming

In [None]:
import builtins

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

# Earlier cells run `from pyspark.sql.functions import *`, which rebinds `min`/`max`
# in the notebook namespace to Spark Column functions. The lambdas below operate on
# plain floats, so they call builtins.min / builtins.max explicitly.

SOCKET_HOST = 'localhost'
SOCKET_PORT = 7777
BATCH_SECONDS = 1
# Illustrative window only. The assignment asks for 2 hours sliding every 10 minutes.
# DStream durations are wall-clock seconds, so with the publisher's --speedup 60
# (assuming a plain 60x time compression) that would be window(120, 10). That
# produced few results in testing, so a 5 s window sliding every 1 s is used instead.
WINDOW_SECONDS = 5
SLIDE_SECONDS = 1
RUN_SECONDS = 6000


def _seed_stats(fields):
    """Map a split record to (sensor_id, (max, min, sum, count)) seeded from its p1.

    Assumes space-separated records with sensor_id at field 1 and p1 at field 6
    (matching the JSON schema's column order) — TODO confirm against the publisher.
    """
    p1 = float(fields[6])
    return fields[1], (p1, p1, p1, 1)


def _merge_stats(a, b):
    """Combine two (max, min, sum, count) tuples."""
    return (builtins.max(a[0], b[0]), builtins.min(a[1], b[1]), a[2] + b[2], a[3] + b[3])


spark = (SparkSession.builder
         .appName('Spark UnStructured Streaming Example')
         .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2')
         .getOrCreate())

ssc = None
try:
    ssc = StreamingContext(spark.sparkContext, BATCH_SECONDS)
    lines = ssc.socketTextStream(SOCKET_HOST, SOCKET_PORT)

    # Per sensor: (min, avg, max) of p1 over the sliding window.
    stats = (lines.window(WINDOW_SECONDS, SLIDE_SECONDS)
             .filter(lambda line: len(line) > 0)
             .map(lambda line: line.split(" "))
             .map(_seed_stats)
             .reduceByKey(_merge_stats)
             .map(lambda kv: (kv[0], (kv[1][1], kv[1][2] / kv[1][3], kv[1][0]))))

    stats.pprint()
    ssc.start()
    ssc.awaitTermination(RUN_SECONDS)
except Exception as err:
    # Best effort: report the error and fall through to cleanup.
    print(err)
finally:
    # `ssc` is None if StreamingContext creation itself failed.
    if ssc is not None:
        ssc.stop()
    spark.stop()


# 3- Compute the (signed) deviation of P1 between the cumulative average and the two-hour average (obtained in the previous step), updated hourly.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, StringType, StructField, StructType, TimestampType

# Explicit imports instead of `from pyspark.sql.functions import *`, which would
# rebind the builtin min/max/sum in the notebook namespace.

KAFKA_SERVERS = 'localhost:9092'
KAFKA_TOPIC = 'air_quality'
KAFKA_PACKAGE = 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2'
RUN_SECONDS = 600
# Assuming the publisher's `--speedup 60` compresses time 60x, one event-time hour
# passes every 60 wall-clock seconds.
SPEEDUP = 60
HOURLY_TRIGGER = f'{3600 // SPEEDUP} seconds'


def dumpBatchDF(df, epoch_id):
    """Show the signed deviation of each 2-hour window average from the cumulative average.

    Args:
        df: complete result table with columns `sensor_id`, `window`, `sum_p1`, `count`
            (one row per sensor and 2-hour window sliding every hour).
        epoch_id: micro-batch id passed by `foreachBatch` (unused).

    The deviation is `window average - cumulative average`. The cumulative average is
    taken over everything received so far, so older windows are compared against the
    current cumulative average, not the one at that window's time.
    """
    # `df` is scanned twice (aggregate + join); cache it for the duration of the batch.
    df.persist()
    try:
        # With 2-hour windows sliding every hour, every reading falls into exactly two
        # windows. Summing the per-window sums and counts therefore doubles both, and
        # their ratio is still the sensor's cumulative average.
        cumulative = (df.groupBy('sensor_id')
                      .agg(F.sum('sum_p1').alias('sum_p1'), F.sum('count').alias('count'))
                      .selectExpr('sensor_id', 'sum_p1/count as acc_avg'))

        deviations = (df.join(cumulative, 'sensor_id', 'inner')
                      .selectExpr('sensor_id', 'window', 'sum_p1/count - acc_avg as deviation')
                      .orderBy(['window', 'sensor_id'], ascending=False))
        deviations.show(20, False)
    finally:
        df.unpersist()


spark = (SparkSession.builder
         .appName('Kafka Spark Structured Streaming Example')
         .config('spark.jars.packages', KAFKA_PACKAGE)
         .getOrCreate())

try:
    # Raw Kafka records, reduced to the message payload as a JSON string.
    raw = (spark.readStream
           .format('kafka')
           .option('kafka.bootstrap.servers', KAFKA_SERVERS)
           .option('subscribe', KAFKA_TOPIC)
           .option('startingOffsets', 'earliest')
           .load()
           .selectExpr('CAST(value AS STRING)'))

    schema = StructType([StructField('timestamp', TimestampType(), True),
                         StructField('sensor_id', StringType(), True),
                         StructField('sensor_type', StringType(), True),
                         StructField('location', StringType(), True),
                         StructField('latitude', FloatType(), True),
                         StructField('longitude', FloatType(), True),
                         StructField('p1', FloatType(), True)])

    lines = raw.select(F.from_json(F.col('value'), schema).alias('data')).select('data.*')

    # Sum and count (rather than avg) per window, so the batch function can also
    # derive the cumulative average from the same table.
    query2 = (lines
              .groupBy('sensor_id', F.window('timestamp', '2 hour', '1 hour'))
              .agg(F.sum('p1').alias('sum_p1'), F.count('p1').alias('count')))

    query = (query2.writeStream
             .outputMode('complete')
             .trigger(processingTime=HOURLY_TRIGGER)  # roughly one update per event-time hour
             .foreachBatch(dumpBatchDF)
             .start())
    query.awaitTermination(RUN_SECONDS)
finally:
    # Always release the queries and the session, even if the query failed.
    for active_query in spark.streams.active:
        active_query.stop()
    spark.stop()