In [3]:
import pygame
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import math
import random

pygame 2.5.2 (SDL 2.28.3, Python 3.11.9)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [4]:

class IntelliGoal(gym.Env):
    """2-D "reach the goal" environment drawn with pygame.

    The player dot moves ``player_speed`` pixels per step and has to touch a
    goal dot that is spawned within ``goal_spawn_range`` pixels of the player.

    Observation: ``[player_x, player_y, goal_x, goal_y]`` as ``np.int32``.
    Actions: 0 no movement, 1 up, 2 down, 3 left, 4 right.

    NOTE(review): ``self.reward`` is only zeroed in ``reset`` and keeps being
    accumulated by ``calculate_reward``; ``step`` returns that running episode
    total (clamped to [-1, 1]) rather than a per-step reward. Left unchanged
    here because the clamp/abort logic depends on it -- confirm this is the
    intended signal for the learning algorithm.
    """

    # Gymnasium reads the "render_modes" key (the old gym key was "render.modes").
    metadata = {'render_modes': ['human']}

    def __init__(self):
        super().__init__()

        self.screen_width = 1240
        self.screen_height = 800
        self.border_width = 10
        self.screen = None  # pygame display surface, created lazily by render()

        self.goal_radius = 20
        self.player_radius = 10

        self.player_speed = 1

        # State information
        self.player_x = 0
        self.player_y = 0
        self.goal_x = 0
        self.goal_y = 0
        self.state = self._get_obs()
        self.prev_distance = self._distance_to_goal()
        self.reward = 0

        self.done = False
        self.trucated = False  # (sic) attribute name kept for backward compatibility
        self.truncation_step_limit = 1000
        self.truncation_step_counter = 0
        self.goal_spawn_range = 300

        # State and Action spaces
        self.action_space = spaces.Discrete(5)  # 0: No movement, 1: Up, 2: Down, 3: Left, 4: Right
        # observation space = [player_x, player_y, goal_x, goal_y]
        self.observation_space = spaces.Box(
            low=np.array([0, 0, 0, 0], dtype=np.int32),
            high=np.array(
                [self.screen_width, self.screen_height, self.screen_width, self.screen_height],
                dtype=np.int32,
            ),
            dtype=np.int32,
        )

    def _get_obs(self):
        """Return the observation vector with the dtype declared by ``observation_space``."""
        # Explicit dtype: NumPy's default integer width is platform dependent,
        # so without it the observation may not match the int32 space.
        return np.array(
            [self.player_x, self.player_y, self.goal_x, self.goal_y], dtype=np.int32
        )

    def _distance_to_goal(self):
        """Euclidean distance between the player centre and the goal centre."""
        return math.sqrt(
            (self.player_x - self.goal_x) ** 2 + (self.player_y - self.goal_y) ** 2
        )

    def seed(self, seed=None):
        """Seed ``self.np_random`` and return ``[seed]``.

        Raises:
            ValueError: if ``seed`` is outside ``[0, 2**32 - 1]``.
        """
        if seed is not None and (seed < 0 or seed >= 2**32):
            raise ValueError("Seed must be between 0 and 2**32 - 1")
        self.np_random, seed = gym.utils.seeding.np_random(seed)
        return [seed]

    def reset(self, seed=0, options=None):
        """Start a new episode with the player centred and a freshly spawned goal.

        Args:
            seed: forwarded to ``seed()``.
            options: accepted for Gymnasium API compatibility; unused.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.

        NOTE(review): the goal position is drawn from the global ``random``
        module in ``respawn_goal``, so ``seed`` does not make goal placement
        reproducible. Switching to ``self.np_random`` would also require a
        ``seed=None`` default, otherwise every reset would replay the same goal.
        """
        self.seed(seed)

        self.player_x = self.screen_width // 2
        self.player_y = self.screen_height // 2

        self.goal_x, self.goal_y = self.respawn_goal()

        self.reward = 0
        self.state = self._get_obs()

        self.done = False
        self.trucated = False
        self.truncation_step_counter = 0

        return self.state, {}

    def respawn_goal(self):
        """Place the goal near the player (clipped to the playfield) and return ``(goal_x, goal_y)``."""
        spawn = self.goal_spawn_range
        self.goal_x = self.player_x + random.randint(-spawn, spawn)
        self.goal_y = self.player_y + random.randint(-spawn, spawn)

        # clip the goal to the screen
        offset = self.border_width + self.goal_radius
        self.goal_x = min(max(self.goal_x, offset), self.screen_width - offset)
        self.goal_y = min(max(self.goal_y, offset), self.screen_height - offset)

        # The pause only makes sense when somebody is watching; skipping it when
        # no window exists avoids stalling headless training on every reset.
        if self.screen is not None:
            pygame.time.wait(300)

        self.prev_distance = self._distance_to_goal()

        return self.goal_x, self.goal_y

    def step(self, action):
        """Apply ``action`` and return ``(observation, reward, terminated, truncated, info)``."""
        self.do_action(action)
        self.calculate_reward()

        self.truncation_step_counter += 1
        if self.truncation_step_counter >= self.truncation_step_limit:
            self.reward -= 0.4
            # A time limit is a truncation, not a termination: only the
            # truncated flag is raised so the two signals stay distinguishable.
            self.trucated = True

        # Abort the episode once the accumulated reward falls below -1.
        if self.reward < -1:
            self.reward = -1
            self.trucated = True

        if self.reward > 1:
            self.reward = 1

        return self.state, self.reward, self.done, self.trucated, {}

    def do_action(self, action):
        """Move the player one step according to ``action`` (invalid actions are reported and ignored)."""
        if not self.action_space.contains(action):
            print("Invalid Action")

        # Remember the distance before moving so calculate_reward can compare.
        self.prev_distance = self._distance_to_goal()

        stride = self.player_speed
        if action == 1:
            self.player_y -= stride
        elif action == 2:
            self.player_y += stride
        elif action == 3:
            self.player_x -= stride
        elif action == 4:
            self.player_x += stride

    def calculate_reward(self):
        """Update ``self.reward``, ``self.done`` and ``self.state`` after a move."""
        # Distance between player and goal (measured before any border teleport)
        distance = self._distance_to_goal()

        # Check if the red dot touches the border; if so send it back to the centre.
        # Plain comparisons instead of `in range(...)`, which degrades to a
        # linear scan when the coordinate is a NumPy integer.
        inside_x = self.border_width <= self.player_x < self.screen_width - self.border_width
        inside_y = self.border_width <= self.player_y < self.screen_height - self.border_width
        if not (inside_x and inside_y):
            self.player_x, self.player_y = self.screen_width // 2, self.screen_height // 2

        # Check if the goal is reached within a certain range
        if distance < self.goal_radius + self.player_radius:
            self.goal_x, self.goal_y = self.respawn_goal()
            self.reward += 1
            self.done = True

        self.state = self._get_obs()

        # Reward the agent for getting closer to the goal
        if self.prev_distance > distance:
            self.reward += 0.0005
        if self.prev_distance < distance:
            self.reward -= 0.0015

        # Penalize the agent for taking too long
        self.reward -= 0.00008

    def keyboard_input(self):
        """Translate the currently pressed arrow key into an action id (0 when none is pressed)."""
        keys = pygame.key.get_pressed()
        # 1: Up, 2: Down, 3: Left, 4: Right
        if keys[pygame.K_UP]:
            return 1
        if keys[pygame.K_DOWN]:
            return 2
        if keys[pygame.K_LEFT]:
            return 3
        if keys[pygame.K_RIGHT]:
            return 4
        return 0

    def render(self, mode='human'):
        """Draw the border, player, goal and accumulated reward in a pygame window."""
        if self.screen is None:
            pygame.init()
            self.screen = pygame.display.set_mode((self.screen_width, self.screen_height))
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                # NOTE(review): pygame is shut down here but drawing continues
                # below; the loops in this notebook appear to rely on the
                # resulting exception to stop -- confirm before changing.
                pygame.quit()
        # Plain ints keep the coordinates free of NumPy scalar types.
        self.player_x, self.player_y, self.goal_x, self.goal_y = (int(v) for v in self.state)

        self.screen.fill((0, 0, 0))
        RED = (255, 0, 0)
        BLUE = (0, 0, 255)
        BORDER_COLOR = (255, 255, 0)

        # Draw the border
        pygame.draw.rect(self.screen, BORDER_COLOR, (0, 0, self.screen_width, self.border_width))
        pygame.draw.rect(self.screen, BORDER_COLOR, (0, 0, self.border_width, self.screen_height))
        pygame.draw.rect(self.screen, BORDER_COLOR, (0, self.screen_height - self.border_width, self.screen_width, self.border_width))
        pygame.draw.rect(self.screen, BORDER_COLOR, (self.screen_width - self.border_width, 0, self.border_width, self.screen_height))

        # Draw the red dot with the same radius the collision check uses
        pygame.draw.circle(self.screen, RED, (self.player_x, self.player_y), self.player_radius)

        # Draw the blue dot
        pygame.draw.circle(self.screen, BLUE, (self.goal_x, self.goal_y), self.goal_radius)

        # Display Reward on pygame screen
        font = pygame.font.Font(None, 36)
        text = font.render("Reward: " + str(round(self.reward, 5)), True, (255, 255, 255))
        self.screen.blit(text, (10, 10))

        pygame.display.update()

    def close(self):
        """Shut pygame down and forget the window so a later render() can recreate it."""
        pygame.quit()
        self.screen = None
        
        

