In [5]:
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow import keras
from keras.layers import Conv2D, Flatten, GlobalAveragePooling2D, MaxPooling2D
from keras.models import Model 
from keras.callbacks import CSVLogger, EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from keras.optimizers import Adam, SGD, RMSprop
from keras.preprocessing.image import ImageDataGenerator
from keras.losses import huber_loss
from models.model import DARKNET19_ARCHITECTURE, INPUT_SIZE, build_model
from keras.metrics import MeanIoU
from keras.utils import np_utils
import albumentations
from sklearn.model_selection import train_test_split
import numpy as np
import cv2
import matplotlib.pyplot as plt
import pandas as pd
import json
import os
import random
from tensorflow.python.ops.numpy_ops import np_config

In [6]:
np.random.seed(42)
tf.random.set_seed(42)

print(tf.config.list_physical_devices('GPU'))
np_config.enable_numpy_behavior()

WIDTH = 448
HEIGHT = 448 
BATCH_SIZE = 8
TEST_SPLIT = 0.2
LEARNING_RATE = 1e-2
EPOCHS = 100

dataset_path = "data/images"

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


## 1. Data

In [8]:
data = {}

files = [item for item in os.listdir(dataset_path) if os.path.isdir(f"{dataset_path}/{item}")]

for dir in files:
    for i in range(100):
        for j in range(4):
            bbox = []
            try:
                with open(f"{dataset_path}/{dir}/{i}_bbox_{j+1}.txt", "r") as f:
                    for line in f:
                        bbox.append(int(line.split(" ")[1].strip("\n")))
                bbox[0], bbox[1] = bbox[1], bbox[0]
                bbox[2], bbox[3] = bbox[3], bbox[2]
                data[f"{dir}/{i}_webcam_{j+1}.jpg"] = bbox
            except: 
                continue

print((data))

{'data_1/0_webcam_1.jpg': [183, 167, 431, 382], 'data_1/0_webcam_2.jpg': [223, 151, 469, 343], 'data_1/0_webcam_3.jpg': [234, 140, 434, 353], 'data_1/0_webcam_4.jpg': [192, 245, 507, 387], 'data_1/1_webcam_1.jpg': [182, 165, 430, 380], 'data_1/1_webcam_2.jpg': [223, 152, 470, 339], 'data_1/1_webcam_3.jpg': [235, 141, 433, 354], 'data_1/1_webcam_4.jpg': [194, 241, 506, 386], 'data_1/2_webcam_1.jpg': [183, 162, 431, 377], 'data_1/2_webcam_2.jpg': [222, 150, 470, 334], 'data_1/2_webcam_3.jpg': [235, 140, 433, 354], 'data_1/2_webcam_4.jpg': [197, 234, 504, 381], 'data_1/3_webcam_1.jpg': [184, 162, 433, 366], 'data_1/3_webcam_2.jpg': [219, 143, 469, 337], 'data_1/3_webcam_3.jpg': [234, 139, 430, 354], 'data_1/3_webcam_4.jpg': [194, 227, 497, 371], 'data_1/4_webcam_1.jpg': [185, 165, 437, 345], 'data_1/4_webcam_2.jpg': [214, 128, 468, 341], 'data_1/4_webcam_3.jpg': [235, 136, 427, 353], 'data_1/4_webcam_4.jpg': [195, 216, 484, 356], 'data_1/5_webcam_1.jpg': [185, 169, 439, 324], 'data_1/5_we

In [9]:
# Get data and transform left hand labels to 0 and right hand labels to 1
df = pd.DataFrame({"filename": [], "bbox": []})
for filename in data:
    df.loc[len(df)] = [filename, data[filename]]
df = df.sample(frac=1)

# Split the data into training and testing sets
train, test = train_test_split(df, test_size=TEST_SPLIT, shuffle=False)

train_images, train_bboxes = train["filename"].to_numpy(), train["bbox"].to_numpy()
test_images, test_bboxes = test["filename"].to_numpy(), test["bbox"].to_numpy()

print(train_images)
print(test_images)

  element = np.asarray(element)


['data_1/8_webcam_2.jpg' 'data_21/20_webcam_2.jpg'
 'data_20/28_webcam_1.jpg' ... 'data_21/2_webcam_1.jpg'
 'data_5/52_webcam_2.jpg' 'data_21/29_webcam_4.jpg']
['data_10/77_webcam_2.jpg' 'data_15/5_webcam_2.jpg'
 'data_3/1_webcam_1.jpg' ... 'data_21/47_webcam_3.jpg'
 'data_11/15_webcam_1.jpg' 'data_7/17_webcam_3.jpg']


## 2. Data Preprocessing
### 2.1 Resizing & Normalisation

In [10]:
def read_image(path, bbox):

    filename = path.decode("utf-8")
    path = f"{dataset_path}/{filename}"
    
    image = cv2.imread(path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    if(bbox[0] > image.shape[1]):
        bbox[0] = image.shape[1]
    if(bbox[2] > image.shape[1]):
        bbox[2] = image.shape[1]

    if(bbox[1] > image.shape[0]):
        bbox[1] = image.shape[0]
    if(bbox[3] > image.shape[0]):
        bbox[3] = image.shape[0]

    for i in range(4):
        if(bbox[i] < 0):
            bbox[i] = 0
        
    # cv2.rectangle(image, (int(bbox[0]), int(bbox[1])), (int(bbox[2]), int(bbox[3])), (255,0,0), 1)
    # plt.imshow(image)
    # plt.show()

    transform = albumentations.Compose(
        [albumentations.Resize(height=HEIGHT, width=WIDTH, always_apply=True)],
        bbox_params=albumentations.BboxParams(format='pascal_voc'))

    transformed = transform(image=image, bboxes=[np.concatenate([bbox,[0]])])
    image, bbox = transformed["image"], transformed["bboxes"][0]



    norm_image = tf.cast(image, tf.float32) / 255.

    x1, y1, x2, y2 = bbox[0]/WIDTH, bbox[1]/HEIGHT, bbox[2]/WIDTH, bbox[3]/HEIGHT
    norm_bbox = np.array([x1, y1, x2, y2], dtype=np.float32)


    return norm_image, norm_bbox


### 2.2 Transforming data to Tensors

In [20]:
def parse(image, bbox):
    image, bbox = tf.numpy_function(read_image, [image, bbox], [tf.float32, tf.float32])
    image.set_shape((WIDTH, HEIGHT, 3))
    bbox.set_shape((4))
    return image, bbox

# print(train_labels)
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, list(train_bboxes)))
train_dataset = train_dataset.map(parse).batch(BATCH_SIZE)

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, list(test_bboxes)))
test_dataset = test_dataset.map(parse).batch(BATCH_SIZE)


