We will build a Q-learning agent that learns to play the MoveToBeacon mini-game.

In [1]:
import math
import numpy as np
from pysc2.agents import base_agent
from pysc2.lib import actions
from pysc2.lib import features
from pysc2.env import sc2_env, run_loop, available_actions_printer
from pysc2 import maps
from absl import flags

_AI_RELATIVE = features.SCREEN_FEATURES.player_relative.index
_AI_SELECTED = features.SCREEN_FEATURES.selected.index
_NO_OP = actions.FUNCTIONS.no_op.id
# note: this is the Attack_screen function (id 12), used here as the move command
_MOVE_SCREEN = actions.FUNCTIONS.Attack_screen.id
_SELECT_ARMY = actions.FUNCTIONS.select_army.id
_SELECT_POINT = actions.FUNCTIONS.select_point.id
_MOVE_RAND = 1000
_MOVE_MIDDLE = 2000
_BACKGROUND = 0
_AI_SELF = 1
_AI_ALLIES = 2
_AI_NEUTRAL = 3
_AI_HOSTILE = 4
_SELECT_ALL = [0]
_NOT_QUEUED = [0]
EPS_START = 0.9
EPS_END = 0.025
EPS_DECAY = 2500

In [2]:
# define our actions
# it can choose to move to
# the beacon or to do nothing
# it can select the marine or deselect
# the marine, it can move to a random point
possible_actions = [
    _NO_OP,
    _SELECT_ARMY,
    _SELECT_POINT,
    _MOVE_SCREEN,
    _MOVE_RAND,
    _MOVE_MIDDLE
]
possible_actions

[0, 7, 2, 12, 1000, 2000]

We're giving our agent the ability to do 6 things. 

`_NO_OP` - do nothing

`_SELECT_ARMY` - select the marine

`_SELECT_POINT` - deselect the marine

`_MOVE_SCREEN` - move to the beacon

`_MOVE_RAND` - move to a random point that is not the beacon

`_MOVE_MIDDLE` - move to a point that is in the middle of the map

For our Q-learning table we're not going to teach our agent to recognize the beacon itself, as this is a bit more complex. For now we just want it to realize that there are 6 things it can do in this world and that some sequence of those actions produces positive feedback (a reward).

Let's examine what our agent can see about the world.

In [3]:
def get_eps_threshold(steps_done):
    return EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)

# define the state
def get_state(obs):
    # get the positions of the marine and the beacon
    ai_view = obs.observation['screen'][_AI_RELATIVE]
    beaconxs, beaconys = (ai_view == _AI_NEUTRAL).nonzero()
    marinexs, marineys = (ai_view == _AI_SELF).nonzero()
    marinex, mariney = marinexs.mean(), marineys.mean()
        
    marine_on_beacon = np.min(beaconxs) <= marinex <=  np.max(beaconxs) and np.min(beaconys) <= mariney <=  np.max(beaconys)
        
    # get a 1 or 0 for whether or not our marine is selected
    ai_selected = obs.observation['screen'][_AI_SELECTED]
    marine_selected = int((ai_selected == 1).any())
    
    return (marine_selected, int(marine_on_beacon)), [beaconxs, beaconys]

Our Agent can see the world like this:

`(1 | 0, 1 | 0)`

In other words, our agent knows whether the marine is selected and whether the marine is currently on the beacon. There are only 4 possible states:

`(0, 0)` - marine not selected, marine not on the beacon.

`(1, 0)` - marine selected but not on the beacon.

`(1, 1)` - marine selected and also on the beacon.

`(0, 1)` - marine not selected but is on the beacon.
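
To make the state encoding concrete, here is a minimal sketch that repeats the logic of `get_state` on small hand-made arrays instead of a real pysc2 observation. The helper `toy_state` and the 8x8 grids are assumptions made purely for illustration; only the `player_relative` codes (1 for our own unit, 3 for neutral) come from the constants defined earlier.

```python
import numpy as np

# player_relative codes, as defined earlier in this notebook
_AI_SELF, _AI_NEUTRAL = 1, 3

def toy_state(player_relative, selected):
    """Mirror of get_state on plain arrays (hypothetical helper, no pysc2 needed)."""
    beacon_rows, beacon_cols = (player_relative == _AI_NEUTRAL).nonzero()
    marine_rows, marine_cols = (player_relative == _AI_SELF).nonzero()
    marine_row, marine_col = marine_rows.mean(), marine_cols.mean()
    # the marine counts as "on the beacon" if its centre is inside the beacon's bounding box
    on_beacon = (beacon_rows.min() <= marine_row <= beacon_rows.max()
                 and beacon_cols.min() <= marine_col <= beacon_cols.max())
    return int((selected == 1).any()), int(on_beacon)

# 8x8 toy screen: a 3x3 beacon in the top-left corner, marine at row 6, col 6
player_relative = np.zeros((8, 8), dtype=int)
player_relative[0:3, 0:3] = _AI_NEUTRAL
player_relative[6, 6] = _AI_SELF
selected = np.zeros((8, 8), dtype=int)
state_far = toy_state(player_relative, selected)

# now select the marine and place it in the middle of the beacon
selected[1, 1] = 1
player_relative[6, 6] = 0
player_relative[1, 1] = _AI_SELF
state_on = toy_state(player_relative, selected)

print(state_far, state_on)
```

The first call corresponds to the "not selected, not on the beacon" state and the second to "selected and on the beacon".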

In [4]:
class QTable(object):
    def __init__(self, actions, lr=0.01, reward_decay=0.9, load_qt=None, load_st=None):
        self.lr = lr
        self.actions = actions
        self.reward_decay = reward_decay
        # keep states in a list so each state's row index in q_table stays fixed
        # (a set can change its iteration order when new states are added)
        self.states_list = []
        self.load_qt = load_qt
        if load_st:
            temp = self.load_states(load_st)
            self.states_list = [tuple(int(v) for v in row) for row in temp]

        if load_qt:
            self.q_table = self.load_qtable(load_qt)
        else:
            self.q_table = np.zeros((0, len(self.actions))) # create a Q table

    def get_action(self, state):
        # explore with probability epsilon (reads the global step counter `steps`)
        if not self.load_qt and np.random.rand() < get_eps_threshold(steps):
            return np.random.randint(0, len(self.actions))
        else:
            if state not in self.states_list:
                self.add_state(state)
            idx = list(self.states_list).index(state)
            q_values = self.q_table[idx]
            return int(np.argmax(q_values))

    def add_state(self, state):
        # append a row of zeros for the new state
        self.q_table = np.vstack([self.q_table, np.zeros((1, len(self.actions)))])
        self.states_list.append(state)
    
    def update_qtable(self, state, next_state, action, reward):
        if state not in self.states_list:
            self.add_state(state)
        if next_state not in self.states_list:
            self.add_state(next_state)
        # look up the table rows for both states
        state_idx = list(self.states_list).index(state)
        next_state_idx = list(self.states_list).index(next_state)
        # calculate q labels
        q_state = self.q_table[state_idx, action]
        q_next_state = self.q_table[next_state_idx].max()
        q_targets = reward + (self.reward_decay * q_next_state)
        # calculate our loss 
        loss = q_targets - q_state
        # update the q value for this state/action pair
        self.q_table[state_idx, action] += self.lr * loss
        return loss
    
    def get_size(self):
        print(self.q_table.shape)
        
    def save_qtable(self, filepath):
        np.save(filepath, self.q_table)
        
    def load_qtable(self, filepath):
        return np.load(filepath)
        
    def save_states(self, filepath):
        temp = np.array(list(self.states_list))
        np.save(filepath, temp)
        
    def load_states(self, filepath):
        return np.load(filepath)
    
class Agent3(base_agent.BaseAgent):
    def __init__(self, load_qt=None, load_st=None):
        super(Agent3, self).__init__()
        self.qtable = QTable(possible_actions, load_qt=load_qt, load_st=load_st)
        
    def step(self, obs):
        '''Step function gets called automatically by pysc2 environment'''
        super(Agent3, self).step(obs)
        state, beacon_pos = get_state(obs)
        action = self.qtable.get_action(state)
        func = actions.FunctionCall(_NO_OP, [])
        
        if possible_actions[action] == _NO_OP:
            func = actions.FunctionCall(_NO_OP, [])
        elif state[0] and possible_actions[action] == _MOVE_SCREEN:
            beacon_x, beacon_y = beacon_pos[0].mean(), beacon_pos[1].mean()
            func = actions.FunctionCall(_MOVE_SCREEN, [_NOT_QUEUED, [beacon_y, beacon_x]])
        elif possible_actions[action] == _SELECT_ARMY:
            func = actions.FunctionCall(_SELECT_ARMY, [_SELECT_ALL])
        elif state[0] and possible_actions[action] == _SELECT_POINT:
            ai_view = obs.observation['screen'][_AI_RELATIVE]
            backgroundxs, backgroundys = (ai_view == _BACKGROUND).nonzero()
            point = np.random.randint(0, len(backgroundxs))
            backgroundx, backgroundy = backgroundxs[point], backgroundys[point]
            func = actions.FunctionCall(_SELECT_POINT, [_NOT_QUEUED, [backgroundy, backgroundx]])
        elif state[0] and possible_actions[action] == _MOVE_RAND:
            # move somewhere that is not the beacon
            beacon_x, beacon_y = beacon_pos[0].max(), beacon_pos[1].max()
            movex, movey = np.random.randint(beacon_x, 64), np.random.randint(beacon_y, 64)
            func = actions.FunctionCall(_MOVE_SCREEN, [_NOT_QUEUED, [movey, movex]])
        elif state[0] and possible_actions[action] == _MOVE_MIDDLE:
            func = actions.FunctionCall(_MOVE_SCREEN, [_NOT_QUEUED, [32, 32]])
        return state, action, func
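
The arithmetic in `update_qtable` can be followed by hand on a tiny table. The sketch below is a standalone illustration, not part of the agent: the function `q_update` and the two-state table are hypothetical, while the learning rate and decay are the defaults from `QTable`.

```python
import numpy as np

lr, reward_decay = 0.01, 0.9      # defaults used by QTable
q_table = np.zeros((2, 6))        # 2 states x 6 actions, all zeros to start

def q_update(q_table, state_idx, next_state_idx, action, reward):
    """One tabular Q-learning step; returns the error that the code above calls 'loss'."""
    target = reward + reward_decay * q_table[next_state_idx].max()
    td_error = target - q_table[state_idx, action]
    q_table[state_idx, action] += lr * td_error
    return td_error

# take action 3 in state 0 twice, landing in state 1 with reward 1 each time
first = q_update(q_table, 0, 1, 3, reward=1)
second = q_update(q_table, 0, 1, 3, reward=1)
print(first, second, q_table[0, 3])
```

Because the learning rate is small, each rewarded step only nudges the stored value towards the target, which is one reason the agent needs several episodes before its greedy choices become useful.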

In [5]:
FLAGS = flags.FLAGS
FLAGS(['run_sc2'])

viz = False
save_replay = False
steps_per_episode = 0 # 0 actually means unlimited
MAX_EPISODES = 35
MAX_STEPS = 400
steps = 0

# create a map
beacon_map = maps.get('MoveToBeacon')

# create an environment
with sc2_env.SC2Env(agent_race=None,
                    bot_race=None,
                    difficulty=None,
                    map_name=beacon_map,
                    visualize=viz) as env:
    agent = Agent3()
    for i in range(MAX_EPISODES):
        print('Starting episode {}'.format(i))
        ep_reward = 0
        obs = env.reset()
        for j in range(MAX_STEPS):
            steps += 1
            state, action, func = agent.step(obs[0])
            obs = env.step(actions=[func])
            next_state, _ = get_state(obs[0])
            reward = obs[0].reward
            ep_reward += reward
            loss = agent.qtable.update_qtable(state, next_state, action, reward)
        print('Episode Reward: {}, Explore threshold: {}, Q loss: {}'.format(ep_reward, get_eps_threshold(steps), loss))
    if save_replay:
        env.save_replay(Agent3.__name__)

Starting episode 0


Episode Reward: 1, Explore threshold: 0.770625815345435, Q loss: 6.112720181083372e-05
Starting episode 1
Episode Reward: 0, Explore threshold: 0.6603804074394796, Q loss: -1.5116670143063867e-05
Starting episode 2
Episode Reward: 1, Explore threshold: 0.5664354678303732, Q loss: -2.1292038214325244e-05
Starting episode 3
Episode Reward: 1, Explore threshold: 0.4863808710376675, Q loss: 0.007672810393025457
Starting episode 4
Episode Reward: 1, Explore threshold: 0.4181628436025689, Q loss: -0.0007654517143376926
Starting episode 5
Episode Reward: 2, Explore threshold: 0.3600312752282231, Q loss: -0.0006237272210723654
Starting episode 6
Episode Reward: 0, Explore threshold: 0.31049482029515957, Q loss: -0.0004806166707503664
Starting episode 7
Episode Reward: 8, Explore threshold: 0.2682826378965449, Q loss: -0.004313301649580499
Starting episode 8
Episode Reward: 40, Explore threshold: 0.23231178884685655, Q loss: -0.014309225465330505
Starting episode 9
Episode Reward: 26, Explore t

In [6]:
agent.reward/MAX_EPISODES

21.257142857142856

In [7]:
agent.qtable.q_table.shape

(3, 6)

The agent learns (slowly at first), and if you watch it play, it meanders around and performs a lot of `_NO_OP` (do nothing) operations. Somewhere around episode 10, though, our agent learns that going to the beacon is good and starts to move towards it consistently. Convergence is sometimes faster or slower, anywhere from episode 2 to episode 30.
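
Part of the slow start is simply exploration: the "Explore threshold" in the training log is the probability of taking a random action, and it decays with the total number of steps taken. The sketch below recomputes that value at the end of each episode, assuming 400 steps per episode as in the training loop; the list `eps_by_episode` is introduced here only for illustration.

```python
import math

EPS_START, EPS_END, EPS_DECAY = 0.9, 0.025, 2500
MAX_STEPS = 400   # steps per episode in the training loop

def get_eps_threshold(steps_done):
    return EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)

# exploration probability at the end of each of the 35 episodes
eps_by_episode = [get_eps_threshold((ep + 1) * MAX_STEPS) for ep in range(35)]
for ep in (0, 9, 34):
    print(ep, round(eps_by_episode[ep], 4))
```

The value for episode 0 matches the first line of the log (about 0.77), so early on most actions are random, while later episodes act almost entirely on the learned table.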

You may have noticed our Q learning agent actually outperforms an agent that is told to simply move to the beacon. How is that possible? Let's examine the Q Table.

In [12]:
print('(marine_sel, marine_beac)', '[do nothing, select marine, deselect marine, move beacon, move random, move middle] ')
for state in agent.qtable.states_list:
    print(state, agent.qtable.q_table[list(agent.qtable.states_list).index(state)])

(marine_sel, marine_beac) [do nothing, select marine, deselect marine, move beacon, move random, move middle] 
(1, 0) [ 0.25143407  0.28523599  0.28281235  0.63223914  0.21524341  0.26161219]
(0, 0) [ 0.093577    0.50350932  0.09130667  0.18796636  0.11642416  0.10940728]
(1, 1) [ 1.39015192  0.13389986  0.11158669  0.06626444  0.04365226  0.01829639]


So when it has the marine selected but it's not at the beacon, state=`(1, 0)`, our agent learns that moving to the beacon has the highest value (action at index 3).

When it doesn't have the marine selected and it's not at the beacon, state=`(0,0)`, our agent learns that selecting the marine has the highest value (action at index 1).

When it is on the beacon and has the marine selected, state=`(1,1)`, doing nothing has the highest value in the table above (action at index 0). In other runs I've had it learn that reselecting or deselecting the marine is good here. Sometimes that's going to happen. Our reward doesn't seem to be affected much by intermittent deselection.
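
The greedy policy can also be read off the table mechanically. This sketch copies the values printed above into a NumPy array and takes the `argmax` of each row; the names `action_names` and `greedy` are introduced here only for illustration.

```python
import numpy as np

action_names = ['do nothing', 'select marine', 'deselect marine',
                'move beacon', 'move random', 'move middle']
states = [(1, 0), (0, 0), (1, 1)]
# values copied from the Q table printed above
q_table = np.array([
    [0.25143407, 0.28523599, 0.28281235, 0.63223914, 0.21524341, 0.26161219],
    [0.093577,   0.50350932, 0.09130667, 0.18796636, 0.11642416, 0.10940728],
    [1.39015192, 0.13389986, 0.11158669, 0.06626444, 0.04365226, 0.01829639],
])

# greedy action per state: the column with the largest Q value in that state's row
greedy = {s: action_names[int(np.argmax(row))] for s, row in zip(states, q_table)}
for s, name in greedy.items():
    print(s, '->', name)
```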

In [9]:
# save qtable
agent.qtable.save_qtable('agent3_qtable.npy')
agent.qtable.save_states('agent3_states.npy')


It would be nice to also teach our agent to recognize the beacon as well as move to it. Let's examine this next.

Check this tutorial for a more advanced tabular Q learning agent: https://chatbotslife.com/building-a-smart-pysc2-agent-cdc269cb095d

In [10]:
np.load('agent3_states.npy')

array([[1, 0],
       [0, 0],
       [1, 1]])
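
Note that the states come back as a 2-D array rather than a collection of tuples, so they need converting before they can be used as lookup keys again. Below is a minimal round-trip sketch using a temporary directory and placeholder Q values (both are assumptions for illustration, not the files saved above); the dictionary `row_of` is a hypothetical convenience for mapping a state to its row.

```python
import os
import tempfile
import numpy as np

states = [(1, 0), (0, 0), (1, 1)]
q_table = np.arange(18, dtype=float).reshape(3, 6)   # placeholder values, not learned ones

with tempfile.TemporaryDirectory() as tmp:
    qt_path = os.path.join(tmp, 'qtable.npy')
    st_path = os.path.join(tmp, 'states.npy')
    np.save(qt_path, q_table)
    np.save(st_path, np.array(states))
    loaded_q = np.load(qt_path)
    # rows of the loaded array become plain tuples of ints again
    loaded_states = [tuple(int(v) for v in row) for row in np.load(st_path)]

# map each state to its row in the Q table
row_of = {state: i for i, state in enumerate(loaded_states)}
print(loaded_states, row_of[(1, 1)])
```

The row order of the saved states has to match the row order of the saved Q table, which is why both files should be written from the same agent at the same time.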