# 0. PIP Installs

In [None]:
#!pip install pygame
#!pip install numpy
#!pip install gymnasium
#!pip install torch  # pick the build for your CUDA version / platform: https://pytorch.org/get-started/locally/
#!pip install stable-baselines3[extra]
#!pip install optuna

Last tested on a machine with the following versions:

- PyGame: 2.5.2
- OS: Windows-10
- Python: 3.10.7
- multi_sb3 [Stable-Baselines3: 2.1.0]
- PyTorch: 2.1.0+cu121
- GPU Enabled: True
- Numpy: 1.25.0
- Cloudpickle: 3.0.0
- Gymnasium: 0.29.1
- OpenAI Gym: 0.26.2
- Optuna: 3.4.0


# 1. Imports

In [None]:
import multigamepy
import paratroopergame
import math
# Import environment base class for a wrapper 
from gymnasium import Env 

# Import the space shapes for the environment
from gymnasium.spaces import Discrete, Box, MultiBinary
# Import numpy to calculate frame delta 
import numpy as np

from stable_baselines3 import MultiSB3, PPO, DQN

from stable_baselines3.common.monitor import Monitor

from stable_baselines3.common.evaluation import evaluate_policy

from stable_baselines3.common.callbacks import BaseCallback

import os

from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack, VecTransposeImage

import optuna

# 2. Directories

In [None]:
# Folder handed to Monitor(...) and to the algorithms' tensorboard_log argument.
LOG_DIR = './logs/'
# Folder where Optuna trial models and periodic training checkpoints are saved.
OPT_DIR = './models/'

# 3. MultiGame Manager (for VecEnv)

In [None]:
# Single MultiGameManager instance for the notebook; game instances are
# registered with it through regGame() and removed through unregGame().
MULTIGAME = multigamepy.MultiGameManager()

# 4. (Optional) Normal Gaming (play the game yourself; not required for training)

In [None]:
#Env wrapper not necessary here.  SHOOT WITH SPACEBAR!
import matplotlib.pyplot as plt  # only used by the optional frame preview below

# Start the game in normal (human-controlled) mode and register it with the manager.
game_instance = paratroopergame.GameSystem('Paratrooper game', paratroopergame.GAME_MODE_NORMAL, 'human')
MULTIGAME.regGame(game_instance)

count = 0

try:
    game_instance.reset()

    # Step and draw the game until it reports that it is no longer running.
    while game_instance.isRunning():
        retVal = game_instance.step()
        game_instance.render()

        count += 1

        # Debug aid: uncomment to preview the returned frame every 60 steps.
        #if count % 60 == 0:
        #    plt.figure()
        #    plt.imshow(retVal[0], cmap='gray', vmin=0, vmax=255)
        #    plt.show()
finally:
    # Always close the game and unregister it, even when the loop is
    # interrupted (e.g. kernel interrupt) or raises.
    try:
        game_instance.close()
    finally:
        MULTIGAME.unregGame(game_instance)



# 4. Game Model

