Skip to content
This repository has been archived by the owner on Aug 7, 2024. It is now read-only.

Remove simulation logic from worker manager #760

Merged
merged 4 commits into from
Aug 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions aimmo-game/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from simulation.avatar.avatar_manager import AvatarManager
from simulation.worker_managers import WORKER_MANAGERS
from simulation.communicator import Communicator
from simulation.game_runner import GameRunner

eventlet.sleep()
eventlet.monkey_patch()
Expand Down Expand Up @@ -136,7 +137,7 @@ def run_game(port):
game_state = generator.get_game_state(player_manager)

WorkerManagerClass = WORKER_MANAGERS[os.environ.get('WORKER_MANAGER', 'local')]
worker_manager = WorkerManagerClass(game_state=game_state, communicator=communicator, port=port)
worker_manager = WorkerManagerClass(port=port)

logs = Logs()
have_avatars_code_updated = {}
Expand All @@ -149,7 +150,11 @@ def run_game(port):
logs=logs,
have_avatars_code_updated=have_avatars_code_updated)

worker_manager.start()
game_runner = GameRunner(worker_manager=worker_manager,
game_state=game_state,
communicator=communicator)

game_runner.start()
turn_manager.start()


Expand Down
47 changes: 47 additions & 0 deletions aimmo-game/simulation/game_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import time
import threading

WORKER_UPDATE_SLEEP_TIME = 10


class GameRunner(threading.Thread):
def __init__(self, worker_manager, game_state, communicator):
super(GameRunner, self).__init__()
self.worker_manager = worker_manager
self.game_state = game_state
self.communicator = communicator

def get_users_to_add(self, game_metadata):
def player_is_new(_player):
return _player['id'] not in self.worker_manager.avatar_id_to_worker.keys()

return [player['id'] for player in game_metadata['users'] if player_is_new(player)]

def get_users_to_delete(self, game_metadata):
def player_in_worker_manager_but_not_metadata(pid):
return pid not in [player['id'] for player in game_metadata['users']]

return [player_id for player_id in self.worker_manager.avatar_id_to_worker.keys()
if player_in_worker_manager_but_not_metadata(player_id)]

def update_main_user(self, game_metadata):
self.game_state.main_avatar_id = game_metadata['main_avatar']

def update(self):
game_metadata = self.communicator.get_game_metadata()['main']

users_to_add = self.get_users_to_add(game_metadata)
users_to_delete = self.get_users_to_delete(game_metadata)

worker_urls = self.worker_manager.add_workers(users_to_add)
self.worker_manager.delete_workers(users_to_delete)
self.game_state.add_avatars(users_to_add, worker_urls)
self.game_state.delete_avatars(users_to_delete)
self.worker_manager.update_worker_codes(game_metadata['users'])

self.update_main_user(game_metadata)

def run(self):
while True:
self.update()
time.sleep(WORKER_UPDATE_SLEEP_TIME)
18 changes: 13 additions & 5 deletions aimmo-game/simulation/game_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,28 @@ def get_state_for(self, avatar_wrapper):
}
}

def add_avatar(self, user_id, worker_url, location=None):
def add_avatar(self, player_id, worker_url, location=None):
with self._lock:
location = self.world_map.get_random_spawn_location() if location is None else location
avatar = self.avatar_manager.add_avatar(user_id, worker_url, location)
avatar = self.avatar_manager.add_avatar(player_id, worker_url, location)
self.world_map.get_cell(location).avatar = avatar

def remove_avatar(self, user_id):
def add_avatars(self, player_ids, worker_url_bases):
for player_id in player_ids:
self.add_avatar(player_id, '{}/turn/'.format(worker_url_bases[player_id]))

def delete_avatars(self, player_ids):
for player_id in player_ids:
self.remove_avatar(player_id)