In [5]:
# Test Env with keyboard input actions
test_env = IntelliGoal()
test_env.reset(0)
done = False
i = 1  # number of the episode whose final reward is printed next

try:
    while True:
        test_env.render()
        action = test_env.keyboard_input()
        state, reward, done, truncated, _i = test_env.step(action)
        if done or truncated:
            print("Reward at step - ", i, " : ", round(reward, 5))
            i += 1
            test_env.reset(0)
except (pygame.error, KeyboardInterrupt):
    # Only the expected ways of leaving the loop are handled here (pygame
    # failing after the window was closed, or an interrupted kernel); any
    # other exception is a real bug and is allowed to surface.
    print("Environment Closed")
finally:
    test_env.close()

# while True:
#     test_env.render()
#     action = test_env.keyboard_input()
#     state, reward, done, truncated, _i = test_env.step(action)
#     if done or truncated:
#         print("Reward at step - ", i, " : ", round(reward, 5))
#         i += 1
#         test_env.reset(0)   


Reward at step -  1  :  -0.48
Reward at step -  2  :  -0.48
Reward at step -  3  :  -0.48
Reward at step -  4  :  -0.48
Reward at step -  5  :  -0.48
Reward at step -  6  :  -0.48
Reward at step -  7  :  -0.48
Reward at step -  8  :  -0.48
Reward at step -  9  :  -0.48
Reward at step -  10  :  -0.48
Reward at step -  11  :  -0.48
Reward at step -  12  :  -0.48
Reward at step -  13  :  -0.48
Reward at step -  14  :  -0.48
Reward at step -  15  :  -0.48
Reward at step -  16  :  -0.48
Reward at step -  17  :  -0.48
Reward at step -  18  :  -0.48
Reward at step -  19  :  -0.48
Reward at step -  20  :  -0.48
Reward at step -  21  :  -0.48
Environment Closed


