In [1]:
import findspark
findspark.init()
import os
import cv2
import face_recognition
import numpy as np
from pyspark.sql import SparkSession
from PIL import Image
from io import BytesIO
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, ByteType, BinaryType, StringType, IntegerType, DataType
from pyspark import SparkContext
from pyspark.ml.linalg import Vectors, VectorUDT

In [2]:
# Kafka connector artifacts that spark-submit should resolve via --packages.
# This has to be set before the SparkSession below is created.
_kafka_packages = ",".join([
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.1",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1",
])
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {_kafka_packages}  pyspark-shell"

In [3]:
# JVM system properties for executor/driver memory.
# NOTE(review): driver memory normally has to be known before the driver JVM
# starts; confirm that setting it here actually takes effect in local mode.
for _prop_name, _prop_value in (
    ('spark.executor.memory', '4g'),
    ('spark.driver.memory', '2g'),
):
    SparkContext.setSystemProperty(_prop_name, _prop_value)

# Session-level settings, applied to the builder in the order listed.
_session_settings = [
    ("spark.streaming.stopGracefullyOnShutdown", "true"),
    ("spark.memory.offHeap.enabled", "true"),
    ("spark.memory.offHeap.size", "8g"),
    ("spark.sql.shuffle.partitions", 5),
    ("spark.network.timeout", "360000ms"),
    ("spark.executor.heartbeatInterval", "300000ms"),
    ("spark.sql.execution.arrow.pyspark.enabled", "true"),
]

_session_builder = SparkSession.builder.appName("Streaming from Kafka")
for _conf_key, _conf_value in _session_settings:
    _session_builder = _session_builder.config(_conf_key, _conf_value)

# Single-machine session using every local core.
spark = _session_builder.master("local[*]").getOrCreate()

# Keep notebook output readable: only errors are logged.
spark.sparkContext.setLogLevel("ERROR")

your 131072x1 screen size is bogus. expect trouble
24/05/19 21:36:18 WARN Utils: Your hostname, LAPTOP-390RNSVU resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/05/19 21:36:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/ntp2003/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ntp2003/.ivy2/cache
The jars for the packages stored in: /home/ntp2003/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-940a8a94-86b9-4af7-97b0-08df0470e677;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.spark#spark-sql-kafka-0

In [4]:
# Kafka source settings: read topic "test" on the local broker from the
# earliest offset, at most 10 records per micro-batch, and keep going if
# offsets have been lost.
_kafka_source_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "test",
    "startingOffsets": "earliest",
    "maxOffsetsPerTrigger": 10,
    "failOnDataLoss": "false",
}

streaming_df = (
    spark.readStream
    .format("kafka")
    .options(**_kafka_source_options)
    .load()
)

In [5]:
def bin2image(bin):
    """Decode encoded image bytes into a nested ``[row][column][channel]`` list.

    Parameters
    ----------
    bin : bytes-like
        An encoded image in any format PIL can open. The name shadows the
        builtin ``bin`` but is kept so the signature stays unchanged.

    Returns
    -------
    list[list[list[int]]]
        RGB pixel values in the range 0..255, with the image's own
        height x width x 3 shape.

    Notes
    -----
    The array produced from a PIL RGB image already has shape (H, W, 3), so
    no reshape is applied. The previous fixed ``(480, 600, 3)`` reshape raised
    for any other frame size and silently scrambled a 600x480 frame, which has
    the same number of elements.
    """
    with BytesIO(bin) as stream:
        # convert() forces the lazy PIL decode while the stream is still open.
        image = np.array(Image.open(stream).convert('RGB'))
    return image.tolist()

# Pixel values span 0..255. ByteType is a signed 8-bit type (-128..127) and
# cannot represent the upper half of that range, so a wider integer is used.
udf_bin2image = udf(bin2image, ArrayType(ArrayType(ArrayType(IntegerType()))))

In [6]:
# Load the reference face images from ./facelabels with Spark's image data
# source; both reader options are passed exactly as before.
identify_df0 = (
    spark.read
    .format("image")
    .options(dropInvalid=True, inferschema=True)
    .load('./facelabels')
)

