# Definition

## Import libraries and dependencies

In [1]:
import findspark
findspark.init()

# Standard library imports
from time import sleep
import threading

# Third-party imports
import numpy as np
import cv2
from ultralytics import YOLO

# PySpark imports
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql.functions import col, udf
from pyspark.sql.streaming import DataStreamReader
from pyspark.sql.types import BinaryType

In [2]:
scala_version = '2.12'
spark_version = '3.5.1'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.7.0'
]
spark = SparkSession \
.builder \
.appName("BigSale") \
.master("local") \
.config("spark.executor.memory", "8g") \
.config("spark.driver.memory", "8g") \
.config("spark.python.worker.reuse", "true") \
.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
.config("spark.sql.execution.arrow.maxRecordsPerBatch", "16") \
.config("spark.jars.packages", ",".join(packages)) \
.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

conf=SparkConf()

spark

24/07/05 16:41:33 WARN Utils: Your hostname, cthi-Inspiron-5515 resolves to a loopback address: 127.0.1.1; using 192.168.1.37 instead (on interface wlp2s0)
24/07/05 16:41:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/cthi/.ivy2/cache
The jars for the packages stored in: /home/cthi/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.kafka#kafka-clients added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f95a77ab-8a69-4a66-bff8-c45d3880eca4;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
	found org.apache.kafka#kafka-clients;3.7.0 in central
	found com.github.luben#zstd-jni;1.5.5-6 in central
	f

## Load a model

In [3]:
yolo = YOLO("../params/pt_yolov5n.pt")
# yolo = YOLO("../params/pt_yolov8s.pt") 
# yolo = YOLO("../params/pt_yolov8n.pt")  

PRO TIP 💡 Replace 'model=../params/pt_yolov5n.pt' with new 'model=../params/pt_yolov5nu.pt'.
YOLOv5 'u' models are trained with https://github.com/ultralytics/ultralytics and feature improved performance vs standard YOLOv5 models trained with https://github.com/ultralytics/yolov5.



## Broadcast model

In [4]:
# Broadcast model
sc = SparkContext.getOrCreate()
broadcast_model = sc.broadcast(yolo)

# Production Process

## Define Spark's User-Defined Function (UDF) and get broadcasted model

In [5]:
model = broadcast_model.value

def load_and_preprocess_frames(frame_bytes):
    frame = np.frombuffer(frame_bytes, dtype=np.uint8)
    frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
    return frame

def predict(frame_bytes):
    image = load_and_preprocess_frames(frame_bytes)
    prediction = model.predict(image)
    ret, buffer = cv2.imencode('.jpg', prediction[0].plot())
    print(buffer)
    return buffer.tobytes()

predict_udf = udf(predict, BinaryType())

## Spark Structured Streaming

In [6]:
kafka_server = 'localhost:9092'

In [7]:
def queryWriter(topic_in, topic_out, checkpointPath):
    streamRawDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafka_server).option("subscribe", topic_in).option("startingOffsets","latest").load()

    streamRawDF = streamRawDF.withColumn('value1', col('value'))
    streamRawDF = streamRawDF.drop('value')
    streamRawDF = streamRawDF.withColumn('value', predict_udf('value1'))

    query = streamRawDF.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_server) \
    .option('topic', topic_out) \
    .option("checkpointLocation", f'/home/cthi/UIT/IE212/checkpoint/' + checkpointPath) \
    .trigger(processingTime="1 seconds")

    return query


## Threading

In [None]:
def threadQuery(topic_in, topic_out):
    streamRawDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafka_server).option("subscribe", topic_in).option("startingOffsets","latest").load()

    streamRawDF = streamRawDF.withColumn('value1', col('value'))
    streamRawDF = streamRawDF.drop('value')
    streamRawDF = streamRawDF.withColumn('value', predict_udf('value1'))

    query = streamRawDF.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_server) \
    .option('topic', topic_out) \
    .option("checkpointLocation", '/home/cthi/UIT/IE212/checkpoint/uav1') \
    .trigger(processingTime="1 seconds") \
    .start()

## Start Streaming Query

In [8]:
topic_in_1 = 'drone_1'
topic_in_2 = 'drone_2'

topic_out_1 = 'UAV_1'
topic_out_2 = 'UAV_2'

checkpoint_1 = 'uav1'
checkpoint_2 = 'uav2'

query_1 = queryWriter(topic_in_1, topic_out_1, checkpoint_1)
query_2 = queryWriter(topic_in_2, topic_out_2, checkpoint_2)
# query_1 = threading.Thread(target=threadQuery, args=(topic_in_1, topic_out_1))
# query_2 = threading.Thread(target=threadQuery, args=(topic_in_2, topic_out_2))

In [None]:
query_2.start()

In [9]:
query_1.start() 
query_2.start()
spark.streams.awaitAnyTermination()

[Stage 2:>                                                          (0 + 1) / 4]
0: 544x1024 13 cars, 1 truck, 1 bus, 279.1ms
Speed: 7.9ms preprocess, 279.1ms inference, 1.1ms postprocess per image at shape (1, 3, 544, 1024)
[255 216 255 ... 159 255 217]
0: 544x1024 13 cars, 1 truck, 1 bus, 244.1ms
Speed: 5.4ms preprocess, 244.1ms inference, 1.0ms postprocess per image at shape (1, 3, 544, 1024)
[255 216 255 ... 127 255 217]
0: 544x1024 13 cars, 1 truck, 1 bus, 257.1ms
Speed: 6.8ms preprocess, 257.1ms inference, 0.6ms postprocess per image at shape (1, 3, 544, 1024)
[255 216 255 ... 179 255 217]
0: 544x1024 13 cars, 1 truck, 1 bus, 226.4ms
Speed: 5.7ms preprocess, 226.4ms inference, 0.7ms postprocess per image at shape (1, 3, 544, 1024)
[255 216 255 ... 120 255 217]
[Stage 3:>                                                          (0 + 1) / 5]
0: 544x1024 13 cars, 1 truck, 1 bus, 243.3ms
Speed: 7.1ms preprocess, 243.3ms inference, 1.2ms postprocess per image at shape (1, 3, 544, 1024

KeyboardInterrupt: 

In [None]:
query_1.stop()
query_2.stop()