## Playing Side Scrolling Video Games with Deep Reinforcement Learning

In [None]:
# !pip install opencv-python pyautogui pygetwindow 
# !pip install opencv-contrib-python --user
# !pip install pydirectinput 
# !pip install mss 
# !pip install gymnasium[all]
# !pip install swig 
# !pip install stable-baselines3[extra]
# !pip install pytesseract
# !pip install sb3-contrib

In [4]:
import matplotlib.pyplot as plt
import numpy as np
import cv2
import pytesseract
import mss
import mss.tools
import pygetwindow as gw
import time
import pyautogui
import gymnasium as gym
import numpy as np
from gymnasium import spaces
import sys
import os
from PIL import Image
from stable_baselines3.common.env_checker import check_env
import pydirectinput

In [5]:
# Key bindings: each name holds the key string sent to the game.
up, down, right, left = 'up', 'down', 'right', 'left'
jump, melee_attack = 'space', 'ctrl'
# Defined for completeness; neither is part of the action list below.
projectile_attack, pick_up = 'alt', 'z'

# Discrete action set. The position of an entry in this list is the integer
# action id. Plain key names are single key presses; 'jump_and_climb',
# 'right_jump' and 'left_jump' are composite moves.
actions = [up, down, right, left]
actions += [jump, 'jump_and_climb', 'right_jump', 'left_jump']
actions += [melee_attack]

In [6]:
def get_window(window_title="Claw"):
    """Return the first open window that pygetwindow matches for a title.

    Args:
        window_title: Title passed to ``gw.getWindowsWithTitle``. Defaults to
            ``"Claw"``, the title used throughout this notebook.

    Returns:
        The first window object in the list returned by pygetwindow.

    Raises:
        IndexError: If no window matches. The same exception type as the
            original bare ``[0]`` lookup, but with an explanatory message.
    """
    matches = gw.getWindowsWithTitle(window_title)
    if not matches:
        raise IndexError(
            f"No open window found for title {window_title!r}; is the game running?"
        )
    return matches[0]

def activate_window():
    """Try to bring the game window to the foreground.

    Only the activation call is guarded: if it raises, the exception is
    printed and execution continues. A failure to find the window is not
    caught here.
    """
    game_window = get_window()
    try:
        game_window.activate()
    except Exception as activation_error:
        # Best effort: report the problem and carry on.
        print(activation_error)

def reset():
    """Replay the fixed key sequence used to restart the game.

    Focuses the game window, then sends: esc, four downs, two returns,
    a 2 s wait, three more returns and a final 2 s wait.
    Presumably this walks the in-game menu to restart the level; the menu
    layout is not visible from this code.
    """
    activate_window()

    # One press() call per key, so pyautogui's per-call pause still applies.
    first_burst = ['esc'] + ['down'] * 4 + ['return'] * 2
    for key in first_burst:
        pyautogui.press(key)
    time.sleep(2)

    for _ in range(3):
        pyautogui.press('return')
    time.sleep(2)

def close():
    """Close the game window returned by ``get_window()``."""
    get_window().close()

def start(claw_path="<input_claw_executable_path_here>"):
    """Launch the game executable and key through its start-up sequence.

    Args:
        claw_path: Path of the game executable handed to ``os.startfile``.
            The default is the notebook's placeholder and must be replaced
            (or overridden by the caller) with a real path.

    The fixed waits (2 s, 15 s, 2 s, 3 s) and the three ``return`` presses
    are kept exactly as tuned by the original author. ``os.startfile`` is
    only available on Windows.
    """
    os.startfile(claw_path)
    time.sleep(2)
    # Alt+Return is sent before the window is focused, as in the original
    # sequence (presumably a fullscreen/windowed toggle; confirm in game).
    pyautogui.hotkey('alt', 'return')
    activate_window()
    time.sleep(15)
    activate_window()
    time.sleep(2)
    activate_window()
    for _ in range(3):
        pyautogui.press('return')
    time.sleep(3)
    activate_window()


# Testing these functions
# close()
# start()
# reset()
# reset()
# They have been perfected!

