In [None]:
# %pip install pytesseract
# %pip install swig
# %pip install opencv-python
# %pip install gymnasium[all] 
# %pip install mss pydirectinput
# Install stable-baselines3 for gymnasium
# %pip install git+https://github.com/DLR-RM/stable-baselines3

# All scripts are available in the venv

In [None]:
#Main imports
import os
import time
import numpy as np
from matplotlib import pyplot as plt

# Image capture and display
from mss import mss
import cv2

#Stable baselines imports
import stable_baselines3
from stable_baselines3 import DQN
from stable_baselines3.common.callbacks import BaseCallback

# Environment and I/O tools
import pydirectinput
import pytesseract
pytesseract.pytesseract.tesseract_cmd = r'C:/Program Files/Tesseract-OCR/tesseract.exe' #Change installation location
import gymnasium as gym
from gymnasium import Env
from gymnasium.spaces import Box, Discrete

#If you are going for keras-rl2 use below
# from keras import Sequential
# from keras.layers import Dense, Flatten
# from keras.optimizers import Adam




# Defining Custom Environment

In [None]:
class GameEnv(Env):
    """Gymnasium environment that plays the Dino game through screen capture.

    Observations are grayscale screen grabs of the play area, downscaled to
    ``(1, 83, 120)`` ``uint8`` arrays. Actions are sent to the game as key
    presses via ``pydirectinput``. The end of an episode is detected by
    running OCR over a second screen region and looking for the first four
    characters of the game-over banner.

    NOTE(review): the capture regions and the click position used in
    ``reset`` are absolute screen coordinates -- presumably tuned for one
    specific monitor/browser layout; confirm before running elsewhere.
    """

    # Discrete action index -> key name sent to the game ('no_op' sends nothing).
    ACTION_MAP = {
        0: 'space',  # Jump
        1: 'down',  # Duck
        2: 'no_op'  # Run
    }
    # OCR results (first 4 characters) that count as "game over".
    # 'GAHE' is accepted alongside 'GAME' as an alternative OCR reading.
    GAME_OVER_TOKENS = ('GAME', 'GAHE')

    def __init__(self):
        super().__init__()
        self.observation_space = Box(
            low=0, high=255, shape=(1, 83, 120), dtype=np.uint8)
        self.action_space = Discrete(3)
        # Screen grabber shared by observation and game-over captures.
        self.cap = mss()
        # Region holding the play area that becomes the observation.
        self.agent_location = {'top': 300,
                               'left': 0, 'width': 600, 'height': 500}
        # Region that is OCR'd to detect the game-over text.
        self.done_location = {'top': 405,
                              'left': 630, 'width': 660, 'height': 70}

    def step(self, action):
        """Apply ``action`` to the game and return the Gymnasium 5-tuple.

        Args:
            action: index into ``ACTION_MAP`` (0 jump, 1 duck, 2 no-op).
                Numpy scalars / 0-d arrays are accepted and converted to int.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` where the
            reward is a constant 10 per step, ``terminated`` is the OCR-based
            game-over flag and ``truncated`` is always ``False``.

        Raises:
            KeyError: if ``action`` is not a key of ``ACTION_MAP``.
        """
        # Normalise first: a 0-d numpy array is not hashable as a dict key.
        action = int(action)
        if action != 2:
            pydirectinput.press(self.ACTION_MAP[action])  # type: ignore
        terminated, _ = self.game_over()
        next_obs = self.get_observation()
        reward = 10
        truncated = False  # Limit is 99999, then score changes to zero
        info = {}
        return next_obs, reward, terminated, truncated, info

    def render(self):
        """No-op: the game already renders itself on screen."""
        pass

    def reset(self, *, seed=None, options=None):
        """Restart the game and return the first observation.

        Accepts the ``seed`` / ``options`` keywords of the Gymnasium
        ``Env.reset`` API so wrappers that call ``reset(seed=...)`` work.
        ``options`` is accepted but not used.

        Returns:
            ``(observation, info)`` with an empty ``info`` dict.
        """
        super().reset(seed=seed)
        time.sleep(1)
        # Click inside the game window, then press space to start a new run.
        pydirectinput.click(x=150, y=150)
        pydirectinput.press('space')
        return self.get_observation(), {}

    def close(self):
        """Release the screen-capture handle created in ``__init__``."""
        self.cap.close()

    def get_observation(self):
        """Grab the play area and return it as a ``(1, 83, 120)`` array."""
        # Screen capture; keep only the first three channels of the grab.
        raw = np.array(self.cap.grab(self.agent_location))[:, :, :3]
        # convert to greyscale(reduces size)
        gray = cv2.cvtColor(raw, cv2.COLOR_BGR2GRAY)
        # resize to match observation space (cv2 takes (width, height))
        resize = cv2.resize(gray, (120, 83))
        # reshape for pytorch: add a leading channel axis
        channel = np.reshape(resize, (1, 83, 120))
        return channel

    def game_over(self):
        """OCR the game-over region.

        Returns:
            ``(over, over_cap)``: ``over`` is ``True`` when the first four
            OCR'd characters are in ``GAME_OVER_TOKENS``; ``over_cap`` is the
            captured image that was OCR'd.
        """
        over_cap = np.array(self.cap.grab(self.done_location))[:, :, :3]
        res = pytesseract.image_to_string(over_cap)[:4]
        over = res in self.GAME_OVER_TOKENS
        return over, over_cap


