In [1]:
# !python -m pip install "gymnasium[atari]"
# !python -m pip install "gymnasium[accept-rom-license, atari]"
# !pip install shimmy

In [2]:
import matplotlib.pyplot as plt
import gymnasium as gym
import seaborn as sns
import numpy as np

import ale_py
import shimmy
import joblib
import os

from gym import wrappers
from skimage.measure import block_reduce

| **Value** | **Meaning** |
|:---------:|:-----------:|
| 0 | NOOP |
| 1 | FIRE |
| 2 | RIGHT |
| 3 | LEFT |
| 4 | RIGHTFIRE |
| 5 | LEFTFIRE |

# General Functions

In [3]:
def show_obs(obs):
    """ 
    Simple display of image observation 
    
    Args:
    `obs` : np.ndarray
    - Observation from the environment
    """
    plt.figure(figsize=(16,10))
    plt.imshow(obs)
    plt.show()
    return

# Policy Functions

In [4]:
def discount_rewards(rewards):
    """ 
    Take 1D array of rewards and compute discounted version
    Most recent action has the greatest weight 
    
    Args:
    `rewards` : np.ndarray
    - Observed rewards over time
    - ndim : 1
    """
    discounted_rewards = np.zeros_like(rewards)
    running_add = 0
    for t in reversed(range(0, rewards.size)):
        running_add = running_add * gamma + rewards[t]
        discounted_rewards[t] = running_add
    return discounted_rewards

# Run Constants

In [5]:
batch_size = 64      # used to perform a RMS prop param update every batch_size steps
learning_rate = 1e-2 # learning rate used in RMS prop
gamma = 0.99         # discount factor for reward
decay_rate = 0.99    # decay factor for RMSProp leaky sum of grad^2

resume = False # resume training from previous checkpoint (from save.p  file)?
render = False # render video output?
print_ = False # print each observation
show = False
zero_grad = True
corner_correct = True

timer_i = 1000 # number of iterations without reward before noise is intentionally greater than signal

record_probs = True
record_rewards = True
record_eps_iters = True
save_path = 'model.joblib'

# Model Instantiation

In [6]:
OBS_SHAPE = (210, 160)
XMIN = 26
XMAX = 196
YMIN = 14
YMAX = 144
SHAPE = (XMAX - XMIN, YMAX - YMIN)
downsample = 'horizontal'
if downsample == 'max_pool':
    dim = np.prod(SHAPE) // 4
elif downsample == 'horizontal':
    dim = np.prod(SHAPE) // 2
else:
    dim = np.prod(SHAPE)
    
action_dict = {
    0 : 'NOOP',
    1 : 'FIRE',
    2 : 'RIGHT',
    3 : 'LEFT',
    4 : 'RIGHTFIRE',
    5 : 'LEFTFIRE'
}
ACTIONS = [0,1,2,3,4,5]
N_CLASSES = len(ACTIONS)

print('Input Shape:', SHAPE)
print('Input Dimensionality:', dim)

def preprocess(obs, downsample='horizontal', xmin=26, xmax=196, ymin=10, ymax=144):
    assert obs.shape == (210, 160)
    if downsample not in ['horizontal','max_pool']:
        raise ValueError(f'Invalid downsample operation {downsample}')
    I = obs[xmin:xmax,ymin:ymax]
    if downsample == 'horizontal':
        I = I[::2,:]
    I[I == 144] = 0 # erase background 1
    I[I == 109] = 0 # erase background 2
    I[I != 0] = 1 # everything else is kept
    if downsample == 'max_pool':
        # ideally downsampling would be done before changing values in place, but this way the background is ignored easily
        I = block_reduce(I, (2, 2), np.amax)
    return I.astype(np.float32).ravel()

Input Shape: (170, 130)
Input Dimensionality: 11050


# Layers

In [7]:
def standardize(arr):
    """ Standardizes input array `arr` to unit variance and mean of 0 """
    arr -= np.mean(arr)
    arr /= np.std(arr)
    return arr

def lrelu(x, alpha=.03):
    """
    Leaky ReLU activation function
    
    Args:
    `x` : np.ndarray
    - ndim doesn't matter
    - Operated on in place
    
    `alpha` : float
    - Attenuation coefficient for negative values
    - Value of 0 makes this equivalent to regular ReLU
    """
    x[x < 0] *= alpha
    return
    
