In [1]:
from torch import int64
import cv2 
import os 
import glob 
from google.colab.patches import cv2_imshow
import numpy as np
import torch
img_dir = "Images"
data_path = os.path.join(img_dir,'*g') 
files = glob.glob(data_path) 
Images = [] 
for f1 in files: 
    img = cv2.imread(f1, 0) 
    Images.append(img)
Images = np.array(Images)
Images = (np.divide(Images, 255))
Images = Images.astype(float)

In [2]:
controlData = np.loadtxt('Robot Log/robot_log.csv', delimiter = ';', skiprows = 1, usecols = (1,2), dtype = float)
print(np.max(controlData[:,0]))
controlData[:,0] /= 15

15.0


In [3]:
from torch.utils.data import TensorDataset
ImagesTrain = torch.tensor(Images[0:799])
steerDataTrain = torch.tensor(controlData[0:799, 0])
trainSteerDataset = TensorDataset(ImagesTrain, steerDataTrain)
ImagesTest = torch.tensor(Images[800:999])
steerDataTest = torch.tensor(controlData[800:999, 0])
testSteerDataset = TensorDataset(ImagesTest, steerDataTest)
throttleDataTrain = torch.tensor(controlData[0:799, 1])
trainThrottleDataset = TensorDataset(ImagesTrain, throttleDataTrain)
throttleDataTest = torch.tensor(controlData[800:999, 1])
testThrottleDataset = TensorDataset(ImagesTest, throttleDataTest)

In [4]:
steerTrainDataLoader = torch.utils.data.DataLoader(trainSteerDataset, batch_size=128, shuffle=True)
steerTestDataLoader = torch.utils.data.DataLoader(testSteerDataset, batch_size=128, shuffle=False)
throttleTrainDataLoader = torch.utils.data.DataLoader(trainThrottleDataset, batch_size=128, shuffle=True)
throttleTestDataLoader = torch.utils.data.DataLoader(testThrottleDataset, batch_size=128, shuffle=False)

In [5]:
class LR(torch.nn.Module):
    def __init__(self):
        super(LR, self).__init__()
        self.linear1 = torch.nn.Linear(160*320, 128) # W:784x128 , b:128x1 , parameters = [W,b]
        self.linear2 = torch.nn.Linear(128, 10)
        self.linear3 = torch.nn.Linear(10, 5)
        self.linear4 = torch.nn.Linear(5, 1)
    
    def forward(self, x):
        x = x.view(-1,160*320)
        transformed_x1 = self.linear1(x)
        transformed_x2 = self.linear2(transformed_x1)
        transformed_x3 = self.linear3(transformed_x2)
        transformed_x4 = self.linear4(transformed_x3)
        return transformed_x4

class LRthrottle(torch.nn.Module):
    def __init__(self):
        super(LRthrottle, self).__init__()
        self.linear1 = torch.nn.Linear(160*320, 128) # W:784x128 , b:128x1 , parameters = [W,b]
        self.linear2 = torch.nn.Linear(128, 10)
        self.linear3 = torch.nn.Linear(10, 5)
        self.linear4 = torch.nn.Linear(5, 1)
        self.activation = torch.nn.ReLU()
    
    def forward(self, x):
        x = x.view(-1,160*320)
        transformed_x1 = self.linear1(x)
        transformed_x2 = self.linear2(transformed_x1)
        transformed_x3 = self.linear3(transformed_x2)
        transformed_x4 = self.linear4(transformed_x3)
        #transformed_x4 = self.activation(transformed_x4)
        return transformed_x4

steerNet = LR()
steerOptimizer = torch.optim.Adam(steerNet.parameters(), lr=0.01)
criterion = torch.nn.MSELoss()
criterionThrottle = torch.nn.L1Loss()
throttleNet = LRthrottle()
throttleOptimizer = torch.optim.Adam(throttleNet.parameters(), lr=0.01)

In [7]:
#steering trainer
train_loss_history = []
test_loss_history = []
for epoch in range(20):
    train_loss = 0.0
    test_loss = 0.0
    for i, data in enumerate(steerTrainDataLoader):
        images, labels = data
        images = images.to(torch.float32)
        labels = labels.to(torch.float32)
        steerOptimizer.zero_grad()
        predicted_output = steerNet(images.float())
        predicted_output = predicted_output.to(torch.float32)
        loss = criterion(predicted_output, labels)
        loss.backward()
        steerOptimizer.step()
        train_loss += loss.item()
    for i, data in enumerate(steerTestDataLoader):
        with torch.no_grad():
            images, labels = data
            predicted_output = steerNet(images.float())
            loss = criterion(predicted_output, labels.long())
            test_loss += loss.item()
    train_loss = train_loss/len(steerTrainDataLoader)
    test_loss = test_loss/len(steerTestDataLoader)
    train_loss_history.append(train_loss)
    test_loss_history.append(test_loss)
    print('Epoch %s finished with train loss %s and test loss %s'%(epoch, train_loss, test_loss))

  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch 0 finished with train loss 81.72508937971932 and test loss 46.36457061767578
