In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
from matplotlib import image
import numpy as np
from collections import deque
import math
import random

In [2]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor, Lambda

In [3]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

Using cuda device


# Implementing the NFQ algorithm
I've chosen the mountain car environment to try out the NFQ-algorithm
Some learning highlights:
* Hint-to-goal heuristic seems crucial for the success
* Make sure to have cost & min Q / reward max Q in all places.
* The number of episodes + increment fed to the training process in each batch seems to hav a large impact
* Continuing/stopping the training at the right time point is crucial. I've achieved good results with this implementation, but then ruined a good trained NFQ agent

In [5]:
import gym
# Set up environment and do a test run
env = gym.make('CartPole-v0')

In [28]:
env.action_space


Discrete(2)

In [7]:
if False:
    env.reset()
    n_transitions = 200

    for i in range(n_transitions):
        state, reward, done, _ = env.step(env.action_space.sample())
        env.render()
        if done:
            env.reset()

In [12]:
class NFQLearningAgent():
    def __init__(self,env,discount_factor, nfq_net):
        self.env = env
        self.gamma = discount_factor
        self.nfq_net = nfq_net
        self.x_success_range = 2.4
        self.theta_success_range = 12 * 2 * math.pi / 360
   
    def run_one_episode(self):
        episode = []
        done = False
        total_cost = 0
        state = self.env.reset()
        
        for _ in range(200):
            # get best action from the NFQ net
            q_s = self.nfq_net.get_qs(state)
            action = np.argmin(q_s)
            new_state, reward, done, info = self.env.step(action) 
            # Convert rewards to cost in the interval (0.0, 1,0)
            cost = 0.01
            if done and not ('TimeLimit.truncated' in info and info['TimeLimit.truncated']):
                cost = 0.0
            
            episode.append((state, action, cost, new_state, done, info))
            # Update state
            state = new_state

            total_cost += cost
            if done:
                break
        return episode, total_cost
    
    def generate_experiences(self, n_episodes):
        experiences=[]
        for _ in range(n_episodes):
            episode, _ = self.run_one_episode()
            experiences.extend(episode)
        return experiences
    
    def get_goal_experience(self, size):
        # Hint-to-goal heuristic with values extracted from env
        goal_experiences = []
        
        for i in range(size):
            goal_experiences.append(np.array([np.random.uniform(-0.05, 0.05), 
                                              np.random.normal(), 
                                              np.random.uniform(-self.theta_success_range, self.theta_success_range), 
                                              np.random.normal(), 
                                              np.random.randint(2)]))
        goal_targets = np.zeros(size, dtype = np.float32)
        return np.array(goal_experiences), goal_targets
                            

In [29]:
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        # Copying the neural network from Riedmiller 2005
        self.MLP = nn.Sequential(
            nn.Linear(5, 5),
            nn.Sigmoid(),
            nn.Linear(5, 5),
            nn.Sigmoid(),
            nn.Linear(5, 5),
            nn.Sigmoid(),
            nn.Linear(5, 1),
        )
          # Initialize weights to [-0.5, 0.5]
        def init_weights(m):
            if type(m) == nn.Linear:
                torch.nn.init.uniform_(m.weight, -0.5, 0.5)
              
        self.MLP.apply(init_weights)

    def forward(self, x):
        
        logits = self.MLP(x)
        return logits
    
    def get_qs(self, state):
        qs = np.array([self.MLP(torch.cat([torch.FloatTensor(state), torch.FloatTensor([i])], dim=0)).detach().numpy() for i in range(2)]).flatten()
        return qs

