In the previous Python project we managed to get a working Generals AI for humvee micro; however, it is very unstable, and some quirks could not be fixed with longer training or a larger network. I did some research and found that DQNs are most efficient with a prioritized replay buffer and a target network for stability, so we will try to implement this new functionality now.

In [1]:
import tensorflow as tf

import game

# Turn off TensorFlow's eager execution in the first cell, before any model is created.
# NOTE(review): presumably done to make the per-step Keras predict/train calls cheaper — confirm.
tf.compat.v1.disable_eager_execution()


pygame 2.1.0 (SDL 2.0.16, Python 3.10.11)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
from keras.models import load_model
import keyboard
import numpy as np  # Missing import for numpy
import pygame  # Assuming pygame is being used based on the 'pygame.quit()' line

# Play the game with a previously trained model, always taking the greedy action.
#
# The model is loaded first so that a missing file fails before a game window
# is opened. The environment is bound to `env` (not `game`) so the imported
# `game` module is not shadowed; later cells still call `game.Game(...)`.
model = load_model('humvee_largest_20.h5')
env = game.Game(render=True, render_interval=1, limit_fps=True)


def stop_game():
    """Ask the main loop to finish after the current step."""
    env._running = False


# Pressing 'q' ends the loop. The callback is deliberately not named `exit`,
# which would shadow the builtin of the same name.
keyboard.add_hotkey('q', stop_game)

try:
    while env._running:
        # The model expects a batch, so wrap the single state in an outer axis.
        state = np.array([env.get_state()])
        print(state)
        actions = model.predict(state)[0]
        action = np.argmax(actions)

        env.step(action)

        # Start a new episode as soon as the current one is over.
        if env._done:
            env.reset()
finally:
    # Shut pygame down even if the loop is interrupted by an exception.
    pygame.quit()


OSError: No file or directory found at humvee_largest_20.h5

In [None]:
import numpy as np

class ReplayBuffer():
    """Fixed-size circular replay memory with proportional prioritized sampling.

    Transitions are written into pre-allocated NumPy arrays; once ``max_size``
    transitions have been stored, the oldest entries are overwritten. Each
    slot carries a priority, and ``sample_buffer`` draws slots with
    probability proportional to ``priority ** alpha``.
    """

    def __init__(self, max_size, input_dims, alpha=0.6):
        """Allocate the storage arrays.

        Args:
            max_size: Number of transitions the buffer can hold.
            input_dims: Shape of a single state, unpacked into the array shape.
            alpha: Prioritization exponent applied to priorities when sampling.
        """
        self.mem_size = max_size
        self.mem_cntr = 0  # Total number of transitions ever stored
        self.alpha = alpha  # Prioritization exponent

        self.state_memory = np.zeros((self.mem_size, *input_dims), dtype=np.float32)
        self.new_state_memory = np.zeros((self.mem_size, *input_dims), dtype=np.float32)
        self.action_memory = np.zeros(self.mem_size, dtype=np.int32)
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)
        # 1 if the transition ended the episode, 0 otherwise
        self.terminal_memory = np.zeros(self.mem_size, dtype=np.int32)

        self.priority_memory = np.zeros(self.mem_size, dtype=np.float32)  # Priority of each slot
        # New transitions get the running maximum priority, so they start out
        # at least as likely to be drawn as any transition stored so far.
        self.max_priority = 1.0

    def store_transition(self, state, action, reward, state_, done):
        """Write one transition into the next slot, overwriting the oldest when full."""
        index = self.mem_cntr % self.mem_size

        self.state_memory[index] = state
        self.new_state_memory[index] = state_
        self.action_memory[index] = action
        self.reward_memory[index] = reward
        # Store the done flag itself: consumers mask the bootstrap term with
        # (1 - terminal), which must be 0 for terminal transitions.
        self.terminal_memory[index] = int(done)

        self.priority_memory[index] = self.max_priority

        self.mem_cntr += 1

    def sample_buffer(self, batch_size, beta=0.4):
        """Draw ``batch_size`` distinct transitions according to their priorities.

        Args:
            batch_size: Number of transitions to draw; must not exceed the
                number of stored transitions (sampling is without replacement).
            beta: Importance-sampling exponent.

        Returns:
            ``(states, actions, rewards, states_, terminal, batch_indexes,
            importance)``. ``terminal`` is 1 for transitions that ended an
            episode. ``importance`` holds the importance-sampling weights,
            scaled so the largest weight in the batch is 1.
        """
        max_mem = min(self.mem_cntr, self.mem_size)

        # Work in float64 so the probabilities sum to 1 within the tolerance
        # np.random.choice checks for.
        scaled_priorities = self.priority_memory[:max_mem].astype(np.float64) ** self.alpha
        sampling_probs = scaled_priorities / scaled_priorities.sum()

        batch_indexes = np.random.choice(max_mem, batch_size, replace=False, p=sampling_probs)

        states = self.state_memory[batch_indexes]
        states_ = self.new_state_memory[batch_indexes]
        rewards = self.reward_memory[batch_indexes]
        actions = self.action_memory[batch_indexes]
        terminal = self.terminal_memory[batch_indexes]

        # w_i = (N * P(i)) ** -beta, normalised by the largest weight in the batch
        importance = (max_mem * sampling_probs[batch_indexes]) ** (-beta)
        importance = (importance / importance.max()).astype(np.float32)

        return states, actions, rewards, states_, terminal, batch_indexes, importance

    def update_priorities(self, batch_indexes, td_errors):
        """Set the priority of each sampled slot to the magnitude of its TD error.

        Args:
            batch_indexes: Slot indices, as returned by ``sample_buffer``.
            td_errors: One TD error per index; only the absolute value is used.
        """
        magnitudes = np.abs(np.asarray(td_errors, dtype=np.float64)).reshape(-1)
        if magnitudes.size == 0:
            return

        # Floor at a small positive value so no transition gets probability 0.
        self.priority_memory[np.asarray(batch_indexes)] = np.maximum(magnitudes, 1e-6)
        self.max_priority = max(self.max_priority, float(magnitudes.max()))


