In [39]:
import torch
from torch import nn
from torch import optim
import gym
import matplotlib.pyplot as plt

In [2]:
env = gym.make('CartPole-v1')
n_actions = env.action_space.n
n_observations = env.observation_space.shape[0]
hidden_size = 150

In [48]:
model = nn.Sequential(
    nn.Linear(n_observations, hidden_size),
    nn.LeakyReLU(),
    nn.Linear(hidden_size, n_actions),
    nn.Softmax(dim=-1)
)

In [4]:
LEARNING_RATE = 0.0009
optimizier = optim.Adam(model.parameters(), lr=LEARNING_RATE)

In [5]:
state, _ = env.reset()

In [6]:
state

array([-0.02845098,  0.02649889,  0.0030152 ,  0.02776563], dtype=float32)

In [7]:
pred = model(torch.from_numpy(state))

In [8]:
def select_random_action(preds):
    pass

In [9]:
action = select_random_action(pred)

In [10]:
new_observation, reward, done, truncated, info = env.step(action)

AssertionError: None (<class 'NoneType'>) invalid

### Discounted Reward

In [11]:
def discount_rewards(rewards, discount_factor):
    n_rewards = len(rewards)
    timesteps = torch.arange(0, n_rewards)
    
    # calculate the discount for each time step
    discount = torch.pow(discount_factor, timesteps)
    discounted_rewards = discount * rewards
    
    # normalize
    discounted_rewards /= discounted_rewards.max()

    return discounted_rewards

In [12]:
rewards = torch.tensor([1, 2, 3])

In [13]:
discount_rewards(rewards, discount_factor=0.99)

tensor([0.3401, 0.6734, 1.0000])

### Loss Function

$-\gamma_t * G_t * \log \pi_s(a \mid \theta)$

$G_t$: is called the sum of all rewards until time $t$
- $G_t=r_t+r_{t+1} \ldots+r_{T-1}+r_T$

**Example**:

In [51]:
def loss_func(selected_actions, discounted_rewards):
    return -1 * torch.sum(discounted_rewards * torch.log(selected_actions))

### Training Loop

In [47]:
# for episode in range(MAX_EPISODES):
#     observation, _ = env.reset()
#     observation = torch.from_numpy(observation)
#     done= False
#     transitions = []
    
#     for t in range(MAX_DUR):
#         predicted_actions = model(observation).float()
#         action = torch.multinomial(predicted_actions, num_samples=1)[0].item()
        
#         new_observation, reward, done, truncated, info = env.step(action)
#         transitions.append((observation.numpy(), action, reward, new_observation))
        
#         if done: break
        
#         scores.append(len(transitions))
        
#         reward_batch = torch.tensor([reward for (observation, action, reward) in transitions]).flip(dims=(0,))
#         discounted_reward = discount_rewards(reward_batch, discount_factor=0.99)
        
#         observation_batch = torch.tensor([observation for (observation, action, reward) in transitions])
#         action_batch = torch.tensor([action for (observation, action, reward) in transitions])
        
#         predicted_action_batch = model(observation_batch)
        
#         predicted_action_batch = predicted_action_batch.gather(
#             dim=1,
#             index=action_batch.long().view(-1,1)
#         ).squeeze()
        
#         loss = loss_func(predicted_action_batch, discounted_reward)
#         losses.append(loss.detach().numpy())
#         optimizier.zero_grad()
#         loss.backward()
#         optimizier.step()
        
#         observation = torch.from_numpy(new_observation)

In [43]:
MAX_DUR = 200
MAX_EPISODES = 1000

discount_factor = 0.99
scores = []
transitions = []
losses = []

In [None]:
def select_action(model, state):
    predicted_action = model(state)

In [86]:
def generate_transitions(model, env):
    transitions = []
    state, _ = env.reset()
    
    while True:
        predicted_action = model(torch.from_numpy(state))
        action = torch.argmax(predicted_action, dim=-1)
        next_state, reward, done, truncated, info = env.step(action.item())
        
        transitions.append((
            state, action, reward, next_state
        ))
        
        if done: break
        
        state = next_state
    return transitions

In [87]:
def extract_transitions(transitions):
    states, actions, rewards = [], [], []
    
    for transition in transitions:
        state, action, reward, _ = transition
        states.append(state)
        actions.append(action)
        rewards.append(reward)
    
    return states, actions, rewards

In [88]:
N_EPISODE = 1
DISCOUNT_FACTOR = 0.99

losses = []

for episode in range(N_EPISODE):
    transitions = generate_transitions(model=model, env=env)
    states, actions, reward = extract_transitions(transitions)
    discounted_rewards = discount_rewards(rewards, discount_factor=DISCOUNT_FACTOR)
    
    # convert to torch tensor
    states = torch.tensor(states)
    
    # make prediction over a batch of states
    predicted_actions = model(states)
    selected_actions = predicted_actions(range(len(predicted_actions)), actions)
    
    loss = loss_func(selected_actions, discounted_rewards)
    losses.append(loss.detach().numpy())
    
    optimizier.zero_grad()
    loss.backward()
    optimizier.step()

TypeError: 'Tensor' object is not callable