#Visual Odometry by using Quaternion Neural Network for AI Driving


---


*made by Alessandro Lambertini and Denise Landini*

*mat. 1938390 and 1938388*

In [2]:
#@title {vertical-output: true, form-width: "50%", display-mode: "code"}
#@markdown # Global Variables:

#@markdown ---
#@markdown ### Flags:
FLAG_DOWNLOAD_DATASET = False #@param {type:"boolean"}
FLAG_DEBUG_PRINT = True #@param {type:"boolean"}
FLAG_INFO_PRINT = True #@param {type:"boolean"}

#@markdown ---
#@markdown ### Files path:
# global variables to save the tables/models 
dir_main = '/content/drive/My Drive/Colab Notebooks/Thesis/' #@param {type:"string"}

dir_Dataset = 'Dataset/'#@param {type:"string"}
dir_Dataset = dir_main + dir_Dataset
dir_Model = 'Model/'#@param {type:"string"}
dir_Model = dir_main + dir_Model
dir_History = 'History/'#@param {type:"string"}
dir_History = dir_main + dir_History

#@markdown ---
#@markdown ### Images settings:
BACH_SIZE = 10 #@param {type:"number"}
CHANNELS = 6
WIDTH = 320 #@param [320, 640, 1280] {type:"raw", allow-input: false}
HEIGHT = 96 #@param[96, 192, 384] {type:"raw", allow-input: false}
NUM_POSES = 6

trainingSeries = ["00", "02", "08", "09"]
testingSeries = ["03", "04", "05", "06", "07", "10"]


from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [1]:
import glob
import numpy as np
import os
import time
import math

import cv2
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Function
from torch.autograd import Variable
from torchvision import models

### Functions

In [3]:
from IPython.display import HTML, display
def progress(value, max=100):
  return HTML("""
    <progress
        value='{value}'
        max='{max}',
        style='width: 50%'
    >
        {value}
    </progress>
  """.format(value=value, max=max))

In [4]:
def isRotationMatrix(R):
  RT = np.transpose(R)
  n = np.linalg.norm(np.identity(3, dtype = R.dtype) - np.dot(RT, R))
  return n < 1e-6

def rotationMatrixToEulerAngles(R):
  assert(isRotationMatrix(R))

  sy = math.sqrt(R[0,0]*R[0,0] + R[1,0]*R[1,0])
  if  sy < 1e-6:
    x = math.atan2(-R[1,2], R[1,1])
    y = math.atan2(-R[2,0], sy)
    z = 0
  else:
    x = math.atan2(R[2,1] , R[2,2])
    y = math.atan2(-R[2,0], sy)
    z = math.atan2(R[1,0], R[0,0])

  return np.array([x, y, z])

def matrix2pose(mat):
  p = np.array([mat[3], mat[7], mat[11]])
  R = np.array([[mat[0], mat[1], mat[2]],
                [mat[4], mat[5], mat[6]],
                [mat[8], mat[9], mat[10]]])
  
  angles = rotationMatrixToEulerAngles(R)
  pose = np.concatenate((p, angles))
  return pose

In [5]:
def getImage(path):
  img = cv2.imread(path)
  img = cv2.resize(img, (WIDTH, HEIGHT), interpolation=cv2.INTER_LINEAR)
  return img

def loadImages(path):
  #numImgs = 0#len(os.listdir(path))
  #print("Path: ".format(path))
  #print("Num of imges {}".format(numImgs))

  suffix = "_{}_{}_loaded.npy".format(WIDTH, HEIGHT)
  initT = time.time()
  if os.path.isfile(path + suffix):
    imagesSet = np.load(path + suffix, allow_pickle=False)
    print(imagesSet.shape)
  else:
    notFirstIter = False
    img1 = []
    img2 = []
    imagesSet = []
    for img in glob.glob(path+'/*'):
      img2 = getImage(img)

      if notFirstIter:
        img = np.concatenate([img1, img2], axis=-1)
        imagesSet.append(img)
      else:
        notFirstIter = True
      
      img1 = img2
  
  elapsedT = time.time() - initT
  #print("Time needed: %.2fs"%(elapsedT))
  imagesSet = np.reshape(imagesSet, (-1, CHANNELS, WIDTH, HEIGHT))
  return imagesSet