In [None]:
# Instantiate the environment. GameEnv.__init__ defines the observation/action
# spaces, creates the mss screen grabber and stores the two capture regions.
env = GameEnv()


In [None]:
# Sanity check: draw one random action from the environment's Discrete(3)
# action space; as the last expression of the cell, the sampled index is shown.
env.action_space.sample()

In [None]:

# Grab one observation to preview what the agent will see.
obs = env.get_observation()
# obs has shape (1, 83, 120); obs[0] is a single-channel grayscale frame, so it
# must be expanded with GRAY2RGB (BGR2RGB requires a 3- or 4-channel input).
plt.imshow(cv2.cvtColor(obs[0], cv2.COLOR_GRAY2RGB))


# Training time


In [None]:
# tot_states = env.observation_space.shape
# # tot_actions = 3 #Figure out later


In [None]:
# Custom callback that periodically checkpoints the model during training

class TrainLoggingCallback(BaseCallback):
    """Save the model being trained every ``check_freq`` callback steps.

    Args:
        check_freq: a checkpoint is written whenever ``n_calls`` is a
            multiple of this value.
        save_path: directory that receives the checkpoints. It is created on
            callback initialisation. ``None`` disables checkpointing.
        verbose: verbosity level forwarded to ``BaseCallback``.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory if a save path was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Write ``best_model_<n_calls>`` when a checkpoint is due.

        Always returns ``True``.
        NOTE(review): per the SB3 callback docs a ``False`` return would stop
        training early -- confirm against the installed version.
        """
        # Skip saving when no directory was configured, mirroring the None
        # check in _init_callback (os.path.join(None, ...) would raise).
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(
                self.save_path, f'best_model_{self.n_calls}')
            self.model.save(model_path)
        return True


In [None]:
# Output locations for this project; both live under the same root folder.
_DINO_ROOT = 'C:/Users/nikit/Desktop/Personal Projects/RLmodels/codes/Dino'
CHECKPOINT_DIR = f'{_DINO_ROOT}/train'  # model checkpoints written by the callback
LOG_DIR = f'{_DINO_ROOT}/logs'  # passed to DQN as tensorboard_log


In [None]:
# Checkpoint callback: saves the model into CHECKPOINT_DIR whenever the
# callback's step counter reaches a multiple of 1000.
callback = TrainLoggingCallback(check_freq=1000, save_path=CHECKPOINT_DIR)


In [None]:
# Overall training budget, in environment steps.
total_timesteps = 1_000_000
# Steps per incremental learn() call; the same value is also passed to DQN as
# the replay buffer size below (lower it on machines with little RAM).
batch_size = 50_000

# Build the DQN agent with a CNN policy on the screen-capture environment.
model = DQN(
    'CnnPolicy',
    env,
    buffer_size=batch_size,
    learning_starts=1000,
    tensorboard_log=LOG_DIR,
    verbose=1,
)


In [None]:
# Use this to load pre-trained models
# model.load(
#     'C:/Users/nikit/Desktop/Personal Projects/RLmodels/codes/Dino/train/best_model_84000.zip')


In [None]:
# Works better with high memory machine
# model.learn(total_timesteps=total_timesteps, callback=callback)


In [None]:
# For low memory users
# Perform incremental learning in multiple iterations of at most
# `batch_size` steps each, until `total_timesteps` steps have been run.
current_timestep = 0
while current_timestep < total_timesteps:
    # Number of steps for this iteration (the last chunk may be shorter)
    remaining_timesteps = total_timesteps - current_timestep
    num_steps = min(batch_size, remaining_timesteps)

    # Only the first chunk resets the model's timestep counter. Later chunks
    # continue from it, so the exploration schedule, the learning_starts
    # warm-up and the TensorBoard run are not restarted on every iteration
    # (learn() defaults to reset_num_timesteps=True).
    model.learn(total_timesteps=num_steps, callback=callback,
                reset_num_timesteps=(current_timestep == 0))

    # Update the current timestep
    current_timestep += num_steps


# Testing the model

In [None]:
# Play five episodes with the trained policy and report each episode's reward.
for episode in range(5):
    obs, _ = env.reset()
    over = False
    tot_reward = 0

    while not over:
        # predict() returns (action, state); step with the model's action
        # (not a random sample) so the learned policy is what gets evaluated.
        action, _ = model.predict(obs, deterministic=True)
        # int() turns the numpy action into a plain index for the env.
        obs, reward, terminated, truncated, info = env.step(int(action))
        over = terminated or truncated
        # time.sleep(0.01)
        tot_reward += reward
    print(f'Episode : {episode}, Reward : {tot_reward}')
    time.sleep(2)