In [7]:
# Print the schema of the reference-image DataFrame loaded above.
identify_df0.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)



In [8]:
import os

def get_file_name(file_path):
    """Return the final path component of ``file_path`` up to its first dot.

    Everything from the first ``.`` of the base name onward is dropped, so
    ``/faces/NTP.jpg`` gives ``NTP`` and ``a.b.c`` gives ``a``.
    """
    base_name = os.path.basename(file_path)
    stem, _dot, _rest = base_name.partition('.')
    return stem

def bin_to_128d(img):
    """Compute the face encoding of a 112x112 three-channel reference image.

    Parameters
    ----------
    img : bytes-like
        Raw pixel buffer holding exactly 112 * 112 * 3 bytes. It is treated as
        BGR and converted to RGB before detection.
        NOTE(review): the caller passes Spark's ``image.data`` column; confirm
        every reference image really is 112x112 with 3 channels.

    Returns
    -------
    pyspark.ml.linalg.DenseVector
        Encoding of the first face detected in the image.

    Raises
    ------
    ValueError
        If no face is detected in the image, or none can be encoded from the
        cropped face region. Previously this was a bare ``IndexError``.
    """
    # frombuffer reads both bytes and bytearray payloads as uint8.
    bgr_pixels = np.frombuffer(img, dtype=np.uint8).reshape((112, 112, 3))
    rgb_pixels = cv2.cvtColor(bgr_pixels, cv2.COLOR_BGR2RGB)

    face_boxes = face_recognition.face_locations(rgb_pixels, 0)
    if len(face_boxes) == 0:
        raise ValueError("bin_to_128d: no face detected in reference image")
    top, right, bottom, left = face_boxes[0]

    # As before, the encoding is computed on the cropped face region. The
    # slice is copied into a contiguous array, which replaces the former
    # round-trip through a PIL image.
    face_crop = np.ascontiguousarray(rgb_pixels[top:bottom, left:right])
    encodings = face_recognition.face_encodings(face_crop)
    if len(encodings) == 0:
        raise ValueError("bin_to_128d: could not encode the detected face region")

    return Vectors.dense(encodings[0].tolist())

# Spark UDF wrappers around the two helpers defined above.
udf_bin_to_128d = udf(bin_to_128d, VectorUDT())
udf_get_file_name = udf(get_file_name, StringType())

# Keep the raw pixel buffer and add a label derived from each image's origin path.
identify_df1 = identify_df0.select(
    "image.data",
    udf_get_file_name("image.origin").alias("label"),
)

In [9]:
# Replace the raw pixel buffer in `data` with its face-encoding vector.
identify_df2 = identify_df1.select(
    udf_bin_to_128d('data').alias('data'),
    'label',
)

In [10]:
# Display the encoded reference faces; this triggers the encoding UDF.
identify_df2.show()

                                                                                

