Simplified download path logic #1023

Merged: 4 commits, Dec 6, 2017
101 changes: 54 additions & 47 deletions api/download.py
@@ -32,7 +32,7 @@ def _filter_check(property_filter, property_values):

class Download(base.RequestHandler):

def _append_targets(self, targets, cont_name, container, prefix, total_size, total_cnt, optional, data_path, filters):
def _append_targets(self, targets, cont_name, container, prefix, total_size, total_cnt, data_path, filters):
for f in container.get('files', []):
if filters:
filtered = True
@@ -46,15 +46,16 @@ def _append_targets(self, targets, cont_name, container, prefix, total_size, tot
break
if filtered:
continue
if optional or not f.get('optional', False):
filepath = os.path.join(data_path, util.path_from_hash(f['hash']))
if os.path.exists(filepath): # silently skip missing files
if cont_name == 'analyses':
targets.append((filepath, prefix + '/' + ('input' if f.get('input') else 'output') + '/' + f['name'], cont_name, str(container.get('_id')),f['size']))
else:
targets.append((filepath, prefix + '/' + f['name'], cont_name, str(container.get('_id')),f['size']))
total_size += f['size']
total_cnt += 1
filepath = os.path.join(data_path, util.path_from_hash(f['hash']))
if os.path.exists(filepath): # silently skip missing files
@nagem (Contributor) commented on Dec 4, 2017:

This is the location of the skipped missing filepaths I mentioned. I would write something similar to:

Expected filepath <filepath> to exist but it is missing. File will be skipped in download.

if cont_name == 'analyses':
targets.append((filepath, prefix + '/' + ('input' if f.get('input') else 'output') + '/' + f['name'], cont_name, str(container.get('_id')),f['size']))
else:
targets.append((filepath, prefix + '/' + f['name'], cont_name, str(container.get('_id')),f['size']))
total_size += f['size']
total_cnt += 1
else:
log.warn("Expected {} to exist but it is missing. File will be skipped in download.".format(filepath))
return total_size, total_cnt

def _bulk_preflight_archivestream(self, file_refs):
@@ -91,6 +92,7 @@ def _bulk_preflight_archivestream(self, file_refs):
except Exception: # pylint: disable=broad-except
# self.abort(404, 'File {} on Container {} {} not found'.format(filename, cont_name, cont_id))
# silently skip missing files/files user does not have access to
log.warn("Expected file {} on Container {} {} to exist but it is missing. File will be skipped in download.".format(filename, cont_name, cont_id))
continue

filepath = os.path.join(data_path, util.path_from_hash(file_obj['hash']))
@@ -116,7 +118,6 @@ def _preflight_archivestream(self, req_spec, collection=None):
targets = []
filename = None

used_subpaths = {}
ids_of_paths = {}
base_query = {}
if not self.superuser_request:
@@ -130,11 +131,12 @@ def _preflight_archivestream(self, req_spec, collection=None):
if item['level'] == 'project':
project = config.db.projects.find_one(base_query, ['group', 'label', 'files'])
if not project:
# silently skip missing objects/objects user does not have access to
# silently (while logging it) skip missing objects/objects user does not have access to
log.warn("Expected project {} to exist but it is missing. Node will be skipped".format(item_id))
continue

prefix = '/'.join([arc_prefix, project['group'], project['label']])
total_size, file_cnt = self._append_targets(targets, 'projects', project, prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
total_size, file_cnt = self._append_targets(targets, 'projects', project, prefix, total_size, file_cnt, data_path, req_spec.get('filters'))

sessions = config.db.sessions.find({'project': item_id}, ['label', 'files', 'uid', 'timestamp', 'timezone', 'subject'])
session_dict = {session['_id']: session for session in sessions}
@@ -153,35 +155,36 @@ def _preflight_archivestream(self, req_spec, collection=None):
subject_dict[code] = subject

for code, subject in subject_dict.iteritems():
subject_prefix = prefix + '/' + self._path_from_container(subject, used_subpaths, ids_of_paths, code)
subject_prefix = self._path_from_container(prefix, subject, ids_of_paths, code)
subject_prefixes[code] = subject_prefix
total_size, file_cnt = self._append_targets(targets, 'subjects', subject, subject_prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
total_size, file_cnt = self._append_targets(targets, 'subjects', subject, subject_prefix, total_size, file_cnt, data_path, req_spec.get('filters'))

for session in session_dict.itervalues():
subject_code = session['subject'].get('code', 'unknown_subject')
subject = subject_dict[subject_code]
session_prefix = subject_prefixes[subject_code] + '/' + self._path_from_container(session, used_subpaths, ids_of_paths, session["_id"])
session_prefix = self._path_from_container(subject_prefixes[subject_code], session, ids_of_paths, session["_id"])
session_prefixes[session['_id']] = session_prefix
total_size, file_cnt = self._append_targets(targets, 'sessions', session, session_prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
total_size, file_cnt = self._append_targets(targets, 'sessions', session, session_prefix, total_size, file_cnt, data_path, req_spec.get('filters'))

for acq in acquisitions:
session = session_dict[acq['session']]
acq_prefix = session_prefixes[session['_id']] + '/' + self._path_from_container(acq, used_subpaths, ids_of_paths, acq['_id'])
total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
acq_prefix = self._path_from_container(session_prefixes[session['_id']], acq, ids_of_paths, acq['_id'])
total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, data_path, req_spec.get('filters'))


elif item['level'] == 'session':
session = config.db.sessions.find_one(base_query, ['project', 'label', 'files', 'uid', 'timestamp', 'timezone', 'subject'])
if not session:
# silently skip missing objects/objects user does not have access to
# silently (while logging it) skip missing objects/objects user does not have access to
log.warn("Expected session {} to exist but it is missing. Node will be skipped".format(item_id))
continue

project = config.db.projects.find_one({'_id': session['project']}, ['group', 'label'])
subject = session.get('subject', {'code': 'unknown_subject'})
if not subject.get('code'):
subject['code'] = 'unknown_subject'
prefix = project['group'] + '/' + project['label'] + '/' + self._path_from_container(subject, used_subpaths, ids_of_paths, subject['code']) + '/' + self._path_from_container(session, used_subpaths, ids_of_paths, session['_id'])
total_size, file_cnt = self._append_targets(targets, 'sessions', session, prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
prefix = self._path_from_container(self._path_from_container(project['group'] + '/' + project['label'], subject, ids_of_paths, subject["code"]), session, ids_of_paths, session['_id'])
total_size, file_cnt = self._append_targets(targets, 'sessions', session, prefix, total_size, file_cnt, data_path, req_spec.get('filters'))

# If the param `collection` holding a collection id is not None, filter out acquisitions that are not in the collection
a_query = {'session': item_id}
@@ -190,13 +193,14 @@ def _preflight_archivestream(self, req_spec, collection=None):
acquisitions = config.db.acquisitions.find(a_query, ['label', 'files', 'uid', 'timestamp', 'timezone'])

for acq in acquisitions:
acq_prefix = prefix + '/' + self._path_from_container(acq, used_subpaths, ids_of_paths, acq['_id'])
total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
acq_prefix = self._path_from_container(prefix, acq, ids_of_paths, acq['_id'])
total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, data_path, req_spec.get('filters'))

elif item['level'] == 'acquisition':
acq = config.db.acquisitions.find_one(base_query, ['session', 'label', 'files', 'uid', 'timestamp', 'timezone'])
if not acq:
# silently skip missing objects/objects user does not have access to
# silently (while logging it) skip missing objects/objects user does not have access to
log.warn("Expected acquisition {} to exist but it is missing. Node will be skipped".format(item_id))
continue

session = config.db.sessions.find_one({'_id': acq['session']}, ['project', 'label', 'uid', 'timestamp', 'timezone', 'subject'])
@@ -205,17 +209,18 @@ def _preflight_archivestream(self, req_spec, collection=None):
subject['code'] = 'unknown_subject'

project = config.db.projects.find_one({'_id': session['project']}, ['group', 'label'])
prefix = project['group'] + '/' + project['label'] + '/' + self._path_from_container(subject, used_subpaths, ids_of_paths, subject['code']) + '/' + self._path_from_container(session, used_subpaths, ids_of_paths, session['_id']) + '/' + self._path_from_container(acq, used_subpaths, ids_of_paths, acq['_id'])
total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
prefix = self._path_from_container(self._path_from_container(self._path_from_container(project['group'] + '/' + project['label'], subject, ids_of_paths, subject['code']), session, ids_of_paths, session["_id"]), acq, ids_of_paths, acq['_id'])
total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, prefix, total_size, file_cnt, data_path, req_spec.get('filters'))

elif item['level'] == 'analysis':
analysis = config.db.analyses.find_one(base_query, ['parent', 'label', 'files', 'uid', 'timestamp'])
if not analysis:
# silently skip missing objects/objects user does not have access to
# silently (while logging it) skip missing objects/objects user does not have access to
log.warn("Expected analysis {} to exist but it is missing. Node will be skipped".format(item_id))
continue
prefix = self._path_from_container(analysis, used_subpaths, ids_of_paths, util.sanitize_string_to_filename(analysis['label']))
prefix = self._path_from_container("", analysis, ids_of_paths, util.sanitize_string_to_filename(analysis['label']))
filename = 'analysis_' + util.sanitize_string_to_filename(analysis['label']) + '.tar'
total_size, file_cnt = self._append_targets(targets, 'analyses', analysis, prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
total_size, file_cnt = self._append_targets(targets, 'analyses', analysis, prefix, total_size, file_cnt, data_path, req_spec.get('filters'))

if len(targets) > 0:
if not filename:
@@ -226,23 +231,25 @@ def _preflight_archivestream(self, req_spec, collection=None):
else:
self.abort(404, 'No requested containers could be found')

def _path_from_container(self, container, used_subpaths, ids_of_paths, _id):
def _find_new_path(path, list_used_subpaths, ids_of_paths, _id):
"""from the input path finds a path that hasn't been used"""

path = str(path).replace('/', '_')
if path in list_used_subpaths:
return path
elif _id == ids_of_paths.get(path,_id):
ids_of_paths[path] = _id
return path
def _path_from_container(self, prefix, container, ids_of_paths, _id):
"""
Returns the full path of a container (not just a subpath); a prefix must be provided.
"""
def _find_new_path(path, ids_of_paths, _id):
"""
Checks whether the full path has already been used
"""
if _id in ids_of_paths.keys():
# If the id is already associated with a path, use that instead of modifying it
return ids_of_paths[_id]
used_paths = [ids_of_paths[id_] for id_ in ids_of_paths if id_ != _id]
path = str(path)
i = 0
while True:
modified_path = path
while modified_path in used_paths:
modified_path = path + '_' + str(i)
if modified_path not in ids_of_paths.keys():
ids_of_paths[modified_path] = _id
return modified_path
i += 1
return modified_path

path = None
if not path and container.get('label'):
@@ -260,9 +267,9 @@ def _find_new_path(path, list_used_subpaths, ids_of_paths, _id):
if not path:
path = 'untitled'

path = _find_new_path(path, used_subpaths.get(_id, []), ids_of_paths, _id)

used_subpaths[_id] = used_subpaths.get(_id, []) + [path]
path = prefix + '/' + path
path = _find_new_path(path, ids_of_paths, _id)
ids_of_paths[_id] = path
return path

def archivestream(self, ticket):
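To make the new behavior concrete, here is a minimal standalone sketch (not part of the diff) of the _find_new_path dedup logic introduced above, assuming ids_of_paths maps container ids to the full paths already handed out; the ids and labels in the demo are made up for illustration.

def find_new_path(path, ids_of_paths, _id):
    # Reuse the existing path if this id was already assigned one.
    if _id in ids_of_paths:
        return ids_of_paths[_id]
    # Paths handed out to other ids must not be reused.
    used_paths = [p for other_id, p in ids_of_paths.items() if other_id != _id]
    modified_path = str(path)
    i = 0
    while modified_path in used_paths:
        # On a collision, try path_0, path_1, ... until one is free.
        modified_path = str(path) + '_' + str(i)
        i += 1
    return modified_path

ids_of_paths = {}
for _id, label in [('a1', 'scan'), ('a2', 'scan'), ('a1', 'scan')]:
    path = find_new_path('group/project/' + label, ids_of_paths, _id)
    ids_of_paths[_id] = path  # mirrors what _path_from_container does after the call
    print(path)
# prints: group/project/scan, then group/project/scan_0, then group/project/scan again (reused for a1)

Because a path is minted once per id and then cached, repeated lookups for the same container are stable, which is what lets this PR drop the old used_subpaths bookkeeping.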
79 changes: 79 additions & 0 deletions bin/oneoffs/load_external_data.py
@@ -0,0 +1,79 @@
#!/usr/bin/env python

import bson
import copy
import datetime
import dateutil.parser
import json

from api import config

## DEFAULTS ##

USER_ID = "meganhenning@flywheel.io"
SAFE_FILE_HASH = "v0-sha384-a8d0d1bd9368e5385f31d3582db07f9bc257537d5e1f207d36a91fdd3d2f188fff56616c0874bb3535c37fdf761a446c"
PROJECT_ID = "5a26e049c6fa4a00161e4a1a"
GROUP_ID = 'scitran'

# Some day maybe this can use the SDK/API calls to get the proper test data
# For now, paste it in

SESSIONS = []

ACQUISITIONS = []

def handle_permissions(obj):
obj['permissions'] = [{
"access": "admin",
"_id": USER_ID
}]

def handle_dates(obj):
if obj.get('timestamp'):
obj['timestamp'] = dateutil.parser.parse(obj['timestamp'])
if obj.get('created'):
obj['created'] = dateutil.parser.parse(obj['created'])
if obj.get('modified'):
obj['modified'] = dateutil.parser.parse(obj['modified'])

def handle_file(f):
handle_dates(f)
f.pop('info_exists', None)
f.pop('join_origin', None)
f['hash'] = SAFE_FILE_HASH


for i, s in enumerate(SESSIONS):
print "Processing session {} of {} sessions".format(i+1, len(SESSIONS))

s.pop('join-origin', None)

s['_id'] = bson.ObjectId(s['_id'])
s['project'] = bson.ObjectId(str(PROJECT_ID))
s['group'] = GROUP_ID
handle_dates(s)
handle_permissions(s)

for f in s.get('files', []):
handle_file(f)


config.db.sessions.delete_many({'_id': s['_id']})
config.db.sessions.insert(s)

for i, a in enumerate(ACQUISITIONS):
print "Processing acquisition {} of {} acquisitions".format(i+1, len(ACQUISITIONS))

a['_id'] = bson.ObjectId(a['_id'])
a['session'] = bson.ObjectId(a['session'])

a.pop('join-origin', None)

handle_dates(a)
handle_permissions(a)

for f in a.get('files', []):
handle_file(f)

config.db.acquisitions.delete_many({'_id': a['_id']})
config.db.acquisitions.insert(a)
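Since the script expects its test data to be pasted in above, here is a hypothetical sketch of the shape a SESSIONS entry would take, inferred only from the fields this script reads or rewrites; every id, label, and timestamp is illustrative, not real data.

SESSIONS = [{
    '_id': '5a26e049c6fa4a00161e4a1b',         # hypothetical ObjectId string; cast to bson.ObjectId above
    'project': '5a26e049c6fa4a00161e4a1a',     # overwritten with PROJECT_ID by the script
    'label': 'session-01',
    'subject': {'code': 'subject-01'},
    'timestamp': '2017-12-01T10:00:00+00:00',  # parsed by handle_dates
    'created': '2017-12-01T10:05:00+00:00',
    'modified': '2017-12-01T10:05:00+00:00',
    'files': [{
        'name': 'anatomy.nii.gz',
        'hash': 'v0-sha384-...',               # replaced with SAFE_FILE_HASH by handle_file
        'created': '2017-12-01T10:05:00+00:00',
        'modified': '2017-12-01T10:05:00+00:00',
    }],
}]

ACQUISITIONS entries would look similar, carrying a 'session' id instead of a 'project' id.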