# 603: Big Data
# Group Project: Search for Persons of Interest (SPI)
## Professor: Akshata Kishore Moharir
## Student: Edwin Brown, Lidia Hutcherson, Levan Sulimanov

----------------------

In [None]:
# videos: https://gist.github.com/jsturgis/3b19447b304616f18657

# Main imports:
import os

from bson.json_util import dumps
from kafka import KafkaProducer
from kafka import KafkaConsumer
from pymongo.server_api import ServerApi
from pymongo import MongoClient
import matplotlib as plt
plt.interactive(True)
import json
from json import JSONEncoder
from threading import Thread
import cv2, time
# import pp
#import gridfs
import numpy as np
import hashlib
import imutils
import base64
from datetime import date
#from simple_facerec import SimpleFacerec
from PIL import Image
import folium
from IPython.display import display
from ipyleaflet import (
    Map, Marker, MarkerCluster,
    TileLayer, ImageOverlay,
    Polyline, Polygon, Rectangle, Circle, CircleMarker,
    Popup,
    GeoJSON,
    DrawControl,
    basemaps
)
# for labeling marker as image:
from ipywidgets import HTML
import IPython
from ipywidgets import widgets
from sidecar import Sidecar
import traceback



def mkdir_if_none(directory):
    """Create ``directory`` (and any missing parents) unless the path already exists.

    Args:
        directory: path of the directory to create.

    Returns:
        None.
    """
    # Anything already at this path (directory or file) is left untouched.
    if os.path.exists(directory):
        return
    # exist_ok=True closes the check-then-create race: if another thread or
    # process creates the directory between the check above and this call,
    # makedirs no longer raises FileExistsError.
    os.makedirs(directory, exist_ok=True)
        
def get_marker_widget(img_path, name):
    """Build the widget attached to a map marker as its popup.

    Args:
        img_path: path of the image to show; backslashes are converted to
            forward slashes before the image is loaded.
        name: text used for the fallback widget when the image cannot be
            turned into an image widget.

    Returns:
        A ``widgets.Image`` (300x400, declared as 'jpg') built from the image
        data, or an ``HTML`` widget whose value is ``name`` if anything goes
        wrong while building the image widget.
    """
    try:
        url = img_path.replace("\\", "/")
        image = IPython.display.Image(url, width = 300)

        widget = widgets.Image(
            value=image.data,
            format='jpg',
            width=300,
            height=400,
        )

        return widget
    except Exception:
        # Best-effort fallback: report the failure and label the marker with
        # plain text instead. Catching Exception (not a bare except) lets
        # KeyboardInterrupt / SystemExit propagate.
        print(traceback.format_exc())
        safe_message = HTML()
        safe_message.value = name
        return safe_message


