In [45]:
import ptan
import numpy as np

import torch
import torch.nn as nn

import gym

# Action selectors

## ArgMax

In [6]:
# Toy 2x3 table of Q-values; the selectors below return one action index per row.
q_vals = np.array([[1,2,3], [1,-1,0]])
q_vals

array([[ 1,  2,  3],
       [ 1, -1,  0]])

In [9]:
# Greedy selection: for each row of q_vals, return the index of the largest value.
selector = ptan.actions.ArgmaxActionSelector()
selector(q_vals)

array([2, 0])

In [13]:
# Look up, for every row, the Q-value of the action the selector picked.
q_vals[np.arange(q_vals.shape[0]), selector(q_vals)]

array([3, 1])

## EpsilonGreedy

In [14]:
# With epsilon=0.0 the result below is the same as the plain argmax selection.
selector = ptan.actions.EpsilonGreedyActionSelector(epsilon=0.0)
selector(q_vals)

array([2, 0])

In [15]:
# With epsilon=0.5 some rows get a non-greedy action (second row below is 1, not 0).
# NOTE(review): no random seed is set, so this output presumably changes between runs.
selector = ptan.actions.EpsilonGreedyActionSelector(epsilon=0.5)
selector(q_vals)

array([2, 1])

## Probability selection

In [18]:
# Each row of `acts` sums to 1 and is used as a distribution to sample an action from.
selector = ptan.actions.ProbabilityActionSelector()
acts = np.array([[.125,.75,.125],
                [0,0,1.],
                [.5,.5,0]])

# Sample ten times: the second row always yields action 2 (probability 1) and
# the third row never yields action 2 (probability 0), as the printed samples show.
for _ in range(10):
    print(selector(acts))

[2 2 1]
[1 2 0]
[1 2 1]
[1 2 0]
[1 2 1]
[1 2 0]
[1 2 1]
[1 2 1]
[1 2 0]
[1 2 1]


# Agent

## DQN Agent

### Simple Model

In [25]:
class DQNNET(nn.Module):
    """Stub Q-network whose output depends only on the batch size.

    The values of the input tensor are ignored; only its first dimension
    (the batch size) is used to shape the result.
    """

    def __init__(self, n_actions):
        """Remember how many actions (output columns) to produce."""
        super().__init__()
        self.n_actions = n_actions

    def forward(self, x):
        """Return a (batch, n_actions) tensor with ones on the main diagonal."""
        batch_size = x.shape[0]
        return torch.eye(batch_size, self.n_actions)

In [26]:
# Sanity check: a batch of 2 inputs gives the first two rows of a 3-column identity matrix.
net = DQNNET(n_actions=3)
net(torch.zeros(2,10))

tensor([[1., 0., 0.],
        [0., 1., 0.]])

### Argmax selector

In [27]:
# Combine the stub network with a greedy selector into a DQN agent.
selector = ptan.actions.ArgmaxActionSelector()
agent = ptan.agent.DQNAgent(dqn_model=net, action_selector=selector)

In [29]:
# The agent returns a tuple: (one action per batch item, [one internal state per batch item]).
# The states are None in the output below -- presumably because this agent keeps no state.

agent(torch.zeros(2,5))

(array([0, 1]), [None, None])

### Epsilon-Greedy

In [33]:
# Same stub network, but actions are now chosen epsilon-greedily with epsilon=0.5.
selector = ptan.actions.EpsilonGreedyActionSelector(.5)
agent = ptan.agent.DQNAgent(dqn_model=net, action_selector=selector)

# NOTE(review): no random seed is set, so the actions shown below presumably vary between runs.
agent(torch.zeros(2,5))

(array([2, 2]), [None, None])

### Policy Agent

In [39]:
class PolicyNet(nn.Module):
    """Stub policy network producing the same output row for every input.

    The input values are ignored; only the batch size is read from the input.
    """

    def __init__(self, n_actions):
        """Remember how many actions (output columns) to produce."""
        super().__init__()
        self.n_actions = n_actions

    def forward(self, x):
        """Return a float32 (batch, n_actions) tensor: 1.0 in the first two columns, 0.0 elsewhere."""
        logits = torch.zeros(x.shape[0], self.n_actions, dtype=torch.float32)
        # Fill the leading two columns in place through a slice view.
        logits[:, :2].fill_(1.0)
        return logits

In [40]:
# Sanity check: every one of the 6 output rows is [1, 1, 0, 0, 0].
net = PolicyNet(5)
net(torch.zeros(6,10))

tensor([[1., 1., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 0., 0., 0.]])

In [43]:
# The network rows sum to 2, so they are not probabilities; apply_softmax=True
# presumably normalizes them before the selector samples from them.
selector = ptan.actions.ProbabilityActionSelector()
agent = ptan.agent.PolicyAgent(model=net, action_selector=selector, apply_softmax=True)

