In [None]:
import cannongame
# Import environment base class for a wrapper 
from gymnasium import Env 

# Import the space shapes for the environment
from gymnasium.spaces import Discrete, Box
# Import numpy for the observation-space dtype
import numpy as np

from stable_baselines3 import DQN

from stable_baselines3.common.monitor import Monitor

from stable_baselines3.common.evaluation import evaluate_policy

from stable_baselines3.common.callbacks import BaseCallback

import os

from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack, VecTransposeImage

In [None]:
# Output locations used throughout the notebook:
#   LOG_DIR - handed to Monitor and to DQN's tensorboard_log
#   OPT_DIR - where the callback saves checkpoints and DQN.load reads them
LOG_DIR, OPT_DIR = './logs/', './models/'

# Normal Game Mode Preview

In [None]:
import matplotlib.pyplot as plt

# Show one frame out of every FRAME_PREVIEW_INTERVAL game steps.
FRAME_PREVIEW_INTERVAL = 30

# Run the game in its normal mode (no action is passed to step()) and
# preview a frame from time to time.
game_instance = cannongame.GameInstance('Canyon game', cannongame.GAME_MODE_NORMAL, 'ai')

# try/finally guarantees the game is closed even when the loop raises or the
# cell is interrupted from the notebook.
try:
    game_instance.reset()

    count = 0
    while game_instance.isRunning():
        retVal = game_instance.step()
        count += 1

        if count % FRAME_PREVIEW_INTERVAL == 0:
            # presumably retVal[0] is the observation frame -- TODO confirm
            fig, ax = plt.subplots()
            ax.imshow(retVal[0], cmap='gray')
            ax.set_title('Step {}'.format(count))
            plt.show()  # display it
            plt.close(fig)  # release the figure so a long run does not accumulate them
finally:
    game_instance.close()




# Gymnasium Environment Wrapper

In [None]:
class CanyonGym(Env):
    """Gymnasium environment wrapping ``cannongame.GameInstance`` with a shaped reward.

    The game's ``info`` dict carries the counters ``DestroyedAircrafts``,
    ``MissedAircrafts`` and ``MissedBullets``. This wrapper treats them as
    running totals: the reward of a step is the change of each counter since
    the previous step, multiplied by the matching ``REWARD_*`` weight.

    Parameters
    ----------
    render_mode : str
        Forwarded unchanged to ``cannongame.GameInstance``; the notebook uses
        ``'human'`` for evaluation and ``'ai'`` for training.
    """

    # Reward weight applied to the per-step increase of each counter.
    REWARD_DESTROYED_AIRCRAFT = 10
    REWARD_MISSED_AIRCRAFT = 0
    REWARD_MISSED_BULLET = -9

    def __init__(self, render_mode = 'human'):
        super().__init__()
        # Specify action space and observation space
        self.render_mode = render_mode
        self.observation_space = Box(low=0.0, high=1.0, shape=(10, 64,), dtype=np.float16)
        self.action_space = Discrete(2)

        # Initialise the counter snapshots here as well as in reset(), so that
        # step() cannot fail with AttributeError if it is called first.
        self._reset_counters()

        # Start up an instance of the game
        self.game = cannongame.GameInstance('Canyon game', cannongame.GAME_MODE_EXT_ACTION, render_mode)

    def _reset_counters(self):
        """Zero the counter values remembered from the previous step."""
        self.LastDA = 0
        self.LastMA = 0
        self.LastMB = 0

    def reset(self, seed = 0, options = None):
        """Restart the game and return ``(obs, info)``.

        Parameters
        ----------
        seed : int or None
            Passed to both ``Env.reset`` and ``game.reset``. The default of 0
            is kept for backward compatibility, so callers that omit it
            re-seed with the same value on every reset.
        options : dict or None
            Accepted for compatibility with the Gymnasium ``reset`` signature;
            not used by this environment.

        Returns
        -------
        tuple
            The first observation from the game and an ``info`` dict with all
            three counters set to 0.
        """
        super().reset(seed=seed)

        # Return the first frame
        obs = self.game.reset(seed)
        self._reset_counters()

        info = {
            'DestroyedAircrafts': 0,
            'MissedAircrafts': 0,
            'MissedBullets': 0,
        }

        return obs, info

    def step(self, action):
        """Advance the game by one action and compute the shaped reward.

        Returns
        -------
        tuple
            ``(obs, reward, terminated, truncated, info)`` where ``obs``, the
            two flags and ``info`` come straight from ``game.step`` and
            ``reward`` is derived from the counter deltas.
        """
        # presumably the 2nd/3rd values are the terminated/truncated flags;
        # they are returned in the same positions the game provided them.
        obs, terminated, truncated, info = self.game.step(action)

        destroyed = info['DestroyedAircrafts']
        missed_aircraft = info['MissedAircrafts']
        missed_bullets = info['MissedBullets']

        # Reward only what changed during this step.
        reward = (destroyed - self.LastDA) * self.REWARD_DESTROYED_AIRCRAFT
        reward += (missed_aircraft - self.LastMA) * self.REWARD_MISSED_AIRCRAFT
        reward += (missed_bullets - self.LastMB) * self.REWARD_MISSED_BULLET

        # Remember the totals for the next step's delta.
        self.LastDA = destroyed
        self.LastMA = missed_aircraft
        self.LastMB = missed_bullets

        return obs, reward, terminated, truncated, info

    def render(self, *args, **kwargs):
        """Ask the game to render; extra arguments are accepted and ignored."""
        self.game.render()

    def close(self):
        """Shut down the wrapped game instance."""
        self.game.close()

