# 1. Custom Dino Environment

## Import Dependencies

In [2]:
import base64
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# Environment Components
from gymnasium import Env
from gymnasium.spaces import Box, Discrete

# Selenium for automatically loading and playing the game
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

## DinoEnvironment Class

In [4]:
# Create Dino Game Environment
class DinoEnvironment(Env):
    """Environment that plays Chrome's Dino game through a Selenium-driven browser.

    An observation is a float32 vector of 10 values, in this order:
    trex_y, trex_jumping, trex_ducking, game_speed, obst_dist, obst_type,
    obst_x, obst_y, obst_width, obst_height. Values that are unavailable
    (e.g. no obstacle on screen) are reported as -1.

    The four discrete actions are: 0 = start jumping, 1 = start ducking,
    2 = stop ducking, 3 = do nothing.

    Note that ``reset()`` returns only the observation and ``step()`` returns
    the 4-tuple ``(obs, reward, done, info)``.
    """

    # Integer codes used for the obstacle type inside the observation vector
    _OBSTACLE_TYPE_CODES = {
        'CACTUS_SMALL': 0,
        'CACTUS_LARGE': 1,
        'PTERODACTYL': 2,
    }

    def __init__(self):
        """Launch the browser and define the observation and action spaces."""

        # Subclass model
        super().__init__()

        self.driver = self._create_driver()

        # Setup spaces
        # Initial speed is 6, while max speed is 13; -1 marks a missing value
        low_values = np.array(
            [0, 0, 0, 6, -1, -1, -1, -1, -1, -1], dtype=np.float32)
        # Canvas dimensions are 600x150
        high_values = np.array(
            [150, 1, 1, 13, 600, 3, 600, 150, 50, 50], dtype=np.float32)
        self.observation_space = Box(
            low=low_values, high=high_values, shape=(10,), dtype=np.float32)

        # Start jumping, Start ducking, Stop ducking, Do nothing - Ducking has
        # been divided into two actions because the agent should also learn
        # the correct ducking duration
        self.action_space = Discrete(4)

        # Index = action id, value = (key, kind of key event to send)
        self.actions_map = [
            (Keys.ARROW_UP, "key_down"),  # Start jumping
            (Keys.ARROW_DOWN, "key_down"),  # Start ducking
            (Keys.ARROW_DOWN, "key_up"),  # Stop ducking
            (Keys.ARROW_RIGHT, "key_down")  # Do nothing
        ]

        # Keep track of number of obstacles the agent has passed
        self.passed_obstacles = 0

    def _create_driver(self):
        """Create, maximize and return a Chrome WebDriver instance."""

        # Set options for the WebDriver
        options = Options()

        # Turn off logging to keep terminal clean
        options.add_experimental_option('excludeSwitches', ['enable-logging'])

        # Keep the browser running after the code finishes executing
        options.add_experimental_option("detach", True)

        # Create a Service instance for running the ChromeDriver executable
        service = Service(executable_path=ChromeDriverManager().install())

        # The driver object is used to automate interactions with the browser
        driver = webdriver.Chrome(service=service, options=options)

        # Maximize the Chrome window
        driver.maximize_window()

        return driver

    def _encode_obstacle_type(self, obstacle_type):
        """Return the integer code for an obstacle type name.

        Raises:
            ValueError: if ``obstacle_type`` is not a known obstacle name.
        """
        try:
            return self._OBSTACLE_TYPE_CODES[obstacle_type]
        except (KeyError, TypeError):
            raise ValueError(
                f"Unknown obstacle type: {obstacle_type}") from None

    def _get_obstacles(self):
        """Return the obstacles currently on screen.

        Each obstacle is a tuple
        ``(encoded_type, x, y, width, height)``, in the order the game
        reports them.
        """
        obstacles = self.driver.execute_script(
            "return Runner.instance_.horizon.obstacles")
        return [
            (
                self._encode_obstacle_type(obstacle['typeConfig']['type']),
                obstacle['xPos'],
                obstacle['yPos'],
                obstacle['typeConfig']['width'],
                obstacle['typeConfig']['height'],
            )
            for obstacle in obstacles
        ]

    def _get_trex_info(self):
        """Return ``(trex_y, trex_is_jumping, trex_is_ducking)``."""
        trex = self.driver.execute_script("return Runner.instance_.tRex")
        # xpos remains the same throughout the game - don't need it
        return trex['yPos'], trex['jumping'], trex['ducking']

    def _get_game_speed(self):
        """Return the current game speed."""
        return self.driver.execute_script(
            "return Runner.instance_.currentSpeed")

    def _get_distance_to_next_obstacle(self, obstacles=None):
        """Return the x-distance from the Trex to the first obstacle.

        Args:
            obstacles: optional obstacle list as returned by
                ``_get_obstacles()``. When omitted, the obstacles are fetched
                from the browser.

        Returns:
            ``obstacle_x - trex_x`` for the first obstacle, or ``None`` when
            there is no obstacle on screen.
        """
        if obstacles is None:
            obstacles = self._get_obstacles()
        if not obstacles:
            return None

        trex_x = self.driver.execute_script(
            "return Runner.instance_.tRex.xPos")  # xpos of trex
        obstacle_x = obstacles[0][1]  # xpos of next obstacle
        return obstacle_x - trex_x

    def _passed_obstacle(self):
        """Return True when the first obstacle lies entirely behind the Trex."""
        obstacles = self._get_obstacles()
        if not obstacles:
            return False

        # next_obstacle: (encoded_type, x, y, width, height)
        next_obstacle = obstacles[0]
        trex_x = self.driver.execute_script(
            "return Runner.instance_.tRex.xPos")
        obstacle_x = next_obstacle[1]
        obstacle_width = next_obstacle[3]
        return obstacle_x + obstacle_width < trex_x

    def _read_score(self, script):
        """Run ``script``, join the returned digits and parse them as an int.

        Returns 0 when the value cannot be read or parsed. Only the expected
        failure modes are caught (script errors, a non-iterable result, an
        empty or non-numeric string) so that interrupts still propagate.
        """
        try:
            return int(''.join(self.driver.execute_script(script)))
        except (WebDriverException, TypeError, ValueError):
            return 0

    def _get_current_score(self):
        """Return the score shown by the distance meter, or 0 if unreadable."""
        return self._read_score(
            "return Runner.instance_.distanceMeter.digits")

    def _get_high_score(self):
        """Return the high score of this browser session, or 0 if unreadable."""
        # MaxScore=99999, MaxScoreUnits=5
        return self._read_score(
            "return Runner.instance_.distanceMeter.highScore.slice(-5)")

    def _get_image(self):
        """Capture the game canvas and return it as an OpenCV (BGR) image."""
        # Capture a screenshot of the game canvas as a data URL - string that
        # represents the image in base64-encoded format
        data_url = self.driver.execute_script(
            "return document.querySelector('canvas.runner-canvas').toDataURL()")

        # Remove the leading text from the data URL and decode the remaining
        # base64-encoded data
        LEADING_TEXT = "data:image/png;base64,"
        image_data = base64.b64decode(data_url[len(LEADING_TEXT):])

        # Convert the binary data in 'image_data' to a 1D NumPy array
        image_array = np.frombuffer(image_data, dtype=np.uint8)

        # Decode into an OpenCV image - shape format (H, W, C)
        return cv2.imdecode(image_array, cv2.IMREAD_COLOR)

    def reset(self):
        """Load the game page, start a new game and return the first observation.

        Raises:
            WebDriverException: for navigation errors other than
                "ERR_INTERNET_DISCONNECTED".
        """
        try:
            # Navigate to the Chrome Dino website
            self.driver.get("chrome://dino/")
        except WebDriverException as e:
            # Ignore "ERR_INTERNET_DISCONNECTED" because this game is
            # available offline; re-raise anything else unchanged
            if "ERR_INTERNET_DISCONNECTED" not in str(e):
                raise

        # Do not proceed until the 'runner-canvas' element is present
        timeout = 10
        WebDriverWait(self.driver, timeout).until(
            EC.presence_of_element_located((By.CLASS_NAME, "runner-canvas")))

        # Start game
        self.driver.find_element(By.TAG_NAME, "body").send_keys(Keys.SPACE)

        return self.get_observation()

    def get_observation(self):
        """Return the current game state as a float32 array of 10 values."""
        obstacles = self._get_obstacles()
        trex_y, trex_is_jumping, trex_is_ducking = self._get_trex_info()
        game_speed = self._get_game_speed()
        # Reuse the obstacle snapshot taken above so that the distance and the
        # obstacle fields describe the same obstacle at the same moment
        distance_to_next_obstacle = self._get_distance_to_next_obstacle(
            obstacles)

        state = (
            trex_y,
            trex_is_jumping,
            trex_is_ducking,
            game_speed,
            distance_to_next_obstacle,
            # Unpack the tuple of the first obstacle
            *(obstacles[0] if obstacles else (None, None, None, None, None))
        )

        # Set dtype for state to float32 for consistency and compatibility
        # with the RL algorithm (None entries become NaN)
        state = np.array(state, dtype=np.float32)

        # Replace NaN values with -1
        state[np.isnan(state)] = -1

        return state

    def is_game_over(self):
        """Return a truthy value when the Trex crashed or the max score was reached."""
        # Check if Trex crashed
        crashed = self.driver.execute_script("return Runner.instance_.crashed")

        # Get the maximum score from the game
        max_score = self.driver.execute_script(
            "return Runner.instance_.distanceMeter.maxScore")
        current_score = self._get_current_score()

        return crashed or (current_score >= max_score)

    def get_reward(self, done):
        """Return the reward for the current game state.

        -10 when the episode is done, otherwise +0.5 when the first obstacle
        has been passed (also incrementing ``passed_obstacles``) or +0.1 for
        staying alive. An extra +1 is added on every call for which the
        current score exceeds the high score.
        """
        # Must maintain the relative importance of different rewards so that
        # the agent can differentiate between the various outcomes
        reward = 0
        if done:
            # Penalize for crashing into an obstacle
            reward -= 10
        elif self._passed_obstacle():
            # Reward for passing an obstacle
            reward += 0.5
            self.passed_obstacles += 1
        else:
            # Small reward for staying alive
            reward += 0.1

        current_score = self._get_current_score()
        high_score = self._get_high_score()

        if current_score > high_score:
            # Bonus reward for surpassing the high score
            reward += 1

        return reward

    def step(self, action):
        """Send the key event for ``action`` and return ``(obs, reward, done, info)``.

        ``info`` contains ``'current_score'`` and ``'high_score'``.
        """
        # Get key and action mapping
        key, action_type = self.actions_map[action]

        # Create a new ActionChains object
        action_chains = ActionChains(self.driver)

        # Perform the key press action
        if action_type == "key_down":
            action_chains.key_down(key).perform()
        # Perform the key release action
        elif action_type == "key_up":
            action_chains.key_up(key).perform()

        # Get next observation
        obs = self.get_observation()

        # Check whether game is over
        done = self.is_game_over()

        # Get reward
        reward = self.get_reward(done)

        info = {
            'current_score': self._get_current_score(),
            'high_score': self._get_high_score()
        }

        return obs, reward, done, info

    def render(self, mode: str = 'human'):
        """Visualise the game.

        Args:
            mode: ``'rgb-array'`` (or ``'rgb_array'``) returns the current
                frame as an RGB image array; ``'human'`` shows the frame in an
                OpenCV window and returns ``None``. Any other value does
                nothing and returns ``None``.
        """
        frame = self._get_image()  # decoded by OpenCV, i.e. BGR channel order
        if mode in ('rgb-array', 'rgb_array'):
            return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        if mode == 'human':
            # cv2.imshow expects BGR, so the frame is shown without conversion
            cv2.imshow('Dino Game', frame)
            cv2.waitKey(1)
        return None

    def close(self):
        """Close the game environment by quitting the driver."""
        self.driver.quit()

