In [None]:
# Convert the annotations into the darknet format, i.e. one .txt label file per image

In [1]:
from shutil import copy

In [2]:
import os
import numpy as np
import matplotlib.pylab as plt
from skimage.io import imread
from VOClabelcolormap import color_map
from anno import ImageAnnotation
import glob
%matplotlib inline
import cv2
import tqdm

In [3]:
# Collect the paths of all .mat annotation files in Annotations_Part/
# (relative path, so it is resolved against the current working directory).
anno_paths = glob.glob("Annotations_Part/*.mat")

In [None]:
# Demo for showing the parts on an image
for anno in anno_paths[:100]:
    # Image id = annotation file name without directory and extension.
    # (str.rstrip(".mat") strips any trailing '.', 'm', 'a', 't' characters, not the
    # suffix, and split("/")[1] only works for exactly one directory level.)
    im_id = os.path.splitext(os.path.basename(anno))[0]
    im_path = "../VOC2010/VOCdevkit/VOC2010/JPEGImages/" + im_id + ".jpg"

    an = ImageAnnotation(im_path, anno)

    # 2x2 grid: image, class mask, instance mask, part mask
    f, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2)
    ax1.imshow(an.im)
    ax1.set_title('Image')
    ax1.axis('off')
    ax2.imshow(an.cls_mask, cmap=color_map(N=np.max(an.cls_mask) + 1))
    ax2.set_title('Class mask')
    ax2.axis('off')
    ax3.imshow(an.inst_mask, cmap=color_map(N=np.max(an.inst_mask) + 1))
    ax3.set_title('Instance mask')
    ax3.axis('off')
    # An all-zero part mask is drawn in plain grayscale instead of the label colour map
    if np.max(an.part_mask) == 0:
        ax4.imshow(an.part_mask, cmap='gray')
    else:
        ax4.imshow(an.part_mask, cmap=color_map(N=np.max(an.part_mask) + 1))
    ax4.set_title('Part mask')
    ax4.axis('off')
    plt.show()



In [4]:
def get_enclosing_bb(combined_mask, show = True):
    """Return the smallest upright bounding box enclosing all non-zero mask pixels.

    The extent is computed directly with NumPy, so the result does not depend on
    the return signature of ``cv2.findContours`` (which differs between OpenCV
    major versions) and works for any numeric or boolean mask dtype.

    Parameters
    ----------
    combined_mask : 2-D array
        Mask whose non-zero pixels mark the part(s) to enclose.
    show : bool
        Unused. Kept only so existing callers that pass it keep working.

    Returns
    -------
    tuple
        ``(xc, yc, w, h, (min_x, min_y), (max_x, max_y))`` where ``xc, yc, w, h``
        are the box centre and size divided by the mask width/height, and the two
        corner points are in pixels (``max_*`` is one past the last non-zero
        pixel). For a mask without any non-zero pixel an all-zero box is
        returned instead of a box with negative size.

    A possible extension would be a rotated (non-upright) box, but that would
    require the detector to be trained with an additional rotation parameter.
    """
    height, width = combined_mask.shape

    # Rows / columns that contain at least one non-zero pixel
    rows = np.flatnonzero(np.any(combined_mask, axis=1))
    cols = np.flatnonzero(np.any(combined_mask, axis=0))

    if rows.size == 0:
        # Nothing to enclose
        return 0.0, 0.0, 0.0, 0.0, (0, 0), (0, 0)

    min_x, max_x = int(cols[0]), int(cols[-1]) + 1
    min_y, max_y = int(rows[0]), int(rows[-1]) + 1

    w = max_x - min_x
    h = max_y - min_y
    xc = min_x + (w/2)
    yc = min_y + (h/2)

    # normalise the coordinates
    w = w/width
    h = h/height
    xc = xc/width
    yc = yc/height

    return xc, yc, w, h, (min_x, min_y), (max_x, max_y)