In [None]:
class Paratrooper(Env):
    """Gymnasium environment wrapping ``paratroopergame.GameSystem`` in external-action mode.

    ``step`` returns a two-element reward list:

    * ``reward[0]`` - elapsed-time delta plus 10 per paratrooper destroyed since
      the previous step.
    * ``reward[1]`` - dot product between the cannon direction (cos, sin) and the
      vector from the cannon to the lowest paratrooper (or straight up, ``[0, -1]``,
      when there is none).

    Observations are returned wrapped in a one-element list.
    """

    def __init__(self, render_mode = 'human'):
        """Create the underlying game and declare the observation/action spaces.

        Args:
            render_mode: forwarded unchanged to ``paratroopergame.GameSystem``.
        """
        super().__init__()

        # Startup and instance of the game
        self.game = paratroopergame.GameSystem('Paratrooper game', paratroopergame.GAME_MODE_EXT_ACTION, render_mode)

        # Specify action space and observation space
        self.render_mode = render_mode
        self.observation_space = Box(low=0, high=255, shape=(45, 80, 1), dtype=np.uint8)
        self.action_space = MultiBinary(3)

        # Initialise the bookkeeping so step() cannot hit an AttributeError
        # if it is called before the first reset().
        self._reset_counters()

    def _reset_counters(self):
        """Zero the previous-step statistics used to compute reward deltas."""
        self.LastElapsedTime = 0
        self.LastDestroyedParatroopers = 0
        self.LastMissedBullets = 0
        self.LastEscapedParatroopers = 0

    @staticmethod
    def _aim_reward(info):
        """Return the dot product of the cannon direction and the target vector.

        Args:
            info: the info dict returned by ``self.game.step``.
        """
        # Read the cannon direction unconditionally: it is needed in both branches.
        # NOTE(review): assumes the game always reports 'CannonAngleCosSin',
        # even when no paratrooper exists - confirm against paratroopergame.
        cannon_cos_sin = info['CannonAngleCosSin']

        if not info['LowestParatrooperExists']:
            # No target: reward pointing along [0, -1].
            target_vector = [0, -1]
        else:
            lowest_pos = info['LowestParatrooperPosition']
            cannon_pos = info['CannonPosition']
            # Component-wise difference (x with x, y with y).
            target_vector = [lowest_pos[0] - cannon_pos[0], lowest_pos[1] - cannon_pos[1]]

            # If paratrooper is lower than cannon, its impossible to reach
            if target_vector[1] > 0:
                target_vector[1] = 0

        # NOTE(review): target_vector is not normalised, so this value scales
        # with the distance to the paratrooper - confirm that this is intended.
        return cannon_cos_sin[0] * target_vector[0] + cannon_cos_sin[1] * target_vector[1]

    def reset(self, seed = 0, options = None):
        """Reset the game and the reward bookkeeping.

        Args:
            seed: forwarded to ``self.game.reset``.
            options: accepted for compatibility with the Gymnasium ``reset``
                signature; not used.

        Returns:
            ``(obs, info)`` where ``obs`` is a one-element list holding the
            value returned by ``self.game.reset`` and ``info`` is ``{'none': 0}``.
        """
        # Return the first frame
        obs = [self.game.reset(seed)]

        info = {'none': 0}

        self._reset_counters()

        return obs, info

    def step(self, action):
        """Advance the game by one step.

        Args:
            action: forwarded unchanged to ``self.game.step``.

        Returns:
            ``(obs, reward, done, trimmed, info)`` where ``obs`` is a one-element
            list and ``reward`` is the two-element list described on the class.
        """
        # Take a step
        obs, done, trimmed, info = self.game.step(action)

        # There is only one observation element
        obs = [obs]

        time_and_kills = info['ElapsedTime'] - self.LastElapsedTime
        time_and_kills += (info['DestroyedParatroopers'] - self.LastDestroyedParatroopers) * 10

        reward = [time_and_kills, self._aim_reward(info)]

        # Remember the current totals so the next step can compute deltas.
        self.LastElapsedTime = info['ElapsedTime']
        self.LastDestroyedParatroopers = info['DestroyedParatroopers']
        self.LastMissedBullets = info['MissedBullets']
        self.LastEscapedParatroopers = info['EscapedParatroopers']

        return obs, reward, done, trimmed, info

    def render(self, *args, **kwargs):
        """Delegate rendering to the game; extra arguments are ignored."""
        self.game.render()

    def close(self):
        """Close the underlying game."""
        self.game.close()

# 5. Optuna

In [None]:
# Function to return test hyperparameters - define the object function
def optimize_ppo(trial):
    """Sample one set of PPO hyperparameters from an Optuna trial.

    Args:
        trial: object exposing ``suggest_int`` / ``suggest_float``.

    Returns:
        dict mapping each hyperparameter name to the value suggested by ``trial``.
    """
    # Values are drawn one by one, in the same order as the returned keys.
    n_steps = trial.suggest_int('n_steps', 32, 8192)
    gamma = trial.suggest_float('gamma', 0.8, 0.9999, log=True)
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-3, log=True)
    clip_range = trial.suggest_float('clip_range', 0.1, 0.4)
    gae_lambda = trial.suggest_float('gae_lambda', 0.8, 0.99)

    return dict(
        n_steps=n_steps,
        gamma=gamma,
        learning_rate=learning_rate,
        clip_range=clip_range,
        gae_lambda=gae_lambda,
    )

