In [1]:
import random
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
# Torch device string handed to ShuffleBoardEnv below (its docstring lists "cpu or cuda").
device = "cpu"

In [70]:
class SimComplete(Exception):
    """Raised by ShuffleBoardEnv.step when the env was never reset or every sim is already done."""

class ShuffleBoardEnv:
    """Batched 1-D "shuffleboard" toy environment: push a block along a line into a goal interval."""

    def __init__(self, device, goal_start_pos=10, goal_end_pos=20, coeff_friction=0.2, max_energy=100,
                 block_mass_kg=1, gravity=9.8, action_reward=-1, goal_reward=0, max_steps=100,
                 boundary=1000):
        """
        The agent in this toy example applies an energy to a block at every time step.

        Every sim starts at position 0. In each state, the agent's action is a scalar in the range [-1, 1],
        which is a linear proportion of max_energy (e.g. an action of 0.5 applies max_energy/2).
        The energy is signed to allow movement in either direction.
        Displacement is calculated as dx = applied_energy / (mass * gravity * coeff_friction).

        For each action the agent receives action_reward, unless the action leaves the block strictly
        inside (goal_start_pos, goal_end_pos), in which case it receives goal_reward and that sim is
        flagged done. A done sim stops moving and earns reward 0 from then on. The whole batch is
        flagged done once max_steps steps have been taken or any sim moves further than `boundary`
        from the origin. Nothing restarts automatically: call reset() to begin a new episode.

        After taking a step, the agent reads the updated positions through observe_state().

        Parameters:
        device: cpu or cuda
        goal_start_pos: coordinate where goal region begins
        goal_end_pos: coordinate where goal region ends
        coeff_friction: friction coefficient for surface
        max_energy: max energy the agent can apply in each step, in joules
        block_mass_kg: mass of the block being pushed, in kilograms
        gravity: m/s^2 gravity coefficient in the sim
        action_reward: reward given to agent for each action that does not result in agent inside goal region
        goal_reward: reward given to agent upon reaching goal region.
        max_steps: number of sim steps after which the episode ends
        boundary: absolute position limit; exceeding it ends the episode for the whole batch
        """
        self.device = device

        self.goal_start_pos = goal_start_pos
        self.goal_end_pos = goal_end_pos
        self.coeff_friction = coeff_friction
        self.max_energy = torch.tensor([max_energy], dtype=torch.float32, device=device)
        self.block_mass_kg = block_mass_kg
        self.gravity = gravity
        self.action_reward = action_reward
        self.goal_reward = goal_reward
        self.max_steps = max_steps

        # Absolute movement limit. Q_Learning's state_scaling defaults to 100, so the two values
        # are not tied together automatically.
        self.boundary = boundary

        self.initialized = False

    def reset(self, batch_size):
        """
        Start a fresh episode for every sim in the batch.

        batch_size: how many simulations to run simultaneously with these sim parameters
        """
        self.batch_size = batch_size
        # Allocate on the env's own device rather than whatever global `device` happens to exist.
        self.positions = torch.zeros(batch_size, device=self.device)
        self.done_flags = torch.zeros(batch_size, dtype=torch.bool, device=self.device)
        self.steps = 0
        self.initialized = True

    def step(self, input_energy):
        """
        Advance every unfinished sim by one action.

        input_energy: energy fraction(s) in [-1, 1]; a scalar or anything broadcastable to batch_size

        Returns:
        - rewards: batch_size vector of rewards after this step
        - done_flags: batch_size bool vector, true if that sim is completed

        Raises:
        SimComplete: if reset() has not been called or every sim is already done
        """
        if not self.initialized or torch.all(self.done_flags):
            raise SimComplete()

        already_done = self.done_flags
        input_energy = torch.as_tensor(input_energy, dtype=self.positions.dtype, device=self.device)
        applied_energy = input_energy * self.max_energy
        displacement = applied_energy / (self.block_mass_kg * self.gravity * self.coeff_friction)

        # Finished sims stay put, so they cannot drift across the boundary and end the batch.
        # Building a new tensor (no in-place add) keeps earlier observe_state() results intact.
        self.positions = self.positions + displacement * (~already_done)

        reached_goal = (self.positions > self.goal_start_pos) & (self.positions < self.goal_end_pos)

        rewards = torch.full((self.batch_size,), float(self.action_reward), device=self.device)
        rewards[reached_goal] = float(self.goal_reward)
        rewards[already_done] = 0.0

        self.steps += 1
        timed_out = self.steps >= self.max_steps
        out_of_bounds = bool(torch.any(torch.abs(self.positions) > self.boundary))
        if timed_out or out_of_bounds:
            # Record the forced termination so the next step() raises SimComplete.
            self.done_flags = torch.ones(self.batch_size, dtype=torch.bool, device=self.device)
        else:
            self.done_flags = already_done | reached_goal
        return rewards, self.done_flags

    def observe_state(self):
        """Return the current batch_size vector of block positions."""
        return self.positions

