# Detect faces in a video stream from a Kafka topic

### Using Spark Structured Streaming and OpenCV

![spark](https://prateekvjoshi.files.wordpress.com/2015/11/1-main1.png) ![opencv](http://www.decom.ufop.br/imobilis/wp-content/uploads/2015/03/OpenCV_Logo.png)

In [2]:
# import OpenCV and other libraries
import numpy as np
import cv2
import pickle, base64, json

In [3]:
# Detect faces in one Kafka message payload (registered below as a Spark SQL UDF).
def _face_cascade_path():
    """Return the path of the Haar cascade used for frontal-face detection.

    Prefers the data directory bundled with the installed OpenCV package
    (``cv2.data.haarcascades``) so the notebook is not tied to one Python
    version's site-packages layout; falls back to the original Databricks
    path when ``cv2.data`` is not available.
    """
    import os

    cascade_dir = getattr(getattr(cv2, 'data', None), 'haarcascades', None)
    if cascade_dir:
        return os.path.join(cascade_dir, 'haarcascade_frontalface_alt.xml')
    return '/databricks/python3/lib/python3.5/site-packages/cv2/data/haarcascade_frontalface_alt.xml'


def detect_faces(kafka_value):
    """Draw green boxes around detected faces in a Kafka frame message.

    :param kafka_value: raw Kafka message value (bytes) containing UTF-8 JSON
        with a ``frameB64`` field: a base64-encoded pickle of an image array.
        The array is converted with ``COLOR_BGR2GRAY``, so it presumably holds
        a BGR frame -- confirm with the producer.
    :returns: the same JSON document serialized to a string, with
        ``frameB64`` replaced by the annotated frame and ``facesCount`` set to
        the number of detected faces (as a string); ``None`` when
        ``kafka_value`` is null.
    :raises OSError: if the Haar cascade file cannot be loaded.
    """
    if kafka_value is None:
        # Null Kafka values (e.g. tombstones) arrive as None; emit null
        # instead of failing the whole streaming query on ``.decode``.
        return None

    # loads json from kafka
    msg_payload = json.loads(kafka_value.decode('utf-8'))

    # SECURITY: pickle.loads can execute arbitrary code embedded in the
    # payload. Only consume this topic if every producer is trusted; otherwise
    # switch the wire format to an encoded image (cv2.imencode / cv2.imdecode).
    frame = pickle.loads(base64.standard_b64decode(msg_payload['frameB64'].encode('utf-8')))

    # detect faces on a grayscale copy; boxes are drawn on the colour frame
    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    cascade_path = _face_cascade_path()
    # NOTE(review): the cascade XML is re-parsed for every message. Caching the
    # classifier needs care in Spark: cv2 objects cannot be pickled, so a
    # driver-side cache would break UDF serialization (consider mapPartitions).
    face_detect = cv2.CascadeClassifier(cascade_path)
    if face_detect.empty():
        # Without this check the failure only surfaces later as an opaque
        # assertion error from detectMultiScale.
        raise OSError('could not load Haar cascade from {}'.format(cascade_path))
    faces = face_detect.detectMultiScale(frame_gray, scaleFactor=1.1, minNeighbors=5)
    for (x, y, w, h) in faces:
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

    # recreate json with the annotated frame and the face count
    msg_payload['facesCount'] = str(len(faces))
    msg_payload['frameB64'] = base64.standard_b64encode(pickle.dumps(frame)).decode('utf-8')
    return json.dumps(msg_payload)

In [4]:
# Expose detect_faces to Spark SQL as a UDF whose result column is a string.
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

detect_faces_udf = udf(detect_faces, returnType=StringType())

In [5]:
# Start the structured streaming job: read frames from the "camera-stream"
# topic, annotate faces with detect_faces_udf (keeping the message key), and
# publish the result to the "camera-analytics-stream" topic.
KAFKA_BOOTSTRAP_SERVERS = "wn0-kafka2.e4ix3a2tssaedkygl5ugloqjrb.cx.internal.cloudapp.net:9092"

source_frames = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", "camera-stream")
    .load()
)

annotated_frames = source_frames.select("key", detect_faces_udf("value").alias("value"))

# Kept as the cell's final expression so the notebook still displays the
# started query, exactly as before.
(
    annotated_frames.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("topic", "camera-analytics-stream")
    .option("checkpointLocation", "chkpt3")
    .start()
)