def capture_screenshot():
    """Grab the game window, minus fixed edge offsets, as an mss screenshot.

    Returns:
        The object returned by ``sct.grab`` for the bounding box
        ``(left, top, right, bottom)`` of the window after trimming.
    """
    # Pixels trimmed from each side of the window rectangle.
    # Presumably these remove the title bar and borders; adjust as needed.
    offsets = {
        'left': 8 + 2,
        'top': 32,
        'right': 8 + 2,
        'bottom': 8 + 2,
    }
    window = get_window()
    bbox = (
        window.left + offsets['left'],
        window.top + offsets['top'],
        window.left + window.width - offsets['right'],
        window.top + window.height - offsets['bottom'],
    )
    # This runs once per environment step, so the mss instance is used as a
    # context manager to release it after every grab instead of leaking a
    # new instance per call.
    with mss.mss() as sct:
        return sct.grab(bbox)

def _ocr_number(region):
    """OCR a BGR crop as digits; return the integer read, or -1 if none."""
    gray = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, (3, 3), 0)
    # Otsu picks the threshold automatically; the result is inverted.
    binary = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1]
    text = pytesseract.image_to_string(
        binary,
        lang='eng',
        config='--psm 13 --oem 3 -c tessedit_char_whitelist=0123456789',
    )
    try:
        return int(text)
    except ValueError:
        # Empty or non-numeric OCR output: signal "unreadable" with -1.
        return -1


def extract_variables(image):
    """Read the treasure, health and lives counters from a game frame via OCR.

    Args:
        image: Frame object exposing ``width`` and ``height`` and convertible
            with ``np.array``. The caller in this notebook passes an RGB PIL
            image.

    Returns:
        ``(treasure, health, lives)`` as ints. Any counter whose OCR text
        cannot be parsed as an integer is reported as -1.
    """
    # Set the tesseract cmd (placeholder: must point at the tesseract binary).
    pytesseract.pytesseract.tesseract_cmd = "<input_tesseract_executable_path_here>"
    # The frame is RGB while OpenCV works in BGR, so swap the channel order.
    image_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
    width, height = image.width, image.height

    # The HUD strip is taken to be the top 7% of the frame height.
    hud_height = int(height * 0.07)
    hud_area = image_cv[0:hud_height, :]

    # Crop positions are fractions of the frame size, kept as originally tuned.
    treasure = _ocr_number(hud_area[:, int(width * 0.05):int(width * .2)])
    health = _ocr_number(hud_area[:, int(width * 0.915):int(width * .96)])
    lives = _ocr_number(
        image_cv[int(0.13 * height):int(0.165 * height), int(width * 0.935):int(width * .955)]
    )
    return treasure, health, lives

def get_frame_and_reward():
    """Capture one frame and return ``(observation, treasure)``.

    The observation is the screenshot converted to grayscale, resized to
    256x256 and reshaped to ``(1, 256, 256)``. The second value is the
    treasure counter read by ``extract_variables``; health and lives are
    read as well but not used.
    """
    shot = capture_screenshot()
    frame_rgb = Image.frombytes('RGB', (shot.width, shot.height), shot.rgb)
    treasure, _health, _lives = extract_variables(frame_rgb)
    small_gray = frame_rgb.convert('L').resize((256, 256))
    observation = np.array(small_gray).reshape((1, 256, 256))
    return observation, treasure

# All the above functions are working, now it is time to define the claw environment

def take_action(action):
    """Focus the game window and perform one action from ``actions``.

    Args:
        action: One of the composite names ``'jump_and_climb'``,
            ``'right_jump'``, ``'left_jump'``, or any other string, which is
            treated as a single key name and held for 0.1 s.

    Composite moves go through pyautogui; single keys go through
    pydirectinput, exactly as in the original implementation.
    """
    activate_window()
    if action == 'jump_and_climb':
        pyautogui.hotkey('space', 'up', interval=1./5.)
    elif action in ('right_jump', 'left_jump'):
        direction = 'right' if action == 'right_jump' else 'left'
        with pyautogui.hold('space'):
            pyautogui.keyDown(direction)
            try:
                time.sleep(1./2.)
            finally:
                # Release the key even if the sleep is interrupted (for
                # example by stopping the cell), so it is not left held down.
                pyautogui.keyUp(direction)
    else:
        pydirectinput.keyDown(action)
        try:
            time.sleep(1./10.)
        finally:
            pydirectinput.keyUp(action)

