# Problem description

The majority of available models trained for object detection and recognition in autonomous/automated driving systems consider only large objects such as trees on the side of the road, pedestrians, surrounding vehicles, large animals or road blockages.

Detecting small (low-level) obstacles on the road remains a challenge, mainly because of the noise, or skew, in their pixel frequency and because the features that describe these obstacles are small relative to the size of the frame. Neural networks struggle to approximate these types of objects, so they are often classified seemingly at random.

Recently, however, small obstacle detection has gained popularity as the demand for fully automated vehicles has risen. Detecting unexpected small obstacles on the road could prevent accidents caused by falling debris, construction activities or lost cargo, providing a safer driving experience.

# About the Dataset

The **Lost and Found** dataset contains RGB images combined with depth information, which is used to segment each image into pixels that belong to obstacles, road or non-road surfaces.

The drawbacks of this dataset are its limited depth range of 20m and its poor accuracy for very small objects.

The **Small Obstacle Dataset**, created by the Robotics Research Center IIIT, India, contains camera images together with data from a highly accurate Lidar sensor that detects objects up to a depth of 75m. The two devices are calibrated against each other in order to obtain a better representation of the driving conditions. Both the images and the sensor data are labeled specifically for small obstacles, which makes the dataset well suited to this task.

It consists of two sets: one with data recorded in real-life situations and one with data generated by a simulator built in Unreal Engine.



Start by importing the necessary libraries. The images will be plotted with Matplotlib (`matplotlib.pyplot`), and `open3d` is imported for working with the point clouds.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import os
import cv2
import csv
import open3d as o3d
import tensorflow as tf
import tensorflow_datasets as tfds
import keras
import scipy
import fnmatch
import pandas as pd
import math

In [None]:
# ------------------real data-----------------------------
TRAIN_BASE_DIR = "datasets/small_obs_dataset/Small_Obstacle_Dataset/train"
VAL_BASE_DIR = "datasets/small_obs_dataset/Small_Obstacle_Dataset/val"
TEST_BASE_DIR = "datasets/small_obs_dataset/Small_Obstacle_Dataset/test"

LABELS_DIR = "/labels"
IMAGE_DIR = "/image"
ODOMETRY_DIR = "/odometry"
VELODYNE_DIR = "/velodyne"

## Visualize the data

In [None]:
def view_image_input(image_dir, labels_dir, input_file_name):
    img_path = image_dir + "/" + input_file_name
    img = mpimg.imread(img_path)

    segm_path = labels_dir + "/" + input_file_name
    segm = mpimg.imread(segm_path)

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20, 15))

    ax1.set_title("Input image")
    ax1.imshow(img)

    ax2.set_title("Segmentation mask")
    ax2.imshow(segm)

    plt.show(block=True)

In [None]:
print("Real image sample:")
view_image_input(TRAIN_BASE_DIR + "/file_1" + IMAGE_DIR, TRAIN_BASE_DIR + "/file_1" + LABELS_DIR, "0000000080.png")
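
Besides viewing the mask, it can help to count how many pixels each class occupies, since the skew in pixel frequency is what makes small obstacles hard to learn. The sketch below uses a synthetic mask rather than a real label file. The 1280x720 frame size and the class ids (0 = off-road, 1 = road, 2 = small obstacle) are assumptions made for illustration, not values taken from the dataset.

```python
import numpy as np

# Synthetic mask; frame size and class ids are assumed for illustration only.
mask = np.zeros((720, 1280), dtype=np.uint8)   # 0 = off-road
mask[360:, :] = 1                              # lower half of the frame is road
mask[500:510, 600:615] = 2                     # a 10x15 px small obstacle

# Count the pixels of every class and convert the counts to fractions.
ids, counts = np.unique(mask, return_counts=True)
fractions = dict(zip(ids.tolist(), (counts / mask.size).tolist()))

for class_id, fraction in fractions.items():
    print("class %d: %.5f of the frame" % (class_id, fraction))
```

In this synthetic frame the obstacle covers 150 of 921600 pixels, well under 0.1% of the image, which illustrates the kind of imbalance a segmentation model has to cope with.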

## Load data

In [None]:
def save_to_csv(output_path, headers, data):
    with open(output_path, 'w', newline='') as file:
        writer = csv.writer(file)
        # write headers in first row
        writer.writerow(headers)
        # write the data from the given list
        for row in data:
            writer.writerow(row)
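
As a quick sanity check, a file written this way can be read back with `csv.DictReader`, which uses the header row as dictionary keys. The sketch below is self-contained and writes to a temporary directory. The file name `demo_images.csv` and the sample row are hypothetical.

```python
import csv
import os
import tempfile

headers = ["image", "label"]
# Hypothetical sample row, shaped like the image/label pairs used later on.
rows = [["file_1/image/0000000080.png", "file_1/labels/0000000080.png"]]

with tempfile.TemporaryDirectory() as tmp:
    csv_path = os.path.join(tmp, "demo_images.csv")

    # Write the header row followed by the data rows.
    with open(csv_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(headers)
        writer.writerows(rows)

    # Read the file back; each row becomes a dict keyed by the headers.
    with open(csv_path, newline="") as f:
        loaded = list(csv.DictReader(f))

print(loaded)
```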

### Load image data

In [None]:
def load_image_data(root_dir):
    data = []
    for root, dirs, files in os.walk(root_dir):
        if root.endswith("image"):
            files = [os.path.join(root, f).replace("\\", "/") for f in files if f.endswith('.png')]
            images = list(filter(lambda f: os.path.isfile(f.replace("/image", "/labels")), files))
            labels = [img.replace("/image", "/labels") for img in images]
            data += map(lambda t: [t[0], t[1]], zip(images, labels))

    return data
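
The pairing logic can be tried out without the real dataset. The sketch below builds a tiny temporary directory tree with one sequence folder, two frames and a single mask, then pairs them in the same spirit as the function above: a frame is kept only if a mask with the same file name exists. The helper name `pair_images_with_labels` and the demo layout are hypothetical, and the helper uses `os.path` instead of string replacement.

```python
import os
import tempfile


def pair_images_with_labels(root_dir):
    """Return [image_path, label_path] for every PNG frame that has a mask."""
    pairs = []
    for root, _dirs, files in os.walk(root_dir):
        # Only folders named "image" hold camera frames.
        if os.path.basename(root) != "image":
            continue
        labels_root = os.path.join(os.path.dirname(root), "labels")
        for name in sorted(files):
            label_path = os.path.join(labels_root, name)
            if name.endswith(".png") and os.path.isfile(label_path):
                pairs.append([os.path.join(root, name), label_path])
    return pairs


with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical layout: one sequence, two frames, only the first is labeled.
    seq = os.path.join(tmp, "file_1")
    os.makedirs(os.path.join(seq, "image"))
    os.makedirs(os.path.join(seq, "labels"))
    for name in ("0000000080.png", "0000000081.png"):
        open(os.path.join(seq, "image", name), "wb").close()
    open(os.path.join(seq, "labels", "0000000080.png"), "wb").close()

    pairs = pair_images_with_labels(tmp)
    # Store paths relative to the temporary root so they stay readable.
    relative_pairs = [
        [os.path.relpath(p, tmp).replace(os.sep, "/") for p in pair]
        for pair in pairs
    ]

print(relative_pairs)
```

The unlabeled frame is dropped, so only one pair is returned.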

Store the paths to the images and their respective labels in a CSV file.

In [None]:
train_img_csv_path = TRAIN_BASE_DIR + '/train_images.csv'
val_img_csv_path = VAL_BASE_DIR + '/val_images.csv'
test_img_csv_path = TEST_BASE_DIR + '/test_images.csv'

img_label_headers = ["image", "label"]

In [None]:
train_img_data = load_image_data(TRAIN_BASE_DIR)
print("Found %d train images along with their semantic masks" % len(train_img_data))
# save to csv file
save_to_csv(train_img_csv_path, img_label_headers, train_img_data)

In [None]:
val_img_data = load_image_data(VAL_BASE_DIR)
print("Found %d val images along with their semantic masks" % len(val_img_data))
# save to csv file
save_to_csv(val_img_csv_path, img_label_headers, val_img_data)

In [None]:
test_img_data = load_image_data(TEST_BASE_DIR)
print("Found %d test images along with their semantic masks" % len(test_img_data))
# save to csv file
save_to_csv(test_img_csv_path, img_label_headers, test_img_data)

In [None]:
IMG_HEIGHT = 180
IMG_WIDTH = 180
BATCH_SIZE = 32

In [None]:
train_image_df = pd.read_csv(train_img_csv_path)
print("Columns: ", train_image_df.columns.values)
print("Shape: ", train_image_df.shape)

In [None]:
train_image_df.head()

In [None]:
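def load_and_preprocess_image(path, method="bilinear"):
    # read the PNG file, decode it to 3 channels and resize it to the input size
    img_file = tf.io.read_file(path)
    img_array = tf.io.decode_png(img_file, channels=3)
    img_resized = tf.image.resize(img_array, [IMG_HEIGHT, IMG_WIDTH], method=method)
    return img_resized


def load_and_preprocess_image_and_label(row):
    # row holds the two CSV columns: [image path, label path]
    img_data = load_and_preprocess_image(row[0])
    # nearest-neighbour resizing avoids blending label values between classes
    label_data = load_and_preprocess_image(row[1], method="nearest")
    return img_data, label_data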
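
One point to watch when resizing segmentation masks: `tf.image.resize` defaults to bilinear interpolation, which blends neighbouring values and can produce numbers that are not valid class ids. The NumPy sketch below illustrates the effect on a toy 8x8 mask. Block averaging is used here only as a simple stand-in for interpolation, and the class ids 0, 1 and 2 as well as both helper functions are assumptions made for illustration.

```python
import numpy as np


def resize_nearest(mask, new_h, new_w):
    """Nearest-neighbour resize of a 2-D integer label mask."""
    h, w = mask.shape
    rows = np.arange(new_h) * h // new_h
    cols = np.arange(new_w) * w // new_w
    return mask[rows[:, None], cols[None, :]]


def resize_mean(mask, factor):
    """Downscale by averaging factor x factor blocks (stand-in for interpolation)."""
    h, w = mask.shape
    blocks = mask.reshape(h // factor, factor, w // factor, factor)
    return blocks.mean(axis=(1, 3))


# Toy mask with assumed class ids: 0 = off-road, 1 = road, 2 = small obstacle.
mask = np.zeros((8, 8), dtype=np.uint8)
mask[:, 4:] = 1
mask[2:5, 3:6] = 2

nearest = resize_nearest(mask, 4, 4)
averaged = resize_mean(mask, 2)

print("nearest ids: ", np.unique(nearest))
print("averaged ids:", np.unique(averaged))
```

The nearest-neighbour result contains only the original ids, while the averaged result contains fractional values such as 0.5 and 1.5 that belong to no class.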

In [None]:
# build a dataset of [image path, label path] rows and decode them in parallel
train_tensor = tf.data.Dataset.from_tensor_slices(train_image_df[['image', 'label']].values)
train_tensor = train_tensor.map(load_and_preprocess_image_and_label, num_parallel_calls=tf.data.AUTOTUNE)
print(train_tensor.element_spec)

In [None]:
# shuffle over the whole dataset, then draw one batch to inspect it
it = train_tensor.shuffle(len(train_image_df)).batch(BATCH_SIZE).as_numpy_iterator()
next(it)

## Lost and Found dataset

In [None]:
lost_found_dataset, info = tfds.load('lost_and_found',
                                     with_info=True)

The Lost and Found dataset contains 2104 annotated frames gathered from 112 video sequences: http://wwwlehre.dhbw-stuttgart.de/~sgehrig/lostAndFoundDataset/index.html

In [None]:
info.features

In [None]:
lost_found_dataset['train']

View a few sample images from the dataset:

In [None]:
_ = tfds.visualization.show_examples(lost_found_dataset['train'], info, image_key="image_left")

In [None]:
_ = tfds.visualization.show_examples(lost_found_dataset['train'], info, image_key="segmentation_label")

In [None]:
lost_found_dataset

### Restricted Boltzmann Machine

In [None]:
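class RBM():
    def __init__(self, nv, nh):
        """
        :param nv: nr of neurons in the input/visible layer
        :param nh: nr of neurons in the hidden layer
        """
        # initialize the weight matrix with small random values
        self.W = tf.Variable(tf.random.truncated_normal((nv, nh)) * 0.01)
        # biases of the visible and hidden layers
        self.bv = tf.Variable(tf.zeros((nv, 1)))
        self.bh = tf.Variable(tf.zeros((nh, 1)))

    def bernoulli(self, p):
        # sample 1 with probability p: sign(p - u) is +1 wherever u < p
        return tf.nn.relu(tf.sign(p - tf.random.uniform(tf.shape(p))))

    def energy_function(self, v):
        # visible bias term, squeezed to shape (batch,)
        b = tf.squeeze(tf.matmul(v, self.bv), axis=1)
        linear_tr = tf.matmul(v, self.W) + tf.squeeze(self.bh)
        # log(exp(x) + 1) summed over the hidden units, shape (batch,)
        h = tf.reduce_sum(tf.math.log(tf.exp(linear_tr) + 1), axis=1)
        # mean free energy over the batch
        return tf.reduce_mean(-h - b)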
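
The quantity computed by `energy_function` corresponds to the free energy of a binary RBM, $F(v) = -b_v^\top v - \sum_j \log(1 + e^{(vW + b_h)_j})$, averaged over the batch. The NumPy sketch below is a framework-independent way to check that formula and the Bernoulli sampling step. The function names `free_energy` and `sample_bernoulli`, the layer sizes and the seed are assumptions chosen for illustration.

```python
import numpy as np


def free_energy(v, W, bv, bh):
    """Per-sample free energy of a binary RBM; v has shape (batch, nv)."""
    visible_term = v @ bv                                    # shape (batch,)
    # logaddexp(0, x) = log(1 + exp(x)), summed over the hidden units
    hidden_term = np.logaddexp(0.0, v @ W + bh).sum(axis=1)  # shape (batch,)
    return -visible_term - hidden_term


def sample_bernoulli(p, rng):
    """Draw 1 with probability p and 0 otherwise, element-wise."""
    return (rng.random(p.shape) < p).astype(p.dtype)


rng = np.random.default_rng(0)
nv, nh = 6, 3                                  # assumed toy layer sizes
W = 0.01 * rng.standard_normal((nv, nh))
bv = np.zeros(nv)
bh = np.zeros(nh)

# A batch of 4 random binary visible vectors.
v = sample_bernoulli(np.full((4, nv), 0.5), rng)
F = free_energy(v, W, bv, bh)

# With zero weights and biases every hidden unit contributes log(2).
F_zero = free_energy(v, np.zeros((nv, nh)), bv, bh)

print(F.shape, F_zero)
```

With all parameters set to zero, the free energy reduces to `-nh * log(2)` for every sample, which gives a simple reference value for checking an implementation.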