forked from openai/gym-soccer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
soccer_env.py
159 lines (145 loc) · 6.61 KB
/
soccer_env.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import os, subprocess, time, signal
import gym
from gym import error, spaces
from gym import utils
from gym.utils import seeding
try:
    import hfo_py
except ImportError as e:
    # Re-raise as gym's dependency error so the user sees an install hint
    # instead of a bare ImportError. The hint's quoting is fixed so that the
    # quoted command is exactly what should be typed.
    raise error.DependencyNotInstalled(
        "{}. (HINT: you can install HFO dependencies with "
        "'pip install gym[soccer]'.)".format(e))
import logging
logger = logging.getLogger(__name__)
class SoccerEnv(gym.Env, utils.EzPickle):
    """
    Gym environment backed by a Half-Field-Offense (HFO) server.

    Constructing the environment starts an HFO server subprocess (see
    _configure_environment / _start_hfo_server), waits for it to come up and
    connects a single hfo_py agent to it. Observations are the state vector
    returned by the agent; the reward is 1 on a goal and 0 otherwise.
    """
    metadata = {'render.modes': ['human']}

    def __init__(self):
        # Every handle that __del__ inspects is initialised before anything
        # that can fail, so a half-constructed instance still cleans up.
        self.viewer = None
        self.server_process = None
        self.server_port = None
        self.env = None
        self.hfo_path = hfo_py.get_hfo_path()
        self._configure_environment()
        self.env = hfo_py.HFOEnvironment()
        self.env.connectToServer(config_dir=hfo_py.get_config_path())
        # Shapes are passed as one-element tuples: "(n)" without the trailing
        # comma is just the int n, not a tuple.
        self.observation_space = spaces.Box(
            low=-1, high=1, shape=(self.env.getStateSize(),))
        # Action space omits the Tackle/Catch actions, which are useful on defense
        # Layout (see _take_action): [0] action index, [1]-[2] DASH
        # parameters, [3] TURN parameter, [4]-[5] KICK parameters.
        self.action_space = spaces.Tuple((
            spaces.Discrete(3),
            spaces.Box(low=0, high=100, shape=(1,)),
            spaces.Box(low=-180, high=180, shape=(1,)),
            spaces.Box(low=-180, high=180, shape=(1,)),
            spaces.Box(low=0, high=100, shape=(1,)),
            spaces.Box(low=-180, high=180, shape=(1,)),
        ))
        self.status = hfo_py.IN_GAME

    def __del__(self):
        """
        Sends QUIT through the agent, then signals the server (SIGINT) and
        the viewer (SIGKILL) subprocesses. Safe to run on an instance whose
        __init__ did not complete.
        """
        agent = getattr(self, 'env', None)
        try:
            if agent is not None:
                agent.act(hfo_py.QUIT)
                agent.step()
        finally:
            # The subprocesses are signalled even if the QUIT exchange raised.
            self._signal_if_running(
                getattr(self, 'server_process', None), signal.SIGINT)
            self._stop_viewer()

    @staticmethod
    def _signal_if_running(process, sig):
        """ Sends sig to process unless it is None or has already exited. """
        if process is not None and process.poll() is None:
            os.kill(process.pid, sig)

    def _stop_viewer(self):
        """ Kills the viewer (if any) and forgets its handle. """
        self._signal_if_running(getattr(self, 'viewer', None), signal.SIGKILL)
        # Clearing the handle prevents a second kill of a stale pid and lets
        # a later _render() call start a fresh viewer.
        self.viewer = None

    def _configure_environment(self):
        """
        Provides a chance for subclasses to override this method and supply
        a different server configuration. By default, we initialize one
        offense agent against no defenders.
        """
        self._start_hfo_server()

    def _start_hfo_server(self, frames_per_trial=500,
                          untouched_time=100, offense_agents=1,
                          defense_agents=0, offense_npcs=0,
                          defense_npcs=0, sync_mode=True, port=6000,
                          offense_on_ball=0, fullstate=True, seed=-1,
                          ball_x_min=0.0, ball_x_max=0.2,
                          verbose=False, log_game=False,
                          log_dir="log"):
        """
        Starts the Half-Field-Offense server.
        frames_per_trial: Episodes end after this many steps.
        untouched_time: Episodes end if the ball is untouched for this many steps.
        offense_agents: Number of user-controlled offensive players.
        defense_agents: Number of user-controlled defenders.
        offense_npcs: Number of offensive bots.
        defense_npcs: Number of defense bots.
        sync_mode: Disabling sync mode runs server in real time (SLOW!).
        port: Port to start the server on.
        offense_on_ball: Player to give the ball to at beginning of episode.
        fullstate: Enable noise-free perception.
        seed: Seed the starting positions of the players and ball.
        ball_x_[min/max]: Initialize the ball this far downfield: [0,1]
        verbose: Verbose server messages.
        log_game: Enable game logging. Logs can be used for replay + visualization.
        log_dir: Directory to place game logs (*.rcg).

        Stores the port in self.server_port and the Popen handle in
        self.server_process, then blocks for 10 seconds.
        """
        self.server_port = port
        # The command is assembled as an argument list rather than a single
        # string split on spaces, so an executable path or log directory
        # containing spaces reaches the server as one argument.
        args = [
            self.hfo_path,
            "--headless",
            "--frames-per-trial", "%i" % frames_per_trial,
            "--untouched-time", "%i" % untouched_time,
            "--offense-agents", "%i" % offense_agents,
            "--defense-agents", "%i" % defense_agents,
            "--offense-npcs", "%i" % offense_npcs,
            "--defense-npcs", "%i" % defense_npcs,
            "--port", "%i" % port,
            "--offense-on-ball", "%i" % offense_on_ball,
            "--seed", "%i" % seed,
            "--ball-x-min", "%f" % ball_x_min,
            "--ball-x-max", "%f" % ball_x_max,
            "--log-dir", "%s" % log_dir,
        ]
        if not sync_mode:
            args.append("--no-sync")
        if fullstate:
            args.append("--fullstate")
        if verbose:
            args.append("--verbose")
        if not log_game:
            args.append("--no-logging")
        print('Starting server with command: %s' % ' '.join(args))
        self.server_process = subprocess.Popen(args, shell=False)
        time.sleep(10)  # Wait for server to startup before connecting a player

    def _start_viewer(self):
        """
        Starts the SoccerWindow visualizer. Note the viewer may also be
        used with a *.rcg logfile to replay a game. See details at
        https://github.com/LARG/HFO/blob/master/doc/manual.pdf.
        """
        # Argument list (not a split string) so a viewer path containing
        # spaces is passed intact.
        args = [hfo_py.get_viewer_path(),
                "--connect", "--port", "%d" % (self.server_port)]
        self.viewer = subprocess.Popen(args, shell=False)

    def _step(self, action):
        """
        Applies action, advances the game one step and returns
        (observation, reward, episode_over, info) where info is an empty dict
        and episode_over is True once the status is no longer IN_GAME.
        """
        self._take_action(action)
        self.status = self.env.step()
        reward = self._get_reward()
        ob = self.env.getState()
        episode_over = self.status != hfo_py.IN_GAME
        return ob, reward, episode_over, {}

    def _take_action(self, action):
        """ Converts the action space into an HFO action. """
        action_type = ACTION_LOOKUP[action[0]]
        # Only the parameter slots belonging to the chosen action are
        # forwarded; the remaining entries of the tuple are ignored.
        if action_type == hfo_py.DASH:
            self.env.act(action_type, action[1], action[2])
        elif action_type == hfo_py.TURN:
            self.env.act(action_type, action[3])
        elif action_type == hfo_py.KICK:
            self.env.act(action_type, action[4], action[5])
        else:
            # Any other action in ACTION_LOOKUP is replaced by a no-op.
            print('Unrecognized action %d' % action_type)
            self.env.act(hfo_py.NOOP)

    def _get_reward(self):
        """ Reward is given for scoring a goal. """
        return 1 if self.status == hfo_py.GOAL else 0

    def _reset(self):
        """ Repeats NO-OP action until a new episode begins. """
        # First loop: step with no-ops until the current episode ends.
        while self.status == hfo_py.IN_GAME:
            self.env.act(hfo_py.NOOP)
            self.status = self.env.step()
        # Second loop: keep stepping until the status is IN_GAME again.
        while self.status != hfo_py.IN_GAME:
            self.env.act(hfo_py.NOOP)
            self.status = self.env.step()
        return self.env.getState()

    def _render(self, mode='human', close=False):
        """ Viewer only supports human mode currently. """
        if close:
            self._stop_viewer()
        elif self.viewer is None:
            self._start_viewer()
# Maps an integer action index to the hfo_py action constant it selects.
# Each entry's position in the tuple is its key in the resulting dict.
ACTION_LOOKUP = dict(enumerate((
    hfo_py.DASH,
    hfo_py.TURN,
    hfo_py.KICK,
    hfo_py.TACKLE,  # Used on defense to slide tackle the ball
    hfo_py.CATCH,   # Used only by goalie to catch the ball
)))