Skip to content

Commit

Permalink
Patch the run with the worker which claimed it.
Browse files Browse the repository at this point in the history
  • Loading branch information
brocaar committed Mar 19, 2013
1 parent 7c055c1 commit f4f2848
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
13 changes: 11 additions & 2 deletions job_runner_worker/enqueuer.py
Expand Up @@ -122,13 +122,22 @@ def _handle_enqueue_action(message, run_queue, event_queue):
message['run_id']
))

worker_list = Worker.get_list(
config.get('job_runner_worker', 'worker_resource_uri')
)

if run.enqueue_dts:
logger.warning(
'Was expecting that run: {0} was not in queue yet'.format(
run.id))
elif len(worker_list) != 1:
logger.warning('API returned multiple workers, expected one')
else:
run.patch({
'enqueue_dts': datetime.now(utc).isoformat(' ')
'enqueue_dts': datetime.now(utc).isoformat(' '),
# set the worker so we know which worker of the pool claimed the
# run
'worker': worker_list[0].resource_uri,
})
run_queue.put(run)
event_queue.put(json.dumps(
Expand Down Expand Up @@ -173,4 +182,4 @@ def _handle_ping_action(message):
'ping_response_dts': datetime.now(utc).isoformat(' '),
})
else:
logger.warning('Workers by api_key query resulted in multiple results')
logger.warning('API returned multiple workers, expected one')
9 changes: 7 additions & 2 deletions job_runner_worker/tests/unit/test_enqueuer.py
Expand Up @@ -105,10 +105,14 @@ def test_enqueue_actions_kill(self, config, kill_action):
@patch('job_runner_worker.enqueuer.config')
@patch('job_runner_worker.enqueuer.datetime')
@patch('job_runner_worker.enqueuer.Run')
def test__handle_enqueue_action(self, Run, datetime, config):
@patch('job_runner_worker.enqueuer.Worker')
def test__handle_enqueue_action(self, Worker, Run, datetime, config):
"""
Test :func:`._handle_enqueue_action`.
"""
worker = Mock()
Worker.get_list.return_value = [worker]

run_queue = Mock()
event_queue = Mock()

Expand All @@ -125,7 +129,8 @@ def test__handle_enqueue_action(self, Run, datetime, config):
_handle_enqueue_action(message, run_queue, event_queue)

run.patch.assert_called_once_with({
'enqueue_dts': datetime.now.return_value.isoformat.return_value
'enqueue_dts': datetime.now.return_value.isoformat.return_value,
'worker': worker.resource_uri
})
run_queue.put.assert_called_once_with(run)
event_queue.put.assert_called_once_with(
Expand Down

0 comments on commit f4f2848

Please sign in to comment.