# Making RL PySC2 Agent

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
# Unfortunately, PySC2 uses Abseil, which treats Python code as if it is run as an app.
# This does not play well with Jupyter notebooks,
# so we need to monkeypatch sys.argv.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import sys
#sys.argv = ["python", "--map", "AbyssalReef"]
sys.argv = ["python", "--map", "Simple64"]

# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run an agent."""



import importlib
import threading

from absl import app
from absl import flags
from future.builtins import range  # pylint: disable=redefined-builtin

from pysc2 import maps
from pysc2.env import available_actions_printer
from pysc2.env import run_loop
from pysc2.env import sc2_env
from pysc2.lib import point_flag
from pysc2.lib import stopwatch

FLAGS = flags.FLAGS

# absl flags live in a process-wide registry, so defining the same flag twice
# raises an error. Re-running this notebook cell would do exactly that, so the
# definitions are guarded by a module-global marker (`flags_defined`) that is
# set once the cell has run.
if "flags_defined" not in globals():
    # --- Observation / rendering options ---
    flags.DEFINE_bool("render", True, "Whether to render with pygame.")
    point_flag.DEFINE_point("feature_screen_size", "84",
                            "Resolution for screen feature layers.")
    point_flag.DEFINE_point("feature_minimap_size", "64",
                            "Resolution for minimap feature layers.")
    point_flag.DEFINE_point("rgb_screen_size", None,
                            "Resolution for rendered screen.")
    point_flag.DEFINE_point("rgb_minimap_size", None,
                            "Resolution for rendered minimap.")
    flags.DEFINE_enum("action_space", None, sc2_env.ActionSpace._member_names_,  # pylint: disable=protected-access
                      "Which action space to use. Needed if you take both feature "
                      "and rgb observations.")
    # ZergAgent reads obs.observation.feature_units, hence the True default.
    flags.DEFINE_bool("use_feature_units", True,
                      "Whether to include feature units.")
    flags.DEFINE_bool("disable_fog", False, "Whether to disable Fog of War.")

    # --- Run length / pacing ---
    flags.DEFINE_integer("max_agent_steps", 0, "Total agent steps.")
    flags.DEFINE_integer("game_steps_per_episode", None, "Game steps per episode.")
    flags.DEFINE_integer("max_episodes", 0, "Total episodes.")
    flags.DEFINE_integer("step_mul", 8, "Game steps per agent step.")
    flags.DEFINE_float("fps", 22.4, "Frames per second to run the game.")

    # --- Player 1 ---
    # NOTE: main() ignores the "agent" flag and always runs ZergAgent; only
    # "agent_race" is consulted for the first player.
    flags.DEFINE_string("agent", "sc2.agent.BasicAgent.ZergBasicAgent",
                        "Which agent to run, as a python path to an Agent class.")
    flags.DEFINE_enum("agent_race", "zerg", sc2_env.Race._member_names_,  # pylint: disable=protected-access
                      "Agent 1's race.")

    # --- Player 2 (built-in bot or a second agent class) ---
    flags.DEFINE_string("agent2", "Bot", "Second agent, either Bot or agent class.")
    flags.DEFINE_enum("agent2_race", "random", sc2_env.Race._member_names_,  # pylint: disable=protected-access
                      "Agent 2's race.")
    flags.DEFINE_enum("difficulty", "very_easy", sc2_env.Difficulty._member_names_,  # pylint: disable=protected-access
                      "If agent2 is a built-in Bot, it's strength.")

    # --- Diagnostics / parallelism ---
    flags.DEFINE_bool("profile", False, "Whether to turn on code profiling.")
    flags.DEFINE_bool("trace", False, "Whether to trace the code execution.")
    flags.DEFINE_integer("parallel", 1, "How many instances to run in parallel.")

    flags.DEFINE_bool("save_replay", True, "Whether to save a replay at the end.")

    # The map name is supplied through the patched sys.argv ("--map", ...).
    flags.DEFINE_string("map", None, "Name of a map to use.")
    flags.mark_flag_as_required("map")

# Marker checked by the guard above on subsequent executions of this cell.
flags_defined = True

