9 changes: 9 additions & 0 deletions api/dao/containerutil.py
@@ -239,6 +239,15 @@ def from_dictionary(cls, d):
name = d['name']
)

def get_file(self):
container = super(FileReference, self).get()

for file in container['files']:
if file['name'] == self.name:
return file

raise Exception('No such file {} on {} {} in database'.format(self.name, self.type, self.id))


def create_filereference_from_dictionary(d):
return FileReference.from_dictionary(d)
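For context, a minimal usage sketch of the new get_file helper (the reference values here are illustrative, not from this PR):

# Build a reference, then resolve it to the matching file dict
ref = create_filereference_from_dictionary({
    'type': 'acquisition',
    'id': '573c9e6a844eac7fc01747cd',
    'name': 'test.dcm',
})
f = ref.get_file()  # dict for the matching entry in container['files']; raises if absent
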
60 changes: 56 additions & 4 deletions api/jobs/handlers.py
@@ -2,7 +2,6 @@
API request handlers for the jobs module
"""
import bson
import json
Contributor Author commented: lol

import os
import StringIO
from jsonschema import ValidationError
@@ -14,6 +13,7 @@
from ..dao.containerstorage import ProjectStorage, AcquisitionStorage
from ..dao.containerutil import create_filereference_from_dictionary, create_containerreference_from_dictionary, create_containerreference_from_filereference, ContainerReference
from ..web import base
from ..web.encoder import pseudo_consistent_json_encode
from ..validators import InputValidationException
from .. import config
from . import batch
@@ -300,6 +300,48 @@ def add(self):
self.abort(400, 'Gear marked as invalid, will not run!')
validate_gear_config(gear, config_)

# Config options are stored on the job object under the "config" key
config_ = {
'config': config_,
'inputs': { }
}

# Implementation notes: with regard to sending the gear file information, we have two options:
#
# 1) Send the file object as it existed when you enqueued the job
# 2) Send the file object as it existed when the job was started
#
# Option #2 is possibly more convenient - it's more up to date - but the only file modifications after a job is enqueued would be from
#
# A) a gear finishing, and updating the file object
# B) a user editing the file object
#
# Neither is guaranteed to occur before the job starts, because the queue is not globally FIFO.
# So option #2 would be marginally more up to date, but unintuitive and prone to user confusion; we go with option #1.


for x in inputs:
if gear['gear']['inputs'][x]['base'] == 'file':

obj = inputs[x].get_file()
cr = create_containerreference_from_filereference(inputs[x])

# Whitelist the file fields passed to the gear to those that are scientifically relevant
whitelisted_keys = ['info', 'tags', 'measurements', 'mimetype', 'type', 'modality', 'size']
obj_projection = { key: obj[key] for key in whitelisted_keys }

config_['inputs'][x] = {
'base': 'file',
'hierarchy': cr.__dict__,
'location': {
'name': obj['name'],
'path': '/flywheel/v0/input/' + x + '/' + obj['name'],
},
'object': obj_projection,
}
else:
self.abort(500, 'Non-file input base type')

gear_name = gear['gear']['name']

if gear_name not in tags:
@@ -361,12 +403,22 @@ def get_config(self, _id):
if c is None:
c = {}

# Serve config as formatted json file
self.response.headers['Content-Type'] = 'application/octet-stream'
self.response.headers['Content-Disposition'] = 'attachment; filename="config.json"'

# Serve config as formatted json file
encoded = json.dumps({"config": c}, sort_keys=True, indent=4, separators=(',', ': ')) + '\n'
self.response.app_iter = StringIO.StringIO(encoded)
# Detect if config is old- or new-style.
# TODO: remove this logic with a DB upgrade, ref database.py's reserved upgrade section.

if c.get('config') is not None and c.get('inputs') is not None:
# New behavior
encoded = pseudo_consistent_json_encode(c)
self.response.app_iter = StringIO.StringIO(encoded)
else:
# Legacy behavior
encoded = pseudo_consistent_json_encode({"config": c})
self.response.app_iter = StringIO.StringIO(encoded)


@require_login
def put(self, _id):
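For illustration, a new-style config.json served by get_config for a gear with one file input named 'dicom' might look like this (all field values are made up; 'hierarchy' carries the ContainerReference fields, 'object' the whitelisted file fields):

{
    "config": {
        "speed": 2
    },
    "inputs": {
        "dicom": {
            "base": "file",
            "hierarchy": {
                "type": "acquisition",
                "id": "573c9e6a844eac7fc01747cd"
            },
            "location": {
                "name": "example.dcm",
                "path": "/flywheel/v0/input/dicom/example.dcm"
            },
            "object": {
                "info": {},
                "tags": [],
                "measurements": ["anatomical"],
                "mimetype": "application/dicom",
                "type": "dicom",
                "modality": "MR",
                "size": 2048
            }
        }
    }
}
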
44 changes: 41 additions & 3 deletions api/jobs/jobs.py
@@ -5,6 +5,7 @@
import bson
import copy
import datetime
import string

from ..types import Origin
from ..dao.containerutil import create_filereference_from_dictionary, create_containerreference_from_dictionary, create_containerreference_from_filereference
@@ -113,11 +114,9 @@ def intention_equals(self, other_job):
self.inputs == other_job.inputs and
self.destination == other_job.destination
):
config.log.debug('the jobs {} and {} are equal.'.format(self.map, other_job.map))
return True

else:
config.log.debug('the jobs {} and {} are NOT equal.'.format(self.map, other_job.map))
return False


@@ -258,7 +257,7 @@ def generate_request(self, gear):
'env': {
'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin'
},
'dir': "/flywheel/v0",
'dir': '/flywheel/v0',
},
'outputs': [
{
@@ -278,6 +277,45 @@
if self.id_ is None:
raise Exception('Running a job requires an ID')

# Detect if config is old- or new-style.
# TODO: remove this logic with a DB upgrade, ref database.py's reserved upgrade section.

# Add config scalars as environment variables
if self.config.get('config') is not None and self.config.get('inputs') is not None:
# New config behavior

cf = self.config['config']

# Whitelist characters that can be used in bash variable names
bash_variable_letters = set(string.ascii_letters + string.digits + ' ' + '_')

for x in cf:

if isinstance(cf[x], list) or isinstance(cf[x], dict):
# Current gear spec only allows for scalars!
raise Exception('Non-scalar config value ' + x + ' ' + str(cf[x]))
else:

# Strip non-whitelisted characters, convert spaces to underscores, and uppercase
config_name = filter(lambda char: char in bash_variable_letters, x)
config_name = config_name.replace(' ', '_').upper()

# Don't set nonsensical environment variables
if config_name == '':
print 'The gear config name ' + x + ' has no whitelisted characters!'
continue

# Stringify scalar
# Python stringifies booleans as "True"/"False"; lowercase them for bash
if not isinstance(cf[x], bool):
r['target']['env']['FW_CONFIG_' + config_name] = str(cf[x])
else:
r['target']['env']['FW_CONFIG_' + config_name] = str(cf[x]).lower()

else:
# Old config map.
pass

r['inputs'].append({
'type': 'scitran',
'uri': '/jobs/' + self.id_ + '/config.json',
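To make the environment-variable mangling concrete, here is a standalone sketch that mirrors the loop above (portable Python, not repo code):

import string

# Characters permitted in bash variable names; spaces survive here and
# are converted to underscores below
allowed = set(string.ascii_letters + string.digits + ' _')

def to_env(key, value):
    name = ''.join(c for c in key if c in allowed).replace(' ', '_').upper()
    if name == '':
        return None  # no whitelisted characters survived; skip this key
    if isinstance(value, bool):
        return ('FW_CONFIG_' + name, str(value).lower())  # True -> 'true'
    return ('FW_CONFIG_' + name, str(value))

# to_env('speed limit', 3)  -> ('FW_CONFIG_SPEED_LIMIT', '3')
# to_env('two-pass', True)  -> ('FW_CONFIG_TWOPASS', 'true')
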
4 changes: 0 additions & 4 deletions api/jobs/rules.py
@@ -119,7 +119,6 @@ def eval_rule(rule, file_, container):
for match in rule.get('any', []):
if eval_match(match['type'], match['value'], file_, container):
has_match = True
config.log.debug('we found one here')
break

# If there were matches in the 'any' array and none of them succeeded
@@ -129,7 +128,6 @@
# Are there matches in the 'all' set?
for match in rule.get('all', []):
if not eval_match(match['type'], match['value'], file_, container):
config.log.debug('we didnt fine one here')
return False

return True
@@ -261,10 +259,8 @@ def get_rules_for_container(db, container):
result = list(db.project_rules.find({'project_id': str(container['_id'])}))

if not result:
log.debug('Container ' + str(container['_id']) + ' found NO rules')
return []
else:
log.debug('Container ' + str(container['_id']) + ' found ' + str(len(result)) + ' rules')
return result

def copy_site_rules_for_project(project_id):
8 changes: 8 additions & 0 deletions api/web/encoder.py
@@ -43,3 +43,11 @@ def json_sse_pack(d):
d['data'] = json.dumps(d['data'], default=custom_json_serializer)

return sse_pack(d)

def pseudo_consistent_json_encode(d):
"""
Some parts of our system rely upon consistently-produced JSON encoding.
This implementation is not guaranteed to be consistent, but it's good enough for now.
"""

return json.dumps(d, sort_keys=True, indent=4, separators=(',', ': ')) + '\n'
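As a quick illustration of the property this aims for, key order in the input dict does not affect the output:

d1 = pseudo_consistent_json_encode({'b': 1, 'a': 2})
d2 = pseudo_consistent_json_encode({'a': 2, 'b': 1})
assert d1 == d2  # sort_keys makes key order deterministic
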
26 changes: 26 additions & 0 deletions bin/database.py
@@ -1188,6 +1188,32 @@ def upgrade_to_35():
cursor = config.db.batch.find({})
process_cursor(cursor, upgrade_to_35_closure)




###
### BEGIN RESERVED UPGRADE SECTION
###

# Due to performance concerns with database upgrades, some upgrade implementations might be postponed.
# The team contract is that if you write an upgrade touching one of the tables mentioned below, you MUST also implement any reserved upgrades.
# This way, we can bundle changes together that need large cursor iterations and save multi-hour upgrade times.


## Jobs table

# The old job.config format was a flat map of keys, which was manually placed under a "config" key when fetched.
# Now it's a { "config": ..., "inputs": ... } map, with the old values placed under the "config" key when stored.
# Move the keys accordingly so that legacy logic can be removed.
#
# Ref: JobHandler.get_config, Job.generate_request


###
### END RESERVED UPGRADE SECTION
###


def upgrade_schema(force_from = None):
"""
Upgrades db to the current schema version
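When the reserved jobs-table upgrade is eventually implemented, its closure might look roughly like this (a hypothetical sketch following the contract above; not part of this PR, and the pymongo call is an assumption):

def upgrade_jobs_config_closure(job):
    c = job.get('config')
    # Old-style configs are flat maps; new-style have both 'config' and 'inputs' keys
    if c is not None and not ('config' in c and 'inputs' in c):
        config.db.jobs.update_one(
            {'_id': job['_id']},
            {'$set': {'config': {'config': c, 'inputs': {}}}}
        )
    return True

cursor = config.db.jobs.find({})
process_cursor(cursor, upgrade_jobs_config_closure)
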
4 changes: 2 additions & 2 deletions bin/install-ubuntu.sh
@@ -19,6 +19,6 @@ sudo apt-get install -y \
libpcre3-dev \
git

pip install -U pip
sudo pip install -U pip

pip install -r requirements.txt
sudo pip install -r requirements.txt
2 changes: 1 addition & 1 deletion test/bin/setup-integration-tests-ubuntu.sh
@@ -3,7 +3,7 @@ set -e
unset CDPATH
cd "$( dirname "${BASH_SOURCE[0]}" )/../.."

pip install -U -r "test/integration_tests/requirements-integration-test.txt"
sudo pip install -U -r "test/integration_tests/requirements-integration-test.txt"

NODE_URL="https://nodejs.org/dist/v6.4.0/node-v6.4.0-linux-x64.tar.gz"

10 changes: 9 additions & 1 deletion test/integration_tests/python/conftest.py
@@ -266,7 +266,15 @@ def create(self, resource, **kwargs):

# add missing gear when creating job
if resource == 'job' and 'gear_id' not in payload:
payload['gear_id'] = self.get_or_create('gear')

# create file inputs for each job input on gear
gear_inputs = {}
for i in payload.get('inputs', {}).keys():
gear_inputs[i] = {'base': 'file'}

gear_doc = _default_payload['gear']['gear']
gear_doc['inputs'] = gear_inputs
payload['gear_id'] = self.create('gear', gear=gear_doc)

# put together the create url to post to
create_url = '/' + resource + 's'
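In other words (payload values hypothetical), creating a job fixture with file inputs now auto-creates a gear whose document declares a matching input for each, so the new per-input base check in the add handler passes:

# Sketch of the fixture behavior, not repo code
payload = {'inputs': {'dicom': {'type': 'acquisition', 'id': aq_id, 'name': 'test.dcm'}}}
gear_inputs = {i: {'base': 'file'} for i in payload.get('inputs', {})}
# -> {'dicom': {'base': 'file'}} is merged into the default gear doc before creation
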
11 changes: 9 additions & 2 deletions test/integration_tests/python/test_jobs.py
@@ -20,12 +20,19 @@ def test_jobs_access(as_user):
assert r.status_code == 403


def test_jobs(data_builder, as_user, as_admin, as_root, api_db):
gear = data_builder.create_gear()
def test_jobs(data_builder, default_payload, as_user, as_admin, as_root, api_db, file_form):
gear_doc = default_payload['gear']['gear']
gear_doc['inputs'] = {
'dicom': {
'base': 'file'
}
}
gear = data_builder.create_gear(gear=gear_doc)
invalid_gear = data_builder.create_gear(gear={'custom': {'flywheel': {'invalid': True}}})
project = data_builder.create_project()
session = data_builder.create_session()
acquisition = data_builder.create_acquisition()
assert as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form('test.zip')).ok

job_data = {
'gear_id': gear,
9 changes: 6 additions & 3 deletions test/integration_tests/python/test_uploads.py
@@ -497,8 +497,11 @@ def test_acquisition_engine_upload(data_builder, file_form, as_root):
project = data_builder.create_project()
session = data_builder.create_session()
acquisition = data_builder.create_acquisition()
assert as_root.post('/acquisitions/' + acquisition + '/files', files=file_form('test.txt')).ok


job = data_builder.create_job(inputs={
'test': {'type': 'acquisition', 'id': acquisition, 'name': 'test'}
'test': {'type': 'acquisition', 'id': acquisition, 'name': 'test.txt'}
})

metadata = {
@@ -569,8 +572,8 @@ def test_acquisition_engine_upload(data_builder, file_form, as_root):
m_timestamp = dateutil.parser.parse(metadata['acquisition']['timestamp'])
assert a_timestamp == m_timestamp

for f in a['files']:
mf = find_file_in_array(f['name'], metadata['acquisition']['files'])
for mf in metadata['acquisition']['files']:
f = find_file_in_array(mf['name'], a['files'])
assert f is not None
assert f['type'] == mf['type']
assert f['info'] == mf['info']