def get_combined_mask(an_obj, sub_part):
    """Build one mask covering the requested parts of a single annotated object.

    Parameters
    ----------
    an_obj : object
        Must expose ``mask`` (the object's own mask) and ``parts``, an iterable
        of items that each have ``part_name`` and ``mask``.
    sub_part : str or iterable of str
        Part names to combine. A bare string is treated as a single name
        (``('head')`` is just the string ``'head'``, not a 1-tuple), so names are
        always matched exactly rather than by substring. Two names are special:
        ``'all'`` selects the whole object mask, and ``'frontal_face'`` selects
        the head mask, but only if mouth, both eyes and both eyebrows are
        annotated.

    Returns
    -------
    array
        ``an_obj.mask`` for ``'all'``; the head part's mask for a frontal face;
        otherwise a new uint8 array holding the sum of the matching part masks.
        The result is all zeros when none of the requested parts exist.
    """
    if isinstance(sub_part, str):
        sub_part = (sub_part,)

    # Send an_object.mask if 'all' in subparts
    if 'all' in sub_part:
        return an_obj.mask

    # Buffer that accumulates the masks of all requested parts
    buffer_mask = np.zeros((an_obj.mask.shape), dtype = np.uint8)

    if 'frontal_face' in sub_part:
        # A face counts as frontal only when every one of these parts is annotated.
        # (Ears are deliberately not required.)
        frontal_face = {'mouth', 'leye', 'reye', 'lebrow', 'rebrow'}
        all_parts = {x.part_name for x in an_obj.parts}

        if not frontal_face.issubset(all_parts):
            # Return an empty mask if the head does not contain all the required parts
            return buffer_mask

        # Frontal face: the head mask is used as the face mask
        for tiny_part in an_obj.parts:
            if tiny_part.part_name == 'head':
                return tiny_part.mask
        # No 'head' part annotated: fall through to the generic accumulation below

    for tiny_part in an_obj.parts:
        # Accumulate only the parts that were asked for
        if tiny_part.part_name in sub_part:
            buffer_mask += tiny_part.mask

    # It should be noted that the buffer mask could be empty in case where the parts are not available.
    return buffer_mask
        

def has_person(an, PERSON_IND = 15):
    """Report whether the label PERSON_IND occurs anywhere in ``an.cls_mask``."""
    labels_present = np.unique(an.cls_mask)
    return bool((labels_present == PERSON_IND).any())

In [5]:
def generate_bbs(an_obj, parts, parts_stat, image = None):
    """Generate one normalised bounding box per available part group of one object.

    Parameters
    ----------
    an_obj : object
        The annotated object; passed to ``get_combined_mask``.
    parts : sequence
        Part groups, e.g. ``[('all',), ('head',), ('torso', 'luarm')]``. The
        position of a group in this sequence becomes its class index.
    parts_stat : list of int
        Per-group counters; ``parts_stat[i]`` is incremented in place every time
        a box for group ``i`` is produced.
    image : optional
        Unused; kept for backward compatibility (it was a debug-plotting hook).

    Returns
    -------
    (annot_per_part, parts_stat)
        ``annot_per_part`` is a list of ``[index, xc, yc, w, h]`` entries, one per
        group for which a box could be produced. ``parts_stat`` is the same list
        object that was passed in.
    """
    annot_per_part = []
    for index, sub_part in enumerate(parts):
        # Combined mask for the group, e.g. one that encompasses [ear, hair]
        combined_mask = get_combined_mask(an_obj, sub_part)

        # A ready-made (xc, yc, w, h) tuple is used as-is.
        # (The previous check `type(...) == "tuple"` compared a type with a string
        # and therefore could never be true.)
        if isinstance(combined_mask, tuple):
            parts_stat[index] += 1
            xc, yc, w, h = combined_mask
            annot_per_part.append([index, xc, yc, w, h])
            continue

        # Skip groups whose mask has no non-zero pixel. Testing for any non-zero
        # pixel (rather than for more than one distinct value) also accepts a
        # mask that is non-zero everywhere.
        if not np.any(combined_mask):
            continue

        parts_stat[index] += 1

        # The pixel corner points are only needed for debugging
        xc, yc, w, h, _p1, _p2 = get_enclosing_bb(combined_mask)

        # Normalised annotation for this group
        annot_per_part.append([index, xc, yc, w, h])

    return annot_per_part, parts_stat