In [6]:
def loadPoses(path):
  #print("Path: ".format(path))

  suffix = "_pose_loaded.npy".format(WIDTH, HEIGHT)
  initT = time.time()
  if os.path.isfile(path + suffix):
    posesSet = np.load(path + suffix, allow_pickle=False)
  else:
    notFirstIter = False
    pose1 = []
    pose2 = []
    posesSet = []
    with open(path + ".txt", 'r') as f:
      lines = f.readlines()
      for line in lines:
        matrix = np.fromstring(line, dtype=float, sep=' ')
        pose2 = matrix2pose(matrix)

        if notFirstIter:
          pose = pose2-pose1
          posesSet.append(pose)
        else:
          notFirstIter = True
      
        pose1 = pose2
      posesSet = np.array(posesSet)

  elapsedT = time.time() - initT
  #print("Time needed: %.2fs"%(elapsedT))
  return posesSet
        

###Load data

In [7]:
img_path = os.path.join(dir_Dataset,'sequences')
numImgsTrain = sum([math.floor(os.path.getsize(img_path+"/"+dir1+"/"+dir2) / (CHANNELS*WIDTH*HEIGHT)) for dir1 in trainingSeries for dir2 in os.listdir(img_path+"/"+dir1) if (len(dir2.split(".")) > 1 and dir2.split(".")[1] == "npy")])
numImgsTest = sum([math.floor(os.path.getsize(img_path+"/"+dir1+"/"+dir2) / (CHANNELS*WIDTH*HEIGHT)) for dir1 in testingSeries for dir2 in os.listdir(img_path+"/"+dir1) if (len(dir2.split(".")) > 1 and dir2.split(".")[1] == "npy")])

print(numImgsTrain*100/(numImgsTrain+numImgsTest))
print(numImgsTest*100/(numImgsTrain+numImgsTest))

67.27025803531009
32.72974196468991


In [8]:
img_path = os.path.join(dir_Dataset, 'sequences')
for dir1 in os.listdir(img_path):
  print(dir1)
  for dir2 in os.listdir(img_path+"/"+dir1):
    if len(dir2.split(".")) > 1 and dir2.split(".")[1] == "npy":
      print("-->"+dir2)
      sizeFile = os.path.getsize(img_path+"/"+dir1+"/"+dir2)
      print("-->{}".format(sizeFile))
      print("-->{}".format(sizeFile / (CHANNELS*WIDTH*HEIGHT)))

08
-->image_2_320_96_loaded.npy
-->750182528
-->4070.0006944444444
01
-->image_2_320_96_loaded.npy
-->202752128
-->1100.0006944444444
00
-->image_2_320_96_loaded.npy
-->836812928
-->4540.000694444445
02
-->image_2_320_96_loaded.npy
-->858931328
-->4660.000694444445
03
-->image_2_320_96_loaded.npy
-->147456128
-->800.0006944444444
04
-->image_2_320_96_loaded.npy
-->49766528
-->270.00069444444443
05
-->image_2_320_96_loaded.npy
-->508723328
-->2760.0006944444444
10
-->image_2_320_96_loaded.npy
-->221184128
-->1200.0006944444444
09
-->image_2_320_96_loaded.npy
-->293068928
-->1590.0006944444444
06
-->image_2_320_96_loaded.npy
-->202752128
-->1100.0006944444444
07
-->image_2_320_96_loaded.npy
-->202752128
-->1100.0006944444444


In [9]:
def VODataLoader(datapath, sequence='00'):
  imgPath = os.path.join(datapath, 'sequences', sequence, 'image_2')
  posesPath = os.path.join(datapath, 'poses', sequence)
  
  imagesSet = [torch.FloatTensor(loadImages(imgPath))]
  posesSet = [torch.FloatTensor(loadPoses(posesPath))]

  print("Details of X :")
  print(imagesSet[0].size())
  print("Details of y :")
  print(posesSet[0].size())

  X = torch.stack(imagesSet).view(-1, BACH_SIZE, CHANNELS, WIDTH, HEIGHT)
  y = torch.stack(posesSet).view(-1, BACH_SIZE, NUM_POSES)
  print("Details of X :")
  print(X.size())
  #bb, _, cc, ww, hh = X.size()
  print("Details of y :")
  print(y.size())
    
  return X, y

In [10]:
X, y = VODataLoader(dir_Dataset, sequence = '07')

(1100, 96, 320, 6)
Details of X :
torch.Size([1100, 6, 320, 96])
Details of y :
torch.Size([1100, 6])
Details of X :
torch.Size([110, 10, 6, 320, 96])
Details of y :
torch.Size([110, 10, 6])


