# Import

In [1]:
from jetcam.usb_camera import USBCamera
import ipywidgets, time, cv2, traitlets, sys, argparse
from IPython.display import display, clear_output
import traitlets
import threading
import numpy as np
import atexit
import PIL.Image

# Variables

In [2]:
# Output backend for the video; later cells branch on "ipywidgets", "opencv" or "web".
display_as = "web" # ipywidgets opencv web
# How frames are pulled from the camera; later cells test for "observe" and "traitlets".
camera_method = ["traitlets"] # observe traitlets

# Passed as width/height when the camera object is created.
DISPLAY_WIDTH = 640
DISPLAY_HEIGHT = 480
# Passed as capture_width/capture_height; CAMERA_WIDTH also positions the right-hand FPS label.
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480

# Last FPS text drawn by the observe-mode OpenCV callback, which rebinds this global.
fps_str_display = None

In [3]:
def in_notebook():
    """Report whether this code is running inside a Jupyter kernel.

    Returns:
        bool: True when IPython's config has an ``IPKernelApp`` section whose
        ``connection_file`` contains "jupyter". False in every other case,
        including when ``get_ipython`` does not exist (plain ``python`` run)
        or the config lookup fails.
    """
    try:
        ipython_config = get_ipython().config
        if "IPKernelApp" not in ipython_config:
            return False
        return ipython_config["IPKernelApp"]["connection_file"].find("jupyter") > -1
    except Exception:
        # get_ipython is only injected by IPython, so a NameError here simply
        # means "not a notebook". Always return a real bool, never None.
        return False
print("in_notebook", in_notebook())

in_notebook True


# Args

In [4]:
# print(sys.argv)
if not in_notebook():
    # Outside a notebook, the two mode switches are taken from the command line.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--camera_method",
        choices=["observe", "traitlets"],
        default="observe",
        help="camera_method: traitlets / observe",
    )
    parser.add_argument(
        "--display_as",
        choices=["ipywidgets", "opencv", "web"],
        default="opencv",
        help="display_as: ipywidgets opencv web",
    )
    args = parser.parse_args()
    # camera_method is kept as a one-element list because later cells test it with `in`.
    display_as = args.display_as
    camera_method = [args.camera_method]


# Setup Camera

In [5]:
# Create the USBCamera for capture_device=0. width/height receive the DISPLAY_*
# constants and capture_width/capture_height receive the CAMERA_* constants.
camera = USBCamera(width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT, capture_width=CAMERA_WIDTH, 
                   capture_height=CAMERA_HEIGHT, capture_device=0)

## Read one frame

In [6]:
# Read one frame and print its shape next to the shape of camera.value.
# `image` is reused below to seed the ipywidgets image widget.
image = camera.read()
print("image.shape", image.shape)
print("camera.value.shape", camera.value.shape)

image.shape (480, 640, 3)
camera.value.shape (480, 640, 3)


# Utils

In [7]:
class Frames:
    """Frame counter that reports a periodically refreshed frames-per-second value.

    Frames are counted with ``fps_update()`` / ``increment_frame()``. The FPS
    figure is recomputed only when the current wall-clock second is a multiple
    of ``REFRESH_PERIOD_S`` and more than ``MIN_FRAMES`` frames have been
    counted since the previous refresh; between refreshes the last computed
    value is returned.

    NOTE(review): reading ``fps`` may reset the counter, and no locking is
    done, so sharing one instance between threads can lose counts.
    """

    # The FPS value is refreshed only while int(time.time()) is a multiple of this.
    REFRESH_PERIOD_S = 5
    # ...and only once more than this many frames have been counted.
    MIN_FRAMES = 140

    def __init__(self):
        self.frames_count = 0
        self._time = time.time()
        self._old_fps = 0

    @property
    def fps(self):
        """Latest FPS value (0 until the first refresh). May reset the frame counter."""
        now = time.time()
        elapsed = now - self._time
        refresh_due = (
            int(now) % self.REFRESH_PERIOD_S == 0
            and self.frames_count > self.MIN_FRAMES
        )
        # elapsed can be 0 if the clock has not advanced; skip the refresh
        # instead of dividing by zero and keep counting until the next chance.
        if refresh_due and elapsed > 0:
            self._old_fps = self.frames_count / elapsed
            self._time = now
            self.frames_count = 0
        return self._old_fps

    @property
    def fps_str(self):
        """FPS formatted with two decimals, or None while no FPS value exists yet."""
        # Read the side-effecting property once and reuse the value.
        current = self.fps
        if current > 0:
            return f"{current:0.2f}"
        return None

    def increment_frame(self):
        """Count one frame and return the FPS string (or None); same as ``fps_update``."""
        return self.fps_update()

    def reset_frames_count(self):
        """Discard the frames counted since the last refresh."""
        self.frames_count = 0

    def fps_update(self):
        """Count one frame and return the FPS string, or None if not yet available."""
        self.frames_count += 1
        return self.fps_str
    