In [15]:
import matplotlib.pyplot as plt

def plotLearning(x, scores, epsilons, filename, lines=None):
    """Save a plot of epsilon and the running-average score per game.

    Epsilon is drawn as a line on the left axis; the score, averaged over a
    trailing window of up to 21 games, is drawn as a scatter on an overlaid
    right axis.

    Args:
        x: X coordinates, one per entry of ``scores`` / ``epsilons``.
        scores: Score of each game.
        epsilons: Epsilon value recorded for each game.
        filename: Path the figure is saved to.
        lines: Optional iterable of x positions to mark with vertical lines.
    """
    fig = plt.figure()
    try:
        ax = fig.add_subplot(111, label='1')
        # Second, frameless axes over the same area so both series share x
        # but keep independent y scales.
        ax2 = fig.add_subplot(111, label="2", frame_on=False)

        ax.plot(x, epsilons, color='C0')
        ax.set_xlabel('Game', color='C0')
        ax.set_ylabel('Epsilon', color='C0')
        ax.tick_params(axis='x', colors='C0')
        ax.tick_params(axis='y', colors='C0')

        N = len(scores)
        running_avg = np.empty(N)
        for t in range(N):
            running_avg[t] = np.mean(scores[max(0, t-20):(t+1)])

        ax2.scatter(x, running_avg, color='C1')
        ax2.axes.get_xaxis().set_visible(False)
        ax2.yaxis.tick_right()
        ax2.set_ylabel('Score', color='C1')
        ax2.yaxis.set_label_position('right')
        ax2.tick_params(axis='y', colors='C1')

        if lines is not None:
            for line in lines:
                ax2.axvline(x=line)

        fig.savefig(filename)
    finally:
        # Release the figure; otherwise every call (e.g. one per checkpoint)
        # leaves another open figure behind.
        plt.close(fig)

In [16]:
from tensorflow import keras

def create_dqn(lr, n_actions, input_dims=(15,)):
    """Build and compile the fully connected Q-network.

    The network has four hidden ReLU layers of 500 units each and a linear
    output layer with one unit per action. It is compiled with the Adam
    optimizer and mean-squared-error loss.

    Args:
        lr: Learning rate for the Adam optimizer.
        n_actions: Number of output units.
        input_dims: Shape of a single state; defaults to ``(15,)``, the shape
            that was previously hard-coded.

    Returns:
        The compiled ``keras.Sequential`` model.
    """
    model = keras.Sequential([
        keras.layers.Dense(500, input_shape=tuple(input_dims), activation='relu'),
        keras.layers.Dense(500, activation='relu'),
        keras.layers.Dense(500, activation='relu'),
        keras.layers.Dense(500, activation='relu'),
        keras.layers.Dense(n_actions)
    ])
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=lr),
                  loss='mse')

    return model