# [0] keeps only the actions. Actions 2 and 4 appear in the output below even
# though the network emits 0 for those columns.
agent(torch.zeros(6,5))[0]

array([0, 4, 0, 4, 2, 2])

# Experience Source

In [52]:
class ToyEnv(gym.Env):
    """Tiny deterministic environment used to demonstrate experience sources.

    The observation is the internal step counter modulo the size of the
    observation space (5). The reward is the action cast to float. An episode
    ends once ``max_steps`` steps have been taken.

    Args:
        max_steps: Number of steps per episode. Defaults to 10, the value that
            was previously hard-coded in two places inside ``step``.
    """

    def __init__(self, max_steps=10):
        super().__init__()
        self.observation_space = gym.spaces.Discrete(n=5)
        self.action_space = gym.spaces.Discrete(n=3)
        self.max_steps = max_steps
        self.step_index = 0

    def reset(self):
        """Restart the episode and return the initial observation (0)."""
        self.step_index = 0
        return self.step_index

    def _observation(self):
        """Map the current step counter onto the discrete observation space."""
        return self.step_index % self.observation_space.n

    def step(self, action):
        """Advance one step and return ``(observation, reward, done, info)``.

        Stepping an episode that has already finished does not advance the
        counter; it returns the current observation with reward 0.0 and
        ``done=True``.
        """
        # `>=` rather than `==` so the episode still terminates if the counter
        # ever ends up past the limit.
        if self.step_index >= self.max_steps:
            return self._observation(), 0.0, True, {}

        self.step_index += 1
        is_done = self.step_index >= self.max_steps
        return self._observation(), float(action), is_done, {}

In [53]:
# One step with action 2 on a fresh environment: observation 1, reward 2.0, not done.
# step() is called without reset(); this works because __init__ already sets step_index to 0.
env = ToyEnv()
env.step(2)

(1, 2.0, False, {})

In [54]:
class DullAgent(ptan.agent.BaseAgent):
    """Agent that answers every observation with the same fixed action."""

    def __init__(self, action):
        """Store the action that will be returned for every observation."""
        super().__init__()
        self.action = action

    def __call__(self, observations, state):
        """Return the fixed action once per observation, with `state` passed through unchanged."""
        constant_actions = [self.action for _observation in observations]
        return constant_actions, state

## ExperienceSource

In [57]:
# Fresh environment plus an agent that always picks action 1; each yielded item
# is a tuple of steps_count=2 consecutive Experience records (see output).
env = ToyEnv()
agent = DullAgent(action=1)
exp_source = ptan.experience.ExperienceSource(env=env, agent=agent, steps_count=2)

# The break is checked before printing, so at most 13 items (idx 0..12) are shown.
for idx, exp in enumerate(exp_source):
    if idx>12: break
    print(exp)