# Two independent counters: one is advanced per camera frame, the other per displayed frame.
frames_camera = Frames()
frames_display = Frames()

# The widgets exist only in ipywidgets mode; code that touches them is guarded
# by the same display_as check.
if display_as == "ipywidgets":
    image_widget = ipywidgets.Image(format='jpeg')
    fps_widget = ipywidgets.Text("0")
    
def bgr8_to_jpeg(value, frames=frames_camera):
    """Encode an image as JPEG bytes and count the frame.

    Args:
        value: image array handed to ``cv2.imencode`` (the name suggests
            8-bit BGR; not checked here).
        frames: ``Frames`` counter to advance; defaults to the camera counter.

    Returns:
        bytes: the JPEG-encoded image.

    Side effect: in ipywidgets mode, when the counter yields an FPS string,
    that string is written to ``fps_widget``.
    """
    latest_fps = frames.increment_frame()
    if latest_fps is not None and display_as == "ipywidgets":
        fps_widget.value = latest_fps
    _, jpeg_buffer = cv2.imencode('.jpg', value)
    return bytes(jpeg_buffer)

# ipywidgets mode only: seed the image widget with the frame read earlier,
# show both widgets, then immediately clear this cell's output.
if display_as == "ipywidgets":
    image_widget.value = bgr8_to_jpeg(image, frames_camera)
    display(image_widget)
    display(fps_widget)
    clear_output()

# Start Camera

In [8]:
# Presumably starts the camera's background capture so camera.value keeps
# updating (the observe/dlink cells below rely on that) - confirm against jetcam.
camera.running = True

# Observe Mode

In [9]:
if "observe" in camera_method:
    # Text of the camera-FPS label. It is kept separate from the display-FPS
    # text (fps_str_display) so the two labels never overwrite each other.
    fps_str_camera = ""

    def _draw_fps_label(image, text, right_aligned=False):
        """Draw `text` on a filled black box in the top-left or top-right corner of `image`."""
        label_width, label_height = cv2.getTextSize(text, cv2.FONT_HERSHEY_PLAIN, 1.5, 2)[0]
        left = CAMERA_WIDTH - label_width - 10 if right_aligned else 10
        cv2.rectangle(image, (left, 0), (left + label_width, 10 + label_height), (0,0,0), -1)
        cv2.putText(image, text, (left, 20), cv2.FONT_HERSHEY_PLAIN, 1.5, (255,32,32), 2, cv2.LINE_AA)

    def update_image(change):
        """Observer for camera.value: push each new frame to the selected display."""
        global fps_str_display, fps_str_camera
        image = change['new']
        if display_as == "ipywidgets":
            image_widget.value = bgr8_to_jpeg(image, frames_camera)
        elif display_as == "opencv":
            fps_str = frames_camera.fps_update()
            if fps_str is not None:
                fps_str_camera = fps_str
            _draw_fps_label(image, fps_str_camera)

            fps_str = frames_display.fps_update()
            if fps_str is not None:
                fps_str_display = fps_str
            # fps_str_display starts as None and stays None until the first FPS
            # refresh; the OpenCV text functions need a string, so draw "" until then.
            _draw_fps_label(image, fps_str_display or "", right_aligned=True)

            cv2.imshow("Video Frame", image)
            key = cv2.waitKey(1)
            if key == 27: # Check for ESC key
                cv2.destroyAllWindows()

    camera.observe(update_image, names='value')
    try:
        if display_as == "ipywidgets":
            display(fps_widget)
            display(image_widget)

        # Block until the user presses Enter; frames are handled by the observer meanwhile.
        input("camera.unobserve")
    finally:
        # Also runs when input() is interrupted, so the callback and camera are
        # always released. Unobserve first so that no late frame re-creates the
        # window after it has been destroyed.
        camera.unobserve(update_image, names='value')
        if display_as == "ipywidgets":
            clear_output()
        elif display_as == "opencv":
            cv2.destroyAllWindows()
        camera.running = False


# Traitlets Mode

## Utils1

