Skip to content

Commit

Permalink
Remove simulation logic from worker manager (#760)
Browse files Browse the repository at this point in the history
* Moved logic into GameRunner

* Moved logic into GameRunner

* Docs

* Various changes
  • Loading branch information
NiallEgan committed Aug 23, 2018
1 parent cc94cf0 commit 8e1c72d
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 178 deletions.
9 changes: 7 additions & 2 deletions aimmo-game/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from simulation.avatar.avatar_manager import AvatarManager
from simulation.worker_managers import WORKER_MANAGERS
from simulation.communicator import Communicator
from simulation.game_runner import GameRunner

eventlet.sleep()
eventlet.monkey_patch()
Expand Down Expand Up @@ -136,7 +137,7 @@ def run_game(port):
game_state = generator.get_game_state(player_manager)

WorkerManagerClass = WORKER_MANAGERS[os.environ.get('WORKER_MANAGER', 'local')]
worker_manager = WorkerManagerClass(game_state=game_state, communicator=communicator, port=port)
worker_manager = WorkerManagerClass(port=port)

logs = Logs()
have_avatars_code_updated = {}
Expand All @@ -149,7 +150,11 @@ def run_game(port):
logs=logs,
have_avatars_code_updated=have_avatars_code_updated)

worker_manager.start()
game_runner = GameRunner(worker_manager=worker_manager,
game_state=game_state,
communicator=communicator)

game_runner.start()
turn_manager.start()


Expand Down
47 changes: 47 additions & 0 deletions aimmo-game/simulation/game_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import time
import threading

WORKER_UPDATE_SLEEP_TIME = 10


class GameRunner(threading.Thread):
def __init__(self, worker_manager, game_state, communicator):
super(GameRunner, self).__init__()
self.worker_manager = worker_manager
self.game_state = game_state
self.communicator = communicator

def get_users_to_add(self, game_metadata):
def player_is_new(_player):
return _player['id'] not in self.worker_manager.avatar_id_to_worker.keys()

return [player['id'] for player in game_metadata['users'] if player_is_new(player)]

def get_users_to_delete(self, game_metadata):
def player_in_worker_manager_but_not_metadata(pid):
return pid not in [player['id'] for player in game_metadata['users']]

return [player_id for player_id in self.worker_manager.avatar_id_to_worker.keys()
if player_in_worker_manager_but_not_metadata(player_id)]

def update_main_user(self, game_metadata):
self.game_state.main_avatar_id = game_metadata['main_avatar']

def update(self):
game_metadata = self.communicator.get_game_metadata()['main']

users_to_add = self.get_users_to_add(game_metadata)
users_to_delete = self.get_users_to_delete(game_metadata)

worker_urls = self.worker_manager.add_workers(users_to_add)
self.worker_manager.delete_workers(users_to_delete)
self.game_state.add_avatars(users_to_add, worker_urls)
self.game_state.delete_avatars(users_to_delete)
self.worker_manager.update_worker_codes(game_metadata['users'])

self.update_main_user(game_metadata)

def run(self):
while True:
self.update()
time.sleep(WORKER_UPDATE_SLEEP_TIME)
18 changes: 13 additions & 5 deletions aimmo-game/simulation/game_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,28 @@ def get_state_for(self, avatar_wrapper):
}
}

def add_avatar(self, user_id, worker_url, location=None):
def add_avatar(self, player_id, worker_url, location=None):
with self._lock:
location = self.world_map.get_random_spawn_location() if location is None else location
avatar = self.avatar_manager.add_avatar(user_id, worker_url, location)
avatar = self.avatar_manager.add_avatar(player_id, worker_url, location)
self.world_map.get_cell(location).avatar = avatar

def remove_avatar(self, user_id):
def add_avatars(self, player_ids, worker_url_bases):
for player_id in player_ids:
self.add_avatar(player_id, '{}/turn/'.format(worker_url_bases[player_id]))

def delete_avatars(self, player_ids):
for player_id in player_ids:
self.remove_avatar(player_id)