## Test the Custom Game Environment

This section is for testing the Game Environment to ensure it is defined correctly before using it with the Agent for RL. 

In [5]:
# Helper class to format and print observations properly
def print_formatted_obs(observations):
    obs_titles = ["trex_y", "trex_jumping", "trex_ducking", "game_speed", "obst_dist",
                  "obst_type", "obst_x", "obst_y", "obst_width", "obst_height"]
    # Create a pandas DataFrame
    df = pd.DataFrame(observations, columns=obs_titles)

    # Set the pandas display options for better readability (optional)
    pd.set_option("display.width", 120)
    # pd.set_option("display.precision", 2)

    # Print the DataFrame
    print(df)

In [6]:
# Create the environment; constructing it launches a Chrome window via Selenium
env = DinoEnvironment()

In [7]:
# Inspect the observation space (a Box with 10 float32 features)
env.observation_space

Box([ 0.  0.  0.  6. -1. -1. -1. -1. -1. -1.], [150.   1.   1.  13. 600.   3. 600. 150.  50.  50.], (10,), float32)

In [8]:
# Number of features in one observation; used as the agent's state size
env.observation_space.shape[0]

10

In [9]:
# Inspect the action space (start jumping, start ducking, stop ducking, do nothing)
env.action_space

Discrete(4)