# The following class bundles the Kafka producer, Kafka consumer, and MongoDB client objects.
# Initialize the class once and then call only the methods you need.
class SPI_Utils:
    """Kafka producer/consumer and MongoDB handles shared by the SPI pipeline.

    The constructor connects to Kafka and MongoDB, loads the video-request
    JSON file and, in "recognition" mode, additionally loads the known-face
    encodings and displays an ipyleaflet map on which detections are marked.
    """

    def __init__(self, mode,
                 streaming_kafka_topic="spi_topic",
                 kafka_bootstrap_servers=["localhost:9092"],
                 json_requests_file_path=os.path.join(os.getcwd(), "json_requests.json"),
                 data_store_dir=os.path.join(os.getcwd(), "frame_data"),
                 face_store_dir=os.path.join(os.getcwd(), "faces"),
                 mongodb_uri=None):
        """Connect to Kafka/MongoDB and load the video requests.

        Args:
            mode: "consumer" suppresses the listing of requested videos;
                "recognition" additionally loads face encodings and shows the
                map. Any other value only prints the requested videos.
            streaming_kafka_topic: Kafka topic written to and read from.
            kafka_bootstrap_servers: passed unchanged to KafkaProducer and
                KafkaConsumer. The default list is never mutated here.
            json_requests_file_path: JSON file holding the "video_requests"
                entry.
            data_store_dir: directory for stored frames; created if missing.
            face_store_dir: directory holding the reference face images
                (``<name>.jpg``) used for encodings and marker popups.
            mongodb_uri: MongoDB connection string. When None, the
                SPI_MONGODB_URI environment variable is used, falling back to
                the original 'mongodb+srv://' placeholder.
        """
        # Main directory where images arriving from the stream are stored.
        self.data_store_dir = data_store_dir
        mkdir_if_none(self.data_store_dir)
        self.face_store_dir = face_store_dir
        # Default width every frame is resized to.
        self.width = 640

        # Bookkeeping lists kept from the (disabled) parallel-processing path.
        self.job_list = []
        self.results = []

        # Kafka topic plus the producer/consumer bound to it.
        self.streaming_kafka_topic = streaming_kafka_topic
        self.producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers)
        self.consumer = KafkaConsumer(self.streaming_kafka_topic, bootstrap_servers=kafka_bootstrap_servers)

        # MongoDB client, database and the collection that stores frames.
        # Credentials are supplied at runtime so they never live in the source.
        if mongodb_uri is None:
            mongodb_uri = os.environ.get("SPI_MONGODB_URI", 'mongodb+srv://')
        self.client = MongoClient(mongodb_uri)
        self.db = self.client.testdb
        self.data_collection = self.db.images

        # Video sources requested by the agency (None when the tag is absent).
        self.json_requests_dict = self.read_json_data_into_dict(json_requests_file_path)
        if mode != "consumer":
            if self.json_requests_dict is None:
                print("<<<ERRROR: No video requests tag was found is provided json.>>>")
            else:
                print("Requested video sources are:")
                for request in self.json_requests_dict.values():
                    print(f"  - {request['video_path']}")
                print("\n")

        # Recognition mode: load the face embeddings and show the map.
        if mode == "recognition":
            # NOTE(review): SimpleFacerec is not imported in this file (the
            # import is commented out); it presumably has to be brought into
            # the namespace beforehand, e.g. via `%run simple_facerec` -
            # TODO confirm.
            self.sfr = SimpleFacerec()
            # Honour the configured face directory; its default is the
            # previously hard-coded <cwd>/faces.
            self.sfr.load_encoding_images(self.face_store_dir)

            us_center = [38.6252978589571, -97.3458993652344]
            zoom = 0
            self.spi_map = Map(center=us_center, zoom=zoom)
            # The sidecar is only instantiated; the map itself is displayed
            # directly in the cell output.
            Sidecar(title='SPI Map')
            display(self.spi_map)

    def read_json_data_into_dict(self, json_file_path):
        """Load the request form and return its "video_requests" entry.

        Args:
            json_file_path: path of the JSON request file.

        Returns:
            The value stored under "video_requests", or None when that key is
            not present.
        """
        with open(json_file_path) as json_data:
            data = json.load(json_data)
        if "video_requests" in data:
            return data["video_requests"]
        return None

    @staticmethod
    def encode_message(message):
        """Serialise ``message`` to JSON and encode it as UTF-8 bytes for Kafka."""
        return json.dumps(message).encode('utf-8')

    @staticmethod
    def decode_message(message):
        """Decode UTF-8 ``message`` bytes and parse them as JSON."""
        message_string = message.decode('utf-8')
        return json.loads(message_string)

    def produce_kafka_messages(self):
        """Send one Kafka message per requested video, then flush the producer.

        Does nothing when no video requests were loaded (the constructor
        tolerates a request file without a "video_requests" entry).
        """
        if not self.json_requests_dict:
            return
        for vid_request, request in self.json_requests_dict.items():
            kafka_message = {
                "video_path": request["video_path"],
                "video_tag": vid_request,
                "camera_id": request["camera_id"],
                "camera_latitude": request["camera_latitude"],
                "camera_longitude": request["camera_longitude"],
            }
            self.producer.send(self.streaming_kafka_topic, self.encode_message(kafka_message))
        # send() only queues the record; flush so the requests are actually
        # delivered even if the caller exits right afterwards.
        self.producer.flush()

    def consume_kafka_messages(self):
        """Yield each message read from the Kafka consumer, decoded from JSON."""
        for message in self.consumer:
            yield self.decode_message(message.value)

    def push_data_to_mongodb(self, data, data_collection):
        """Insert ``data`` as a single document into ``data_collection``.

        Returns:
            The result object of ``insert_many`` so callers can log it.
        """
        return data_collection.insert_many([data])

    def get_data_from_mongodb(self):
        """Fetch the stored 'spi_frame' from GridFS and rebuild it as an array.

        Returns:
            The frame as a uint8 numpy array reshaped to the stored shape.

        Raises:
            AttributeError: when no GridFS handle has been attached as
                ``self.fs`` (GridFS support is disabled in this file).
        """
        fs = getattr(self, "fs", None)
        if fs is None:
            raise AttributeError(
                "GridFS handle 'self.fs' is not configured; "
                "assign gridfs.GridFS(self.db) to self.fs before calling get_data_from_mongodb()"
            )

        image = self.data_collection.find_one({'name': 'spi_frame'})['frame']

        # Raw bytes come back from GridFS and are reshaped to the frame's
        # stored shape.
        gOut = fs.get(image['image_encoded'])
        img = np.frombuffer(gOut.read(), dtype=np.uint8)
        frame = np.reshape(img, image['shape'])

        return frame

    def run_video_submission_job(self, message):
        """Start a ThreadedCamera for the video described by ``message``.

        Args:
            message: decoded Kafka message with the keys "video_path",
                "camera_id", "camera_latitude" and "camera_longitude".
        """
        src = message["video_path"]
        cam_id = message["camera_id"]
        cam_latitude = message["camera_latitude"]
        cam_longitude = message["camera_longitude"]
        # The camera starts its own background thread in its constructor.
        ThreadedCamera(self.data_collection, self.data_store_dir, self.width, cam_id, cam_latitude, cam_longitude, src)
        print(f"Started processing: {src}")

    def mongodb_filler(self):
        """Consume Kafka requests and start one video-submission job for each."""
        for decoded_message in self.consume_kafka_messages():
            print("decoded_message:", decoded_message)
            self.run_video_submission_job(decoded_message)

    def perform_spark_streaming_and_processing(self, patience=50):
        """Watch the MongoDB collection and mark recognised faces on the map.

        Iterates the collection's change stream, runs face recognition on the
        frame of every changed document and adds a marker with the person's
        picture for each known face. Requires "recognition" mode, which sets
        ``self.sfr`` and ``self.spi_map``.

        Args:
            patience: currently unused; kept for interface compatibility.
        """
        change_stream = self.data_collection.watch()

        # NOTE(review): incremented once per recognised face, although it is
        # printed as "frame_num" - confirm the intended meaning.
        frame_counter = 0

        for change in change_stream:
            # Round-trip through bson's dumps to get plain JSON-compatible data.
            data = json.loads(dumps(change))

            # Change events such as deletes carry no full document; skip them
            # instead of letting a KeyError end the stream.
            document = data.get("fullDocument")
            if document is None:
                continue

            try:
                frame = document["frame"]
                cam_id = document["cam_id"]
                cam_latitude = document["cam_latitude"]
                cam_longitude = document["cam_longitude"]

                # Detect faces; locations and names are consumed pairwise.
                face_locations, face_names = self.sfr.detect_known_faces(np.asarray(frame, dtype='uint8'))
                for face_loc, name in zip(face_locations, face_names):
                    if name == "Unknown":
                        continue
                    print(f"Face detected at cam_id = {cam_id}, frame_num = {frame_counter}: {name} at Lat/Long = {cam_latitude}/{cam_longitude}")
                    frame_counter += 1

                    # Mark the camera position, with the person's reference
                    # picture as the marker popup.
                    current_poi_img_path = os.path.join(self.face_store_dir, f"{name}.jpg")
                    current_poi_img_marker = get_marker_widget(current_poi_img_path, name)
                    mark = Marker(location=[cam_latitude, cam_longitude], title=f"{name}\n{cam_latitude}, {cam_longitude}", draggable=False)
                    mark.popup = current_poi_img_marker
                    self.spi_map += mark
                    mark.interact(opacity=(0.0, 1.0, 0.01))
            except Exception:
                # Keep the stream alive: report the failure for this document
                # and move on to the next change.
                print(traceback.format_exc())
                print("\n")
                
                
            
        
        
