In [None]:
from __future__ import absolute_import, division, print_function, unicode_literals

import os

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import load_model
print("TensorFlow version is ", tf.__version__)

from tensorflow.keras.metrics import Precision
from tensorflow.keras.metrics import Accuracy

import numpy as np

import random

import matplotlib.pyplot as plt
import matplotlib.image as mpimg

In [None]:
tf.enable_eager_execution()

## Create and fill folders

In [None]:
base_dir = "./xebikart-ml-tubes/"

In [None]:
train_dir = base_dir + "/obstacles_train"
test_dir = base_dir + "/obstacles_test"
demo_dir = base_dir + "/obstacles_demo"

import os

for directory in [train_dir, test_dir, demo_dir]:
    for name in ["/clear","/obstacles"]:
        if not os.path.exists(directory + name):
            os.makedirs(directory + name)

In [None]:
import random
def random_split(items, size):
    sample = random.sample(items, size)
    a = [x for x in items if x in sample]
    b = [x for x in items if x not in sample]
    return a, b

In [None]:
list_ = [x for x in os.listdir(folder[key]) if x.split('.')[1]=="jpg"]

In [None]:
def move_train_test(folder, key, base_dir):

    list_ = [x for x in os.listdir(folder[key]) if x.split('.')[1]=="jpg"]
    a, b = random_split(list_, int(len(list_)*0.8))
    c = random.sample(list_, 100)
    
    train_folder = base_dir + "obstacles_train/"
    test_folder = base_dir + "obstacles_test/"
    demo_folder = base_dir + "obstacles_demo/"
    
    # train
    if len(os.listdir(train_folder + key) ) == 0:
        for file in a :
            try :
                copyfile(folder[key] + "/" + file, 
                      train_folder + key + "/" + file)
            except:
                print(key + " " + file)
    
    # test
    if len(os.listdir(test_folder + key) ) == 0:
        for file in b :
            try :
                copyfile(folder[key] + "/" + file, 
                      test_folder + key + "/" + file)
            except:
                print(key + " " + file)
    
    # demo
    if len(os.listdir(demo_folder + key) ) == 0:
        for file in c :
            try :
                copyfile(folder[key] + "/" + file, 
                      demo_folder + key + "/" + file)
            except:
                print(key + " " + file)

In [None]:
from shutil import copyfile

folder = {"clear" : base_dir + "tub.v4.02",
          "obstacles" : base_dir + "tub.v6.01"}

for key in folder.keys():
    move_train_test(folder=folder, key=key, base_dir=base_dir)

In [None]:
train_dir = os.path.join(base_dir, 'obstacles_train')
validation_dir = os.path.join(base_dir, 'obstacles_test')
demo_dir = os.path.join(base_dir, 'obstacles_demo')

# Directory with our training obstacles
train_obstacles_dir = os.path.join(train_dir, 'obstacles')
print ('Total training obstacles images:', len(os.listdir(train_obstacles_dir)))

# Directory with our training clear pictures
train_clear_dir = os.path.join(train_dir, 'clear')
print ('Total training clear road images:', len(os.listdir(train_clear_dir)))

# Directory with our validation obstacles pictures
validation_obstacles_dir = os.path.join(validation_dir, 'obstacles')
print ('Total validation obstacles images:', len(os.listdir(validation_obstacles_dir)))

# Directory with our validation clear pictures
validation_clear_dir = os.path.join(validation_dir, 'clear')
print ('Total validation clear road images:', len(os.listdir(validation_clear_dir)))

# Get or compile and train a model

In [None]:
model_name = input()

### If a model already exists

In [None]:
model = load_model(model_name + '.h5')

### Train a new model

#### Make the data generator

In [None]:
image_size = 160 # All images will be resized to 160x160
batch_size = 32

# Rescale all images by 1./255 and apply image augmentation
datagen = keras.preprocessing.image.ImageDataGenerator(
                rescale=1./255
                )

# Flow training images in batches of 20 using train_datagen generator
train_generator = datagen.flow_from_directory(
                train_dir,  # Source directory for the training images
                target_size=(image_size, image_size),
                batch_size=batch_size,
                
                # Since we use binary_crossentropy loss, we need binary labels
                class_mode='binary')

# Flow validation images in batches of 20 using test_datagen generator
validation_generator = datagen.flow_from_directory(
                validation_dir, # Source directory for the validation images
                target_size=(image_size, image_size),
                batch_size=batch_size,
                class_mode='binary')

### Use a pretrained model

##### Import the model

In [None]:
IMG_SHAPE = (image_size, image_size, 3)

# Create the base model from the pre-trained model MobileNet V2
base_model = tf.keras.applications.MobileNetV2(input_shape=IMG_SHAPE,
                                               include_top=False,
                                               weights='imagenet')

##### Choose the number of layer to train

In [None]:
nb_layer_train = 5
nb_layer_freez = len(base_model.layers) - nb_layer_train