In [10]:
# Number of discrete actions; used as the agent's action size
env.action_space.n

4

**Note:** The `render()` function works more reliably when the code is run from a `.py` Python script rather than from the `.ipynb` notebook.

In [11]:
# Test loop - play 1 game with random actions to sanity-check the environment
env = DinoEnvironment()
for episode in range(1):
    obs = env.reset()
    done = False
    total_reward = 0
    all_observations = []  # Collected so they can be printed as one table

    while not done:
        action = env.action_space.sample()  # Take random actions
        obs, reward, done, info = env.step(action)
        all_observations.append(obs)
        total_reward += reward

        # To watch the game, call env.render(mode='human') here.
        # env.render(mode='rgb-array') returns the frame instead, so frames
        # can be collected in a list and turned into a gif with an image
        # library.

    # Print all observations formatted nicely in a table
    print_formatted_obs(all_observations)
    print(f"Episode: {episode}, Total Reward: {total_reward}, Current Score: {info['current_score']}, High Score: {info['high_score']}")

     trex_y  trex_jumping  trex_ducking  game_speed  obst_dist  obst_type  obst_x  obst_y  obst_width  obst_height
0      70.0           1.0           0.0       6.017       -1.0       -1.0    -1.0    -1.0        -1.0         -1.0
1      43.0           1.0           0.0       6.035       -1.0       -1.0    -1.0    -1.0        -1.0         -1.0
2      24.0           1.0           0.0       6.047       -1.0       -1.0    -1.0    -1.0        -1.0         -1.0
3      13.0           1.0           0.0       6.063       -1.0       -1.0    -1.0    -1.0        -1.0         -1.0
4       7.0           1.0           0.0       6.075       -1.0       -1.0    -1.0    -1.0        -1.0         -1.0
..      ...           ...           ...         ...        ...        ...     ...     ...         ...          ...
126    11.0           1.0           0.0       7.563       91.0        1.0    96.0    90.0        25.0         50.0
127    17.0           1.0           0.0       7.575       74.0        1.0    79.