# encode frame when saving into json file:
class NumpyArrayEncoder(JSONEncoder):
    """JSON encoder that serialises numpy arrays as (nested) lists."""

    def default(self, obj):
        """Return ``obj.tolist()`` for ndarrays; defer anything else to JSONEncoder."""
        if not isinstance(obj, np.ndarray):
            # Not an array: let the base class handle it (it raises TypeError).
            return super().default(obj)
        return obj.tolist()
    

class BytesEncoder(json.JSONEncoder):
    """JSON encoder that serialises ``bytes`` values as UTF-8 decoded strings."""

    def default(self, obj):
        """Decode bytes as UTF-8; defer anything else to json.JSONEncoder."""
        if not isinstance(obj, bytes):
            # Not bytes: let the base class handle it (it raises TypeError).
            return super().default(obj)
        text = obj.decode('utf-8')
        return text

        
# Camera streaming class: reads frames on a background thread and pushes them to MongoDB. It subclasses SPI_Utils to reuse push_data_to_mongodb.
class ThreadedCamera(SPI_Utils):
    """Read a video source on a background thread and push frames to MongoDB.

    Subclasses SPI_Utils only to reuse ``push_data_to_mongodb``; the parent
    constructor is deliberately not called, so no Kafka or MongoDB connection
    is opened here - the target collection is passed in instead.
    """

    # Inclusive window of frame numbers (1-based) that is pushed to MongoDB.
    # NOTE(review): presumably a sampling limit to keep the collection small -
    # confirm before changing.
    FIRST_PUSHED_FRAME = 180
    LAST_PUSHED_FRAME = 210

    def __init__(self, data_collection, data_store_dir, width, cam_id, cam_latitude, cam_longitude, src=0, fps=30):
        """Open ``src`` and start the frame-retrieval thread.

        Args:
            data_collection: MongoDB collection the frames are inserted into.
            data_store_dir: directory associated with this camera's frames
                (stored on the instance only).
            width: width every frame is resized to.
            cam_id: camera identifier stored with each frame.
            cam_latitude: camera latitude stored with each frame.
            cam_longitude: camera longitude stored with each frame.
            src: video source handed to cv2.VideoCapture (path/URL or index).
            fps: stored on the instance; not used by the capture loop.
        """
        self.capture = cv2.VideoCapture(src)

        self.data_store_dir = data_store_dir
        # str() so that a non-string source such as the default 0 can be
        # hashed too; string sources hash exactly as before.
        self.base_name = hashlib.sha256(str(src).encode('utf-8')).hexdigest()
        self.frame_count = 0
        self.fps = fps
        self.data_collection = data_collection
        self.width = width
        self.cam_id = cam_id
        self.cam_latitude = cam_latitude
        self.cam_longitude = cam_longitude

        # Defined up front so they exist even if the source never opens.
        self.status = False
        self.frame = None

        # Start frame retrieval thread
        self.thread = Thread(target=self.update, args=())
        self.thread.daemon = True
        self.thread.start()

    def update(self):
        """Read frames until the source ends, pushing the selected window to MongoDB.

        Runs on the background thread started by the constructor. The loop
        stops when the capture is not open or a read fails, instead of
        spinning forever, and the capture is released on exit.
        """
        try:
            while self.capture.isOpened():
                self.status, self.frame = self.capture.read()
                if not self.status:
                    # End of stream or read failure: nothing more to process.
                    break

                self.frame = imutils.resize(self.frame, width=self.width)
                self.frame_count += 1

                if not (self.FIRST_PUSHED_FRAME <= self.frame_count <= self.LAST_PUSHED_FRAME):
                    continue

                # Document stored for this frame; the pixel data is kept as
                # nested lists.
                data = {
                        'frame' : self.frame.tolist(),
                        'shape' : str(self.frame.shape),
                        'dtype' : str(self.frame.dtype),
                        'processed': "False",
                        'timestamp': str(date.today()),
                        'cam_id' : self.cam_id,
                        'cam_latitude' : self.cam_latitude,
                        'cam_longitude' : self.cam_longitude
                }
                print(self.push_data_to_mongodb(data, self.data_collection))
        finally:
            self.capture.release()

