diff --git a/api/api.py b/api/api.py index 37ee11d6a..db7a9dc70 100644 --- a/api/api.py +++ b/api/api.py @@ -132,6 +132,7 @@ def prefix(path, routes): prefix('/jobs', [ route('/next', JobsHandler, h='next', m=['GET']), route('/stats', JobsHandler, h='stats', m=['GET']), + route('/pending', JobsHandler, h='pending', m=['GET']), route('/reap', JobsHandler, h='reap_stale', m=['POST']), route('/add', JobsHandler, h='add', m=['POST']), route('/<:[^/]+>', JobHandler), diff --git a/api/jobs/handlers.py b/api/jobs/handlers.py index 9e874e117..6be97dbba 100644 --- a/api/jobs/handlers.py +++ b/api/jobs/handlers.py @@ -287,7 +287,29 @@ def stats(self): if not self.superuser_request and not self.user_is_admin: self.abort(403, 'Request requires admin') - return Queue.get_statistics() + all_flag = self.is_true('all') + unique = self.is_true('unique') + tags = self.request.GET.getall('tags') + last = self.request.GET.get('last') + + # Allow for tags to be specified multiple times, or just comma-deliminated + if len(tags) == 1: + tags = tags[0].split(',') + + if last is not None: + last = int(last) + + return Queue.get_statistics(tags=tags, last=last, unique=unique, all_flag=all_flag) + + def pending(self): + if not self.superuser_request and not self.user_is_admin: + self.abort(403, 'Request requires admin') + + tags = self.request.GET.getall('tags') + if len(tags) == 1: + tags = tags[0].split(',') + + return Queue.get_pending(tags=tags) def next(self): @@ -325,7 +347,7 @@ def get(self, _id): def get_config(self, _id): """Get a job's config""" - if not self.superuser_request: + if not self.superuser_request and not self.user_is_admin: self.abort(403, 'Request requires superuser') j = Job.get(_id) @@ -373,10 +395,12 @@ def get_config(self, _id): encoded = pseudo_consistent_json_encode(c) self.response.app_iter = StringIO.StringIO(encoded) + self.response.headers['Content-Length'] = str(len(encoded.encode('utf-8'))) # must be set after app_iter else: # Legacy behavior encoded = pseudo_consistent_json_encode({"config": c}) self.response.app_iter = StringIO.StringIO(encoded) + self.response.headers['Content-Length'] = str(len(encoded.encode('utf-8'))) # must be set after app_iter @require_login def put(self, _id): diff --git a/api/jobs/queue.py b/api/jobs/queue.py index 198b515b1..4d2ab8ce9 100644 --- a/api/jobs/queue.py +++ b/api/jobs/queue.py @@ -360,30 +360,53 @@ def search(containers, states=None, tags=None): ]) @staticmethod - def get_statistics(): + def get_statistics(tags=None, last=None, unique=False, all_flag=False): """ Return a variety of interesting information about the job queue. """ - # Count jobs by state - result = config.db.jobs.aggregate([{"$group": {"_id": "$state", "count": {"$sum": 1}}}]) - # Map mongo result to a useful object + if all_flag: + unique = True + if last is None: + last = 3 + + results = { } + match = { } # match all jobs + + if tags is not None and len(tags) > 0: + match = { 'tags': {'$in': tags } } # match only jobs with given tags + + # Count jobs by state, mapping the mongo result to a useful object + result = list(config.db.jobs.aggregate([{'$match': match }, {'$group': {'_id': '$state', 'count': {'$sum': 1}}}])) by_state = {s: 0 for s in JOB_STATES} by_state.update({r['_id']: r['count'] for r in result}) + results['states'] = by_state + + # List unique tags + if unique: + results['unique'] = sorted(config.db.jobs.distinct('tags')) + + # List recently modified jobs for each state + if last is not None: + results['recent'] = {s: config.db.jobs.find({'$and': [match, {'state': s}]}, {'modified':1}).sort([('modified', pymongo.DESCENDING)]).limit(last) for s in JOB_STATES} - # Count jobs by tag grouping - result = list(config.db.jobs.aggregate([{"$group": {"_id": "$tags", "count": {"$sum": 1}}}])) - by_tag = [] - for r in result: - by_tag.append({'tags': r['_id'], 'count': r['count']}) + return results - # Count jobs that will not be retried - permafailed = config.db.jobs.count({"attempt": {"$gte": max_attempts()}, "state":"failed"}) + @staticmethod + def get_pending(tags=None): + """ + Returns the same format as get_statistics, but only the pending number. + Designed to be as efficient as possible for frequent polling :( + """ + + match = { } # match all jobs + if tags is not None and len(tags) > 0: + match = { 'tags': {'$in': tags } } # match only jobs with given tags return { - 'by-state': by_state, - 'by-tag': by_tag, - 'permafailed': permafailed + 'states': { + 'pending': config.db.jobs.count({'$and': [match, {'state': 'pending'}]}) + } } @staticmethod diff --git a/tests/integration_tests/python/test_jobs.py b/tests/integration_tests/python/test_jobs.py index 78306338e..57e09e919 100644 --- a/tests/integration_tests/python/test_jobs.py +++ b/tests/integration_tests/python/test_jobs.py @@ -10,6 +10,9 @@ def test_jobs_access(as_user): r = as_user.get('/jobs/stats') assert r.status_code == 403 + r = as_user.get('/jobs/pending') + assert r.status_code == 403 + r = as_user.post('/jobs/reap') assert r.status_code == 403 @@ -328,6 +331,17 @@ def test_jobs(data_builder, default_payload, as_public, as_user, as_admin, as_ro assert "The job did not report in for a long time and was canceled." in [log["msg"] for log in r.json()['logs']] api_db.jobs.delete_one({"_id": bson.ObjectId("5a007cdb0f352600d94c845f")}) + r = as_admin.get('/jobs/stats') + assert r.ok + r = as_admin.get('/jobs/pending') + assert r.ok + r = as_admin.get('/jobs/pending', params={'tags': 'auto,unused'}) + assert r.ok + r = as_admin.get('/jobs/stats', params={'all': '1'}) + assert r.ok + r = as_admin.get('/jobs/stats', params={'tags': 'auto,unused', 'last': '2'}) + assert r.ok + def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_drone, api_db, file_form): # create gear