# DQN Learning Algorithm

In [6]:
from stable_baselines3 import DQN
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.vec_env import SubprocVecEnv
from stable_baselines3.common.evaluation import evaluate_policy
import os

In [7]:
import torch
# Pick the compute device: CUDA when torch can see a GPU, otherwise the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
if device == "cuda":
    print("Using GPU - ", torch.cuda.get_device_name(0))

Using GPU -  NVIDIA RTX A500 Laptop GPU


In [8]:
# Run Stable-Baselines3's environment checker on a fresh IntelliGoal instance.
env = IntelliGoal()
check_env(env)

In [9]:
def make_env():
    """Create a new IntelliGoal environment (used as the factory for the vectorised envs)."""
    return IntelliGoal()

num_envs = 2
# Logging was effectively always on (the TensorBoard directory used to be
# hard-coded in the DQN call), so the switch defaults to True to keep that.
take_logs = True
# "auto" lets Stable-Baselines3 use the GPU when one is available and fall back
# to the CPU otherwise, instead of crashing on machines without CUDA.
device = "auto"

# TensorBoard output directory; None disables TensorBoard logging.
if take_logs:
    log_dir = "./logs/dqn_intelligoal_tensorboard/"
else:
    log_dir = None

# One factory per environment; a single env does not need subprocesses.
env_factories = [make_env for _ in range(num_envs)]
if num_envs == 1:
    env = DummyVecEnv(env_factories)
