In [1]:
import copy
import random
from torch import optim
import torch.nn as nn
import torch.nn.functional as F
import torch
import matplotlib.pyplot as plt
from tqdm import tqdm_notebook as tqdm
import numpy as np

import gym

### Problem

Let's get some information about the environment.

In [2]:
# Single shared environment instance; the cells and functions below read this global directly.
env = gym.make("MountainCar-v0")

In [3]:
# Show the action space, the observation space and the observation bounds, one per line.
for space_info in (
    env.action_space,
    env.observation_space,
    env.observation_space.high,
    env.observation_space.low,
):
    print(space_info)

Discrete(3)
Box(2,)
[0.6  0.07]
[-1.2  -0.07]


In [4]:
# Baseline: play 100 episodes with uniformly random actions and record each episode's return.
total_rewards = []

for episode in range(100):
    env.reset()
    episode_return = 0
    done = False

    # Step until the environment reports the episode is over.
    while not done:
        random_action = env.action_space.sample()
        _observation, reward, done, _info = env.step(random_action)
        episode_return += reward

    total_rewards.append(episode_return)

env.close()

In [5]:
# Distinct episode returns observed under the random policy.
np.unique(total_rewards)

array([-200.])

In [6]:
# Count the episodes whose return is above -200 and report that count.
total_rewards = np.array(total_rewards)
solved_episodes = total_rewards[total_rewards > -200]
print(f"Task was solved {len(solved_episodes)} times with random action algorithm")

Task was solved 0 times with random action algorithm


It's clear that the task can't be solved with the simple policy of taking random actions. Let's try to learn an optimal policy with the deep Q-learning algorithm. More precisely, we will learn Q-values and then choose the next action based on them.

### Model

Let's first define the deep learning model.

In [26]:
# Network input size: length of the environment's observation vector.
observation_space_dim = env.observation_space.shape[0]
# Network output size: number of discrete actions.
action_space_dim = env.action_space.n
# Width of each hidden layer (default for create_model).
hidden_dim = 50

In [27]:
def create_model(hidden_dim=hidden_dim):
    """Build the Q-network: a fully connected net with two hidden ReLU layers.

    Args:
        hidden_dim: number of units in each hidden layer.

    Returns:
        nn.Sequential mapping an observation of size ``observation_space_dim``
        to ``action_space_dim`` outputs (one per action).
    """
    widths = [observation_space_dim, hidden_dim, hidden_dim, action_space_dim]

    layers = []
    for fan_in, fan_out in zip(widths[:-1], widths[1:]):
        layers.append(nn.Linear(fan_in, fan_out))
        layers.append(nn.ReLU())
    layers.pop()  # the output layer stays linear: no activation after it

    return nn.Sequential(*layers)

In [28]:
# Instantiate the network with the default hidden size and show its layers.
model = create_model()
print(model)

Sequential(
  (0): Linear(in_features=2, out_features=50, bias=True)
  (1): ReLU()
  (2): Linear(in_features=50, out_features=50, bias=True)
  (3): ReLU()
  (4): Linear(in_features=50, out_features=3, bias=True)
)


In supervised deep learning, the target labels do not change during training, which makes training stable. In Q-learning, the estimate of a Q-value is based on other Q-values, so we are dealing with a continuously changing target. To make the RL model more stable we will use two models: one is kept fixed while the other is being trained, and once in a while the fixed model is updated as well.

In [25]:
# Independent copy of the network (deepcopy), so later changes to `model` do not affect it.
fixed_model = copy.deepcopy(model)

### Experience Replay

