diff --git a/api/api.py b/api/api.py
index 4c4d5b541..c6eecae2b 100644
--- a/api/api.py
+++ b/api/api.py
@@ -71,6 +71,7 @@ def _format(route):
 webapp2_extras.routes.PathPrefixRoute(r'/api', [
     webapp2.Route(r'/download', download.Download, handler_method='download', methods=['GET', 'POST'], name='download'),
     webapp2.Route(r'/reaper', upload.Upload, handler_method='reaper', methods=['POST']),
+    webapp2.Route(r'/uploader', upload.Upload, handler_method='uploader', methods=['POST']),
     webapp2.Route(r'/engine', upload.Upload, handler_method='engine', methods=['POST']),
     webapp2.Route(r'/sites', centralclient.CentralClient, handler_method='sites', methods=['GET']),
     webapp2.Route(r'/register', centralclient.CentralClient, handler_method='register', methods=['POST']),
diff --git a/api/dao/reaperutil.py b/api/dao/reaperutil.py
index 4b5469fb9..2b1394867 100644
--- a/api/dao/reaperutil.py
+++ b/api/dao/reaperutil.py
@@ -1,9 +1,11 @@
 import bson
+import copy
 import difflib
 import pymongo
 import datetime
 import dateutil.parser
 
+from .. import files
 from .. import util
 from .. import config
 from . import APIStorageException
@@ -12,37 +14,36 @@
 
 PROJECTION_FIELDS = ['group', 'name', 'label', 'timestamp', 'permissions', 'public']
 
-class TargetAcquisition(object):
+class TargetContainer(object):
 
-    def __init__(self, acquisition, fileinfo):
-        self.acquisition = acquisition
-        self.dbc = config.db.acquisitions
-        self._id = acquisition['_id']
-        self.fileinfo = fileinfo or {}
+    def __init__(self, container, level):
+        self.container = container
+        self.level = level
+        self.dbc = config.db[level]
+        self._id = container['_id']
 
     def find(self, filename):
-        for f in self.acquisition.get('files', []):
+        for f in self.container.get('files', []):
             if f['name'] == filename:
                 return f
         return None
 
     def update_file(self, fileinfo):
+        update_set = {'files.$.modified': datetime.datetime.utcnow()}
         # in this method, we are overriding an existing file.
        # update_set allows to update all the fileinfo like size, hash, etc.
-        fileinfo.update(self.fileinfo)
         for k,v in fileinfo.iteritems():
             update_set['files.$.' + k] = v
         return self.dbc.find_one_and_update(
-            {'_id': self.acquisition['_id'], 'files.name': fileinfo['name']},
+            {'_id': self._id, 'files.name': fileinfo['name']},
             {'$set': update_set},
             return_document=pymongo.collection.ReturnDocument.AFTER
         )
 
     def add_file(self, fileinfo):
-        fileinfo.update(self.fileinfo)
         return self.dbc.find_one_and_update(
-            {'_id': self.acquisition['_id']},
+            {'_id': self._id},
             {'$push': {'files': fileinfo}},
             return_document=pymongo.collection.ReturnDocument.AFTER
         )
@@ -147,8 +148,10 @@ def create_container_hierarchy(metadata):
 
     if acquisition.get('timestamp'):
         acquisition['timestamp'] = dateutil.parser.parse(acquisition['timestamp'])
-        config.db.projects.update_one({'_id': project_obj['_id']}, {'$max': dict(timestamp=acquisition['timestamp']), '$set': dict(timezone=acquisition.get('timezone'))})
-        config.db.sessions.update_one({'_id': session_obj['_id']}, {'$min': dict(timestamp=acquisition['timestamp']), '$set': dict(timezone=acquisition.get('timezone'))})
+        session_operations = {'$min': dict(timestamp=acquisition['timestamp'])}
+        if acquisition.get('timezone'):
+            session_operations['$set'] = {'timezone': acquisition['timezone']}
+        config.db.sessions.update_one({'_id': session_obj['_id']}, session_operations)
 
     acquisition['modified'] = now
     acq_operations = {
@@ -165,9 +168,106 @@ def create_container_hierarchy(metadata):
         {'uid': acquisition_uid},
         acq_operations,
         upsert=True,
+        return_document=pymongo.collection.ReturnDocument.AFTER
+    )
+    return TargetContainer(acquisition_obj, 'acquisitions'), file_
+
+def create_root_to_leaf_hierarchy(metadata, files):
+    target_containers = []
+
+    group = metadata['group']
+    project = metadata['project']
+    session = metadata['session']
+    acquisition = metadata['acquisition']
+
+    now = datetime.datetime.utcnow()
+
+    group_obj = config.db.groups.find_one({'_id': group['_id']})
+    if not group_obj:
+        raise APIStorageException('group does not exist')
+    project['modified'] = session['modified'] = acquisition['modified'] = now
+    project_files = merge_fileinfos(files, project.pop('files', []))
+    project_obj = config.db.projects.find_one_and_update({'label': project['label']},
+        {
+            '$setOnInsert': dict(
+                group=group_obj['_id'],
+                permissions=group_obj['roles'],
+                public=False,
+                created=now
+            ),
+            '$set': project
+        },
+        upsert=True,
+        return_document=pymongo.collection.ReturnDocument.AFTER,
+    )
+    target_containers.append(
+        (TargetContainer(project_obj, 'projects'), project_files)
+    )
+    session_files = merge_fileinfos(files, session.pop('files', []))
+    session_operations = {
+        '$setOnInsert': dict(
+            group=project_obj['group'],
+            project=project_obj['_id'],
+            permissions=project_obj['permissions'],
+            public=project_obj['public'],
+            created=now
+        ),
+        '$set': session
+    }
+    session_obj = config.db.sessions.find_one_and_update(
+        {
+            'label': session['label'],
+            'project': project_obj['_id'],
+        },
+        session_operations,
+        upsert=True,
         return_document=pymongo.collection.ReturnDocument.AFTER,
     )
-    return TargetAcquisition(acquisition_obj, file_)
+    target_containers.append(
+        (TargetContainer(session_obj, 'sessions'), session_files)
+    )
+    acquisition_files = merge_fileinfos(files, acquisition.pop('files', []))
+    acq_operations = {
+        '$setOnInsert': dict(
+            session=session_obj['_id'],
+            permissions=session_obj['permissions'],
+            public=session_obj['public'],
+            created=now
+        ),
+        '$set': acquisition
+    }
+    acquisition_obj = config.db.acquisitions.find_one_and_update(
+        {
+            'label': acquisition['label'],
+            'session': session_obj['_id']
+        },
+        acq_operations,
+        upsert=True,
+        return_document=pymongo.collection.ReturnDocument.AFTER
+    )
+    target_containers.append(
+        (TargetContainer(acquisition_obj, 'acquisitions'), acquisition_files)
+    )
+    return target_containers
+
+
+def merge_fileinfos(parsed_files, infos):
+    """Merge a dict of ParsedFile tuples (hard file info: hash, size, path)
+    with a list of file info dicts taken from the upload metadata;
+    metadata entries override the parsed values."""
+    merged_files = {}
+    for info in infos:
+        parsed = parsed_files.get(info['name'])
+        if parsed:
+            path = parsed.path
+            new_infos = copy.deepcopy(parsed.info)
+        else:
+            path = None
+            new_infos = {}
+        new_infos.update(info)
+        merged_files[info['name']] = files.ParsedFile(new_infos, path)
+    return merged_files
+
 
 def update_container_hierarchy(metadata, acquisition_id, level):
     project = metadata.get('project')
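Reviewer note: a minimal standalone sketch of the merge semantics merge_fileinfos introduces above. It redefines ParsedFile (the namedtuple added to api/files.py below) and the merge locally rather than importing api.files / api.dao.reaperutil, and the filenames and hash value are invented:

import collections
import copy

ParsedFile = collections.namedtuple('ParsedFile', ['info', 'path'])

def merge_fileinfos(parsed_files, infos):
    # Mirrors api.dao.reaperutil.merge_fileinfos: metadata infos override
    # the hard infos (hash, size) computed while receiving the upload.
    merged_files = {}
    for info in infos:
        parsed = parsed_files.get(info['name'])
        new_infos = copy.deepcopy(parsed.info) if parsed else {}
        new_infos.update(info)
        merged_files[info['name']] = ParsedFile(new_infos, parsed.path if parsed else None)
    return merged_files

# A file that was actually uploaded...
parsed = {'scan.nii.gz': ParsedFile({'hash': 'v0-sha384-abc', 'size': 1024}, '/tmp/scan.nii.gz')}
# ...and metadata that annotates it, plus one metadata-only entry.
metadata = [{'name': 'scan.nii.gz', 'type': 'nifti'}, {'name': 'notes.txt'}]

merged = merge_fileinfos(parsed, metadata)
assert merged['scan.nii.gz'].info == {'name': 'scan.nii.gz', 'hash': 'v0-sha384-abc', 'size': 1024, 'type': 'nifti'}
assert merged['notes.txt'].info == {'name': 'notes.txt'}
assert merged['notes.txt'].path is None  # metadata-only: no bytes on disk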
diff --git a/api/files.py b/api/files.py
index f0ae30f70..4e362721e 100644
--- a/api/files.py
+++ b/api/files.py
@@ -5,6 +5,7 @@
 import hashlib
 import zipfile
 import datetime
+import collections
 
 from . import util
 from . import config
@@ -33,6 +34,8 @@ def write(self, data):
     def get_hash(self):
         return self.hash_alg.hexdigest()
 
+ParsedFile = collections.namedtuple('ParsedFile', ['info', 'path'])
+
 def getHashingFieldStorage(upload_dir, hash_alg):
 
     class HashingFieldStorage(cgi.FieldStorage):
@@ -116,22 +119,22 @@ def move_file(self, target_path):
         move_file(self.path, target_path)
         self.path = target_path
 
-    def identical(self, filepath, hash_):
-        if zipfile.is_zipfile(filepath) and zipfile.is_zipfile(self.path):
-            with zipfile.ZipFile(filepath) as zf1, zipfile.ZipFile(self.path) as zf2:
-                zf1_infolist = sorted(zf1.infolist(), key=lambda zi: zi.filename)
-                zf2_infolist = sorted(zf2.infolist(), key=lambda zi: zi.filename)
-                if zf1.comment != zf2.comment:
-                    return False
-                if len(zf1_infolist) != len(zf2_infolist):
+def identical(hash_0, path_0, hash_1, path_1):
+    if zipfile.is_zipfile(path_0) and zipfile.is_zipfile(path_1):
+        with zipfile.ZipFile(path_0) as zf1, zipfile.ZipFile(path_1) as zf2:
+            zf1_infolist = sorted(zf1.infolist(), key=lambda zi: zi.filename)
+            zf2_infolist = sorted(zf2.infolist(), key=lambda zi: zi.filename)
+            if zf1.comment != zf2.comment:
+                return False
+            if len(zf1_infolist) != len(zf2_infolist):
+                return False
+            for zii, zij in zip(zf1_infolist, zf2_infolist):
+                if zii.CRC != zij.CRC:
                     return False
-                for zii, zij in zip(zf1_infolist, zf2_infolist):
-                    if zii.CRC != zij.CRC:
-                        return False
-                else:
-                    return True
-        else:
-            return hash_ == self.hash
+            else:
+                return True
+    else:
+        return hash_0 == hash_1
 
 class MultiFileStore(object):
     """This class provides an interface for file uploads.
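The identical() refactor above keeps the same logic while freeing the check from FileStore state: plain files compare by hash, but zip archives are walked member-by-member via CRC, because two zips holding the same bytes usually differ as whole files (member timestamps live in the zip headers). A self-contained illustration of that premise, with invented file names:

import hashlib
import zipfile

def write_zip(path, date_time):
    # Same member name and bytes; only the timestamp in the zip headers differs.
    with zipfile.ZipFile(path, 'w') as zf:
        zf.writestr(zipfile.ZipInfo('data.txt', date_time=date_time), b'payload')

write_zip('a.zip', (2016, 1, 1, 0, 0, 0))
write_zip('b.zip', (2016, 1, 2, 0, 0, 0))

# Whole-archive digests differ even though the payload is identical...
digests = [hashlib.sha384(open(p, 'rb').read()).hexdigest() for p in ('a.zip', 'b.zip')]
assert digests[0] != digests[1]

# ...while the per-member CRCs that identical() compares are equal.
with zipfile.ZipFile('a.zip') as zf1, zipfile.ZipFile('b.zip') as zf2:
    assert [zi.CRC for zi in zf1.infolist()] == [zi.CRC for zi in zf2.infolist()]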
@@ -153,9 +156,9 @@ def _save_multipart_files(self, dest_path, hash_alg):
         for field in form:
             if form[field].filename:
                 filename = os.path.basename(form[field].filename)
-                self.files[filename] = {
-                    'hash': util.format_hash(hash_alg, form[field].file.get_hash()),
-                    'size': os.path.getsize(os.path.join(dest_path, filename)),
-                    'path': os.path.join(dest_path, filename),
-                    'mimetype': util.guess_mimetype(filename)
-                }
+                self.files[filename] = ParsedFile(
+                    {
+                        'hash': util.format_hash(hash_alg, form[field].file.get_hash()),
+                        'size': os.path.getsize(os.path.join(dest_path, filename)),
+                        'mimetype': util.guess_mimetype(filename)
+                    }, os.path.join(dest_path, filename))
diff --git a/api/schemas/input/enginemetadata.json b/api/schemas/input/enginemetadata.json
index 39e193f56..5a5a63f4c 100644
--- a/api/schemas/input/enginemetadata.json
+++ b/api/schemas/input/enginemetadata.json
@@ -8,7 +8,11 @@
         "properties": {
             "public": {"type": ["boolean", "null"]},
             "label": {"type": ["string", "null"]},
-            "metadata": {"type": ["object", "null"]}
+            "metadata": {"type": ["object", "null"]},
+            "files": {
+                "type": ["array", "null"],
+                "items": {"$ref": "file.json"}
+            }
         },
         "additionalProperties": false
     },
@@ -22,7 +26,11 @@
             "uid": {"type": ["string", "null"]},
             "timestamp": {"type": ["string", "null"]},
             "timezone": {"type": ["string", "null"]},
-            "subject": {"$ref": "subject.json"}
+            "subject": {"$ref": "subject.json"},
+            "files": {
+                "type": ["array", "null"],
+                "items": {"$ref": "file.json"}
+            }
         },
         "additionalProperties": false
     },
diff --git a/api/schemas/input/reaper.json b/api/schemas/input/reaper.json
new file mode 100644
index 000000000..022135563
--- /dev/null
+++ b/api/schemas/input/reaper.json
@@ -0,0 +1,69 @@
+{
+    "$schema": "http://json-schema.org/draft-04/schema#",
+    "title": "Reaper",
+    "type": "object",
+    "properties": {
+        "group": {
+            "type": "object",
+            "properties": {
+                "_id": {"type": "string"}
+            },
+            "additionalProperties": false,
+            "required": ["_id"]
+        },
+        "project": {
+            "type": "object",
+            "properties": {
+                "public": {"type": ["boolean", "null"]},
+                "label": {"type": "string"},
+                "metadata": {"type": ["object", "null"]},
+                "files": {
+                    "type": ["array", "null"],
+                    "items": {"$ref": "file.json"}
+                }
+            },
+            "additionalProperties": false,
+            "required": ["label"]
+        },
+        "session": {
+            "type": "object",
+            "properties": {
+                "public": {"type": ["boolean", "null"]},
+                "label": {"type": ["string", "null"]},
+                "metadata": {"type": ["object", "null"]},
+                "operator": {"type": ["string", "null"]},
+                "uid": {"type": "string"},
+                "timestamp": {"type": ["string", "null"]},
+                "timezone": {"type": ["string", "null"]},
+                "subject": {"$ref": "subject.json"},
+                "files": {
+                    "type": ["array", "null"],
+                    "items": {"$ref": "file.json"}
+                }
+            },
+            "additionalProperties": false,
+            "required": ["uid"]
+        },
+        "acquisition": {
+            "type": "object",
+            "properties": {
+                "public": {"type": ["boolean", "null"]},
+                "label": {"type": ["string", "null"]},
+                "metadata": {"type": ["object", "null"]},
+                "uid": {"type": "string"},
+                "instrument": {"type": ["string", "null"]},
+                "measurement": {"type": ["string", "null"]},
+                "timestamp": {"type": ["string", "null"]},
+                "timezone": {"type": ["string", "null"]},
+                "files": {
+                    "type": ["array", "null"],
+                    "items": {"$ref": "file.json"}
+                }
+            },
+            "additionalProperties": false,
+            "required": ["uid"]
+        }
+    },
+    "required": ["acquisition", "group", "project", "session"],
+    "additionalProperties": false
+}
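The two new schemas differ mainly in what identifies a container: reaper.json (above) requires uid on session and acquisition, matching the uid-based upserts in create_container_hierarchy, while uploader.json (below) requires label on project, session, and acquisition, matching the label-based upserts in create_root_to_leaf_hierarchy. Minimal metadata payloads that satisfy each schema's required fields — every value here is a placeholder:

# Minimal /api/reaper metadata: session and acquisition require 'uid'.
reaper_metadata = {
    'group': {'_id': 'unknown'},
    'project': {'label': 'Untitled'},
    'session': {'uid': '1.2.840.113619.2.1'},
    'acquisition': {'uid': '1.2.840.113619.2.1.1'},
}

# Minimal /api/uploader metadata: every container requires 'label' instead.
uploader_metadata = {
    'group': {'_id': 'unknown'},
    'project': {'label': 'Untitled'},
    'session': {'label': 'sess-01'},
    'acquisition': {'label': 'acq-01'},
}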
diff --git a/api/schemas/input/uploader.json b/api/schemas/input/uploader.json
new file mode 100644
index 000000000..eb31e6b58
--- /dev/null
+++ b/api/schemas/input/uploader.json
@@ -0,0 +1,69 @@
+{
+    "$schema": "http://json-schema.org/draft-04/schema#",
+    "title": "Uploader",
+    "type": "object",
+    "properties": {
+        "group": {
+            "type": "object",
+            "properties": {
+                "_id": {"type": "string"}
+            },
+            "additionalProperties": false,
+            "required": ["_id"]
+        },
+        "project": {
+            "type": "object",
+            "properties": {
+                "public": {"type": ["boolean", "null"]},
+                "label": {"type": "string"},
+                "metadata": {"type": ["object", "null"]},
+                "files": {
+                    "type": ["array", "null"],
+                    "items": {"$ref": "file.json"}
+                }
+            },
+            "additionalProperties": false,
+            "required": ["label"]
+        },
+        "session": {
+            "type": "object",
+            "properties": {
+                "public": {"type": ["boolean", "null"]},
+                "label": {"type": "string"},
+                "metadata": {"type": ["object", "null"]},
+                "operator": {"type": ["string", "null"]},
+                "uid": {"type": ["string", "null"]},
+                "timestamp": {"type": ["string", "null"]},
+                "timezone": {"type": ["string", "null"]},
+                "subject": {"$ref": "subject.json"},
+                "files": {
+                    "type": ["array", "null"],
+                    "items": {"$ref": "file.json"}
+                }
+            },
+            "additionalProperties": false,
+            "required": ["label"]
+        },
+        "acquisition": {
+            "type": "object",
+            "properties": {
+                "public": {"type": ["boolean", "null"]},
+                "label": {"type": "string"},
+                "metadata": {"type": ["object", "null"]},
+                "uid": {"type": ["string", "null"]},
+                "instrument": {"type": ["string", "null"]},
+                "measurement": {"type": ["string", "null"]},
+                "timestamp": {"type": ["string", "null"]},
+                "timezone": {"type": ["string", "null"]},
+                "files": {
+                    "type": ["array", "null"],
+                    "items": {"$ref": "file.json"}
+                }
+            },
+            "additionalProperties": false,
+            "required": ["label"]
+        }
+    },
+    "required": ["acquisition", "group", "project", "session"],
+    "additionalProperties": false
+}
diff --git a/api/upload.py b/api/upload.py
index 4e72f2e22..3ec641dfb 100644
--- a/api/upload.py
+++ b/api/upload.py
@@ -1,10 +1,7 @@
 import bson
-import copy
 import os.path
 import datetime
 
-import validators
-
 from . import base
 from . import util
 from . import files
@@ -25,7 +22,7 @@ def reaper(self):
         with tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) as tempdir_path:
             try:
                 file_store = files.FileStore(self.request, tempdir_path)
-            except files.FileStoreException as e:
+            except FileStoreException as e:
                 self.abort(400, str(e))
             now = datetime.datetime.utcnow()
             fileinfo = dict(
@@ -38,20 +35,53 @@ def reaper(self):
                 tags=file_store.tags,
                 metadata=file_store.metadata
             )
-            container = reaperutil.create_container_hierarchy(file_store.metadata)
-            f = container.find(file_store.filename)
+
+            target, file_metadata = reaperutil.create_container_hierarchy(file_store.metadata)
+            fileinfo.update(file_metadata)
+            f = target.find(file_store.filename)
             target_path = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash(fileinfo['hash']))
             if not f:
                 file_store.move_file(target_path)
-                container.add_file(fileinfo)
-                rules.create_jobs(config.db, container.acquisition, 'acquisition', fileinfo)
-            elif not file_store.identical(util.path_from_hash(fileinfo['hash']), f['hash']):
+                target.add_file(fileinfo)
+                rules.create_jobs(config.db, target.container, target.level[:-1], fileinfo)
+            elif not files.identical(file_store.hash, file_store.path, f['hash'], util.path_from_hash(f['hash'])):
                 file_store.move_file(target_path)
-                container.update_file(fileinfo)
-                rules.create_jobs(config.db, container.acquisition, 'acquisition', fileinfo)
+                target.update_file(fileinfo)
+                rules.create_jobs(config.db, target.container, target.level[:-1], fileinfo)
             throughput = file_store.size / file_store.duration.total_seconds()
             log.info('Received %s [%s, %s/s] from %s' % (file_store.filename, util.hrsize(file_store.size), util.hrsize(throughput), self.request.client_addr))
 
+    def uploader(self):
+        """Receive a sortable reaper upload."""
+        if not self.superuser_request:
+            self.abort(402, 'uploads must be from an authorized drone')
+        with tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) as tempdir_path:
+            try:
+                file_store = files.MultiFileStore(self.request, tempdir_path)
+            except FileStoreException as e:
+                self.abort(400, str(e))
+            if not file_store.metadata:
+                self.abort(400, 'metadata is missing')
+            metadata_validator = validators.payload_from_schema_file(self, 'uploader.json')
+            metadata_validator(file_store.metadata, 'POST')
+            try:
+                target_containers = reaperutil.create_root_to_leaf_hierarchy(file_store.metadata, file_store.files)
+            except APIStorageException as e:
+                self.abort(400, str(e))
+            for target, file_dict in target_containers:
+                for filename, parsed_file in file_dict.items():
+                    fileinfo = parsed_file.info
+                    f = target.find(filename)
+                    target_path = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash(fileinfo['hash']))
+                    if not f:
+                        files.move_file(parsed_file.path, target_path)
+                        target.add_file(fileinfo)
+                        rules.create_jobs(config.db, target.container, target.level[:-1], fileinfo)
+                    elif not files.identical(fileinfo['hash'], parsed_file.path, f['hash'], util.path_from_hash(f['hash'])):
+                        files.move_file(parsed_file.path, target_path)
+                        target.update_file(fileinfo)
+                        rules.create_jobs(config.db, target.container, target.level[:-1], fileinfo)
+
     def engine(self):
         """
         URL format: api/engine?level=<container_type>&id=<container_id>
@@ -74,7 +104,7 @@ def engine(self):
         with tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) as tempdir_path:
             try:
                 file_store = files.MultiFileStore(self.request, tempdir_path)
-            except files.FileStoreException as e:
+            except FileStoreException as e:
                 self.abort(400, str(e))
             if not file_store.metadata:
                 self.abort(400, 'metadata is missing')
@@ -87,26 +117,26 @@ def engine(self):
             except APIStorageException as e:
                 self.abort(400, e.message)
             # move the files before updating the database
-            for name, fileinfo in file_store.files.items():
-                path = fileinfo['path']
+            for name, parsed_file in file_store.files.items():
+                fileinfo = parsed_file.info
                 target_path = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash(fileinfo['hash']))
-                files.move_file(path, target_path)
+                files.move_file(parsed_file.path, target_path)
             # merge infos from the actual file and from the metadata
-            merged_infos = self._merge_fileinfos(file_store.files, file_infos)
+            merged_files = reaperutil.merge_fileinfos(file_store.files, file_infos)
             # update the fileinfo in mongo if a file already exists
             for f in acquisition_obj['files']:
-                fileinfo = merged_infos.get(f['name'])
-                if fileinfo:
-                    fileinfo.pop('path', None)
+                merged_file = merged_files.get(f['name'])
+                if merged_file:
+                    fileinfo = merged_file.info
                     fileinfo['modified'] = now
                     acquisition_obj = reaperutil.update_fileinfo('acquisitions', acquisition_obj['_id'], fileinfo)
                     fileinfo['existing'] = True
             # create the missing fileinfo in mongo
-            for name, fileinfo in merged_infos.items():
+            for name, merged_file in merged_files.items():
+                fileinfo = merged_file.info
                 # if the file exists we don't need to create it
                 # skip update fileinfo for files that don't have a path
-                if not fileinfo.get('existing') and fileinfo.get('path'):
-                    del fileinfo['path']
+                if not fileinfo.get('existing') and merged_file.path:
                     fileinfo['mimetype'] = fileinfo.get('mimetype') or util.guess_mimetype(name)
                     fileinfo['created'] = now
                     fileinfo['modified'] = now
@@ -122,14 +152,4 @@ def engine(self):
                     'mimetype': f.get('mimetype')
                 }
                 rules.create_jobs(config.db, acquisition_obj, 'acquisition', file_)
-            return [{'name': k, 'hash': v['hash'], 'size': v['size']} for k, v in merged_infos.items()]
-
-    def _merge_fileinfos(self, hard_infos, infos):
-        """it takes a dictionary of "hard_infos" (file size, hash)
-        merging them with infos derived from a list of infos on the same or on other files
-        """
-        new_infos = copy.deepcopy(hard_infos)
-        for info in infos:
-            new_infos[info['name']] = new_infos.get(info['name'], {})
-            new_infos[info['name']].update(info)
-        return new_infos
\ No newline at end of file
+            return [{'name': k, 'hash': v.info.get('hash'), 'size': v.info.get('size')} for k, v in merged_files.items()]
diff --git a/api/validators.py b/api/validators.py
index d5df7bce6..d31b0e587 100644
--- a/api/validators.py
+++ b/api/validators.py
@@ -43,7 +43,9 @@
     'avatars.json',
     'download.json',
     'tag.json',
-    'enginemetadata.json'
+    'enginemetadata.json',
+    'uploader.json',
+    'reaper.json'
 ])
 mongo_schemas = set()
 input_schemas = set()
diff --git a/bin/bootstrap.py b/bin/bootstrap.py
index 6782d4c0a..8111f4814 100755
--- a/bin/bootstrap.py
+++ b/bin/bootstrap.py
@@ -90,7 +90,7 @@ def data(args):
             except ValueError as e:
                 log.warning(str(e))
                 continue
-            container = reaperutil.create_container_hierarchy(metadata)
+            target, file_ = reaperutil.create_container_hierarchy(metadata)
             with open(filepath, 'rb') as fd:
                 for chunk in iter(lambda: fd.read(2**20), ''):
                     hash_.update(chunk)
@@ -114,8 +114,9 @@ def data(args):
                 'modified': modified,
                 'mimetype': util.guess_mimetype(filename),
             }
-            container.add_file(fileinfo)
-            rules.create_jobs(config.db, container.acquisition, 'acquisition', fileinfo)
+            fileinfo.update(file_)
+            target.add_file(fileinfo)
+            rules.create_jobs(config.db, target.container, 'acquisition', fileinfo)
 
 data_desc = """
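For completeness, a sketch of a drone client driving the new /api/uploader route end to end, using the requests library and the minimal uploader_metadata shape from the schema sketch earlier. The multipart field name 'metadata' and the X-SciTran-Auth drone header are assumptions about this deployment, not shown in the diff; the URL and file name are placeholders:

import json
import requests

metadata = {'group': {'_id': 'unknown'}, 'project': {'label': 'Untitled'},
            'session': {'label': 'sess-01'}, 'acquisition': {'label': 'acq-01'}}

with open('scan.nii.gz', 'rb') as fd:
    r = requests.post(
        'https://example.local/api/uploader',
        files={
            'scan.nii.gz': fd,                         # one multipart part per uploaded file
            'metadata': (None, json.dumps(metadata)),  # assumed metadata form field, no filename
        },
        headers={'X-SciTran-Auth': 'drone-secret'},    # assumed drone credential
    )
r.raise_for_status()  # the handler aborts with 400 on schema violations, 402 without drone auth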