In [7]:
class ClawEnv(gym.Env):
    """Gymnasium environment that drives the live game window.

    Observations are single grayscale frames of shape ``(1, 256, 256)`` with
    dtype ``uint8``. Actions are indices into the module-level ``actions``
    list. Creating an instance closes any open game window (best effort) and
    launches the game again.
    """

    def __init__(self):
        super().__init__()
        self.action_space = spaces.Discrete(len(actions))
        self.observation_space = spaces.Box(low=0, high=255, shape=(1, 256, 256), dtype=np.uint8)
        # Best effort: there may be no game window to close yet.
        try:
            close()
        except Exception as e:
            print(e)
        start()

    def reset(self, seed=None, options=None):
        """Restart the game and return ``(first_frame, info)``.

        Args:
            seed: Forwarded to ``gym.Env.reset`` so ``self.np_random`` is
                seeded as the Gymnasium API expects. It has no effect on the
                game itself.
            options: Accepted for Gymnasium API compatibility; unused.
        """
        super().reset(seed=seed)
        reset()  # module-level helper that replays the restart key sequence
        frame, _ = get_frame_and_reward()
        return frame, {}

    def step(self, action):
        """Perform ``actions[action]`` and return the Gymnasium 5-tuple.

        ``terminated`` and ``truncated`` are always False, so episode length
        is decided entirely by the caller.
        """
        take_action(actions[action])
        frame, reward = get_frame_and_reward()
        terminated = False
        truncated = False
        info = dict()
        # The reward is the raw treasure reading. It is zeroed when it is
        # divisible by 6 or by 50000000, equals 1, or contains the digit 6
        # or 8. A failed OCR read (-1) passes through unchanged.
        # NOTE(review): this looks like a heuristic for rejecting OCR
        # misreads, and the reward is the counter value rather than its
        # change since the previous step. Confirm both are intended.
        if reward % 6 == 0 or reward % 50000000 == 0 or reward == 1 or '6' in str(reward) or '8' in str(reward):
            reward = 0
        return frame, reward, terminated, truncated, info

    def render(self):
        """No-op: the game renders in its own window."""
        pass

    def close(self):
        """Close the game window via the module-level ``close()`` helper."""
        close()

# Instantiate the environment. The constructor closes any open game window
# and launches the game again, so this cell needs the game installed and
# takes a while to finish (the launch sequence contains fixed waits).
env = ClawEnv()
# Optional: validate the environment against the Gymnasium API (disabled).
# check_env(env)
# Optional: close the game window when finished (disabled).
# env.close()

In [8]:
from stable_baselines3 import A2C

# Baseline agent: A2C with a CNN policy, trained directly on the live game
# for a very short smoke-test run of 25 timesteps.
a2c = A2C('CnnPolicy', env, verbose=1)
a2c.learn(total_timesteps=25)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [46]:
from sb3_contrib import RecurrentPPO
# ppo = RecurrentPPO('CnnLstmPolicy', env).learn(total_timesteps=25000)

In [47]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CnnLstmDQN(nn.Module):
    """CNN feature extractor followed by an LSTM and a linear Q-value head.

    Args:
        hidden_dim: Size of the LSTM hidden state.
        output_dim: Number of Q-values produced (one per action).
        input_shape: ``(channels, height, width)`` of a single frame. Used
            to size the first convolution and the LSTM input. Defaults to
            the ``(1, 256, 256)`` frames used in this notebook.
    """

    def __init__(self, hidden_dim, output_dim, input_shape=(1, 256, 256)):
        super().__init__()

        # CNN feature extractor
        self.conv1 = nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Flattened CNN output size for one frame, found by a dry run.
        self.feature_size = self.get_conv_output_size(input_shape)

        # LSTM backbone (default layout: sequence-first input).
        self.lstm = nn.LSTM(self.feature_size, hidden_dim)

        # Output layer
        self.head = nn.Linear(hidden_dim, output_dim)

    def convolution_operation(self, x):
        """Run the CNN and return features of shape ``(batch, feature_size)``.

        Accepts ``(batch, C, H, W)`` input. For compatibility with existing
        callers it also accepts ``(1, batch, C, H, W)``, whose leading axis
        is dropped, and a single unbatched ``(C, H, W)`` frame.
        """
        if x.dim() == 5:
            x = x.squeeze(0)
        elif x.dim() == 3:
            x = x.unsqueeze(0)
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        x = self.pool3(F.relu(self.conv3(x)))
        return x.flatten(start_dim=1)

    def get_conv_output_size(self, shape):
        """Return the flattened CNN feature count for one frame of ``shape``."""
        with torch.no_grad():
            probe = torch.rand(1, *shape)
            return int(self.convolution_operation(probe).shape[1])

    def forward(self, x, hidden=None, return_hidden=False):
        """Compute Q-values for a batch of frames.

        Args:
            x: Frames, in any layout accepted by ``convolution_operation``.
            hidden: Optional ``(h, c)`` LSTM state to continue from; ``None``
                starts from a zero state.
            return_hidden: When True, also return the updated LSTM state so
                the caller can carry it into the next step. Defaults to
                False, which returns only the Q-values as before.

        Returns:
            Q-values of shape ``(batch, output_dim)``, or the tuple
            ``(q_values, (h, c))`` when ``return_hidden`` is True.
        """
        features = self.convolution_operation(x)
        # The batch of frames is fed to the LSTM as a length-1 sequence:
        # (seq_len=1, batch, feature_size).
        lstm_out, hidden = self.lstm(features.unsqueeze(0), hidden)
        q_values = self.head(lstm_out.squeeze(0))
        if return_hidden:
            return q_values, hidden
        return q_values