We will also use the Experience Replay technique. A good motivation and explanation of it is provided in [Dealing with Sparse Rewards in Reinforcement Learning](https://arxiv.org/abs/1910.09281):

If using a gradient based iterative optimisation process as done via back-propagation in
neural networks, it is important that the data is independent and identically distributed (i.i.d.).
This is done as to avoid sampling bias from correlated inputs, which can cause the gradient to get
stuck in a non-optimal local maxima (as we are performing gradient ascent to maximise the expected
future reward). Experience replay is a method to help decorrelate the sequential experiences gained
from dynamic programming and model free reinforcement learning methods. This is done by storing
experiences, a tuple of (st, at, rt, st+1) into a list of experiences known as the replay memory. Batch
samples can be drawn randomly from the replay memory which provide ∼ i.i.d. for a large replay
length.

In [10]:
class Experience:
    """Fixed-capacity replay memory for the Experience Replay technique.

    Transitions are kept in insertion order in ``self.experience``; once the
    memory holds more than ``capacity`` items the oldest ones are dropped.
    """
    def __init__(self, capacity, weighted=False):
        """
        Args:
            capacity: maximum number of transitions kept in memory.
            weighted: request weighted sampling (not implemented yet).
        """
        self.capacity = capacity
        self.experience = []
        self.weighted = weighted

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.experience)

    def add(self, el):
        """Store one transition, evicting the oldest ones beyond ``capacity``."""
        self.experience.append(el)
        overflow = len(self.experience) - self.capacity
        if overflow > 0:
            del self.experience[:overflow]

    def sample(self, batch_size):
        """Draw ``batch_size`` transitions uniformly at random, with replacement.

        Returns:
            list of stored transitions (the same objects that were added).

        Raises:
            NotImplementedError: if the memory was created with ``weighted=True``.
            ValueError: if the memory is empty.
        """
        if self.weighted:
            raise NotImplementedError("weighted sampling is not implemented")
        if not self.experience:
            raise ValueError("cannot sample from an empty replay memory")

        # Sample indices rather than the transitions themselves:
        # np.random.choice only accepts 1-D input, and a transition is a
        # heterogeneous record (arrays, ints, floats, bools).
        picks = np.random.randint(len(self.experience), size=batch_size)
        return [self.experience[i] for i in picks]

There is one possible improvement to the base Experience Replay method regarding the way of sampling: we can use weighted sampling to pay more attention to states with greater reward.

### Fitting network

To fit the network we have to calculate the Q-values predicted by the model and the target Q-values produced by the fixed model.

In [13]:
def fit(model, fixed_model, data, optimizer, criterion, gamma=0.99):
    """Run one Q-learning gradient step on a batch of transitions.

    The prediction is the trained model's Q-value for the action that was
    actually taken; the target is ``reward + gamma * max_a Q_fixed(next_state, a)``,
    with the bootstrap term dropped for terminal transitions.

    Args:
        model: network being trained; maps a batch of states to one value per action.
        fixed_model: network used only to compute targets (no gradients flow into it).
        data: sequence of transitions ``(state, action, reward, next_state, done)``.
        optimizer: torch optimizer; must hold ``model``'s parameters for the step
            to have any effect.
        criterion: callable ``(prediction, target) -> scalar loss tensor``.
        gamma: discount factor applied to the bootstrapped next-state value.

    Returns:
        The batch loss as a float, or None when ``data`` is empty.
    """
    if len(data) == 0:
        return None

    states, actions, rewards, next_states, dones = zip(*data)
    states = torch.as_tensor(np.asarray(states), dtype=torch.float32)
    next_states = torch.as_tensor(np.asarray(next_states), dtype=torch.float32)
    actions = torch.as_tensor(np.asarray(actions), dtype=torch.int64).unsqueeze(1)
    rewards = torch.as_tensor(np.asarray(rewards), dtype=torch.float32)
    dones = torch.as_tensor(np.asarray(dones), dtype=torch.float32)

    # Targets come from the fixed network and are treated as constants.
    with torch.no_grad():
        next_values = fixed_model(next_states).max(dim=1)[0]
        targets = rewards + gamma * next_values * (1.0 - dones)

    # Q-value of the action that was taken in each transition.
    predicted = model(states).gather(1, actions).squeeze(1)

    loss = criterion(predicted, targets)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()

### Reward function

The major difficulty of the Mountain Car task is its extremely sparse reward: the reward is -1 after every step until the goal is reached. To make the reward function denser we will add some heuristics to it.

Let's first define the base reward function:

In [16]:
def calc_base_reward(state, reward, next_state):
    """Baseline reward function: hand back the environment reward untouched.

    ``state`` and ``next_state`` are accepted only so that every reward
    function shares the same signature; they are not used here.
    """
    base_reward = reward
    return base_reward

### Training

For training we will use an eps-greedy policy with eps decreasing over time.

In [33]:
def greedy_action(model, state, eps=0):
    """Pick an action with an eps-greedy policy.

    Args:
        model: network mapping a state to one value per action.
        state: current observation (anything ``torch.as_tensor`` accepts).
        eps: probability of taking a uniformly random action instead of the
            greedy one. With the default 0 the choice is always greedy.

    Returns:
        The chosen action index as a Python int.
    """
    # Strict '<': np.random.rand() draws from [0, 1), so eps=0 never explores
    # and eps=1 always does.
    if np.random.rand() < eps:
        return int(np.random.choice(action_space_dim))

    # Action selection needs no gradients.
    with torch.no_grad():
        q_values = model(torch.as_tensor(state, dtype=torch.float32))
        return torch.argmax(q_values).item()