In [10]:
class cv2_display(traitlets.HasTraits):
    """Show frames assigned to ``value`` in an OpenCV window from a background thread.

    Traits:
        value: the most recent frame (None until one has been assigned).
        running: setting this to True starts the display thread; setting it
            back to False stops the loop and joins the thread.

    Args:
        windowName: title passed to ``cv2.imshow``.
        frames_camera: ``Frames`` counter advanced on every ``value`` change; may be None.
        frames_display: ``Frames`` counter advanced on every displayed frame; may be None.

    Pressing ESC in the window closes it and ends the loop. The ``running``
    trait is not reset in that case.
    """

    value = traitlets.Any()
    running = traitlets.Bool(default_value=False)

    def __init__(self, windowName="", frames_camera=None, frames_display=None, *args, **kwargs):
        super(cv2_display, self).__init__(*args, **kwargs)
        self._running = False
        self._fps_str_display = ""
        self._fps_str_camera = ""
        self._frames_camera = frames_camera
        self._frames_display = frames_display
        self._windowName = windowName

        atexit.register(cv2.destroyAllWindows)

    def _draw_fps_label(self, frame, text, box_left, text_left):
        """Draw `text` at the top of `frame` on a filled black box starting at `box_left`."""
        label_width, label_height = cv2.getTextSize(text, cv2.FONT_HERSHEY_PLAIN, 1.5, 2)[0]
        cv2.rectangle(frame, (box_left, 0), (box_left + label_width, 10 + label_height), (0,0,0), -1)
        cv2.putText(frame, text, (text_left, 20), cv2.FONT_HERSHEY_PLAIN, 1.5, (255,32,32), 2, cv2.LINE_AA)

    def _display_frames(self):
        """Thread body: overlay both FPS labels on the current frame and show it."""
        while self._running:
            # Take one snapshot per iteration: `value` is reassigned from
            # another thread and must not change between drawing and imshow.
            frame = self.value
            if frame is None:
                # No frame yet - yield the CPU instead of spinning.
                time.sleep(0.01)
                continue

            self._draw_fps_label(frame, self._fps_str_camera, 10, 10)

            if self._frames_display is not None:
                fps_str = self._frames_display.fps_update()
                if fps_str is not None:
                    self._fps_str_display = fps_str

            label_width = cv2.getTextSize(self._fps_str_display, cv2.FONT_HERSHEY_PLAIN, 1.5, 2)[0][0]
            self._draw_fps_label(frame, self._fps_str_display,
                                 CAMERA_WIDTH - label_width - 10,
                                 CAMERA_WIDTH - label_width - 11)

            cv2.imshow(self._windowName, frame)
            key = cv2.waitKey(10)
            if key == 27: # Check for ESC key
                cv2.destroyAllWindows()
                self._running = False
                break

    @traitlets.observe('value')
    def _on_value(self, change):
        """Count one camera frame each time a new value arrives."""
        if self._frames_camera is None:
            return
        fps_str = self._frames_camera.fps_update()
        if fps_str is not None:
            self._fps_str_camera = fps_str

    @traitlets.observe('running')
    def _on_running(self, change):
        """Start the display thread on False -> True, stop and join it on True -> False."""
        if change['new'] and not change['old']:
            # transition from not running -> running
            self._running = True
        elif change['old'] and not change['new']:
            # transition from running -> not running
            self._running = False
        if self._running:
            self.thread = threading.Thread(target=self._display_frames)
            self.thread.daemon = True
            self.thread.start()
        else:
            # The thread attribute may not exist yet, and a thread cannot join itself.
            thread = getattr(self, 'thread', None)
            if thread is not None and thread is not threading.current_thread():
                thread.join()
            
def my_transform(frame, edgeThreshold=40):
    """Return a copy of ``frame`` with its Canny edge image OR-ed on top.

    The frame is converted to grayscale, blurred with a 7x7 Gaussian
    (sigma 1.5), run through ``cv2.Canny`` with thresholds 0 and
    ``edgeThreshold``, and the edge image is converted back to three
    channels before the bitwise OR. The input array is not modified.

    Args:
        frame: BGR image array (it is passed to a BGR->GRAY conversion).
        edgeThreshold: second threshold given to ``cv2.Canny``.
    """
    grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    smoothed = cv2.GaussianBlur(grayscale, (7, 7), 1.5)
    edge_mask = cv2.Canny(smoothed, 0, edgeThreshold)
    edge_bgr = cv2.cvtColor(edge_mask, cv2.COLOR_GRAY2BGR)
    return np.bitwise_or(frame, edge_bgr)

