# Test of OpenCV + MobileNet for video analysis


In [1]:
# RUN_NAME = '20230120 - arch=mobilenet_v2_fpn - samples=7500 frozen=1 epochs=10 bs=48 res=380 _data=external'

# --- Notebook-wide settings -------------------------------------------------

# Resolution setting (not referenced by the other cells visible in this notebook).
RESOLUTION = 300

# Symbol label used by save_images() for the output folder and the on-frame
# caption; the main cell overwrites it from user input when saving is enabled.
SYMBOL = None

# Set to True to be prompted for a symbol when the main cell starts.
SAVE_IMAGES_TO_FILE = False

import sys
import time
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import warnings
warnings.filterwarnings('ignore')

import numpy as np
import matplotlib.pyplot as plt
import cv2 as cv
import tensorflow as tf

tf.get_logger().setLevel('ERROR')
                         

from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as viz_utils

from IPython.display import Video, display


In [2]:
# Turn on memory growth for every GPU TensorFlow can see.
gpus = tf.config.experimental.list_physical_devices('GPU')
for device in gpus:
    tf.config.experimental.set_memory_growth(device, enable=True)

<br>
<br>

## Load the model

In [4]:
pwd

'E:\\Springboard_GIT\\RIIPEN_ANTLER\\Pool_Drowning_Detection\\Tim Notebooks'

In [41]:
PATH_TO_SAVED_MODEL = "./Tensorflow/workspace/second_training/exported_models/my_mobilenet320_fpn/saved_model"

# Load the exported SavedModel and report how long the load took.
print('Loading model...', end='')
# perf_counter() is a monotonic clock meant for measuring intervals; unlike
# time.time() it cannot jump if the system clock is adjusted during the load.
start_time = time.perf_counter()

detect_fn = tf.saved_model.load(PATH_TO_SAVED_MODEL)

end_time = time.perf_counter()
elapsed_time = end_time - start_time
print('Done! Took {} seconds'.format(elapsed_time))

Loading model...Done! Took 14.159499645233154 seconds


In [42]:
PATH_TO_LABELS = "./Tensorflow/workspace/second_training/annotations/label_map.pbtxt"

# Category lookup built from the label map; it is handed to the visualisation
# helper in the main loop so detections are drawn with their display names.
category_index = label_map_util.create_category_index_from_labelmap(
    PATH_TO_LABELS,
    use_display_name=True,
)

<br>
<br>

## Function definitions

In [27]:
pwd

'E:\\Springboard_GIT\\RIIPEN_ANTLER\\Pool_Drowning_Detection\\Tim Notebooks'

In [33]:
# Scratch check: express the (machine-specific) absolute video folder relative
# to the current working directory. The result is the folder used for
# NOTDROWNING_VIDEO_URL in the next cell.
os.path.relpath('E:\\Springboard_GIT\\RIIPEN_ANTLER\\Pool_Drowning_Detection\\Pool Videos\\notdrowning')

'..\\Pool Videos\\notdrowning'

In [38]:
# Folder with the "notdrowning" videos, relative to the notebook's working
# directory. Built with os.path.join so the separator matches the host OS
# instead of being hard-coded to Windows backslashes (the value is unchanged
# on Windows).
NOTDROWNING_VIDEO_URL = os.path.join('..', 'Pool Videos', 'notdrowning')

# The clip that setup_camera() opens by default.
video_url = os.path.join(NOTDROWNING_VIDEO_URL, 'notdrowning18.avi')
print(video_url)

..\Pool Videos\notdrowning\notdrowning18.avi


In [35]:
# frac = 0.65  # scaling factor for display
# display(
#     Video(data=video_url, embed=True, height=int(720 * frac), 
#           width=int(1280 * frac))
# )

In [36]:
def setup_camera(source=None) -> cv.VideoCapture:
    '''
    Open a video source with OpenCV and return the capture object.

    Parameters
    ----------
    source : optional
        Whatever should be handed to cv.VideoCapture (for example a path to
        a video file). When omitted, the notebook-level `video_url` is used,
        which is what this function always opened before the parameter
        existed.

    Returns
    -------
    cv.VideoCapture
        The capture object. It is returned even when opening failed (an error
        message is printed in that case), so callers should still check
        `isOpened()` before reading frames.
    '''
    if source is None:
        source = video_url

    cap = cv.VideoCapture(source)