In [30]:
def generate_pattern_set(experiences, model, discount_factor):
    state_b, action_b, cost_b, next_state_b, done_b,_ = zip(*experiences)
    state_b = torch.FloatTensor(np.array(state_b))
    action_b = torch.FloatTensor(np.array(action_b))
    cost_b = torch.FloatTensor(np.array(cost_b))
    next_state_b = torch.FloatTensor(np.array(next_state_b))
    done_b = torch.FloatTensor(np.array(done_b))

    # Create the input_l = sl, ul
    state_action_b = torch.cat([state_b, action_b.unsqueeze(1)], 1)

    # Compute minb Qk(sl, b)
    q_next_state_0_b = model(torch.cat([next_state_b, torch.zeros(len(experiences), 1)], 1)).squeeze()
    q_next_state_1_b = model(torch.cat([next_state_b, torch.ones(len(experiences), 1)], 1)).squeeze()
    q_next_state_2_b = model(torch.cat([next_state_b, torch.full((len(experiences), 1), fill_value=2)], 1)).squeeze()
    q_next_state_b,_ = torch.min(torch.stack([q_next_state_0_b, q_next_state_1_b, q_next_state_2_b]),dim=0)
    
    # Create the target_l = c(sl, ul, sl) +γ minb Qk(sl, b)
    with torch.no_grad():
        target_q_values = cost_b + discount_factor * q_next_state_b * (1 - done_b)

    return state_action_b, target_q_values

def train_loop(agent, loss_fn, optimizer, discount_factor):
    losses=[]
    
    N_batches = 500
    state_actions = torch.empty(0,5)
    target_q_values = torch.empty(0)
    
    #Incrementally add one episode + 100 hint-to-goal heuristics for each batch
    for i_batch in range(N_batches):
        # Generating experiences
        experiences = agent.generate_experiences(1)
        # Extract and format state_actions and targets       
        state_actions_1, target_q_values_1 = generate_pattern_set(experiences, agent.nfq_net, discount_factor)
        state_actions = torch.cat([state_actions, state_actions_1], dim=0)
        target_q_values = torch.cat([target_q_values, target_q_values_1], dim=0)

        # Add hint-to-goal heuristics with a factor of 100
        goal_state_action_b, goal_target_q_values = agent.get_goal_experience(100)

        goal_state_action_b = torch.FloatTensor(goal_state_action_b)
        goal_target_q_values = torch.FloatTensor(goal_target_q_values)
        
        state_actions = torch.cat([state_actions, goal_state_action_b], dim=0)
        target_q_values = torch.cat([target_q_values, goal_target_q_values], dim=0)
            
        # Compute prediction and loss
        pred = model(state_actions).squeeze()
        
        loss = loss_fn(pred, target_q_values)
        losses.append(loss.item())

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    return losses


def test_loop(agent, n_trials):
    success = 0.
    for i in range (n_trials):
        episode, total_cost = agent.run_one_episode()
        done = episode[-1][-2]
        info = episode[-1][-1]
        if done and not ('TimeLimit.truncated' in info and info['TimeLimit.truncated']):
            success+=1.
        
    success /= n_trials
    print(f"Test run, success: {(100*success):>0.1f}%")
    return success*100

In [31]:
model = NeuralNetwork()
gamma = 0.99
loss_fn=nn.MSELoss()
optimizer = torch.optim.Rprop(model.parameters())

NFQagent = NFQLearningAgent(env,gamma, model)

In [32]:
losses=[]
epochs = 20
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    losses.extend(train_loop(NFQagent, loss_fn, optimizer, gamma))
    success_rate = test_loop(NFQagent, 100)
    if (success_rate >= 99.0):
        break
print("Done!")

Epoch 1
-------------------------------
Test run, success: 100.0%
Done!


In [33]:
# Do a test run using the trained NFQ agent
test_loop(NFQagent, 500)

Test run, success: 100.0%


100.0

In [34]:
# For visualization
state = env.reset()
n_transitions = 1000

for i in range(n_transitions):
    q_s = np.array([NFQagent.nfq_net(torch.cat([torch.FloatTensor(state), torch.FloatTensor([i])], dim=0)).detach().numpy() for i in range(3)]).flatten()
    action = np.argmin(q_s)
    #print(action, q_s)
    state, reward, done, info = env.step(action)
    
    env.render()
    if done:
        if 'TimeLimit.truncated' in info:
            print(info)
        else:
            print('Terminated: Success')
        env.reset()

NoSuchDisplayException: ignored