diff --git a/relengapi/blueprints/archiver/__init__.py b/relengapi/blueprints/archiver/__init__.py index c50a6456..79138e59 100644 --- a/relengapi/blueprints/archiver/__init__.py +++ b/relengapi/blueprints/archiver/__init__.py @@ -27,6 +27,7 @@ log = logging.getLogger(__name__) GET_EXPIRES_IN = 300 +PENDING_EXPIRES_IN = 60 FINISHED_STATES = ['SUCCESS', 'FAILURE', 'REVOKED'] @@ -60,6 +61,14 @@ def cleanup_old_tasks(job_status): break +def renew_tracker_pending_expiry(tracker): + pending_expires_at = now() + datetime.timedelta(seconds=PENDING_EXPIRES_IN) + session = current_app.db.session('relengapi') + log.info("renewing tracker {} with pending expiry: {}".format(tracker.id, pending_expires_at)) + tracker.pending_expires_at = pending_expires_at + session.commit() + + @bp.route('/status/') @api.apimethod(MozharnessArchiveTask, unicode) def task_status(task_id): @@ -78,17 +87,6 @@ def task_status(task_id): task = create_and_upload_archive.AsyncResult(task_id) task_tracker = tables.ArchiverTask.query.filter(tables.ArchiverTask.task_id == task_id).first() log.info("checking status of task id {}: current state {}".format(task_id, task.state)) - - # archiver does not create any custom states, so we can assume to have only the defaults: - # http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-states - # therefore, delete our state_id tracker from the db if the celery state is in a final state: - # e.g. not RETRY, STARTED, or PENDING - if task_tracker: - if task.state in FINISHED_STATES: - delete_tracker(task_tracker) - elif task_tracker.state != task.state: - update_tracker_state(task_tracker, task.state) - task_info = task.info or {} response = { 'state': task.state, @@ -103,6 +101,23 @@ def task_status(task_id): response['src_url'] = '' response['s3_urls'] = {} + # archiver does not create any custom states, so we can assume to have only the defaults: + # http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-states + # therefore, delete our state_id tracker from the db if the celery state is in a final state: + # e.g. not RETRY, STARTED, or PENDING + if task_tracker: + if task.state in FINISHED_STATES: + delete_tracker(task_tracker) + elif task.state == "PENDING" and task_tracker.pending_expires_at < now(): + log.info("Task {} has expired from pending too long. Re-creating task".format(task.id)) + renew_tracker_pending_expiry(task_tracker) # let exceptions bubble up before moving on + create_and_upload_archive.apply_async(args=[task_tracker.src_url, task_tracker.s3_key], + task_id=task.id) + response['state'] = 'RETRY' + response['status'] = 'Task has expired from pending for too long. Re-creating task.' + elif task_tracker.state != task.state: + update_tracker_state(task_tracker, task.state) + return MozharnessArchiveTask(**response) @@ -174,12 +189,14 @@ def get_archive(src_url, key, preferred_region): log.info("Creating new celery task and task tracker for: {}".format(task_id)) task = create_and_upload_archive.apply_async(args=[src_url, key], task_id=task_id) if task and task.id: - session.add(tables.ArchiverTask(task_id=task.id, created_at=now(), + pending_expires_at = now() + datetime.timedelta(seconds=PENDING_EXPIRES_IN) + session.add(tables.ArchiverTask(task_id=task.id, s3_key=key, created_at=now(), + pending_expires_at=pending_expires_at, src_url=src_url, state="PENDING")) session.commit() - return {}, 202, {'Location': url_for('archiver.task_status', task_id=task.id)} else: return {}, 500 + return {}, 202, {'Location': url_for('archiver.task_status', task_id=task.id)} log.info("generating GET URL to {}, expires in {}s".format(key, GET_EXPIRES_IN)) # return 302 pointing to s3 url with archive diff --git a/relengapi/blueprints/archiver/tables.py b/relengapi/blueprints/archiver/tables.py index 8c6228ea..c9fc3134 100644 --- a/relengapi/blueprints/archiver/tables.py +++ b/relengapi/blueprints/archiver/tables.py @@ -8,6 +8,7 @@ class ArchiverTask(db.declarative_base('relengapi')): id = sa.Column(sa.Integer, primary_key=True) task_id = sa.Column(sa.String(100), nullable=False, unique=True) created_at = sa.Column(db.UTCDateTime(timezone=True), nullable=False) + pending_expires_at = sa.Column(db.UTCDateTime(timezone=True), nullable=False) state = sa.Column(sa.String(50)) - # for debugging - src_url = sa.Column(sa.String(200)) + src_url = sa.Column(sa.String(200), nullable=False) + s3_key = sa.Column(sa.String(200), nullable=False) diff --git a/relengapi/blueprints/archiver/test_archiver.py b/relengapi/blueprints/archiver/test_archiver.py index e25bc1ea..013ed944 100644 --- a/relengapi/blueprints/archiver/test_archiver.py +++ b/relengapi/blueprints/archiver/test_archiver.py @@ -11,17 +11,19 @@ from relengapi.blueprints.archiver import TASK_TIME_OUT from relengapi.blueprints.archiver import cleanup_old_tasks from relengapi.blueprints.archiver import delete_tracker +from relengapi.blueprints.archiver import renew_tracker_pending_expiry from relengapi.blueprints.archiver import tables from relengapi.blueprints.archiver import update_tracker_state from relengapi.blueprints.archiver.test_util import EXPECTED_TASK_STATUS_FAILED_RESPONSE +from relengapi.blueprints.archiver.test_util import EXPECTED_TASK_STATUS_PENDING_RESPONSE from relengapi.blueprints.archiver.test_util import EXPECTED_TASK_STATUS_SUCCESSFUL_RESPONSE from relengapi.blueprints.archiver.test_util import create_s3_items from relengapi.blueprints.archiver.test_util import fake_200_response +from relengapi.blueprints.archiver.test_util import fake_expired_task_status from relengapi.blueprints.archiver.test_util import fake_failed_task_status from relengapi.blueprints.archiver.test_util import fake_incomplete_task_status from relengapi.blueprints.archiver.test_util import fake_successful_task_status from relengapi.blueprints.archiver.test_util import setup_buckets -from relengapi.lib import time from relengapi.lib.testing.context import TestContext @@ -49,12 +51,18 @@ test_context = TestContext(config=cfg, databases=['relengapi']) -def create_fake_tracker_row(app, id, created_at=None, src_url='https://foo.com', state="PENDING"): +def create_fake_tracker_row(app, id, s3_key='key', created_at=None, pending_expires_at=None, + src_url='https://foo.com', state="PENDING"): + now = datetime.datetime(2015, 7, 14, 23, 19, 42, tzinfo=pytz.UTC) # freeze time + pending_expiry = now + datetime.timedelta(seconds=60) if not created_at: - created_at = time.now() + created_at = now + if not pending_expires_at: + pending_expires_at = pending_expiry session = app.db.session('relengapi') session.add( - tables.ArchiverTask(task_id=id, created_at=created_at, src_url=src_url, state=state) + tables.ArchiverTask(task_id=id, s3_key=s3_key, created_at=created_at, + pending_expires_at=pending_expires_at, src_url=src_url, state=state) ) session.commit() @@ -114,6 +122,63 @@ def test_task_status_when_success(app, client): "A successful task status check does not equal expected status.") +@moto.mock_s3 +@test_context +def test_task_status_when_pending_expired(app, client): + now = datetime.datetime(2015, 7, 14, 23, 19, 42, tzinfo=pytz.UTC) # freeze time + pending_expiry = now + datetime.timedelta(seconds=60) + new_expected_pending_expiry = now + datetime.timedelta(seconds=121) + expired_future = now + datetime.timedelta(seconds=61) + task_id = "mozilla-central-9213957d166d.tar.gz_testing_mozharness" + create_fake_tracker_row(app, task_id, created_at=now, + pending_expires_at=pending_expiry) + expected_response = EXPECTED_TASK_STATUS_PENDING_RESPONSE + + with app.app_context(): + with mock.patch('relengapi.blueprints.archiver.now') as time_traveller, \ + mock.patch("relengapi.blueprints.archiver.create_and_upload_archive") as caua: + caua.AsyncResult.return_value = fake_expired_task_status() + time_traveller.return_value = expired_future + response = client.get('/archiver/status/{task_id}'.format(task_id=task_id)) + # status will change state to RETRY + expected_response['state'] = "RETRY" + expected_response['status'] = "Task has expired from pending for too long. " \ + "Re-creating task." + eq_(cmp(json.loads(response.data)['result'], expected_response), 0, + "An expired task status check does not equal expected status.") + tracker = tables.ArchiverTask.query.filter( + tables.ArchiverTask.task_id == task_id + ).first() + eq_(tracker.pending_expires_at, new_expected_pending_expiry, + "New pending expiry does not match expected") + + +@moto.mock_s3 +@test_context +def test_task_status_when_pending_but_not_expired(app, client): + now = datetime.datetime(2015, 7, 14, 23, 19, 42, tzinfo=pytz.UTC) # freeze time + pending_expiry = now + datetime.timedelta(seconds=60) + future = now + datetime.timedelta(seconds=59) + task_id = "mozilla-central-9213957d166d.tar.gz_testing_mozharness" + create_fake_tracker_row(app, task_id, created_at=now, + pending_expires_at=pending_expiry) + expected_response = EXPECTED_TASK_STATUS_PENDING_RESPONSE + + with app.app_context(): + with mock.patch('relengapi.blueprints.archiver.now') as time_traveller, \ + mock.patch("relengapi.blueprints.archiver.create_and_upload_archive") as caua: + caua.AsyncResult.return_value = fake_expired_task_status() + time_traveller.return_value = future + response = client.get('/archiver/status/{task_id}'.format(task_id=task_id)) + eq_(cmp(json.loads(response.data)['result'], expected_response), 0, + "A pending task that has not expired does not equal expected status.") + tracker = tables.ArchiverTask.query.filter( + tables.ArchiverTask.task_id == task_id + ).first() + eq_(tracker.pending_expires_at, pending_expiry, + "Tracker does not match original pending expiry.") + + @moto.mock_s3 @test_context def test_tracker_delete(app, client): @@ -126,6 +191,26 @@ def test_tracker_delete(app, client): eq_(tracker, None, "tracker was not deleted") +@moto.mock_s3 +@test_context +def test_renew_tracker_pending_expiry(app, client): + now = datetime.datetime(2015, 7, 14, 23, 19, 42, tzinfo=pytz.UTC) # freeze time + expected_pending_expiry = now + datetime.timedelta(seconds=60) + with mock.patch('relengapi.blueprints.archiver.now') as time_traveller: + time_traveller.return_value = now + with app.app_context(): + create_fake_tracker_row(app, 'foo', created_at=now, + pending_expires_at=expected_pending_expiry) + tracker = tables.ArchiverTask.query.filter(tables.ArchiverTask.task_id == 'foo').first() + eq_(tracker.pending_expires_at, expected_pending_expiry, + "original pending expiry does not match expected") + renew_tracker_pending_expiry(tracker) + tracker = tables.ArchiverTask.query.filter(tables.ArchiverTask.task_id == 'foo').first() + expected_new_pending_expiry = now + datetime.timedelta(seconds=60) + eq_(tracker.pending_expires_at, expected_new_pending_expiry, + "new pending expiry does not match expected") + + @moto.mock_s3 @test_context def test_tracker_update(app, client): @@ -141,20 +226,33 @@ def test_tracker_update(app, client): @test_context def test_tracker_added_when_celery_task_is_created(app, client): setup_buckets(app, cfg) - with mock.patch("relengapi.blueprints.archiver.tasks.requests.get") as get, \ + now = datetime.datetime(2015, 7, 14, 23, 19, 42, tzinfo=pytz.UTC) # freeze time + expected_pending_expiry = now + datetime.timedelta(seconds=60) + expected_tracker_id = "mozilla-central-9213957d166d.tar.gz_testing_mozharness" + expected_tracker_s3_key = "mozilla-central-9213957d166d.tar.gz/testing/mozharness" + expected_src_url = cfg['ARCHIVER_HGMO_URL_TEMPLATE'].format( + repo='mozilla-central', rev='9213957d166d', suffix='tar.gz', subdir='testing/mozharness' + ) + with mock.patch('relengapi.blueprints.archiver.now') as time_traveller, \ + mock.patch("relengapi.blueprints.archiver.tasks.requests.get") as get, \ mock.patch("relengapi.blueprints.archiver.tasks.requests.head") as head: - # don't actually hit hg.m.o, we just care about starting a subprocess and - # returning a 202 accepted - get.return_value = fake_200_response() - head.return_value = fake_200_response() - client.get('/archiver/hgmo/mozilla-central/9213957d166d?' - 'subdir=testing/mozharness&preferred_region=us-west-2') - with app.app_context(): - expected_tracker_id = "mozilla-central-9213957d166d.tar.gz_testing_mozharness" - tracker = tables.ArchiverTask.query.filter( - tables.ArchiverTask.task_id == expected_tracker_id - ).first() - eq_(tracker.task_id, expected_tracker_id, "tracker was not created for celery task") + # don't actually hit hg.m.o, we just care about starting a subprocess and + # returning a 202 accepted + get.return_value = fake_200_response() + head.return_value = fake_200_response() + time_traveller.return_value = now + client.get('/archiver/hgmo/mozilla-central/9213957d166d?' + 'subdir=testing/mozharness&preferred_region=us-west-2') + with app.app_context(): + tracker = tables.ArchiverTask.query.filter( + tables.ArchiverTask.task_id == expected_tracker_id + ).first() + eq_(tracker.task_id, expected_tracker_id, "tracker id doesn't match task") + eq_(tracker.s3_key, expected_tracker_s3_key, "tracker s3_key doesn't match task") + eq_(tracker.created_at, now, "tracker created_at doesn't match task") + eq_(tracker.pending_expires_at, expected_pending_expiry, + "tracker pending_expires_at doesn't match task") + eq_(tracker.src_url, expected_src_url, "tracker src_url doesn't match task") @moto.mock_s3 @@ -163,7 +261,10 @@ def test_tracker_is_updated_when_task_state_changes_but_is_not_complete(app, cli with app.app_context(): task_id = 'foo' session = app.db.session('relengapi') - session.add(tables.ArchiverTask(task_id=task_id, created_at=time.now(), + now = datetime.datetime(2015, 7, 14, 23, 19, 42, tzinfo=pytz.UTC) # freeze time + pending_expiry = now + datetime.timedelta(seconds=60) + session.add(tables.ArchiverTask(task_id=task_id, s3_key='key', created_at=now, + pending_expires_at=pending_expiry, src_url='https://foo.com', state="PENDING")) session.commit() with mock.patch("relengapi.blueprints.archiver.create_and_upload_archive") as caua: @@ -179,7 +280,10 @@ def test_tracker_is_deleted_when_task_status_shows_task_complete(app, client): with app.app_context(): task_id = 'foo' session = app.db.session('relengapi') - session.add(tables.ArchiverTask(task_id=task_id, created_at=time.now(), + now = datetime.datetime(2015, 7, 14, 23, 19, 42, tzinfo=pytz.UTC) # freeze time + pending_expiry = now + datetime.timedelta(seconds=60) + session.add(tables.ArchiverTask(task_id=task_id, s3_key='key', created_at=now, + pending_expires_at=pending_expiry, src_url='https://foo.com', state="PENDING")) session.commit() with mock.patch("relengapi.blueprints.archiver.create_and_upload_archive") as caua: diff --git a/relengapi/blueprints/archiver/test_util.py b/relengapi/blueprints/archiver/test_util.py index 854035b0..74a94480 100644 --- a/relengapi/blueprints/archiver/test_util.py +++ b/relengapi/blueprints/archiver/test_util.py @@ -29,6 +29,17 @@ } +EXPECTED_TASK_STATUS_PENDING_RESPONSE = { + "s3_urls": { + "us-east-1": "https://archiver-us-east-1.s3.amazonaws.com/mozilla-central-9213957d1.tar.gz", + "us-west-2": "https://archiver-us-west-2.s3.amazonaws.com/mozilla-central-9213957d1.tar.gz", + }, + "src_url": "https://hg.mozilla.org/mozilla-central/archive/9213957d1.tar.gz/testing/mozharness", + "state": "PENDING", + "status": "no task status at this point", +} + + def setup_buckets(app, cfg): for region, bucket in cfg['ARCHIVER_S3_BUCKETS'].iteritems(): s3 = app.aws.connect_to('s3', region) @@ -81,8 +92,21 @@ def fake_successful_task_status(): return task +def fake_expired_task_status(): + task = mock.Mock() + task.state = EXPECTED_TASK_STATUS_PENDING_RESPONSE['state'] + task.info = { + 'src_url': EXPECTED_TASK_STATUS_PENDING_RESPONSE['src_url'], + 's3_urls': EXPECTED_TASK_STATUS_PENDING_RESPONSE['s3_urls'], + 'status': EXPECTED_TASK_STATUS_PENDING_RESPONSE['status'], + } + return task + + def fake_incomplete_task_status(): task = mock.Mock() task.state = EXPECTED_TASK_STATUS_INCOMPLETE_RESPONSE['state'] - task.info = EXPECTED_TASK_STATUS_INCOMPLETE_RESPONSE['status'] + task.status = { + 'status': EXPECTED_TASK_STATUS_INCOMPLETE_RESPONSE['status'], + } return task