Epoch 1 finished with train loss 56.48515183585031 and test loss 11.300064086914062
Epoch 2 finished with train loss 9.1688118321555 and test loss 21.957029342651367
Epoch 3 finished with train loss 10.801085991518837 and test loss 0.07292334549129009
Epoch 4 finished with train loss 3.8708798629896983 and test loss 4.304843425750732
Epoch 5 finished with train loss 1.8042827972343989 and test loss 0.8607746660709381
Epoch 6 finished with train loss 1.5721244641712733 and test loss 0.6726866960525513
Epoch 7 finished with train loss 0.5098892322608403 and test loss 0.4422624558210373
Epoch 8 finished with train loss 0.6203276131834302 and test loss 0.033756616525352
Epoch 9 finished with train loss 0.39949839455740793 and test loss 0.2062266543507576
Epoch 10 finished with train loss 0.3053705926452364 and test loss 0.045125687494874
Epoch 11 finished with train loss 0.34559762477874756 and test loss 0.0

In [10]:
#throttle trainer
train_loss_history = []
test_loss_history = []
for epoch in range(40):
    train_loss = 0.0
    test_loss = 0.0
    for i, data in enumerate(throttleTrainDataLoader):
        images, labels = data
        images = images.to(torch.float32)
        labels = labels.to(torch.float32)
        throttleOptimizer.zero_grad()
        predicted_output = throttleNet(images.float())
        predicted_output = predicted_output.to(torch.float32)
        loss = criterion(predicted_output, labels)
        loss.backward()
        throttleOptimizer.step()
        train_loss += loss.item()
    for i, data in enumerate(throttleTestDataLoader):
        with torch.no_grad():
            images, labels = data
            predicted_output = throttleNet(images.float())
            loss = criterion(predicted_output, labels.long())
            test_loss += loss.item()
    train_loss = train_loss/len(throttleTrainDataLoader)
    test_loss = test_loss/len(throttleTestDataLoader)
    train_loss_history.append(train_loss)
    test_loss_history.append(test_loss)
    print('Epoch %s finished with train loss %s and test loss %s'%(epoch, train_loss, test_loss))

  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch 0 finished with train loss 0.2264768523829324 and test loss 0.1814398393034935
Epoch 1 finished with train loss 0.2347784468105861 and test loss 0.12995517253875732
Epoch 2 finished with train loss 0.2350929762635912 and test loss 0.07058463245630264
Epoch 3 finished with train loss 0.23590699902602605 and test loss 0.06044740229845047
Epoch 4 finished with train loss 0.22246872740132467 and test loss 0.09885851666331291
Epoch 5 finished with train loss 0.23030151426792145 and test loss 0.10410498082637787
Epoch 6 finished with train loss 0.22380402897085463 and test loss 0.1745888888835907
Epoch 7 finished with train loss 0.2262152177946908 and test loss 0.15760134905576706
Epoch 8 finished with train loss 0.23904795518943242 and test loss 0.12293196842074394
Epoch 9 finished with train loss 0.2372493531022753 and test loss 0.035394784063100815
Epoch 10 finished with train loss 0.22640949061938695 and test loss 0.03442592732608318
Epoch 11 finished with train loss 0.229941981179

In [11]:
from torch import int64
import cv2 
import os 
import glob 
from google.colab.patches import cv2_imshow
import numpy as np
import torch
img_dir = "Images"
data_path = os.path.join(img_dir,'*g') 
files = glob.glob(data_path) 
Images = [] 
for f1 in files: 
    img = cv2.imread(f1, 0) 
    Images.append(img)
Images = np.array(Images)
Images = (np.divide(Images, 255))
Images = Images.astype(float)
Images = torch.tensor(Images)

for i in np.linspace(1, 999, 100).astype(int):
  predicted_output_steer = steerNet(Images[i].float())
  predicted_output_throttle = throttleNet(Images.float()[i])
  correct_steer = controlData[i,0]
  correct_throttle = controlData[i,1]
  print ("correct steer: " + str(correct_steer) + " predicted steer: " + str(predicted_output_steer))
  print ("correct throttle: " + str(correct_throttle) + " predicted throttle: " + str(predicted_output_throttle))

correct steer: 0.0 predicted steer: tensor([[0.0929]], grad_fn=<AddmmBackward0>)
correct throttle: 0.0 predicted throttle: tensor([[0.2651]], grad_fn=<AddmmBackward0>)
correct steer: 0.0 predicted steer: tensor([[0.3336]], grad_fn=<AddmmBackward0>)
correct throttle: 0.0 predicted throttle: tensor([[0.2800]], grad_fn=<AddmmBackward0>)
correct steer: -1.0 predicted steer: tensor([[0.0947]], grad_fn=<AddmmBackward0>)
correct throttle: 0.0 predicted throttle: tensor([[0.3001]], grad_fn=<AddmmBackward0>)
correct steer: 0.4833035333333333 predicted steer: tensor([[-0.0521]], grad_fn=<AddmmBackward0>)
correct throttle: 0.0 predicted throttle: tensor([[0.3058]], grad_fn=<AddmmBackward0>)
correct steer: 0.7374106666666667 predicted steer: tensor([[0.2806]], grad_fn=<AddmmBackward0>)
correct throttle: 0.0 predicted throttle: tensor([[0.2817]], grad_fn=<AddmmBackward0>)
correct steer: 0.5239568666666666 predicted steer: tensor([[0.1483]], grad_fn=<AddmmBackward0>)
correct throttle: 1.0 predicted 