# Install dependencies

In [None]:
!pip3 install torch torchvision torchaudio
!pip3 install pydirectinput
!pip3 install stable-baselines3[extra] protobuf==3.20.*
!pip3 install mss
!pip3 install tensorboard

In [2]:
from mss import mss
import pydirectinput
import cv2
import numpy as np
from matplotlib import pyplot as plt
import time
import keyboard
from gymnasium import Env
from gymnasium.spaces import Box, Discrete

import os
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common import env_checker
from stable_baselines3 import DQN

from tensorboard.backend.event_processing.event_accumulator import EventAccumulator

# Build environment

In [3]:
class Game(Env):
    """Gymnasium environment driven by screen capture and simulated key presses.

    Observations are grayscale grabs of ``game_location`` resized to 150x83
    and returned with shape ``(1, 83, 150)`` and dtype ``uint8``.

    Actions (``Discrete(3)``):
        0 -- tap the space bar (jump)
        1 -- no input is sent (no key is bound to this action)
        2 -- no input is sent (explicit no-op)

    Rewards per step:
        * ``alive_reward`` while the game is running, ``-1.0`` on game over,
        * ``+2`` when ``step_count`` reaches ``next_bonus_step`` (first at
          ``first_bonus_step``, then every ``bonus_interval`` steps),
        * ``-jump_penalty`` whenever action 0 is taken.
    """

    # set up environment and observation shapes
    def __init__(self):
        super().__init__()
        # set up spaces (gymnasium containers to represent an observation and actions)
        self.observation_space = Box(low=0, high=255, shape=(1, 83, 150), dtype=np.uint8)
        self.action_space = Discrete(3)
        # screen-capture handle used for both observations and the game-over check
        self.cap = mss()
        # screen regions (in pixels) grabbed for the observation and the game-over probe
        self.game_location = {'top': 150, 'left': 80, 'width': 600, 'height': 200}
        self.done_location = {'top': 185, 'left': 330, 'width': 300, 'height': 70}

        # reward shaping parameters
        self.first_bonus_step = 60
        self.bonus_interval = 20
        self.jump_penalty = 0.02
        self.alive_reward = 0.01

        # per-episode bookkeeping (restored in reset())
        self.step_count = 0
        self.next_bonus_step = self.first_bonus_step

    # simulate agent observation, action, and environment response
    def step(self, action):
        """Apply ``action``, then return ``(observation, reward, terminated, truncated, info)``.

        ``truncated`` is always ``False`` (no time limit is implemented) and
        ``info`` is always an empty dict.
        """
        # only action 0 sends input; every other action leaves the keyboard alone
        if action == 0:
            pydirectinput.keyDown('space')
            pydirectinput.keyUp('space')

        self.step_count += 1

        # check whether the game-over marker is on screen
        done, _ = self.get_done()
        reward = -1.0 if done else self.alive_reward

        # survival bonus: first at first_bonus_step, then every bonus_interval steps
        if not done and self.step_count == self.next_bonus_step:
            reward += 2
            self.next_bonus_step += self.bonus_interval

        # penalize jumping to discourage spamming the space bar
        if action == 0:
            reward -= self.jump_penalty

        truncated = False
        next_observation = self.get_observation()
        info = {}
        return next_observation, reward, done, truncated, info

    # visualize the game region in an OpenCV window
    def render(self):
        """Show the current game region; pressing ``q`` closes the window."""
        cv2.imshow('Game,', np.array(self.cap.grab(self.game_location))[:, :, :3])
        if cv2.waitKey(1) & 0xFF == ord('q'):
            self.close()

    def close(self):
        """Destroy any OpenCV windows opened by ``render``."""
        cv2.destroyAllWindows()

    # reset the game environment (can follow environment termination)
    def reset(self, *, seed=None, option=None, options=None):
        """Restart the game and return ``(observation, info)``.

        Args:
            seed: optional seed; forwarded to ``Env.reset`` and to
                ``np.random.seed``.
            option: original (misspelled) keyword, kept so existing callers
                keep working. Unused.
            options: the keyword Gymnasium's ``Env.reset`` API defines. Unused.
        """
        # let Gymnasium seed its own RNG, and keep the legacy global numpy seeding
        super().reset(seed=seed)
        if seed is not None:
            np.random.seed(seed)

        # start the new episode with fresh counters so the bonus schedule
        # applies to every episode, not just the first one
        self.step_count = 0
        self.next_bonus_step = self.first_bonus_step

        # click at (300, 300) and press space to restart the game
        pydirectinput.click(x=300, y=300)
        pydirectinput.press('space')

        # Gymnasium requires an observation and info to be returned
        info = {}
        return self.get_observation(), info

    # extra methods not required to build Gymnasium environment

    # grab observation of the game environment
    def get_observation(self):
        """Capture ``game_location`` and return it as a ``(1, 83, 150)`` grayscale array."""
        # keep the first three channels of the capture; they are converted as BGR below
        raw = np.array(self.cap.grab(self.game_location))[:, :, :3]
        gray = cv2.cvtColor(raw, cv2.COLOR_BGR2GRAY)
        # cv2.resize takes (width, height)
        resized = cv2.resize(gray, (150, 83))
        channel = np.reshape(resized, (1, 83, 150))
        return channel

    # detect game over by probing a single pixel of the done_location capture
    def get_done(self):
        """Return ``(done, done_cap)``.

        ``done`` is ``True`` when the probed pixel is within ``tolerance`` of
        the expected game-over gray on every channel; ``done_cap`` is the raw
        three-channel capture of ``done_location``.
        """
        done_cap = np.array(self.cap.grab(self.done_location))[:, :, :3]
        # numpy indexes images as [row, column]
        probe_row, probe_col = 38, 200
        # widen to a signed type so the difference below cannot wrap around
        pixel_color = done_cap[probe_row, probe_col].astype(np.int16)
        # gray value expected at the probe position when the game is over
        game_over_color = np.array([172, 172, 172])
        tolerance = 10
        done = bool(np.all(np.abs(pixel_color - game_over_color) <= tolerance))

        return done, done_cap
    