# 2. DQN Dino Agent

## Import Dependencies

In [12]:
import os
import random
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import wandb

## DinoDQNAgent Class

In [13]:
class DinoDQNAgent():
    """Deep Q-Network agent with an epsilon-greedy policy and replay memory.

    Args:
        env: environment exposing ``observation_space.shape`` and
            ``action_space.n``.
        gamma: discount factor applied to the estimated future reward.
        epsilon: initial exploration rate.
        epsilon_min: lower bound for the exploration rate.
        epsilon_decay: factor applied to epsilon after each training
            ``replay()`` call.
        learning_rate: learning rate of the Adam optimizer.
        batch_size: number of experiences sampled per ``replay()`` call.
        memory_size: maximum number of experiences kept in memory.
    """

    def __init__(self, env,
                 gamma=0.95,
                 epsilon=1.0,
                 epsilon_min=0.01,
                 epsilon_decay=0.995,
                 learning_rate=0.001,
                 batch_size=32,
                 memory_size=100000):
        self.env = env
        self.state_size = env.observation_space.shape[0]  # 10
        self.action_size = env.action_space.n  # 4
        self.hidden_sizes = [64, 128]  # number of hidden neurons for the model
        self.memory = deque(maxlen=memory_size)
        self.gamma = gamma  # discounting factor
        self.epsilon = epsilon  # exploration rate
        self.epsilon_min = epsilon_min  # min exploration rate
        self.epsilon_decay = epsilon_decay  # decay per training replay() call
        self.batch_size = batch_size
        self.model = self._build_model()
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.loss_fn = nn.MSELoss()

    def _build_model(self):
        """Build the network that approximates one Q-value per action for a state."""
        model = nn.Sequential(
            nn.Linear(self.state_size, self.hidden_sizes[0]),
            nn.ReLU(),
            nn.Linear(self.hidden_sizes[0], self.hidden_sizes[1]),
            nn.ReLU(),
            nn.Linear(self.hidden_sizes[1], self.action_size)
        )

        return model

    def remember(self, state, action, reward, next_state, done):
        """Store one experience tuple in the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Return an action for ``state`` using the epsilon-greedy policy."""
        # random.random() is in [0, 1), so epsilon == 0 never explores and
        # epsilon == 1 always explores
        if random.random() < self.epsilon:
            return random.randrange(self.action_size)

        # Pure inference: no gradients are needed to pick the best action
        with torch.no_grad():
            q_values = self.model(torch.tensor(state, dtype=torch.float32))
        return torch.argmax(q_values).item()

    def replay(self):
        """Train the model on a random minibatch drawn from the memory.

        One optimizer step is performed per sampled experience. Afterwards
        epsilon is decayed, but never below ``epsilon_min``.

        Returns:
            The loss of the last sampled experience as a float, or ``None``
            when the memory holds fewer than ``batch_size`` experiences (in
            which case nothing is trained and epsilon is left unchanged).
        """
        # Not enough data to create a minibatch for training
        if len(self.memory) < self.batch_size:
            return None

        # Create minibatch from a random sample of experiences from the memory
        minibatch = random.sample(self.memory, self.batch_size)

        for state, action, reward, next_state, done in minibatch:
            # If done, the game has ended and there is no future reward
            q_target = reward
            if not done:
                # Estimate the future reward; the target is treated as a
                # constant, so no gradients are tracked for this pass
                with torch.no_grad():
                    q_values_next = self.model(
                        torch.tensor(next_state, dtype=torch.float32))
                # Add the discounted maximum Q-value of the next state
                q_target = reward + self.gamma * \
                    torch.max(q_values_next).item()

            # Q-values predicted by the model for the current state
            q_values = self.model(torch.tensor(state, dtype=torch.float32))

            # Target vector: identical to the prediction except for the
            # chosen action, so only that action contributes to the loss
            q_values_expected = q_values.clone().detach()
            q_values_expected[action] = q_target

            # Reset the gradients before performing backpropagation
            self.optimizer.zero_grad()

            # Mean Squared Error between predicted and expected Q-values
            loss = self.loss_fn(q_values, q_values_expected)

            # Backpropagate and update the model's parameters
            loss.backward()
            self.optimizer.step()

        # Decrease epsilon over time to reduce exploration and increase
        # exploitation; clamp so it cannot undershoot epsilon_min
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min,
                               self.epsilon * self.epsilon_decay)

        # Return the loss value
        return loss.item()

    def save_model(self, model_name, model_output_dir, log_to_wandb):
        """Save the model and optimizer state to ``model_output_dir/model_name``.

        The output directory is created if it does not exist. When
        ``log_to_wandb`` is true, the file is also logged as a wandb
        artifact.
        """
        state = {
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict()
        }

        # Make sure the target directory exists before writing into it
        if model_output_dir:
            os.makedirs(model_output_dir, exist_ok=True)

        save_path = os.path.join(
            model_output_dir, model_name)

        # Save the state dictionary to a file
        torch.save(state, save_path)

        if log_to_wandb:
            # Save model as a wandb artifact
            artifact = wandb.Artifact(model_name, type='model')
            artifact.add_file(save_path)
            wandb.log_artifact(artifact)

    def load_model(self, file_path, for_training=False):
        """Load model (and optionally optimizer) state saved by ``save_model``.

        Args:
            file_path: path of the checkpoint file.
            for_training: set to True to also restore the optimizer state,
                i.e. when continuing training from a saved checkpoint.
        """
        # Map tensors to the CPU so that a checkpoint written on another
        # device can still be read; load_state_dict copies the values into
        # the existing parameters
        state = torch.load(file_path, map_location="cpu")

        # Restore the state of the model
        self.model.load_state_dict(state['model_state_dict'])

        if for_training:
            self.optimizer.load_state_dict(state['optimizer_state_dict'])