def optimize_dqn(trial):
    """Sample one set of DQN hyperparameters from an Optuna trial.

    Args:
        trial: object exposing ``suggest_int`` / ``suggest_float``.

    Returns:
        dict mapping each hyperparameter name to the value suggested by ``trial``.
    """
    as_int = trial.suggest_int
    as_float = trial.suggest_float

    # (name, sampler, low, high, log-scale?) - sampled in this order.
    search_space = (
        ('buffer_size', as_int, 64, 8192, True),
        ('learning_starts', as_int, 64, 10000, False),
        ('learning_rate', as_float, 1e-5, 1e-3, True),
        ('tau', as_float, 0.85, 1.0, True),
        ('gamma', as_float, 0.8, 0.9999, True),
        ('train_freq', as_int, 4, 128, True),
        ('gradient_steps', as_int, 1, 8, True),
        ('target_update_interval', as_int, 2000, 15000, True),
        ('exploration_fraction', as_float, 0.1, 0.2, False),
        ('exploration_final_eps', as_float, 0.1, 0.3, False),
        ('max_grad_norm', as_float, 5, 15, False),
    )

    params = {}
    for name, sampler, low, high, log_scale in search_space:
        if log_scale:
            params[name] = sampler(name, low, high, log=True)
        else:
            params[name] = sampler(name, low, high)
    return params

# Model path for trial number 1 inside OPT_DIR.
# NOTE(review): optimize_agent builds its own per-trial path, so this
# module-level value appears unused in the visible cells - confirm before removing.
SAVE_PATH = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(1))

In [None]:
# Run a training loop and return mean reward 
def optimize_agent(trial):
    """Optuna objective: train a DQN with sampled hyperparameters and score it.

    Reads the module-level ``env`` created by the study cell.

    Args:
        trial: the Optuna trial; its ``number`` names the saved model file.

    Returns:
        The mean reward over 10 evaluation episodes, or ``-1000`` when anything
        in the trial raises (the error is printed so failures stay visible).
    """
    try:
        model_params = optimize_dqn(trial)

        # Create algo
        model = DQN('MlpPolicy', env, tensorboard_log=LOG_DIR, verbose=0, **model_params)
        # Raise total_timesteps (e.g. to 100000) for a more thorough search.
        model.learn(total_timesteps=50000)

        # Evaluate model
        mean_reward, _ = evaluate_policy(model, env, n_eval_episodes=10)
        env.reset()

        # Local name on purpose: do not shadow the module-level SAVE_PATH.
        trial_save_path = os.path.join(OPT_DIR, 'trial_{}_best_model'.format(trial.number))
        model.save(trial_save_path)

        return mean_reward

    except Exception as e:
        # Keep the study running, but report why the trial failed instead of
        # hiding the error behind the sentinel score.
        print('Trial {} failed: {!r}'.format(getattr(trial, 'number', '?'), e))
        return -1000

In [None]:
# Create environment
# The wrapper class defined in this notebook is `Paratrooper`; 'ai' is the
# render mode also used for training further below.
env = Paratrooper('ai')
env = Monitor(env, LOG_DIR)
# Creating the experiment
study = optuna.create_study(direction='maximize')
try:
    # Raise n_trials (e.g. to 100) for a more thorough search.
    study.optimize(optimize_agent, n_trials=40, n_jobs=1)
finally:
    # Close the environment even if the study is interrupted.
    env.close()

In [None]:
# Show the hyperparameters of the best trial found by the study.
study.best_params

In [None]:
# Show the best trial object of the study.
study.best_trial

# 5. Callback Training

Don't worry if the game window seems frozen: training is in progress and can be monitored in the log folder throughout the whole process. Do not close the game window.

