Skip to content

Commit

Permalink
Fix the issue when a run already has a log.
Browse files Browse the repository at this point in the history
  • Loading branch information
brocaar committed Mar 19, 2013
1 parent fee3764 commit b8f24c5
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 9 deletions.
13 changes: 13 additions & 0 deletions job_runner_worker/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ def _get_json_data(self):

return response.json

def reload(self):
"""
Reload the model.
"""
self._data = self._get_json_data()

@retry_on_requests_error
def patch(self, attributes={}):
"""
Expand Down Expand Up @@ -245,6 +251,13 @@ class Run(BaseRestModel):
def job(self):
return Job(self.__getattr__('job'))

@property
def run_log(self):
uri = self.__getattr__('run_log')
if uri:
return RunLog(uri)
return None


class RunLog(BaseRestModel):
"""
Expand Down
51 changes: 51 additions & 0 deletions job_runner_worker/tests/unit/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def test_execute_run(self, config, datetime, RunLog):
config.get.return_value = '/tmp'

run = Mock()
run.run_log = None
run.id = 1234
run.job.script_content = (
u'#!/usr/bin/env bash\n\necho "H\xe9llo World!";\n')
Expand Down Expand Up @@ -64,6 +65,55 @@ def exit_queue_side_effect(*args, **kwargs):
], event_queue.put.call_args_list)
datetime.now.assert_called_with(utc)

@patch('job_runner_worker.worker.subprocess', subprocess)
@patch('job_runner_worker.worker.datetime')
@patch('job_runner_worker.worker.config')
def test_execute_run_with_log(self, config, datetime):
"""
Test :func:`.execute_run` with existing log.
"""
config.get.return_value = '/tmp'

run = Mock()
run.id = 1234
run.job.script_content = (
u'#!/usr/bin/env bash\n\necho "H\xe9llo World!";\n')

event_queue = Mock()
exit_queue = Mock()
run_queue = Queue()
run_queue.put(run)

exit_queue_return = [Empty, None]

def exit_queue_side_effect(*args, **kwargs):
value = exit_queue_return.pop(0)
if callable(value):
raise value()

exit_queue.get.side_effect = exit_queue_side_effect

execute_run(run_queue, event_queue, exit_queue)

dts = datetime.now.return_value.isoformat.return_value
self.assertTrue('pid' in run.patch.call_args_list[1][0][0])
self.assertEqual(dts, run.patch.call_args_list[0][0][0]['start_dts'])
self.assertEqual(
u'H\xe9llo World!\n'.encode('utf-8'),
run.run_log.patch.call_args_list[0][0][0]['content']
)
self.assertEqual([
call({
'return_dts': dts,
'return_success': True,
})
], run.patch.call_args_list[2:])
self.assertEqual([
call('{"kind": "run", "event": "started", "run_id": 1234}'),
call('{"kind": "run", "event": "returned", "run_id": 1234}'),
], event_queue.put.call_args_list)
datetime.now.assert_called_with(utc)

@patch('job_runner_worker.worker.subprocess', subprocess)
@patch('job_runner_worker.worker.RunLog')
@patch('job_runner_worker.worker.datetime')
Expand All @@ -75,6 +125,7 @@ def test_execute_bad_shebang(self, config, datetime, RunLog):
config.get.return_value = '/tmp'

run = Mock()
run.run_log = None
run.id = 1234
run.job.script_content = (
u'#!I love cheese\n\necho "H\xe9llo World!";\n')
Expand Down
28 changes: 19 additions & 9 deletions job_runner_worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,25 @@ def execute_run(run_queue, event_queue, exit_queue):
log_output = _truncate_log(out)

logger.info('Run {0} ended'.format(run.resource_uri))
run_log = RunLog(
config.get('job_runner_worker', 'run_log_resource_uri'))
run_log.post({
'run': '{0}{1}/'.format(
config.get('job_runner_worker', 'run_resource_uri'),
run.id
),
'content': log_output
})
run.reload()
run_log = run.run_log

if run_log:
# handles the rare case when a job alread has a log, but was
# restarted (because the return_dts was never set)
run_log.patch({
'content': log_output,
})
else:
run_log = RunLog(
config.get('job_runner_worker', 'run_log_resource_uri'))
run_log.post({
'run': '{0}{1}/'.format(
config.get('job_runner_worker', 'run_resource_uri'),
run.id
),
'content': log_output
})
run.patch({
'return_dts': datetime.now(utc).isoformat(' '),
'return_success':
Expand Down

0 comments on commit b8f24c5

Please sign in to comment.