def run_thread(agent_classes, players, map_name, visualize):
  """Create one SC2 environment, play the agents in it, and optionally save a replay.

  Args:
    agent_classes: Agent classes; one instance of each is created per call.
    players: Player specs passed to `sc2_env.SC2Env`.
    map_name: Name of the map to load.
    visualize: Passed through to `sc2_env.SC2Env(visualize=...)`.
  """
  # Observation/action interface, assembled from the command-line flags.
  interface_format = sc2_env.parse_agent_interface_format(
      feature_screen=FLAGS.feature_screen_size,
      feature_minimap=FLAGS.feature_minimap_size,
      rgb_screen=FLAGS.rgb_screen_size,
      rgb_minimap=FLAGS.rgb_minimap_size,
      action_space=FLAGS.action_space,
      use_feature_units=FLAGS.use_feature_units)

  with sc2_env.SC2Env(
      map_name=map_name,
      players=players,
      agent_interface_format=interface_format,
      step_mul=FLAGS.step_mul,
      game_steps_per_episode=FLAGS.game_steps_per_episode,
      disable_fog=FLAGS.disable_fog,
      visualize=visualize) as raw_env:
    # Wrap the environment so newly available actions get printed.
    env = available_actions_printer.AvailableActionsPrinter(raw_env)

    agents = []
    for agent_class in agent_classes:
      agents.append(agent_class())

    run_loop.run_loop(agents, env, FLAGS.max_agent_steps, FLAGS.max_episodes)

    if FLAGS.save_replay:
      # The replay is named after the first agent's class.
      replay_name = agent_classes[0].__name__
      env.save_replay(replay_name)

def main(unused_argv):
  """Run ZergAgent on the map named by --map, against a bot or a second agent.

  Args:
    unused_argv: Positional arguments left over after absl flag parsing; ignored.
  """
  # NOTE: the stopwatch profiling/tracing hooks are disabled in this notebook,
  # so the "profile" and "trace" flags currently have no effect here.
  map_inst = maps.get(FLAGS.map)

  # Player 1 is always ZergAgent (the "agent" flag is not consulted).
  agent_classes = [ZergAgent]
  players = [sc2_env.Agent(sc2_env.Race[FLAGS.agent_race])]

  if map_inst.players >= 2:
    if FLAGS.agent2 != "Bot":
      # agent2 is a dotted python path "package.module.ClassName".
      module_path, class_name = FLAGS.agent2.rsplit(".", 1)
      opponent_cls = getattr(importlib.import_module(module_path), class_name)
      agent_classes.append(opponent_cls)
      players.append(sc2_env.Agent(sc2_env.Race[FLAGS.agent2_race]))
    else:
      players.append(sc2_env.Bot(sc2_env.Race[FLAGS.agent2_race],
                                 sc2_env.Difficulty[FLAGS.difficulty]))

  # Extra (parallel - 1) environments run on background threads, unrendered.
  workers = [
      threading.Thread(target=run_thread,
                       args=(agent_classes, players, FLAGS.map, False))
      for _ in range(FLAGS.parallel - 1)
  ]
  for worker in workers:
    worker.start()

  # The calling thread runs one environment itself, rendered if requested.
  run_thread(agent_classes, players, FLAGS.map, FLAGS.render)

  for worker in workers:
    worker.join()

pygame 1.9.6
Hello from the pygame community. https://www.pygame.org/contribute.html


# Import Module

In [3]:
import random
import time
import math

import numpy as np
import pandas as pd


from pysc2.agents import base_agent
from pysc2.env import sc2_env
from pysc2.lib import actions, features, units
from absl import app

In [4]:
# Names of the high-level actions the agent can choose between. ZergAgent.step
# maps each of these to a concrete PySC2 function call.
ACTION_DO_NOTHING = 'donothing'
ACTION_SELECT_COCOON = 'selectcocoon'
ACTION_BUILD_SUPPLY_EXTRACTOR = 'buildextractor'
ACTION_BUILD_CREEPTUMOR = 'buildcreeptumor'
ACTION_SELECT_CREEPTUMOR = 'selectcreeptumor'
ACTION_BUILD_ZERGLING = 'buildzergling'
ACTION_SELECT_ARMY = 'selectarmy'
ACTION_ATTACK = 'attack'

# The Q-table uses the integer positions in this list as its action ids, so
# the order here defines the action numbering.
smart_actions = [
    ACTION_DO_NOTHING,
    ACTION_SELECT_COCOON,
    ACTION_BUILD_SUPPLY_EXTRACTOR,
    ACTION_BUILD_CREEPTUMOR,
    ACTION_SELECT_CREEPTUMOR,
    ACTION_BUILD_ZERGLING,
    ACTION_SELECT_ARMY,
    ACTION_ATTACK,
]