def remove_avatar(self, player_id):
with self._lock:
try:
avatar = self.avatar_manager.get_avatar(user_id)
avatar = self.avatar_manager.get_avatar(player_id)
except KeyError:
return
self.world_map.get_cell(avatar.location).avatar = None
self.avatar_manager.remove_avatar(user_id)
self.avatar_manager.remove_avatar(player_id)

def _update_effects(self):
with self._lock:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def make_pod(self, player_id):
'app': 'aimmo-game-worker',
'game': self.game_id,
'player': str(player_id)},
generate_name="aimmo-%s-worker-%s-" % (self.game_id, player_id),
generate_name='aimmo-%s-worker-%s-' % (self.game_id, player_id),
owner_references=self._make_owner_references()
)

Expand Down
157 changes: 32 additions & 125 deletions aimmo-game/simulation/worker_managers/worker_manager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import logging
import threading
import time

import requests
from eventlet.greenpool import GreenPool
from eventlet.semaphore import Semaphore

Expand All @@ -14,147 +11,57 @@ class _WorkerManagerData(object):
This class is thread safe
"""

def __init__(self, game_state, user_codes):
self._game_state = game_state
def __init__(self, user_codes):
self._user_codes = user_codes
self._lock = Semaphore()

def _remove_avatar(self, user_id):
assert self._lock.locked
self._game_state.remove_avatar(user_id)
del self._user_codes[user_id]
def set_code(self, player):
self._user_codes[player['id']] = player['code']

def is_new_avatar(self, user):
with self._lock:
existing_code = self._user_codes.get(user['id'], None)
return existing_code is None

def is_new_code_different_than_existing(self, user):
with self._lock:
existing_code = self._user_codes.get(user['id'], None)
return existing_code != user['code']

def add_avatar(self, user, worker_url):
with self._lock:
# Add avatar back into game
self._game_state.add_avatar(
user_id=user['id'], worker_url="%s/turn/" % worker_url)
def get_code(self, player_id):
return self._user_codes[player_id]

def set_code(self, user):
with self._lock:
self._user_codes[user['id']] = user['code']

def get_code(self, player_id):
with self._lock:
return self._user_codes[player_id]

def remove_unknown_avatars(self, known_user_ids):
with self._lock:
unknown_user_ids = set(self._user_codes) - frozenset(known_user_ids)
for u in unknown_user_ids:
self._remove_avatar(u)
return unknown_user_ids

def set_main_avatar(self, avatar_id):
with self._lock:
self._game_state.main_avatar_id = avatar_id

def get_avatar_from_user_id(self, user_id):
"""
Accesses the avatar manager from the game state to receive the avatar
object.
:param user_id: The ID of the worker in this game instance.
:return: Avatar object
"""
with self._lock:
return self._game_state.avatar_manager.get_avatar(user_id)


class WorkerManager(threading.Thread):
class WorkerManager(object):
"""
Methods of this class must be thread safe unless explicitly stated.
"""
daemon = True

def __init__(self, game_state, communicator, port=5000):
self._data = _WorkerManagerData(game_state, {})
self.communicator = communicator
def __init__(self, port=5000):
self._data = _WorkerManagerData({})
self._pool = GreenPool(size=3)
self.avatar_id_to_worker = {}
self.port = port
super(WorkerManager, self).__init__()

def get_code(self, player_id):
return self._data.get_code(player_id)

def create_worker(self, player_id):
"""Create a worker."""

raise NotImplementedError

def remove_worker(self, player_id):
"""Remove a worker for the given player."""

raise NotImplementedError

def update_code(self, user):
"""
Updates the code for a user.
"""
self._data.set_code(user)

def add_new_user(self, user):
"""
Adds a new avatar to the game state so we keep track of it in each turn.
:param user: Dict containing the user code, id etc.
"""
user_id = user['id']
self._data.set_code(user)
worker_url = self.create_worker(user_id)

# Add avatar into game
self._data.add_avatar(user, worker_url)
LOGGER.info('Added user %s', user_id)
def update_code(self, player):
self._data.set_code(player)

def add_new_worker(self, player_id):
worker_url = self.create_worker(player_id)
self.avatar_id_to_worker[player_id] = 'WorkerDummyObject'
print('Worker url: {}'.format(worker_url))
return player_id, worker_url

def _parallel_map(self, func, iterable_args):
list(self._pool.imap(func, iterable_args))

def update(self):
try:
LOGGER.info("Waking up")
game_data = self.communicator.get_game_metadata()
except (requests.RequestException, ValueError) as err:
LOGGER.error("Failed to obtain game data : %s", err)
else:
game = game_data['main']

# Update users with different code
users_to_update = []
new_users_to_add = []

for user in game['users']:
if self._data.is_new_avatar(user):
new_users_to_add.append(user)
if self._data.is_new_code_different_than_existing(user):
users_to_update.append(user)
LOGGER.debug("Need to add users: %s" % [x['id'] for x in new_users_to_add])

# Add new worker pods
self._parallel_map(self.add_new_user, new_users_to_add)

# Update code
self._parallel_map(self.update_code, users_to_update)

# Delete extra users
known_avatars = set(user['id'] for user in game['users'])
removed_user_ids = self._data.remove_unknown_avatars(known_avatars)
LOGGER.debug("Removing users: %s" % removed_user_ids)
self._parallel_map(self.remove_worker, removed_user_ids)

# Update main avatar
self._data.set_main_avatar(game_data['main']['main_avatar'])

def run(self):
while True:
self.update()
LOGGER.info("Sleeping")
time.sleep(10)
return list(self._pool.imap(func, iterable_args))

def add_workers(self, users_to_add):
return dict(self._parallel_map(self.add_new_worker, users_to_add))

def delete_workers(self, players_to_delete):
print('Users to delete: {}'.format(players_to_delete))
self._parallel_map(self.delete_worker, players_to_delete)

def delete_worker(self, player):
del self.avatar_id_to_worker[player]
self.remove_worker(player)

def update_worker_codes(self, players):
self._parallel_map(self.update_code, players)
15 changes: 10 additions & 5 deletions aimmo-game/tests/test_simulation/fake_game_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from simulation.map_generator import Main
from simulation.logs import Logs
from simulation.avatar.avatar_manager import AvatarManager
from simulation.game_runner import GameRunner
from .concrete_worker_manager import ConcreteWorkerManager
from .mock_communicator import MockCommunicator

Expand All @@ -25,24 +26,28 @@ def __init__(self, settings=None, player_manager=None):
else:
self.player_manager = player_manager
self.mock_communicator = MockCommunicator()
self.game_state = self.map_generator.get_game_state(self.player_manager)
self.worker_manager = ConcreteWorkerManager(game_state=self.game_state, communicator=self.mock_communicator)
self.turn_manager = ConcurrentTurnManager(game_state=self.game_state,

game_state = self.map_generator.get_game_state(self.player_manager)
worker_manager = ConcreteWorkerManager()
self.game_runner = GameRunner(worker_manager=worker_manager,
game_state=game_state,
communicator=self.mock_communicator)
self.turn_manager = ConcurrentTurnManager(game_state=self.game_runner.game_state,
end_turn_callback=lambda: None,
communicator=self.mock_communicator,
logs=self.logs,
have_avatars_code_updated={})
random.seed(0)

def run_single_turn(self):
self.worker_manager.update()
self.game_runner.update()
self.turn_manager._run_single_turn()

def get_logs(self, avatar_id):
return self.logs.get_user_logs(avatar_id)

def get_avatar(self, avatar_id):
return self.game_state.avatar_manager.get_avatar(avatar_id)
return self.game_runner.game_state.avatar_manager.get_avatar(avatar_id)

def change_avatar_code(self, avatar_id, code):
avatar = (user for user in self.mock_communicator.data["main"]["users"] if user["id"] == avatar_id).next()
Expand Down
Loading

0 comments on commit 8e1c72d

Please sign in to comment.