In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!unzip drive/MyDrive/Data_set.zip -d images/

# Self driving car - street lane tracing 
the goal of this work is to design RC robotic car , this car is supposed to trace the lanes on a small track. 

In [None]:
import pandas as pd
import os 
import torch
import torchvision
from torch.utils.data import Dataset,DataLoader
from skimage import io
import torchvision.transforms as transforms
import numpy as np
import cv2 
import matplotlib.pyplot as plt
from math import floor , ceil
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import torch.nn.functional as f

In [None]:
class customDataset(Dataset):
  def __init__(self,csv_file,root_url,transform = None):
    self.annotation = pd.read_csv(csv_file)
    self.root_dir = root_url
    self.transform = transform

  def __len__(self):
    return len(self.annotation)

  def __getitem__(self, index):
    image_path = os.path.join(self.root_dir,self.annotation.iloc[index,0].split(';')[0])
    image = io.imread(image_path)
    y_label = torch.tensor(int(self.annotation.iloc[index,0].split(';')[1][1]))
    if self.transform :
      image = self.transform(image)
    return (image,y_label)

In [None]:
data_set = customDataset(csv_file = 'drive/MyDrive/dataset/ref/ref.csv',root_url='images/images'
,transform = transforms.ToTensor())

train_to_test_ratio = 0.8

data_length = data_set.__len__()
train_data_length = floor(data_length*train_to_test_ratio)
test_data_length = ceil(data_length*(1 - train_to_test_ratio))

train_set , test_set = torch.utils.data.random_split(data_set , [train_data_length , test_data_length])


batch_size = 32


train_loader = DataLoader(dataset = train_set , batch_size = batch_size , shuffle = True)
test_loader = DataLoader(dataset = test_set , batch_size = batch_size , shuffle = True)
print('training data size is :',train_data_length)
print('testing data size is :',test_data_length)

training data size is : 8909
testing data size is : 2228


Define the model architecture

In [None]:
import torch 
import torch.nn as nn
import torch.nn.functional as f
from torchsummary import summary

class self_driving_model(nn.Module):
    def __init__(self):
        super(self_driving_model,self).__init__()


        self.conv_1 = nn.Sequential(
            nn.Conv2d(3 , 3 , kernel_size = 3 , stride = 1 , padding = 1),
            nn.InstanceNorm2d(3, affine=True),
            nn.SiLU(inplace=True),
            nn.Conv2d(3 , 3 , kernel_size = 3 , stride = 1 , padding = 1),
            nn.InstanceNorm2d(3, affine=True),
            nn.SiLU(inplace=True),
            
            )
        self.conv_2 = nn.Sequential(
            nn.Conv2d(6 , 3 , kernel_size = 3 , stride = 1 , padding = 1),
            nn.InstanceNorm2d(3, affine=True),
            nn.SiLU(inplace = True),
            nn.Conv2d(3 , 1 , kernel_size = 3 , stride = 1 , padding = 1),
            nn.SiLU(inplace=True),
            nn.MaxPool2d(16),
            )

            
        ###########################################
        self.fc = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(512,128),
            nn.SiLU(inplace=True),
            nn.Dropout(p=0.2),
            nn.Linear(128,4),
            )

    def forward(self,x):

        input = x

        x = self.conv_1(x)

        x = torch.cat((x,input),1)

        x = self.conv_2(x)
        
        x = torch.flatten(x,1)
        
        x = self.fc(x)

        return x



In [None]:
LR = 0.001
num_epochs = 35
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = 'cpu'
print(device)

In [None]:
model = self_driving_model()
pytorch_total_params = sum(p.numel() for p in model.parameters())
print('number of parameters in the model : ',pytorch_total_params)
model = model.to(device)

In [None]:
class_weights= torch.tensor([1.18,13.8,0.1,12.6]).to(device = device)

In [None]:
criterion = nn.CrossEntropyLoss(weight=class_weights,reduction='mean')
optimizer = optim.Adam(model.parameters(),lr = LR)

after running the next ceil the training will start

In [None]:
loss_trace = []
for epoch in range(num_epochs):
  losses = []
  for batch_idx, (data,targets) in tqdm(enumerate(train_loader)):
    data = data.to(device = device)
    targets = targets.to(device = device)

    scores = model(data)
    loss = criterion(scores, targets)
    losses.append(loss.item())

    optimizer.zero_grad()
    loss.backward()

    optimizer.step()
  print(f'cost at epoch {epoch} , is {sum(losses)/len(losses)}')
  loss_trace.append(sum(losses)/len(losses))

shows the loss vs epochs plot

In [None]:
plt.plot(loss_trace)
plt.xlabel('epochs')
plt.ylabel('loss')

In [None]:
from torch.nn.modules.container import ParameterDict
def check_accuracy(loader , model):
  num_correct = 0
  num_samples = 0
  num_right = 0
  total_right = 0
  total_left = 0
  num_left = 0
  model.eval()
  with torch.no_grad():
    for x,y in tqdm(loader) :
      x = x.to(device = device)
      y = y.to(device = device)

      scores = model(x)
      predicions = torch.argmax(scores,dim = 1)
      num_correct += (predicions == y).sum()
      total_left +=(y == 3).sum()
      num_left +=(predicions == 3).sum()
      total_right +=(y == 1).sum()
      num_right +=(predicions == 1).sum()
      num_samples += predicions.size(0)
    print(f'got {num_correct}/{num_samples} with accuracy {(float(num_correct)/float(num_samples))*100.0}')
    print('predeccted right turns :',num_right , 'total right turns :',total_right)
    print('predeccted left turns :',num_left ,'total left turns :', total_left)
  model.train()


In [None]:
print('checking accuracy on training set')
check_accuracy(train_loader , model)

print('checking accuracy on testing set')
check_accuracy(test_loader , model)

finally save the model

In [None]:
path = ''
torch.save(model.state_dict(),os.path.join(path,'mod.stat'))