In [None]:
# Define class for periodic storage of trained models
class TrainAndLoggingCallback(BaseCallback):
    """Callback that saves the model every ``check_freq`` calls.

    Args:
        check_freq: number of callback calls between two saves.
        save_path: folder for the checkpoints; ``None`` disables saving.
        algo_name: label embedded in the checkpoint file name.
        verbose: forwarded to ``BaseCallback``.
    """

    def __init__(self, check_freq, save_path, algo_name, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.algo_name = algo_name
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint folder if a save path was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Save a checkpoint on every ``check_freq``-th call; never stop training."""
        # Same None guard as _init_callback: without it os.path.join(None, ...)
        # would raise a TypeError at the first checkpoint.
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}_{}'.format(self.algo_name, self.n_calls))
            self.model.save(model_path)

        # It is necessary to return True. Otherwise learn process would end
        return True

In [None]:
# Create environment. AI tells game not to render or process window events, which makes FPS higher
real_env = Monitor(Paratrooper('ai'), LOG_DIR)

# Two algorithms share the real environment, each handling part of the action:
#   index 0 -> DQN (shoot)
#   index 1 -> PPO (aim)
# DQN can only cope with Discrete spaces, hence Discrete(2) (don't shoot / shoot);
# PPO gets MultiBinary(2) (right, left).
action_space_list = [Discrete(2), MultiBinary(2)]

# Both algorithms observe element 0 of the real environment's observation list.
observation_list = [real_env.observation_space, real_env.observation_space]

# Every algorithm can take its own periodicity to save its model
callback_0 = TrainAndLoggingCallback(check_freq=20000, save_path=OPT_DIR, algo_name='DQN')
callback_1 = TrainAndLoggingCallback(check_freq=20000, save_path=OPT_DIR, algo_name='PPO')

# One virtual environment per algorithm
virtual_env_list = MultiSB3.createVirtualEnvironments(
    numAlgorithms=2,
    observationSpaceList=observation_list,
    actionSpaceList=action_space_list,
)

# Each algorithm is bound to the virtual environment with the same index
alg_0 = DQN('CnnPolicy', virtual_env_list[0], tensorboard_log=LOG_DIR)
alg_1 = PPO('CnnPolicy', virtual_env_list[1], tensorboard_log=LOG_DIR)
# To resume from checkpoints instead:
#alg_0 = DQN.load(os.path.join(OPT_DIR, 'best_model_DQN_x.zip'), env=env)
#alg_1 = PPO.load(os.path.join(OPT_DIR, 'best_model_PPO_x.zip'), env=env)

# Per-algorithm routing: which observation and reward element it receives and
# which indexes of the real action vector it drives.
alg_collection_0 = {
    'alg': alg_0,
    'obs_index': 0,
    'reward_index': 0,
    'action_indexes': [0],
    'action_space': action_space_list[0],
}

alg_collection_1 = {
    'alg': alg_1,
    'obs_index': 0,
    'reward_index': 1,
    'action_indexes': [1, 2],
    'action_space': action_space_list[1],
}

alg_collection = [alg_collection_0, alg_collection_1]

# Create MultiAlgorithm: it talks to the real environment and defers actions and observations to the virtual environments
model = MultiSB3(real_env, alg_collection, virtual_env_list)

# NOTE(review): training is disabled below, and `callback` / `env` are not the
# names defined in this cell (callback_0, callback_1, real_env) - confirm the
# MultiSB3.learn signature before enabling.
#model.learn(total_timesteps=1000000, callback=callback)
#env.close()

# 6. Test Model

In [None]:
# Human render tells game to render and process window event. Also render variable in evaluate_policy tells to call Env render() function periodically
# The wrapper class defined in this notebook is `Paratrooper`.
env = Paratrooper('human')
env = Monitor(env, LOG_DIR)
try:
    # NOTE(review): TrainAndLoggingCallback saves as 'best_model_<algo>_<n_calls>',
    # so confirm that this file name matches a checkpoint that actually exists.
    model = DQN.load(os.path.join(OPT_DIR, 'best_model_200000.zip'))
    mean_reward, _ = evaluate_policy(model, env, render=True, n_eval_episodes=15)
finally:
    # Close the game even if loading or evaluation fails.
    env.close()