In [None]:
import gym
import math
import torch
import numpy as np
import collections
import torch.nn as nn
from random import random
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Categorical

from matplotlib import pyplot as plt
from IPython.display import clear_output
%matplotlib inline

In [None]:
from functools import reduce
from operator import add

class PolicyShallowNN(nn.Module):
    """Policy network made of a single linear layer followed by a softmax.

    Args:
        input_length: number of input features of the linear layer.
        output_length: number of outputs of the linear layer.
    """

    def __init__(self, input_length, output_length):
        super().__init__()
        self.fc = nn.Linear(input_length, output_length)

    def forward(self, x):
        """Return the linear layer's output normalised with softmax along dim 1."""
        logits = self.fc(x)
        return torch.softmax(logits, dim=1)

class PolicyDeepNN(nn.Module):
    """Policy network with two hidden layers (64 and 128 units) and a softmax output.

    Args:
        input_length: number of input features of the first layer.
        output_length: number of outputs of the last layer.
    """

    def __init__(self, input_length, output_length):
        super().__init__()
        self.fc1 = nn.Linear(input_length, 64)
        self.fc2 = nn.Linear(64, 128)
        self.fc3 = nn.Linear(128, output_length)

    def forward(self, x):
        """Apply fc1 -> ReLU -> fc2 -> ReLU -> fc3, then softmax along dim 1."""
        hidden = F.relu(self.fc1(x))
        hidden = F.relu(self.fc2(hidden))
        logits = self.fc3(hidden)
        return torch.softmax(logits, dim=1)

def mountaincar_reward_func(_, states):
    """Score an episode by the largest first component among its states.

    The first argument is ignored. ``states`` must be non-empty, otherwise
    ``max`` raises ``ValueError``.
    NOTE(review): the first state component is presumably the car position --
    confirm against the environment.
    """
    first_components = (state[0] for state in states)
    return max(first_components)
    
# Attribute names of an environment description, in positional order.
fields = [
    'name',
    'model_class',
    # Optional per-environment settings; only supplied where an env needs them.
    'reward_func',
    'output_quantization_levels',
    'output_range',
    'reward_threshold',
    'learning_rate',
]
# Every field defaults to None, so any subset may be given by keyword.
EnvDescription = collections.namedtuple(
    'EnvDescription',
    fields,
    defaults=(None,) * len(fields),
)

# Environments to train on, in training order. Fields left out stay None.
env_descriptions = [
    EnvDescription(name='CartPole-v1', model_class=PolicyShallowNN),
    EnvDescription(name='Acrobot-v1', model_class=PolicyShallowNN),
    # The chosen output index is mapped onto 41 evenly spaced values spanning
    # [-2, 2] before being handed to env.step (see play_one_game).
    EnvDescription(
        name='Pendulum-v0',
        model_class=PolicyDeepNN,
        output_quantization_levels=41,
        output_range=[-2, 2],
        reward_threshold=-150,
    ),
    # Episodes are scored by reward_func instead of the summed step rewards.
    EnvDescription(
        name='MountainCar-v0',
        model_class=PolicyDeepNN,
        reward_func=mountaincar_reward_func,
        reward_threshold=0.51,
        learning_rate=0.001,
    ),
]

def reward_threshold(env, env_desc):
    """Return the reward threshold to use for ``env``.

    A non-None ``env_desc.reward_threshold`` takes precedence (a value of 0
    counts as set); otherwise ``env.spec.reward_threshold`` is returned.
    """
    override = env_desc.reward_threshold
    if override is None:
        return env.spec.reward_threshold
    return override

def action_vector_from_policy(model, state, eps_non_greedy):
    """Choose an action and return it with the model's first output row.

    Args:
        model: callable applied to ``state`` converted to a float tensor with
            a leading dimension of size 1 added.
        state: NumPy array holding the current observation.
        eps_non_greedy: a uniformly random action index is used when
            ``random() < eps_non_greedy``; otherwise the action is sampled
            from a Categorical distribution over the model output.

    Returns:
        ``(action, output_row)``: the action as a Python int and ``model(...)[0]``.
    """
    batch = torch.from_numpy(state).float().unsqueeze(0)
    output = model(batch)
    distribution = Categorical(output)
    output_row = output[0]
    explore = random() < eps_non_greedy
    if explore:
        # Off policy: uniform choice among the available output indices.
        return int(len(output_row) * random()), output_row
    # On policy: sample according to the model's probabilities.
    return distribution.sample().item(), output_row