### network

In [11]:
class C_Block(nn.Module):
  def __init__(self, in_ch, out_ch, kernel_size, stride, padding, dropout_rate):
    super(C_Block, self).__init__()
    
    #self.conv1 = nn.Conv2d(6, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3)) #6 64
    #self.relu1 = nn.ReLU(inplace=True)
    #self.batch1 = nn.BatchNorm2d(64)
    #self.drop1 = nn.Dropout(0.2)

    self.conv = nn.Conv2d(in_ch, out_ch, kernel_size=kernel_size, stride=stride, padding=padding)
    self.relu = nn.ReLU(inplace=True)
    self.batch = nn.BatchNorm2d(out_ch)
    self.drop = nn.Dropout(dropout_rate)
      
  def forward(self, x):
    x = self.conv(x)
    x = self.relu(x)
    x = self.batch(x)
    x = self.drop(x)
    return x

class DeepVONet(nn.Module):
    def __init__(self, sizeHidden=1):
        super(DeepVONet, self).__init__()

        self.block1 = C_Block(6, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), dropout_rate=0.2)
        self.block2 = C_Block (64, 128, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2), dropout_rate=0.2)
        self.block3 = C_Block (128, 256, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2), dropout_rate=0.2)
        self.block3_1 = C_Block (256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), dropout_rate=0.2)
        self.block4 = C_Block (256, 512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), dropout_rate=0.2)
        self.block4_1 = C_Block (512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), dropout_rate=0.2)
        self.block5 = C_Block (512, 512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), dropout_rate=0.2)
        self.block5_1 = C_Block (512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), dropout_rate=0.2)
        self.block6 = C_Block (512, 1024, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), dropout_rate=0.2)

        self.lstm1 = nn.LSTMCell(1024*5*2, 1000)
        self.lstm1_dropout = nn.Dropout(0.5)
        self.lstm2 = nn.LSTMCell(1000, 1000)
        self.lstm2_dropout = nn.Dropout(0.5)

        self.fc = nn.Linear(in_features=1000, out_features=6)

        self.reset_hidden_states(sizeHidden=sizeHidden, zero=True)

    def reset_hidden_states(self, sizeHidden=1, zero=True):
        if zero == True:
            self.hx1 = Variable(torch.zeros(sizeHidden, 1000))
            self.cx1 = Variable(torch.zeros(sizeHidden, 1000))
            self.hx2 = Variable(torch.zeros(sizeHidden, 1000))
            self.cx2 = Variable(torch.zeros(sizeHidden, 1000))
        else:
            self.hx1 = Variable(self.hx1.data)
            self.cx1 = Variable(self.cx1.data)
            self.hx2 = Variable(self.hx2.data)
            self.cx2 = Variable(self.cx2.data)

        if next(self.parameters()).is_cuda == True:
            self.hx1 = self.hx1.cuda()
            self.cx1 = self.cx1.cuda()
            self.hx2 = self.hx2.cuda()
            self.cx2 = self.cx2.cuda()

    def forward(self, x):
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block3_1(x)
        x = self.block4(x)
        x = self.block4_1(x)
        x = self.block5(x)
        x = self.block5_1(x)
        x = self.block6(x)

        #print(x.size())
        x = x.view(x.size(0), 1024*5*2)
        #print(x.size())
        self.hx1, self.cx1 = self.lstm1(x, (self.hx1, self.cx1))
        x = self.hx1
        x = self.lstm1_dropout(x)

        self.hx2, self.cx2 = self.lstm2(x, (self.hx2, self.cx2))
        x = self.hx2
        #print(x.size())
        x = self.lstm2_dropout(x)
        
        x = self.fc(x)
        return x

In [12]:
#Creating model and defining loss and optimizer to be used 
model = DeepVONet(sizeHidden=BACH_SIZE)
print(model)

import torch.optim as optim

#criterion = nn.CrossEntropyLoss()
criterion = torch.nn.MSELoss()
#optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.5, weight_decay=0.5)