# Build one network instance and print its layer summary as a sanity check.
output_dim = len(actions)  # one Q-value per entry in the action list
hidden_dim = 512           # LSTM hidden-state size

model = CnnLstmDQN(hidden_dim=hidden_dim, output_dim=output_dim)
print(model)

CnnLstmDQN(
  (conv1): Conv2d(1, 32, kernel_size=(8, 8), stride=(4, 4))
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(32, 64, kernel_size=(4, 4), stride=(2, 2))
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1))
  (pool3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (lstm): LSTM(256, 512)
  (head): Linear(in_features=512, out_features=9, bias=True)
)


In [None]:
import torch.optim as optim

# Discount factor applied to the bootstrapped next-state value.
gamma = 0.8

# Model, loss function, optimizer
model = CnnLstmDQN(hidden_dim=512, output_dim=len(actions))
criterion = nn.MSELoss()  # squared TD error
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training parameters
num_episodes = 1000  # Number of episodes for training
max_steps_per_episode = 25  # Maximum steps in each episode

# NOTE(review): actions are always chosen greedily (no exploration), and the
# reward is the unscaled value returned by env.step, so losses can be very
# large. Confirm both are intended before long training runs.
for episode in range(num_episodes):
    # Each episode starts with a game restart; no separate reset is needed
    # before the loop.
    state, _ = env.reset()
    total_loss = 0

    for step in range(max_steps_per_episode):
        # (1, 256, 256) frame -> (1, 1, 1, 256, 256); the model drops the
        # extra leading axis again.
        state_tensor = torch.from_numpy(state).unsqueeze(0).unsqueeze(0).float()
        action_values = model(state_tensor)
        action = action_values.max(1)[1].item()

        # Take action and observe next state and reward
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        next_state_tensor = torch.from_numpy(next_state).unsqueeze(0).unsqueeze(0).float()

        # The TD target is a fixed regression target, so compute it without
        # tracking gradients. Otherwise the loss would also back-propagate
        # through the bootstrapped next-state estimate.
        with torch.no_grad():
            max_next_q_value = model(next_state_tensor).max(1)[0]
        target_q_value = reward + (gamma * max_next_q_value * (1 - done))

        # Q-value the network predicted for the action actually taken
        current_q_value = action_values.gather(1, torch.tensor([[action]]))

        # Compute loss and backpropagate
        loss = criterion(current_q_value, target_q_value.unsqueeze(1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        state = next_state

        # Break if the state is terminal
        if done:
            break

    print(f"Episode {episode}, Total Loss: {total_loss}")

Episode 0, Total Loss: 100000000376832.06
Episode 1, Total Loss: 100000000376833.45
Episode 2, Total Loss: 3.068405721336603


In [1]:
# At first the training is unstable, but it stabilizes after a few hundred episodes

In [2]:
# Gameplay

In [15]:
obs, _ = env.reset()
# Choose which policy drives the game.
# Baseline (Stable-Baselines3 agent exposing .predict):
play_model = a2c
# Custom network: CnnLstmDQN defines no .predict method, so the loop below
# would need to call the network directly and take the arg-max instead.
# play_model = model

for i in range(25):
    # Use the selected model rather than always calling the A2C baseline.
    action, _ = play_model.predict(obs)
    action = int(action)
    obs, reward, terminated, truncated, info = env.step(action)
    # print(reward)
    if terminated or truncated:
        env.reset()
        break

In [16]:
# To test the custom model, make sure its output is interpreted correctly after passing the frame through the model:
# action_values = model(state_tensor)
# action = action_values.max(1)[1].view(1, 1).item()