else:
    env = SubprocVecEnv(env_factories)


# model = PPO("MlpPolicy", env, verbose=1, device=device)

# Define DQN model with specific parameters
model = DQN("MlpPolicy", env, verbose=1,
            buffer_size=10000,
            learning_rate=0.0005,
            batch_size=32,
            tau=1.0,
            gamma=0.99,
            train_freq=4,
            gradient_steps=1,
            exploration_fraction=0.1,
            exploration_final_eps=0.01,
            target_update_interval=1000,
            max_grad_norm=10,
            tensorboard_log=log_dir,
            device=device)

Using cuda device


In [13]:
# Checkpoint path without the ".zip" extension.
# NOTE(review): this cell's execution count (13) is higher than the training cell that follows it (12) -- confirm the intended run order.
save_path = "./models/dqn_intelligoal_a2"

In [12]:
# Start New training session
save_path = "./models/intelli_goal_a2"

# Never overwrite an existing checkpoint: bump the trailing version number until
# the name is free. The whole numeric suffix is parsed, so "..._a9" becomes
# "..._a10" rather than only the last character being incremented.
if os.path.exists(save_path + ".zip"):
    path_stem = save_path.rstrip("0123456789")
    version_text = save_path[len(path_stem):]
    version = int(version_text) if version_text else 0
    while os.path.exists(f"{path_stem}{version}.zip"):
        version += 1
    save_path = f"{path_stem}{version}"
    print("Model already exists. Saving new one as - ", save_path)

# Train until the step budget is exhausted or the kernel is interrupted; the
# model is saved in both cases. Any other exception propagates without saving.
try:
    model.learn(total_timesteps=9999999999)
except KeyboardInterrupt:
    print("Training Interrupted")
model.save(save_path)

Model already exists. Saving new one as -  ./models/intelli_goal_a3
Logging to ./logs/dqn_intelligoal_tensorboard/DQN_6
----------------------------------
| rollout/            |          |
|    exploration_rate | 1        |
| time/               |          |
|    episodes         | 4        |
|    fps              | 1019     |
|    time_elapsed     | 3        |
|    total_timesteps  | 4000     |
| train/              |          |
|    learning_rate    | 0.0005   |
|    loss             | 0.0185   |
|    n_updates        | 68780    |
----------------------------------
----------------------------------
| rollout/            |          |
|    exploration_rate | 1        |
| time/               |          |
|    episodes         | 8        |
|    fps              | 1086     |
|    time_elapsed     | 7        |
|    total_timesteps  | 8000     |
| train/              |          |
|    learning_rate    | 0.0005   |
|    loss             | 0.0116   |
|    n_updates        | 69280    |
-----

In [None]:
def make_env():
    """Create a new IntelliGoal environment (used as the factory for the vectorised envs)."""
    return IntelliGoal()