In [None]:
#pip install kafka-python opencv-python pymongo imutils folium ipyleaflet sidecar face_recognition 

Python interpreter will be restarted.
Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Collecting opencv-python
  Downloading opencv_python-4.6.0.66-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (60.9 MB)
Collecting pymongo
  Downloading pymongo-4.3.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (492 kB)
Collecting imutils
  Downloading imutils-0.5.4.tar.gz (17 kB)
Collecting folium
  Downloading folium-0.13.0-py2.py3-none-any.whl (96 kB)
Collecting ipyleaflet
  Downloading ipyleaflet-0.17.2-py3-none-any.whl (3.7 MB)
Collecting sidecar
  Downloading sidecar-0.5.2-py3-none-any.whl (63 kB)
Collecting face_recognition
  Downloading face_recognition-1.3.0-py2.py3-none-any.whl (15 kB)
Collecting dnspython<3.0.0,>=1.16.0
  Downloading dnspython-2.2.1-py3-none-any.whl (269 kB)
Collecting branca>=0.3.0
  Downloading branca-0.6.0-py3-none-any.whl (24 kB)
Collecting traittypes<3,>=0.2.1
  Downloading traittypes-0.2.1-py2.py3-none-any.w

In [None]:
#!sudo apt-get purge python-pymongo
#!sudo apt-get install python-pip
#!pip3 install pymongo