# Reward added when the cumulative killed-unit / killed-structure score has
# increased since the previous step.
KILL_UNIT_REWARD = 0.2
KILL_BUILDING_REWARD = 0.5

# reference from https://github.com/MorvanZhou/Reinforcement-learning-with-tensorflow
class QLearningTable:
    """Tabular Q-learning with an epsilon-greedy policy.

    Q-values are stored in a pandas DataFrame with one row per state label and
    one column per action. Rows are created lazily, initialised to zero, the
    first time a state is seen.
    """

    def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9):
        """Create an empty Q-table.

        Args:
            actions: List of action labels; used as the table's columns.
            learning_rate: Step size applied to each temporal-difference update.
            reward_decay: Discount factor applied to the next state's best value.
            e_greedy: Probability of taking the greedy action; a random action
                is taken otherwise.
        """
        self.actions = actions  # a list
        self.lr = learning_rate
        self.gamma = reward_decay
        self.epsilon = e_greedy
        self.q_table = pd.DataFrame(columns=self.actions, dtype=np.float64)

    def choose_action(self, observation):
        """Pick an action for the state labelled `observation`.

        Args:
            observation: Hashable state label (row key in the Q-table).

        Returns:
            One of `self.actions`.
        """
        self.check_state_exist(observation)

        if np.random.uniform() < self.epsilon:
            # Exploit: take the highest-valued action for this state.
            state_action = self.q_table.loc[observation, :]

            # Shuffle first so that ties are broken at random rather than
            # always in favour of the first column.
            state_action = state_action.reindex(np.random.permutation(state_action.index))

            action = state_action.idxmax()
        else:
            # Explore: take a uniformly random action.
            action = np.random.choice(self.actions)

        return action

    def learn(self, s, a, r, s_):
        """Apply one Q-learning update for the transition (s, a, r, s_).

        Args:
            s: Label of the state the action was taken in.
            a: Action label that was taken.
            r: Reward observed after taking the action.
            s_: Label of the resulting state.
        """
        self.check_state_exist(s_)
        self.check_state_exist(s)

        q_predict = self.q_table.loc[s, a]
        q_target = r + self.gamma * self.q_table.loc[s_, :].max()

        # Move the stored estimate a fraction `lr` towards the target.
        self.q_table.loc[s, a] += self.lr * (q_target - q_predict)

    def check_state_exist(self, state):
        """Ensure `state` has a row in the Q-table, adding a zero row if not.

        Args:
            state: Hashable state label.
        """
        if state in self.q_table.index:
            return

        new_row = pd.DataFrame(
            [[0.0] * len(self.actions)],
            index=[state],
            columns=self.q_table.columns,
            dtype=np.float64,
        )
        # DataFrame.append was removed in pandas 2.0, so rows are added with
        # pd.concat. The very first row is assigned directly because
        # concatenating with an empty frame is deprecated in recent pandas.
        if len(self.q_table.index) == 0:
            self.q_table = new_row
        else:
            self.q_table = pd.concat([self.q_table, new_row])

