In [1]:
import tensorflow as tf

from cv2 import VideoCapture, cvtColor, COLOR_BGR2RGB, resize
import numpy as np
from PIL import Image

from pathlib import Path

In [2]:
MODEL_PATH = '../model/yolov3-tiny-416-int8.tflite'
VIDEO_PATH = '../sample/showcase.mp4'
SAMPLE_PATH = '../sample/sample.jpg'

print(f'''
Path Check:
{'-'*10}
MODEL_PATH <- {MODEL_PATH}
status: {Path(MODEL_PATH).exists()}
{'-'*10}
VIDEO_PATH <- {VIDEO_PATH}
status: {Path(VIDEO_PATH).exists()}
{'-'*10}
SAMPLE_PATH <- {SAMPLE_PATH}
status: {Path(SAMPLE_PATH).exists()}
{'-'*10}
''')


Path Check:
----------
MODEL_PATH <- ../model/yolov3-tiny-416-int8.tflite
status: True
----------
VIDEO_PATH <- ../sample/showcase.mp4
status: True
----------
SAMPLE_PATH <- ../sample/sample.jpg
status: True
----------



In [3]:
interpreter = tf.lite.Interpreter(model_path=MODEL_PATH)
interpreter.allocate_tensors()

In [4]:
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

print(f'''
Model Details
{'-'*10}
Input Details:
{input_details}
{'-'*10}
Output Details:
{output_details}
{'-'*10}
''')