In [None]:
#import pymongo

[0;31m---------------------------------------------------------------------------[0m
[0;31mImportError[0m                               Traceback (most recent call last)
[0;32m<command-1700089173827115>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0;32mimport[0m [0mpymongo[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/python_shell/dbruntime/PythonPackageImportsInstrumentation/__init__.py[0m in [0;36mimport_patch[0;34m(name, globals, locals, fromlist, level)[0m
[1;32m    165[0m             [0;31m# Import the desired module. If you’re seeing this while debugging a failed import,[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[1;32m    166[0m             [0;31m# look at preceding stack frames for relevant error information.[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m--> 167[0;31m             [0moriginal_result[0m [0;34m=[0m [0mpython_builtin_import[0m[0;34m([0m[0mname[0m[0;34m,[0m [0mglobals[0m[0;34m,[0m [0mlocals[0m[0;34m,[0m [0m

In [None]:
#!/usr/bin/python3.8 -m pip install --upgrade pip

Collecting pip
  Using cached pip-22.3.1-py3-none-any.whl (2.1 MB)
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 21.0.1
    Uninstalling pip-21.0.1:
      Successfully uninstalled pip-21.0.1
Successfully installed pip-22.3.1


In [None]:
#!/local_disk0/.ephemeral_nfs/envs/pythonEnv-0936aa64-3696-40cc-a4f3-00040b40a541/bin/python -m pip install --upgrade pip

Collecting pip
  Downloading pip-22.3.1-py3-none-any.whl (2.1 MB)
[?25l[K     |▏                               | 10 kB 18.9 MB/s eta 0:00:01[K     |▎                               | 20 kB 6.0 MB/s eta 0:00:01[K     |▌                               | 30 kB 8.4 MB/s eta 0:00:01[K     |▋                               | 40 kB 4.6 MB/s eta 0:00:01[K     |▉                               | 51 kB 4.8 MB/s eta 0:00:01[K     |█                               | 61 kB 5.6 MB/s eta 0:00:01[K     |█▏                              | 71 kB 5.5 MB/s eta 0:00:01[K     |█▎                              | 81 kB 6.3 MB/s eta 0:00:01[K     |█▍                              | 92 kB 6.3 MB/s eta 0:00:01[K     |█▋                              | 102 kB 5.1 MB/s eta 0:00:01[K     |█▊                              | 112 kB 5.1 MB/s eta 0:00:01[K     |██                              | 122 kB 5.1 MB/s eta 0:00:01[K     |██                              | 133 kB 5.1 MB/s eta 0:00:01[K     |█

In [None]:
#import sys
#import os
# In the command below, replace <username> with your Databricks user name.
#sys.path.append(os.path.abspath('/FileStore'))


In [None]:
#import pymongo
#import os
#from bson.json_util import dumps
#from pymongo import MongoClient
#client = MongoClient(os.environ["SPI_MONGODB_URI"])  # credentials redacted: keep the connection string out of the notebook and rotate the password that was exposed here

In [None]:
#change_stream = client.changestream.data_collection.insert_one({"hello": "world"}).inserted_id
#for change in change_stream:
 #   print(dumps(change))
  #  print('') # for readability only

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
[0;32m<command-4197782257002992>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0mchange_stream[0m [0;34m=[0m [0mclient[0m[0;34m.[0m[0mchangestream[0m[0;34m.[0m[0mdata_collection[0m[0;34m.[0m[0minsert_one[0m[0;34m([0m[0;34m{[0m[0;34m"hello"[0m[0;34m:[0m [0;34m"world"[0m[0;34m}[0m[0;34m)[0m[0;34m.[0m[0minserted_id[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0;32mfor[0m [0mchange[0m [0;32min[0m [0mchange_stream[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      3[0m     [0mprint[0m[0;34m([0m[0mdumps[0m[0;34m([0m[0mchange[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      4[0m     [0mprint[0m[0;34m([0m[0;34m''[0m[0;34m)[0m [0;31m# for readability only[0m[0;34m[0m[0;34m[0m[0m

[0;31mTypeError[0m: 'ObjectId' ob

In [None]:
#client.changestream.collection.insert_one({"hello": "trust_test"})

Out[131]: <pymongo.results.InsertOneResult at 0x7f5b87e38460>

In [None]:
#print(client.changestream.data_collection)

Collection(Database(MongoClient(host=['ac-dtfilwz-shard-00-02.e3felkp.mongodb.net:27017', 'ac-dtfilwz-shard-00-00.e3felkp.mongodb.net:27017', 'ac-dtfilwz-shard-00-01.e3felkp.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, authsource='admin', replicaset='atlas-jqs18t-shard-0', tls=True), 'changestream'), 'data_collection')


In [None]:
#% run spi_utilities

In [None]:
#%run simple_facerec

In [None]:
# class ThreadedCamera(object):
#    def __init__(self, data_store_dir, width, cam_id, cam_latitude, cam_longitude, src=0, fps=30):
#     self.capture = cv2.VideoCapture(src)
     
#     self.data_store_dir = data_store_dir
#     self.base_name = hashlib.sha256(src.encode('utf-8')).hexdigest()
#     self.frame_count = 0
#     self.fps = fps
     
#     self.width = width
#     self.cam_id = cam_id
#     self.cam_latitude = cam_latitude
#     self.cam_longitude = cam_longitude
     
#     # Start frame retrieval thread
#     self.thread = Thread(target=self.update, args=())
#     self.thread.daemon = True
#     self.thread.start()

Out[38]: 2

In [None]:
#cam = ThreadedCamera(data_store_dir="./", width=640, cam_id="03", cam_latitude=38.99, cam_longitude=95.4, src="http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ForBiggerBlazes.mp4", fps=30)

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
[0;32m<command-1948266659758361>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mcam[0m [0;34m=[0m [0mThreadedCamera[0m[0;34m([0m[0mdata_store_dir[0m[0;34m=[0m[0;34m"./"[0m[0;34m,[0m [0mwidth[0m[0;34m=[0m[0;36m640[0m[0;34m,[0m [0mcam_id[0m[0;34m=[0m[0;34m"03"[0m[0;34m,[0m [0mcam_latitude[0m[0;34m=[0m[0;36m38.99[0m[0;34m,[0m [0mcam_longitude[0m[0;34m=[0m[0;36m95.4[0m[0;34m,[0m [0msrc[0m[0;34m=[0m[0;34m"http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ForBiggerBlazes.mp4"[0m[0;34m,[0m [0mfps[0m[0;34m=[0m[0;36m30[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;31mNameError[0m: name 'ThreadedCamera' is not defined

In [None]:
#cam