In [227]:
import gymnasium as gym
import torch as t
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from copy import deepcopy
import torch.nn.functional as F
import time

In [229]:
device="cpu"

def f32t(data, device=device):
    return t.tensor(data, device=device, dtype=t.float32)

def int64t(data, device=device):
    return t.tensor(data, device=device, dtype=t.int64)

def sampleQ(QValues):
    total = t.sum(t.cat(QValues))
    action_probs = f32t([abs(i.item()/total) for i in QValues])
    # print(action_probs)
    action_index = t.distributions.Categorical(action_probs).sample()
    # print(action_index)
    return QValues[action_index], action_index.item()

def maxQ(QValues, exploration_rate=0.2):
    if np.random.random() <= exploration_rate:
        action_index = np.random.randint(0,2)
        return QValues[action_index], action_index
    elif QValues[0].item() > QValues[1].item():
        return QValues[0], 0
    else:
        return QValues[1], 1


def Calculate_Q_Labels(actions_R, total_reward, discount=0.99, mature=False):
    Q_Labels = []
    for i in actions_R: 
        Q_Label = i + (total_reward - i) * discount
        if mature and Q_Label < i:
            Q_Labels.append(i)
        else:
            Q_Labels.append(Q_Label)
    return Q_Labels

class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.network = nn.Sequential(
        nn.Linear(6, 128),
        nn.ReLU(),
        nn.Linear(128,1)
        )
        self.device = device
    
    def forward(self, action, state):
        return self.network(t.cat([F.one_hot(int64t(action), num_classes=2), f32t(state)]))

In [230]:
q_net = Model().to(device)
optimizer = t.optim.Adam(q_net.parameters(), lr=0.1)

In [240]:
optimizer = t.optim.Adam(q_net.parameters(), lr=0.01)
q_net.to(device)

def trainQ(optimizer, episode_reward, actions_R, actions_Q):
    predictions = t.stack(actions_Q)
    labels = Calculate_Q_Labels(actions_R, episode_reward, mature=False, discount=0.99)
    labels = f32t(labels).unsqueeze(dim=1)
    loss = F.mse_loss(predictions, labels)
    
    loss.backward()
    optimizer.step()
    
    optimizer.zero_grad()
    return loss.item()


env = gym.make("CartPole-v1", render_mode="human")
state, _ = env.reset()
env._max_episode_steps = 8000
env.render()

episode_reward = 0
episodes = 1
rewards_per_episode = np.empty(episodes)
loss_per_episode = np.empty(episodes)
i = 0
actions_Q = []
actions_R = []

start_time = time.localtime(time.time())
with t.no_grad():
    while i < episodes:
        Q_Values = [q_net(action, state) for action in range(env.action_space.n)]
        chosen_Q, action = maxQ(Q_Values, exploration_rate=0.0)
        state, reward, terminated, truncated, _ = env.step(action)
        episode_reward = episode_reward + reward
        actions_Q.append(chosen_Q)
        actions_R.append(episode_reward)
        print(episode_reward, end="\r")

        if terminated or truncated:
            # loss_per_episode[i] = trainQ(optimizer, episode_reward, actions_R, actions_Q)
            state, _ = env.reset()
            env._max_episode_steps = 1000
            rewards_per_episode[i] = episode_reward
            print(episode_reward, i, end="\r")
            episode_reward = 0
            actions_Q = []
            actions_R = []
            i = i + 1
    
end_time = time.localtime(time.time())
print("Start Time: " + time.strftime("%H:%M:%S", start_time), end="---")
print("End Time: " + time.strftime("%H:%M:%S", end_time))
print(f"Duration = {time.mktime(end_time) - time.mktime(start_time)} seconds")

env.close()
print("Average ", np.average(rewards_per_episode))
# plt.plot(rewards_per_episode)
# plt.show()
# plt.plot(loss_per_episode)
# plt.show()
# plt.close()

Start Time: 19:46:38---End Time: 19:46:39
Duration = 1.0 seconds
Average  10.0


In [64]:
t.cat([F.one_hot(int64t(1), num_classes=2), f32t([45,12])])

tensor([ 0.,  1., 45., 12.])

In [6]:
F.one_hot(int64t(1), num_classes=2)

tensor([0, 1])

In [53]:
uint8t(1)

tensor(1, dtype=torch.uint8)

In [38]:
t.tensor(1)

tensor(1)

In [17]:
for i in env.action_space.n:
    print(i)

TypeError: 'int' object is not iterable

In [15]:
listw = []

for i in range(6):
    tensor, _ = sampleQ([f32t([1]),f32t([4])]) 
    listw.append(tensor)

In [16]:
listw

[tensor([4.]),
 tensor([1.]),
 tensor([1.]),
 tensor([4.]),
 tensor([4.]),
 tensor([4.])]

In [19]:
main = t.stack(listw)

In [20]:
main

tensor([[4.],
        [1.],
        [1.],
        [4.],
        [4.],
        [4.]])

In [21]:
label = t.clone(main)

In [23]:
F.mse_loss(main, label)

tensor(0.)

In [43]:
def trainQ(optimizer, episode_reward, actions_R, actions_Q):
    # predictions = t.stack(actions_Q)
    labels = [ i + (episode_reward-i)*0.99 for i in actions_R]
    print(f32t(labels).unsqueeze(dim=1))

In [45]:
trainQ(5, 10, [5,7,10], [1,2])

tensor([[ 9.9500],
        [ 9.9700],
        [10.0000]])


In [106]:
for i in range(10):
    print(maxQ([f32t(5), f32t(7)]))

(tensor(5.), 0)
(tensor(5.), 0)
(tensor(5.), 0)
(tensor(7.), 1)
(tensor(7.), 1)
(tensor(7.), 1)
(tensor(7.), 1)
(tensor(7.), 1)
(tensor(7.), 1)
(tensor(7.), 1)


In [79]:
np.random.random()

0.1962596718405013

In [90]:
np.random.randint(0,2)

0

In [117]:
env._max_episode_steps

500