#     print(f'Autofocus status: {cap.set(cv.CAP_PROP_AUTOFOCUS, 0)}')
#     print(f'Manual focus to shortest distance: {cap.set(cv.CAP_PROP_FOCUS, 0)}')

    # Report the failure but do not raise: the main loop is guarded by
    # cap.isOpened() and simply will not run.
    if not cap.isOpened():
        print("Error opening video stream or file!")

    return cap




def close_camera(capture=None):
    '''
    Release a capture device and destroy all open OpenCV windows.

    Parameters
    ----------
    capture : optional
        The capture object to release. When omitted, the notebook-level `cap`
        is released, which is what this function always did before the
        parameter existed.
    '''
    if capture is None:
        capture = cap

    capture.release()
    cv.destroyAllWindows()
    
    


def print_stats():
    '''
    Print some basic stats to stdout

    Reads the notebook-level variables `frame`, `nframe` and `n_proc_frames`
    that the main loop cell maintains.
    '''

    # The main cell initialises `frame` to None, so guard the attribute access
    # instead of crashing when no frame has been captured.
    frame_shape = None if frame is None else frame.shape

    print(f'Frame shape = {frame_shape}')
    print(f'Total number of Frames = {nframe}')
    # n_proc_frames is incremented once per processed frame, so it already is
    # the final count; dividing it by PROC_NTH_FRAME again under-reported it.
    print(f'Number of frames processed = {n_proc_frames}')
    
    
    
# @TODO: Factor out the directory creation logic - slow/redundant    
def save_images(frame, prframe):
    '''
    Write the captured frame to a JPEG file, then show it with a caption.

    Relies on the notebook-level names `SYMBOL`, `maindir`, `file_prefix`
    and `n_save_frames` to build the output location:
        ../data/{maindir}-L/{SYMBOL}/{file_prefix}-{n_save_frames}.jpg

    Parameters
    ----------
    frame :
        The full-size image. It is written to the "-L" (large) directory and
        afterwards annotated with a "Saved Frame" caption and displayed.
    prframe :
        The small image. Currently unused because the small-image write is
        disabled; the "-S" directory is still created.
    '''
    symbol_dir = SYMBOL

    # separate directory trees for the small and large images
    datadir_small = f'../data/{maindir}-S/{symbol_dir}'
    datadir_large = f'../data/{maindir}-L/{symbol_dir}'

    # build the entire directory structure if it doesn't exist
    # (exist_ok avoids the check-then-create race of isdir + makedirs)
    os.makedirs(datadir_small, exist_ok=True)
    os.makedirs(datadir_large, exist_ok=True)

    # directory + filename of the image to write
    large_path = f'{datadir_large}/{file_prefix}-{n_save_frames}.jpg'

    # cv.imwrite reports failure through its return value; surface it instead
    # of claiming on screen that the frame was saved.
    if not cv.imwrite(large_path, frame):
        print(f'Warning: could not write image to {large_path}')

    # Caption is drawn after writing so the saved file stays clean
    frame = cv.putText(frame, f'Saved Frame {SYMBOL} #{n_save_frames}',
                       org=(20, 40), fontFace=cv.FONT_HERSHEY_PLAIN,
                       fontScale=2, color=(0, 255, 255), thickness=2,
                       lineType=cv.LINE_AA)

    # Display the frame with the writing on it
    cv.imshow('What the Camera Sees:', frame)
    
    
    
# def disable_progress():
#     fastprogress.fastprogress.NO_BAR = True
#     master_bar, progress_bar = fastprogress.fastprogress.force_console_behavior()
# #     fastai.basic_train.master_bar, fastai.basic_train.progress_bar = master_bar, progress_bar
    
# # def enable_progress():
# #     fastai.basic_train.master_bar, fastai.basic_train.progress_bar = fastprogress.master_bar, fastprogress.progress_bar
    


<br>
<br>

## Open the video camera, loop for every Frame, massage the image and make a prediction.  