class ZergAgent(base_agent.BaseAgent):
    """Zerg agent that picks one of `smart_actions` per step using tabular Q-learning.

    The learning state is the tuple (extractor count, creep-tumor count,
    food cap, food used); the reward is granted when the cumulative kill
    scores increase between consecutive steps.
    """

    def __init__(self):
        super(ZergAgent, self).__init__()

        # 1/0 flag computed on the first observation of each episode from the
        # minimap position of our own units; None until then.
        self.base_top_left = None
        # Actions are identified by their index into `smart_actions`.
        self.qlearn = QLearningTable(actions=list(range(len(smart_actions))))

        # Kill-score baselines used to detect score increases between steps.
        self.previous_killed_unit_score = 0
        self.previous_killed_building_score = 0

        # Previous (state, action) pair, needed to complete a learning update.
        self.previous_action = None
        self.previous_state = None

    def transformLocation(self, x, x_distance, y, y_distance):
        """Offset (x, y) by the given distances, mirrored by spawn side.

        The offsets are added when `base_top_left` is truthy and subtracted
        otherwise.

        Returns:
            A two-element list [x, y].
        """
        if not self.base_top_left:
            return [x - x_distance, y - y_distance]

        return [x + x_distance, y + y_distance]

    def getMeanLocation(self, unitList):
        """Return [mean_x, mean_y] over the `x`/`y` attributes of `unitList`.

        `unitList` must be non-empty; an empty list raises ZeroDivisionError.
        """
        sum_x = 0
        sum_y = 0
        for unit in unitList:
            sum_x += unit.x
            sum_y += unit.y
        mean_x = sum_x / len(unitList)
        mean_y = sum_y / len(unitList)

        return [mean_x, mean_y]

    def unit_type_is_selected(self, obs, unit_type):
        """Return True if the first selected unit (single or multi) is `unit_type`."""
        if (len(obs.observation.single_select) > 0 and
                obs.observation.single_select[0].unit_type == unit_type):
            return True

        if (len(obs.observation.multi_select) > 0 and
                obs.observation.multi_select[0].unit_type == unit_type):
            return True

        return False

    def get_units_by_type(self, obs, unit_type):
        """Return the feature units in `obs` whose `unit_type` equals `unit_type`."""
        return [unit for unit in obs.observation.feature_units
                if unit.unit_type == unit_type]

    def can_do(self, obs, action):
        """Return True if the action id is in this observation's available actions."""
        return action in obs.observation.available_actions

    def _select_random_unit(self, obs, unit_type):
        """Return a select_point action on a random unit of `unit_type`, or None."""
        if not self.can_do(obs, actions.FUNCTIONS.select_point.id):
            return None

        candidates = self.get_units_by_type(obs, unit_type)
        if not candidates:
            return None

        unit = random.choice(candidates)
        # Skip units reported at negative screen coordinates.
        if unit.x >= 0 and unit.y >= 0:
            return actions.FUNCTIONS.select_point("select", (unit.x, unit.y))
        return None

    def _build_near(self, obs, build_function, anchor_type, x_distance, y_distance):
        """Return a build action offset from the mean position of `anchor_type` units, or None."""
        if not self.can_do(obs, build_function.id):
            return None

        anchors = self.get_units_by_type(obs, anchor_type)
        if not anchors:
            return None

        mean_x, mean_y = self.getMeanLocation(anchors)
        target = self.transformLocation(int(mean_x), x_distance,
                                        int(mean_y), y_distance)
        return build_function("now", target)

    def step(self, obs):
        """Learn from the previous transition, choose the next action, and return it.

        Args:
            obs: The PySC2 timestep for this agent.

        Returns:
            A PySC2 function call; `no_op` when the chosen action is not
            currently possible.
        """
        super(ZergAgent, self).step(obs)

        if obs.first():
            # A new episode has started: forget the previous episode's last
            # (state, action) pair and score baselines, so that no learning
            # update bridges two unrelated episodes.
            self.previous_action = None
            self.previous_state = None
            self.previous_killed_unit_score = 0
            self.previous_killed_building_score = 0

            player_y, _ = (obs.observation.feature_minimap.player_relative ==
                           features.PlayerRelative.SELF).nonzero()
            # Mean row <= 31 is presumably the upper half of a 64-pixel minimap.
            self.base_top_left = 1 if player_y.any() and player_y.mean() <= 31 else 0

        extractor_count = len(self.get_units_by_type(obs, units.Zerg.Extractor))

        creeptumor_count = len(self.get_units_by_type(obs, units.Zerg.CreepTumor))

        supply_limit = obs.observation.player.food_cap
        army_supply = obs.observation.player.food_used

        killed_unit_score = obs.observation.score_cumulative.killed_value_units
        killed_building_score = obs.observation.score_cumulative.killed_value_structures

        current_state = [
            extractor_count,
            creeptumor_count,
            supply_limit,
            army_supply,
        ]

        if self.previous_action is not None:
            reward = 0

            if killed_unit_score > self.previous_killed_unit_score:
                reward += KILL_UNIT_REWARD

            if killed_building_score > self.previous_killed_building_score:
                reward += KILL_BUILDING_REWARD

            # States are stringified so they can serve as Q-table row labels.
            self.qlearn.learn(str(self.previous_state), self.previous_action,
                              reward, str(current_state))

        rl_action = self.qlearn.choose_action(str(current_state))
        smart_action = smart_actions[rl_action]

        self.previous_killed_unit_score = killed_unit_score
        self.previous_killed_building_score = killed_building_score
        self.previous_state = current_state
        self.previous_action = rl_action

        action = None

        if smart_action == ACTION_SELECT_COCOON:
            action = self._select_random_unit(obs, units.Zerg.Cocoon)

        elif smart_action == ACTION_BUILD_SUPPLY_EXTRACTOR:
            # NOTE(review): the anchor is the set of existing Extractors, so
            # nothing is placed until one already exists - confirm intended.
            action = self._build_near(
                obs, actions.FUNCTIONS.Build_Extractor_screen,
                units.Zerg.Extractor, 0, 20)

        elif smart_action == ACTION_BUILD_CREEPTUMOR:
            action = self._build_near(
                obs, actions.FUNCTIONS.Build_CreepTumor_screen,
                units.Zerg.CreepTumor, 20, 0)

        elif smart_action == ACTION_SELECT_CREEPTUMOR:
            action = self._select_random_unit(obs, units.Zerg.CreepTumor)

        elif smart_action == ACTION_BUILD_ZERGLING:
            if self.can_do(obs, actions.FUNCTIONS.Train_Zergling_quick.id):
                action = actions.FUNCTIONS.Train_Zergling_quick("queued")

        elif smart_action == ACTION_SELECT_ARMY:
            if self.can_do(obs, actions.FUNCTIONS.select_army.id):
                action = actions.FUNCTIONS.select_army("select")

        elif smart_action == ACTION_ATTACK:
            if self.can_do(obs, actions.FUNCTIONS.Attack_minimap.id):
                # Hard-coded minimap targets, chosen by spawn side.
                if self.base_top_left:
                    action = actions.FUNCTIONS.Attack_minimap("now", [39, 45])
                else:
                    action = actions.FUNCTIONS.Attack_minimap("now", [21, 24])

        # ACTION_DO_NOTHING, and any action that was not possible this step,
        # fall back to a no-op.
        if action is None:
            return actions.FUNCTIONS.no_op()
        return action

