In [1]:
import os
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.functions import concat, when, col, pow, lit, max as max_, udf, round as round_, count as count_
from pyspark.sql.types import *
import pickle
import cv2

In [2]:
# Point PySpark at the Spark distribution under this path; any SPARK_HOME
# already present in the environment is overwritten.
os.environ.update({'SPARK_HOME': '/usr/local/spark-2.4.3-bin-hadoop2.7'})

In [3]:
# Maven coordinate of the Kafka source for Structured Streaming (also listed in --packages below): org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1

In [4]:
## Default configuration for spark-shell (tests and debugging)
# Extra spark-submit arguments: Maven packages to fetch (hadoop-aws, AWS SDK, Kafka
# connectors), S3A/V4-signing settings, compression codecs, driver memory,
# partition counts and a local[6] master. The string must end with "pyspark-shell".
# This is assigned before the SparkSession is created in the next cell.
# NOTE(review): spark-streaming-kafka_2.10:1.6.0 is a Scala 2.10 / Spark 1.6 artifact,
# while SPARK_HOME points at spark-2.4.3 and the other Kafka package is _2.11:2.4.1 —
# confirm these versions are intentional and compatible.
os.environ['PYSPARK_SUBMIT_ARGS'] = """--packages org.apache.hadoop:hadoop-aws:2.7.1,com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.spark:spark-streaming-kafka_2.10:1.6.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 
                                            --conf spark.executor.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true 
                                            --conf spark.driver.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true 
                                            --conf spark.hadoop.fs.s3a.endpoint=s3.us-east-2.amazonaws.com
                                            --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
                                            --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
                                            --conf spark.speculation=false
                                            --conf spark.sql.parquet.compression.codec=snappy
                                            --conf spark.io.compression.codec=snappy
                                            --driver-memory 2g 
                                            --conf spark.sql.shuffle.partitions=10
                                            --conf spark.default.parallelism=10 
                                            --master=local[6] 
                                             pyspark-shell """

In [5]:
# Build (or reuse) the Spark session under the application name set below.
spark_conf = SparkConf()
spark_conf.setAppName('Video Stream Processing')
spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    .getOrCreate()
)

# Grab the active SparkContext and wrap it in a streaming context
# with a batch duration of 1.
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, batchDuration=1)

In [8]:
# Open a streaming read of the Kafka topic, starting from the earliest offsets.
df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "distributed-video1",
        "startingOffsets": "earliest",
    })
    .load()
)

In [9]:
# Print the columns of the streaming DataFrame read from Kafka (schema shown below).
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [10]:
#kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {'distributed-video1':1})

In [None]:
# des = df.withColumn('deserialized', deserialize_udf(col('value')))
# deserialize_udf = udf(lambda value: str(pickle.loads(value)['deviceId']), StringType())
# out = des[['deserialized']]
# query = out \
#     .writeStream \
#     .outputMode("append") \
#     .format("console") \
#     .start()

# query.awaitTermination()

In [19]:
# Struct returned by the deserialization UDF: a string `deviceId` and a
# binary `frame`, both nullable.
schema = (
    StructType()
    .add('deviceId', StringType(), nullable=True)
    .add('frame', BinaryType(), nullable=True)
)

def deserialize(value):
    """Unpickle one message payload into a ``(deviceId, frame)`` pair.

    Args:
        value: Pickled bytes of a mapping with ``'deviceId'`` and ``'frame'``
            keys, or ``None`` when the source column is null.

    Returns:
        A ``(deviceId, frame)`` tuple, or ``None`` if ``value`` is ``None``.

    Raises:
        KeyError: If the unpickled mapping lacks ``'deviceId'`` or ``'frame'``.
        pickle.UnpicklingError: If ``value`` is not a valid pickle stream.
    """
    # SECURITY: pickle.loads can execute arbitrary code embedded in the payload.
    # Only consume topics whose producers are trusted, or switch to a safe
    # serialization format.
    if value is None:
        # The Kafka `value` column is nullable; pass nulls through instead of
        # failing with a TypeError inside pickle.loads.
        return None
    message = pickle.loads(value)
    return message['deviceId'], message['frame']
    
    
# Register `deserialize` as a Spark UDF returning the struct described by `schema`.
# The function is passed directly; wrapping it in a lambda added nothing.
deserialize_udf = udf(deserialize, schema)

In [26]:
# Decode each Kafka value into a struct, then lift its two fields to
# top-level columns.
des = (
    df
    .withColumn('deserialized', deserialize_udf(col('value')))
    .withColumn('deviceId', col('deserialized.deviceId'))
    .withColumn('frame', col('deserialized.frame'))
)

In [29]:
# groupBy() on its own returns a GroupedData, which has no writeStream, so the
# streaming write below failed with AttributeError. Keep `out` a streaming
# DataFrame instead: the decoded columns, partitioned by device.
out = des.select('deviceId', 'frame').repartition('deviceId')

In [37]:
# Keep only the decoded columns and partition the rows by device.
des.select('deviceId', 'frame').repartition('deviceId')

DataFrame[deviceId: string, frame: binary]

In [30]:
# Stream `out` to the console sink in append mode.
query = (
    out.writeStream
    .outputMode("append")
    .format("console")
    .start()
)
try:
    # Blocks until the query terminates or the cell is interrupted.
    query.awaitTermination()
finally:
    # Stop the query on the way out so an interrupted cell does not leave it
    # running in the background of the kernel.
    query.stop()

KeyboardInterrupt: 