# Set up callbacks

In [None]:
class TrainAndLoggingCallback(BaseCallback):
    """Callback that saves the model every ``check_freq`` callback calls.

    Args:
        check_freq: save interval, measured in ``n_calls``.
        save_path: directory for the saved models; ``None`` disables saving.
        verbose: verbosity level forwarded to ``BaseCallback``.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        # make sure the target directory exists before the first save
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        # skip saving when no directory was given (mirrors the guard in
        # _init_callback, which would otherwise be followed by a TypeError here)
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, f'best_model_{self.n_calls}')
            self.model.save(model_path)

        # returning True lets training continue
        return True

In [None]:
# Directory where the training callback saves model checkpoints
CHECKPOINT_DIR = './runs/'
# Directory passed to the model as its TensorBoard log location
LOG_DIR = './logs/'

In [None]:
# set up callback to save the model every 10000 steps into the checkpoint directory
callback = TrainAndLoggingCallback(check_freq=10000, save_path=CHECKPOINT_DIR)

# Build/Train DQN

In [None]:
# Instantiate the screen-capture game environment
env = Game()

In [None]:
# Build the DQN agent (CNN policy) on top of the game environment
model = DQN(
    'CnnPolicy',
    env,
    learning_rate=0.0001,
    buffer_size=300000,
    learning_starts=0,
    tensorboard_log=LOG_DIR,
    verbose=1,
)

In [None]:
# Start training: run for 50,000 timesteps with the checkpoint callback attached
model.learn(total_timesteps=50000, callback=callback)

# Test model

In [None]:
# Load a previously saved DQN model from runs/learning-rate-0001
# NOTE(review): this filename differs from the 'best_model_<n>' names the
# callback writes -- presumably a renamed checkpoint; confirm the file exists.
model = DQN.load(os.path.join('runs', 'learning-rate-0001'))

In [None]:
# Roll out the loaded policy for five episodes and report each episode's return
for episode in range(5):
    obs, _ = env.reset()
    done = False
    total_reward = 0

    while not done:
        # deterministic=True evaluates the greedy policy; the default
        # (deterministic=False) lets DQN still take exploratory random actions
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, _ = env.step(int(action))
        done = terminated or truncated
        total_reward += reward
    print(f'Total Reward for episode {episode} is {total_reward}')

# Analysis

In [None]:
def extract_rewards_from_event_file(path):
    """Read the ``rollout/ep_rew_mean`` scalar series from a TensorBoard event file.

    Args:
        path: location handed to ``EventAccumulator``.

    Returns:
        ``(steps, values)`` as two lists, or ``None`` when the event data has
        no ``rollout/ep_rew_mean`` scalar tag.
    """
    tag = 'rollout/ep_rew_mean'

    accumulator = EventAccumulator(path)
    accumulator.Reload()

    if tag in accumulator.Tags()['scalars']:
        steps, values = [], []
        for event in accumulator.Scalars(tag):
            steps.append(event.step)
            values.append(event.value)
        return steps, values

    return None

def plot_tensorboard_rewards(logs_dir):
    """Plot the mean episode reward curve of every run folder under ``logs_dir``.

    Each sub-directory of ``logs_dir`` is treated as one run. Within a run,
    the first event file (in sorted name order) that contains the reward
    scalar is plotted, labelled with the folder name.

    Args:
        logs_dir: directory containing one sub-directory per training run.
    """
    plt.figure(figsize=(10, 6))
    plotted_any = False

    for folder in sorted(os.listdir(logs_dir)):
        folder_path = os.path.join(logs_dir, folder)
        if not os.path.isdir(folder_path):
            continue

        # os.listdir returns names in arbitrary order; sort them so the file
        # that gets plotted is the same on every run
        event_files = sorted(
            name for name in os.listdir(folder_path) if name.startswith("events.out")
        )

        steps, rewards = None, None
        for name in event_files:
            result = extract_rewards_from_event_file(os.path.join(folder_path, name))
            if result:
                steps, rewards = result
                break

        if steps and rewards:
            plt.plot(steps, rewards, label=folder)
            plotted_any = True

    plt.xlabel("Timesteps")
    plt.ylabel("Mean Episode Reward")
    plt.title("Training Reward Progress (from TensorBoard logs)")
    # a legend without any labelled curve only produces a matplotlib warning
    if plotted_any:
        plt.legend(loc='upper left', bbox_to_anchor=(1.05, 1.0), fontsize='small')
    plt.grid()
    plt.tight_layout()
    plt.show()

# Example usage: plot the reward curves of every run folder under ./logs
plot_tensorboard_rewards('./logs')  # replace with your actual log directory