#### This is the main code cell.  It has 2 purposes. When you hit the spacebar, it will either:
1. Make an ASL alphabet translation or 
2. If SAVE_IMAGES_TO_FILE is True, it will save an image to the data dir 

In [43]:
# Main cell: read the video frame by frame, run the detector on every
# PROC_NTH_FRAME-th frame, draw the detections and show the result.
# Press 'q' in the OpenCV window to stop early.

PROC_NTH_FRAME = 5  # Skip every N-1 frames - increase this if your computer lags

# Image saving parameters (dir/file) read by save_images(); they must be
# defined before that function can be used.
# maindir = 'frank-ledlights'
# file_prefix = 'oldwebcam_leds'

n_proc_frames, nframe = 0, 0  # frames passed to the detector, frames read
n_save_frames = 0  # number of captured frames for saving to file

# declare here to widen scope
pred, probs, pred_idx = [0], [0], 0
frame, prframe = None, None

if SAVE_IMAGES_TO_FILE:
    # [:1] instead of [0] so that an empty answer does not raise IndexError
    SYMBOL = input('Enter Symbol of interest: ')[:1] or None

cap = setup_camera()

# try/finally guarantees the capture is released and the window closed even
# if the loop is interrupted or the detector raises.
try:
    # "Game" Loop ... for every frame
    while cap.isOpened():
        nframe += 1
        ret, frame = cap.read()  # Capture frame
        wait_ret = cv.waitKey(1)  # key char value if any

        # Break out of the loop if the user presses 'Q'
        if wait_ret & 0xFF == ord('q'):
            print_stats()
            break

        # only process the rest if the capture was successful
        if not ret:
            print("Can't read frame from capture source!")
            break  # break out of loop to let the camera close

        # Only process every PROC_NTH_FRAME-th frame. The detection code used
        # to sit outside this check, so the setting had no effect.
        if nframe % PROC_NTH_FRAME != 0:
            continue
        n_proc_frames += 1  # num processed frames

        # Image capture is currently disabled. It used to run here, on a
        # spacebar press, instead of the prediction:
        #     if SAVE_IMAGES_TO_FILE:
        #         if wait_ret & 0xFF == ord(' '):
        #             n_save_frames = n_save_frames + 1
        #             save_images(frame, prframe)
        #         continue  # no need to predict

        # OpenCV delivers frames in BGR channel order, so convert before
        # inference. NOTE(review): assumes the detector expects RGB, as
        # TF Object Detection API models usually do; confirm for this model.
        # The original BGR frame is kept for drawing/display.
        rgb_frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)

        # Add a leading batch axis before calling the detector
        input_tensor = tf.convert_to_tensor(rgb_frame)[tf.newaxis, ...]

        detections = detect_fn(input_tensor)

        # Keep only the first batch entry, trimmed to the reported number of
        # detections, and convert every output to a NumPy array.
        num_detections = int(detections.pop('num_detections'))
        detections = {key: value[0, :num_detections].numpy()
                      for key, value in detections.items()}
        detections['num_detections'] = num_detections

        detections['detection_classes'] = detections['detection_classes'].astype(np.int64)

        # Draws the boxes and labels directly onto `frame` (in place)
        viz_utils.visualize_boxes_and_labels_on_image_array(
            frame,
            detections['detection_boxes'],
            detections['detection_classes'],
            detections['detection_scores'],
            category_index,
            use_normalized_coordinates=True,
            max_boxes_to_draw=200,
            min_score_thresh=.30,
            agnostic_mode=False)

        cv.imshow('What we see:', frame)
finally:
    # Release the capture object
    close_camera()


Can't read frame from capture source!


In [None]:
# Manual cleanup: releases the global capture and closes the OpenCV windows.
# Useful if the main cell above stopped before reaching its own close_camera().
close_camera()

<br>
<br>
<br>
<br>
<br>


# The rest of this file is for reference only.  

<br>
<br>
<br>



<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>


# Snippets

```python
# rotate the image
frame = cv.rotate(frame, cv.ROTATE_90_COUNTERCLOCKWISE)
```

In [9]:
# learn.export(f'../models/{RUN_NAME}.pkl')
# path = Path('../models')
# path.ls(file_exts='.pkl')