In [None]:
def validate_model(model, draw=False):
    """Play one episode with the purely greedy policy and return its total reward.

    Uses the global ``env``. The environment is reset before the episode and
    again afterwards, so any caller that was stepping the same ``env`` must
    fetch a fresh state after this call.

    Args:
        model: network used to choose actions greedily.
        draw: render every step of the episode when True.

    Returns:
        Sum of the environment rewards collected during the episode.
    """
    state = env.reset()
    total_reward = 0
    done = False  # must exist before the loop condition is evaluated
    while not done:
        if draw:
            env.render()
        action = greedy_action(model, state)
        state, reward, done, _ = env.step(action)
        total_reward += reward

    env.reset()
    return total_reward

In [15]:
def train(optimizer, criterion, reward_function,
          time_max=100000, batchsize=64, fixed_model_update=100,
          eps_min=0.1, eps_max=0.9, experience_capacity=10000):
    """Train a fresh Q-network on the global ``env`` with eps-greedy exploration.

    Args:
        optimizer: torch optimizer used as a template. The network is created
            inside this function, so an optimizer of the same class and with
            the same default hyperparameters is built over the new network's
            parameters.
        criterion: callable ``(prediction, target) -> scalar loss tensor``.
        reward_function: callable ``(state, reward, next_state) -> reward``
            applied to every transition before it is stored.
        time_max: total number of environment steps.
        batchsize: number of transitions sampled per gradient step.
        fixed_model_update: period, in steps, of refreshing the fixed (target)
            network and running a validation episode.
        eps_min: exploration rate approached at the end of training.
        eps_max: exploration rate at the first step.
        experience_capacity: maximum number of transitions kept in replay memory.

    Returns:
        ``(model, total_rewards)``: the trained network and the list of
        validation episode rewards, one per fixed-model refresh.
    """
    total_rewards = []

    model = create_model()
    fixed_model = copy.deepcopy(model)

    # The optimizer passed in cannot hold the parameters of a network that was
    # only just created, so rebuild it over this network's parameters.
    optimizer = type(optimizer)(model.parameters(), **optimizer.defaults)

    experience = Experience(experience_capacity)

    state = env.reset()

    for step in range(time_max):
        # Linear decay of the exploration rate from eps_max towards eps_min.
        eps = eps_max - (eps_max - eps_min) * step / time_max
        action = greedy_action(model, state, eps)
        next_state, reward, done, _ = env.step(action)

        experience.add([
            state,
            action,
            reward_function(state, reward, next_state),
            next_state,
            done
        ])

        if done:  # Restart simulation
            state = env.reset()
        else:
            state = next_state

        if step > batchsize:
            # Train model
            fit(model, fixed_model, experience.sample(batchsize), optimizer, criterion)

        if step % fixed_model_update == 0:
            fixed_model = copy.deepcopy(model)
            total_rewards.append(validate_model(fixed_model))
            # Validation plays its episode on the same env, so the state held
            # here is stale; start again from a fresh episode.
            state = env.reset()
    return model, total_rewards

In [20]:
# Adam over the parameters of the global `model`.
# NOTE(review): train() constructs its own network via create_model(); that network's
# parameters are distinct from those of the global `model` this optimizer is built over.
optimizer = optim.Adam(model.parameters(), lr=0.00003)
# Loss function handed to train().
criterion = F.mse_loss
# Number of environment steps handed to train().
time_max = 500000

In [None]:
# Run training with the unshaped environment reward. This rebinds the global `model`
# to the network returned by train() and `total_rewards` to the validation rewards.
model, total_rewards = train(optimizer,
                             criterion,
                             calc_base_reward,
                             time_max=time_max)

In [None]:
# Validation reward collected by train(), one point per validation episode.
fig, ax = plt.subplots()
ax.plot(range(len(total_rewards)), total_rewards)
ax.set_xlabel("Validation episode")
ax.set_ylabel("Total reward")
ax.set_title("Greedy-policy validation reward during training")
ax.grid()
plt.show()

### Test

In [None]:
# Play one rendered greedy episode with the trained model and report its total reward.
total_reward = validate_model(model, draw=True)
env.close()  # release the render window once the episode is over
print(f"Reward: {total_reward}")