+--------------------+-----+
|                data|label|
+--------------------+-----+
|[-0.1083134263753...|  TQB|
|[-0.0703315660357...|  NTP|
+--------------------+-----+



In [11]:
# Print the schema of the Kafka source stream.
streaming_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [12]:
identify_data = identify_df2.collect()

# Split the collected rows into an (n_known, d) encoding matrix and a parallel
# label list. The broadcast handle is kept so the UDF reads the reference data
# through it; previously the handle was discarded and the rows were captured
# by the closure instead.
_known_encodings = np.array(
    [np.asarray(row[0].toArray(), dtype=np.float64) for row in identify_data]
)
_known_labels = [row[1] for row in identify_data]
identify_bc = spark.sparkContext.broadcast((_known_encodings, _known_labels))

# A face counts as a match for a known identity below this encoding distance.
MATCH_THRESHOLD = 0.4


def trans_detect(frame):
    """Detect faces in an RGB frame, annotate them, and return a JPEG.

    Each detected face is compared with every reference encoding. The closest
    reference is used as the label when its distance is below
    ``MATCH_THRESHOLD``; otherwise the face is marked ``Unknown``. Previously
    the first reference under the threshold won, even if a later one was
    closer.

    Parameters
    ----------
    frame : nested list of int
        Pixel values indexed ``[row][column][channel]`` in RGB order.
        NOTE(review): this matches what ``bin2image`` produces; confirm if
        another producer is added.

    Returns
    -------
    bytes
        The annotated frame encoded as JPEG.
    """
    known_encodings, known_labels = identify_bc.value

    # Read through int16 first: values may arrive as signed bytes (-128..127)
    # when the column is declared ByteType, and converting negative Python ints
    # straight to uint8 is deprecated or rejected by recent NumPy. The astype
    # wraps them back into 0..255.
    rgb_frame = np.asarray(frame, dtype=np.int16).astype(np.uint8)

    face_locations = face_recognition.face_locations(
        rgb_frame, number_of_times_to_upsample=0
    )

    if len(face_locations) > 0:
        face_encodings = face_recognition.face_encodings(rgb_frame, face_locations, 0)

        for (top, right, bottom, left), encoding in zip(face_locations, face_encodings):
            # Distance from this face to every known identity.
            distances = face_recognition.face_distance(known_encodings, encoding)
            best = int(np.argmin(distances)) if len(distances) > 0 else -1

            if best >= 0 and distances[best] < MATCH_THRESHOLD:
                colour = (0, 255, 0)
                caption = f'{known_labels[best]} ({distances[best]:.2})'
            else:
                colour = (255, 0, 0)
                caption = 'Unknown'

            # Face box, plus a 30-pixel-high header strip for the caption.
            cv2.rectangle(rgb_frame, (left, top), (right, bottom), colour, 2)
            cv2.rectangle(rgb_frame, (left, top), (right, top + 30), colour, 2)
            cv2.putText(rgb_frame, caption, (left + 15, top + 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, colour, 1)

    # OpenCV encoders expect BGR channel order, while the frame (and the
    # colours drawn on it) are RGB. Convert so red and blue are not swapped.
    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
    _ok, buffer = cv2.imencode('.jpg', bgr_frame)
    return buffer.tobytes()


udf_trans_detect = udf(trans_detect, BinaryType())

                                                                                

In [13]:
def foreach_batch_function(df, epoch_id):
    """Annotate every frame of a micro-batch and publish it to Kafka topic ``image``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Micro-batch from the Kafka source; ``key`` and ``value`` are read.
    epoch_id : int
        Micro-batch id, printed as a progress marker.

    Notes
    -----
    The annotated JPEG is written as a binary ``value``. It used to be cast to
    STRING first, but the payload is not text and the Kafka sink accepts
    binary values, so the cast was unnecessary and risked corrupting the
    image. The ``key`` cast is kept as it was.
    """
    print(epoch_id)

    # Decode each payload to a pixel array, then detect/annotate and re-encode.
    frames_df = df.select("key", udf_bin2image(df["value"]).alias("ImageArray"))
    annotated_df = frames_df.select(
        "key",
        udf_trans_detect(frames_df["ImageArray"]).alias("value"),
    )

    (
        annotated_df
        .selectExpr("CAST(key AS STRING)", "value")
        .write
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "image")
        .save()
    )

In [15]:
# Process everything currently available in the source topic, one micro-batch
# at a time, then stop. The cell blocks until the query terminates.
_annotate_query = (
    streaming_df.writeStream
    .outputMode("append")
    .trigger(availableNow=True)
    .foreachBatch(foreach_batch_function)
    .start()
)
_annotate_query.awaitTermination()

0


                                                                                

1


                                                                                

2


                                                                                

3


                                                                                

4


                                                                                

5


                                                                                

6


                                                                                

7


                                                                                

8


                                                                                

9


                                                                                

10


                                                                                

11


                                                                                

12


                                                                                

13


                                                                                

14


                                                                                

15


                                                                                

16


                                                                                

17


                                                                                

18


                                                                                

19


                                                                                

20


                                                                                

21


                                                                                

22


                                                                                

23


                                                                                

24


                                                                                

25


                                                                                

26


                                                                                

27


                                                                                

28


                                                                                