DeepVONet(
  (block1): C_Block(
    (conv): Conv2d(6, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
    (relu): ReLU(inplace=True)
    (batch): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (drop): Dropout(p=0.2, inplace=False)
  )
  (block2): C_Block(
    (conv): Conv2d(64, 128, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
    (relu): ReLU(inplace=True)
    (batch): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (drop): Dropout(p=0.2, inplace=False)
  )
  (block3): C_Block(
    (conv): Conv2d(128, 256, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
    (relu): ReLU(inplace=True)
    (batch): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (drop): Dropout(p=0.2, inplace=False)
  )
  (block3_1): C_Block(
    (conv): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (relu): ReLU(inplace=True)
    (batch): BatchNorm2d(256, eps=1e-05, mom

In [13]:
#Uncomment lines below to see model paramters 
# for parameter in model.parameters():
#     print(len(parameter))

### Training

In [14]:
def get_accuracy(outputs, labels, batch_size):
    diff =0
    for i in range(batch_size):
        for j in range(10):
            out = outputs[i, j].detach().numpy()
            lab = labels[i, j].detach().numpy()
            diff+=get_mse_diff(out,lab)
    print("Loss : ",diff/(batch_size*10),"%")
    print("Accuracy : ",(1 -diff/(batch_size*10))*100,"%")
    
def get_mse_diff(x,y):
    diff= 0
    for i in range(6):
        diff += (x[i]-y[i])*(x[i]-y[i])
    return diff/6


In [None]:
NUM_EPOCHS = 1
img_out = 0

loss_train = []
loss_test = []

for epoch in range(1, NUM_EPOCHS+1):
  print("\x1b[1;31;10mEPOCH {}/{}\x1b[0m\n".format(epoch, NUM_EPOCHS))
  print("\x1b[1;33;10mTRAINING\x1b[0m")
  app_loss_train = []
  
  train_initT = time.time()
  for sequence in trainingSeries:
    print("Sequence: {}".format(sequence))
    X, y = VODataLoader(dir_Dataset, sequence=sequence)
    train_numOfBatch = len(X)
    out_disp = display(progress(0, train_numOfBatch-1), display_id=True)
    
    for i in range(train_numOfBatch):
      out_disp.update(progress(i, train_numOfBatch-1))
      inputs = X[i]
      labels = y[i]

      model.zero_grad()
      model.reset_hidden_states(sizeHidden=BACH_SIZE, zero=True)
        
      outputs = model(inputs)

      loss = criterion(outputs, labels)
      loss.backward()
      app_loss_train.append(loss.item())

      optimizer.step()
      optimizer.zero_grad()
  train_elapsedT = time.time() - train_initT
  
#  print("\x1b[1;33;10mTESTING\x1b[0m")
#  test_numOfBatch = len(X)
#  app_loss_test = []
#
#  out_disp = display(progress(0, test_numOfBatch-1), display_id=True)
#  test_initT = time.time()
#  outputs = []
#  for i in range(test_numOfBatch):
#    out_disp.update(progress(i, test_numOfBatch-1))
#    inputs = X[i]
#    labels = y[i]
#      
#    app_outputs = model(inputs)
#
#    loss = criterion(app_outputs, labels)
#    app_loss_test.append(loss.item())
#    outputs.append(outputs)
#  test_elapsedT = time.time() - test_initT

  
  loss_train.append(sum(app_loss_train)/len(app_loss_train))
#  loss_test.append(sum(app_loss_test)/len(app_loss_test))

  print("epoch %d"%(epoch+1))
  print("loss_train %.3f, time %.2fs"%(loss_train[-1], train_elapsedT))
#  print("loss_test %.3f, time %.2fs"%(loss_test[-1], test_elapsedT))
  
  #output = net(x_test[j_pat][j_img])
  #out = output.cpu().data.numpy()
  #out = np.transpose(out, (0,2,3,1))[:,:,:,1:]
  #out = np.reshape(out, (out.shape[1], out.shape[2], out.shape[3]))
  #imwrite(dir_M+"/save_image"+str(epoch)+".png", img_as_ubyte(out))
  #img_out = out

[1;31;10mEPOCH 1/1[0m

[1;33;10mTRAINING[0m
Sequence: 00
(4540, 96, 320, 6)
Details of X :
torch.Size([4540, 6, 320, 96])
Details of y :
torch.Size([4540, 6])
Details of X :
torch.Size([454, 10, 6, 320, 96])
Details of y :
torch.Size([454, 10, 6])


Sequence: 02
(4660, 96, 320, 6)
Details of X :
torch.Size([4660, 6, 320, 96])
Details of y :
torch.Size([4660, 6])
Details of X :
torch.Size([466, 10, 6, 320, 96])
Details of y :
torch.Size([466, 10, 6])


Sequence: 08
(4070, 96, 320, 6)
Details of X :
torch.Size([4070, 6, 320, 96])
Details of y :
torch.Size([4070, 6])
Details of X :
torch.Size([407, 10, 6, 320, 96])
Details of y :
torch.Size([407, 10, 6])


In [None]:
training_model(model, bb, X, y, 10)

In [None]:
#Save the model
torch.save(model.state_dict(), dir_Model+"DeepVO_3.pt")
#Load model
# model_loaded = torch.load('DeepVO.pt')

### Testing

In [None]:
_X_test, _y_test = VODataLoader("/content/drive/My Drive/Colab Notebooks/Thesis/Dataset/dataset", sequences=['01'])

/content/drive/My Drive/Colab Notebooks/Thesis/Dataset/dataset
Path: 
Num of imges 0
Time needed: 5.60s
Path: 
Time needed: 0.26s


In [None]:
print("Details of X_test :")
print(type(_X_test)) 
print(type(_X_test[0]))
print(len(_X_test)) 
print(len(_X_test[0])) 
print(_X_test[0].size())
print("Details of y_test :")
print(type(_y_test))
print(type(_y_test[0]))
print(len(_y_test))
print(len(_y_test[0]))
print(_y_test[0].size())

Details of X_test :
<class 'list'>
<class 'torch.Tensor'>
1
1100
torch.Size([1100, 6, 320, 96])
Details of y_test :
<class 'list'>
<class 'torch.Tensor'>
1
1100
torch.Size([1100, 6])


In [None]:
X_test=torch.stack(_X_test).view(-1, BACH_SIZE, CHANNELS, WIDTH, HEIGHT)
y_test=torch.stack(_y_test).view(-1, BACH_SIZE, NUM_POSES)
print(X_test.size())
print(y_test.size())

torch.Size([1100, 1, 6, 320, 96])
torch.Size([1100, 1, 6])


In [None]:
#Getting predictions from the model 
test_batch_size = 1 #Based on this count only batches will be process, not as per the total number of batches provided
y_output = testing_model(model, test_batch_size, X_test)
print(y_output.size())
#Saving outputs
torch.save(y_output, "y_output.pt")
#getting accuracy
#get_accuracy(y_output, y_test, test_batch_size)

In [None]:
print(y_test.shape)
print(y_output.shape)

torch.Size([1100, 1, 6])
torch.Size([1, 1, 6])


In [None]:
print(y_test.shape)
print(y_output.shape)

torch.Size([1100, 1, 6])
torch.Size([1, 1, 6])


In [None]:
base = y_test[0, 0].detach().numpy()
steps = y_output[0].detach().numpy()

test_out = [y_test[0, 0, :]]
out = [y_test[0, 0, :]]
for i in range(0, len(steps)):
  test_out.append(out[i]+y_test[0, i, :])
  out.append(out[i]+steps[i])

In [None]:
y_test[0, :, :]

tensor([[ 0.0515, -0.0242,  1.0007,  0.0026,  0.0436,  0.0018]])

In [None]:
steps[0]

array([ 0.02826825, -0.00039041, -0.00099795, -0.00054409,  0.00091183,
        0.00013087], dtype=float32)

In [None]:
out

[tensor([ 0.0515, -0.0242,  1.0007,  0.0026,  0.0436,  0.0018]),
 tensor([ 0.0798, -0.0246,  0.9997,  0.0020,  0.0445,  0.0019])]

In [None]:
plt.plot(out[:][0], out[:][2], color='red')
plt.plot(test_out[:][0], test_out[:][2], color='blue')
plt.plot(y_test[:, :, 0], y_test[:, :, 2], color='green')
plt.show()

IndexError: ignored

In [None]:
print("\x1b[1;33;10mTESTING\x1b[0m")
test_numOfBatch = len(X)
app_loss_test = []

out_disp = display(progress(0, test_numOfBatch-1), display_id=True)
test_initT = time.time()
outputs = []
for i in range(test_numOfBatch):
  out_disp.update(progress(i, test_numOfBatch-1))
  inputs = X[i]
  labels = y[i]
    
  app_outputs = model(inputs)

  loss = criterion(app_outputs, labels)
  app_loss_test.append(loss.item())
  outputs.append(outputs)
test_elapsedT = time.time() - test_initT

In [None]:
np.asarray(outputs).shape