# Evaluate a Trained Model

In [None]:
# Number of episodes to average the evaluation over.
N_EVAL_EPISODES = 5

# NOTE(review): this Monitor writes to the same LOG_DIR as the training cell --
# confirm that evaluation logs do not overwrite training logs.
env = Monitor(CanyonGym('human'), LOG_DIR)

# try/finally closes the game even when loading or evaluation fails.
try:
    # Requires an existing checkpoint in OPT_DIR; the name matches the
    # 'best_model_{n_calls}' pattern used by TrainAndLoggingCallback.
    model = DQN.load(os.path.join(OPT_DIR, 'best_model_1000000.zip'))
    # std_reward replaces the original `_` target, which would shadow
    # IPython's last-output variable in a notebook.
    mean_reward, std_reward = evaluate_policy(model, env, render=True, n_eval_episodes=N_EVAL_EPISODES)
finally:
    env.close()

print('Mean reward over {} episodes: {:.2f} +/- {:.2f}'.format(N_EVAL_EPISODES, mean_reward, std_reward))

# Training with a Checkpoint Callback

In [None]:
class TrainAndLoggingCallback(BaseCallback):
    """Periodically save the model being trained.

    Every ``check_freq`` calls of the callback, the model is saved to
    ``save_path`` under the name ``best_model_<n_calls>``. The name is kept
    as-is because other cells load checkpoints by it; the save is purely
    periodic and does not compare rewards.

    Parameters
    ----------
    check_freq : int
        Save whenever ``n_calls`` is a multiple of this value.
    save_path : str or None
        Directory for the checkpoints. It is created on callback
        initialisation. ``None`` disables saving.
    verbose : int
        Forwarded to ``BaseCallback``.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory if a save path was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Save a checkpoint on every ``check_freq``-th call; never stop training."""
        # Same None guard as _init_callback: without it os.path.join(None, ...)
        # would raise TypeError at the first checkpoint.
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            checkpoint_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(checkpoint_path)

        # Returning True tells the training loop to continue.
        return True


# Checkpoint the model into OPT_DIR each time the callback's call counter
# reaches a multiple of this value.
CHECKPOINT_FREQ = 20000
callback = TrainAndLoggingCallback(
    check_freq=CHECKPOINT_FREQ,
    save_path=OPT_DIR,
)

In [None]:
# Training hyperparameters, named so they can be tuned in one place.
TOTAL_TIMESTEPS = 1000000
LEARNING_RATE = 0.0001
TRAIN_FREQ = 128
BATCH_SIZE = 128

# Create environment
env = Monitor(CanyonGym('ai'), LOG_DIR)

# try/finally closes the game even when the long training run raises or is
# interrupted from the notebook.
try:
    # Create algo
    model = DQN(
        'MlpPolicy',
        env,
        tensorboard_log=LOG_DIR,
        learning_rate=LEARNING_RATE,
        train_freq=TRAIN_FREQ,
        batch_size=BATCH_SIZE,
    )
    # `callback` is the TrainAndLoggingCallback instance created in the previous cell.
    model.learn(total_timesteps=TOTAL_TIMESTEPS, callback=callback)
finally:
    env.close()