<a href="https://colab.research.google.com/github/paulesta55/Notebooks/blob/master/Y2TF.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Yet another TF implementation of Yolo


In [1]:
from google.colab import drive
drive.mount('/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /gdrive


In [0]:
import os
import logging
import sys

from __future__ import absolute_import, division, print_function, unicode_literals

import copy as cp

import tensorflow as tf
import numpy as np
from tensorflow.keras.layers import Dense, Flatten, Conv2D, BatchNormalization, LeakyReLU, ZeroPadding2D, UpSampling2D, Concatenate
from tensorflow.keras import Model
from tensorflow.keras import layers
from tensorflow.keras.utils import plot_model
import cv2

logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')

In [0]:
CFG_PATH = "/gdrive/My Drive/Y2TF/yolo-obj.cfg"

In [0]:
class ConfigReader:
  def __init__(self,cfg_path,verbose=False):
    self.cfg_path = cfg_path
    self.verbose = verbose


  def parse_cfg(self):
      """
      Takes a configuration file
    
      Returns a list of blocks. Each blocks describes a block in the neural
      network to be built. Block is represented as a dictionary in the list
    
      """
      with open(os.path.abspath(self.cfg_path),'r') as cfg_file:
        lines = cfg_file.readlines()
      lines = [x for x in lines if len(x) > 0]               # get read of the empty lines 
      lines = [x for x in lines if x[0] != '#']              # get rid of comments
      lines = [x.rstrip().lstrip() for x in lines]           # get rid of fringe whitespaces
      lines = [x for x in lines if len(x) > 0]               # get read of the empty lines because some of the previous operation appends empty lines
      if(self.verbose):
        print(lines)
        logging.debug(lines)
      return lines 
    
  def get_blocks(self,lines):
    block = {}
    blocks = []

    for line in lines:
      if(self.verbose):
        logging.debug(line)
        print(line)
      if line[0] == '[':           # This marks the start of a new block
        if len(block) != 0:        # If block is not empty, implies it is storing values of previous block.
          blocks.append(block)     # add it the blocks list
          block = {}               # re-init the block
        block["type"] = line[1:-1].rstrip()     
      else:
        key,value = line.split("=") 
        block[key.rstrip()] = value.lstrip()
    blocks.append(block)
    return blocks

In [0]:

configReader =ConfigReader(CFG_PATH,verbose=False) 
str_blocks = configReader.parse_cfg()
blocks = configReader.get_blocks(str_blocks)

In [0]:
class FormatLayer(layers.Layer):
  def __init__(self,
               in_dim, 
               anchors, 
               num_classes,
               name='FormatLayer',
               **kwargs):
    super(FormatLayer,self).__init__(name=name, **kwargs)
    self.in_dim = in_dim
    self.anchors = anchors
    self.num_classes = num_classes

  def call(self,prediction):
    return self.formatPredict(prediction);
  
  
  def formatPredict(self,prediction):
    sess = tf.compat.v1.Session()
    prediction_shape = tf.shape(prediction)
    assert prediction_shape.shape == 4
    batch_size = prediction_shape[0]
    stride = self.in_dim // prediction_shape[2]

    grid_size = self.in_dim // stride
    # print("grid_size = ",grid_size.eval(session=sess))      
    bbox_attrs = 5 + self.num_classes
    # print("bbox_attrs = ",bbox_attrs)
    num_anchors = len(self.anchors)
    # print("num_anchors = ",num_anchors)
    prediction = tf.reshape(prediction,[batch_size, bbox_attrs*num_anchors, grid_size*grid_size])
    prediction = tf.transpose(prediction,perm=[0,2,1])
    prediction = tf.reshape(prediction,[batch_size, grid_size*grid_size*num_anchors, bbox_attrs])

    anchors = [(a[0]/stride, a[1]/stride) for a in self.anchors]
    # necessary cause tf.Tensors don't support item assignment...WTF?
    tf_var = tf.Variable(prediction)
    tf_var.assign(prediction)
    # print(tf_var)
    tf_var[:,:,0].assign(tf.math.sigmoid(tf_var[:,:,0]))
    tf_var[:,:,1].assign(tf.math.sigmoid(tf_var[:,:,1]))
    tf_var[:,:,4].assign(tf.math.sigmoid(tf_var[:,:,4]))
    # print(tf_var)
    # print(type(grid_size))
    #Add the center offsets
    grid_size = grid_size.eval(session=sess)  
    grid = np.arange(grid_size)
    a,b = np.meshgrid(grid, grid)
    # print(tf_var)

    x_offset = tf.reshape(tf.Variable(a,dtype=tf.float32).assign(a),[-1,1])
    y_offset = tf.reshape(tf.Variable(b,dtype=tf.float32).assign(b),[-1,1])
    
    x_y_offset = tf.concat([x_offset,y_offset],axis=1)
    x_y_offset = tf.reshape(tf.tile(x_y_offset,[1,num_anchors]),[-1,2])
  
    tf_var.assign(tf_var[:,:,:2].assign(tf_var[:,:,:2]+x_y_offset))

  # log space transform height and the width
    anchors = tf.Variable(anchors,dtype=tf.float32).assign(anchors)
    #anchors = anchors.repeat(grid_size*grid_size, 1).unsqueeze(0)
    anchors = tf.tile(anchors,[grid_size*grid_size, 1])

    # print(anchors)
    # print(tf_var)
    tf_var[:,:,2:4].assign(tf.math.exp(tf_var[:,:,2:4]))
    tf_var[:,:,2:4].assign(tf_var[:,:,2:4]*anchors)
  
    tf_var[:,:, 5 : 5 + self.num_classes].assign(tf.math.sigmoid(tf_var[:,:, 5 : 5 + self.num_classes]))
    # prediction = tf.convert_to_tensor(tf_var)
    # prediction[:,:,:4] *= stride
    tf_var[:,:,:4].assign(tf_var[:,:,:4]*tf.dtypes.cast(stride,tf.float32))
    # print(tf_var)
    # assert tf_var.shape == (batch_size, grid_size*grid_size*num_anchors, bbox_attrs)
    return tf_var
  
class DarknetModel(Model):
  def __init__(self,cfg_path):
    super(DarknetModel, self).__init__()
    self.cfg_path = cfg_path
    configReader = ConfigReader(self.cfg_path,verbose=False) 
    str_blocks = configReader.parse_cfg()
    self.blocks = configReader.get_blocks(str_blocks)
    self.net_info = self.blocks[0]
    self.subBlocks = self.createModules()
    
    
    
  def createModules(self):
         #Captures the information about the input and pre-processing    
    self.prev_filters = 3
    modules = {}
    firstConv = True
    for index, block in enumerate(self.blocks[1:]):
      blockType = block["type"]
      if blockType == "convolutional":
        modules[index] = DarknetConv2D(block,index,firstConv,self.net_info)

      elif blockType == "upsample":
        modules[index] = UpSampling2D(size=2,interpolation='bilinear',name="upsample_{}".format(index))
        
      #shortcut corresponds to skip connection
      #shortcuts are managed in call method
      elif blockType == "shortcut":
        modules[index] = DarknetShortcut(block,index)
        
      #routes are managed in call method  
      elif blockType == "route":
        modules[index] = DarknetRoute(block,index)
      
      #Yolo is the detection layer
      elif blockType == "yolo":
        modules[index] = YoloDetection(block,index,self.net_info)
    return modules


  def call(self, inputs, training=False):
    x = inputs
    modules = self.blocks[1:]
    outputs = {}
    write = 0
    for i, module in enumerate(modules):
      moduleType = (module["type"]) 
      print("block type ",moduleType)
      print(self.subBlocks[i])
      if(moduleType == "convolutional" or moduleType == "upsample"):
        x = self.subBlocks[i](x)
      elif(moduleType == "route"):
        x = self.subBlocks[i](outputs,i)
      elif  moduleType == "shortcut":
        from_ = int(module['from'])
        feat_other_layer = outputs[i+from_]
        # print('feature of other layer',feat_other_layer)
        feat_layer = outputs[i-1] 
        # print('feature of current layer',feat_layer)
        x = self.subBlocks[i](feat_layer,feat_other_layer)
      elif moduleType == "yolo":
        x = self.subBlocks[i](x,write)
        if not write:              #if no collector has been intialised. 
          self.detections = tf.Variable(x).assign(x)
        else:       
          self.detections = tf.concat([self.detections, x], axis=1)
        write = 1
      outputs[i] = x
    return self.detections

class DarknetConv2D(Model):
  def __init__(self,block,index,firstConv,net_info):
    super(DarknetConv2D,self).__init__(name="darknet_conv_{0}".format(index))
    self.activation = block["activation"]
    try:
      self.batchNormalize = int(block["batch_normalize"])
      bias = False
    except:
      self.batchNormalize = 0
      bias = True
    filters= int(block["filters"])
    self.padding = int(block["pad"])
    kernel_size = int(block["size"])
    stride = int(block["stride"])
    self.inputLayer = layers.InputLayer(input_shape=(
            int(net_info['height']),int(net_info['height']),3))
    if self.padding:
      pad = (kernel_size - 1) // 2 # not necessary pad = self.padding would also work
      if firstConv:
        self.paddingLayer = ZeroPadding2D((pad,pad),name="padding_{0}".format(index))
        firstConv= False
      else:
        self.paddingLayer = ZeroPadding2D((pad,pad),name="padding_{0}".format(index))

    if firstConv:
      self.convLayer = Conv2D(filters=filters,kernel_size=kernel_size,strides=stride
                              ,padding="valid",use_bias=bias,name="conv_{0}".format(index))
    else:
      self.convLayer = Conv2D(filters=filters,kernel_size=kernel_size,strides=stride
                              ,padding="valid",use_bias=bias,name="conv_{0}".format(index))   
    #Add the Batch Norm Layer
    if self.batchNormalize:
      self.batchNormLayer = BatchNormalization(name="batch_norm_{0}".format(index))
    
    #Check the activation. 
    #It is either Linear or a Leaky ReLU for YOLO
    if self.activation == "leaky":
      self.activationLayer = LeakyReLU(name="leaky_activation_{0}".format(index))
  
  def call(self, input_tensor):
    x = input_tensor
    # print("conv2D {0} input = {1}".format(self.name,x))
    self.inputLayer(x)
    if self.padding:
      x = self.paddingLayer(x)
      # print("conv2D {0} padding res = {1}".format(self.name,x))
    x = self.convLayer(x)
    # print("conv2D {0} conv res = {1}".format(self.name,x))
    if self.batchNormalize:
      x = self.batchNormLayer(x)
      # print("batchNorm {0} batchnorm res = {1}".format(self.name,x))
    if self.activation == "leaky":
      x = self.activationLayer(x)
    # tf.debugging.Assert(type(x)==tf.Tensor)
    # print("{0} output = {1}".format(self.name,x))
    return x
    
  
class DarknetRoute(Model):
  def __init__(self,block,index):
    super(DarknetRoute,self).__init__(name="darknet_route_{0}".format(index))
    self.blockLayers = block["layers"].split(',')
    self.blockLayers = [int(a) for a in self.blockLayers]
    #Start  of a route
    self.start = int(self.blockLayers[0])
    #end, if there exists one
    try:
      self.end = int(self.blockLayers[1])
      self.concat = Concatenate()
    except:
      self.end = 0
    #Positive anotation
    if self.start > 0: 
      self.start = self.start - index
    if self.end > 0:
      self.end = self.end - index
    self.index = index

  def call(self,outputs,i):
    layers = self.blockLayers
    if(layers[0]) > 0:
      layers[0] = layers[0] - i
          
    if len(layers) == 1:
      x = outputs[i + (layers[0])]
    else:
      if(layers[1]) > 0:
        layers[1] = layers[1] - i
      map1 = outputs[i + layers[0]]
      map2 = outputs[i + layers[1]]    
      x = self.concat([map1, map2])
    # print("{0} output = {1}".format(self.name,x))
    return x
    
class DarknetShortcut(Model):
  def __init__(self,block,index):
    super(DarknetShortcut,self).__init__(name="darknet_shortcut_{0}".format(index))
    self.block = block
    self.index = index
    self.from_ = int(block['from'])
    self.addLayer = layers.Add()  
  
  def call(self, feat_layer, feat_other_layer):
    x =  self.addLayer([feat_layer , feat_other_layer])
    # print("{0} output = {1}".format(self.name,x))
    return x
    
class YoloDetection(Model):
  def __init__(self,block,index,net_info):
    super(YoloDetection,self).__init__(name="yolo_detection_{0}".format(index))
    self.net_info = net_info
    self.inp_dim = int(self.net_info["height"])
    self.num_classes = int(block["classes"])
    mask = block["mask"].split(",")
    mask = [int(idx) for idx in mask]
        
    anchors = block["anchors"].split(",")
    anchors = [int(a) for a in anchors]
    anchors = [(anchors[i], anchors[i+1]) for i in range(0, len(anchors),2)]
    self.anchors = [anchors[i] for i in mask]
    self.formatLayer = FormatLayer(
               self.inp_dim,                
               self.anchors, 
               self.num_classes,
               name='FormatLayer')

  def call(self,x,write):
    x = self.formatLayer(x)
    return x
  
    
 

In [0]:
darknet = DarknetModel(CFG_PATH)

In [86]:
print(len(darknet.layers))

107


In [10]:
!wget https://github.com/ayooshkathuria/pytorch-yolo-v3/raw/master/dog-cycle-car.png

--2019-11-16 22:09:09--  https://github.com/ayooshkathuria/pytorch-yolo-v3/raw/master/dog-cycle-car.png
Resolving github.com (github.com)... 140.82.114.4
Connecting to github.com (github.com)|140.82.114.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/ayooshkathuria/pytorch-yolo-v3/master/dog-cycle-car.png [following]
--2019-11-16 22:09:10--  https://raw.githubusercontent.com/ayooshkathuria/pytorch-yolo-v3/master/dog-cycle-car.png
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 347445 (339K) [image/png]
Saving to: ‘dog-cycle-car.png’


2019-11-16 22:09:10 (8.07 MB/s) - ‘dog-cycle-car.png’ saved [347445/347445]



In [11]:
import pathlib
data_dir = tf.keras.utils.get_file(origin='https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz',
                                         fname='flower_photos', untar=True)


Downloading data from https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz


In [0]:
shortcut = DarknetShortcut()
shortcut()

In [119]:
# tf.config.experimental_run_functions_eagerly(True)
data_dir = pathlib.Path(data_dir)
image_count = len(list(data_dir.glob('*/*.jpg')))
image_count

CLASS_NAMES = np.array([item.name for item in data_dir.glob('*') if item.name != "LICENSE.txt"])
CLASS_NAMES
len(CLASS_NAMES)
# The 1./255 is to convert from uint8 to float32 in range [0,1].
image_generator = tf.keras.preprocessing.image.ImageDataGenerator(rescale=1./255)
BATCH_SIZE = 32
IMG_HEIGHT = 416
IMG_WIDTH = 416
STEPS_PER_EPOCH = np.ceil(image_count/BATCH_SIZE)
train_data_gen = image_generator.flow_from_directory(directory=str(data_dir),
                                                     batch_size=BATCH_SIZE,
                                                     shuffle=True,
                                                     target_size=(IMG_HEIGHT, IMG_WIDTH),
                                                     classes = list(CLASS_NAMES))
image_batch, label_batch = next(train_data_gen)

def forward(model,image_batch):
  results = model(image_batch)
  print(results)
  return results

forward(darknet,image_batch)

Found 3670 images belonging to 5 classes.
block type  convolutional
<__main__.DarknetConv2D object at 0x7fc96f714e80>
block type  convolutional
<__main__.DarknetConv2D object at 0x7fc96f6f7ef0>
block type  convolutional
<__main__.DarknetConv2D object at 0x7fc96f6cc048>
block type  convolutional
<__main__.DarknetConv2D object at 0x7fc96f6b5160>
block type  shortcut
<__main__.DarknetShortcut object at 0x7fc96f6bd278>
block type  convolutional
<__main__.DarknetConv2D object at 0x7fc96f6bd5f8>
block type  convolutional
<__main__.DarknetConv2D object at 0x7fc96f6c47f0>
block type  convolutional
<__main__.DarknetConv2D object at 0x7fc96f6db828>
block type  shortcut
<__main__.DarknetShortcut object at 0x7fc96f6e2940>
block type  convolutional
<__main__.DarknetConv2D object at 0x7fc96f6e2cc0>
block type  convolutional
<__main__.DarknetConv2D object at 0x7fc96f6ead68>
block type  shortcut
<__main__.DarknetShortcut object at 0x7fc96f7abe80>
block type  convolutional
<__main__.DarknetConv2D objec

<tf.Tensor 'darknet_model_26/concat_1:0' shape=(32, 10647, 39) dtype=float32>

In [120]:
# darknet.build(image_batch.shape)
darknet.summary()

Model: "darknet_model_26"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
darknet_conv_0 (DarknetConv2 multiple                  992       
_________________________________________________________________
darknet_conv_1 (DarknetConv2 multiple                  18688     
_________________________________________________________________
darknet_conv_2 (DarknetConv2 multiple                  2176      
_________________________________________________________________
darknet_conv_3 (DarknetConv2 multiple                  18688     
_________________________________________________________________
darknet_shortcut_4 (DarknetS multiple                  0         
_________________________________________________________________
darknet_conv_5 (DarknetConv2 multiple                  74240     
_________________________________________________________________
darknet_conv_6 (DarknetConv2 multiple             