# 3. Train and Test Agent

## Import Dependencies

In [14]:
import os
import wandb

## Train Agent

### Train Function

In [15]:
def train(agent, env, episodes, model_output_dir, save_interval=10, log_to_wandb=False, render=False):
    """Train ``agent`` on ``env`` for a number of episodes.

    Every step of an episode is stored in the agent's memory; after each
    episode the agent is trained once via ``agent.replay()``.

    Args:
        agent: agent providing ``act``, ``remember``, ``replay``,
            ``save_model`` and an ``epsilon`` attribute.
        env: environment providing ``reset``, ``step`` and ``render``.
        episodes: number of episodes to play.
        model_output_dir: directory passed to ``agent.save_model``.
        save_interval: save the model every ``save_interval`` episodes.
        log_to_wandb: when true, start a wandb run and log metrics to it.
        render: when true, render the game on every step.
    """

    if log_to_wandb:
        wandb.init(project='dino_rl_agent', name='train_run')

    total_rewards = []
    total_scores = []
    total_losses = []  # One entry per episode in which training happened

    for episode in range(episodes):
        state = env.reset()
        done = False
        episode_reward = 0

        while not done:
            if render:
                env.render(mode='human')

            # Use agent to predict action
            action = agent.act(state)

            # Take a step in the environment
            next_state, reward, done, info = env.step(action)

            # Remember agents experience after every step
            agent.remember(state, action, reward, next_state, done)

            state = next_state
            episode_reward += reward

        # Train/Update the model once per episode. replay() returns None
        # while the memory is still too small to form a minibatch.
        loss = agent.replay()
        if loss is not None:
            total_losses.append(loss)

        total_rewards.append(episode_reward)
        total_scores.append(info["current_score"])

        # Calculate overall training metrics (running means over all episodes)
        mean_loss = sum(total_losses) / len(total_losses) if total_losses else None
        mean_reward = sum(total_rewards) / len(total_rewards)
        mean_score = sum(total_scores) / len(total_scores)

        # Log metrics
        loss_text = f"{loss:.4f}" if loss is not None else "n/a"
        print(f"Episode {episode + 1}/{episodes}, Highest Score: {info['high_score']}, Episode Score: {info['current_score']}, Episode Reward: {episode_reward:.4f}, Episode Epsilon: {agent.epsilon:.4f}, Episode Loss: {loss_text}, Mean Score: {mean_score:.4f}, Mean Reward {mean_reward:.4f}")

        if log_to_wandb:
            metrics = {
                "episode": (episode + 1)/episodes,
                "highest_score": info["high_score"],
                "episode_score": info["current_score"],
                "episode_reward": episode_reward,
                "episode_epsilon": agent.epsilon,
                "mean_reward": mean_reward,
                "mean_current_score": mean_score
            }
            # Loss metrics exist only once the agent has actually trained
            if loss is not None:
                metrics["episode_loss"] = loss
                metrics["mean_loss"] = mean_loss
            wandb.log(metrics)

        # Save the model every save_interval episodes
        if (episode + 1) % save_interval == 0:
            model_name = f"dino_dqn_episode_{episode + 1}.pth"
            agent.save_model(model_name, model_output_dir, log_to_wandb)
            print(f"Model saved after episode {episode + 1}")