def select_action_from_policy(model, state, eps_non_greedy):
    """Choose an action and return it with its log-probability under the policy.

    Args:
        model: callable applied to ``state`` converted to a float tensor with
            a leading dimension of size 1 added; its output is used as the
            ``probs`` of a Categorical distribution.
        state: NumPy array holding the current observation.
        eps_non_greedy: a uniformly random action index is used when
            ``random() < eps_non_greedy``; otherwise the action is sampled
            from the distribution.

    Returns:
        ``(action, log_prob)``: the action as a Python int and a scalar tensor
        with the distribution's log-probability of that action.
    """
    batch = torch.from_numpy(state).float().unsqueeze(0)
    probs = model(batch)
    distribution = Categorical(probs)
    explore = random() < eps_non_greedy
    if not explore:
        # On policy: draw from the distribution itself.
        sampled = distribution.sample()
        return sampled.item(), distribution.log_prob(sampled)[0]
    # Off policy: uniform choice among the available output indices.
    uniform_choice = int(len(probs[0]) * random())
    return uniform_choice, distribution.logits[0][uniform_choice]

def select_action_from_policy_best(model, state, eps_non_greedy):
    """Choose the most probable action and return it with its log-probability.

    Args:
        model: callable applied to ``state`` converted to a float tensor with
            a leading dimension of size 1 added; its output is used as the
            ``probs`` of a Categorical distribution.
        state: NumPy array holding the current observation.
        eps_non_greedy: a uniformly random action index is used when
            ``random() < eps_non_greedy``; otherwise the argmax of the first
            output row is taken.

    Returns:
        ``(action, log_prob)``: the action as a Python int and a scalar tensor
        with the distribution's log-probability of that action.
    """
    batch = torch.from_numpy(state).float().unsqueeze(0)
    probs = model(batch)
    distribution = Categorical(probs)
    explore = random() < eps_non_greedy
    if explore:
        # Off policy: uniform choice among the available output indices.
        action = int(len(probs[0]) * random())
    else:
        action = torch.argmax(probs[0]).item()
    return action, distribution.logits[0][action]
    
def play_one_game(env_desc, env, model, eps_non_greedy, policy_func=select_action_from_policy):
    """Play a single episode and collect what the policy did.

    Args:
        env_desc: environment description; ``output_quantization_levels``,
            ``output_range`` and ``reward_func`` are consulted.
        env: environment exposing ``reset()``, ``step(action)`` returning a
            4-tuple, and ``_max_episode_steps``.
        model: policy model forwarded to ``policy_func``.
        eps_non_greedy: exploration threshold forwarded to ``policy_func``.
        policy_func: callable ``(model, state, eps_non_greedy)`` returning
            ``(action, prob)``.

    Returns:
        ``(episode_reward, probs, actions)``: ``probs`` and ``actions`` hold
        the per-step values returned by ``policy_func`` (actions before any
        quantization mapping). ``episode_reward`` is
        ``env_desc.reward_func(rewards, states)`` when a reward function is
        configured, otherwise the sum of the step rewards.
    """
    state = env.reset()
    rewards, probs, actions, states = [], [], [], []
    for _step in range(env._max_episode_steps):
        chosen, prob = policy_func(model, state, eps_non_greedy)
        actions.append(chosen)
        probs.append(prob)
        env_action = chosen
        if env_desc.output_quantization_levels is not None:
            # Map the discrete index onto evenly spaced values covering
            # output_range, wrapped in a one-element array for env.step.
            low = env_desc.output_range[0]
            span = float(env_desc.output_range[1] - low)
            env_action = np.array([low + chosen * span / (env_desc.output_quantization_levels - 1)])
        state, reward, done, _info = env.step(env_action)
        states.append(state)
        rewards.append(reward)
        if done:
            break
    if env_desc.reward_func is None:
        episode_reward = sum(rewards)
    else:
        episode_reward = env_desc.reward_func(rewards, states)
    return episode_reward, probs, actions
    
def play_games(env_desc, env, model, num_episodes, eps_non_greedy):
    """Play ``num_episodes`` episodes and return one tuple per episode.

    Each element is ``(episode_reward, probs, actions)`` as produced by
    ``play_one_game`` with its default policy function.
    """
    def one_episode():
        episode_reward, probs, actions = play_one_game(env_desc, env, model, eps_non_greedy)
        return (episode_reward, probs, actions)

    return [one_episode() for _ in range(num_episodes)]