def dropout(x, frac=.2):
    """
    Dropout to limit overfitting
    Selects weights from the 0th dimension of the passed array to be reset to 0
    
    Args:
    `x` : np.ndarray
    - Only 0th dimension is operated on
    
    `frac` : float
    - Must be less than 1, greater than 0
    - Determines number of indices to reset
    - Higher number means more dropout, more training, less overfitting
    """
    if frac:
        drop_indices = np.random.choice(x.shape[0], size=int(x.shape[0]*frac), replace=False)
        x[drop_indices] = 0
    return

def softmax(x):
    """
    Performs softmax on hidden logits
    All output values scaled between 0 and 1, sum to 1
    Vector adjustment of sigmoid
    
    Args:
    `x` : np.ndarray
    - ndim : 1
    - Output will be same shape, but normalized
    
    Returns:
    `x_softmax` : np.ndarray
    - Same shape as `x` but scaled as probabilities
    """
    x_exp = np.exp(x - np.max(x))
    x_sum = np.sum(x_exp)
    x_softmax = np.divide(x_exp, x_sum) #normalize
    assert round(np.sum(x_softmax), 5) == 1, x_softmax
    return x_softmax

## Three Layer Network
- Single hidden layer + input/output layers
- Leaky Relu

In [8]:
def triple_layer(n_neurons=64, n_classes=6, dim=6000):
    middle_n = n_neurons // 2
    model = {
        'Layer1' : np.random.randn(n_neurons, middle_n) / np.sqrt(middle_n),
        'Layer2' : np.random.randn(middle_n, dim) / np.sqrt(dim),
        'Layer3' : np.random.standard_normal((n_classes, n_neurons)) / np.sqrt(n_neurons),
    }
    return model
   
def policy_triple_forward(x, alpha=.03, frac=.2):
    x = np.dot(model['Layer1'], x)
    lrelu(x, alpha=alpha)
    dropout(x, frac=frac)

    alpha /= 2
    frac /= 2
    
    x = np.dot(model['Layer2'], x)
    lrelu(x, alpha=alpha)
    dropout(x, frac=frac)
    
    logp = np.dot(model['Layer3'], x)
    p = softmax(logp)
    return p, x

def policy_triple_backward(eph, epx, epdlogp):
    dl3 = np.dot(eph.transpose(), epdlogp).ravel()
    dx = np.outer(epdlogp, model['Layer3'])
    dx[eph <= 0] = 0
    
    dl2 = np.outer(dx, model['Layer2'])
    dh[eph <= 0] = 0
    
    dW1 = np.dot(dh.transpose(), epx)
    model = {
        'Layer1' : dl1, 
        'Layer2' : dl2, 
        'Layer3' : dl3,
    }
    return model

## Two Layer Network
- Input and output layers
- relu

In [9]:
def network(hidden_size=128, n_classes=6):
    model = {
        'W1' : np.random.randn(hidden_size, dim) / np.sqrt(dim),
        'W2' : np.random.standard_normal((n_classes, hidden_size)) / np.sqrt(hidden_size),
    }
    return model

def policy_forward(x, alpha=.03):
    h = np.dot(model['W1'], x)
    lrelu(h, alpha=alpha)
    logp = np.dot(model['W2'], h)
    probs = softmax(logp)
    return probs, h

def policy_backward(eph, epx, epdlogp):
    dW2 = np.dot(eph.transpose(), epdlogp).transpose()
    dh = np.dot(epdlogp, model['W2'])
    
    dh[eph < 0] = 0
    dW1 = np.dot(dh.transpose(), epx)
    model = {
        'W1' : dW1,
        'W2' : dW2,
    }
    return model

In [10]:
if resume and os.path.exists(save_path):
    model = joblib.load(save_path)
else:
    model = network()

grad_buffer = { k : np.zeros_like(v) for k,v in model.items() }
rmsprop_cache = { k : np.zeros_like(v) for k,v in model.items() }

In [11]:
prev_x = None
xs, hs, dlogps, drs = list(), list(), list(), list()
running_reward = None

