<h1>
    <center>
        Implementation of IA for autonomous vehicle

# Step 1 – Importing the libraries

1. os: The operating system library, used to load the saved AI models.

2. random: Used to sample some random transitions from the memory for experience replay.

3. torch: The main library from PyTorch, which will be used to build our neural network with tensors, as opposed to simple matrices like numpy arrays. While a matrix is a 2-D array, a tensor can be a n-dimensional array, with more than just a single number in its cells. Here's a diagram so you can clearly understand the difference between a matrix and a tensor:

4. torch.nn: The nn module from the torch library, used to build the fully connected layers in the artificial neural network of our AI.
5. torch.nn.functional: The functional sub-module from the nn module, used to call the activation functions (rectifier and Softmax), as well as the loss function for backpropagation.

6. torch.optim: The optim module from the torch library, used to call the Adam optimizer, which computes the gradients of the loss with respect to the weights and updates those weights in directions that reduce the loss.

7. torch.autograd: The autograd module from the torch library, used to call the Variable class, which associates each tensor and its gradient into the same variable.

In [1]:
# AI for Autonomous Vehicles - Build a Self-Driving Car

# Importing the libraries

import os
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable

  from .autonotebook import tqdm as notebook_tqdm


# Step 2 – Creating the architecture of the neural network

First, you build this brain inside a class, which we are going to call Network.

In [2]:
class Network(nn.Module):
    
    def __init__(self, input_size, nb_action):
        super(Network, self).__init__()
        self.input_size = input_size
        self.nb_action = nb_action
        self.fc1 = nn.Linear(input_size, 30)
        self.fc2 = nn.Linear(30, nb_action)
    
    def forward(self, state):
        x = F.relu(self.fc1(state))
        q_values = self.fc2(x)
        return q_values

# Step 3 – Implementing experience replay

Time for the next step! You'll now build another class, which builds the memory object for experience replay (as seen in Chapter 5, Your First AI Model – Beware the Bandits!). Call this class ReplayMemory. Let's have a look at the code first and then I'll explain everything line by line from the deep_q_learning.py file.

In [3]:
# Implementing Experience Replay

class ReplayMemory(object):
    
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
    
    def push(self, event):
        self.memory.append(event)
        if len(self.memory) > self.capacity:
            del self.memory[0]
    
    def sample(self, batch_size):
        samples = zip(*random.sample(self.memory, batch_size))
        return map(lambda x: Variable(torch.cat(x, 0)), samples)

# Step 4 – Implementing deep Q-learning

You've made it! You're finally about to start coding the whole deep Q-learning process. Again, you'll wrap all of it into a class, this time called Dqn, as in deep Q-network. This is your final run before the finish line. Let's smash this.

This time, the class is quite long so I'll show and explain the lines of code method by method from the deep_q_learning.py file. Here's the first one, the __init__() method:

In [4]:
# Implementing Deep Q-Learning

class Dqn(object):
    
    def __init__(self, input_size, nb_action, gamma):
        self.gamma = gamma
        self.model = Network(input_size, nb_action)
        self.memory = ReplayMemory(capacity = 100000)
        self.optimizer = optim.Adam(params = self.model.parameters())
        self.last_state = torch.Tensor(input_size).unsqueeze(0)
        self.last_action = 0
        self.last_reward = 0

Now, you're all good for the __init__ method. Let's move on to the next code section with the next method: the select_action method, which selects the action to play at each iteration using Softmax.

In [6]:
    def select_action(self, state):
        probs = F.softmax(self.model(Variable(state))*100)
        action = probs.multinomial(len(probs))
        return action.data[0,0]

Let's move on to the next code section, the learn method. This one is pretty interesting because it's where the heart of deep Q-learning beats. It's in this method that we compute the temporal difference, and accordingly the loss, and update the weights with our optimizer in order to reduce that loss. That's why this method is called learn, because it is here that the AI learns to perform better and better actions that increase the accumulated reward. Let's continue:

In [7]:
    def learn(self, batch_states, batch_actions, batch_rewards, batch_next_states):
        batch_outputs = self.model(batch_states).gather(1, batch_actions.unsqueeze(1)).squeeze(1)
        batch_next_outputs = self.model(batch_next_states).detach().max(1)[0]
        batch_targets = batch_rewards + self.gamma * batch_next_outputs
        td_loss = F.smooth_l1_loss(batch_outputs, batch_targets)
        self.optimizer.zero_grad()
        td_loss.backward()
        self.optimizer.step()

Congratulations! You've built yourself a tool in the Dqn class that will train your car to drive better. You've done the toughest part. Now all you have left to do is to wrap things up into a last method, called update, which will simply update the weights after reaching a new state.

