import os
import sys
from functools import partial

import gym
import numpy as np
from gym import ObservationWrapper, spaces
from gym.envs import registry as gym_registry
from gym.spaces import flatdim
from gym.wrappers import TimeLimit as GymTimeLimit
from smac.env import MultiAgentEnv, StarCraft2Env

import pretrained


def env_fn(env, **kwargs) -> MultiAgentEnv:
    return env(**kwargs)


REGISTRY = {}
REGISTRY["sc2"] = partial(env_fn, env=StarCraft2Env)

if sys.platform == "linux":
    os.environ.setdefault(
        "SC2PATH", os.path.join(os.getcwd(), "3rdparty", "StarCraftII")
    )
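

# Illustrative sketch (not part of the registry itself) of how a REGISTRY
# entry is consumed: each value is a factory that forwards keyword arguments
# to the wrapped environment class. The map_name below is an assumed example
# value, not something this module prescribes.
def _example_make_sc2_env():
    env = REGISTRY["sc2"](map_name="3m")  # hypothetical SMAC map choice
    env.reset()
    return env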


class TimeLimit(GymTimeLimit):
    def __init__(self, env, max_episode_steps=None):
        super().__init__(env)
        if max_episode_steps is None and self.env.spec is not None:
            max_episode_steps = env.spec.max_episode_steps
        self._max_episode_steps = max_episode_steps
        self._elapsed_steps = None

    def step(self, action):
        assert (
            self._elapsed_steps is not None
        ), "Cannot call env.step() before calling reset()"
        observation, reward, done, info = self.env.step(action)
        self._elapsed_steps += 1
        if self._elapsed_steps >= self._max_episode_steps:
            # Record whether the episode was cut short (truncated) rather than
            # finished naturally, then force every agent's done flag to True.
            info["TimeLimit.truncated"] = (
                not all(done) if type(done) is list else not done
            )
            done = len(observation) * [True]
        return observation, reward, done, info
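

# Minimal sketch of the truncation semantics above. _ToyEnv is a hypothetical
# two-agent stand-in used only for illustration; it is not part of this module.
class _ToyEnv(gym.Env):
    def reset(self):
        return [np.zeros(3), np.zeros(3)]

    def step(self, action):
        return [np.zeros(3), np.zeros(3)], [0.0, 0.0], [False, False], {}


def _example_time_limit_truncation():
    env = TimeLimit(_ToyEnv(), max_episode_steps=2)
    env.reset()
    env.step([0, 0])                     # step 1 of 2: done stays [False, False]
    _, _, done, info = env.step([0, 0])  # step 2 of 2: cap reached
    assert done == [True, True]
    assert info["TimeLimit.truncated"] is True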


class FlattenObservation(ObservationWrapper):
    r"""Observation wrapper that flattens the observation of individual agents."""

    def __init__(self, env):
        super(FlattenObservation, self).__init__(env)

        ma_spaces = []
        for sa_obs in env.observation_space:
            flat_dim = spaces.flatdim(sa_obs)
            ma_spaces += [
                spaces.Box(
                    low=-float("inf"),
                    high=float("inf"),
                    shape=(flat_dim,),
                    dtype=np.float32,
                )
            ]

        self.observation_space = spaces.Tuple(tuple(ma_spaces))

    def observation(self, observation):
        return tuple(
            spaces.flatten(obs_space, obs)
            for obs_space, obs in zip(self.env.observation_space, observation)
        )
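

# Sketch of the per-agent flattening above, assuming a hypothetical structured
# per-agent space: a Dict observation is flattened into a single float vector
# of length spaces.flatdim(space), matching the Box spaces built in __init__.
def _example_flatten_observation():
    sa_space = spaces.Dict(
        {
            "pos": spaces.Box(-1.0, 1.0, shape=(2,)),
            "hp": spaces.Box(0.0, 1.0, shape=(3,)),
        }
    )
    flat = spaces.flatten(sa_space, sa_space.sample())
    assert flat.shape == (spaces.flatdim(sa_space),)  # (5,)
    return flat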


class _GymmaWrapper(MultiAgentEnv):
    def __init__(self, key, time_limit, pretrained_wrapper, seed, **kwargs):
        self.original_env = gym.make(f"{key}", **kwargs)
        self.episode_limit = time_limit
        self._env = TimeLimit(self.original_env, max_episode_steps=time_limit)
        self._env = FlattenObservation(self._env)

        if pretrained_wrapper:
            self._env = getattr(pretrained, pretrained_wrapper)(self._env)

        self.n_agents = self._env.n_agents
        self._obs = None
        self._info = None

        self.longest_action_space = max(self._env.action_space, key=lambda x: x.n)
        self.longest_observation_space = max(
            self._env.observation_space, key=lambda x: x.shape
        )

        self._seed = seed
        self._env.seed(self._seed)

    def step(self, actions):
        """Returns reward, terminated, info."""
        actions = [int(a) for a in actions]
        self._obs, reward, done, self._info = self._env.step(actions)
        # Zero-pad every observation on the right up to the longest per-agent
        # observation so all agents share one shape (see
        # _example_pad_observation below).
        self._obs = [
            np.pad(
                o,
                (0, self.longest_observation_space.shape[0] - len(o)),
                "constant",
                constant_values=0,
            )
            for o in self._obs
        ]

        # Collapse per-agent rewards and done flags into scalars.
        if type(reward) is list:
            reward = sum(reward)
        if type(done) is list:
            done = all(done)
        return float(reward), done, {}

    def get_obs(self):
        """Returns all agent observations in a list."""
        return self._obs

    def get_obs_agent(self, agent_id):
        """Returns the observation for agent_id."""
        return self._obs[agent_id]

    def get_obs_size(self):
        """Returns the shape of the observation."""
        return flatdim(self.longest_observation_space)

    def get_state(self):
        return np.concatenate(self._obs, axis=0).astype(np.float32)

    def get_state_size(self):
        """Returns the shape of the state."""
        if hasattr(self.original_env, "state_size"):
            return self.original_env.state_size
        return self.n_agents * flatdim(self.longest_observation_space)

    def get_avail_actions(self):
        avail_actions = []
        for agent_id in range(self.n_agents):
            avail_agent = self.get_avail_agent_actions(agent_id)
            avail_actions.append(avail_agent)
        return avail_actions

    def get_avail_agent_actions(self, agent_id):
        """Returns the available actions for agent_id."""
        # Mark the agent's real actions as available (1) and pad up to the
        # longest action space with unavailable (0) entries (see
        # _example_avail_mask below).
        valid = flatdim(self._env.action_space[agent_id]) * [1]
        invalid = [0] * (self.longest_action_space.n - len(valid))
        return valid + invalid

    def get_total_actions(self):
        """Returns the total number of actions an agent could ever take."""
        # TODO: This is only suitable for a discrete 1-dimensional action
        # space for each agent.
        return flatdim(self.longest_action_space)

    def reset(self):
        """Returns initial observations and states."""
        self._obs = self._env.reset()
        self._obs = [
            np.pad(
                o,
                (0, self.longest_observation_space.shape[0] - len(o)),
                "constant",
                constant_values=0,
            )
            for o in self._obs
        ]
        return self.get_obs(), self.get_state()

    def render(self):
        self._env.render()

    def close(self):
        self._env.close()

    def seed(self):
        return self._seed

    def save_replay(self):
        pass

    def get_stats(self):
        return {}
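

# Hedged sketches of the two padding schemes used by _GymmaWrapper. Both
# functions are illustrative only; the lengths and spaces are assumed values.
def _example_pad_observation():
    # Right-pad a length-3 observation with zeros up to an assumed longest
    # per-agent observation of length 5, as step() and reset() do.
    longest = 5
    short_obs = np.ones(3)
    padded = np.pad(
        short_obs, (0, longest - len(short_obs)), "constant", constant_values=0
    )
    assert padded.tolist() == [1.0, 1.0, 1.0, 0.0, 0.0]
    return padded


def _example_avail_mask():
    # An agent with Discrete(3) actions, padded to an assumed longest action
    # space of size 5, as get_avail_agent_actions() does.
    small, longest_n = spaces.Discrete(3), 5
    valid = flatdim(small) * [1]
    invalid = [0] * (longest_n - len(valid))
    assert valid + invalid == [1, 1, 1, 0, 0]
    return valid + invalid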


REGISTRY["gymma"] = partial(env_fn, env=_GymmaWrapper)
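

# Illustrative usage of the "gymma" entry, guarded so nothing runs on import.
# The key below is an assumed example id, not something this module prescribes;
# any registered multi-agent gym env with Tuple spaces is built the same way.
if __name__ == "__main__":
    env = REGISTRY["gymma"](
        key="lbforaging:Foraging-8x8-2p-3f-v1",  # hypothetical env id
        time_limit=50,
        pretrained_wrapper=None,
        seed=0,
    )
    obs, state = env.reset()
    print(env.n_agents, env.get_total_actions(), len(obs), state.shape)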