In [None]:
for layer in base_model.layers[:nb_layer_freez]:
    layer.trainable = False

##### Let's take a look at the base model architecture

In [None]:
base_model.summary()

##### Add an output dense layer

In [None]:
model = tf.keras.Sequential([
  base_model,
  keras.layers.GlobalAveragePooling2D(),
  keras.layers.Dense(1, activation='sigmoid')
])

##### Choose the parameters and compile

In [None]:
model.compile(optimizer=tf.keras.optimizers.Adam(lr=0.001, 
                                                 decay=0.00005),
              loss='binary_crossentropy',
              metrics=[Precision(), Accuracy()])

In [None]:
model.summary()

In [None]:
len(model.trainable_variables)

## Train and evaluate the model

### Train

In [None]:
epochs = 20
steps_per_epoch = train_generator.n // batch_size
validation_steps = validation_generator.n // batch_size

history = model.fit_generator(train_generator,
                              steps_per_epoch = steps_per_epoch,
                              epochs=epochs,
                              workers=4,
                              validation_data=validation_generator,
                              validation_steps=validation_steps)

#### Evaluate

In [None]:
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']

loss = history.history['loss']
val_loss = history.history['val_loss']

plt.figure(figsize=(8, 8))
plt.subplot(2, 1, 1)
plt.plot(acc, label='Training Accuracy')
plt.plot(val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.ylabel('Accuracy')
plt.ylim([min(plt.ylim()),1])
plt.title('Training and Validation Accuracy')

plt.subplot(2, 1, 2)
plt.plot(loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.ylabel('Cross Entropy')
plt.ylim([0,max(plt.ylim())])
plt.title('Training and Validation Loss')
plt.show()

#### Vizualize the results

In [None]:
batch = []

# choose the data generator
datagen = keras.preprocessing.image.ImageDataGenerator(
                rescale=1./255,
#                preprocessing_function = preprocess
                )

# Flow validation images in batches of 20 using test_datagen generator
demo_generator = datagen.flow_from_directory(
                demo_dir , # Source directory for the validation images
                target_size=(image_size, image_size),
                batch_size=batch_size,
                class_mode='binary')


for image_batch, label_batch in demo_generator:
    print("Image batch shape: ", image_batch.shape)
    print("Label batch shape: ", label_batch.shape)
    batch.append({"img":image_batch,"label":label_batch})
    break

result_batch = model.predict(image_batch)
    
plt.figure(figsize=(20,30))
plt.subplots_adjust(hspace=0.3)

for n in range(30):
    plt.subplot(6,5,n+1)
    plt.imshow(image_batch[n])
    plt.title("Truth : {}/ Prediction : {}".format(label_batch[n],np.round(result_batch[n],1)))
    plt.axis('off')
    _ = plt.suptitle("ImageNet predictions")

In [None]:
# creates a HDF5 file 'my_model.h5'
model.save(model_name + '.h5')  

## Try to detect an obstacle in a box

#### Show an image and a blue on the same picture

In [None]:
def show_image_and_box(img, box, model, threshold = 0.5):
    
    cropped_img = img[box[0]:box[2], box[1]:box[3]]
    img_resized = np.resize(cropped_img,(160,160,3))
    img_reshaped = img_resized.reshape(1,160,160,3)
    box_prob = model.predict(img_reshaped)[0][0]
    
    if box_prob > threshold :
        label = "obstacle"
    else:
        label = "clear"
    
    plt.plot([box[1],box[1]],[box[0],box[2]], color="C0",linewidth=4)
    plt.plot([box[3],box[3]],[box[0],box[2]], color="C0",linewidth=4)
    plt.plot([box[1],box[3]],[box[0],box[0]], color="C0",linewidth=4)
    plt.plot([box[1],box[3]],[box[2],box[2]], color="C0",linewidth=4)
    plt.title("Detection : {} ({})".format(label,box_prob))
    plt.imshow(img)
    
    return cropped_img

#### Pick a picture from the demo folder

In [None]:
def pick_demo_image(demo_dir, image_category = 'obstacles'):

    assert (image_category in ['obstacles', 'clear']), "the category must be 'obstacles' or 'clear'"
    
    demo_dir_obstacle = os.path.join(demo_dir, image_category)
    path = os.path.join(demo_dir_obstacle, random.sample(os.listdir(demo_dir_obstacle),1)[0])
    tf_content = tf.io.read_file(path)
    tf_image = tf.image.decode_jpeg(tf_content, channels=3)
    img = mpimg.imread(path)
    
    return img

#### Choose parameters

In [None]:
top = 60
left = 4
win_heigh = 58
win_width = 150
box = (top, left, top + win_heigh, left + win_width)

#### Show images

In [None]:
img = pick_demo_image(demo_dir, image_category = 'obstacles')
cropped_img = show_image_and_box(img, box, model, threshold = 0.5)

In [None]:
img = pick_demo_image(demo_dir, image_category = 'clear')
cropped_img = show_image_and_box(img, box, model, threshold = 0.5)