1 change: 1 addition & 0 deletions api/api.py
@@ -132,6 +132,7 @@ def prefix(path, routes):
prefix('/jobs', [
route('/next', JobsHandler, h='next', m=['GET']),
route('/stats', JobsHandler, h='stats', m=['GET']),
route('/pending', JobsHandler, h='pending', m=['GET']),
route('/reap', JobsHandler, h='reap_stale', m=['POST']),
route('/add', JobsHandler, h='add', m=['POST']),
route('/<:[^/]+>', JobHandler),
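With this route in place, a client can fetch the pending count directly. A minimal sketch, assuming a hypothetical base_url for the API and whatever auth scheme the deployment uses (both placeholders here, not part of this PR):

import requests

# Hypothetical client setup; the real base URL and auth scheme are deployment-specific.
base_url = 'https://example.local/api'
session = requests.Session()
session.headers['Authorization'] = '<api-key>'  # placeholder

r = session.get(base_url + '/jobs/pending', params={'tags': 'auto,unused'})
r.raise_for_status()
print(r.json())  # e.g. {'states': {'pending': 3}}, per Queue.get_pending below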
28 changes: 26 additions & 2 deletions api/jobs/handlers.py
@@ -287,7 +287,29 @@ def stats(self):
if not self.superuser_request and not self.user_is_admin:
self.abort(403, 'Request requires admin')

return Queue.get_statistics()
all_flag = self.is_true('all')
unique = self.is_true('unique')
tags = self.request.GET.getall('tags')
last = self.request.GET.get('last')

# Allow for tags to be specified multiple times, or just comma-delimited
if len(tags) == 1:
tags = tags[0].split(',')

if last is not None:
last = int(last)

return Queue.get_statistics(tags=tags, last=last, unique=unique, all_flag=all_flag)

def pending(self):
if not self.superuser_request and not self.user_is_admin:
self.abort(403, 'Request requires admin')

tags = self.request.GET.getall('tags')
if len(tags) == 1:
tags = tags[0].split(',')

return Queue.get_pending(tags=tags)

def next(self):

@@ -325,7 +347,7 @@ def get(self, _id):

def get_config(self, _id):
"""Get a job's config"""
if not self.superuser_request:
if not self.superuser_request and not self.user_is_admin:
self.abort(403, 'Request requires superuser')

j = Job.get(_id)
@@ -373,10 +395,12 @@ def get_config(self, _id):

encoded = pseudo_consistent_json_encode(c)
self.response.app_iter = StringIO.StringIO(encoded)
self.response.headers['Content-Length'] = str(len(encoded.encode('utf-8'))) # must be set after app_iter
else:
# Legacy behavior
encoded = pseudo_consistent_json_encode({"config": c})
self.response.app_iter = StringIO.StringIO(encoded)
self.response.headers['Content-Length'] = str(len(encoded.encode('utf-8'))) # must be set after app_iter

@require_login
def put(self, _id):
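For reference, the updated stats handler accepts tags (repeated or comma-delimited), last, unique, and all as query parameters. A hedged example of a call and the response shape implied by Queue.get_statistics in the next file; session and base_url are the hypothetical client pieces from the earlier sketch, and all values shown are illustrative:

r = session.get(base_url + '/jobs/stats',
                params={'tags': 'auto,unused', 'last': '2', 'unique': '1'})
r.raise_for_status()

# Illustrative response shape:
# {
#   'states': {'pending': 4, 'running': 1, 'failed': 0, 'complete': 12, ...},
#   'unique': ['auto', 'unused', ...],                                # only when unique (or all) is set
#   'recent': {'pending': [{'_id': ..., 'modified': ...}, ...], ...}  # only when last is given
# }
print(r.json())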
51 changes: 37 additions & 14 deletions api/jobs/queue.py
@@ -360,30 +360,53 @@ def search(containers, states=None, tags=None):
])

@staticmethod
def get_statistics():
def get_statistics(tags=None, last=None, unique=False, all_flag=False):
"""
Return a variety of interesting information about the job queue.
"""

# Count jobs by state
result = config.db.jobs.aggregate([{"$group": {"_id": "$state", "count": {"$sum": 1}}}])
# Map mongo result to a useful object
if all_flag:
unique = True
if last is None:
last = 3

results = { }
match = { } # match all jobs

if tags is not None and len(tags) > 0:
match = { 'tags': {'$in': tags } } # match only jobs with given tags

# Count jobs by state, mapping the mongo result to a useful object
result = list(config.db.jobs.aggregate([{'$match': match }, {'$group': {'_id': '$state', 'count': {'$sum': 1}}}]))
by_state = {s: 0 for s in JOB_STATES}
by_state.update({r['_id']: r['count'] for r in result})
results['states'] = by_state

# List unique tags
if unique:
results['unique'] = sorted(config.db.jobs.distinct('tags'))

# List recently modified jobs for each state
if last is not None:
results['recent'] = {s: config.db.jobs.find({'$and': [match, {'state': s}]}, {'modified':1}).sort([('modified', pymongo.DESCENDING)]).limit(last) for s in JOB_STATES}
Review comment (Contributor): Rather than doing N (job state) finds we could use a mongo aggregation pipeline that does a sort, followed by a group (by state), followed by a limit. I'm not convinced that would be faster on larger datasets but it's something to think about if this ends up being a problem.

Reply (Contributor Author): I actually looked into that, and it was slower - via the entirely scientific process of running each query a few times and checking the execution time. It was close though, and dev.fw may not be very representative for this either... It would at least be fewer round trips!
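For illustration, a rough sketch of the single-pipeline alternative discussed above; this is not what the PR implements. It reuses the match and last variables from get_statistics and assumes the deployed MongoDB supports the $slice aggregation operator (note that $slice, not $limit, is what caps the per-state list, since $limit would cap the number of groups instead):

# Illustrative alternative only: one aggregation instead of one find() per state.
pipeline = [
    {'$match': match},
    {'$sort': {'modified': pymongo.DESCENDING}},
    {'$group': {'_id': '$state', 'modified': {'$push': '$modified'}}},
    {'$project': {'modified': {'$slice': ['$modified', last]}}},
]
recent = {r['_id']: r['modified'] for r in config.db.jobs.aggregate(pipeline)}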


# Count jobs by tag grouping
result = list(config.db.jobs.aggregate([{"$group": {"_id": "$tags", "count": {"$sum": 1}}}]))
by_tag = []
for r in result:
by_tag.append({'tags': r['_id'], 'count': r['count']})
return results

# Count jobs that will not be retried
permafailed = config.db.jobs.count({"attempt": {"$gte": max_attempts()}, "state":"failed"})
@staticmethod
def get_pending(tags=None):
"""
Returns the same format as get_statistics, but only the pending number.
Designed to be as efficient as possible for frequent polling :(
"""

match = { } # match all jobs
if tags is not None and len(tags) > 0:
match = { 'tags': {'$in': tags } } # match only jobs with given tags

return {
'by-state': by_state,
'by-tag': by_tag,
'permafailed': permafailed
'states': {
'pending': config.db.jobs.count({'$and': [match, {'state': 'pending'}]})
}
}

@staticmethod
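Since get_pending exists to keep frequent polling cheap, a caller might look roughly like the following sketch; wait_for_empty_queue, poll_interval, and the stop condition are hypothetical and not part of this PR, and session/base_url are the placeholder client pieces from the earlier sketch:

import time

def wait_for_empty_queue(session, base_url, tags=None, poll_interval=5):
    # Hypothetical helper: poll /jobs/pending until no matching pending jobs remain.
    params = {'tags': ','.join(tags)} if tags else {}
    while True:
        r = session.get(base_url + '/jobs/pending', params=params)
        r.raise_for_status()
        if r.json()['states']['pending'] == 0:
            return
        time.sleep(poll_interval)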
14 changes: 14 additions & 0 deletions tests/integration_tests/python/test_jobs.py
@@ -10,6 +10,9 @@ def test_jobs_access(as_user):
r = as_user.get('/jobs/stats')
assert r.status_code == 403

r = as_user.get('/jobs/pending')
assert r.status_code == 403

r = as_user.post('/jobs/reap')
assert r.status_code == 403

@@ -328,6 +331,17 @@ def test_jobs(data_builder, default_payload, as_public, as_user, as_admin, as_ro
assert "The job did not report in for a long time and was canceled." in [log["msg"] for log in r.json()['logs']]
api_db.jobs.delete_one({"_id": bson.ObjectId("5a007cdb0f352600d94c845f")})

r = as_admin.get('/jobs/stats')
assert r.ok
r = as_admin.get('/jobs/pending')
assert r.ok
r = as_admin.get('/jobs/pending', params={'tags': 'auto,unused'})
assert r.ok
r = as_admin.get('/jobs/stats', params={'all': '1'})
assert r.ok
r = as_admin.get('/jobs/stats', params={'tags': 'auto,unused', 'last': '2'})
assert r.ok


def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_drone, api_db, file_form):
# create gear