In [11]:
if "traitlets" in camera_method:
    # traitlets.dlink() establishes the link when it is constructed, so no
    # separate camera_link.link() call is needed afterwards.
    if display_as == "ipywidgets":
        display(fps_widget)

        display(image_widget)
        camera_link = traitlets.dlink((camera, 'value'), (image_widget, 'value'), transform=bgr8_to_jpeg)
    elif display_as == "opencv":
        image_widget = cv2_display(windowName="my_Demo", frames_camera=frames_camera, frames_display=frames_display)
        image_widget.running = True
        camera_link = traitlets.dlink((camera, 'value'), (image_widget, 'value'), transform=lambda x: my_transform(x, edgeThreshold=40))

    if display_as in ["ipywidgets", "opencv"]:
        try:
            # Block until the user presses Enter; the link keeps feeding frames meanwhile.
            input("camera_link.unlink")
        finally:
            # Unlink even when input() is interrupted, and report a failure
            # instead of swallowing it with a bare except.
            try:
                camera_link.unlink()
            except Exception as exc:
                print("camera_link.unlink failed:", repr(exc))

# Web Mode

## Utils2

In [12]:
class cv2_camera2display(traitlets.HasTraits):
    """Keep a JPEG-encoded copy of the latest frame, with FPS labels, for streaming.

    Traits:
        value: the most recent frame (None until one has been assigned).
        running: setting this to True starts the encoder thread; setting it
            back to False stops the loop and joins the thread.

    Attributes:
        encoded_value: buffer returned by ``cv2.imencode`` for the latest
            annotated frame. It holds an encoded black frame until the first
            real frame is processed, so readers always get JPEG data.

    Args:
        frames_camera: ``Frames`` counter advanced on every ``value`` change; may be None.
        frames_display: ``Frames`` counter advanced on every encoded frame; may be None.
    """

    value = traitlets.Any()
    running = traitlets.Bool(default_value=False)

    def __init__(self, frames_camera=None, frames_display=None, *args, **kwargs):
        super(cv2_camera2display, self).__init__(*args, **kwargs)
        self._running = False
        self._fps_str_display = ""
        self._fps_str_camera = ""
        self._frames_camera = frames_camera
        self._frames_display = frames_display
        # Image arrays are (rows, cols, channels) = (height, width, 3). Encode
        # the placeholder so encoded_value is JPEG data from the very start.
        blank = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype = "uint8")
        _, self.encoded_value = cv2.imencode(".jpg", blank)

    def _draw_fps_label(self, frame, text, box_left, text_left):
        """Draw `text` at the top of `frame` on a filled black box starting at `box_left`."""
        label_width, label_height = cv2.getTextSize(text, cv2.FONT_HERSHEY_PLAIN, 1.5, 2)[0]
        cv2.rectangle(frame, (box_left, 0), (box_left + label_width, 10 + label_height), (0,0,0), -1)
        cv2.putText(frame, text, (text_left, 20), cv2.FONT_HERSHEY_PLAIN, 1.5, (255,32,32), 2, cv2.LINE_AA)

    def _display_frames(self):
        """Thread body: annotate a copy of the current frame and JPEG-encode it."""
        encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 70]
        while self._running:
            source = self.value
            if source is None:
                # No frame yet - yield the CPU instead of spinning.
                time.sleep(0.01)
                continue

            # Draw on a private copy: `value` may be the very array the camera
            # publishes, and it must not be scribbled on in place.
            frame = source.copy()

            self._draw_fps_label(frame, self._fps_str_camera, 10, 10)

            if self._frames_display is not None:
                fps_str = self._frames_display.fps_update()
                if fps_str is not None:
                    self._fps_str_display = fps_str

            label_width = cv2.getTextSize(self._fps_str_display, cv2.FONT_HERSHEY_PLAIN, 1.5, 2)[0][0]
            self._draw_fps_label(frame, self._fps_str_display,
                                 CAMERA_WIDTH - label_width - 10,
                                 CAMERA_WIDTH - label_width - 11)

            ok, encoded = cv2.imencode(".jpg", frame, encode_param)
            if ok:
                # Keep the previous buffer if encoding failed.
                self.encoded_value = encoded

            # Throttle once the measured rate exceeds 10 fps.
            if self._frames_display is not None and self._frames_display.fps > 10:
                time.sleep(0.05)

    @traitlets.observe('value')
    def _on_value(self, change):
        """Count one camera frame each time a new value arrives."""
        if self._frames_camera is None:
            return
        fps_str = self._frames_camera.fps_update()
        if fps_str is not None:
            self._fps_str_camera = fps_str

    @traitlets.observe('running')
    def _on_running(self, change):
        """Start the encoder thread on False -> True, stop and join it on True -> False."""
        if change['new'] and not change['old']:
            # transition from not running -> running
            self._running = True
        elif change['old'] and not change['new']:
            # transition from running -> not running
            self._running = False
        if self._running:
            self.thread = threading.Thread(target=self._display_frames)
            self.thread.daemon = True
            self.thread.start()
        else:
            # The thread attribute may not exist yet, and a thread cannot join itself.
            thread = getattr(self, 'thread', None)
            if thread is not None and thread is not threading.current_thread():
                thread.join()