In [17]:
from keras.models import load_model

class HumveeAgent():
    """Epsilon-greedy DQN agent that learns from a prioritized replay buffer.

    NOTE: a single network (``q_eval``) produces both the current and the
    next-state Q-values; no separate target network is used.
    """

    def __init__(self, lr, gamma, n_actions, epsilon, batch_size,
                 input_dims, epsilon_dec=0.0001, epsilon_end=0.01,
                 mem_size=1000000, fname='sqn_model.h5', alpha=0.6, beta=0.4):
        """Create the replay buffer and the Q-network.

        Args:
            lr: Learning rate passed to ``create_dqn``.
            gamma: Discount factor.
            n_actions: Number of discrete actions.
            epsilon: Initial exploration rate.
            batch_size: Number of transitions per learning step.
            input_dims: Shape of a single state, used to size the buffer.
            epsilon_dec: Amount subtracted from epsilon per learning step.
            epsilon_end: Lower bound for epsilon.
            mem_size: Capacity of the replay buffer.
            fname: Base file name used by ``save_model``.
            alpha: Prioritization exponent passed to the buffer.
            beta: Importance-sampling exponent passed to the buffer when sampling.
        """
        self.action_space = [i for i in range(n_actions)]
        self.gamma = gamma
        self.epsilon = epsilon
        self.eps_min = epsilon_end
        self.eps_dec = epsilon_dec
        self.batch_size = batch_size
        self.model_file = fname
        self.alpha = alpha  # Prioritization exponent
        self.beta = beta    # Importance-sampling exponent
        self.memory = ReplayBuffer(mem_size, input_dims=input_dims, alpha=alpha)  # Use PER buffer
        self.q_eval = create_dqn(lr, n_actions)

    def store_transition(self, state, action, reward, new_state, done):
        """Forward one transition to the replay buffer."""
        self.memory.store_transition(state, action, reward, new_state, done)

    def choose_action(self, observation):
        """Return a random action with probability epsilon, else the greedy one."""
        if np.random.random() < self.epsilon:
            action = np.random.choice(self.action_space)
        else:
            # The network expects a batch, so wrap the observation in an outer axis.
            state = np.array([observation])
            actions = self.q_eval.predict(state)

            action = np.argmax(actions)

        return action

    def learn(self):
        """Run one training step on a prioritized batch, then decay epsilon.

        Does nothing until the buffer has received at least ``batch_size``
        transitions.
        """
        if self.memory.mem_cntr < self.batch_size:
            return

        # Sample a batch of experiences from the replay buffer
        states, actions, rewards, states_, dones, batch_indexes, importance = \
            self.memory.sample_buffer(self.batch_size, beta=self.beta)

        q_eval = self.q_eval.predict(states)
        q_next = self.q_eval.predict(states_)

        # Start from the current predictions so only the taken action's
        # output contributes to the loss.
        q_target = np.copy(q_eval)
        batch_index = np.arange(self.batch_size, dtype=np.int32)

        # NOTE(review): the (1 - dones) mask assumes the buffer returns 1 for
        # terminal transitions — verify against ReplayBuffer.store_transition.
        q_target[batch_index, actions] = rewards + \
            self.gamma * np.max(q_next, axis=1) * (1 - dones)

        # Raw TD errors; these drive the priorities and must not be scaled
        # by the importance weights.
        td_errors = q_target[batch_index, actions] - q_eval[batch_index, actions]

        # The buffer already applied the beta exponent, so the weights are
        # passed to the loss unchanged.
        self.q_eval.train_on_batch(states, q_target, sample_weight=importance)

        # Update priorities in the replay buffer
        self.memory.update_priorities(batch_indexes, td_errors)

        # Epsilon decay
        self.epsilon = max(self.epsilon - self.eps_dec, self.eps_min)

    def save_model(self, append=''):
        """Save the Q-network to ``<base><append>.h5``.

        ``<base>`` is the configured file name without a trailing ``.h5``, so
        a name such as ``'test.h5'`` is not written as ``'test.h5.h5'``.
        """
        base = self.model_file
        if base.endswith('.h5'):
            base = base[:-len('.h5')]
        self.q_eval.save(base + append + '.h5')

    def load_model(self, path):
        """Replace the Q-network with the model stored at ``path``."""
        self.q_eval = load_model(path)

        

