api/dao/hierarchy.py (40 additions & 43 deletions)
@@ -334,53 +334,50 @@ def dict_fileinfos(infos):
return dict_infos


def update_container_hierarchy(metadata, acquisition_id, level):
project = metadata.get('project')
session = metadata.get('session')
acquisition = metadata.get('acquisition')
def update_container_hierarchy(metadata, cid, container_type):
Review comment from @kofalt (Contributor), Jun 28, 2016:
Could you add a docstring with a few sentences describing what this does? I'm not sure if I've ever understood what (exactly) this function does without reverse-engineering it each time I read it 😄

Follow-up comment (Contributor):
Same comment applies to the other diff'd functions in this file
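
Responding to the request above, one possible docstring for the new update_container_hierarchy, offered as a sketch rather than the author's wording:

```python
def update_container_hierarchy(metadata, cid, container_type):
    """Apply engine metadata to a container and propagate it to the parents.

    `metadata` is keyed by container type ('project', 'session',
    'acquisition'). The entry for `container_type` is applied to the
    container with _id `cid` (its timestamp is parsed and 'modified' is
    refreshed); for sessions and acquisitions, any 'session'/'project'
    entries in `metadata` are then applied to the parent containers via
    _update_hierarchy. Returns the updated container document, or raises
    APIStorageException if no container matches `cid`.
    """
```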

c_metadata = metadata.get(container_type)
now = datetime.datetime.utcnow()
if acquisition.get('timestamp'):
acquisition['timestamp'] = dateutil.parser.parse(acquisition['timestamp'])
acquisition['modified'] = now
acquisition_obj = _update_container({'_id': acquisition_id}, acquisition, 'acquisitions')
if acquisition_obj is None:
raise APIStorageException('acquisition doesn''t exist')
if acquisition.get('timestamp'):
session_obj = config.db.sessions.find_one_and_update(
{'_id': acquisition_obj['session']},
{
'$min': dict(timestamp=acquisition['timestamp']),
'$set': dict(timezone=acquisition.get('timezone'))
},
return_document=pymongo.collection.ReturnDocument.AFTER
)
config.db.projects.find_one_and_update(
{'_id': session_obj['project']},
{
'$max': dict(timestamp=acquisition['timestamp']),
'$set': dict(timezone=acquisition.get('timezone'))
}
)
session_obj = None
if session:
session['modified'] = now
session_obj = _update_container({'_id': acquisition_obj['session']}, session, 'sessions')
if project:
project['modified'] = now
if not session_obj:
session_obj = config.db.sessions.find_one({'_id': acquisition_obj['session']})
_update_container({'_id': session_obj['project']}, project, 'projects')
return acquisition_obj

def _update_container(query, update, cont_name):
return config.db[cont_name].find_one_and_update(
query,
{
'$set': util.mongo_dict(update)
},
if c_metadata.get('timestamp'):
c_metadata['timestamp'] = dateutil.parser.parse(c_metadata['timestamp'])
c_metadata['modified'] = now
c_obj = _update_container({'_id': cid}, {}, c_metadata, container_type)
if c_obj is None:
raise APIStorageException('container does not exist')
if container_type in ['session', 'acquisition']:
_update_hierarchy(c_obj, container_type, metadata)
return c_obj


def _update_container(query, update, set_update, container_type):
coll_name = container_type if container_type.endswith('s') else container_type+'s'
update['$set'] = util.mongo_dict(set_update)
return config.db[coll_name].find_one_and_update(query,update,
return_document=pymongo.collection.ReturnDocument.AFTER
)


def _update_hierarchy(container, container_type, metadata):
project_id = container.get('project') # for sessions
now = datetime.datetime.utcnow()

if container_type == 'acquisition':
session = metadata.get('session', {})
session_obj = None
if session.keys():
session['modified'] = now
session_obj = _update_container({'_id': container['session']}, {}, session, 'sessions')
if session_obj is None:
session_obj = get_container('session', container['session'])
project_id = session_obj['project']

if project_id is None:
raise APIStorageException('Failed to find project id in session obj')
project = metadata.get('project', {})
if project.keys():
project['modified'] = now
project_obj = _update_container({'_id': project_id}, {}, project, 'projects')


def merge_fileinfos(parsed_files, infos):
"""it takes a dictionary of "hard_infos" (file size, hash)
merging them with infos derived from a list of infos on the same or on other files
api/placer.py (14 additions & 18 deletions)
@@ -184,41 +184,37 @@ class LabelPlacer(UIDPlacer):

class EnginePlacer(Placer):
"""
A placer that can accept files sent to it from an engine.
Currently a stub.
A placer that can accept files and/or metadata sent to it from the engine

It uses update_container_hierarchy to update the container and its parents' fields from the metadata
"""

def check(self):
self.requireTarget()
validators.validate_data(self.metadata, 'enginemetadata.json', 'input', 'POST', optional=True)
if self.metadata is not None:
validators.validate_data(self.metadata, 'enginemetadata.json', 'input', 'POST', optional=True)
self.saved = []

# Could avoid loops in process_file_field by setting up the

def process_file_field(self, field, info):
if self.metadata is not None:
# OPPORTUNITY: hard-coded container levels will need to go away soon
# Engine shouldn't know about container names; maybe parent contexts?
# How will this play with upload unification? Unify schemas as well?
file_mds = self.metadata.get('acquisition', {}).get('files', [])
file_mds = self.metadata.get(self.container_type, {}).get('files', [])

for file_md in file_mds:
if file_md['name'] == info['name']:
info.update(file_md)
break
else:
file_md = {}

for x in ('type', 'instrument', 'measurements', 'tags', 'metadata'):
info[x] = file_md.get(x) or info[x]

self.save_file(field, info)
self.saved.append(info)

def finalize(self):
# Updating various properties of the hierarchy; currently assumes acquisitions; might need fixing for other levels.
# NOTE: only called in EnginePlacer
bid = bson.ObjectId(self.id)
self.obj = hierarchy.update_container_hierarchy(self.metadata, bid, '')
if self.metadata is not None:
bid = bson.ObjectId(self.id)

# Remove file metadata as it was already updated in process_file_field
for k in self.metadata.keys():
self.metadata[k].pop('files', {})
self.obj = hierarchy.update_container_hierarchy(self.metadata, bid, self.container_type)

return self.saved

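For orientation, here is a sketch of the metadata shape that EnginePlacer.process_file_field and finalize now operate on. The keys are taken from the diff; the values and labels are illustrative assumptions, not the project's actual schema:

```python
# Hypothetical engine metadata payload (illustrative values).
# process_file_field looks up per-file entries under metadata[container_type]['files']
# and copies 'type', 'instrument', 'measurements', 'tags' and 'metadata' onto the
# saved file info; finalize() pops the 'files' lists and hands the rest to
# hierarchy.update_container_hierarchy().
metadata = {
    'acquisition': {
        'label': 'localizer',                      # assumed container field
        'files': [{
            'name': 'scan.dcm.zip',
            'type': 'dicom',
            'instrument': 'MRI',
            'measurements': ['anatomical'],
            'tags': [],
            'metadata': {},
        }],
    },
    'session': {'label': 'ses-01'},                # optional parent update
    'project': {'label': 'Demo Project'},          # optional parent update
}
```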
api/schemas/input/enginemetadata.json (0 additions & 1 deletion)
@@ -53,6 +53,5 @@
"additionalProperties": false
}
},
"required": ["acquisition"],
"additionalProperties": false
}
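
With "required": ["acquisition"] removed from the schema, engine metadata no longer has to carry an acquisition block, so payloads scoped to other container types can validate. A minimal hypothetical example:

```python
# Session-only engine metadata; previously rejected because the schema
# required an 'acquisition' key, now permitted after this change.
metadata = {
    'session': {
        'label': 'ses-01',
        'files': [{'name': 'report.pdf', 'type': 'pdf'}],
    }
}
```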
api/upload.py (33 additions & 75 deletions)
@@ -134,7 +134,6 @@ def process_upload(request, strategy, container_type=None, id=None, origin=None,
if placer.sse and not response:
raise Exception("Programmer error: response required")
elif placer.sse:
log.debug('SSE')
response.headers['Content-Type'] = 'text/event-stream; charset=utf-8'
Review comment (Contributor):
🎉

response.headers['Connection'] = 'keep-alive'
response.app_iter = placer.finalize()
@@ -207,92 +206,51 @@ def engine(self):
"""
.. http:post:: /api/engine

Confirm endpoint is ready for requests
Default behavior:
Uploads a list of files sent as file1, file2, etc. to an existing
container and updates fields of the files, the container and its
parents as specified in the metadata file form field, using the
engine placer class
When ``level`` is ``analysis``:
Uploads a list of files to an existing analysis object, marking
all files as ``output=true`` using the job-based analyses placer
class

:query level: container_type
:query id: container_id
:query job: job_id
:param level: one of ``project``, ``session``, ``acquisition``, ``analysis``
:type level: string

:param id: Container ID
:type id: string

:param job: Job ID
:type job: string

:statuscode 200: no error
:statuscode 400: Target container ``level`` is required
:statuscode 400: Level must be ``project``, ``session``, ``acquisition``, ``analysis``
:statuscode 400: Target container ``id`` is required
:statuscode 402: Uploads must be from an authorized drone

:statuscode 400: describe me
:statuscode 402: describe me
:statuscode 404: describe me
"""

if not self.superuser_request:
self.abort(402, 'uploads must be from an authorized drone')

level = self.get_param('level')
if level is None:
self.abort(404, 'container level is required')

cont_id = self.get_param('id')
if not cont_id:
self.abort(404, 'container id is required')
self.abort(400, 'container level is required')
if level not in ['analysis', 'acquisition', 'session', 'project']:
self.abort(400, 'container level must be analysis, acquisition, session or project.')
cid = self.get_param('id')
if not cid:
self.abort(400, 'container id is required')
else:
cont_id = bson.ObjectId(cont_id)
if level not in ['acquisition', 'analysis']:
self.abort(404, 'engine uploads are supported only at the acquisition or analysis level')
cid = bson.ObjectId(cid)

if level == 'analysis':
context = {'job_id': self.get_param('job')}
return process_upload(self.request, Strategy.analysis_job, origin=self.origin, container_type=level, id=cont_id, context=context)

if not self.superuser_request:
self.abort(402, 'uploads must be from an authorized drone')
with tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) as tempdir_path:
try:
file_store = files.MultiFileStore(self.request, tempdir_path)
except files.FileStoreException as e:
self.abort(400, str(e))
if not file_store.metadata:
self.abort(400, 'metadata is missing')
payload_schema_uri = util.schema_uri('input', 'enginemetadata.json')
metadata_validator = validators.from_schema_path(payload_schema_uri)
metadata_validator(file_store.metadata, 'POST')
file_infos = file_store.metadata['acquisition'].pop('files', [])
now = datetime.datetime.utcnow()
try:
acquisition_obj = hierarchy.update_container_hierarchy(file_store.metadata, cont_id, level)
except APIStorageException as e:
self.abort(400, e.message)
# move the files before updating the database
for name, parsed_file in file_store.files.items():
fileinfo = parsed_file.info
target_path = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash(fileinfo['hash']))
files.move_file(parsed_file.path, target_path)
# merge infos from the actual file and from the metadata
merged_files = hierarchy.merge_fileinfos(file_store.files, file_infos)
# update the fileinfo in mongo if a file already exists
for f in acquisition_obj['files']:
merged_file = merged_files.get(f['name'])
if merged_file:
fileinfo = merged_file.info
fileinfo['modified'] = now
acquisition_obj = hierarchy.update_fileinfo('acquisitions', acquisition_obj['_id'], fileinfo)
fileinfo['existing'] = True
# create the missing fileinfo in mongo
for name, merged_file in merged_files.items():
fileinfo = merged_file.info
# if the file exists we don't need to create it
# skip update fileinfo for files that don't have a path
if not fileinfo.get('existing') and merged_file.path:
fileinfo['mimetype'] = fileinfo.get('mimetype') or util.guess_mimetype(name)
fileinfo['created'] = now
fileinfo['modified'] = now
fileinfo['origin'] = self.origin
acquisition_obj = hierarchy.add_fileinfo('acquisitions', acquisition_obj['_id'], fileinfo)

for f in acquisition_obj['files']:
if f['name'] in file_store.files:
file_ = {
'name': f['name'],
'hash': f['hash'],
'type': f.get('type'),
'measurements': f.get('measurements', []),
'mimetype': f.get('mimetype')
}
rules.create_jobs(config.db, acquisition_obj, 'acquisition', file_)
return [{'name': k, 'hash': v.info.get('hash'), 'size': v.info.get('size')} for k, v in merged_files.items()]
return process_upload(self.request, Strategy.analysis_job, origin=self.origin, container_type=level, id=cid, context=context)
else:
return process_upload(self.request, Strategy.engine, container_type=level, id=cid, origin=self.origin)

def clean_packfile_tokens(self):
"""
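Finally, to make the reworked endpoint concrete, here is a hypothetical engine-side call. The base URL, auth header, ObjectId, field names, and file names are assumptions for illustration only; the endpoint still requires the request to come from an authorized drone:

```python
import json
import requests

API = 'https://example.local/api'                     # assumed base URL
params = {'level': 'acquisition',                     # project|session|acquisition|analysis
          'id': '57c4521481d2b0001df2dcb1'}           # assumed container ObjectId

metadata = {
    'acquisition': {
        'files': [{'name': 'scan.nii.gz', 'type': 'nifti'}]
    }
}

# Files travel as numbered form fields (file1, file2, ...); the metadata is
# sent alongside them as a 'metadata' form field (field name assumed here).
files = {
    'file1': ('scan.nii.gz', open('scan.nii.gz', 'rb')),
    'metadata': ('metadata', json.dumps(metadata), 'application/json'),
}

resp = requests.post(API + '/engine', params=params, files=files,
                     headers={'X-SciTran-Auth': 'drone-secret'})  # assumed drone auth header
resp.raise_for_status()
print(resp.json())   # info for the saved files (name, hash, size, ...)
```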