Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 45 additions & 34 deletions api/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from . import base
from . import config
from . import util

log = config.log

Expand All @@ -37,11 +38,27 @@
def valid_transition(from_state, to_state):
return (from_state + ' --> ' + to_state) in JOB_TRANSITIONS or from_state == to_state

ALGORITHMS = [
'dcm2nii',
'qa'
Category = util.Enum('Category', {
'classifier': 'classifier', # discover metadata
'converter': 'converter', # translate between formats
'qa': 'qa', # quality assurance
'analytical': 'analytical', # general purpose
})

Gear = namedtuple('gear', ['name', 'category', 'input'])

Gears = [
Gear('dicom_mr_classifier', Category.classifier, {'type': 'file', 'location': '/', 'uri': '/opt/flywheel-temp/dicom_mr_classifier-0.0.1.tar'}),
Gear('dcm_convert', Category.converter, {'type': 'file', 'location': '/', 'uri': '/opt/flywheel-temp/dcm_convert-0.3.0.b.tar' }),
Gear('qa-report-fmri', Category.qa, {'type': 'file', 'location': '/', 'uri': '/opt/flywheel-temp/qa-report-fmri-0.1.0.tar' })
]

def get_gear_by_name(name):
for gear in Gears:
if gear.name == name:
return gear
raise Exception("Unknown gear " + name)

# A FileInput tuple holds all the details of a scitran file that needed to use that as an input a formula.
FileInput = namedtuple('input', ['container_type', 'container_id', 'filename', 'filehash'])

Expand Down Expand Up @@ -85,7 +102,7 @@ def create_fileinput_from_reference(container, container_type, file_):
return FileInput(container_type=container_type, container_id=container_id, filename=filename, filehash=filehash)


def queue_job(db, algorithm_id, input, attempt_n=1, previous_job_id=None):
def queue_job(db, algorithm_id, input, tags=[], attempt_n=1, previous_job_id=None):
"""
Enqueues a job for execution.

Expand All @@ -97,12 +114,13 @@ def queue_job(db, algorithm_id, input, attempt_n=1, previous_job_id=None):
Human-friendly unique name of the algorithm
input: FileInput
The input to be used by this job
tags: string array (optional)
Tags that this job should be marked with.
attempt_n: integer (optional)
If an equivalent job has tried & failed before, pass which attempt number we're at. Defaults to 1 (no previous attempts).
"""

if algorithm_id not in ALGORITHMS:
raise Exception('Usupported algorithm ' + algorithm_id)
gear = get_gear_by_name(algorithm_id)

if input.container_type.endswith('s'):
raise Exception('Container type cannot be plural :|')
Expand All @@ -111,16 +129,23 @@ def queue_job(db, algorithm_id, input, attempt_n=1, previous_job_id=None):

now = datetime.datetime.utcnow()

# Job tags allow consumers to filter on job attributes.
# The algorithm name and category are always tagged; callee can provide others.
hardcoded_tags = [ gear.name, str(gear.category) ]
# Union of two arrays
tags = list(set(hardcoded_tags) | set(tags))

job = {
'state': 'pending',

'created': now,
'modified': now,

'algorithm_id': algorithm_id,
'algorithm_id': gear.name,
'input': input._asdict(),

'attempt': attempt_n,
'tags': tags
}

if previous_job_id is not None:
Expand All @@ -129,7 +154,7 @@ def queue_job(db, algorithm_id, input, attempt_n=1, previous_job_id=None):
result = db.jobs.insert_one(job)
_id = result.inserted_id

log.info('Running %s as job %s to process %s %s' % (algorithm_id, str(_id), input.container_type, input.container_id))
log.info('Running %s as job %s to process %s %s' % (gear.name, str(_id), input.container_type, input.container_id))
return _id

def retry_job(db, j, force=False):
Expand All @@ -138,8 +163,8 @@ def retry_job(db, j, force=False):
Can override the attempt limit by passing force=True.
"""

if j['attempt'] < 3 or Force:
job_id = queue_job(db, j['algorithm_id'], convert_to_fileinput(j['input']), j['attempt']+1, j['_id'])
if j['attempt'] < 3 or force:
job_id = queue_job(db, j['algorithm_id'], convert_to_fileinput(j['input']), attempt_n=j['attempt']+1, previous_job_id=j['_id'])
log.info('respawned job %s as %s (attempt %d)' % (j['_id'], job_id, j['attempt']+1))
else:
log.info('permanently failed job %s (after %d attempts)' % (j['_id'], j['attempt']))
Expand All @@ -157,47 +182,33 @@ def generate_formula(algorithm_id, i):
The input to be used by this job
"""

if algorithm_id not in ALGORITHMS:
raise Exception('Usupported algorithm ' + algorithm_id)
gear = get_gear_by_name(algorithm_id)

f = {
'inputs': [
{
'type': 'file',
'uri': '/nope.txt',
'location': '/',
},
gear.input,
{
'type': 'scitran',
'uri': '/' + i['container_type'] + 's/' + i['container_id'] + '/files/' + i['filename'],
'location': '/input',
'location': '/flywheel/v0/input',
}
],
'target': {
'command': [ 'echo', 'No command specified for ' + algorithm_id],
'env': { },
'dir': "/",
'command': ['bash', '-c', 'rm -rf output; mkdir -p output; sed -i \'s/_dicom//\' run; ./run; echo "Exit was $?"'],
'env': {
'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin'
},
'dir': "/flywheel/v0",
},

'outputs': [
{
'type': 'scitran',
'uri': '/' + i['container_type'] + 's/' + i['container_id'] + '/files/',
'location': '/output',
'uri': '/engine?level=' + i['container_type'] + '&id=' + i['container_id'],
'location': '/flywheel/v0/output',
},
],
}

if algorithm_id == 'dcm2nii':
f['inputs'][0]['uri'] = '/opt/flywheel-temp/dcm_convert-0.1.1.tar'
f['target']['command'] = ['bash', '-c', 'mkdir /output; /scripts/run /input/' + i['filename'] + ' /output/' + i['filename'].split('_')[0]]

elif algorithm_id == 'qa':
f['inputs'][0]['uri'] = '/opt/flywheel-temp/qa-report-fmri-0.0.2.tar'
f['target']['command'] = ['bash', '-c', 'mkdir /output; /scripts/run; exit 0']
else:
raise Exception('Command for algorithm ' + algorithm_id + ' not specified')

return f


Expand Down
13 changes: 10 additions & 3 deletions api/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,19 @@
# TODO: replace with default rules, which get persisted, maintained, upgraded, and reasoned intelligently
HARDCODED_RULES = [
{
'alg': 'dcm2nii',
'alg': 'dicom_mr_classifier',
'all': [
['file.type', 'dicom']
]
}, {
'alg': 'qa',
},
{
'alg': 'dcm_convert',
'all': [
['file.type', 'dicom']
]
},
{
'alg': 'qa-report-fmri',
'all': [
['file.name', '*.nii.gz']
]
Expand Down
7 changes: 7 additions & 0 deletions api/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import mimetypes
import bson.objectid
import tempdir as tempfile
import enum as baseEnum

MIMETYPES = [
('.bvec', 'text', 'bvec'),
Expand Down Expand Up @@ -110,3 +111,9 @@ def custom_json_serializer(obj):
elif isinstance(obj, datetime.datetime):
return pytz.timezone('UTC').localize(obj).isoformat()
raise TypeError(repr(obj) + " is not JSON serializable")

class Enum(baseEnum.Enum):
# Enum strings are prefixed by their class: "Category.classifier".
# This overrides that behaviour and removes the prefix.
def __str__(self):
return str(self.name)
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Production packages
enum==0.4.6
jsonschema==2.5.1
Markdown==2.6.5
pymongo==3.2
Expand Down