def get_iou(ground_truth, pred):
    # coordinates of the area of intersection.
    ix1 = tf.maximum(ground_truth[0], pred[0])
    iy1 = tf.maximum(ground_truth[1], pred[1])
    ix2 = tf.minimum(ground_truth[2], pred[2])
    iy2 = tf.minimum(ground_truth[3], pred[3])
     
    # Intersection height and width.
    i_height = tf.maximum(iy2 - iy1 + 1, np.array(0.))
    i_width = tf.maximum(ix2 - ix1 + 1, np.array(0.))
     
    area_of_intersection = i_height * i_width
     
    # Ground Truth dimensions.
    gt_height = ground_truth[3] - ground_truth[1] + 1
    gt_width = ground_truth[2] - ground_truth[0] + 1
     
    # Prediction dimensions.
    pd_height = pred[3] - pred[1] + 1
    pd_width = pred[2] - pred[0] + 1
     
    area_of_union = gt_height * gt_width + pd_height * pd_width - area_of_intersection
     
    iou = area_of_intersection / area_of_union
     
    return iou

def l2_loss(y_true, y_pred):
    return tf.square(y_true - y_pred) 

def IoU_loss(y_true, y_pred):
    return 1 - get_iou(y_true, y_pred) 


## The CNN Model

In [21]:
model = build_model(DARKNET19_ARCHITECTURE, INPUT_SIZE)
model.compile(optimizer=SGD(learning_rate=LEARNING_RATE), loss=IoU_loss, metrics=[get_iou, "accuracy"])

callbacks = [
    CSVLogger("models/trained/model_iou.csv", append=True),
    ModelCheckpoint("models/trained/model_iou.hdf5", verbose=1, save_weights_only=True, monitor='val_loss', save_freq='epoch'),
    ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-7, verbose=1),
    EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=False)
]

print(train_dataset)
model.fit(train_dataset, epochs=EPOCHS, validation_data=test_dataset, callbacks=callbacks)

<BatchDataset element_spec=(TensorSpec(shape=(None, 448, 448, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 4), dtype=tf.float32, name=None))>
Epoch 1/10
Epoch 1: saving model to models/trained\model_iou.hdf5
Epoch 2/10
Epoch 2: saving model to models/trained\model_iou.hdf5
Epoch 3/10
Epoch 3: saving model to models/trained\model_iou.hdf5
Epoch 4/10
Epoch 4: saving model to models/trained\model_iou.hdf5
Epoch 5/10
Epoch 5: saving model to models/trained\model_iou.hdf5
Epoch 6/10
182/840 [=====>........................] - ETA: 3:24 - loss: 10.9005 - get_iou: 0.8374 - accuracy: 0.7905

KeyboardInterrupt: 

## 3. Evaluation

In [24]:
print("Huber Loss Model Evaluation")
model = build_model(DARKNET19_ARCHITECTURE, INPUT_SIZE)
model.load_weights("models/trained/model_huber.hdf5")
model.compile(optimizer=SGD(learning_rate=LEARNING_RATE), loss="huber_loss", metrics=[get_iou,"accuracy"])
model.evaluate(test_dataset)

print("L2 Loss Model Evaluation")
model = build_model(DARKNET19_ARCHITECTURE, INPUT_SIZE)
model.load_weights("models/trained/model_l2.hdf5")
model.compile(optimizer=SGD(learning_rate=LEARNING_RATE), loss=l2_loss, metrics=[get_iou,"accuracy"])
model.evaluate(test_dataset)

print("IoU Loss Model Evaluation")
model = build_model(DARKNET19_ARCHITECTURE, INPUT_SIZE)
model.load_weights("models/trained/model_iou.hdf5")
model.compile(optimizer=SGD(learning_rate=LEARNING_RATE), loss=IoU_loss, metrics=[get_iou,"accuracy"])
model.evaluate(test_dataset)


IoU Loss Model Evaluation
Huber Loss Model Evaluation
L2 Loss Model Evaluation


[2.070523738861084, 0.8145965337753296, 0.9029762148857117]