In [18]:
import tensorflow as tf

def train_model(model, prev_model=None, n_games=100, checkpoint_interval=None):
    """Train a ``HumveeAgent`` on the game and plot its learning curve.

    Args:
        model: Base name used for the saved model checkpoints and plot images.
        prev_model: Optional path of a saved model to continue training from.
        n_games: Number of episodes to play.
        checkpoint_interval: If set, save the model and a plot every this
            many episodes; ``None`` or 0 disables checkpoints.

    Returns:
        The trained agent.
    """
    env = game.Game(render=True, render_interval=10)
    lr = 0.001

    agent = HumveeAgent(gamma=0.99, epsilon=0.05, lr=lr,
                input_dims=env.get_state().shape,
                n_actions=env.n_actions, mem_size=1000000, batch_size=64,
                epsilon_end=0.05, fname=model)
    if prev_model:
        agent.load_model(prev_model)

    scores = []
    eps_history = []

    for i in range(n_games):
        done = False
        truncated = False
        score = 0
        observation, info = env.reset()
        # An episode ends when the environment reports either flag; only
        # `done` is stored as the terminal marker of the transition.
        # NOTE(review): assumes `truncated` means the episode was cut short,
        # as in the Gym step API this environment mirrors — confirm.
        while not (done or truncated):
            action = agent.choose_action(observation)
            observation_, reward, done, truncated, info = env.step(action)
            score += reward
            agent.store_transition(observation, action, reward, observation_, done)
            observation = observation_
            agent.learn()
        eps_history.append(agent.epsilon)
        scores.append(score)

        avg_score = np.mean(scores[-100:])
        print('episode: ', i, 'score %.2f' % score,
              'avarage_score %.02f' % avg_score,
              'epsilon %.2f' % agent.epsilon)

        if checkpoint_interval and i != 0 and i % checkpoint_interval == 0:
            # Integer division keeps the checkpoint number free of a ".0" suffix.
            checkpoint = i // checkpoint_interval
            filename = model + '_checkpoint' + str(checkpoint) + '.png'
            x = list(range(1, i + 2))
            agent.save_model('_checkpoint_' + str(checkpoint))
            plotLearning(x, scores, eps_history, filename)

    filename = model + '.png'
    x = list(range(1, n_games + 1))
    plotLearning(x, scores, eps_history, filename)

    return agent

In [6]:
from tensorflow import keras

def create_humvee_dqn(lr, n_actions):
    """Assemble and compile the dense Q-network for the humvee agent.

    Four ReLU layers of 500 units feed a linear layer with ``n_actions``
    outputs. No input shape is declared on the first layer. The model is
    compiled with Adam at learning rate ``lr`` and an MSE loss.
    """
    hidden_stack = [keras.layers.Dense(500, activation='relu') for _ in range(4)]
    q_head = keras.layers.Dense(n_actions)

    network = keras.Sequential(hidden_stack + [q_head])

    optimizer = keras.optimizers.Adam(learning_rate=lr)
    network.compile(optimizer=optimizer, loss='mse')
    return network

