Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Commit

Permalink
Merge pull request #309 from lundjordan/archiver
Browse files Browse the repository at this point in the history
Bug 1182532 - archiver is broken - mozharness can't be deployed on slaves
  • Loading branch information
lundjordan committed Jul 28, 2015
2 parents 15e0c5f + c175c0e commit 66bb96c
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 35 deletions.
43 changes: 30 additions & 13 deletions relengapi/blueprints/archiver/__init__.py
Expand Up @@ -27,6 +27,7 @@
log = logging.getLogger(__name__)

GET_EXPIRES_IN = 300
PENDING_EXPIRES_IN = 60
FINISHED_STATES = ['SUCCESS', 'FAILURE', 'REVOKED']


Expand Down Expand Up @@ -60,6 +61,14 @@ def cleanup_old_tasks(job_status):
break


def renew_tracker_pending_expiry(tracker):
    """Push *tracker*'s pending-expiry deadline out to PENDING_EXPIRES_IN
    seconds from the current time and commit the change to the db."""
    new_deadline = now() + datetime.timedelta(seconds=PENDING_EXPIRES_IN)
    log.info("renewing tracker {} with pending expiry: {}".format(tracker.id, new_deadline))
    tracker.pending_expires_at = new_deadline
    current_app.db.session('relengapi').commit()


@bp.route('/status/<task_id>')
@api.apimethod(MozharnessArchiveTask, unicode)
def task_status(task_id):
Expand All @@ -78,17 +87,6 @@ def task_status(task_id):
task = create_and_upload_archive.AsyncResult(task_id)
task_tracker = tables.ArchiverTask.query.filter(tables.ArchiverTask.task_id == task_id).first()
log.info("checking status of task id {}: current state {}".format(task_id, task.state))

# archiver does not create any custom states, so we can assume to have only the defaults:
# http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-states
# therefore, delete our state_id tracker from the db if the celery state is in a final state:
# e.g. not RETRY, STARTED, or PENDING
if task_tracker:
if task.state in FINISHED_STATES:
delete_tracker(task_tracker)
elif task_tracker.state != task.state:
update_tracker_state(task_tracker, task.state)