In [None]:
# Hand control to absl: it parses the (monkeypatched) sys.argv into FLAGS and
# then calls main.
if __name__ == "__main__":
  app.run(main)

I0922 12:44:35.196630 140735901324160 sc_process.py:135] Launching SC2: /Applications/StarCraft II/Versions/Base81433/SC2.app/Contents/MacOS/SC2 -listen 127.0.0.1 -port 18291 -dataDir /Applications/StarCraft II/ -tempDir /var/folders/4s/11m4jncx233dbm_shjk8mc980000gn/T/sc-es9nxzde/ -displayMode 0 -windowwidth 640 -windowheight 480 -windowx 50 -windowy 50
I0922 12:44:35.208395 140735901324160 remote_controller.py:167] Connecting to: ws://127.0.0.1:18291/sc2api, attempt: 0, running: True
I0922 12:44:36.223806 140735901324160 remote_controller.py:167] Connecting to: ws://127.0.0.1:18291/sc2api, attempt: 1, running: True
I0922 12:44:37.230798 140735901324160 remote_controller.py:167] Connecting to: ws://127.0.0.1:18291/sc2api, attempt: 2, running: True
I0922 12:44:38.233633 140735901324160 remote_controller.py:167] Connecting to: ws://127.0.0.1:18291/sc2api, attempt: 3, running: True
I0922 12:44:39.235786 140735901324160 remote_controller.py:167] Connecting to: ws://127.0.0.1:18291/sc2api,

   0/no_op                                              ()
   1/move_camera                                        (1/minimap [64, 64])
   2/select_point                                       (6/select_point_act [4]; 0/screen [84, 84])
   3/select_rect                                        (7/select_add [2]; 0/screen [84, 84]; 2/screen2 [84, 84])
   4/select_control_group                               (4/control_group_act [5]; 5/control_group_id [10])
 451/Smart_screen                                       (3/queued [2]; 0/screen [84, 84])
 452/Smart_minimap                                      (3/queued [2]; 1/minimap [64, 64])
   9/select_larva                                       ()
 335/Rally_Units_screen                                 (3/queued [2]; 0/screen [84, 84])
 336/Rally_Units_minimap                                (3/queued [2]; 1/minimap [64, 64])
 343/Rally_Workers_screen                               (3/queued [2]; 0/screen [84, 84])
 344/Rally_Workers_minimap      

I0922 12:46:18.886322 140735901324160 sc2_env.py:725] Episode 1 finished after 9264 game steps. Outcome: [-1], reward: [-1], score: [5170]
I0922 12:46:25.496906 140735901324160 sc2_env.py:507] Starting episode 2: [zerg, random] on Simple64
