Skip to content

Commit

Permalink
Add tags filer to /jobs/next
Browse files Browse the repository at this point in the history
  • Loading branch information
kofalt committed May 16, 2016
1 parent 7a2d935 commit cfff8a8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
6 changes: 5 additions & 1 deletion api/jobs/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,11 @@ def next(self):
if not self.superuser_request:
self.abort(403, 'Request requires superuser')

job = Queue.start_job()
tags = self.request.GET.getall('tags')
if len(tags) <= 0:
tags = None

job = Queue.start_job(tags=tags)

if job is None:
self.abort(400, 'No jobs to process')
Expand Down
14 changes: 10 additions & 4 deletions api/jobs/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,23 @@ def retry(job, force=False):
log.info('respawned job %s as %s (attempt %d)' % (job._id, new_id, new_job.attempt))

@staticmethod
def start_job():
def start_job(tags=None):
"""
Atomically change a 'pending' job to 'running' and returns it. Updates timestamp.
Will return None if there are no jobs to offer.
Potential jobs must match at least one tag, if provided.
"""

query = { 'state': 'pending' }

if tags is not None:
query['tags'] = {'$in': tags }

# First, atomically mark document as running.
result = config.db.jobs.find_one_and_update(
{
'state': 'pending'
},
query,

{ '$set': {
'state': 'running',
'modified': datetime.datetime.utcnow()}
Expand Down

0 comments on commit cfff8a8

Please sign in to comment.