In [7]:
# Define the replay buffer
class HumveeReplayBuffer():
    """Fixed-size circular replay memory with uniform random sampling."""

    def __init__(self, max_size, input_dims):
        """Allocate the storage arrays.

        Args:
            max_size: Number of transitions the buffer can hold.
            input_dims: Shape of a single state.
        """
        self.mem_size = max_size # How many states we keep track of
        self.mem_cntr = 0 # How many times added a new state

        # Initialize the state memory. The *input_dims expression pulls out the
        # dimension values ex.: (5,  *(1, 4, 3)) = (5, 1, 4, 3)
        self.state_memory = np.zeros((self.mem_size, *input_dims),
                                dtype=np.float32)

        self.new_state_memory = np.zeros((self.mem_size, *input_dims),
                                dtype=np.float32)

        self.action_memory = np.zeros(self.mem_size, dtype=np.int32) # The action taken on that state
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32) # The reward received in the state
        self.terminal_memory = np.zeros(self.mem_size, dtype=np.int32) # 0 if the game was still going, 1 if it ended

    def store_transition(self, state, action, reward, state_, done):
        """Write one transition into the next slot, overwriting the oldest when full."""
        index = self.mem_cntr % self.mem_size

        self.state_memory[index] = state
        self.action_memory[index] = action
        self.reward_memory[index] = reward
        self.new_state_memory[index] = state_
        # Store the done flag itself (1 = episode ended), as documented on
        # terminal_memory; consumers mask the bootstrap term with (1 - terminal).
        self.terminal_memory[index] = int(done)
        self.mem_cntr += 1

    def sample_buffer(self, batch_size):
        """Draw ``batch_size`` distinct stored transitions uniformly at random.

        Args:
            batch_size: Number of transitions to draw; must not exceed the
                number of stored transitions (sampling is without replacement).

        Returns:
            ``(states, actions, rewards, states_, terminal)`` arrays, where
            ``terminal`` is 1 for transitions that ended an episode.
        """
        max_mem = min(self.mem_cntr, self.mem_size)

        # Selects batch_size number of indexes from the memory
        batch_indexes = np.random.choice(max_mem, batch_size, replace=False)

        states = self.state_memory[batch_indexes]
        states_ = self.new_state_memory[batch_indexes]
        rewards = self.reward_memory[batch_indexes]
        actions = self.action_memory[batch_indexes]
        terminal = self.terminal_memory[batch_indexes]

        return states, actions, rewards, states_, terminal

In [8]:
from keras.models import load_model

class HumveeAgent():
    """Epsilon-greedy DQN agent that learns from a uniformly sampled replay buffer.

    NOTE: a single network (``q_eval``) produces both the current and the
    next-state Q-values; no separate target network is used.
    """

    def __init__(self, lr, gamma, n_actions, epsilon, batch_size,
                 input_dims, epsilon_dec=0.0001, epsilon_end=0.01,
                 mem_size=1000000, fname='sqn_model.h5'):
        """Create the replay buffer and the Q-network.

        Args:
            lr: Learning rate passed to ``create_humvee_dqn``.
            gamma: Discount factor.
            n_actions: Number of discrete actions.
            epsilon: Initial exploration rate.
            batch_size: Number of transitions per learning step.
            input_dims: Shape of a single state, used to size the buffer.
            epsilon_dec: Amount subtracted from epsilon per learning step.
            epsilon_end: Lower bound for epsilon.
            mem_size: Capacity of the replay buffer.
            fname: Base file name used by ``save_model``.
        """
        self.action_space = [i for i in range(n_actions)]
        self.gamma = gamma
        self.epsilon = epsilon
        self.eps_min = epsilon_end
        self.eps_dec = epsilon_dec
        self.batch_size = batch_size
        self.model_file = fname
        self.memory = HumveeReplayBuffer(mem_size, input_dims=input_dims)
        self.q_eval = create_humvee_dqn(lr, n_actions)

    def store_transition(self, state, action, reward, new_state, done):
        """Forward one transition to the replay buffer."""
        self.memory.store_transition(state, action, reward, new_state, done)

    def choose_action(self, observation):
        """Return a random action with probability epsilon, else the greedy one."""
        if np.random.random() < self.epsilon:
            action = np.random.choice(self.action_space)
        else:
            # The network expects a batch, so wrap the observation in an outer axis.
            state = np.array([observation])
            actions = self.q_eval.predict(state)

            action = np.argmax(actions)

        return action

    def learn(self):
        """Run one training step on a sampled batch, then decay epsilon.

        Does nothing until the buffer has received at least ``batch_size``
        transitions.
        """
        if self.memory.mem_cntr < self.batch_size:
            return

        states, actions, rewards, states_, dones = \
                self.memory.sample_buffer(self.batch_size)

        q_eval = self.q_eval.predict(states)
        q_next = self.q_eval.predict(states_)

        # Start from the current predictions so only the taken action's
        # output contributes to the loss.
        q_target = np.copy(q_eval)
        batch_index = np.arange(self.batch_size, dtype=np.int32)

        # NOTE(review): the (1 - dones) mask assumes the buffer returns 1 for
        # terminal transitions — verify against HumveeReplayBuffer.store_transition.
        q_target[batch_index, actions] = rewards + \
            self.gamma * np.max(q_next, axis=1) * (1 - dones)

        self.q_eval.train_on_batch(states, q_target)

        # Epsilon decay, clamped so a step can never take epsilon below eps_min
        self.epsilon = max(self.epsilon - self.eps_dec, self.eps_min)

    def save_model(self, append=''):
        """Save the Q-network to ``<base><append>.h5``.

        ``<base>`` is the configured file name without a trailing ``.h5``, so
        a name such as ``'test.h5'`` is not written as ``'test.h5.h5'``.
        """
        base = self.model_file
        if base.endswith('.h5'):
            base = base[:-len('.h5')]
        self.q_eval.save(base + append + '.h5')

    def load_model(self, path):
        """Replace the Q-network with the model stored at ``path``."""
        self.q_eval = load_model(path)

        

