In [43]:
import random

import gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as f
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, RandomSampler, SubsetRandomSampler

In [182]:
# Single Gym environment instance shared by the training and evaluation cells.
env = gym.make('MountainCar-v0')

# Step size given to the SGD optimizer that LinearFunctionApproximator builds for itself.
ALPHA = 0.1
# Number of transitions drawn from replay memory for each optimisation step.
BATCH_SIZE = 16
# Modulus applied to the episode index to decide when the target network is re-synced.
TARGET_UPDATE = 10
# Threshold compared against a uniform [0, 1) draw when choosing between a greedy and a random action.
EPSILON = 0.2
# Discount factor multiplying the bootstrapped next-state value in the TD target.
GAMMA = 0.9

In [221]:
class LinearFunctionApproximator(nn.Module):
    """Bias-free linear action-value model.

    The only learnable tensor, ``params``, has shape
    ``(state_size, action_size)`` and is initialised from a standard normal
    distribution. The module also owns an MSE loss and an SGD optimizer
    (step size ``ALPHA``) which ``compute_loss`` uses.
    """

    def __init__(self, state_size, action_size):
        super().__init__()
        weights = torch.empty(state_size, action_size)
        nn.init.normal_(weights)  # in-place fill with N(0, 1) samples
        self.params = nn.Parameter(weights, requires_grad=True)
        self.loss_func = nn.MSELoss()
        self.optimizer = optim.SGD(self.parameters(), lr=ALPHA)

    def forward(self, state):
        """Return ``params.T @ state``.

        A 1-D ``state`` of length ``state_size`` yields one value per action;
        a 2-D ``(state_size, batch)`` input yields ``(action_size, batch)``.
        """
        return self.params.T @ state

    def compute_loss(self, inp, target):
        """Take one SGD step on the MSE between ``inp`` and ``target``.

        Despite the name this also updates the weights: gradients are
        cleared, the loss is back-propagated and the module's own optimizer
        is stepped. Returns the loss detached from the autograd graph.
        """
        self.optimizer.zero_grad()
        mse = self.loss_func(inp, target)
        mse.backward()
        self.optimizer.step()
        return mse.detach()

In [222]:
class ReplayMemory:
    """Bounded first-in-first-out buffer of transitions for experience replay.

    Transitions are stored as tuples in insertion order in ``self.memory``.
    Once more than ``memory_size`` items are held, the oldest ones are
    discarded so the buffer never exceeds its capacity.
    """

    def __init__(self, memory_size):
        """Create an empty buffer holding at most ``memory_size`` transitions."""
        self.memory_size = memory_size
        self.memory = []

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

    def __getitem__(self, item):
        """Return the stored transition(s) at index or slice ``item`` (0 is the oldest)."""
        return self.memory[item]

    def sample(self, batch_size):
        """Return ``batch_size`` distinct transitions chosen uniformly at random.

        Raises:
            ValueError: if ``batch_size`` exceeds the number of stored items
                (raised by ``random.sample``).
        """
        return random.sample(self.memory, batch_size)

    def push(self, *item):
        """Append the positional arguments as one transition tuple.

        The previous implementation evicted with ``list.pop()``, which removes
        the *newest* entry, and only once the buffer already exceeded its
        capacity. Eviction now trims from the front so the oldest experience
        is dropped and the length never exceeds ``memory_size``.
        """
        self.memory.append(item)
        overflow = len(self.memory) - self.memory_size
        if overflow > 0:
            # Slice deletion also copes with a zero or negative capacity.
            del self.memory[:overflow]

In [223]:
def polynomial_coding(state):
    """Map a ``(position, velocity)`` pair to the features ``[velocity, position * velocity]``.

    ``state`` must unpack into exactly two values; the result is a NumPy
    array of length two.
    """
    position, velocity = state
    features = [velocity, position * velocity]
    return np.array(features)

In [224]:
def get_greedy_action(policy, state):
    """Return, as a Python int, the index of the largest value in ``policy(state)``.

    The forward pass runs with gradient tracking disabled.
    """
    with torch.no_grad():
        action_values = policy(state)
        best = torch.argmax(action_values)
        return best.item()
    
    
def get_epsilon_greedy_action(policy, state):
    """Choose an action epsilon-greedily.

    With probability ``EPSILON`` a random action is drawn from
    ``env.action_space`` (exploration); otherwise the greedy action under
    ``policy`` for ``state`` is returned (exploitation).

    The branches were previously swapped, which made the agent act greedily
    only ``EPSILON`` of the time and randomly the rest.
    """
    # np.random.uniform() draws from [0, 1), so EPSILON == 0 never explores
    # and EPSILON == 1 always explores.
    if np.random.uniform() < EPSILON:
        return env.action_space.sample()
    return get_greedy_action(policy, state)

