
# LEO ROVER

In [None]:
import numpy as np
import keras as K
import keras.layers as KL
import pickle
import tqdm
import os
import cv2
import pandas as pd
from matplotlib import pyplot as plt

!pip3 install -U tensorflow-addons
import tensorflow_addons as tfa
import tensorflow as tf
from zipfile import ZipFile

In [None]:
#Connecting to drive
from google.colab import drive
drive.mount('/content/drive')

Variables that need to be changed if you are training on your data

In [None]:
# Name of root directory of the dataset (name of the zip file with your dataset)
root_dir = "/content/drive/MyDrive/LeoRover"

# Height of the crop area (number of pixels, starting from upper edge of image that will be cropped)
crop_height = 220

# Save directory for trained model
save_dir =  root_dir + "/model"

# Chose the name of saved tflite model
tflite_model_name = save_dir + "/tflite_model"

#Extract zip
with ZipFile(root_dir+'/data_leoRover.zip', 'r') as zip:
  zip.extractall()
  print('Done')

Reading prepared data labels and partition

In [None]:
data=pd.read_csv('/content/data_leoRover/MLdata.csv')

In [None]:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(data.values[:,:-1], data.values[:,-1],test_size=1/5,random_state=1)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train,test_size=1/4,random_state=1)

In [None]:
X_train=pd.DataFrame(data=X_train, columns=data.columns[:-1])
y_train=pd.DataFrame(data=y_train, columns=[data.columns[-1]])
trainData=pd.concat([X_train,y_train],axis=1)

X_test=pd.DataFrame(data=X_test, columns=data.columns[:-1])
y_test=pd.DataFrame(data=y_test, columns=[data.columns[-1]])
testData=pd.concat([X_test,y_test],axis=1)

X_val=pd.DataFrame(data=X_val, columns=data.columns[:-1])
y_val=pd.DataFrame(data=y_val, columns=[data.columns[-1]])
validationData=pd.concat([X_val,y_val],axis=1)

Neural Network model parts

In [None]:
GPUs_num = len(tf.config.list_physical_devices('GPU'))
CPUs_num = len(tf.config.list_physical_devices('CPU'))
print("Num GPUs Available: ", GPUs_num)
print("gpu_devices: ", tf.config.list_physical_devices('GPU'))
print("Num CPUs Available: ", CPUs_num)
print("cpu_devices: ", tf.config.list_physical_devices('CPU'))
device = None
device = '/GPU:0' if GPUs_num > 0 else '/CPU:0'
device

Preprocess function

In [None]:
def preprocess(img, dim, hsv_lower, hsv_upper):
    # converting to hsv
    hsv_img = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    # croping the img
    crop_img = hsv_img[crop_height:hsv_img.shape[0], :]
    # catching color mask
    color_mask = cv2.inRange(crop_img, hsv_lower, hsv_upper)
    # conveting values to float
    float_img = color_mask.astype(np.float32)
    # resizing
    resized_img = cv2.resize(float_img, (dim[1], dim[0]))
    # normalizing
    final_img = resized_img / 255.0
    
    return final_img[:,:,np.newaxis]

Keras Data Generator

