# Adding your own Image Analysis function

In [1]:
import threading

from IPython.display import display,Image, clear_output
import ipywidgets

import numpy as np
import cv2
from collections import deque
import skimage
import time

## Basic

In [2]:
def gui_simple(image_analysis_func = None):
    '''
    Display a live webcam viewer made of ipywidgets, with an optional
    per-frame image analysis step.

    Camera 0 is opened with cv2.VideoCapture and a widget panel is displayed
    containing: a state selector (exit / disconnect / pause / connect), a
    checkbox that switches the analysis function on and off, an fps readout,
    the image itself and a log text area.

    image_analysis_func : callable or None
        Called as image_analysis_func(image) on every captured frame while the
        checkbox is ticked. It must return either a numpy image, or a tuple
        (image, log); str(log) is written into the log text area.

    Raises AssertionError when the camera cannot be opened.
    '''

    def core(camera):
        def _wrapper(func, image):
            '''
            Call the analysis function and normalise its result so that an
            (image, log) pair is always returned.
            '''
            ret = func(image)
            if isinstance(ret, np.ndarray):  # image only -> use an empty log
                return ret, ""
            if isinstance(ret, tuple):  # (image, log)
                image, log = ret
                return image, log
            # Previously this case fell through and failed with an
            # UnboundLocalError on `log`; report the real problem instead.
            raise TypeError(
                "image analysis function must return a numpy image or an "
                "(image, log) tuple, got " + type(ret).__name__
            )

        def live(state_widget, camera, image_widget):
            '''Capture/display loop; runs in a worker thread while state is "connect".'''
            nonlocal fps, log, image

            while state_widget.value == 'connect':
                _, frame = camera.read()
                if frame is not None:
                    if image_analysis_func is not None and image_analysis_widget.value:
                        frame, log = _wrapper(image_analysis_func, frame)
                        log_widget.value = str(log)
                    # Only remember frames that were actually read: the
                    # "disconnect" handler re-encodes `image`, which must
                    # never be None.
                    image = frame
                    image_widget.value = bytes(cv2.imencode('.jpg', image)[1])

                # fps over the last n_frame loop iterations
                now = time.time()
                elapsed = now - q.popleft()
                q.append(now)
                if elapsed > 0:  # equal timestamps would divide by zero; keep last value
                    fps = n_frame / elapsed
                fps_widget.value = "fps: " + str(fps)

        def flag(change):
            '''React to a change of the state selector.'''
            nonlocal camera

            new_state = change['new']
            if new_state == 'exit':
                camera.release()
                clear_output()
                print("program ended properly")
            elif new_state == 'disconnect':
                # keep showing the last stored frame, then free the device
                image_widget.value = bytes(cv2.imencode('.jpg', image)[1])
                camera.release()
            elif new_state == 'connect':
                if not camera.isOpened():
                    # the camera was released by an earlier "disconnect"
                    camera = cv2.VideoCapture(0)

                execute_thread = threading.Thread(target=live, args=(state_widget, camera, image_widget))
                execute_thread.start()
            # 'pause' needs no action here: the live loop stops by itself as
            # soon as the state is no longer 'connect'.

        # Placeholder picture shown until the first frame arrives.
        # NOTE(review): the [..., ::-1] reverses the last axis; whether that is
        # a channel swap or a horizontal flip depends on the shape returned by
        # skimage.data.camera() - confirm.
        splash = skimage.data.camera()[..., ::-1]
        log = []

        # fps calculation state: timestamps of the last n_frame iterations
        fps = 0
        n_frame = 3
        q = deque(time.time() for _ in range(n_frame))
        image = splash

        # widgets
        state_widget = ipywidgets.ToggleButtons(options=['exit','disconnect','pause', 'connect'], description='', value='pause')
        image_widget = ipywidgets.Image(format='jpeg',value=bytes(cv2.imencode('.jpg',image)[1]), layout={'width':'50%'})
        log_widget = ipywidgets.Textarea(value="", layout={'width':'50%'})
        fps_widget = ipywidgets.Text(value="fps: "+str(fps))
        image_analysis_widget = ipywidgets.Checkbox(value=False,description="apply image analysis")

        state_widget.observe(flag, names='value')
        live_execution_widget = ipywidgets.VBox([
            state_widget,
            image_analysis_widget,
            fps_widget,
            image_widget,
            log_widget,
        ])

        display(live_execution_widget)

    camera = cv2.VideoCapture(0)
    if not camera.isOpened():
        # Raised explicitly (same exception type as the former assert) so the
        # check survives `python -O` and carries a real message.
        raise AssertionError("camera not detected")
    core(camera)


In [3]:
def image_filter(image):
    '''
    Example analysis function: convert the camera frame to grayscale.

    The incoming image is a uint8 BGR frame as delivered by cv2.VideoCapture.
    An analysis function should hand back a uint8 image as well; be careful
    with skimage functions, since they prefer float images.

    The return value may be either
        image (height, width, channel)
        or
        image and a string (shown in the log window)
    '''
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    return grayscale


In [4]:
# Launch the viewer with image_filter as the analysis function; it is applied
# to frames only while the "apply image analysis" checkbox is ticked.
gui_simple(image_filter)

VBox(children=(ToggleButtons(index=2, options=('exit', 'disconnect', 'pause', 'connect'), value='pause'), Chec…

## CNN classification

In [5]:
import tensorflow as tf

import tensorflow.keras.backend as K
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input, decode_predictions
# Clear Keras session state before building the model, so re-running this
# cell starts from a clean slate.
K.clear_session()
# MobileNetV2 with ImageNet weights and a 160x160x3 input; classification()
# below resizes its crop to 160x160 to match this input shape.
model = MobileNetV2(input_shape=(160,160,3),weights="imagenet")

In [6]:
def classification(image):
    '''
    Classify the centre of a camera frame with the module-level MobileNetV2
    ``model``.

    - crop the largest centred square of the frame, resize it to 160x160 and
      feed it to the network
    - draw a green rectangle on the frame (in place) showing which region
      was used, for clarity

    image : 3-dimensional (height, width, channel) frame; expected to be a
        BGR uint8 image as delivered by cv2.VideoCapture.

    Returns (image, toplabel) where toplabel is the first entry that
    decode_predictions reports for this frame.
    '''
    height, width = image.shape[0], image.shape[1]
    side = min(height, width)
    topleftx = width // 2 - side // 2
    toplefty = height // 2 - side // 2

    # cv2.resize returns a new array, so the caller's frame is not modified here
    crop = image[toplefty:toplefty + side, topleftx:topleftx + side, :]
    crop = cv2.resize(crop, (160, 160))
    # The camera delivers BGR while the ImageNet weights were trained on RGB
    # images, so swap the channel order before preprocessing.
    crop = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)
    batch = preprocess_input(crop)[np.newaxis, ...]
    preds = model.predict(batch)
    toplabel = decode_predictions(preds)[0][0]

    # Highlight the input area used by the network. The bottom-right corner
    # is (x + side, y + side); the y coordinate was previously derived from
    # topleftx, which misplaced the box on non-square frames.
    image = cv2.rectangle(
        image,
        (topleftx, toplefty),
        (topleftx + side, toplefty + side),
        (0, 255, 0),
        3,
    )

    return image, toplabel

In [8]:
# Launch the viewer with the CNN classifier; the top prediction returned by
# classification() is shown in the log area while the checkbox is ticked.
gui_simple(classification)

VBox(children=(ToggleButtons(index=2, options=('exit', 'disconnect', 'pause', 'connect'), value='pause'), Chec…