In [32]:
import numpy as np
import tensorflow as tf
import cv2
import xml.etree.ElementTree as ET
import cv2
import matplotlib.pyplot as plt

configfilepath = '../data/yolov3.cfg.txt'
weightfilepath = '../data/yolov3.weights'
video_path = '../data/videos/'
video_name = 'mumbai_traffic.mp4'
video_path_comp = video_path + video_name
input_image_path = '../data/test.jpg'
model_image_shape = (416,416,3)
max_output_size = 40
max_output_size_per_class= 20
iou_thresh = 0.5
confidence_thresh = 0.5

In [2]:
"""
block_list will store all blocks in a list
each block is a dictionary describing the execution at that step
"""
def get_blocks(configfilepath):
  
  block_list = []
  with open(configfilepath, 'r') as file:
        lines = [line.rstrip('\n') for line in file if line != '\n' and line[0] != '#']
  presentblock = {}
  for line in lines:

    if line[0] == '[':
      # print(line)
      if presentblock != {}:
        block_list.append(presentblock)
        presentblock = {}
      presentblock['type'] = line[1:-1].rstrip()
    else:
        category,value = line.split('=')
        presentblock[category.rstrip()] = value.lstrip()
  if presentblock != {}:
    block_list.append(presentblock)
    presentblock = {}

  return block_list

"""
Using configuration and weights from https://pjreddie.com/darknet/yolo/
specifically --> YOLOv3-416
"""

'\nUsing configuration and weights from https://pjreddie.com/darknet/yolo/\nspecifically --> YOLOv3-416\n'