In [9]:
import tensorflow as tf

def train_model(model, prev_model=None, n_games=10000, checkpoint_interval=1000):
    """Train a ``HumveeAgent`` on the game and plot its learning curve.

    Args:
        model: Base name used for the saved model checkpoints and plot images.
        prev_model: Optional path of a saved model to continue training from.
        n_games: Number of episodes to play.
        checkpoint_interval: Save the model and a plot every this many
            episodes; ``None`` or 0 disables checkpoints.

    Returns:
        The trained agent.
    """
    env = game.Game(render=True, render_interval=10)
    lr = 0.001

    agent = HumveeAgent(gamma=0.99, epsilon=0.05, lr=lr,
                input_dims=env.get_state().shape,
                n_actions=env.n_actions, mem_size=1000000, batch_size=64,
                epsilon_end=0.05, fname=model)
    if prev_model:
        agent.load_model(prev_model)

    scores = []
    eps_history = []

    for i in range(n_games):
        done = False
        truncated = False
        score = 0
        observation, info = env.reset()
        # An episode ends when the environment reports either flag; only
        # `done` is stored as the terminal marker of the transition.
        # NOTE(review): assumes `truncated` means the episode was cut short,
        # as in the Gym step API this environment mirrors — confirm.
        while not (done or truncated):
            action = agent.choose_action(observation)
            observation_, reward, done, truncated, info = env.step(action)
            score += reward
            agent.store_transition(observation, action, reward, observation_, done)
            observation = observation_
            agent.learn()
        eps_history.append(agent.epsilon)
        scores.append(score)

        avg_score = np.mean(scores[-100:])
        print('episode: ', i, 'score %.2f' % score,
              'avarage_score %.02f' % avg_score,
              'epsilon %.2f' % agent.epsilon)

        if checkpoint_interval and i != 0 and i % checkpoint_interval == 0:
            # Integer division keeps the checkpoint number free of a ".0" suffix.
            checkpoint = i // checkpoint_interval
            filename = model + '_checkpoint' + str(checkpoint) + '.png'
            x = list(range(1, i + 2))
            agent.save_model('_checkpoint_' + str(checkpoint))
            plotLearning(x, scores, eps_history, filename)

    filename = model + '.png'
    x = list(range(1, n_games + 1))
    plotLearning(x, scores, eps_history, filename)

    return agent

In [10]:
# Run a full training session using 'test.h5' as the base name for the
# checkpoint and plot files, then save the final network.
agent = train_model('test.h5')
agent.save_model()

episode:  0 score -275402.01 avarage_score -275402.01 epsilon 0.05
episode:  1 score -283236.01 avarage_score -279319.01 epsilon 0.05
episode:  2 score -323258.16 avarage_score -293965.39 epsilon 0.05
episode:  3 score -291664.73 avarage_score -293390.23 epsilon 0.05
episode:  4 score -232144.74 avarage_score -281141.13 epsilon 0.05
episode:  5 score -253864.92 avarage_score -276595.09 epsilon 0.05
episode:  6 score -290012.27 avarage_score -278511.83 epsilon 0.05
episode:  7 score -226111.37 avarage_score -271961.78 epsilon 0.05
episode:  8 score -265033.80 avarage_score -271192.00 epsilon 0.05
episode:  9 score -308510.17 avarage_score -274923.82 epsilon 0.05
episode:  10 score -250462.54 avarage_score -272700.06 epsilon 0.05
episode:  11 score -314358.51 avarage_score -276171.60 epsilon 0.05
episode:  12 score -281897.62 avarage_score -276612.06 epsilon 0.05


: 