diff --git a/api/dao/containerutil.py b/api/dao/containerutil.py
index 50feaf9dc..e7198a990 100644
--- a/api/dao/containerutil.py
+++ b/api/dao/containerutil.py
@@ -239,6 +239,15 @@ def from_dictionary(cls, d):
             name = d['name']
         )
 
+    def get_file(self):
+        container = super(FileReference, self).get()
+
+        for file in container['files']:
+            if file['name'] == self.name:
+                return file
+
+        raise Exception('No such file {} on {} {} in database'.format(self.name, self.type, self.id))
+
 def create_filereference_from_dictionary(d):
     return FileReference.from_dictionary(d)
 
diff --git a/api/jobs/handlers.py b/api/jobs/handlers.py
index ca1a0ab23..1f324b4bf 100644
--- a/api/jobs/handlers.py
+++ b/api/jobs/handlers.py
@@ -2,7 +2,6 @@
 API request handlers for the jobs module
 """
 import bson
-import json
 import os
 import StringIO
 from jsonschema import ValidationError
@@ -14,6 +13,7 @@
 from ..dao.containerstorage import ProjectStorage, AcquisitionStorage
 from ..dao.containerutil import create_filereference_from_dictionary, create_containerreference_from_dictionary, create_containerreference_from_filereference, ContainerReference
 from ..web import base
+from ..web.encoder import pseudo_consistent_json_encode
 from ..validators import InputValidationException
 from .. import config
 from . import batch
@@ -300,6 +300,48 @@ def add(self):
             self.abort(400, 'Gear marked as invalid, will not run!')
         validate_gear_config(gear, config_)
 
+        # Config options are stored on the job object under the "config" key
+        config_ = {
+            'config': config_,
+            'inputs': { }
+        }
+
+        # Implementation notes: with regard to sending the gear file information, we have two options:
+        #
+        # 1) Send the file object as it existed when you enqueued the job
+        # 2) Send the file object as it existed when the job was started
+        #
+        # Option #2 is possibly more convenient - it's more up to date - but the only file modifications after a job is enqueued would be from
+        #
+        # A) a gear finishing, and updating the file object
+        # B) a user editing the file object
+        #
+        # You can count on neither occurring before a job starts, because the queue is not globally FIFO.
+        # So option #2 is potentially more convenient, but unintuitive and prone to user confusion.
+
+
+        for x in inputs:
+            if gear['gear']['inputs'][x]['base'] == 'file':
+
+                obj = inputs[x].get_file()
+                cr = create_containerreference_from_filereference(inputs[x])
+
+                # Whitelist the file fields passed to the gear to those that are scientifically relevant
+                whitelisted_keys = ['info', 'tags', 'measurements', 'mimetype', 'type', 'modality', 'size']
+                obj_projection = { key: obj[key] for key in whitelisted_keys }
+
+                config_['inputs'][x] = {
+                    'base': 'file',
+                    'hierarchy': cr.__dict__,
+                    'location': {
+                        'name': obj['name'],
+                        'path': '/flywheel/v0/input/' + x + '/' + obj['name'],
+                    },
+                    'object': obj_projection,
+                }
+            else:
+                self.abort(500, 'Non-file input base type')
+
         gear_name = gear['gear']['name']
 
         if gear_name not in tags:
@@ -361,12 +403,22 @@ def get_config(self, _id):
         if c is None:
             c = {}
 
+        # Serve config as formatted json file
         self.response.headers['Content-Type'] = 'application/octet-stream'
         self.response.headers['Content-Disposition'] = 'attachment; filename="config.json"'
 
-        # Serve config as formatted json file
-        encoded = json.dumps({"config": c}, sort_keys=True, indent=4, separators=(',', ': ')) + '\n'
-        self.response.app_iter = StringIO.StringIO(encoded)
+        # Detect if config is old- or new-style.
+        # TODO: remove this logic with a DB upgrade, ref database.py's reserved upgrade section.
+
+        if c.get('config') is not None and c.get('inputs') is not None:
+            # New behavior
+            encoded = pseudo_consistent_json_encode(c)
+            self.response.app_iter = StringIO.StringIO(encoded)
+        else:
+            # Legacy behavior
+            encoded = pseudo_consistent_json_encode({"config": c})
+            self.response.app_iter = StringIO.StringIO(encoded)
+
 
     @require_login
     def put(self, _id):
diff --git a/api/jobs/jobs.py b/api/jobs/jobs.py
index 13319204e..015ec1672 100644
--- a/api/jobs/jobs.py
+++ b/api/jobs/jobs.py
@@ -5,6 +5,7 @@
 import bson
 import copy
 import datetime
+import string
 
 from ..types import Origin
 from ..dao.containerutil import create_filereference_from_dictionary, create_containerreference_from_dictionary, create_containerreference_from_filereference
@@ -113,11 +114,9 @@ def intention_equals(self, other_job):
             self.inputs == other_job.inputs and
             self.destination == other_job.destination
         ):
-            config.log.debug('the jobs {} and {} are equal.'.format(self.map, other_job.map))
             return True
 
         else:
-            config.log.debug('the jobs {} and {} are NOT equal.'.format(self.map, other_job.map))
             return False
 
@@ -258,7 +257,7 @@ def generate_request(self, gear):
                 'env': {
                     'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin'
                 },
-                'dir': "/flywheel/v0",
+                'dir': '/flywheel/v0',
             },
             'outputs': [
                 {
@@ -278,6 +277,45 @@ def generate_request(self, gear):
         if self.id_ is None:
             raise Exception('Running a job requires an ID')
 
+        # Detect if config is old- or new-style.
+        # TODO: remove this logic with a DB upgrade, ref database.py's reserved upgrade section.
+
+        # Add config scalars as environment variables
+        if self.config.get('config') is not None and self.config.get('inputs') is not None:
+            # New config behavior
+
+            cf = self.config['config']
+
+            # Whitelist characters that can be used in bash variable names
+            bash_variable_letters = set(string.ascii_letters + string.digits + ' ' + '_')
+
+            for x in cf:
+
+                if isinstance(cf[x], list) or isinstance(cf[x], dict):
+                    # Current gear spec only allows for scalars!
+                    raise Exception('Non-scalar config value ' + x + ' ' + str(cf[x]))
+                else:
+
+                    # Strip non-whitelisted characters, replace spaces with underscores, and uppercase
+                    config_name = filter(lambda char: char in bash_variable_letters, x)
+                    config_name = config_name.replace(' ', '_').upper()
+
+                    # Don't set nonsensical environment variables
+                    if config_name == '':
+                        print 'The gear config name ' + x + ' has no whitelisted characters!'
+                        continue
+
+                    # Stringify scalar
+                    # Python renders booleans as "True"/"False"; lowercase them for the gear
+                    if not isinstance(cf[x], bool):
+                        r['target']['env']['FW_CONFIG_' + config_name] = str(cf[x])
+                    else:
+                        r['target']['env']['FW_CONFIG_' + config_name] = str(cf[x]).lower()
+
+        else:
+            # Old config map.
+            pass
+
         r['inputs'].append({
             'type': 'scitran',
             'uri': '/jobs/' + self.id_ + '/config.json',
diff --git a/api/jobs/rules.py b/api/jobs/rules.py
index c543be32c..37e08c13a 100644
--- a/api/jobs/rules.py
+++ b/api/jobs/rules.py
@@ -119,7 +119,6 @@ def eval_rule(rule, file_, container):
     for match in rule.get('any', []):
         if eval_match(match['type'], match['value'], file_, container):
             has_match = True
-            config.log.debug('we found one here')
             break
 
     # If there were matches in the 'any' array and none of them succeeded
@@ -129,7 +128,6 @@ def eval_rule(rule, file_, container):
     # Are there matches in the 'all' set?
     for match in rule.get('all', []):
         if not eval_match(match['type'], match['value'], file_, container):
-            config.log.debug('we didnt fine one here')
             return False
 
     return True
@@ -261,10 +259,8 @@ def get_rules_for_container(db, container):
         result = list(db.project_rules.find({'project_id': str(container['_id'])}))
 
         if not result:
-            log.debug('Container ' + str(container['_id']) + ' found NO rules')
             return []
         else:
-            log.debug('Container ' + str(container['_id']) + ' found ' + str(len(result)) + ' rules')
             return result
 
 def copy_site_rules_for_project(project_id):
diff --git a/api/web/encoder.py b/api/web/encoder.py
index ea0ef629b..81d2da92d 100644
--- a/api/web/encoder.py
+++ b/api/web/encoder.py
@@ -43,3 +43,11 @@ def json_sse_pack(d):
     d['data'] = json.dumps(d['data'], default=custom_json_serializer)
 
     return sse_pack(d)
+
+def pseudo_consistent_json_encode(d):
+    """
+    Some parts of our system rely upon consistently-produced JSON encoding.
+    This implementation is not guaranteed to be consistent, but it's good enough for now.
+    """
+
+    return json.dumps(d, sort_keys=True, indent=4, separators=(',', ': ')) + '\n'
diff --git a/bin/database.py b/bin/database.py
index aa3e64a45..557032e89 100755
--- a/bin/database.py
+++ b/bin/database.py
@@ -1188,6 +1188,32 @@ def upgrade_to_35():
     cursor = config.db.batch.find({})
     process_cursor(cursor, upgrade_to_35_closure)
 
+
+
+###
+### BEGIN RESERVED UPGRADE SECTION
+###
+
+# Due to performance concerns with database upgrades, some upgrade implementations might be postponed.
+# The team contract is that if you write an upgrade that touches one of the tables mentioned below, you MUST also implement any reserved upgrades.
+# This way, we can bundle together changes that need large cursor iterations and save multi-hour upgrade times.
+
+
+## Jobs table
+
+# The old job.config format was a flat set of keys, which was manually placed under a "config" key when fetched.
+# Now it's a { "config": ..., "inputs": ... } map, with the old values placed under the "config" key when the job is stored.
+# Move the keys accordingly so that the legacy logic can be removed.
+#
+# Ref: JobHandler.get_config, Job.generate_request
+
+
+###
+### END RESERVED UPGRADE SECTION
+###
+
+
 def upgrade_schema(force_from = None):
     """
     Upgrades db to the current schema version
diff --git a/bin/install-ubuntu.sh b/bin/install-ubuntu.sh
index 238b9b7d8..4165328eb 100755
--- a/bin/install-ubuntu.sh
+++ b/bin/install-ubuntu.sh
@@ -19,6 +19,6 @@ sudo apt-get install -y \
     libpcre3-dev \
     git
 
-pip install -U pip
+sudo pip install -U pip
 
-pip install -r requirements.txt
+sudo pip install -r requirements.txt
diff --git a/test/bin/setup-integration-tests-ubuntu.sh b/test/bin/setup-integration-tests-ubuntu.sh
index 19ce0a13a..c384ac0ee 100755
--- a/test/bin/setup-integration-tests-ubuntu.sh
+++ b/test/bin/setup-integration-tests-ubuntu.sh
@@ -3,7 +3,7 @@ set -e
 unset CDPATH
 cd "$( dirname "${BASH_SOURCE[0]}" )/../.."
 
-pip install -U -r "test/integration_tests/requirements-integration-test.txt"
+sudo pip install -U -r "test/integration_tests/requirements-integration-test.txt"
 
 NODE_URL="https://nodejs.org/dist/v6.4.0/node-v6.4.0-linux-x64.tar.gz"
diff --git a/test/integration_tests/python/conftest.py b/test/integration_tests/python/conftest.py
index 88a9c6a63..581ebcbc8 100644
--- a/test/integration_tests/python/conftest.py
+++ b/test/integration_tests/python/conftest.py
@@ -266,7 +266,15 @@ def create(self, resource, **kwargs):
 
         # add missing gear when creating job
        if resource == 'job' and 'gear_id' not in payload:
-            payload['gear_id'] = self.get_or_create('gear')
+
+            # create file inputs for each job input on gear
+            gear_inputs = {}
+            for i in payload.get('inputs', {}).keys():
+                gear_inputs[i] = {'base': 'file'}
+
+            gear_doc = _default_payload['gear']['gear']
+            gear_doc['inputs'] = gear_inputs
+            payload['gear_id'] = self.create('gear', gear=gear_doc)
 
         # put together the create url to post to
         create_url = '/' + resource + 's'
diff --git a/test/integration_tests/python/test_jobs.py b/test/integration_tests/python/test_jobs.py
index e5e410d1a..c6212bb77 100644
--- a/test/integration_tests/python/test_jobs.py
+++ b/test/integration_tests/python/test_jobs.py
@@ -20,12 +20,19 @@ def test_jobs_access(as_user):
     assert r.status_code == 403
 
 
-def test_jobs(data_builder, as_user, as_admin, as_root, api_db):
-    gear = data_builder.create_gear()
+def test_jobs(data_builder, default_payload, as_user, as_admin, as_root, api_db, file_form):
+    gear_doc = default_payload['gear']['gear']
+    gear_doc['inputs'] = {
+        'dicom': {
+            'base': 'file'
+        }
+    }
+    gear = data_builder.create_gear(gear=gear_doc)
     invalid_gear = data_builder.create_gear(gear={'custom': {'flywheel': {'invalid': True}}})
     project = data_builder.create_project()
     session = data_builder.create_session()
     acquisition = data_builder.create_acquisition()
+    assert as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form('test.zip')).ok
 
     job_data = {
         'gear_id': gear,
diff --git a/test/integration_tests/python/test_uploads.py b/test/integration_tests/python/test_uploads.py
index 04b4d4f31..394644165 100644
--- a/test/integration_tests/python/test_uploads.py
+++ b/test/integration_tests/python/test_uploads.py
@@ -497,8 +497,11 @@ def test_acquisition_engine_upload(data_builder, file_form, as_root):
     project = data_builder.create_project()
     session = data_builder.create_session()
     acquisition = data_builder.create_acquisition()
+    assert as_root.post('/acquisitions/' + acquisition + '/files', files=file_form('test.txt')).ok
+
+
     job = data_builder.create_job(inputs={
-        'test': {'type': 'acquisition', 'id': acquisition, 'name': 'test'}
+        'test': {'type': 'acquisition', 'id': acquisition, 'name': 'test.txt'}
     })
 
     metadata = {
@@ -569,8 +572,8 @@ def test_acquisition_engine_upload(data_builder, file_form, as_root):
     m_timestamp = dateutil.parser.parse(metadata['acquisition']['timestamp'])
     assert a_timestamp == m_timestamp
 
-    for f in a['files']:
-        mf = find_file_in_array(f['name'], metadata['acquisition']['files'])
+    for mf in metadata['acquisition']['files']:
+        f = find_file_in_array(mf['name'], a['files'])
         assert mf is not None
         assert f['type'] == mf['type']
         assert f['info'] == mf['info']
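
Note on the new job configuration document: with this change, GET /jobs/<id>/config.json serves a two-key map ({"config": ..., "inputs": ...}) rather than the bare option map that was previously wrapped at fetch time. Below is a rough sketch of what a gear with a single file input named 'dicom' might now receive; every concrete value is illustrative and not taken from the patch.

```python
# Illustrative shape of the new-style config built in the jobs handler's add()
# and served by get_config(); all values below are made up.
{
    'config': {
        'speed': 3                      # the gear's config options, unchanged
    },
    'inputs': {
        'dicom': {
            'base': 'file',
            'hierarchy': {              # roughly ContainerReference.__dict__
                'type': 'acquisition',
                'id': '573c9e6a844eac7fc01747cd'
            },
            'location': {
                'name': 'test.dcm',
                'path': '/flywheel/v0/input/dicom/test.dcm'
            },
            'object': {                 # whitelisted, scientifically relevant file fields
                'info': {},
                'tags': [],
                'measurements': [],
                'mimetype': 'application/octet-stream',
                'type': 'dicom',
                'modality': None,
                'size': 2276214
            }
        }
    }
}
```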
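The FW_CONFIG_* environment variables added in Job.generate_request are derived from the config keys by stripping non-whitelisted characters, replacing spaces with underscores, and uppercasing, with boolean values lowercased for the gear. The standalone sketch below mirrors that logic and is handy for predicting variable names; the helper name is hypothetical and not part of the API.

```python
import string

# Illustrative helper mirroring the FW_CONFIG_* derivation in Job.generate_request;
# the function name is made up and does not exist in the codebase.
def fw_config_env(config):
    allowed = set(string.ascii_letters + string.digits + ' ' + '_')
    env = {}
    for key, value in config.items():
        # Keep only whitelisted characters, then normalize to a bash-friendly name
        name = ''.join(ch for ch in key if ch in allowed).replace(' ', '_').upper()
        if name == '':
            # No whitelisted characters left; skip rather than set a nonsensical variable
            continue
        # Booleans are lowercased so gears see "true"/"false" instead of Python's "True"/"False"
        env['FW_CONFIG_' + name] = str(value).lower() if isinstance(value, bool) else str(value)
    return env

# Example:
#   fw_config_env({'speed': 3, 'Two words': True})
#   -> {'FW_CONFIG_SPEED': '3', 'FW_CONFIG_TWO_WORDS': 'true'}
```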
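The reserved upgrade section added to bin/database.py describes, but deliberately does not implement, the jobs-table migration. A hedged sketch of what that deferred upgrade might look like once it is bundled with another jobs-table pass follows; the upgrade number and closure name are placeholders, and the config.db / process_cursor usage simply follows the pattern of the surrounding upgrade functions in that file.

```python
# Hypothetical sketch of the reserved jobs-table upgrade; not part of this patch.
# Assumes the module-level config and process_cursor helpers already present in bin/database.py.

def upgrade_to_XX_closure(job):
    cfg = job.get('config') or {}

    # Already migrated to the { "config": ..., "inputs": ... } shape
    if 'config' in cfg and 'inputs' in cfg:
        return True

    # Old style: the stored map is the bare option set; wrap it and add an empty inputs map
    config.db.jobs.update_one(
        {'_id': job['_id']},
        {'$set': {'config': {'config': cfg, 'inputs': {}}}}
    )
    return True

def upgrade_to_XX():
    cursor = config.db.jobs.find({})
    process_cursor(cursor, upgrade_to_XX_closure)
```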