### Train

In [16]:
# Specify directory to save model
OUTPUT_DIR = "trained_models/"

# Create the directory (and any missing parents); does nothing if it already exists
os.makedirs(OUTPUT_DIR, exist_ok=True)

In [17]:
# Number of episodes to train the agent (passed to train() as `episodes`)
TRAIN_EPISODES = 500

In [18]:
# Create the environment (opens the browser) and an agent sized to its spaces
env = DinoEnvironment()
agent = DinoDQNAgent(env)

# Run training, logging metrics and checkpoints to wandb
train(
    agent=agent,
    env=env,
    episodes=TRAIN_EPISODES,
    model_output_dir=OUTPUT_DIR,
    log_to_wandb=True,
)

[34m[1mwandb[0m: Currently logged in as: [33mbidmalvi[0m. Use [1m`wandb login --relogin`[0m to force relogin


Episode 1/500, Highest Score: 74, Episode Score: 74, Episode Reward: 160.3000, Episode Epsilon: 0.9950, Episode Loss: 25.6143, Mean Score: 74.0000, Mean Reward 160.3000
Episode 2/500, Highest Score: 74, Episode Score: 72, Episode Reward: 7.7000, Episode Epsilon: 0.9900, Episode Loss: 1.9440, Mean Score: 73.0000, Mean Reward 84.0000
Episode 3/500, Highest Score: 74, Episode Score: 64, Episode Reward: 4.2000, Episode Epsilon: 0.9851, Episode Loss: 0.8339, Mean Score: 70.0000, Mean Reward 57.4000
Episode 4/500, Highest Score: 74, Episode Score: 52, Episode Reward: 3.1000, Episode Epsilon: 0.9801, Episode Loss: 3.6402, Mean Score: 65.5000, Mean Reward 43.8250
Episode 5/500, Highest Score: 74, Episode Score: 63, Episode Reward: 17.2000, Episode Epsilon: 0.9752, Episode Loss: 2.1060, Mean Score: 65.0000, Mean Reward 38.5000
Episode 6/500, Highest Score: 74, Episode Score: 60, Episode Reward: 14.4000, Episode Epsilon: 0.9704, Episode Loss: 1.3333, Mean Score: 64.1667, Mean Reward 34.4833
Epis

## Test Agent

### Test Function

In [35]:
def test(agent, env, episodes, model_path, save_interval=50, log_to_wandb=False, older_model=False, render=False):
    """Evaluate a saved model by playing ``episodes`` games without exploration.

    Args:
        agent: agent whose model is loaded from ``model_path``; its
            ``epsilon`` is set to 0 and left at 0 afterwards.
        env: environment providing ``reset``, ``step`` and ``render``.
        episodes: number of episodes to play.
        model_path: path of the saved model file.
        save_interval: accepted but not used by this function.
        log_to_wandb: when true, start a wandb run and log metrics to it.
        older_model: when true, the file is loaded directly as the model's
            state dict; otherwise ``agent.load_model`` is used.
        render: when true, render the game on every step.
    """

    if log_to_wandb:
        wandb.init(project='dino_rl_agent', name='test_run')

    reward_history = []
    score_history = []

    # Restore the trained weights
    if not older_model:
        agent.load_model(model_path, for_training=False)
    else:
        agent.model.load_state_dict(torch.load(model_path))

    # With epsilon at 0 the agent only exploits the model's predictions
    agent.epsilon = 0

    for episode_index in range(episodes):
        episode_number = episode_index + 1
        state = env.reset()
        episode_reward = 0
        done = False

        while not done:
            if render:
                env.render(mode='human')

            # Let the agent choose an action and apply it to the environment
            chosen_action = agent.act(state)
            state, reward, done, info = env.step(chosen_action)
            episode_reward += reward

        reward_history.append(episode_reward)
        score_history.append(info["current_score"])

        # Running means over all test episodes played so far
        mean_reward = sum(reward_history) / len(reward_history)
        mean_score = sum(score_history) / len(score_history)

        # Log metrics
        summary = (
            f"Episode {episode_number}/{episodes}, "
            f"Highest Score: {info['high_score']}, "
            f"Episode Score: {info['current_score']}, "
            f"Episode Reward: {episode_reward:.4f}, "
            f"Episode Epsilon: {agent.epsilon:.4f}, "
            f"Mean Score: {mean_score:.4f}, "
            f"Mean Reward {mean_reward:.4f}"
        )
        print(summary)

        if not log_to_wandb:
            continue

        wandb.log({
            "episode": episode_number/episodes,
            "highest_score": info["high_score"],
            "episode_score": info["current_score"],
            "episode_reward": episode_reward,
            "episode_epsilon": agent.epsilon,
            "mean_reward": mean_reward,
            "mean_current_score": mean_score
        })

### Test

In [30]:
# Specify path to load a model. Built with os.path.join so the separator is
# correct on every platform and no backslash escape ("\d") ends up in the string.
MODEL_LOAD_PATH = os.path.join("trained_models", "dino_dqn_episode_400.pth")

In [24]:
# Number of episodes to test the agent (passed to test() as `episodes`)
TEST_EPISODES = 5

In [31]:
# Create a fresh environment (opens the browser) and an untrained agent;
# test() loads the saved weights into the agent
env = DinoEnvironment()
agent = DinoDQNAgent(env)

# Evaluate the saved model and log the metrics to wandb
test(
    agent=agent,
    env=env,
    episodes=TEST_EPISODES,
    model_path=MODEL_LOAD_PATH,
    log_to_wandb=True,
)

0,1
episode,▁▃▅▆█
episode_epsilon,▁▁▁▁▁
episode_reward,█▁▁▁▁
episode_score,██▁▁█
highest_score,▁▁▁▁▁
mean_current_score,██▃▁▂
mean_reward,█▄▂▁▁

0,1
episode,1.0
episode_epsilon,0.0
episode_reward,4.4
episode_score,52.0
highest_score,52.0
mean_current_score,51.6
mean_reward,26.5


Episode 1/5, Highest Score: 323, Episode Score: 323, Episode Reward: 600.1000, Episode Epsilon: 0.0000, Mean Score: 323.0000, Mean Reward 600.1000
Episode 2/5, Highest Score: 323, Episode Score: 270, Episode Reward: 41.7000, Episode Epsilon: 0.0000, Mean Score: 296.5000, Mean Reward 320.9000
Episode 3/5, Highest Score: 323, Episode Score: 254, Episode Reward: 44.3000, Episode Epsilon: 0.0000, Mean Score: 282.3333, Mean Reward 228.7000
Episode 4/5, Highest Score: 323, Episode Score: 52, Episode Reward: 3.7000, Episode Epsilon: 0.0000, Mean Score: 224.7500, Mean Reward 172.4500
Episode 5/5, Highest Score: 323, Episode Score: 287, Episode Reward: 49.9000, Episode Epsilon: 0.0000, Mean Score: 237.2000, Mean Reward 147.9400


## Best Model Test

In [36]:
# Specify path to load a model. Built with os.path.join so the separator is
# correct on every platform and no backslash escape ("\e") ends up in the string.
MODEL_LOAD_PATH = os.path.join("best_trained_models", "episode_100.pth")

In [37]:
# Number of episodes to test the best model (passed to test() as `episodes`)
TEST_EPISODES = 5

In [38]:
# Create a fresh environment (opens the browser) and an untrained agent;
# test() loads the saved weights into the agent
env = DinoEnvironment()
agent = DinoDQNAgent(env)

# Evaluate the best model; older_model=True loads the file directly as the
# model's state dict
test(
    agent=agent,
    env=env,
    episodes=TEST_EPISODES,
    model_path=MODEL_LOAD_PATH,
    log_to_wandb=False,
    older_model=True,
)

Episode 1/5, Highest Score: 526, Episode Score: 526, Episode Reward: 828.8000, Episode Epsilon: 0.0000, Mean Score: 526.0000, Mean Reward 828.8000
Episode 2/5, Highest Score: 1294, Episode Score: 1294, Episode Reward: 1063.1000, Episode Epsilon: 0.0000, Mean Score: 910.0000, Mean Reward 945.9500
Episode 3/5, Highest Score: 1618, Episode Score: 1618, Episode Reward: 602.8000, Episode Epsilon: 0.0000, Mean Score: 1146.0000, Mean Reward 831.5667
Episode 4/5, Highest Score: 1618, Episode Score: 576, Episode Reward: 82.6000, Episode Epsilon: 0.0000, Mean Score: 1003.5000, Mean Reward 644.3250
Episode 5/5, Highest Score: 1618, Episode Score: 147, Episode Reward: 24.9000, Episode Epsilon: 0.0000, Mean Score: 832.2000, Mean Reward 520.4400


# Old

In [None]:
# Train model (older inline version of the training loop)

EPISODE_NUMS = 1000

for episode in range(EPISODE_NUMS):
    state = env.reset()
    done = False
    episode_reward = 0

    while not done:
        action = agent.act(state)
        next_state, reward, done, info = env.step(action)
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        episode_reward += reward

    # Train once per episode on a minibatch from the replay memory
    agent.replay()
    print(f"Episode {episode + 1}/{EPISODE_NUMS}, Reward: {episode_reward}, Current Score: {info['current_score']}, High Score: {info['high_score']}")

    if (episode + 1) % 50 == 0:
        # save_model takes (model_name, model_output_dir, log_to_wandb) and
        # joins the directory and file name itself
        agent.save_model(f"episode_{episode + 1}.pth", OUTPUT_DIR, False)
        print(f"Model saved after episode {episode + 1}")


In [None]:
# Test model (older inline version of the test loop)

# agent = DinoDQNAgent(env)
# Build the path with os.path.join: the separator is then correct on every
# platform and no backslash escapes ("\d", "\e") end up in the string
agent.load_model(os.path.join("model_output", "dino", "episode_100.pth"))
# Set agent's exploration rate (epsilon) to zero, so that it only chooses actions based on the model's predictions
agent.epsilon = 0

EPISODE_NUMS = 100

# Test loop
for episode in range(EPISODE_NUMS):
    state = env.reset()
    done = False
    episode_reward = 0

    while not done:
        action = agent.act(state)
        next_state, reward, done, info = env.step(action)
        state = next_state
        episode_reward += reward

    print(
        f"Episode {episode + 1}/{EPISODE_NUMS}, Total episode reward: {episode_reward}, Final score: {info['current_score']}, Highest score achieved: {info['high_score']}")
