In [2]:
'''Implements Q-learning for a continuous state space (cartpole)'''


import gym
import gym_minigrid
import logging
import numpy as np
from collections import defaultdict
from lib import plotting
import matplotlib
import matplotlib.pyplot as plt
from tqdm import tqdm
import torch
import torch.nn as nn


In [3]:
# Gym environment id, kept in a constant so the task can be swapped in one place.
ENV_NAME = 'CartPole-v0'
env = gym.make(ENV_NAME)
# Disabled: wrap env in gym.wrappers.Monitor, writing to 'tmp/training-dir/'.
#env = gym.wrappers.Monitor(env, 'tmp/training-dir/')
# getLogger() with no name returns the root logger; let INFO and above through.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# IPython magic: draw matplotlib figures inline in the notebook output.
%matplotlib inline


In [None]:
class FeedForwardNetwork(nn.Module):
    '''Simple feed-forward NN for approximating the Q-function.

    Architecture: Linear(input_dim -> hidden_dim) -> ReLU ->
    Linear(hidden_dim -> output_dim), i.e. a single hidden layer.

    :param input_dim: Size of the last dimension of the input tensor
    :param hidden_dim: Number of units in the hidden layer
    :param output_dim: Size of the last dimension of the output tensor
    '''
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        # The activation class is spelled `nn.ReLU`; `nn.reLU` does not exist.
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    # `forward` must be a method of the class (not a function nested inside
    # __init__), otherwise calling the module falls through to nn.Module's
    # unimplemented forward.
    def forward(self, x):
        '''Apply the network to ``x``.

        :param x: Tensor whose last dimension has size ``input_dim``
        :returns: Tensor whose last dimension has size ``output_dim``
        '''
        return self.fc2(self.relu(self.fc1(x)))


In [5]:
def select_action(model, observation, *, num_actions, epsilon):
    '''Select an action using the e-greedy policy based on the model

    With probability ``epsilon`` a uniformly random action is drawn from
    numpy's global RNG; otherwise the action with the largest model output
    is chosen.

    :param model: model to approximate Q; called with a float32 tensor built
        from ``observation`` and expected to return one value per action
    :param observation: observation from the environment to select the action for
    :param num_actions: Size of action space
    :param epsilon: Probability of taking a random action
    :returns: an action from [0, num_actions)
    :rtype: Int
    '''
    if np.random.uniform() < epsilon:
        # np.random.choice yields a numpy integer; normalise to a Python int
        # so both branches return the same type.
        return int(np.random.choice(num_actions))
    # Action selection needs no gradients, so skip building the autograd graph.
    with torch.no_grad():
        action_values = model(torch.as_tensor(observation, dtype=torch.float32))
    # argmax() returns a 0-d tensor; .item() turns it into the documented int.
    return int(action_values.argmax().item())


def _state_key(observation):
    '''Return a hashable, value-based dictionary key for ``observation``.

    numpy arrays are unhashable and torch tensors hash by object identity,
    so neither can index the Q-table by value. Both are flattened into a
    tuple of Python scalars; anything else is returned unchanged.
    '''
    if isinstance(observation, torch.Tensor):
        observation = observation.detach().cpu().numpy()
    if isinstance(observation, np.ndarray):
        return tuple(observation.ravel().tolist())
    return observation


def q_learning(env, *, num_episodes, alpha, gamma, epsilon, max_entropy):
    """Find the optimal policy using off-policy Q-learning

    The Q-function is a table keyed by state. Array or tensor observations
    are converted to tuples of their values (see ``_state_key``) so that
    equal observations share one table entry.

    NOTE(review): ``make_policy`` and ``entropy`` are not defined in the
    visible part of this notebook; they are assumed to be provided elsewhere.
    ``make_policy(q, nA, epsilon)`` is assumed to return a callable mapping a
    state key to a probability vector over actions -- TODO confirm.

    :param env: OpenAI environment
    :param num_episodes: Number of episodes to run
    :param alpha: Learning rate
    :param gamma: Discount factor
    :param epsilon: Probability of taking a random action
    :param max_entropy: If true, add the entropy of the current action
        distribution to the TD error of every update
    :returns: Optimal Q-function and statistics
    :rtype: dictionary of state -> action -> action-value, plotting.EpisodeStats

    """
    statistics = plotting.EpisodeStats(
        episode_lengths=np.zeros(num_episodes),
        episode_rewards=np.zeros(num_episodes))
    nA = env.action_space.n
    q = defaultdict(lambda: np.zeros(nA))
    for episode_idx in range(num_episodes):
        if (episode_idx + 1) % 10 == 0:
            print("\nEpisode {}/{}"
                  .format(episode_idx + 1, num_episodes))
        state = _state_key(env.reset())
        terminal = False
        t = 1
        while not terminal:
            # Rebuilt every step, as in the original, so the policy always
            # reflects the current table.
            policy = make_policy(q, nA, epsilon)
            action_distribution = policy(state)
            action = np.random.choice(np.arange(len(action_distribution)),
                                      p=action_distribution)
            next_observation, reward, done, _ = env.step(action)
            next_state = _state_key(next_observation)
            statistics.episode_rewards[episode_idx] += reward
            statistics.episode_lengths[episode_idx] = t
            # Off-policy target: bootstrap from the greedy value of the next state.
            best_next_q = max(q[next_state])
            td_error = reward + gamma * best_next_q - q[state][action]
            if max_entropy:
                # The entropy bonus is only needed (and computed) in this mode.
                td_error += entropy(action_distribution)
            q[state][action] += alpha * td_error
            if done:
                terminal = True
            else:
                state = next_state
                t += 1
    return q, statistics


In [None]:
num_runs = 20
num_episodes = 100
# Keep the statistics of every run: rebinding `stats` inside the loop would
# otherwise retain only the final run and silently drop the other results.
all_stats = []
for run_idx in tqdm(range(num_runs)):
    # After the loop, `q` and `stats` still refer to the last run.
    q, stats = q_learning(env, num_episodes=num_episodes, alpha=.1, gamma=.99, epsilon=.1, max_entropy=False)
    all_stats.append(stats)