(Experience(state=0, action=1, reward=1.0, done=False), Experience(state=1, action=1, reward=1.0, done=False))
(Experience(state=1, action=1, reward=1.0, done=False), Experience(state=2, action=1, reward=1.0, done=False))
(Experience(state=2, action=1, reward=1.0, done=False), Experience(state=3, action=1, reward=1.0, done=False))
(Experience(state=3, action=1, reward=1.0, done=False), Experience(state=4, action=1, reward=1.0, done=False))
(Experience(state=4, action=1, reward=1.0, done=False), Experience(state=0, action=1, reward=1.0, done=False))
(Experience(state=0, action=1, reward=1.0, done=False), Experience(state=1, action=1, reward=1.0, done=False))
(Experience(state=1, action=1, reward=1.0, done=False), Experience(state=2, action=1, reward=1.0, done=False))
(Experience(state=2, action=1, reward=1.0, done=False), Experience(state=3, action=1, reward=1.0, done=False))
(Experience(state=3, action=1, reward=1.0, done=False), Experience(state=4, action=1, reward=1.0, done=True))
(E

## ExperienceSourceFirstLast

Take n steps (`steps_count`) and record only the first state and action and the last state, together with the reward accumulated over those steps (discounted by `gamma`).

In [63]:
# Reuses `env` and `agent` from the ExperienceSource cell above.
# gamma=1. presumably means no discounting: two steps of reward 1.0 sum to 2.0 in the output below.
exp_source = ptan.experience.ExperienceSourceFirstLast(env, agent, gamma=1., steps_count=2)

In [64]:
# The break is checked after printing, so 12 items (e = 0..11) are shown.
# In the output, last_state=None appears on the items that run into the end of an episode.
for e,exp in enumerate(exp_source):
    print(exp)
    if e>10: break

ExperienceFirstLast(state=0, action=1, reward=2.0, last_state=2)
ExperienceFirstLast(state=1, action=1, reward=2.0, last_state=3)
ExperienceFirstLast(state=2, action=1, reward=2.0, last_state=4)
ExperienceFirstLast(state=3, action=1, reward=2.0, last_state=0)
ExperienceFirstLast(state=4, action=1, reward=2.0, last_state=1)
ExperienceFirstLast(state=0, action=1, reward=2.0, last_state=2)
ExperienceFirstLast(state=1, action=1, reward=2.0, last_state=3)
ExperienceFirstLast(state=2, action=1, reward=2.0, last_state=4)
ExperienceFirstLast(state=3, action=1, reward=2.0, last_state=None)
ExperienceFirstLast(state=4, action=1, reward=1.0, last_state=None)
ExperienceFirstLast(state=0, action=1, reward=2.0, last_state=2)
ExperienceFirstLast(state=1, action=1, reward=2.0, last_state=3)


## Experience replay buffers

In [75]:
# Fresh environment and constant-action agent; steps_count=1 gives single-step transitions.
env = ToyEnv()
agent = DullAgent(action=1)
exp_source = ptan.experience.ExperienceSourceFirstLast(env, agent, gamma=1., steps_count=1)

In [76]:
# The buffer is created empty (length 0 below); it is filled later via populate().
buffer = ptan.experience.ExperienceReplayBuffer(exp_source, buffer_size=100)
len(buffer)

0

Normal training loop now becomes:

1. buffer.populate(1)
2. batch = buffer.sample(BATCH_SIZE)
3. loss + backprop

In [77]:
# Grow the buffer by one populate(1) call per iteration and start sampling
# batches of 4 only once it holds at least 5 entries.
for step in range(6):
    buffer.populate(1)
    if len(buffer) >= 5:
        batch = buffer.sample(4)
        print('\n'.join(map(str, batch)))
        print('--------------------------')

ExperienceFirstLast(state=2, action=1, reward=1.0, last_state=3)
ExperienceFirstLast(state=1, action=1, reward=1.0, last_state=2)
ExperienceFirstLast(state=1, action=1, reward=1.0, last_state=2)
ExperienceFirstLast(state=1, action=1, reward=1.0, last_state=2)
--------------------------
ExperienceFirstLast(state=1, action=1, reward=1.0, last_state=2)
ExperienceFirstLast(state=1, action=1, reward=1.0, last_state=2)
ExperienceFirstLast(state=1, action=1, reward=1.0, last_state=2)
ExperienceFirstLast(state=0, action=1, reward=1.0, last_state=1)
--------------------------


# TargetNet

`TargetNet` keeps a second copy of a neural network and synchronizes the two, either by copying the weights outright or by linearly interpolating between them.

In [78]:
class DQNNet(nn.Module):
    """Minimal network: a single linear layer mapping 5 input features to 3 outputs."""

    def __init__(self):
        """Create the single linear layer, stored as `ff`."""
        super().__init__()
        self.ff = nn.Linear(in_features=5, out_features=3)

    def forward(self, x):
        """Apply the linear layer to `x` and return the result."""
        q_values = self.ff(x)
        return q_values

In [79]:
# Instantiate the one-layer network; the repr below shows its single Linear(5 -> 3) layer.
net = DQNNet()
net

DQNNet(
  (ff): Linear(in_features=5, out_features=3, bias=True)
)

In [81]:
# Wrap `net`; the wrapper exposes its own network as `target_model` (used in the cells below).
tgt_net = ptan.agent.TargetNet(net)

In [89]:
# Compare one weight of the source network with the same weight in the target network.
net.ff.weight[0][0], tgt_net.target_model.ff.weight[0][0]

(tensor(-0.0197, grad_fn=<SelectBackward>),
 tensor(-0.0197, grad_fn=<SelectBackward>))

In [91]:
# Change one weight of the source network only, to show that the target network
# holds its own copy. The update runs under no_grad because an in-place write
# to a view of a parameter that requires grad raises a RuntimeError on current
# PyTorch (and would otherwise be recorded in the autograd history).
with torch.no_grad():
    net.ff.weight[0, 0] += 1
net.ff.weight[0][0], tgt_net.target_model.ff.weight[0][0]

(tensor(0.9803, grad_fn=<SelectBackward>),
 tensor(-0.0197, grad_fn=<SelectBackward>))

In [92]:
# After sync() the target network's weight matches the source network's again (see output).
tgt_net.sync()
net.ff.weight[0][0], tgt_net.target_model.ff.weight[0][0]

(tensor(0.9803, grad_fn=<SelectBackward>),
 tensor(0.9803, grad_fn=<SelectBackward>))