In [None]:
class DataGenerator(K.utils.Sequence):
    def __init__(self, data, batch_size=32, dim=(32,32,32), n_channels=1, shuffle=True):
        'Initialization'
        self.dim = dim
        self.batch_size = batch_size
        self.data = data
        self.n_channels = n_channels
        self.shuffle = shuffle
        self.on_epoch_end()
        
    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.data))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)
        
    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.data) / self.batch_size))
    
    def __data_generation(self, indexes):
        'Generates data containing batch_size samples'
        X = np.empty((self.batch_size, *self.dim, self.n_channels))
        y_linear = np.empty((self.batch_size, 1), dtype=float)
        y_angular = np.empty((self.batch_size, 1), dtype=float)
        i=0
        for index in indexes:
            image=self.data.iloc[index]
            img = cv2.imread(image.imgpath)
            hsv_min=np.fromstring(image.hsv_min.split('[')[1].split(']')[0], dtype=float, sep=',')
            hsv_max=np.fromstring(image.hsv_max.split('[')[1].split(']')[0], dtype=float, sep=',')
            target=np.fromstring(image.Target.split('[')[1].split(']')[0], dtype=float, sep=',')
            hsv_lower=(hsv_min[0],hsv_min[1],hsv_min[2])
            hsv_upper=(hsv_max[0],hsv_max[1],hsv_max[2])
            preprocess_img = preprocess(img, self.dim, hsv_lower,hsv_upper)
            X[i,:] = preprocess_img

            y_linear[i] = target[0]
            y_angular[i] = target[1]
            i=i+1
        return X, {'linear': y_linear, 'angular': y_angular}

    def __getitem__(self, index):
        'Generate one batch of data'
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        X, y = self.__data_generation(indexes)

        return X, y

Keral Model

In [None]:
img_in = KL.Input(shape=(120, 160, 1), name='img_in')
x = img_in

x = KL.Convolution2D(filters=24, kernel_size=(5, 5), strides=(2, 2), activation='relu')(x)
x = KL.Convolution2D(filters=32, kernel_size=(5, 5), strides=(2, 2), activation='relu')(x)
x = KL.Convolution2D(filters=64, kernel_size=(5, 5), strides=(2, 2), activation='relu')(x)
x = KL.Convolution2D(filters=64, kernel_size=(3, 3), strides=(2, 2), activation='relu')(x)
x = KL.Convolution2D(filters=64, kernel_size=(3, 3), strides=(1, 1), activation='relu')(x)

x = KL.Flatten(name='flattened')(x)
x = KL.Dense(units=100, activation='linear')(x)
x = KL.Dropout(rate=.1)(x)
x = KL.Dense(units=50, activation='linear')(x)
x = KL.Dropout(rate=.1)(x)

linear = KL.Dense(units=1, activation='linear', name='linear')(x)

angular = KL.Dense(units=1, activation='linear', name='angular')(x)

model = K.Model(inputs=[img_in], outputs=[linear, angular])

with tf.device(device):
    model.compile(optimizer='adam',
                  loss={'linear': 'mean_squared_error', 'angular': 'mean_squared_error'},
                  loss_weights={'linear': 0.3, 'angular': 0.7})

In [None]:
callbacks = [
        K.callbacks.ModelCheckpoint(save_dir, save_best_only=True),
        K.callbacks.EarlyStopping(monitor='val_loss',
                                  min_delta=.0005,
                                  patience=20,
                                  verbose=True,
                                  mode='auto'),
        tfa.callbacks.TQDMProgressBar(),
        K.callbacks.ReduceLROnPlateau(monitor='val_loss',
                                      factor=0.2,
                                      patience=5,
                                      min_lr=0.001),
        K.callbacks.TensorBoard(log_dir='./logs', profile_batch=(0, 10))


    ]
params = {'dim': (120, 160),
          'batch_size': 64,
          'n_channels': 1,
          'shuffle': True}


with tf.device(device):
    training_generator = DataGenerator(trainData, **params)
    validation_generator = DataGenerator(validationData, **params)

Training the model

In [None]:
with tf.device(device):
    hist = model.fit(training_generator,
                           validation_data=validation_generator,
                           use_multiprocessing=True,
                           workers=6,
                           callbacks=callbacks,
                           epochs=100)

In [None]:
model.save(save_dir + "/rover_weights.h5")
print("Model saved!")

Converting Keral model to TFlite

In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

with open(tflite_model_name + '_withOurData.tflite', 'wb') as f:
    f.write(tflite_model)