reward_sum = 0
adj_reward_sum = 0

In [12]:
def add_noise(probs, i, i_since_r, timer_i, buffer=None, print_=False):
    if buffer is None:
        buffer = timer_i // 2
    n = len(probs)
    sigma = 2 / n
    noise = np.random.normal(0, sigma, size=n)
    noise = noise - np.mean(noise)
    
    scale = i_since_r / (timer_i - buffer)
    noise = noise * scale
    assert not round(np.mean(noise), 3), noise
    if print_ and not i % 100:
        print(probs)
        print(noise)
    new_probs = probs + noise
    pmin = np.amin(new_probs)
    if pmin < 0:
        new_probs -= pmin
        new_probs /= np.sum(new_probs)
    return new_probs

def balance_lr(probs, i_since_r, timer_i, buffer=None):
#    ACTIONS = [NOOP,1,2,3,4,5]
    if i_since_r < timer_i // 4:
        pass
    elif i_since_r < timer_i // 2:
        equal_n = (probs[2] + probs[3]) / 2
        equal_y = (probs[4] + probs[5]) / 2
        probs[2] = equal_n
        probs[3] = equal_n
        probs[4] = equal_y
        probs[5] = equal_y
    elif i_since_r < 3 * timer_i // 4:
        probs[2], probs[3] = probs[3], probs[2]
        probs[4], probs[5] = probs[5], probs[4]
    return probs
    
def modify_reward(action, reward, info, prev_lives):
    if info['lives'] < prev_lives:
        reward -= 15
    if reward <= 0 and action in [1,4,5]:
        reward -= 1
    return reward

In [None]:
n_episodes = 2000
last_i = 0
env = gym.make(
    'ALE/DemonAttack-v5', # alternate games can be chosen here 
    obs_type='grayscale', # saves RGB preprocessing reduction
    render_mode='human' if render else None, # rendering shows popup but limits training speed
)
        
if record_rewards:
    reward_list = list()
if record_probs:
    prob_list = list()
if record_eps_iters:
    eps_iters_list = list()

obs, info = env.reset()

episode_number = 0
prev_lives = 0
i = 0

while episode_number <= n_episodes:
    curr_x = preprocess(obs, downsample=downsample, xmin=XMIN, xmax=XMAX, ymin=YMIN, ymax=YMAX)
    x = curr_x - prev_x if prev_x is not None else np.zeros(dim) # only monitor change between frames
    prev_x = curr_x

    # forward the policy network and sample an action from the returned probability
    probs, h = policy_forward(x)
    
    i_since_r = i - last_i
    if i_since_r > timer_i:
        terminated = True
        truncated = False
        print('Timer causing reset               ')
        last_i = i

    xs.append(x) # observation
    hs.append(h) # hidden state

    if corner_correct: # heavily biases agent from getting 'stuck' in corner
        probs = add_noise(probs, i, i_since_r, timer_i)
        probs = balance_lr(probs, i_since_r, timer_i)

    try:
        action = np.random.choice(ACTIONS, p=probs) # RANDOMLY choose one with probability weight based on forward pass expectations
    except Exception as e:
        print(probs)
        raise(e)
        
    if record_probs:
        prob_list.append(probs)

    y = np.zeros_like(probs) # create zeroed probability array
    y[ACTIONS.index(action)] = 1 # assign all probability to single element (chosen action)
    dlogps.append(y - probs) 
    
    #####################################################
    prev_lives = info['lives'] # lives not available through general step return
    obs, reward, terminated, truncated, info = env.step(action) # step returns all other relevant information 
    if reward > 0: # reset the iterations since last reward if reward is accrued
        last_i = i

    reward_sum += reward # total round reward incremented
    adj_reward = modify_reward(action, reward, info, prev_lives) # adjusted reward may better lead agent toward short term optimums
    adj_reward_sum += adj_reward
    drs.append(adj_reward) # record reward
    ######################################################

    if terminated: # an episode finished
        episode_number += 1