In [71]:
class SAValueNN(nn.Module):
    """Simple shallow ReLU network mapping a 1-feature state to one value per action."""

    def __init__(self, num_hidden, num_actions):
        """
        num_hidden: width of the single hidden layer
        num_actions: number of outputs (one value estimate per action)
        """
        super().__init__()
        # Layers are created in the same order as they are applied.
        hidden_layer = nn.Linear(1, num_hidden)
        activation = nn.ReLU()
        output_layer = nn.Linear(num_hidden, num_actions)
        self.model = nn.Sequential(hidden_layer, activation, output_layer)

    def forward(self, state):
        """Return the per-action values for `state` (last dimension must be 1)."""
        action_values = self.model(state)
        return action_values

class Q_Learning:
    """Epsilon-greedy Q-learning agent with a separate target network, for a discrete action set."""

    def __init__(self, epsilon, gamma, value_model, target_model, action_space, state_scaling=100):
        """
        epsilon: probability of picking a uniformly random action in get_action
        gamma: discount factor applied to the bootstrapped target value
        value_model: network that is trained; maps a scaled state to one value per action
        target_model: network used for update targets; synced via update_target_model
        action_space: 1-D collection of the available action values
        state_scaling: raw states are divided by this before being fed to either network
        """
        self.epsilon = epsilon
        self.gamma = gamma
        self.value_model = value_model
        self.target_model = target_model
        self.action_space = action_space
        # Number of discrete actions = length of the 1-D action vector.
        self.num_actions = len(action_space)
        self.optimizer = torch.optim.Adam(self.value_model.parameters(), lr=0.01)

        self.state_scaling = state_scaling

    def update_target_model(self):
        """Copy the value network's weights into the target network."""
        self.target_model.load_state_dict(self.value_model.state_dict())

    def random_sample(self):
        """Return a uniformly random action index."""
        # for now, assume action space is a vector of actions
        return random.randrange(self.num_actions)

    def get_action(self, state):
        """
        Pick an action index epsilon-greedily for a raw (unscaled) state.

        Returns (action_index, q_values). q_values are the value network's outputs for `state`
        and carry gradients so they can be passed to calc_loss. The index is a Python int for
        a random action and a 0-dim tensor for a greedy one.
        """
        # predict_values applies state_scaling itself; scaling here as well would feed the value
        # network state / state_scaling**2 while the target network sees state / state_scaling.
        q_values = self.predict_values(state)

        if np.random.random() < self.epsilon:
            rand_ind = self.random_sample()
            return rand_ind, q_values

        # otherwise, take the action with the highest predicted value
        best_value, best_action_ind = torch.max(q_values, dim=0)
        return best_action_ind, q_values

    def calc_loss(self, q_values, action_taken_ind, target_values, reward, done_flags):
        """
        Squared TD error for a single transition.

        q_values: predicted values for every action in the state the action was taken from
        action_taken_ind: index of the action that was taken
        target_values: target-network values for every action in the next state
        reward: reward received for the taken action
        done_flags: truthy when the transition ended the episode; the bootstrapped term is then
            dropped so the target is just `reward`. Pass None to always bootstrap.
        """
        # The target is a constant for this update: no gradient may flow into it.
        max_target = torch.max(target_values.detach())
        if done_flags is not None:
            not_done = 1.0 - torch.as_tensor(done_flags, dtype=max_target.dtype, device=max_target.device)
            max_target = not_done * max_target
        q_target = reward + self.gamma * max_target
        return torch.square(q_values[action_taken_ind] - q_target)

    def update(self, loss):
        """Backpropagate `loss` and take one optimizer step (the caller zeroes gradients beforehand)."""
        loss.backward()
        self.optimizer.step()

    def target_values(self, state):
        """Target-network values for a raw state; computed without tracking gradients."""
        state = state / self.state_scaling
        with torch.no_grad():
            return self.target_model(state)

    def predict_values(self, state):
        """Value-network predictions for a raw state (scaled by state_scaling here)."""
        state = state / self.state_scaling
        return self.value_model(state)

In [75]:
# Seed every RNG the experiment draws from (random.randrange for exploration, np.random for the
# epsilon test, torch for weight initialisation) so that Restart & Run All gives the same run.
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

env = ShuffleBoardEnv(device, goal_start_pos=55, goal_end_pos=60, goal_reward=10)

# Discrete action set: ten energy fractions -0.9, -0.7, ..., 0.7, 0.9.
action_space = torch.Tensor([0.2*x+0.1 for x in range(-5, 5)])
value_model_NN = SAValueNN(num_hidden=5, num_actions=action_space.numel())
target_model_NN = SAValueNN(num_hidden=5, num_actions=action_space.numel())

q_agent = Q_Learning(epsilon=0.1, gamma=0.99, value_model=value_model_NN,
                    target_model = target_model_NN, action_space=action_space)

In [76]:
# training loop:
nIters = 1000
# Total (undiscounted) reward of each episode, stored as plain floats so it can be plotted directly.
reward_list = []

