In [59]:
import gymnasium as gym
import numpy as np
import time
from q_learning import QL
from tqdm import tqdm

import torch
import torch.nn.functional as F
from pclib.nn.models import FCClassifierInv

In [53]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [48]:
env = gym.make('FrozenLake-v1', is_slippery=False)
ql = QL(env, 0.001, 0.9, epsilon=0.1)
ql.train(10000)

                                                                           

In [139]:
terminated, truncated = False, False
state = env.reset()[0]
total_reward = 0
while not terminated and not truncated:
    action = env.action_space.sample()
    state, reward, terminated, truncated, _ = env.step(action)
    total_reward += reward - 0.01
    if terminated:
        total_reward -= 0.2
    state = state
print(total_reward)

-0.42000000000000004


In [50]:
# Evaluate
total_rewards = []
num = 10000
loop = tqdm(range(num), leave=False, total=num)
for i in loop:
    loop.set_description(f"Evaluating {i}/{num}")
    total_reward = 0.0
    state = env.reset()[0]
    terminated, truncated = False, False
    while not terminated and not truncated:
        action = ql.act(state)
        state, reward, terminated, truncated, _ =env.step(action)
        total_reward += reward - 0.01
        if terminated:
            total_reward -= 0.2
    total_rewards.append(reward)

print(f"mean reward: {np.array(total_rewards).mean()}")
env.close()

                                                                             

mean reward: 1.0




In [146]:
seed = 42
torch.manual_seed(seed)

model = FCClassifierInv(
    input_size=16,
    num_classes=4,
    hidden_sizes=[],
    bias=True,
    symmetric=True,
    precision_weighted=False,
    actv_fn=F.tanh,
    steps=60,
    gamma=0.1,
).to(device)

def format_obs(obs):
    return F.one_hot(torch.tensor(obs), num_classes=16).float().to(device).unsqueeze(0)

optimiser = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.1)

In [147]:
num_epochs = 1000
final_rewards = []
loop = tqdm(range(num_epochs), leave=False, total=num_epochs)
for epoch in loop:
    if epoch > 0:
        loop.set_description(f"Epoch {epoch}/{num_epochs}, Reward: {np.array(final_rewards[-10:]).mean()}")
    state = env.reset()[0]
    total_reward = 0.0
    terminated, truncated = False, False
    prev_state_value = ql.Q[state].max()
    while not terminated and not truncated:

        obs = format_obs(state)
        actions, s = model(obs)

        state, reward, terminated, truncated, _ = env.step(actions[0].argmax().item())
        state_value = ql.Q[state].max()
        total_reward += reward - 0.01
        if terminated:
            total_reward -= 0.2

        loss = model.vfe(s) * (state_value - prev_state_value)
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()

        prev_state_value = state_value
    final_rewards.append(total_reward)



                                                                                             

KeyboardInterrupt: 

In [28]:
# Watch the agent play
env = gym.make('FrozenLake-v1', is_slippery=False, render_mode='human')
QL.env = env
total_rewards = []
for i in range(3):
    total_reward = 0.0
    state = env.reset()[0]
    terminated, truncated = False, False
    while not terminated and not truncated:
        action = ql.act(state)
        state, reward, terminated, truncated, _ =env.step(action)
        total_reward += reward
        env.render()
        time.sleep(0.05)
    total_rewards.append(reward)

print(f"mean reward: {np.array(total_rewards).mean()}")
env.close()

KeyboardInterrupt: 