In [236]:
def optimize_policy(memory, policy, target_policy, criterion, optimizer):
    """Run one gradient step of Q-learning on a batch drawn from ``memory``.

    Each sampled transition is indexed as ``(action, state, reward,
    next_state)``; the per-transition tensors are concatenated along the
    first dimension. The TD target is ``reward + GAMMA * max_a Q_target(next_state, a)``
    and ``criterion`` compares it with ``policy``'s value for the action taken.

    NOTE(review): transitions carry no terminal flag, so the next-state value
    is bootstrapped for every transition, including episode-ending ones.
    """
    batch = memory.sample(BATCH_SIZE)

    action_idx = torch.cat([transition[0] for transition in batch], dim=0)
    state_batch = torch.cat([transition[1] for transition in batch])
    reward_batch = torch.cat([transition[2] for transition in batch])
    next_state_batch = torch.cat([transition[3] for transition in batch])

    # The models take (features, batch) input and return (actions, batch),
    # hence the transposes on the way in and out.
    q_all = policy(state_batch.T).T
    q_taken = q_all.gather(1, action_idx).flatten()

    # No gradient should flow through the target network.
    with torch.no_grad():
        q_next_best = target_policy(next_state_batch.T).T.max(dim=1).values

    td_target = reward_batch + GAMMA * q_next_best

    optimizer.zero_grad()
    td_loss = criterion(q_taken, td_target)
    td_loss.backward()
    optimizer.step()

In [237]:
# --- Training -------------------------------------------------------------
# Fixes relative to the earlier version of this cell:
#   * `state` is advanced to `next_state` after every step (it previously
#     stayed at the reset state for the whole episode);
#   * success is detected from the car's position, because MountainCar-v0
#     gives a reward of -1 on every step and never 0;
#   * the target network is synced once per qualifying episode rather than
#     on every step of such an episode;
#   * the optimizer uses the ALPHA constant instead of a duplicated literal.
memory = ReplayMemory(1000)
policy = LinearFunctionApproximator(2, 3)
target_policy = LinearFunctionApproximator(2, 3)
target_policy.load_state_dict(policy.state_dict())
target_policy.eval()

criterion = nn.MSELoss()
optimizer = optim.SGD(policy.parameters(), lr=ALPHA)
n_episodes = 100
finishing_ep = []  # indices of episodes in which the car reached the goal
# Goal x-coordinate; falls back to the documented MountainCar value.
goal_position = getattr(env.unwrapped, "goal_position", 0.5)

for ep in range(n_episodes):
    done = False
    state = torch.tensor(polynomial_coding(env.reset()), dtype=torch.float)
    while not done:
        action = get_epsilon_greedy_action(policy, state)
        next_obs, reward, done, _ = env.step(action)
        next_state = torch.tensor(polynomial_coding(next_obs), dtype=torch.float)

        # Stored layout: (action, state, reward, next_state), each with a
        # leading batch dimension so optimize_policy can concatenate them.
        memory.push(
            torch.tensor([action]).unsqueeze(0),
            state.unsqueeze(0),
            torch.tensor([reward]),
            next_state.unsqueeze(0),
        )

        # `done` is also set when the step limit is hit, so check the raw
        # position to tell a real success from a timeout.
        if done and next_obs[0] >= goal_position:
            finishing_ep.append(ep)

        state = next_state

        if len(memory) > BATCH_SIZE:
            optimize_policy(memory, policy, target_policy, criterion, optimizer)

    # Periodic hard update of the target network, once per episode.
    if ep % TARGET_UPDATE == 0:
        target_policy.load_state_dict(policy.state_dict())

In [238]:
# Episode indices recorded by the training loop; an empty list means none were recorded.
finishing_ep

[]

In [240]:
# Roll out one greedy episode with the trained policy, rendering every step.
state = torch.tensor(polynomial_coding(env.reset()), dtype=torch.float)
done = False

with torch.no_grad():
    while not done:
        # Pure exploitation: always take the highest-valued action.
        action = torch.argmax(policy(state))
        observation, _, done, _ = env.step(action.item())
        state = torch.tensor(polynomial_coding(observation), dtype=torch.float)
        env.render()
    # Release the render window once the episode is over.
    env.close()

In [241]:
# Weight matrix of the policy; LinearFunctionApproximator allocates it as (state_size, action_size).
policy.params

Parameter containing:
tensor([[-0.1126, -1.4942,  1.8002],
        [ 0.4350, -0.1616,  0.2305]], requires_grad=True)