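The CartPole state is represented by 4 numbers (note: the code below runs `PendulumEnv`, whose observations have 3 components, so this description applies only to the commented-out CartPole environment):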

The cart position x from -2.4 to 2.4.

The cart velocity v

The pole angle θ with respect to the vertical from -12 to 12 degrees (from -0.21 to 0.21 in radians)

The pole angular velocity ω. This is the rate of change of θ.

In [1]:
import argparse
import gymnasium as gym
import numpy as np
np.random.seed(42)
from itertools import count
from random import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

from pendulum import PendulumEnv

In [2]:
# Environment instance read as a module-level global by goodness_score and
# the training functions. The CartPole variant is left commented out.
#env = gym.make('CartPole-v1', render_mode="human")
env = PendulumEnv(render_mode='human')
#print(env._max_episode_steps)

In [3]:
#floats between -2.0 and 2.0
import random
def select_action_random(state):
    """Return a one-element action list drawn uniformly from [-2.0, 2.0].

    ``state`` is accepted so the function matches the policy-callable
    signature, but it is not consulted.
    """
    return [random.uniform(-2.0, 2.0)]
    

def goodness_score(select_action, num_episodes=10):
    """Return the mean fraction of the step budget that episodes lasted.

    Runs ``num_episodes`` episodes on the module-level ``env``. Each episode
    is capped at 500 steps and stops as soon as ``env.step`` reports that the
    episode has terminated or been truncated.

    Args:
        select_action: Callable taking the current observation and returning
            the action that is passed to ``env.step``.
        num_episodes: Number of episodes to average over.

    Returns:
        ``sum(steps per episode) / (episodes * 500)``; 1.0 means every
        episode ran for the full 500 steps. Returns 0.0 when
        ``num_episodes`` is not positive.
    """
    num_steps = 500
    if num_episodes <= 0:
        # Nothing to average; avoids the 0/0 division below.
        return 0.0

    ts = []
    for _ in range(num_episodes):
        # The first element of reset()'s result is used as the observation.
        state = env.reset()[0]
        print("observation state is ", state)
        for t in range(1, num_steps + 1):
            action = select_action(state)
            state, _reward, done, truncated, _info = env.step(action)
            # The episode is over on termination *or* truncation; the
            # Gymnasium API requires a reset before stepping again.
            if done or truncated:
                break
        ts.append(t)
    return sum(ts) / (len(ts) * num_steps)

# Score of the uniformly random policy over the default number of episodes.
print(goodness_score(select_action_random))

observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observation state is  [1. 0. 0.]
observatio

In [4]:
class PolicyNN(nn.Module):
    """Policy made of a single linear layer: 3 input features -> 1 output."""

    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(in_features=3, out_features=1)

    def forward(self, x):
        """Apply the linear layer to ``x`` and return the result unchanged."""
        return self.fc(x)

def select_action_from_policy(model, state):
    """Run ``model`` on one observation and return its raw output tensor.

    ``state`` must be a NumPy array; it is converted to a float32 tensor and
    given a leading batch axis before being passed to ``model``. The output
    is returned as-is (not detached, not converted to NumPy).
    """
    batch = torch.from_numpy(state).to(torch.float32)[None]
    return model(batch)

def select_action_from_policy_best(model, state):
    """Return 0 if the model's first output exceeds its second, else 1.

    ``state`` must be a NumPy array; it is converted to a float32 tensor with
    a leading batch axis before the forward pass. Ties resolve to 1.

    NOTE(review): this reads output positions [0][0] and [0][1], so the model
    must emit at least two values per observation. A model ending in
    ``nn.Linear(3, 1)`` would raise IndexError here -- confirm which model
    this helper is meant for.
    """
    observation = torch.from_numpy(state).to(torch.float32)[None]
    scores = model(observation)
    first, second = scores[0][0], scores[0][1]
    return 0 if first > second else 1

In [8]:
# Policy with freshly initialized weights; it is only referenced by the
# commented-out scoring call below.
model_untrained = PolicyNN()

# print(
#     goodness_score(lambda state: select_action_from_policy(model_untrained, state[0])),
#     goodness_score(lambda state: select_action_from_policy_best(model_untrained, state))
# )

In [12]:
# Policy network and its Adam optimizer; both are read as module-level
# globals by the training functions defined below.
model = PolicyNN()
optimizer = optim.Adam(model.parameters(), lr=0.01)

def train_wont_work(num_episodes=100):
    """Intentionally failing training loop, kept as a counter-example.

    For each episode it rolls out the module-level ``model`` on the
    module-level ``env`` for up to 500 steps, printing every action, then
    builds a "loss" from the episode length alone. That loss is a plain
    Python float with no connection to the model's output, so the call to
    ``loss.backward()`` raises ``AttributeError`` at the end of the first
    episode.

    Args:
        num_episodes: Number of episodes the loop would run if it did not
            fail first.
    """
    num_steps = 500
    for episode in range(num_episodes):
        state = env.reset()[0]
        for t in range(1, num_steps+1):
            action = select_action_from_policy(model, state)
            print(action)
            state, _, done, _, info = env.step(action[0])
            if done:
                break
        # Shorter episodes give a larger loss; t is a Python int here.
        loss = 1.0 - t / num_steps
        # this doesn't actually work, because
        # the loss function is not an explicit
        # function of the model's output; it's
        # a function of book keeping variables
        optimizer.zero_grad()
        loss.backward() # AttributeError: 'float' object has no attribute 'backward'
        optimizer.step()

def train_simple(num_episodes=10):
    """Train the module-level ``model`` with one optimizer step per episode.

    Each episode rolls the policy out on the module-level ``env`` for up to
    10000 steps, stopping when ``env.step`` reports termination or
    truncation. The model outputs collected during the rollout are folded
    into a single loss, which is backpropagated through the module-level
    ``optimizer``. The episode index, episode length and loss are printed
    after every episode.

    Training stops early once the mean length of the last 10 episodes
    reaches 95% of the step cap.

    Args:
        num_episodes: Maximum number of episodes to train for.
    """
    num_steps = 10000
    window = 10  # number of most recent episodes used by the early-stop check
    ts = []
    for episode in range(num_episodes):
        state = env.reset()[0]
        outputs = []
        for t in range(1, num_steps + 1):
            action = select_action_from_policy(model, state)
            # Keep the graph-attached output for the loss; the environment
            # receives a detached NumPy copy.
            outputs.append(action[0])
            state, reward, done, truncated, info = env.step(action[0].detach().numpy())
            # The episode is over on termination *or* truncation.
            if done or truncated:
                break
        # NOTE(review): ``reward`` here is only the reward of the episode's
        # final step, yet it is applied to every collected output, and
        # minimizing this loss pushes the outputs *away* from it. This looks
        # unintended -- confirm the objective before relying on the results.
        loss = -sum(torch.abs(output - reward) for output in outputs)
        print(episode, t, loss.item())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        ts.append(t)
        # ">=" so the check can fire as soon as a full window is available.
        if len(ts) >= window and sum(ts[-window:]) / window >= num_steps * 0.95:
            print('Stopping training, looks good...')
            return

# Run training with the default episode count; the intentionally failing
# variant is left disabled.
train_simple()
#train_wont_work()

0 21 -41.483856201171875
1 21 -33.21754837036133
2 22 -40.19142150878906
3 23 -46.6368522644043
4 24 -51.639686584472656
5 25 -54.00933074951172
6 26 -52.32860565185547
7 27 -45.175716400146484
8 30 -67.6634521484375
9 33 -63.42487335205078


In [None]:
# Score the trained model with both action-selection helpers.
# NOTE(review): the first lambda hands env.step a graph-attached tensor,
# whereas train_simple detaches and converts to NumPy first -- confirm the
# environment accepts this.
# NOTE(review): select_action_from_policy_best indexes two outputs per
# observation, but PolicyNN ends in nn.Linear(3,1); the second call looks
# like it will raise IndexError -- confirm.
print(
    goodness_score(lambda state: select_action_from_policy(model, state)[0]),
    goodness_score(lambda state: select_action_from_policy_best(model, state))
)