In [None]:
def custom_loss(y_true, y_pred):
  mu = y_pred[:, :1] # First output neuron
  log_sig = y_pred[:, 1:] # Second output neuron
  sig = tf.exp(log_sig) # Undo the log

  return tf.reduce_mean(2*log_sig + ((y_true-mu)/sig)**2)

In [None]:
from keras.models import load_model
model = K.models.load_model("/content/drive/MyDrive/LeoRover/model/uncertainty-weights-1671283392.h5", custom_objects={'custom_loss':custom_loss})


In [None]:
from tqdm import tqdm
from google.colab.patches import cv2_imshow

test_dimension = (120, 160)
prediction_linear=[]
prediction_angular=[]
target_linear=[]
target_angular=[]


print("Predicting...")
for i in tqdm(range(0, len(testData)), ncols=100):
  image = testData.iloc[i]
  img = cv2.imread(image.imgpath)
  
  hsv_min = np.fromstring(image.hsv_min.split('[')[1].split(']')[0], dtype=float, sep=',')
  hsv_max = np.fromstring(image.hsv_max.split('[')[1].split(']')[0], dtype=float, sep=',')
  target = np.fromstring(image.Target.split('[')[1].split(']')[0], dtype=float, sep=',')
  hsv_lower = (hsv_min[0], hsv_min[1], hsv_min[2])
  hsv_upper = (hsv_max[0], hsv_max[1], hsv_max[2])
  
  preprocessed_test = preprocess(img, test_dimension, hsv_lower, hsv_upper)  
  
  keras_input = np.empty((1, 120, 160, 1))
  keras_input[0,:] = preprocessed_test
  
  
  prediction = model.predict(keras_input, verbose=0)
 
  predicted_linear = prediction[0][0][0] # Predicted Linear
  linear_uncertainty = prediction[0][0][1] # Linear uncertanty
  predicted_angular = prediction[1][0][0] # Predicted Angular
  angular_uncertainty = prediction[1][0][1] # Angular uncertanty

  prediction_linear.append([predicted_linear, linear_uncertainty])
  prediction_angular.append([predicted_angular, angular_uncertainty])
  target_angular.append(target[1])
  target_linear.append(target[0])

print("\nDone!")

In [None]:

