In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import time
import torch
from feedforward import DQNAgent
import pylab as plt
# %matplotlib inline
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import hockey.hockey_env as h_env
print("hi")

# https://github.com/Howuhh/prioritized_experience_replay/tree/main
# https://github.com/quantumiracle/Popular-RL-Algorithms/blob/master/dqn_multistep.py#L212

## Helper Functions

In [2]:
def running_mean(x, N):
    """Return the moving average of ``x`` over a sliding window of ``N`` samples.

    Args:
        x: array-like of numbers.
        N: window length.

    Returns:
        Array with the mean of every full window of ``N`` consecutive entries.
    """
    # Prefix sums with a leading zero: each window sum is then the difference
    # of two prefix sums that lie N positions apart.
    prefix_sums = np.cumsum(np.insert(x, 0, 0))
    window_sums = prefix_sums[N:] - prefix_sums[:-N]
    return window_sums / float(N)

In [3]:
class DiscreteActionWrapper(gym.ActionWrapper):
    """Expose a continuous (Box) action space as ``bins`` evenly spaced discrete actions."""

    def __init__(self, env: gym.Env, bins = 5):
        """A wrapper for converting 1D continuous actions into discrete ones.
        Args:
            env: The environment to apply the wrapper
            bins: number of discrete actions (must be at least 2)
        Raises:
            ValueError: if ``bins`` is smaller than 2
        """
        assert isinstance(env.action_space, spaces.Box)
        # action() divides by (bins - 1), so a single bin cannot be mapped
        # onto the [low, high] interval; fail here instead of at the first step.
        if bins < 2:
            raise ValueError(f"bins must be at least 2, got {bins}")
        super().__init__(env)
        self.bins = bins
        # Keep the continuous space: its bounds define the discretization grid.
        self.orig_action_space = env.action_space
        self.action_space = spaces.Discrete(self.bins)

    def action(self, action):
        """ discrete actions from low to high in 'bins'
        Args:
            action: The discrete action (index in 0 .. bins-1)
        Returns:
            continuous action: index 0 maps to the lower bound of the original
            space and index bins-1 to its upper bound, linearly in between
        """
        low = self.orig_action_space.low
        high = self.orig_action_space.high
        return low + action/(self.bins-1.0)*(high-low)

## Test in Env

In [None]:
# Choose the environment to train on (alternatives are kept commented out).
# env_name = 'Pendulum-v1'
env_name = 'CartPole-v0'
env = gym.make(env_name)

# env = h_env.HockeyEnv()

# Continuous (Box) action spaces are wrapped so that the environment exposes
# a discrete action space.
if isinstance(env.action_space, spaces.Box):
    print("Discretizing actions")
    env = DiscreteActionWrapper(env, bins=5)

# Read the spaces after the optional wrapping so they describe the final env.
ac_space, o_space = env.action_space, env.observation_space
print(ac_space)
print(o_space)

In [5]:
# env_ = gym.make(env_name, render_mode="human")
# observation = env_.reset()

# # Run for a few steps
# for _ in range(50):  # Adjust steps as needed
#     env_.render()  # Render the environment
#     action = env_.action_space.sample()  # Choose a random action (0 or 1)
#     observation, reward, done, _, _ = env_.step(action)  # Take a step
    
#     if done:
#         observation = env_.reset()  # Reset if the episode ends

# env_.close()  # Close the environment

## Train the agent!

In [None]:
# Build the agent for the observation/action spaces of the selected environment.
# NOTE(review): the keyword arguments are interpreted by DQNAgent in feedforward.py,
# which is not visible here — confirm their meaning there.
q_agent = DQNAgent(o_space, ac_space, eps=0.2, update_Qt_after=20, PrioritizedMemory=True, n_multi_step=3, double=True)

episode_rewards = []            # total reward of every episode
cum_mean_episode_rewards = []   # mean reward over the last `printevery` episodes
losses = []                     # mean of the values returned by q_agent.train(1), per episode
max_episodes = 10000
max_steps = 500
printevery = 500
num_stored = 0                  # number of transitions handed to the agent so far
for i in range(max_episodes):
    # print(f"Starting episode {i+1}")
    ob, _info = env.reset()
    total_reward = 0

    for t in range(max_steps):
        a = q_agent.act(ob)
        (ob_new, reward, done, trunc, _info) = env.step(a)

        total_reward += reward
        # Only the termination flag is stored with the transition; truncation
        # is handled below and is not passed to the agent.
        q_agent.store_transition((ob, a, reward, ob_new, done, i, t))
        num_stored += 1
        ob = ob_new

        # Stop on termination OR truncation: once the environment reports
        # truncation (e.g. its time limit was hit) it must be reset before
        # stepping further.
        if done or trunc:
            break

    # print(f"Episode {i+1} ended after {t+1} steps. Episode reward = {total_reward}")

    episode_rewards.append(total_reward)
    cum_mean_episode_rewards.append(np.mean(episode_rewards[-printevery:]))
    # One call to the agent's training routine per episode.
    losses.append(np.mean(q_agent.train(1)))

    if((i+1)%printevery==0):
        print(f"{i+1} episodes completed: Mean cumulative reward: {np.mean(episode_rewards[-printevery:])}")

## Evaluate

In [None]:
# Rendering variant of the environment; only used when selected below.
env_eval = gym.make(env_name, render_mode="human")
# if isinstance(env.action_space, spaces.Box):
#     env_eval = DiscreteActionWrapper(env_eval,5)


test_stats = []   # one row per episode: [episode index, total reward, number of steps]
episodes=50
env_ = env    # without rendering
# env_ = env_eval # with rendering

for i in range(episodes):
    total_reward = 0
    ob, _info = env_.reset()
    for t in range(max_steps):
        # eps=0.0 is passed for evaluation.
        # NOTE(review): presumably this disables exploration — confirm in DQNAgent.act.
        a = q_agent.act(ob, eps=0.0)
        (ob_new, reward, done, trunc, _info) = env_.step(a)
        total_reward += reward
        ob = ob_new
        # Stop on termination OR truncation; a truncated environment must be
        # reset before it is stepped again.
        if done or trunc:
            break
    test_stats.append([i, total_reward, t+1])

In [None]:
# Summarize the evaluation: mean and standard deviation of the per-episode
# total reward (column 1 of every test_stats row).
test_stats_np = np.array(test_stats)
episode_returns = test_stats_np[:, 1]
print(np.mean(episode_returns), "+-", np.std(episode_returns))

In [None]:
# env_name = 'Acrobot-v1'
# env_name = 'MountainCar-v0'
# env_name = 'LunarLander-v2'