def my_transform(frame):
    """Identity transform: hand the frame back untouched.

    NOTE(review): this reuses the name of the edge-overlay ``my_transform``
    defined in the traitlets-mode section, so running this cell rebinds the
    name and the ``edgeThreshold`` keyword is no longer accepted.
    """
    return frame

In [13]:
if display_as == "web":
    # Flask is needed only for this mode, so it is imported here.
    from flask import Response
    from flask import Flask
    from flask import render_template
    import socket

    def _guess_local_ip():
        """Best-effort address of this machine, used only for the hint printed below."""
        # First choice: the first non-loopback address the hostname resolves to.
        try:
            candidates = socket.gethostbyname_ex(socket.gethostname())[2]
        except OSError:
            candidates = []
        for ip in candidates:
            if not ip.startswith("127."):
                return ip
        # Fallback: connect a UDP socket towards a public address and read
        # which local address was selected for it.
        probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            probe.connect(('8.8.8.8', 53))
            return probe.getsockname()[0]
        except OSError:
            # No usable route; the hint then points at the loopback address.
            return "127.0.0.1"
        finally:
            probe.close()

    myip = _guess_local_ip()

    print(f"Please connect to http://{myip}:5000/video_feed to see the video feed.")

    image_widget = cv2_camera2display(frames_camera=frames_camera, frames_display=frames_display)
    image_widget.running = True
    # traitlets.dlink() establishes the link when it is constructed.
    camera_link = traitlets.dlink((camera, 'value'), (image_widget, 'value'), transform=lambda x: my_transform(x))

    app = Flask(__name__)

    def generate_camera_stream():
        """Yield the latest encoded frame forever as multipart/x-mixed-replace parts."""
        while True:
            # Always pace the stream at about 20 parts per second. This avoids
            # an unthrottled loop while no FPS value exists yet, and avoids
            # reading the side-effecting frames_display.fps from this thread.
            time.sleep(0.05)
            yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
                   bytearray(image_widget.encoded_value) + b'\r\n')

    @app.route("/")
    def index():
        # return the rendered template
        return render_template("index.html")

    @app.route("/video_feed")
    def video_feed():
        # return the response generated along with the specific media
        # type (mime type)
        return Response(generate_camera_stream(),
            mimetype = "multipart/x-mixed-replace; boundary=frame")

    # Blocks until the server is stopped. It listens on all interfaces; the
    # port is spelled out so it always matches the URL printed above.
    app.run(host='0.0.0.0', port=5000)


Please connect to http://10.0.0.212:5000/video_feed to see the video feed.
 * Serving Flask app "__main__" (lazy loading)
 * Environment: production
[2m   Use a production WSGI server instead.[0m
 * Debug mode: off


 * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)
10.0.0.217 - - [25/Mar/2021 16:01:45] "[37mGET /video_feed HTTP/1.1[0m" 200 -


# Finish

In [14]:
# Every cleanup step runs on its own, so a failure (or an object that was
# never created in the selected mode) cannot silently skip the steps after it.
def _cleanup_step(label, action):
    """Run one cleanup action and report, rather than raise, any failure."""
    try:
        action()
    except Exception as exc:
        print(f"cleanup: {label} failed: {exc!r}")

# Look names up through globals() because some of them exist only in certain
# modes (for example `app` is created only in web mode).
_camera = globals().get("camera")
if _camera is not None:
    _cleanup_step("camera.running = False", lambda: setattr(_camera, "running", False))
    # close() is called only when the camera object actually provides it.
    _camera_close = getattr(_camera, "close", None)
    if callable(_camera_close):
        _cleanup_step("camera.close()", _camera_close)
    del _camera_close
    del _camera
    del camera

_image_widget = globals().get("image_widget")
if _image_widget is not None:
    _cleanup_step("image_widget.running = False", lambda: setattr(_image_widget, "running", False))

_app = globals().get("app")
if _app is not None:
    _cleanup_step("app.do_teardown_appcontext()", _app.do_teardown_appcontext)