min_mapped = 0
max_mapped = 100
def map_range(x, in_max, in_min, out_min, out_max):
    return ((x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min)

In [None]:
linear_uncertainties = [prediction_linear[row][1] for row in range(len(prediction_linear))]
min_linear_uncertainty = float(tf.exp(min(linear_uncertainties)))
max_linear_uncertainty = float(tf.exp(max(linear_uncertainties)))

angular_uncertainties = [prediction_angular[row][1] for row in range(len(prediction_angular))]
min_angular_uncertainty = float(tf.exp(min(angular_uncertainties)))
max_angular_uncertainty = float(tf.exp(max(angular_uncertainties)))

print(min_linear_uncertainty)
print(max_linear_uncertainty)
print(min_angular_uncertainty)
print(max_angular_uncertainty)


In [None]:
okPrediction=0
for i in range(0, len(testData)): 
  print(f"\n======================== Predicting: {i} out of {len(testData)} ========================")
  # Load image
  image = testData.iloc[i]
  img = cv2.imread(image.imgpath)

  # Prepare image
  hsv_min = np.fromstring(image.hsv_min.split('[')[1].split(']')[0], dtype=float, sep=',')
  hsv_max = np.fromstring(image.hsv_max.split('[')[1].split(']')[0], dtype=float, sep=',')
  target = np.fromstring(image.Target.split('[')[1].split(']')[0], dtype=float, sep=',')
  hsv_lower = (hsv_min[0],hsv_min[1],hsv_min[2])
  hsv_upper = (hsv_max[0],hsv_max[1],hsv_max[2])

  # Preprocess image
  img_array = preprocess(img, (120, 160, 3), hsv_lower, hsv_upper)

  # Real Velocities
  real_linear = target[0] 
  real_angular = target[1]

  # Print predicted values
  keras_input = np.empty((1, 120, 160, 1))
  keras_input[0,:] = img_array
  
  prediction = model.predict(keras_input, verbose=0)
  
  predicted_linear = prediction[0][0][0] # Predicted Linear
  linear_certainty = float(tf.exp(prediction[0][0][1])) # Linear uncertanty
  predicted_angular = prediction[1][0][0] # Predicted Angular
  angular_certainty = float(tf.exp(prediction[1][0][1])) # Angular uncertanty

  predicted_linear_certainty = map_range(linear_certainty, max_linear_uncertainty, min_linear_uncertainty, min_mapped, max_mapped)
  predicted_angular_certainty = map_range(angular_certainty, max_angular_uncertainty, min_angular_uncertainty, min_mapped, max_mapped)
  if (predicted_linear_certainty+predicted_angular_certainty)/2 >80:
    okPrediction=okPrediction+1

  print("\nReal: Linear: {:.2f} Angular: {:.2f}".format(real_linear, real_angular))
  print("Predicted: Linear: {:.2f} (Certainty: {:.2f}%) \nPredicted Angular: {:.2f} (Certanty: {:.2f}%)".format(predicted_linear,
                                                                                                      predicted_linear_certainty,
                                                                                                      predicted_angular,
                                                                                                      predicted_angular_certainty))
  print("The predicted Linear Error is: {:.2f} and Angular Error is: {:.2f}".format(linear_certainty,angular_certainty))

print(f"\n======================== Results ========================")
result=okPrediction/len(testData)
print("\nPredictions done Okey is: {:.2f}%".format(result))

Keras prediction

In [None]:
test_dimension = (120, 160)
y_prediction_linear=[]
y_prediction_angular=[]
y_target_linear=[]
y_target_angular=[]

for i in range(0,len(testData)):
  print("Predicting: "+str(i)+" out of "+str(len(testData)))
  image=testData.iloc[i]
  img = cv2.imread(image.imgpath)
  hsv_min=np.fromstring(image.hsv_min.split('[')[1].split(']')[0], dtype=float, sep=',')
  hsv_max=np.fromstring(image.hsv_max.split('[')[1].split(']')[0], dtype=float, sep=',')
  target=np.fromstring(image.Target.split('[')[1].split(']')[0], dtype=float, sep=',')
  hsv_lower=(hsv_min[0],hsv_min[1],hsv_min[2])
  hsv_upper=(hsv_max[0],hsv_max[1],hsv_max[2])
  preprocessed_test = preprocess(img, test_dimension,hsv_lower,hsv_upper)  
  keras_input = np.empty((1, 120, 160, 1))
  keras_input[0,:] = preprocessed_test
  prediction=model.predict(keras_input)
  predicted_linear=prediction[0][0][0]
  predicted_angular=prediction[1][0][0]
  y_prediction_linear.append(predicted_linear)
  y_prediction_angular.append(predicted_angular)
  y_target_angular.append(target[1])
  y_target_linear.append(target[0])

In [None]:
from tqdm import tqdm
from google.colab.patches import cv2_imshow

test_dimension = (120, 160)
prediction_linear=[]
prediction_angular=[]
target_linear=[]
target_angular=[]

print("Predicting...")
for i in tqdm(range(0, len(testData)), ncols=100):
  image = testData.iloc[i]
  img = cv2.imread(image.imgpath)
  
  hsv_min = np.fromstring(image.hsv_min.split('[')[1].split(']')[0], dtype=float, sep=',')
  hsv_max = np.fromstring(image.hsv_max.split('[')[1].split(']')[0], dtype=float, sep=',')
  target = np.fromstring(image.Target.split('[')[1].split(']')[0], dtype=float, sep=',')
  hsv_lower = (hsv_min[0], hsv_min[1], hsv_min[2])
  hsv_upper = (hsv_max[0], hsv_max[1], hsv_max[2])
  
  preprocessed_test = preprocess(img, test_dimension, hsv_lower, hsv_upper)  
  
  keras_input = np.empty((1, 120, 160, 1))
  keras_input[0,:] = preprocessed_test
  
  prediction = model.predict(keras_input, verbose=0)
  print(prediction)
  predicted_linear = prediction[0][0][0] # Predicted Linear
  linear_uncertainty = prediction[0][0][1] # Linear uncertanty
  predicted_angular = prediction[1][0][0] # Predicted Angular
  angular_uncertainty = prediction[1][0][1] # Angular uncertanty

  prediction_linear.append([predicted_linear, linear_uncertainty])
  prediction_angular.append([predicted_angular, angular_uncertainty])
  target_angular.append(target[1])
  target_linear.append(target[0])

In [None]:
linear_uncertainties = [prediction_linear[row][1] for row in range(len(prediction_linear))]
min_linear_uncertainty = min(linear_uncertainties)
max_linear_uncertainty = max(linear_uncertainties)

angular_uncertainties = [prediction_angular[row][1] for row in range(len(prediction_angular))]
min_angular_uncertainty = min(angular_uncertainties)
max_angular_uncertainty = max(angular_uncertainties)
min_mapped = 0
max_mapped = 100

In [None]:
def map_range(x, in_min, in_max, out_min, out_max):
    return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min

In [None]:
for i in range(0, len(testData), 50): 
  print(f"\n======================== Predicting: {i} out of {len(testData)} ========================")
  # Load image
  image = testData.iloc[i]
  img = cv2.imread(image.imgpath)

  # Prepare image
  hsv_min = np.fromstring(image.hsv_min.split('[')[1].split(']')[0], dtype=float, sep=',')
  hsv_max = np.fromstring(image.hsv_max.split('[')[1].split(']')[0], dtype=float, sep=',')
  target = np.fromstring(image.Target.split('[')[1].split(']')[0], dtype=float, sep=',')
  hsv_lower = (hsv_min[0],hsv_min[1],hsv_min[2])
  hsv_upper = (hsv_max[0],hsv_max[1],hsv_max[2])

  # Preprocess image
  img_array = preprocess(img, (120, 160, 3), hsv_lower, hsv_upper)

  # Real Velocities
  real_linear = target[0] 
  real_angular = target[1]

  # Print predicted values
  keras_input = np.empty((1, 120, 160, 1))
  keras_input[0,:] = img_array
  
  prediction = model.predict(keras_input, verbose=0)
  
  predicted_linear = prediction[0][0][0] # Predicted Linear
  linear_certainty = prediction[0][0][1] # Linear uncertanty
  predicted_angular = prediction[1][0][0] # Predicted Angular
  angular_certainty = prediction[1][0][1] # Angular uncertanty

  predicted_linear_certainty = map_range(linear_certainty, max_linear_uncertainty, min_linear_uncertainty, min_mapped, max_mapped)
  predicted_angular_certainty = map_range(angular_certainty, max_angular_uncertainty, min_angular_uncertainty, min_mapped, max_mapped)

  print("\nReal: Linear: {:.2f} Angular: {:.2f}".format(real_linear, real_angular))
  print("Predicted: Linear: {:.2f} (Certainty: {:.2f}%) \nPredicted Angular: {:.2f} (Certanty: {:.2f}%)".format(predicted_linear,
                                                                                                      predicted_linear_certainty,
                                                                                                      predicted_angular,
                                                                                                      predicted_angular_certainty))


In [None]:
mse = tf.keras.losses.MeanSquaredError()
mse_linear = mse(y_target_linear,y_prediction_linear).numpy()
mse_angular = mse( y_target_angular,y_prediction_angular,).numpy()
print("Linear mse loss is: "+str(round(mse_linear,4)))
print("Angular mse loss is: "+str(round(mse_angular,4)))
print("Total loss is: "+str(round(mse_linear*0.3+mse_angular*0.7,4)))