def remove_avatar(self, player_id):
with self._lock:
try:
avatar = self.avatar_manager.get_avatar(user_id)
avatar = self.avatar_manager.get_avatar(player_id)
except KeyError:
return
self.world_map.get_cell(avatar.location).avatar = None
self.avatar_manager.remove_avatar(user_id)
self.avatar_manager.remove_avatar(player_id)

def _update_effects(self):
with self._lock:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def make_pod(self, player_id):
'app': 'aimmo-game-worker',
'game': self.game_id,
'player': str(player_id)},
generate_name="aimmo-%s-worker-%s-" % (self.game_id, player_id),
generate_name='aimmo-%s-worker-%s-' % (self.game_id, player_id),
owner_references=self._make_owner_references()
)

Expand Down
157 changes: 32 additions & 125 deletions aimmo-game/simulation/worker_managers/worker_manager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import logging
import threading
import time

import requests
from eventlet.greenpool import GreenPool
from eventlet.semaphore import Semaphore

Expand All @@ -14,147 +11,57 @@ class _WorkerManagerData(object):
This class is thread safe
"""

def __init__(self, game_state, user_codes):
self._game_state = game_state
def __init__(self, user_codes):
self._user_codes = user_codes
self._lock = Semaphore()

def _remove_avatar(self, user_id):
assert self._lock.locked
self._game_state.remove_avatar(user_id)
del self._user_codes[user_id]
def set_code(self, player):
self._user_codes[player['id']] = player['code']

def is_new_avatar(self, user):
with self._lock:
existing_code = self._user_codes.get(user['id'], None)
return existing_code is None

def is_new_code_different_than_existing(self, user):
with self._lock:
existing_code = self._user_codes.get(user['id'], None)
return existing_code != user['code']

def add_avatar(self, user, worker_url):
with self._lock:
# Add avatar back into game
self._game_state.add_avatar(
user_id=user['id'], worker_url="%s/turn/" % worker_url)
def get_code(self, player_id):
return self._user_codes[player_id]

def set_code(self, user):
with self._lock:
self._user_codes[user['id']] = user['code']

def get_code(self, player_id):
with self._lock:
return self._user_codes[player_id]

def remove_unknown_avatars(self, known_user_ids):
with self._lock:
unknown_user_ids = set(self._user_codes) - frozenset(known_user_ids)
for u in unknown_user_ids:
self._remove_avatar(u)
return unknown_user_ids

def set_main_avatar(self, avatar_id):
with self._lock:
self._game_state.main_avatar_id = avatar_id

def get_avatar_from_user_id(self, user_id):
"""
Accesses the avatar manager from the game state to receive the avatar
object.
:param user_id: The ID of the worker in this game instance.
:return: Avatar object
"""
with self._lock:
return self._game_state.avatar_manager.get_avatar(user_id)


class WorkerManager(threading.Thread):
class WorkerManager(object):
"""
Methods of this class must be thread safe unless explicitly stated.
"""
daemon = True

def __init__(self, game_state, communicator, port=5000):
self._data = _WorkerManagerData(game_state, {})
self.communicator = communicator
def __init__(self, port=5000):
self._data = _WorkerManagerData({})
self._pool = GreenPool(size=3)
self.avatar_id_to_worker = {}
self.port = port
super(WorkerManager, self).__init__()

def get_code(self, player_id):
return self._data.get_code(player_id)

def create_worker(self, player_id):
"""Create a worker."""

raise NotImplementedError

def remove_worker(self, player_id):
"""Remove a worker for the given player."""

raise NotImplementedError

def update_code(self, user):
"""
Updates the code for a user.
"""
self._data.set_code(user)

def add_new_user(self, user):
"""
Adds a new avatar to the game state so we keep track of it in each turn.
:param user: Dict containing the user code, id etc.
"""
user_id = user['id']
self._data.set_code(user)
worker_url = self.create_worker(user_id)

# Add avatar into game
self._data.add_avatar(user, worker_url)
LOGGER.info('Added user %s', user_id)
def update_code(self, player):
self._data.set_code(player)

def add_new_worker(self, player_id):
worker_url = self.create_worker(player_id)
self.avatar_id_to_worker[player_id] = 'WorkerDummyObject'
print('Worker url: {}'.format(worker_url))
return player_id, worker_url

def _parallel_map(self, func, iterable_args):
list(self._pool.imap(func, iterable_args))

def update(self):
try:
LOGGER.info("Waking up")
game_data = self.communicator.get_game_metadata()
except (requests.RequestException, ValueError) as err:
LOGGER.error("Failed to obtain game data : %s", err)
else:
game = game_data['main']

# Update users with different code
users_to_update = []
new_users_to_add = []

for user in game['users']:
if self._data.is_new_avatar(user):
new_users_to_add.append(user)
if self._data.is_new_code_different_than_existing(user):
users_to_update.append(user)
LOGGER.debug("Need to add users: %s" % [x['id'] for x in new_users_to_add])

# Add new worker pods
self._parallel_map(self.add_new_user, new_users_to_add)

# Update code
self._parallel_map(self.update_code, users_to_update)

# Delete extra users
known_avatars = set(user['id'] for user in game['users'])
removed_user_ids = self._data.remove_unknown_avatars(known_avatars)
LOGGER.debug("Removing users: %s" % removed_user_ids)
self._parallel_map(self.remove_worker, removed_user_ids)

# Update main avatar
self._data.set_main_avatar(game_data['main']['main_avatar'])

def run(self):
while True:
self.update()
LOGGER.info("Sleeping")
time.sleep(10)
return list(self._pool.imap(func, iterable_args))

def add_workers(self, users_to_add):
return dict(self._parallel_map(self.add_new_worker, users_to_add))

def delete_workers(self, players_to_delete):
print('Users to delete: {}'.format(players_to_delete))
self._parallel_map(self.delete_worker, players_to_delete)

def delete_worker(self, player):
del self.avatar_id_to_worker[player]
self.remove_worker(player)

def update_worker_codes(self, players):
self._parallel_map(self.update_code, players)
15 changes: 10 additions & 5 deletions aimmo-game/tests/test_simulation/fake_game_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from simulation.map_generator import Main
from simulation.logs import Logs
from simulation.avatar.avatar_manager import AvatarManager
from simulation.game_runner import GameRunner
from .concrete_worker_manager import ConcreteWorkerManager
from .mock_communicator import MockCommunicator

Expand All @@ -25,24 +26,28 @@ def __init__(self, settings=None, player_manager=None):
else:
self.player_manager = player_manager
self.mock_communicator = MockCommunicator()
self.game_state = self.map_generator.get_game_state(self.player_manager)
self.worker_manager = ConcreteWorkerManager(game_state=self.game_state, communicator=self.mock_communicator)
self.turn_manager = ConcurrentTurnManager(game_state=self.game_state,

game_state = self.map_generator.get_game_state(self.player_manager)
worker_manager = ConcreteWorkerManager()
self.game_runner = GameRunner(worker_manager=worker_manager,
game_state=game_state,
communicator=self.mock_communicator)
self.turn_manager = ConcurrentTurnManager(game_state=self.game_runner.game_state,
end_turn_callback=lambda: None,
communicator=self.mock_communicator,
logs=self.logs,
have_avatars_code_updated={})
random.seed(0)

def run_single_turn(self):
self.worker_manager.update()
self.game_runner.update()
self.turn_manager._run_single_turn()

def get_logs(self, avatar_id):
return self.logs.get_user_logs(avatar_id)

def get_avatar(self, avatar_id):
return self.game_state.avatar_manager.get_avatar(avatar_id)
return self.game_runner.game_state.avatar_manager.get_avatar(avatar_id)

def change_avatar_code(self, avatar_id, code):
avatar = (user for user in self.mock_communicator.data["main"]["users"] if user["id"] == avatar_id).next()
Expand Down
Loading