In [6]:
def parts2darknet(parts, anno_paths, darknet_path = "darknet/darknet_annots/aug_pf/"):
    """Create darknet annotations (plus motion-blurred copies) for every annotation file.

    For each annotation file the matching JPEG is loaded through
    ``ImageAnnotation``. Images without a person get an empty annotation list.
    For images with a person, four motion-blurred variants are written to
    ``darknet_path`` and every person object's boxes are stored for the original
    image and for each variant (blurring does not move the boxes).

    Parameters
    ----------
    parts : list
        Part groups, e.g. ``[('head',), ('torso', 'llleg', 'rlleg'), ('all',)]``.
    anno_paths : iterable of str
        Paths of the ``.mat`` annotation files.
    darknet_path : str
        Directory (with trailing separator) receiving the augmented images.

    Returns
    -------
    dict
        Maps image id (and ``aug_{v,h,d,o}_<image id>``) to a list of
        ``[class_index, xc, yc, w, h]`` annotations.
    """
    parts_stat = [0] * len(parts)

    n_person = 0
    n_neg_frames = 0
    n_objects = 0

    # cv2.imwrite does not create missing directories, so make sure the target exists
    if darknet_path:
        os.makedirs(darknet_path, exist_ok=True)

    anno_dict = dict()
    for path in tqdm.tqdm(anno_paths):
        # Image id = file name without directory and extension.
        # (str.rstrip(".mat") strips a character set, not the suffix.)
        im_id = os.path.splitext(os.path.basename(path))[0]
        im_path = "../VOC2010/VOCdevkit/VOC2010/JPEGImages/" + im_id + ".jpg"
        an = ImageAnnotation(im_path, path)

        if not has_person(an):
            # Negative frame: keep the image with an empty annotation
            n_neg_frames += 1
            anno_dict[im_id] = []
            continue

        ##################
        #### Augment Image
        ##################
        # The camera is assumed to be static, so motion blur is the relevant
        # degradation; only images that contain a person are augmented.
        aug_img = np.copy(an.im)

        # Keys of the four blurred variants, in the order apply_motion_blur returns
        # them: vertical, horizontal, diagonal, opposite diagonal
        aug_ids = tuple(f"aug_{tag}_{im_id}" for tag in ("v", "h", "d", "o"))

        # NOTE(review): cv2.imwrite expects BGR channel order. If ImageAnnotation
        # loads `an.im` as RGB, the saved augmentations have swapped channels - confirm.
        for aug_id, blurred in zip(aug_ids, apply_motion_blur(aug_img, k_size=10)):
            cv2.imwrite(darknet_path + aug_id + ".jpg", blurred)

        # Traverse the objects
        for obj in an.objects:
            n_objects += 1

            if obj.class_name != 'person':
                continue
            n_person += 1

            # The chunk of the work happens here: bounding boxes and classes are generated
            person_bbs, parts_stat = generate_bbs(obj, parts, parts_stat)

            # First person of this image: create the entries for it and its variants
            if im_id not in anno_dict:
                anno_dict[im_id] = []
                for aug_id in aug_ids:
                    anno_dict[aug_id] = []

            # The same boxes apply to the original and to every blurred variant
            for bb in person_bbs:
                anno_dict[im_id].append(bb)
                for aug_id in aug_ids:
                    anno_dict[aug_id].append(bb)

    print(f"Found {n_person} persons")
    print(f"Found {n_neg_frames} negative frames")
    print(f"Found {n_objects} total objects")
    # parts_stat is always defined, even when no person object was processed
    print(f"Parts Distribution: ", parts_stat)

    return anno_dict

In [7]:
def apply_motion_blur(image, k_size = 30):
    """Blur ``image`` with four line-shaped kernels of side ``k_size``.

    The larger ``k_size`` is, the stronger the simulated motion.

    Returns a 4-tuple of filtered images, in this order: vertical, horizontal,
    diagonal, opposite diagonal.
    """
    centre = int((k_size - 1)/2)
    last = k_size - 1

    def _blank():
        # Fresh all-zero square kernel
        return np.zeros((k_size, k_size))

    # Middle column set -> averages along the vertical direction
    vertical = _blank()
    vertical[:, centre] = np.ones(k_size)

    # Middle row set -> averages along the horizontal direction
    horizontal = _blank()
    horizontal[centre, :] = np.ones(k_size)

    # Lines from corner to corner
    diagonal = cv2.line(_blank(), (0, 0), (last, last), (1), 1)
    opposite_diagonal = cv2.line(_blank(), (last, 0), (0, last), (1), 1)

    # Each kernel is divided by k_size before being applied
    vertical_mb, horizontal_mb, diagonal_mb, opposite_diagonal_mb = (
        cv2.filter2D(image, -1, kernel / k_size)
        for kernel in (vertical, horizontal, diagonal, opposite_diagonal)
    )

    return vertical_mb, horizontal_mb, diagonal_mb, opposite_diagonal_mb