num_envs = 2
take_logs = False
# "auto" lets Stable-Baselines3 use the GPU when one is available and fall back
# to the CPU otherwise, instead of failing on machines without CUDA.
device = "auto"

# NOTE(review): log_dir is assigned but not used anywhere in this cell.
if take_logs:
    log_dir = "./logs/ppo_intelligoal_a3_explore-more"
else:
    log_dir = None

# One factory per environment; a single env does not need subprocesses.
env_factories = [make_env for _ in range(num_envs)]
if num_envs == 1:
    env = DummyVecEnv(env_factories)
else:
    env = SubprocVecEnv(env_factories)

In [14]:
# Load and resume training from saved model
# DQN.load is a classmethod that returns a NEW model; calling it on the existing
# instance and discarding the result leaves the old weights in place, so the
# loaded model has to be bound to `model` explicitly.
model = DQN.load(save_path, env=env)

# Continue training; on interruption the current state is still saved.
try:
    model.learn(total_timesteps=100000000, reset_num_timesteps=False, progress_bar=False)
except KeyboardInterrupt:
    print("Training Interrupted")
    print("Saving Model")
model.save(save_path)

FileNotFoundError: [Errno 2] No such file or directory: 'models\\dqn_intelligoal_a2.zip'

In [None]:
# Evaluate the model
# Only a single evaluation episode is requested, so the reported standard
# deviation carries no information (it is computed over one sample).
mean_reward, std_reward = evaluate_policy(model, model.get_env(), n_eval_episodes=1)
print(f"Mean Reward: {mean_reward}, Std Reward: {std_reward}")


Mean Reward: 0.48489999771118164, Std Reward: 0.0


In [None]:
from stable_baselines3.common.vec_env import DummyVecEnv
# Checkpoint path that the playback cell below passes to DQN.load.
save_path = "./models/dqn_intelligoal_a1_works-100"

In [None]:

def make_env():
    """Return a zero-argument factory that builds an IntelliGoal env marked for human rendering."""
    def _init():
        human_env = IntelliGoal()
        # Instance-level flag telling wrappers that this env renders to a window.
        human_env.render_mode = 'human'
        return human_env

    return _init

# Wrap your environment in a DummyVecEnv for compatibility
env = DummyVecEnv([make_env()])

# Load the checkpoint stored at save_path and attach the single rendering env to it.
model = DQN.load(save_path, env=env, device=device)

def run_model():
    """Play the loaded policy greedily in the rendered env, printing each finished episode's reward.

    Runs until an exception (for example from closing the window) leaves the
    loop; uses the module-level ``env`` and ``model``.
    """
    obs = env.reset()  # This will now be in the correct batch format
    while True:
        action, _states = model.predict(obs, deterministic=True)
        obs, rewards, dones, info = env.step(action)
        env.render()
        # The vectorised env returns one flag per sub-environment; index the
        # single env explicitly instead of taking the truth value of the array.
        if dones[0]:
            print("Episode Finished with Reward - ", rewards[0])
            # No manual reset: the vectorised env already reset the finished
            # sub-environment, and `obs` is the first observation of the next
            # episode. Resetting again would discard that episode start.

try:
    run_model()
except (pygame.error, KeyboardInterrupt):
    # Expected ways of stopping playback: pygame failing after the window was
    # closed, or an interrupted kernel. Other exceptions are real errors and
    # are no longer hidden behind the same message.
    print("Environment Closed")
finally:
    env.close()


Episode Finished with Reward -  -1.0
Episode Finished with Reward -  -1.0
Episode Finished with Reward -  -0.775
Episode Finished with Reward -  -1.0
Episode Finished with Reward -  -1.0
Episode Finished with Reward -  -1.0
Episode Finished with Reward -  -1.0
Episode Finished with Reward -  1.0
Episode Finished with Reward -  -1.0
Episode Finished with Reward -  -1.0
Episode Finished with Reward -  -1.0
Episode Finished with Reward -  -1.0
Environment Closed


In [None]:
from stable_baselines3.common.vec_env import DummyVecEnv
