# BASELINE
Logan Wong

law3082

In [1]:
import gymnasium as gym
import ale_py
import numpy as np
from stable_baselines3 import DQN	
import matplotlib.pyplot as plt
from collections import deque
import torch

# For debugging
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.callbacks import CheckpointCallback
from stable_baselines3.common.callbacks import BaseCallback
import time

# Action masking
from gymnasium import ActionWrapper

## Test to make sure no errors

In [2]:
# env = gym.make("ALE/Superman-v5", render_mode=None)

# action_space = env.action_space
# obs_space = env.observation_space
# print("Action space:", action_space)
# print("Number of actions:", action_space.n)
# action_meanings = env.unwrapped.get_action_meanings()
# print("Action meanings:", action_meanings)
# print("\nObservation space:", obs_space)

# obs, _ = env.reset()
# print("Observation shape:", obs.shape)
# env.close()

In [3]:
# # action_meanings
# action_dict = {}
# for i in range(len(action_meanings)):
#     action_dict[i] = action_meanings[i]

# print(action_dict)

In [4]:
# for a in list(action_dict.keys()):
#     print(f"{a}: {action_dict[a]}")

In [5]:
# Action meanings: ['NOOP', 'FIRE', 'UP', 'RIGHT', 'LEFT', 'DOWN', 'UPRIGHT', 'UPLEFT', 'DOWNRIGHT', 'DOWNLEFT', 'UPFIRE', 'RIGHTFIRE', 'LEFTFIRE', 'DOWNFIRE', 'UPRIGHTFIRE', 'UPLEFTFIRE', 'DOWNRIGHTFIRE', 'DOWNLEFTFIRE']

# {0: 'NOOP', 1: 'FIRE', 2: 'UP', 3: 'RIGHT', 4: 'LEFT', 5: 'DOWN', 6: 'UPRIGHT', 7: 'UPLEFT', 8: 'DOWNRIGHT', 9: 'DOWNLEFT', 10: 'UPFIRE', 11: 'RIGHTFIRE', 12: 'LEFTFIRE', 13: 'DOWNFIRE', 14: 'UPRIGHTFIRE', 15: 'UPLEFTFIRE', 16: 'DOWNRIGHTFIRE', 17: 'DOWNLEFTFIRE'}

# # ACTIONS
# 0: NOOP
# 1: FIRE

# 2: UP
# 3: RIGHT
# 4: LEFT
# 5: DOWN
# 6: UPRIGHT
# 7: UPLEFT
# 8: DOWNRIGHT
# 9: DOWNLEFT

# 10: UPFIRE
# 11: RIGHTFIRE
# 12: LEFTFIRE
# 13: DOWNFIRE

# 14: UPRIGHTFIRE
# 15: UPLEFTFIRE
# 16: DOWNRIGHTFIRE
# 17: DOWNLEFTFIRE

## Create environment

In [6]:
class SupermanActionReducer(ActionWrapper):
    """Expose only a subset of the wrapped environment's discrete actions.

    The agent sees ``Discrete(len(allowed_actions))``; a reduced index ``i`` is
    translated to ``allowed_actions[i]`` before it reaches the wrapped env.

    Args:
        env: Environment to wrap.
        allowed_actions: Iterable of original action ids to keep, in the order
            the agent should see them. Defaults to ``DEFAULT_ACTIONS``.

    Raises:
        ValueError: If ``allowed_actions`` is empty or contains an id that the
            wrapped environment's action space does not accept.
    """

    # Basic movement only: Cardinal directions & diagonal directions
    # Cardinal directions, diagonal directions, AND x-ray vision would be:
    #   [2,3,4,5, 6,7,8,9, 10,11,12,13]
    DEFAULT_ACTIONS = (2, 3, 4, 5, 6, 7, 8, 9)

    def __init__(self, env, allowed_actions=None):
        super().__init__(env)

        if allowed_actions is None:
            allowed_actions = self.DEFAULT_ACTIONS

        # Copy into our own list so later mutation by the caller cannot
        # silently change the mapping.
        allowed_actions = list(allowed_actions)

        if not allowed_actions:
            raise ValueError("allowed_actions must contain at least one action")

        # Fail fast here instead of at step() time deep inside the emulator.
        invalid = [a for a in allowed_actions if not env.action_space.contains(a)]
        if invalid:
            raise ValueError(
                f"allowed_actions contains ids not in the wrapped action space "
                f"{env.action_space}: {invalid}"
            )

        self.allowed_actions = allowed_actions
        self.action_space = gym.spaces.Discrete(len(allowed_actions))

    def action(self, action):
        """Map the reduced action index back to the original action id."""
        return self.allowed_actions[action]

In [7]:
def convert(seconds):
    """Format a duration in seconds as ``H:MM:SS``.

    Hours are not wrapped at 24, so a run longer than a day is reported as
    e.g. ``25:00:00`` rather than ``1:00:00``. Fractional seconds are
    truncated; negative durations are clamped to zero.

    Args:
        seconds: Duration in seconds (int or float).

    Returns:
        The duration as a string, with zero-padded minutes and seconds.
    """
    total_seconds = max(int(seconds), 0)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, secs = divmod(remainder, 60)

    return "%d:%02d:%02d" % (hours, minutes, secs)

In [8]:
# Create the Superman environment.
# RENDER_MODE controls rendering: "human" opens a window so humans can SEE
# the game; None disables rendering.
# RENDER_MODE = "human"
RENDER_MODE = None

# Pass RENDER_MODE through so that changing the constant above actually takes
# effect (it was previously ignored in favour of a hard-coded None).
env = gym.make("ALE/Superman-v5", render_mode=RENDER_MODE)
env = SupermanActionReducer(env)

# Wrap the environment with Monitor to print out progress
# env = Monitor(env)

In [9]:
# Report what kind of environment object we ended up with, and whether it
# exposes `num_envs` (the attribute used here as the "vectorized" marker).
print("Environment type:", type(env))

if hasattr(env, 'num_envs'):
    print("Is vectorized?", True)
    print("Number of parallel environments:", env.num_envs)
else:
    print("Is vectorized?", False)

Environment type: <class '__main__.SupermanActionReducer'>
Is vectorized? False


## Init DQN Model

In [10]:
# Report whether PyTorch can use CUDA, and which GPU it sees if so
if torch.cuda.is_available():
    print("CUDA available:", True)
    print("GPU device:", torch.cuda.get_device_name(0))
else:
    print("CUDA available:", False)


CUDA available: False


In [11]:
# Build the DQN agent.
#   policy        -> "CnnPolicy": tell it to use a CNN
#   env           -> the environment created above
#   learning_rate -> alpha = 0.0001
model = DQN(
    "CnnPolicy",
    env,
    learning_rate=1e-4,
    # buffer_size=100000,
    buffer_size=20000,
    batch_size=32,
    target_update_interval=1000,
    verbose=1,
    device="auto",
)

print("DQN model created successfully!")

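# Output (pasted from an earlier run; the 20.16GB replay-buffer warning below appears to correspond to buffer_size=100000, not the current 20000):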
# Using cpu device
# Wrapping the env with a `Monitor` wrapper
# Wrapping the env in a DummyVecEnv.
# Wrapping the env in a VecTransposeImage.
# C:\Users\Logan\anaconda3\envs\superman_env\lib\site-packages\stable_baselines3\common\vec_env\base_vec_env.py:78: UserWarning: The `render_mode` attribute is not defined in your environment. It will be set to None.
#   warnings.warn("The `render_mode` attribute is not defined in your environment. It will be set to None.")
# C:\Users\Logan\anaconda3\envs\superman_env\lib\site-packages\stable_baselines3\common\buffers.py:242: UserWarning: This system does not have apparently enough memory to store the complete replay buffer 20.16GB > 4.78GB
#   warnings.warn(

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.




DQN model created successfully!


## Train the model

In [12]:
class ProgressCallback(BaseCallback):
    """Print training throughput every ``print_freq`` timesteps.

    Each report shows the current timestep, the average steps/second since
    training started, and (if any episode has finished) the length and reward
    of the most recently completed episode.

    Args:
        print_freq: Minimum number of timesteps between two progress lines.
    """

    def __init__(self, print_freq=1000):
        super().__init__()
        self.print_freq = print_freq
        self.last_print = 0
        self.start_time = time.time()
        # Timestep count at the moment the clock was (re)started.
        self._start_timesteps = 0

    def _on_training_start(self) -> None:
        # Restart the clock when learn() actually begins, so time spent between
        # constructing the callback and starting training is not counted, and
        # so a reused callback does not keep a stale `last_print`.
        self.start_time = time.time()
        self._start_timesteps = self.num_timesteps
        self.last_print = self.num_timesteps

    def _on_step(self) -> bool:
        if self.num_timesteps - self.last_print >= self.print_freq:
            # Print progress every print_freq steps.
            # Clamp elapsed time so a coarse clock can never cause a
            # division by zero.
            elapsed = max(time.time() - self.start_time, 1e-9)
            # Only count steps taken since the clock started; num_timesteps is
            # cumulative when training is resumed.
            steps_per_sec = (self.num_timesteps - self._start_timesteps) / elapsed

            # Get latest episode info if available
            episode_info = ""
            if hasattr(self.model, 'ep_info_buffer') and len(self.model.ep_info_buffer) > 0:
                latest = self.model.ep_info_buffer[-1]
                episode_info = f" | Latest episode: {latest['l']} steps, {latest['r']} reward"

            print(f"Step {self.num_timesteps} | Speed: {steps_per_sec:.1f} steps/sec{episode_info}")
            self.last_print = self.num_timesteps

        # Returning True tells the training loop to keep going.
        return True

In [13]:
# Training budget in environment steps. Other budgets tried:
# total_timesteps = 10000000    # 10M
# total_timesteps =  2000000    # 2M
# total_timesteps =  1000000    # 1M
# total_timesteps =   100000    # 100K
# total_timesteps =    10000    # 10K
total_timesteps = 5_000  # 5K

# Run training and time how long it takes
print("Training phase started.")
start_time = time.time()
# Variant without progress output: model.learn(total_timesteps=total_timesteps)
model.learn(
    total_timesteps=total_timesteps,
    callback=ProgressCallback(print_freq=1000),
)
end_time = time.time()

# Wall-clock duration, both raw (seconds) and formatted by convert()
training_duration = end_time - start_time
time_in_minutes_and_seconds = convert(training_duration)

print("Training phase completed!")
print("Time taken: {}".format(time_in_minutes_and_seconds))
print("Speed: {:.2f} steps/second".format(total_timesteps / training_duration))

Step 1000 | Speed: 36.8 steps/sec
Step 2000 | Speed: 33.7 steps/sec
Step 3000 | Speed: 31.1 steps/sec
Step 4000 | Speed: 28.9 steps/sec
Step 5000 | Speed: 27.8 steps/sec
Step 6000 | Speed: 27.0 steps/sec
Step 7000 | Speed: 26.5 steps/sec
Step 8000 | Speed: 25.9 steps/sec
Step 9000 | Speed: 25.6 steps/sec
Step 10000 | Speed: 24.4 steps/sec
Training phase completed!
Time taken: 0:06:49
Speed: 24.41 steps/second


In [14]:
# Training is finished: close the environment.
env.close()

In [15]:
# Save the policy network
# Note: this calls save() on model.policy only, not on the DQN model itself.
# NOTE(review): presumably intended for reloading just the policy later — confirm.
model.policy.save("dqn_superman_policy")
print("Policy network saved successfully")

Policy network saved successfully


## Plot episode rewards from training

In [16]:
# Inspect the container type of the model's episode-info buffer before reading from it
print(type(model.ep_info_buffer))

<class 'collections.deque'>


In [17]:
# Number of entries currently held in the episode-info buffer
print(f"Number of episodes recorded: {len(model.ep_info_buffer)}")

Number of episodes recorded: 0


In [18]:
# Preview up to the first five entries of the episode-info buffer
print("First 5 episodes in buffer:")
for i, episode_info in enumerate(list(model.ep_info_buffer)[:5]):
    print("Episode {}: {}".format(i, episode_info))


First 5 episodes in buffer:


In [19]:
# Plot the per-episode reward and length recorded during training, then print
# a short summary. Does nothing but print a notice if no episode finished.
if len(model.ep_info_buffer) > 0:
    # Each buffer entry is a dict: 'r' is the episode's reward and 'l' is its
    # length, i.e. the number of steps.
    # NOTE(review): ep_info_buffer is a deque; in Stable-Baselines3 it is
    # bounded (stats_window_size, default 100), so these lists may cover only
    # the most recent episodes rather than all of training — confirm.
    rewards = [episode_info['r'] for episode_info in model.ep_info_buffer]
    episode_lengths = [episode_info['l'] for episode_info in model.ep_info_buffer]

    # Create the learning curve plot (two panels side by side)
    fig, (reward_ax, length_ax) = plt.subplots(1, 2, figsize=(12, 5))

    # Plot 1: Reward over episodes
    reward_ax.plot(rewards)
    reward_ax.set_xlabel('Training Episode')
    reward_ax.set_ylabel('Episode Reward')
    reward_ax.set_title('DQN Learning Curve\n(Higher is Better)')
    reward_ax.grid(True)

    # Plot 2: Episode length over episodes
    length_ax.plot(episode_lengths)
    length_ax.set_xlabel('Training Episode')
    length_ax.set_ylabel('Episode Length (Steps)')
    length_ax.set_title('Episode Duration\n(Longer survival may indicate learning)')
    length_ax.grid(True)

    fig.tight_layout()
    plt.show()

    # Average over however many of the last 10 episodes actually exist;
    # dividing by a fixed 10 would under-report the mean early in training.
    recent_rewards = rewards[-10:]
    recent_mean = sum(recent_rewards) / len(recent_rewards)

    # Print some statistics
    print(f"\nTraining Summary:")
    print(f"Total training episodes: {len(rewards)}")
    print(f"Final average reward: {recent_mean:.2f} (last {len(recent_rewards)} episodes)")
    print(f"Best episode reward: {max(rewards):.2f}")
    print(f"Average episode length: {sum(episode_lengths)/len(episode_lengths):.1f} steps")

else:
    print("No episode data found in the buffer.")

No episode data found in the buffer.