# Sync the target network with the value network every this many episodes.
target_model_update_freq = 500

for i in range(nIters):
    env.reset(1)

    # Clone so the stored state cannot change if the env updates its positions in place.
    state = env.observe_state().clone()
    episode_reward = 0.0

    if i % target_model_update_freq == 0:
        q_agent.update_target_model()

    while True:
        q_agent.optimizer.zero_grad()

        action_ind, values = q_agent.get_action(state)

        reward, flag = env.step(action_space[action_ind])
        episode_reward += reward.item()

        new_state = env.observe_state().clone()

        # The target is a fixed regression target: do not build a graph through the target network.
        with torch.no_grad():
            target_values = q_agent.target_values(new_state)
        loss = q_agent.calc_loss(values, action_ind, target_values, reward, flag)
        q_agent.update(loss)
        if bool(flag.all()):
            # episode over
            reward_list.append(episode_reward)
            break
        state = new_state

In [59]:
def test_run(agent, env, action_space):
    env.reset(1)
    store_eps = agent.epsilon
    agent.epsilon = 0
    
    total_reward = 0
    done_flag = False
    while not done_flag:
        action, values = agent.get_action(env.positions)
        reward, done_flag = env.step(action_space[action])
        total_reward += reward
    agent.epsilon = store_eps
    return total_reward

In [60]:
# Run one greedy (epsilon = 0) episode with q_agent and display the total reward it collected.
test_run(q_agent, env, action_space)

tensor([9.])

In [39]:
# Inspect the value network's per-action estimates at the environment's current position.
q_agent.predict_values(env.positions)

tensor([-0.5812, -0.1380,  0.4887, -0.3369, -0.0779], grad_fn=<AddBackward0>)

In [40]:
# Current block position(s) held by the environment.
env.positions

tensor([25.5102])

In [55]:
# NOTE(review): scratch cell. `b` is only assigned in a later cell of this notebook, so this
# fails on a fresh top-to-bottom run; the saved output comes from earlier kernel state.
b

tensor(2)

In [81]:
# Scratch check: torch.cat joins 1-D tensors end to end (lengths 1 and 2 give length 3).
a = torch.Tensor([1])
b = torch.Tensor([1, 2])
torch.cat((a, b))

tensor([1., 1., 2.])

In [88]:
# Scratch check: pack the first state entry and the first action value into one 1-D tensor.
# Relies on `state` left over from the training loop.
torch.Tensor([state[0], action_space[0]])

tensor([0., 0.])

In [90]:
# Scratch check: unsqueeze(0) turns the 0-dim action value into a 1-element vector.
action_space[0].unsqueeze(0)

tensor([0.])

In [2]:
import gym

In [None]:
# Smoke test of the gym install: render 1000 random-action steps of CartPole.
# NOTE: this rebinds `env`, replacing the ShuffleBoardEnv instance created earlier.
env = gym.make('CartPole-v0')
try:
    env.reset()
    for _ in range(1000):
        env.render()
        # step() returns (observation, reward, done, info) in this gym version.
        _obs, _reward, episode_done, _info = env.step(env.action_space.sample())
        if episode_done:
            # Stepping a finished episode is undefined in gym; start a new one instead.
            env.reset()
finally:
    # Always release the render window, even if a step fails.
    env.close()

In [3]:
# Rebind `env` to the BipedalWalker task; reset() returns the initial observation displayed below.
env = gym.make("BipedalWalker-v3")
env.reset()



array([ 2.74741533e-03, -1.30260282e-05,  1.01329319e-03, -1.59999108e-02,
        9.19707566e-02, -1.33720273e-03,  8.60263631e-01,  2.39109698e-03,
        1.00000000e+00,  3.23770605e-02, -1.33711053e-03,  8.53812397e-01,
        9.45682094e-04,  1.00000000e+00,  4.40814018e-01,  4.45820123e-01,
        4.61422771e-01,  4.89550203e-01,  5.34102798e-01,  6.02461040e-01,
        7.09148884e-01,  8.85931849e-01,  1.00000000e+00,  1.00000000e+00])

In [4]:
# Take one random action; the result is unpacked as a 4-tuple (observation, reward, done, info).
obs, reward, done, _ = env.step(env.action_space.sample())

In [5]:
# Shape of the observation returned by the step above.
obs.shape

(24,)

In [23]:
# Render a random-action rollout until the environment reports done.
# Depends on `done` already being set by an earlier env.step cell.
while not done:
    env.render()
    obs, reward, done, _ = env.step(env.action_space.sample())

In [20]:
# NOTE(review): the saved output (96, 96, 3) does not match the 24-element observation shown
# earlier, so it presumably came from a different environment in an older session. Confirm.
obs.shape

(96, 96, 3)

In [1]:
# NOTE(review): this cell was executed on a fresh kernel (execution count 1) before `done` was
# assigned, hence the saved NameError; it only works after an env.step cell has run.
done

NameError: name 'done' is not defined