task_info = task.info or {}
response = {
'state': task.state,
Expand All @@ -103,6 +101,23 @@ def task_status(task_id):
response['src_url'] = ''
response['s3_urls'] = {}

# archiver does not create any custom states, so we can assume to have only the defaults:
# http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-states
# therefore, delete our state_id tracker from the db if the celery state is in a final state:
# e.g. not RETRY, STARTED, or PENDING
if task_tracker:
if task.state in FINISHED_STATES:
delete_tracker(task_tracker)
elif task.state == "PENDING" and task_tracker.pending_expires_at < now():
log.info("Task {} has expired from pending too long. Re-creating task".format(task.id))
renew_tracker_pending_expiry(task_tracker) # let exceptions bubble up before moving on
create_and_upload_archive.apply_async(args=[task_tracker.src_url, task_tracker.s3_key],
task_id=task.id)
response['state'] = 'RETRY'
response['status'] = 'Task has expired from pending for too long. Re-creating task.'
elif task_tracker.state != task.state:
update_tracker_state(task_tracker, task.state)

return MozharnessArchiveTask(**response)


Expand Down Expand Up @@ -174,12 +189,14 @@ def get_archive(src_url, key, preferred_region):
log.info("Creating new celery task and task tracker for: {}".format(task_id))
task = create_and_upload_archive.apply_async(args=[src_url, key], task_id=task_id)
if task and task.id:
session.add(tables.ArchiverTask(task_id=task.id, created_at=now(),
pending_expires_at = now() + datetime.timedelta(seconds=PENDING_EXPIRES_IN)
session.add(tables.ArchiverTask(task_id=task.id, s3_key=key, created_at=now(),
pending_expires_at=pending_expires_at,
src_url=src_url, state="PENDING"))
session.commit()
return {}, 202, {'Location': url_for('archiver.task_status', task_id=task.id)}
else:
return {}, 500
return {}, 202, {'Location': url_for('archiver.task_status', task_id=task.id)}

log.info("generating GET URL to {}, expires in {}s".format(key, GET_EXPIRES_IN))
# return 302 pointing to s3 url with archive
Expand Down
5 changes: 3 additions & 2 deletions relengapi/blueprints/archiver/tables.py
Expand Up @@ -8,6 +8,7 @@ class ArchiverTask(db.declarative_base('relengapi')):
id = sa.Column(sa.Integer, primary_key=True)
task_id = sa.Column(sa.String(100), nullable=False, unique=True)
created_at = sa.Column(db.UTCDateTime(timezone=True), nullable=False)
pending_expires_at = sa.Column(db.UTCDateTime(timezone=True), nullable=False)
state = sa.Column(sa.String(50))
# for debugging
src_url = sa.Column(sa.String(200))
src_url = sa.Column(sa.String(200), nullable=False)
s3_key = sa.Column(sa.String(200), nullable=False)
142 changes: 123 additions & 19 deletions relengapi/blueprints/archiver/test_archiver.py
Expand Up @@ -11,17 +11,19 @@
from relengapi.blueprints.archiver import TASK_TIME_OUT
from relengapi.blueprints.archiver import cleanup_old_tasks
from relengapi.blueprints.archiver import delete_tracker
from relengapi.blueprints.archiver import renew_tracker_pending_expiry
from relengapi.blueprints.archiver import tables
from relengapi.blueprints.archiver import update_tracker_state
from relengapi.blueprints.archiver.test_util import EXPECTED_TASK_STATUS_FAILED_RESPONSE
from relengapi.blueprints.archiver.test_util import EXPECTED_TASK_STATUS_PENDING_RESPONSE
from relengapi.blueprints.archiver.test_util import EXPECTED_TASK_STATUS_SUCCESSFUL_RESPONSE
from relengapi.blueprints.archiver.test_util import create_s3_items
from relengapi.blueprints.archiver.test_util import fake_200_response
from relengapi.blueprints.archiver.test_util import fake_expired_task_status
from relengapi.blueprints.archiver.test_util import fake_failed_task_status
from relengapi.blueprints.archiver.test_util import fake_incomplete_task_status
from relengapi.blueprints.archiver.test_util import fake_successful_task_status
from relengapi.blueprints.archiver.test_util import setup_buckets
from relengapi.lib import time

from relengapi.lib.testing.context import TestContext

Expand Down Expand Up @@ -49,12 +51,18 @@
test_context = TestContext(config=cfg, databases=['relengapi'])


def create_fake_tracker_row(app, id, created_at=None, src_url='https://foo.com', state="PENDING"):
def create_fake_tracker_row(app, id, s3_key='key', created_at=None, pending_expires_at=None,
src_url='https://foo.com', state="PENDING"):
now = datetime.datetime(2015, 7, 14, 23, 19, 42, tzinfo=pytz.UTC) # freeze time
pending_expiry = now + datetime.timedelta(seconds=60)
if not created_at:
created_at = time.now()
created_at = now
if not pending_expires_at:
pending_expires_at = pending_expiry
session = app.db.session('relengapi')
session.add(
tables.ArchiverTask(task_id=id, created_at=created_at, src_url=src_url, state=state)
tables.ArchiverTask(task_id=id, s3_key=s3_key, created_at=created_at,
pending_expires_at=pending_expires_at, src_url=src_url, state=state)
)
session.commit()

Expand Down Expand Up @@ -114,6 +122,63 @@ def test_task_status_when_success(app, client):
"A successful task status check does not equal expected status.")


@moto.mock_s3
@test_context
def test_task_status_when_pending_expired(app, client):
    # A PENDING task whose tracker deadline has passed must be re-created:
    # the status endpoint renews the tracker's expiry, re-submits the celery
    # task, and reports state RETRY.
    now = datetime.datetime(2015, 7, 14, 23, 19, 42, tzinfo=pytz.UTC)  # freeze time
    pending_expiry = now + datetime.timedelta(seconds=60)
    # renewal happens at expired_future (now + 61s), so the renewed deadline is
    # expired_future + PENDING_EXPIRES_IN (60s) = now + 121s
    new_expected_pending_expiry = now + datetime.timedelta(seconds=121)
    expired_future = now + datetime.timedelta(seconds=61)
    task_id = "mozilla-central-9213957d166d.tar.gz_testing_mozharness"
    create_fake_tracker_row(app, task_id, created_at=now,
                            pending_expires_at=pending_expiry)
    # copy the shared module-level constant before mutating it below; assigning
    # the alias directly and setting 'state'/'status' in place would leak RETRY
    # into every later test that reuses EXPECTED_TASK_STATUS_PENDING_RESPONSE
    expected_response = dict(EXPECTED_TASK_STATUS_PENDING_RESPONSE)

    with app.app_context():
        with mock.patch('relengapi.blueprints.archiver.now') as time_traveller, \
                mock.patch("relengapi.blueprints.archiver.create_and_upload_archive") as caua:
            caua.AsyncResult.return_value = fake_expired_task_status()
            time_traveller.return_value = expired_future  # one second past the deadline
            response = client.get('/archiver/status/{task_id}'.format(task_id=task_id))
            # status will change state to RETRY
            expected_response['state'] = "RETRY"
            expected_response['status'] = "Task has expired from pending for too long. " \
                                          "Re-creating task."
            eq_(cmp(json.loads(response.data)['result'], expected_response), 0,
                "An expired task status check does not equal expected status.")
            tracker = tables.ArchiverTask.query.filter(
                tables.ArchiverTask.task_id == task_id
            ).first()
            eq_(tracker.pending_expires_at, new_expected_pending_expiry,
                "New pending expiry does not match expected")


@moto.mock_s3
@test_context
def test_task_status_when_pending_but_not_expired(app, client):
    # A PENDING task whose pending_expires_at deadline has not yet passed must
    # be left alone: state stays PENDING and the tracker's expiry is unchanged.
    now = datetime.datetime(2015, 7, 14, 23, 19, 42, tzinfo=pytz.UTC)  # freeze time
    pending_expiry = now + datetime.timedelta(seconds=60)
    future = now + datetime.timedelta(seconds=59)  # one second before the deadline
    task_id = "mozilla-central-9213957d166d.tar.gz_testing_mozharness"
    create_fake_tracker_row(app, task_id, created_at=now,
                            pending_expires_at=pending_expiry)
    expected_response = EXPECTED_TASK_STATUS_PENDING_RESPONSE

    with app.app_context():
        # patch the blueprint's clock so the endpoint sees a time still inside
        # the pending window, and stub out celery so no real task is touched
        with mock.patch('relengapi.blueprints.archiver.now') as time_traveller, \
                mock.patch("relengapi.blueprints.archiver.create_and_upload_archive") as caua:
            caua.AsyncResult.return_value = fake_expired_task_status()
            time_traveller.return_value = future
            response = client.get('/archiver/status/{task_id}'.format(task_id=task_id))
            eq_(cmp(json.loads(response.data)['result'], expected_response), 0,
                "A pending task that has not expired does not equal expected status.")
            # the tracker must not have been renewed
            tracker = tables.ArchiverTask.query.filter(
                tables.ArchiverTask.task_id == task_id
            ).first()
            eq_(tracker.pending_expires_at, pending_expiry,
                "Tracker does not match original pending expiry.")


@moto.mock_s3
@test_context
def test_tracker_delete(app, client):
Expand All @@ -126,6 +191,26 @@ def test_tracker_delete(app, client):
eq_(tracker, None, "tracker was not deleted")


@moto.mock_s3
@test_context
def test_renew_tracker_pending_expiry(app, client):
    # renew_tracker_pending_expiry() must reset a tracker's pending_expires_at
    # to PENDING_EXPIRES_IN seconds past the current (mocked) time.
    now = datetime.datetime(2015, 7, 14, 23, 19, 42, tzinfo=pytz.UTC)  # freeze time
    expected_pending_expiry = now + datetime.timedelta(seconds=60)
    # advance the mocked clock before renewing: with the clock frozen at `now`
    # the renewed expiry would equal the original one and the final assertion
    # could never detect a renew that silently did nothing
    later = now + datetime.timedelta(seconds=30)
    with mock.patch('relengapi.blueprints.archiver.now') as time_traveller:
        time_traveller.return_value = later
        with app.app_context():
            create_fake_tracker_row(app, 'foo', created_at=now,
                                    pending_expires_at=expected_pending_expiry)
            tracker = tables.ArchiverTask.query.filter(tables.ArchiverTask.task_id == 'foo').first()
            eq_(tracker.pending_expires_at, expected_pending_expiry,
                "original pending expiry does not match expected")
            renew_tracker_pending_expiry(tracker)
            tracker = tables.ArchiverTask.query.filter(tables.ArchiverTask.task_id == 'foo').first()
            # PENDING_EXPIRES_IN (60s) counted from the advanced clock
            expected_new_pending_expiry = later + datetime.timedelta(seconds=60)
            eq_(tracker.pending_expires_at, expected_new_pending_expiry,
                "new pending expiry does not match expected")


@moto.mock_s3
@test_context
def test_tracker_update(app, client):
Expand All @@ -141,20 +226,33 @@ def test_tracker_update(app, client):
@test_context
def test_tracker_added_when_celery_task_is_created(app, client):
setup_buckets(app, cfg)
with mock.patch("relengapi.blueprints.archiver.tasks.requests.get") as get, \
now = datetime.datetime(2015, 7, 14, 23, 19, 42, tzinfo=pytz.UTC) # freeze time
expected_pending_expiry = now + datetime.timedelta(seconds=60)
expected_tracker_id = "mozilla-central-9213957d166d.tar.gz_testing_mozharness"
expected_tracker_s3_key = "mozilla-central-9213957d166d.tar.gz/testing/mozharness"
expected_src_url = cfg['ARCHIVER_HGMO_URL_TEMPLATE'].format(
repo='mozilla-central', rev='9213957d166d', suffix='tar.gz', subdir='testing/mozharness'
)
with mock.patch('relengapi.blueprints.archiver.now') as time_traveller, \
mock.patch("relengapi.blueprints.archiver.tasks.requests.get") as get, \
mock.patch("relengapi.blueprints.archiver.tasks.requests.head") as head:
# don't actually hit hg.m.o, we just care about starting a subprocess and
# returning a 202 accepted
get.return_value = fake_200_response()
head.return_value = fake_200_response()
client.get('/archiver/hgmo/mozilla-central/9213957d166d?'
'subdir=testing/mozharness&preferred_region=us-west-2')
with app.app_context():
expected_tracker_id = "mozilla-central-9213957d166d.tar.gz_testing_mozharness"
tracker = tables.ArchiverTask.query.filter(
tables.ArchiverTask.task_id == expected_tracker_id
).first()
eq_(tracker.task_id, expected_tracker_id, "tracker was not created for celery task")
# don't actually hit hg.m.o, we just care about starting a subprocess and
# returning a 202 accepted
get.return_value = fake_200_response()
head.return_value = fake_200_response()
time_traveller.return_value = now
client.get('/archiver/hgmo/mozilla-central/9213957d166d?'
'subdir=testing/mozharness&preferred_region=us-west-2')
with app.app_context():
tracker = tables.ArchiverTask.query.filter(
tables.ArchiverTask.task_id == expected_tracker_id
).first()
eq_(tracker.task_id, expected_tracker_id, "tracker id doesn't match task")
eq_(tracker.s3_key, expected_tracker_s3_key, "tracker s3_key doesn't match task")
eq_(tracker.created_at, now, "tracker created_at doesn't match task")
eq_(tracker.pending_expires_at, expected_pending_expiry,
"tracker pending_expires_at doesn't match task")
eq_(tracker.src_url, expected_src_url, "tracker src_url doesn't match task")


@moto.mock_s3
Expand All @@ -163,7 +261,10 @@ def test_tracker_is_updated_when_task_state_changes_but_is_not_complete(app, cli
with app.app_context():
task_id = 'foo'
session = app.db.session('relengapi')
session.add(tables.ArchiverTask(task_id=task_id, created_at=time.now(),
now = datetime.datetime(2015, 7, 14, 23, 19, 42, tzinfo=pytz.UTC) # freeze time
pending_expiry = now + datetime.timedelta(seconds=60)
session.add(tables.ArchiverTask(task_id=task_id, s3_key='key', created_at=now,
pending_expires_at=pending_expiry,
src_url='https://foo.com', state="PENDING"))
session.commit()
with mock.patch("relengapi.blueprints.archiver.create_and_upload_archive") as caua:
Expand All @@ -179,7 +280,10 @@ def test_tracker_is_deleted_when_task_status_shows_task_complete(app, client):
with app.app_context():
task_id = 'foo'
session = app.db.session('relengapi')
session.add(tables.ArchiverTask(task_id=task_id, created_at=time.now(),
now = datetime.datetime(2015, 7, 14, 23, 19, 42, tzinfo=pytz.UTC) # freeze time
pending_expiry = now + datetime.timedelta(seconds=60)
session.add(tables.ArchiverTask(task_id=task_id, s3_key='key', created_at=now,
pending_expires_at=pending_expiry,
src_url='https://foo.com', state="PENDING"))
session.commit()
with mock.patch("relengapi.blueprints.archiver.create_and_upload_archive") as caua:
Expand Down
26 changes: 25 additions & 1 deletion relengapi/blueprints/archiver/test_util.py
Expand Up @@ -29,6 +29,17 @@
}


# Canned /archiver/status response body for a task still in celery's PENDING
# state. Shared by multiple tests — treat as read-only and copy before mutating.
EXPECTED_TASK_STATUS_PENDING_RESPONSE = {
    "s3_urls": {
        "us-east-1": "https://archiver-us-east-1.s3.amazonaws.com/mozilla-central-9213957d1.tar.gz",
        "us-west-2": "https://archiver-us-west-2.s3.amazonaws.com/mozilla-central-9213957d1.tar.gz",
    },
    "src_url": "https://hg.mozilla.org/mozilla-central/archive/9213957d1.tar.gz/testing/mozharness",
    "state": "PENDING",
    "status": "no task status at this point",
}


def setup_buckets(app, cfg):
for region, bucket in cfg['ARCHIVER_S3_BUCKETS'].iteritems():
s3 = app.aws.connect_to('s3', region)
Expand Down Expand Up @@ -81,8 +92,21 @@ def fake_successful_task_status():
return task


def fake_expired_task_status():
    """Build a mock celery AsyncResult frozen in the PENDING state, mirroring
    EXPECTED_TASK_STATUS_PENDING_RESPONSE for the expiry tests."""
    expected = EXPECTED_TASK_STATUS_PENDING_RESPONSE
    task = mock.Mock()
    task.state = expected['state']
    task.info = {key: expected[key] for key in ('src_url', 's3_urls', 'status')}
    return task


def fake_incomplete_task_status():
task = mock.Mock()
task.state = EXPECTED_TASK_STATUS_INCOMPLETE_RESPONSE['state']
task.info = EXPECTED_TASK_STATUS_INCOMPLETE_RESPONSE['status']
task.status = {
'status': EXPECTED_TASK_STATUS_INCOMPLETE_RESPONSE['status'],
}
return task

0 comments on commit 66bb96c

Please sign in to comment.