Model Details
----------
Input Details:
[{'name': 'input_1', 'index': 0, 'shape': array([  1, 416, 416,   3]), 'shape_signature': array([ -1, 416, 416,   3]), 'dtype': <class 'numpy.float32'>, 'quantization': (0.0, 0), 'quantization_parameters': {'scales': array([], dtype=float32), 'zero_points': array([], dtype=int32), 'quantized_dimension': 0}, 'sparsity_parameters': {}}]
----------
Output Details:
[{'name': 'Identity', 'index': 167, 'shape': array([   1, 2535,    2]), 'shape_signature': array([ 1, -1,  2]), 'dtype': <class 'numpy.float32'>, 'quantization': (0.0, 0), 'quantization_parameters': {'scales': array([], dtype=float32), 'zero_points': array([], dtype=int32), 'quantized_dimension': 0}, 'sparsity_parameters': {}}, {'name': 'Identity_1', 'index': 188, 'shape': array([   1, 2535,    4]), 'shape_signature': array([ 1, -1,  4]), 'dtype': <class 'numpy.float32'>, 'quantization': (0.0, 0), 'quantization_parameters': {'scales': array([], dtype=float32), 'zero_points': array([], dty

In [5]:
input_shape = input_details[0]['shape']
input_size = tuple(input_shape[1:3])

print(f'''
input_shape <- {input_shape}
input_size <- {input_size}
''')


input_shape <- [  1 416 416   3]
input_size <- (416, 416)



### Camera Not Working !! <br>Temperorily using Video Feed...

In [6]:
cap = VideoCapture(VIDEO_PATH)

if not cap.isOpened(): 
    print("Error opening video stream or file")
    

#Only for using as Example:
# ------------------------
SAMPLE_FRAME = None
count = 0
# ------------------------

while(cap.isOpened()):
    ret, frame = cap.read()
    
    #Taking 1 frame for Example
    # ------------------------
    if count == 225: SAMPLE_FRAME = frame
    count += 1
    # ------------------------
    
    if not ret: break

print(f'''
reading loop works:
{'-'*10}
Sample Video Frames: {count}
{'-'*10}
Frame that was captured:
SAMPLE_FRAME <- {SAMPLE_FRAME.shape}
{'-'*10}
WE NOT USING THIS FRAME AS HARD TO FIND FRAME WITH PREDICTION!!
{'-'*10}
''')


reading loop works:
----------
Sample Video Frames: 608
----------
Frame that was captured:
SAMPLE_FRAME <- (492, 656, 3)
----------
WE NOT USING THIS FRAME AS HARD TO FIND FRAME WITH PREDICTION!!
----------



### Helper Functions Here ---

In [7]:
ANOTHER_SAMPLE_FRAME = np.asarray(Image.open(SAMPLE_PATH))

print(f'''
{'-'*10}
Frame that will be used as sample
ANOTHER_SAMPLE_FRAME <- {ANOTHER_SAMPLE_FRAME.shape}
{'-'*10}
''')


----------
Frame that will be used as sample
ANOTHER_SAMPLE_FRAME <- (667, 889, 3)
----------



In [8]:
def get_frame_image(frame, log=False):
#     _, frame = cap.read() hm...
    if log: print(f"get_frame_image \n{'-'*10}\n")
    
    if log: print(f'converted to COLOR_BGR2RGB\nBefore: {frame[0][0][:11]}')
    frame = cvtColor(frame, COLOR_BGR2RGB)
    if log: print(f'After: {frame[0][0][:11]}\n')
    
    if log: print(f'Frame shold resized\nBefore: {frame.shape}')
    image_data = resize(frame, input_size)
    before_norm = image_data
    if log: print(f'After: {image_data.shape}\n')
        
    if log: print(f'Normalizing\nBefore: {image_data[0][0][:11]}')
    image_data = image_data / 255.
    if log: print(f'After: {image_data[0][0][:11]}\n')
    
    if log: print(f'Expanding Dimensions\nBefore: {image_data.shape}')
    image_data = image_data[np.newaxis, ...].astype(np.float32)
    if log: print(f'After: {image_data.shape}\n')
        
    if log: print(f"{'-'*10}\n")
        
    return frame, image_data, before_norm

ORIGINAL_IMAGE, IMAGE_TO_PRIDICT, BEFORE_NORM = get_frame_image(ANOTHER_SAMPLE_FRAME, log=True)

get_frame_image 
----------

Frame shold resized
Before: (667, 889, 3)
After: (416, 416, 3)

Normalizing
Before: [ 7  6 12]
After: [0.02745098 0.02352941 0.04705882]

Expanding Dimensions
Before: (416, 416, 3)
After: (1, 416, 416, 3)

----------



In [9]:
# Image.fromarray(BEFORE_NORM)

In [10]:
def forward_pass(image, log=False):
    if log: print(f"forward_pass\n{'-'*10}")
        
    interpreter.set_tensor(input_details[0]['index'], image)
    interpreter.invoke()
    pred = [interpreter.get_tensor(output_details[i]['index']) for i in range(len(output_details))]
    
    if log: print(f"Predictions: {pred}\nPrediction Shape: {[p.shape for p in pred]}\n{'-'*10}")
    return pred

prediction = forward_pass(IMAGE_TO_PRIDICT, log=True)

forward_pass
----------
Predictions: [array([[[1.9111993e-05, 1.5654377e-05],
        [1.1842297e-05, 9.2808159e-06],
        [4.8030220e-06, 2.7827407e-06],
        ...,
        [1.5267733e-07, 2.9837591e-07],
        [8.9499039e-08, 1.3190538e-07],
        [7.7039101e-07, 3.1981273e-07]]], dtype=float32), array([[[  9.258622 ,   6.6577353,   8.323171 ,  65.703735 ],
        [ 21.849075 ,   6.213477 ,  17.905348 ,  36.328445 ],
        [ 38.847076 ,   6.2583847,  19.240072 ,  39.04131  ],
        ...,
        [323.95117  , 392.8551   , 380.1087   , 473.58838  ],
        [357.4076   , 393.01978  , 378.88672  , 475.02396  ],
        [394.93228  , 393.88318  , 343.7354   , 500.18134  ]]],
      dtype=float32)]
Prediction Shape: [(1, 2535, 2), (1, 2535, 4)]
----------


#### Predictions:
**`[(1, 2535, 2), (1, 2535, 4)]`**  
**`scores`**: (1, 2535, 2)  
**`Box Prediction`**: (1, 2535, 4)

In [11]:
SAMPLE_PREDICTION = [pa[:, :, :] for pa in prediction]

print(f'''
{'-'*10}
SAMPLE_PREDICTION <- {SAMPLE_PREDICTION}
Size: {[a.shape for a in SAMPLE_PREDICTION]}
{'-'*10}
''')


----------
SAMPLE_PREDICTION <- [array([[[1.9111993e-05, 1.5654377e-05],
        [1.1842297e-05, 9.2808159e-06],
        [4.8030220e-06, 2.7827407e-06],
        ...,
        [1.5267733e-07, 2.9837591e-07],
        [8.9499039e-08, 1.3190538e-07],
        [7.7039101e-07, 3.1981273e-07]]], dtype=float32), array([[[  9.258622 ,   6.6577353,   8.323171 ,  65.703735 ],
        [ 21.849075 ,   6.213477 ,  17.905348 ,  36.328445 ],
        [ 38.847076 ,   6.2583847,  19.240072 ,  39.04131  ],
        ...,
        [323.95117  , 392.8551   , 380.1087   , 473.58838  ],
        [357.4076   , 393.01978  , 378.88672  , 475.02396  ],
        [394.93228  , 393.88318  , 343.7354   , 500.18134  ]]],
      dtype=float32)]
Size: [(1, 2535, 2), (1, 2535, 4)]
----------



In [12]:
def filter_boxes(box_xywh, scores, score_threshold=0.4, input_shape = tf.constant([416,416]), log=False):
    if log: print(f"filter_boxes\n{'-'*10}")
        
    if log: print(f"Reducing Max Scores\n[We have {scores.shape[1]} classes here we select class with highest probability\nBefore: {scores.shape} 1/5: {scores[0, 0]}")
    scores_max = tf.math.reduce_max(scores, axis=-1)
    if log: print(f"After: {scores_max.shape} 1/5: {scores_max[0, 0]}\n")
    
    if log: print(f"Creating Mask for filtering score_max\nKeeping probabilities above treshhold: {score_threshold}\nscore_max: {scores_max}")
    mask = scores_max >= score_threshold
    if log: print(f"Mask: {mask} -> {mask.shape}\n")
                  
    if log: print(f"Filtering Boxes wrt score_max Mask\nbox_xywh Before: {box_xywh.shape}")     
    class_boxes = tf.boolean_mask(box_xywh, mask)
    if log: print(f"box_xywh After: {class_boxes.shape}\n")
    
    if log: print(f"Filtering scores wrt score_max Mask\nscore Before: {scores.shape}")
    pred_conf = tf.boolean_mask(scores, mask)
    if log: print(f"score After: {pred_conf.shape}\n")
    
    if log: print(f"Reshaping class boxes\nclass_boxes Before: {class_boxes.shape}")
    class_boxes = tf.reshape(class_boxes, [tf.shape(scores)[0], -1, tf.shape(class_boxes)[-1]])
    if log: print(f"After: {class_boxes.shape}\n")
    
    if log: print(f"Reshaping pred confidence\npred_conf Before: {pred_conf.shape}")
    pred_conf = tf.reshape(pred_conf, [tf.shape(scores)[0], -1, tf.shape(pred_conf)[-1]])
    if log: print(f"pred_conf After: {pred_conf.shape}\n")
    
    if log: print(f"Splitting class boxes\nclass_boxes Before: {class_boxes.shape}")
    box_xy, box_wh = tf.split(class_boxes, (2, 2), axis=-1)
    if log: print(f"After [xy, wh]: {[box_xy.shape, box_wh.shape]}\n")
        
    if log: print(f"Changing input shape type\nBefore: {input_shape.dtype, input_shape.shape}")
    input_shape = tf.cast(input_shape, dtype=tf.float32)
    if log: print(f"input shape After: {input_shape.dtype}\n")
    
    if log: print(f"Changing xy to yx\nbox xy Before: {box_xy}")
    box_yx = box_xy[..., ::-1]
    if log: print(f"box yx After: {box_yx}\n")
    
    if log: print(f"Changing hw to wh\nbox xy Before: {box_wh}")
    box_hw = box_wh[..., ::-1]
    if log: print(f"box yx After: {box_hw}\n")
    
    if log: print(f"Getting the corners of rectangle to draw it")
    box_mins = (box_yx - (box_hw / 2.))/ input_shape
    box_maxes = (box_yx + (box_hw / 2.))/ input_shape
    if log: print(f"box bottom corners: {box_mins}\nbox upper corners: {box_maxes}\n")
    
    if log: print(f"Finally concating upper and lower corners")
    boxes = tf.concat([
        box_mins[..., 0:1],  # y_min
        box_mins[..., 1:2],  # x_min
        box_maxes[..., 0:1],  # y_max
        box_maxes[..., 1:2]  # x_max
    ], axis=-1)
    if log: print(f"Final Boxes: {boxes}\n{'-'*10}")
    
    return (boxes, pred_conf)

boxes, pred_conf = filter_boxes(SAMPLE_PREDICTION[1], SAMPLE_PREDICTION[0], 
                                score_threshold=0.25, input_shape=tf.constant([input_size]), log=True)

filter_boxes
----------
Reducing Max Scores
[We have 2535 classes here we select class with highest probability
Before: (1, 2535, 2) 1/5: [1.9111993e-05 1.5654377e-05]
After: (1, 2535) 1/5: 1.9111992514808662e-05

Creating Mask for filtering score_max
Keeping probabilities above treshhold: 0.25
score_max: [[1.9111993e-05 1.1842297e-05 4.8030220e-06 ... 2.9837591e-07
  1.3190538e-07 7.7039101e-07]]
Mask: [[False False False ... False False False]] -> (1, 2535)

Filtering Boxes wrt score_max Mask
box_xywh Before: (1, 2535, 4)
box_xywh After: (5, 4)

Filtering scores wrt score_max Mask
score Before: (1, 2535, 2)
score After: (5, 2)

Reshaping class boxes
class_boxes Before: (5, 4)
After: (1, 5, 4)

Reshaping pred confidence
pred_conf Before: (5, 2)
pred_conf After: (1, 5, 2)

Splitting class boxes
class_boxes Before: (1, 5, 4)
After [xy, wh]: [TensorShape([1, 5, 2]), TensorShape([1, 5, 2])]

Changing input shape type
Before: (tf.int32, TensorShape([1, 2]))
input shape After: <dtype: 'floa

In [13]:
# NON MAX SUPRESSION -> Used to remove box overlappings.
IOU = 0.4 # -> tolerence of overlapping boxes measured as IOU
SCORE = 0.25

print(f"Performing Non Max Supression Parameters\n{'-'*10}")

print(f"Boxes Before: {boxes.shape}")
nm_s_boxes = tf.reshape(boxes, (tf.shape(boxes)[0], -1, 1, 4))
print(f"Boxes After: {nm_s_boxes.shape}\n")

print(f"Pred confidense i.e scores Before: {pred_conf.shape}")
nm_s_scores = tf.reshape(pred_conf, (tf.shape(pred_conf)[0], -1, tf.shape(pred_conf)[-1]))
print(f"scores After: {nm_s_scores.shape}\n")

print(f'IOU threshhold set to: {IOU*100}%\nSCORE threshold set to: {SCORE*100}%\n')

classes, valid_detection = tf.image.combined_non_max_suppression(
            boxes=tf.reshape(boxes, (tf.shape(boxes)[0], -1, 1, 4)),
            scores=tf.reshape(pred_conf, (tf.shape(pred_conf)[0], -1, tf.shape(pred_conf)[-1])),
            max_output_size_per_class=50,
            max_total_size=50,
            iou_threshold=IOU,
            score_threshold=SCORE)[2:]

print(f"{'-'*10}")
print(f"Results\nclasses <- {classes} shape: {classes.shape}\nvalid detection <- {valid_detection} shape: {valid_detection.shape}")
print(f"{'-'*10}")

Performing Non Max Supression Parameters
----------
Boxes Before: (1, 5, 4)
Boxes After: (1, 5, 1, 4)

Pred confidense i.e scores Before: (1, 5, 2)
scores After: (1, 5, 2)

IOU threshhold set to: 40.0%
SCORE threshold set to: 25.0%

----------
Results
classes <- [[0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0.]] shape: (1, 50)
valid detection <- [2] shape: (1,)
----------


#### classes < lists index of every class that was detected <br>valid detection < lists How many values in `classes` are vaid

In [14]:
detected_items = [classes.numpy()[0][i].astype(int) for i in range(valid_detection.numpy()[0])]

print(f"Detected Items i.e classes that were finally detected: {detected_items}\nThese are the classes that were detected")

Detected Items i.e classes that were finally detected: [0, 1]
These are the classes that were detected


### Chain that we will be using

In [18]:
%%time

cap = VideoCapture(VIDEO_PATH)

if not cap.isOpened(): 
    print("Error opening video stream or file")

IOU = 0.4
SCORE = 0.25

count = 0
while(cap.isOpened()):
    ret, frame = cap.read()
    count += 1
    
    if not ret: break
        
    
    # we dont need ORIGINAL IMAGE
    ORIGINAL_IMAGE, IMAGE_TO_PRIDICT, BEFORE_NORM = get_frame_image(frame)
    
    prediction = forward_pass(IMAGE_TO_PRIDICT)
    
    boxes, pred_conf = filter_boxes(prediction[1], prediction[0], 
                                score_threshold=0.25, input_shape=tf.constant([input_size]))
    
    classes, valid_detection = tf.image.combined_non_max_suppression(
            boxes=tf.reshape(boxes, (tf.shape(boxes)[0], -1, 1, 4)),
            scores=tf.reshape(pred_conf, (tf.shape(pred_conf)[0], -1, tf.shape(pred_conf)[-1])),
            max_output_size_per_class=50,
            max_total_size=50,
            iou_threshold=IOU,
            score_threshold=SCORE)[2:]

    detected_items = classes.numpy()[0, :valid_detection.numpy()[0]].astype(int)
#     detected_items = [classes.numpy()[0][i].astype(int) for i in range(valid_detection.numpy()[0])]
    
    print(f"Frame {count}: {detected_items}")
    

<PIL.Image.Image image mode=RGB size=656x492 at 0x21FA56D5BE0>
Frame 1: [1]
<PIL.Image.Image image mode=RGB size=656x492 at 0x21FA5C9DDF0>
Frame 2: [1]
<PIL.Image.Image image mode=RGB size=656x492 at 0x21FA5C9D190>
Frame 3: [1]
<PIL.Image.Image image mode=RGB size=656x492 at 0x21FA5C9D400>
Frame 4: [1]
<PIL.Image.Image image mode=RGB size=656x492 at 0x21FA5C98AF0>
Frame 5: [1]
<PIL.Image.Image image mode=RGB size=656x492 at 0x21FA5C9D190>
Frame 6: [1]
<PIL.Image.Image image mode=RGB size=656x492 at 0x21FA5C9D400>
Frame 7: [1]
<PIL.Image.Image image mode=RGB size=656x492 at 0x21FA5C9D160>
Frame 8: [1]
<PIL.Image.Image image mode=RGB size=656x492 at 0x21FA5C9D400>
Frame 9: [1]
<PIL.Image.Image image mode=RGB size=656x492 at 0x21FA5C9D190>
Frame 10: [1]
<PIL.Image.Image image mode=RGB size=656x492 at 0x21FA5C9D190>
Frame 11: [1]
<PIL.Image.Image image mode=RGB size=656x492 at 0x21FA5C9D190>
Frame 12: [1]
<PIL.Image.Image image mode=RGB size=656x492 at 0x21FA5A193D0>
Frame 13: [1]
<PIL.Ima

KeyboardInterrupt: 