In [8]:
def write_annots(anno_dict, containing_folder="darknet_annots/pfhul/", base_path = "darknet/"):
    """Write one ``<key>.txt`` file per entry of ``anno_dict``.

    Each annotation becomes one line; every value is followed by a single space
    and the line ends with a newline. Files are placed in
    ``base_path + containing_folder``.
    """
    target_dir = base_path + containing_folder

    for image_id, annotations in anno_dict.items():
        with open(target_dir + image_id + ".txt", 'w+') as label_file:
            for annotation in annotations:
                fields = "".join(str(value) + " " for value in annotation)
                label_file.write(fields + "\n")

    print("Annotations were written to file successfully !!")

In [9]:
def generate_test_train_files(anno_dict, containing_folder_name, base_path="darknet/", split=0.7):
    """Write the train/test image lists for the keys of ``anno_dict``.

    The keys are taken in dictionary order; the first ``int(len * split)`` go to
    ``<base_path>train_test_split/<name>_train.txt`` and the rest to
    ``..._test.txt``. Every line has the form ``data/<name>/<key>.jpg``.
    """
    image_ids = list(anno_dict.keys())
    cut = int(len(image_ids) * split)
    list_dir = base_path + "train_test_split/"

    subsets = (("train", image_ids[:cut]), ("test", image_ids[cut:]))
    for subset_name, subset_ids in subsets:
        list_path = f"{list_dir}{containing_folder_name}_{subset_name}.txt"
        with open(list_path, "w+") as listing:
            for key in subset_ids:
                listing.write(f"data/{containing_folder_name}/{key}.jpg\n")
            

In [10]:
def copy_images(anno_dict, obj_name):
    """Copy the source JPEG of every non-augmented key into the darknet folder.

    Keys whose first underscore-separated token is "aug" are skipped: those
    images are augmented ones and already exist in the folder.
    """
    destination = f"./darknet/darknet_annots/{obj_name}/"
    original_ids = (im_id for im_id in anno_dict.keys() if im_id.split("_")[0] != "aug")

    for im_id in original_ids:
        copy("../VOC2010/VOCdevkit/VOC2010/JPEGImages/" + im_id + ".jpg", destination)

In [11]:
def do_everything(obj_name, parts=None):
    """Run the whole pipeline: annotations, label files, train/test lists, image copy.

    Parameters
    ----------
    obj_name : str
        Name of the output set; used as the folder name under
        ``darknet/darknet_annots/`` and as the prefix of the train/test lists.
    parts : None, str or list, optional
        Part groups to annotate. ``None`` (default) keeps the previous behaviour
        and uses the "bf" preset (torso + frontal face). A string selects one of
        the presets defined below; anything else is passed on unchanged as the
        list of part groups.

    Raises
    ------
    KeyError
        If ``parts`` is a string that is not a known preset name.
    """
    anno_paths = glob.glob("Annotations_Part/*.mat")

    # Note the trailing commas: ('all') would be the plain string 'all', not a tuple
    everything = ('all',)
    face = ('frontal_face',)
    head = ('head',)
    torso = ('torso',)
    upper_body = ('torso', 'luarm', 'llarm', 'ruarm', 'rlarm', 'lhand', 'rhand')
    lower_body = ('luleg', 'llleg', 'ruleg', 'rlleg', 'rfoot', 'lfoot')

    presets = {
        "pfhul": [everything, face, head, upper_body, lower_body],
        "pf": [everything, face],
        "person": [everything],
        "bf": [torso, face],
    }

    if parts is None:
        parts = presets["bf"]
    elif isinstance(parts, str):
        parts = presets[parts]

    # The later steps only open files inside these directories, so create them up front
    out_dir = "darknet/darknet_annots/{0}/".format(obj_name)
    os.makedirs(out_dir, exist_ok=True)
    os.makedirs("darknet/train_test_split/", exist_ok=True)

    anno_dict = parts2darknet(parts, anno_paths, darknet_path=out_dir)
    write_annots(anno_dict, containing_folder=f"darknet_annots/{obj_name}/")
    generate_test_train_files(anno_dict, obj_name)
    copy_images(anno_dict, obj_name)

In [12]:
# Run the complete pipeline for the "bf" output set. This is slow: the run whose
# output is recorded below needed about 14 minutes for 10103 annotation files.
do_everything("bf")

100%|██████████| 10103/10103 [13:56<00:00, 12.07it/s]


Found 7803 persons
Found 6564 negative frames
Found 12958 total objects
Parts Distribution:  [7368, 2859]
Annotations were written to file successfully !!