#         print(f'Episode: {episode_number}              ')

        if record_rewards:
            reward_list.append(reward_sum)
        if record_eps_iters:
            eps_iters_list.append(i)
        
        # stack together all inputs, hidden states, action gradients, and rewards for this episode
        if not zero_grad:
            epx = np.vstack(xs)
            eph = np.vstack(hs)
            epdlogp = np.vstack(dlogps)
            epr = np.vstack(drs)
            for lis in [xs, hs, dlogps, drs]:
                lis.clear()
            
            discounted_epr = discount_rewards(epr)
            # standardize the rewards to be unit normal (helps control the gradient estimator variance)
            
            discounted_epr = standardize(discounted_epr)
            epdlogp *= discounted_epr # modulate the gradient with advantage (Policy Grad magic happens right here.)

            grad = policy_backward(eph, epx, epdlogp)
            for layer in model.keys():
                grad_buffer[layer] += grad[layer] # accumulate grad over batch

            # perform rmsprop parameter update every batch_size episodes
            if episode_number % batch_size == 0:
                for layer, weights in model.items():
                    g = grad_buffer[layer] # gradient
                    rmsprop_cache[layer] = decay_rate * rmsprop_cache[layer] + (1 - decay_rate) * g**2
                    model[layer] += learning_rate * g / (np.sqrt(rmsprop_cache[layer]) + 1e-5)
                    grad_buffer[layer] = np.zeros_like(weights) # reset batch gradient buffer
                print('Backward Policy Applied                ')
            running_reward = adj_reward_sum if running_reward is None else running_reward * 0.99 + adj_reward_sum * 0.01
        
        reward_sum = 0 # reset all totals
        adj_reward_sum = 0
        
        obs, info = env.reset() # reset env
        prev_x = None
    elif truncated: # an episode terminated unexpectedly, shouldn't maintain results
        for lis in [xs, hs, dlogps, drs]: # empty tracking
            lis.clear()
        
        reward_sum = 0
        adj_reward_sum = 0
        
        obs, info = env.reset()
        prev_x = None
        
    if not i % 100:
#         print(f'{round((i/iters)*100, 3)}% complete                ', end='\r')
        print(f'Episode {episode_number} of {n_episodes} episodes                ', end='\r')
        joblib.dump(model, save_path)
    i += 1

env.close()

  logger.warn(


Episode 37 of 2000 episodes                

In [None]:
def plot_probs(prob_list, eps_iters_list, batch_size, step=1):
    probs_arr = np.vstack(prob_list)
#     plt.figure(figsize=(16,5))
    fig, axs = plt.subplots(nrows=3, ncols=2, figsize=(16,12), dpi=200, sharex=True, sharey=True)
    fig.suptitle('Single Episode Action Probabilities')
    colors = sns.color_palette('Spectral', 7)
    colors = colors[:3] + colors[4:]
    for i, (ax, color) in enumerate(zip(axs.flatten(), colors)):
        sns.lineplot(data=probs_arr[::step,i], color=color, label=action_dict[i], alpha=.7, dashes=False, ax=ax)
#     plt.xticks(np.asarray(plt.xticks(), dtype=np.int32) * step)
#     for i, eps_iters in enumerate(eps_iters_list):
#         plt.axvline(eps_iters, 0, 1, color='red' if not i % batch_size else 'pink')
    plt.xlabel('Iterations')
    plt.ylabel('Probability')
    plt.tight_layout()
    plt.show()
    return

if record_probs:
    plot_probs(prob_list, eps_iters_list, batch_size)

In [None]:
def moving_average(a, window_size) :
    ret = np.cumsum(a, dtype=float)
    ret[window_size:] = ret[window_size:] - ret[:-window_size]
    return ret[window_size - 1:] / window_size

def plot_rewards(reward_list, window_size=10):
    plt.figure(figsize=(16,5))
    plt.title('Rewards Over Time')
    plt.ylabel('Total Reward')
    plt.xlabel('Episode Number')
    x = np.arange(0, len(reward_list), 1)
    assert len(x) == len(reward_list)
    plt.plot(x, reward_list, color='black', linestyle='dashed', label='Reward Per Episode')
    plt.plot(x[window_size-1:], moving_average(reward_list, window_size), color='red', label='Reward Moving Average')
    plt.legend()
    plt.show()
    return

print(reward_list)
if record_rewards:
    plot_rewards(reward_list, window_size=200)