In [3]:
def create_model(configfilepath,model_image_shape):
  
  block_list = get_blocks(configfilepath)
  parameter_block = block_list[0]
  x = input = tf.keras.Input(shape =(model_image_shape))
  x = x / 255.0
  layers_cache = []
  out_pred = []
  scale = 0
  # print('a')
  # print(len(block_list))
  for i,block in enumerate(block_list[1:]):
    
    if block['type'] == 'convolutional':
      # print('b')
      filters = int(block['filters'])
      kernel_size = int(block['size'])
      strides = int(block['stride'])
      activation=block['activation']
      if strides > 1:
        x = tf.keras.layers.ZeroPadding2D(((1,0),(1,0)))(x)   #Padding done as --> (top,bottom),(left,right) top and left favoured by model thus padded
      x = tf.keras.layers.Conv2D(filters = filters,kernel_size = kernel_size,strides = strides,padding = 'valid' if strides > 1 else 'same',use_bias = False if 'batch_normalize' in block else True,name = f'conv_{i}')(x)
      if 'batch_normalize' in block:
        x = tf.keras.layers.BatchNormalization(name = f'batch_norm_{i}')(x)
      if activation == 'leaky':
        x = tf.keras.layers.LeakyReLU(alpha = 0.1,name = f'leakyRLU_{i}')(x)
      ##alpha is actually parameterblocks's hue parameter i.e parameter_block['hue']
    elif block['type'] == 'shortcut':
      from_ = int(block['from'])
      # print(from_)
      ######MARKER##############################################
      x = tf.keras.layers.add([x, layers_cache[from_]]) ##since from is -3 we will get 3rd element in list from Behind
    elif block['type'] == 'upsample':
      x = tf.keras.layers.UpSampling2D(size=int(block['stride']))(x)
    
    elif block['type'] == 'route':

        block['layers'] = block['layers'].split(',')
        if len(block['layers']) > 1:
          
          rel = int(block['layers'][0])
          abs = int(block['layers'][1])
          ###############MARKER########################
          x= tf.concat([layers_cache[rel],layers_cache[abs]],axis = -1)
        else:
          rel = int(block['layers'][0])
          x = layers_cache[rel] 

    elif block['type'] == 'yolo':
      # print('c')
      mask = block["mask"].split(",")
      mask = [int(x) for x in mask]
      anchors = block["anchors"].split(",")
      anchors = [int(a) for a in anchors]
      anchors = [(anchors[i], anchors[i + 1]) for i in range(0, len(anchors), 2)]
      anchors = [anchors[i] for i in mask]
      n_anchors = len(anchors)
      num_classes = int(block['classes'])
      # (batch_size x  grid_size x grid_size x (num_anchors)*(5 + num_classes)) --> Shape of Output of yolo
      out_shape = x.get_shape().as_list()
      x = tf.reshape(x, [-1, n_anchors * out_shape[1] * out_shape[2],5 + num_classes])

      # We can infer what is where in bbox
      box_centers = x[:, :, 0:2]
      box_shapes = x[:, :, 2:4]
      confidence = x[:, :, 4:5]
      classes = x[:, :, 5:num_classes + 5]
      box_centers = tf.sigmoid(box_centers)
      confidence = tf.sigmoid(confidence)
      classes = tf.sigmoid(classes)

      anchors = tf.tile(anchors, [out_shape[1] * out_shape[2], 1])
      box_shapes = tf.exp(box_shapes) * tf.cast(anchors, dtype=tf.float32)

      x = tf.range(out_shape[1], dtype=tf.float32)
      y = tf.range(out_shape[2], dtype=tf.float32)
      cx, cy = tf.meshgrid(x, y)
      cx = tf.reshape(cx, (-1, 1))
      cy = tf.reshape(cy, (-1, 1))
      cxy = tf.concat([cx, cy], axis=-1)
      cxy = tf.tile(cxy, [1, n_anchors])
      cxy = tf.reshape(cxy, [1, -1, 2])
      strides = (input.shape[1] // out_shape[1],input.shape[2] // out_shape[2])
      box_centers = (box_centers + cxy) * strides

      prediction = tf.concat([box_centers, box_shapes, confidence, classes], axis=-1)
      # prediction stored in (batch_size x image_dim*image_dim*anchor_num x (5 +  num_classes))

      if scale == 0:
        # print('hello')
        out_pred = prediction
        scale = 1
      else:
        out_pred = tf.concat([out_pred, prediction], axis=1)    
    layers_cache.append(x)
  model = tf.keras.Model(input,out_pred)
  model.summary()
  print(layers_cache)
  return model


In [4]:
def save_weights(model,weightfilepath,configfilepath):

  weightfile = open(weightfilepath,"rb")

  # skipping initial header values
  np.fromfile(weightfile, dtype=np.int32, count=5)

  # Now reading weights

  block_list = get_blocks(configfilepath)

  for i,block in enumerate(block_list[1:]):

    if block['type'] == 'convolutional':

      conv_layer = model.get_layer(f'conv_{i}')
      print("layer: ",i+1,conv_layer)

      filters = conv_layer.filters
      kernel = conv_layer.kernel_size[0]
      in_dim = conv_layer.input_shape[-1]
      if "batch_normalize" in block:
        # print("layer: ",i+1,norm_layer)
        norm_layer =  model.get_layer(f'batch_norm_{i}')
        print("layer: ",i+1,norm_layer)
        batch_norm_weights = np.fromfile(weightfile, dtype=np.float32, count=4 * filters) #4*filters as batch norm will have the below weights
        # tf [gamma, beta, mean, variance]
        batch_norm_weights = batch_norm_weights.reshape((4, filters))[[1, 0, 2, 3]]
      else:
          conv_bias = np.fromfile(weightfile, dtype=np.float32, count=filters)

      # darknet shape (out_dim, in_dim, height, width)
      conv_shape = (filters, in_dim, kernel, kernel)
      conv_weights = np.fromfile(weightfile,dtype = np.float32, count = np.prod(conv_shape))
      # tf shape (height, width, in_dim, out_dim)
      conv_weights = conv_weights.reshape(conv_shape).transpose([2, 3, 1, 0])

      if "batch_normalize" in block:
        norm_layer.set_weights(batch_norm_weights)
        conv_layer.set_weights([conv_weights])
      else:
          conv_layer.set_weights([conv_weights, conv_bias])

  weightfile.close()
#   model.save_weights('/content/yolov3_weights.tf')
#   print('weights saved in tensorflow format!!')
print('weights loaded')

weights loaded


In [5]:
def non_max_suppression(predictions,model_image_shape,iou_thresh,confidence_thresh,max_output_size,max_output_size_per_class):

  bbox , confidence , classes = tf.split(predictions,[4,1,-1],axis = -1)
  bbox /= model_image_shape[0]  #scaling shape to 0 to 1 for ease in rescaling to input image shape

  scores = confidence * classes

  boxes, scores, classes, valid_detections = \
        tf.image.combined_non_max_suppression(
        boxes=tf.reshape(bbox, (tf.shape(bbox)[0], -1, 1, 4)), # first dim is batch_size , -1 will be the number of boxes 
        scores=tf.reshape(scores, (tf.shape(scores)[0], -1,tf.shape(scores)[-1])),
        max_output_size_per_class=max_output_size_per_class,
        max_total_size=max_output_size,
        iou_threshold=iou_thresh,
        score_threshold=confidence_thresh
    ) 
  return boxes, scores, classes, valid_detections # Only the top valid_detections[i] entries in boxes[i], scores[i] and classes[i] are valid. The rest of the entries are zero paddings. [i] denotes the ith element in batch_size for us it will be 0

"""
Getting final predicitons of boxes and scores and classes from yolo predictions
"""

def output_boxes(predictions,model_image_shape,iou_thresh, confidence_thresh, max_output_size, max_output_size_per_class):
    
    # each is batch_size x grid_size*grid_size*anchor* (1 / 80 for classes)
    center_x, center_y, width, height, confidence, classes = tf.split(predictions, [1, 1, 1, 1, 1, -1], axis=-1)
    top_left_x = center_x - width / 2.0
    top_left_y = center_y - height / 2.0
    bottom_right_x = center_x + width / 2.0
    bottom_right_y = center_y + height / 2.0
    inputs = tf.concat([top_left_x, top_left_y, bottom_right_x,
                        bottom_right_y, confidence, classes], axis=-1)
    boxes,scores,classes,valid_detections = non_max_suppression(inputs, model_image_shape, iou_thresh, confidence_thresh, max_output_size,max_output_size_per_class)
    return boxes,scores,classes,valid_detections
  
"""
Given input image and final boxes, drawing boxes on image
"""
def draw_output_bbox(input_image,boxes,scores,classes,valid_detections,class_names):

  # print(scores.shape)
  boxes, scores, classes, valid_detections = boxes[0], scores[0], classes[0], valid_detections[0] #firsr dim is batch size --> batch size is 1
  boxes = np.array(boxes)
  img = input_image.copy()
  for i in range(valid_detections):
        x1y1 = tuple((boxes[i,0:2] * [img.shape[1],img.shape[0]]).astype(np.int32)) #rescaling to input image shape
        x2y2 = tuple((boxes[i,2:4] * [img.shape[1],img.shape[0]]).astype(np.int32))
        img = cv2.rectangle(img, (x1y1), (x2y2), (255,0,0), 2)
        img = cv2.putText(img, '{} {:.4f}'.format(class_names[int(classes[i])], scores[i]),(x1y1), cv2.FONT_HERSHEY_PLAIN, 1, (0, 0, 255), 2)
  
  return img

"""
Loading Class Names from text file
"""
def load_class_names(classfilepath):
  
  with open(classfilepath, 'r') as file:
        class_names = [line.rstrip('\n') for line in file if line != '\n']
  return class_names

In [23]:
"""
Compilng all Functions and showing Output
"""
def initialize_model(configfilepath,weightfilepath):
  model = create_model(configfilepath,model_image_shape)
  save_weights(model,weightfilepath,configfilepath)
  model.save('../output/yolo.h5')
  # load_weights(model,configfilepath,weightfilepath)
  return model
def predict(input_image_path,model):

  input_image = cv2.imread(input_image_path)
  model_resized_image = cv2.resize(input_image,(model_image_shape[0],model_image_shape[1]))
  # #we need to create the batch_size axis
  input_to_model = tf.expand_dims(np.array(model_resized_image),axis = 0)
  model_prediction = model.predict(input_to_model)
  pred_boxes,pred_scores,pred_classes,pred_valid_detections = output_boxes(model_prediction,model_image_shape,iou_thresh, confidence_thresh,max_output_size, max_output_size_per_class)
  # print(pred_valid_detections)
  final_img = draw_output_bbox(input_image,pred_boxes,pred_scores,pred_classes,pred_valid_detections,class_names)
  print('../output/' + input_image_path[8:-4] + '_output.jpg')
  cv2.imshow('be amazed',final_img)
  cv2.imwrite('../output/' + input_image_path[8:-4] + '_output.jpg',final_img)
  cv2.waitKey(0)
def predict_video(video_path,model,save = False):

  # win_name = 'Be Amazed!!'
  # cv2.namedWindow(win_name)
  print('../output/output-videos/' + video_path[15:-4] + '_output.mp4')
  video = cv2.VideoCapture(video_path)
  frame_size = (int(video.get(cv2.CAP_PROP_FRAME_WIDTH)),int(video.get(cv2.CAP_PROP_FRAME_HEIGHT)))
  fps = video.get(cv2.CAP_PROP_FPS)
  output_list = []
  while True:
    ret, frame = video.read()
    if ret == False:
      break
    model_resized_image = cv2.resize(frame,(model_image_shape[0],model_image_shape[1]))
    input_to_model = tf.expand_dims(np.array(model_resized_image),axis = 0)
    model_prediction = model.predict(input_to_model)
    pred_boxes,pred_scores,pred_classes,pred_valid_detections = output_boxes(model_prediction,model_image_shape,iou_thresh, confidence_thresh,max_output_size, max_output_size_per_class)
    final_img = draw_output_bbox(frame,pred_boxes,pred_scores,pred_classes,pred_valid_detections,class_names)
    output_list.append(final_img)
    
  video.release()
  
  if save == False:
    for image in output_list:
      cv2.imshow('be amazed',image)
      key = cv2.waitKey(50)
      if key == ord('q'):
        break
  else:
    print(fps,frame_size)
    out = cv2.VideoWriter('../output/' + video_path[15:-4] + '_output.mp4', cv2.VideoWriter_fourcc(*'MP4V'),fps, frame_size)
    for frame in output_list:
      out.write(np.array(frame))
    out.release()
    print('Output Video Saved!!')
  
  cv2.destroyAllWindows()

In [7]:
class_names = load_class_names('../data/coco.names.txt')
model = initialize_model(configfilepath,weightfilepath)

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 416, 416, 3  0           []                               
                                )]                                                                
                                                                                                  
 tf.math.truediv (TFOpLambda)   (None, 416, 416, 3)  0           ['input_1[0][0]']                
                                                                                                  
 conv_0 (Conv2D)                (None, 416, 416, 32  864         ['tf.math.truediv[0][0]']        
                                )                                                                 
                                                                                              

In [10]:
class_names = load_class_names('../data/coco.names.txt')
new_model = tf.keras.models.load_model('../output/yolo.h5')



In [33]:

"""FINAL OUTPUT BLOCK"""
predict(input_image_path,new_model)
predict_video(video_path_comp,new_model,save=True)

../output/test_output.jpg
../output/output-videos/mumbai_traffic_output.mp4
29.97002997002997 (1280, 720)
Output Video Saved!!


In [12]:
video_path_list = [video_path + './video2.mp4']
for video_path in video_path_list:
    predict_video(video_path,new_model,save=True)

30.0187617260788 (1920, 1080)
Output Video Saved!!