Now, in case you are thinking, "but isn't what I've already done with the learn method?," well, you're right; but you need to make an extra function that will update the weights at the right time. The right time to update the weights is right after our AI reaches a new state. Simply put, this final update method you're about to implement will connect the dots between the learn method and the dynamic environment.

That's the finish line! Are you ready? Here's the code:

In [8]:
    def update(self, new_state, new_reward):
        new_state = torch.Tensor(new_state).float().unsqueeze(0)
        self.memory.push((self.last_state, torch.LongTensor([int(self.last_action)]), torch.Tensor([self.last_reward]), new_state))
        new_action = self.select_action(new_state)
        if len(self.memory.memory) > 100:
            batch_states, batch_actions, batch_rewards, batch_next_states = self.memory.sample(100)
            self.learn(batch_states, batch_actions, batch_rewards, batch_next_states)
        self.last_state = new_state
        self.last_action = new_action
        self.last_reward = new_reward
        return new_action

These last two methods aren't AI-related, so we won't spend time explaining each line of their code. Still, it's good for you to be able to recognize these two tools in case you want to use them for another AI model that you build with PyTorch.

Here's how they're implemented:

In [9]:
    def load(self):
        if os.path.isfile('last_brain.pth'):
            print("=> loading checkpoint... ")
            checkpoint = torch.load('last_brain.pth')
            self.model.load_state_dict(checkpoint['state_dict'])
            self.optimizer.load_state_dict(checkpoint['optimizer'])
            print("done !")
        else:
            print("no checkpoint found...")

# Full code

In [10]:
# AI for Autonomous Vehicles - Build a Self-Driving Car

# Importing the libraries

import os
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable

# Creating the architecture of the Neural Network

class Network(nn.Module):
    
    def __init__(self, input_size, nb_action):
        super(Network, self).__init__()
        self.input_size = input_size
        self.nb_action = nb_action
        self.fc1 = nn.Linear(input_size, 30)
        self.fc2 = nn.Linear(30, nb_action)
    
    def forward(self, state):
        x = F.relu(self.fc1(state))
        q_values = self.fc2(x)
        return q_values

# Implementing Experience Replay

class ReplayMemory(object):
    
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
    
    def push(self, event):
        self.memory.append(event)
        if len(self.memory) > self.capacity:
            del self.memory[0]
    
    def sample(self, batch_size):
        samples = zip(*random.sample(self.memory, batch_size))
        return map(lambda x: Variable(torch.cat(x, 0)), samples)

# Implementing Deep Q-Learning

class Dqn(object):
    
    def __init__(self, input_size, nb_action, gamma):
        self.gamma = gamma
        self.model = Network(input_size, nb_action)
        self.memory = ReplayMemory(capacity = 100000)
        self.optimizer = optim.Adam(params = self.model.parameters())
        self.last_state = torch.Tensor(input_size).unsqueeze(0)
        self.last_action = 0
        self.last_reward = 0
    
    def select_action(self, state):
        probs = F.softmax(self.model(Variable(state))*100)
        action = probs.multinomial(len(probs))
        return action.data[0,0]
    
    def learn(self, batch_states, batch_actions, batch_rewards, batch_next_states):
        batch_outputs = self.model(batch_states).gather(1, batch_actions.unsqueeze(1)).squeeze(1)
        batch_next_outputs = self.model(batch_next_states).detach().max(1)[0]
        batch_targets = batch_rewards + self.gamma * batch_next_outputs
        td_loss = F.smooth_l1_loss(batch_outputs, batch_targets)
        self.optimizer.zero_grad()
        td_loss.backward()
        self.optimizer.step()
    
    def update(self, new_state, new_reward):
        new_state = torch.Tensor(new_state).float().unsqueeze(0)
        self.memory.push((self.last_state, torch.LongTensor([int(self.last_action)]), torch.Tensor([self.last_reward]), new_state))
        new_action = self.select_action(new_state)
        if len(self.memory.memory) > 100:
            batch_states, batch_actions, batch_rewards, batch_next_states = self.memory.sample(100)
            self.learn(batch_states, batch_actions, batch_rewards, batch_next_states)
        self.last_state = new_state
        self.last_action = new_action
        self.last_reward = new_reward
        return new_action
    
    def save(self):
        torch.save({'state_dict': self.model.state_dict(),
                    'optimizer' : self.optimizer.state_dict(),
                   }, 'last_brain.pth')
    
    def load(self):
        if os.path.isfile('last_brain.pth'):
            print("=> loading checkpoint... ")
            checkpoint = torch.load('last_brain.pth')
            self.model.load_state_dict(checkpoint['state_dict'])
            self.optimizer.load_state_dict(checkpoint['optimizer'])
            print("done !")
        else:
            print("no checkpoint found...")