def in_out_length(env):
    """Return ``(input_length, output_length)`` for a policy network on ``env``.

    ``input_length`` is the first dimension of the observation space shape.
    ``output_length`` is the first dimension of the action space shape for a
    Box action space, or the number of actions ``n`` for a Discrete one.

    Raises:
        TypeError: if the action space is neither Box nor Discrete. Without
            this check the function failed with an ``UnboundLocalError`` on
            ``output_length``, which hid the real cause.
    """
    input_length = env.observation_space.shape[0]
    action_space = env.action_space
    if isinstance(action_space, gym.spaces.box.Box):
        output_length = action_space.shape[0]
    elif isinstance(action_space, gym.spaces.discrete.Discrete):
        output_length = action_space.n
    else:
        raise TypeError(
            'Unsupported action space %r: expected Box or Discrete' % (action_space,)
        )
    return input_length, output_length

def compute_loss(probs, rewards):
    """Return the negated sum of element-wise products of ``probs`` and ``rewards``.

    The two sequences are paired with ``zip``, so extra items in the longer
    one are ignored; empty input yields 0.
    """
    weighted_total = sum(p * r for p, r in zip(probs, rewards))
    return -weighted_total

def mean(li):
    """Return the arithmetic mean of ``li``.

    Raises ``ZeroDivisionError`` when ``li`` is empty.
    """
    total = sum(li)
    count = len(li)
    return total / count

def train_solve_self_play(env_desc, model=None, num_epochs=5000, num_episodes=100, eps_non_greedy=0.0,
                          num_eval_episodes=10):
    """Train a policy on ``env_desc.name`` by ranking self-played episodes.

    Each epoch plays ``num_episodes`` games with the sampling policy and sorts
    them by episode reward. One Adam step is then taken on a loss built with
    ``compute_loss``: the per-step values of the lower-ranked half are
    weighted -1 and those of the upper-ranked half +1. Afterwards the model
    is evaluated on ``num_eval_episodes`` games played with
    ``select_action_from_policy_best`` and no exploration; training stops as
    soon as the mean evaluation reward reaches ``reward_threshold(env, env_desc)``.

    Args:
        env_desc: environment description (name, model class and optional
            overrides such as learning rate and reward threshold).
        model: model to continue training; when None a new
            ``env_desc.model_class`` instance is created.
        num_epochs: maximum number of training epochs.
        num_episodes: games played per epoch (at least 1).
        eps_non_greedy: exploration threshold used while collecting games.
        num_eval_episodes: evaluation games per epoch (at least 1).

    Returns:
        The model, whether or not the threshold was reached.
    """
    print('Doing %s' % env_desc.name)
    env = gym.make(env_desc.name)
    try:
        input_length, output_length = in_out_length(env)
        if env_desc.output_quantization_levels is not None:
            # One network output per quantization level of each action dimension.
            output_length *= env_desc.output_quantization_levels
        if model is None:
            model = env_desc.model_class(input_length, output_length)
        lr = env_desc.learning_rate if env_desc.learning_rate is not None else 0.01
        optimizer = optim.Adam(model.parameters(), lr=lr)
        half = num_episodes // 2
        for epoch in range(num_epochs):
            games = play_games(env_desc, env, model, num_episodes, eps_non_greedy)
            games.sort(key=lambda game: game[0])  # ascending by episode reward
            worse_half, better_half = games[:half], games[half:]
            sum_loss = 0
            for weight, group in ((-1, worse_half), (+1, better_half)):
                for game in group:
                    step_probs = game[1]
                    sum_loss += compute_loss(step_probs, [weight] * len(step_probs))
            optimizer.zero_grad()
            sum_loss.backward()
            optimizer.step()
            # Evaluation results are never back-propagated, so skip building
            # autograd graphs for them.
            evaluation_rewards = []
            with torch.no_grad():
                for _ in range(num_eval_episodes):
                    eval_reward, _eval_probs, _eval_actions = play_one_game(
                        env_desc, env, model, eps_non_greedy=0.0,
                        policy_func=select_action_from_policy_best)
                    evaluation_rewards.append(eval_reward)
            mean_reward = mean(evaluation_rewards)
            print('%d: min=%.2f median=%.2f max=%.2f eval=%.2f' % (
                epoch, games[0][0], games[half][0], games[-1][0], mean_reward))
            if mean_reward >= reward_threshold(env, env_desc):
                print('Solved!')
                return model
        print('Failed!')
        return model
    finally:
        # Release the environment's resources on every exit path.
        env.close()

# Train one policy per configured environment and collect the returned models
# in the same order as env_descriptions.
models = []
for env_desc in env_descriptions:
    model = train_solve_self_play(env_desc)
    models.append(model)