diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 7f758ac91..f355f02be 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -1,7 +1,8 @@
-2.4.8 (unreleased)
+2.5.0 (unreleased)
 ------------------
 
-- Nothing changed yet.
+- normalize file manager api so we can have simpler integrations with s3/gcloud
+  [vangheem]
 
 
 2.4.7 (2018-03-17)
diff --git a/VERSION b/VERSION
index 630cad5a9..ece4f82d9 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.4.8.dev0
+2.5.0.dev0
diff --git a/guillotina/api/service.py b/guillotina/api/service.py
index dcff6d9be..1c281084a 100644
--- a/guillotina/api/service.py
+++ b/guillotina/api/service.py
@@ -55,7 +55,10 @@ async def publish_traverse(self, traverse):
             if field is None:
                 raise KeyError('No valid name')
 
-            self.field = field.bind(self.behavior)
+            if self.behavior is not None:
+                self.field = field.bind(self.behavior)
+            else:
+                self.field = field.bind(self.context)
         else:
             self.field = None
 
diff --git a/guillotina/exceptions.py b/guillotina/exceptions.py
index f7a860169..095a43202 100644
--- a/guillotina/exceptions.py
+++ b/guillotina/exceptions.py
@@ -210,3 +210,8 @@ def __init__(self, field, value, msg):
 class QueryParsingError(Exception):
     """An error happened while parsing a search query.
     """
+
+
+class FileNotFoundException(Exception):
+    '''Raised when file data could not be found.
+    '''
diff --git a/guillotina/files/__init__.py b/guillotina/files/__init__.py
index c81544df1..0384f1628 100644
--- a/guillotina/files/__init__.py
+++ b/guillotina/files/__init__.py
@@ -1,11 +1,14 @@
-from .adapter import DBFileManagerAdapter  # noqa
+from .adapter import DBFileStorageManagerAdapter  # noqa
 from .const import CHUNK_SIZE  # noqa
 from .const import MAX_REQUEST_CACHE_SIZE  # noqa
 from .const import MAX_RETRIES  # noqa
 from .field import BaseCloudFile  # noqa
 from .field import CloudFileField  # noqa
-from .manager import CloudFileManager  # noqa
+from .manager import FileManager  # noqa
 from .utils import convert_base64_to_binary  # noqa
 from .utils import get_contenttype  # noqa
 from .utils import read_request_data  # noqa
 from guillotina.exceptions import UnRetryableRequestError  # noqa
+
+
+CloudFileManager = FileManager  # b/w compat
diff --git a/guillotina/files/adapter.py b/guillotina/files/adapter.py
index 2bb7d88a2..d958066d5 100644
--- a/guillotina/files/adapter.py
+++ b/guillotina/files/adapter.py
@@ -1,23 +1,89 @@
-from .const import CHUNK_SIZE
 from .dbfile import DBFile
-from aiohttp.web import StreamResponse
-from aiohttp.web_exceptions import HTTPNotFound
-from datetime import datetime
-from datetime import timedelta
-from dateutil.tz import tzutc
 from guillotina import configure
-from guillotina._settings import app_settings
-from guillotina.browser import Response
-from guillotina.files.utils import read_request_data
-from guillotina.interfaces import IAbsoluteURL
+from guillotina.blob import Blob
+from guillotina.files.utils import guess_content_type
 from guillotina.interfaces import IDBFileField
 from guillotina.interfaces import IFileCleanup
-from guillotina.interfaces import IFileManager
+from guillotina.interfaces import IFileStorageManager
 from guillotina.interfaces import IRequest
 from guillotina.interfaces import IResource
+from guillotina.interfaces import IUploadDataManager
 
-import base64
-import uuid
+
+@configure.adapter(
+    for_=IFileStorageManager,
+    provides=IUploadDataManager)
+class DBDataManager:
+
+    _file = None
+
+    def __init__(self, file_storage_manager):
+        self.file_storage_manager = file_storage_manager
+        self.context = file_storage_manager.context
+        self.request = file_storage_manager.request
+        self.field = file_storage_manager.field
+        self._data = {}
+
+    @property
+    def real_context(self):
+        return self.field.context or self.context
+
+    async def load(self):
+        if not hasattr(self.context, '__uploads__'):
+            self.context.__uploads__ = {}
+        if self.field.__name__ not in self.context.__uploads__:
+            self.context.__uploads__[self.field.__name__] = {}
+        self._data = self.context.__uploads__[self.field.__name__]
+
+    async def start(self):
+        if '_blob' in self._data:
+            # clean it
+            blob = self._data['_blob']
+            bfile = blob.open('r')
+            await bfile.async_del()
+        self._data.clear()
+        self.context._p_register()
+
+    async def update(self, **kwargs):
+        self._data.update(kwargs)
+        self.context._p_register()
+
+    async def finish(self, values=None):
+        # create file object with new data from finished upload
+        file = self.field.get(self.real_context)
+        if file is None:
+            file = self.file_storage_manager.file_class()
+
+        if values is None:
+            values = self._data
+        self.field.set(self.real_context, file)
+        for key, value in values.items():
+            setattr(file, key, value)
+
+        if self.field.__name__ in getattr(self.context, '__uploads__', {}):
+            del self.context.__uploads__[self.field.__name__]
+            self.context._p_register()
+
+        try:
+            self.field.context.data._p_register()
+        except AttributeError:
+            self.field.context._p_register()
+
+    @property
+    def content_type(self):
+        return guess_content_type(
+            self._data.get('content_type'),
+            self._data.get('filename'))
+
+    @property
+    def size(self):
+        return self._data.get('size', 0)
+
+    def get_offset(self):
+        return self._data.get('offset', 0)
+
+    def get(self, name, default=None):
+        return self._data.get(name, default)
 
 
 @configure.adapter(
@@ -34,8 +100,8 @@ def should_clean(self, **kwargs):
 
 @configure.adapter(
     for_=(IResource, IRequest, IDBFileField),
-    provides=IFileManager)
-class DBFileManagerAdapter:
+    provides=IFileStorageManager)
+class DBFileStorageManagerAdapter:
 
     file_class = DBFile
@@ -44,221 +110,36 @@ def __init__(self, context, request, field):
         self.request = request
         self.field = field
 
-    async def upload(self):
-        """In order to support TUS and IO upload.
-        """
-        try:
-            self.field.context.data._p_register()  # register change...
-        except AttributeError:
-            self.context._p_register()
-
-        file = self.field.get(self.field.context or self.context)
-        if not isinstance(file, self.file_class):
-            file = self.file_class(content_type=self.request.content_type)
-            self.field.set(self.field.context or self.context, file)
-        else:
-            self.content_type = self.request.content_type
-
-        if 'X-UPLOAD-MD5HASH' in self.request.headers:
-            file._md5 = self.request.headers['X-UPLOAD-MD5HASH']
-        else:
-            file._md5 = None
-
-        if 'X-UPLOAD-EXTENSION' in self.request.headers:
-            file._extension = self.request.headers['X-UPLOAD-EXTENSION']
-        else:
-            file._extension = None
-
-        if 'X-UPLOAD-SIZE' in self.request.headers:
-            file._size = int(self.request.headers['X-UPLOAD-SIZE'])
-        else:
-            if 'Content-Length' in self.request.headers:
-                file._size = int(self.request.headers['Content-Length'])
-            else:
-                raise AttributeError('x-upload-size or content-length header needed')
-
-        if 'X-UPLOAD-FILENAME' in self.request.headers:
-            file.filename = self.request.headers['X-UPLOAD-FILENAME']
-        elif 'X-UPLOAD-FILENAME-B64' in self.request.headers:
-            file.filename = base64.b64decode(
-                self.request.headers['X-UPLOAD-FILENAME-B64']).decode("utf-8")
-        else:
-            file.filename = uuid.uuid4().hex
-
-        await file.init_upload(self.context)
-        self.request._last_read_pos = 0
-        data = await read_request_data(self.request, CHUNK_SIZE)
-
-        while data:
-            await file.append_data(self.context, data)
-            data = await read_request_data(self.request, CHUNK_SIZE)
-
-        # Test resp and checksum to finish upload
-        await file.finish_upload(self.context)
-
-    async def tus_create(self):
-        try:
-            self.field.context.data._p_register()  # register change...
-        except AttributeError:
-            self.context._p_register()
-
-        # This only happens in tus-java-client, redirect this POST to a PATCH
-        if self.request.headers.get('X-HTTP-Method-Override') == 'PATCH':
-            return await self.tus_patch()
-
-        file = self.field.get(self.field.context or self.context)
-        if not isinstance(file, self.file_class):
-            file = self.file_class(content_type=self.request.content_type)
-            self.field.set(self.field.context or self.context, file)
-        if 'CONTENT-LENGTH' in self.request.headers:
-            file._current_upload = int(self.request.headers['CONTENT-LENGTH'])
-        else:
-            file._current_upload = 0
-        if 'UPLOAD-LENGTH' in self.request.headers:
-            file._size = int(self.request.headers['UPLOAD-LENGTH'])
-        else:
-            raise AttributeError('We need upload-length header')
-
-        if 'UPLOAD-MD5' in self.request.headers:
-            file._md5 = self.request.headers['UPLOAD-MD5']
-
-        if 'UPLOAD-EXTENSION' in self.request.headers:
-            file._extension = self.request.headers['UPLOAD-EXTENSION']
-
-        if 'TUS-RESUMABLE' not in self.request.headers:
-            raise AttributeError('TUS needs a TUS version')
-
-        if 'UPLOAD-METADATA' not in self.request.headers:
-            file.filename = uuid.uuid4().hex
-        else:
-            filename = self.request.headers['UPLOAD-METADATA']
-            file.filename = base64.b64decode(filename.split()[1]).decode('utf-8')
-
-        file._resumable_uri_date = datetime.now(tz=tzutc())
-
-        await file.init_upload(self.context)
-        # Location will need to be adapted on aiohttp 1.1.x
-        resp = Response(headers={
-            'Location': IAbsoluteURL(self.context, self.request)() + '/@tusupload/' + self.field.__name__,  # noqa
-            'Tus-Resumable': '1.0.0',
-            'Access-Control-Expose-Headers': 'Location,Tus-Resumable'
-        }, status=201)
-        return resp
-
-    async def tus_patch(self):
-        try:
-            self.field.context.data._p_register()  # register change...
-        except AttributeError:
-            self.context._p_register()
-
-        file = self.field.get(self.field.context or self.context)
-        if 'CONTENT-LENGTH' in self.request.headers:
-            to_upload = int(self.request.headers['CONTENT-LENGTH'])
-        else:
-            raise AttributeError('No content-length header')
-
-        try:
-            self.field.context.data._p_register()  # register change...
-        except AttributeError:
-            self.context._p_register()
-
-        if 'UPLOAD-OFFSET' in self.request.headers:
-            file._current_upload = int(self.request.headers['UPLOAD-OFFSET'])
-        else:
-            raise AttributeError('No upload-offset header')
-
-        self.request._last_read_pos = 0
-        data = await read_request_data(self.request, to_upload)
-
-        while data:
-            await file.append_data(self.context, data)
-            data = await read_request_data(self.request, CHUNK_SIZE)
-
-        await file.finish_upload(self.context)
-        expiration = file._resumable_uri_date + timedelta(days=7)
-
-        resp = Response(headers={
-            'Upload-Offset': str(file.get_actual_size()),
-            'Tus-Resumable': '1.0.0',
-            'Upload-Expires': expiration.isoformat(),
-            'Access-Control-Expose-Headers': 'Upload-Offset,Upload-Expires,Tus-Resumable'
-        })
-        return resp
-
-    async def tus_head(self):
-        file = self.field.get(self.field.context or self.context)
-        if not isinstance(file, self.file_class):
-            raise KeyError('No file on this context')
-        head_response = {
-            'Upload-Offset': str(file.get_actual_size()),
-            'Tus-Resumable': '1.0.0',
-            'Access-Control-Expose-Headers': 'Upload-Offset,Upload-Length,Tus-Resumable'
-        }
-        if file.size:
-            head_response['Upload-Length'] = str(file._size)
-        resp = Response(headers=head_response)
-        return resp
-
-    async def tus_options(self):
-        resp = Response(headers={
-            'Tus-Resumable': '1.0.0',
-            'Tus-Version': '1.0.0',
-            'Tus-Max-Size': '1073741824',
-            'Tus-Extension': 'creation,expiration'
-        })
-        return resp
-
-    async def download(self, disposition=None):
-        if disposition is None:
-            disposition = self.request.GET.get('disposition', 'attachment')
-        file = self.field.get(self.field.context or self.context)
-        if not isinstance(file, self.file_class) or not file.valid:
-            return HTTPNotFound(text='No file found')
-
-        cors_renderer = app_settings['cors_renderer'](self.request)
-        headers = await cors_renderer.get_headers()
-        headers.update({
-            'CONTENT-DISPOSITION': f'{disposition}; filename="%s"' % file.filename
-        })
-
-        download_resp = StreamResponse(headers=headers)
-        download_resp.content_type = file.guess_content_type()
-        if file.size:
-            download_resp.content_length = file.size
-
-        await download_resp.prepare(self.request)
-        resp = await file.download(self.context, download_resp)
-        return resp
+    async def start(self, dm):
+        blob = Blob(self.context)
+        await dm.update(_blob=blob)
 
     async def iter_data(self):
         file = self.field.get(self.field.context or self.context)
-        if not isinstance(file, self.file_class):
-            raise AttributeError('No field value')
-
-        async for chunk in file.iter_data(self.context):
+        blob = file._blob
+        bfile = blob.open()
+        async for chunk in bfile.iter_async_read():
             yield chunk
 
-    async def save_file(self, generator, content_type=None, size=None,
-                        filename=None):
-        try:
-            self.field.context.data._p_register()  # register change...
-        except AttributeError:
-            self.context._p_register()
-
-        file = self.field.get(self.field.context or self.context)
-        if not isinstance(file, self.file_class):
-            file = self.file_class(content_type=content_type)
-            self.field.set(self.field.context or self.context, file)
-
-        file._size = size
-        if filename is None:
-            filename = uuid.uuid4().hex
-        file.filename = filename
-
-        await file.init_upload(self.context)
-
-        async for data in generator():
-            await file.append_data(self.context, data)
+    async def append(self, dm, iterable, offset) -> int:
+        blob = dm.get('_blob')
+        mode = 'a'
+        if blob.chunks == 0:
+            mode = 'w'
+        bfile = blob.open(mode)
+        size = 0
+        async for chunk in iterable:
+            size += len(chunk)
+            await bfile.async_write_chunk(chunk)
+        return size
+
+    async def finish(self, dm):
+        pass
 
-        await file.finish_upload(self.context)
-        return file
+    async def copy(self, to_storage_manager, dm):
+        # too much storage manager logic here? only way to give file manager
+        # more control for plugins
+        await to_storage_manager.start(dm)
+        await to_storage_manager.append(dm, self.iter_data(), 0)
+        await to_storage_manager.finish(dm)
+        await dm.finish()
diff --git a/guillotina/files/dbfile.py b/guillotina/files/dbfile.py
index be13f9574..32cf894fa 100644
--- a/guillotina/files/dbfile.py
+++ b/guillotina/files/dbfile.py
@@ -1,13 +1,7 @@
 from .field import BaseCloudFile
-from guillotina.blob import Blob
-from guillotina.event import notify
-from guillotina.events import FileUploadFinishedEvent
 from guillotina.interfaces import IDBFile
-from guillotina.interfaces import IFileCleanup
 from zope.interface import implementer
 
-import uuid
-
 
 @implementer(IDBFile)
 class DBFile(BaseCloudFile):
@@ -19,63 +13,17 @@ class DBFile(BaseCloudFile):
     def valid(self):
         return self._blob is not None
 
-    async def init_upload(self, context):
-        context._p_register()
-
-        self._old_uri = self.uri
-        self._old_size = self.size
-        self._old_filename = self.filename
-        self._old_md5 = self.md5
-        self._old_content_type = self.guess_content_type()
-
-        self._current_upload = 0
-        if self._blob is not None:
-            cleanup = IFileCleanup(context, None)
-            if cleanup is None or cleanup.should_clean(file=self):
-                bfile = self._blob.open('r')
-                await bfile.async_del()
-            else:
-                self._previous_blob = self._blob
-        blob = Blob(context)
-        self._uri = uuid.uuid4().hex
-        self._blob = blob
-
-    async def append_data(self, context, data):
-        context._p_register()
-        mode = 'a'
-        if self._blob.chunks == 0:
-            mode = 'w'
-        bfile = self._blob.open(mode)
-        await bfile.async_write_chunk(data)
-        self._current_upload = self._blob.size
-
     def get_actual_size(self):
-        return self._blob.size
-
-    async def finish_upload(self, context):
-        await notify(FileUploadFinishedEvent(context))
-
-    async def download(self, context, resp):
-        bfile = self._blob.open()
-        async for chunk in bfile.iter_async_read():
-            resp.write(chunk)
-            await resp.drain()
-        return resp
-
-    async def iter_data(self, context):
-        bfile = self._blob.open()
-        async for chunk in bfile.iter_async_read():
-            yield chunk
+        if self._blob is not None:
+            return self._blob.size
+        return 0
 
-    async def copy_cloud_file(self, context, new_uri=None):
-        if self._blob is None:
-            return
-        existing_blob = self._blob
-        self._blob = None  # make sure to set None or init will delete it!
-        await self.init_upload(context)
+    @property
+    def size(self):
+        if self._blob is not None:
+            return self._blob.size
+        return 0
 
-        existing_bfile = existing_blob.open('r', context._p_jar)
-        bfile = self._blob.open('w', context._p_jar)
-        async for chunk in existing_bfile.iter_async_read():
-            await bfile.async_write_chunk(chunk)
-        self._current_upload = self._blob.size
+    @size.setter
+    def size(self, val):
+        pass
diff --git a/guillotina/files/field.py b/guillotina/files/field.py
index 669e18300..365e993b7 100644
--- a/guillotina/files/field.py
+++ b/guillotina/files/field.py
@@ -2,18 +2,17 @@
 from guillotina import configure
 from guillotina.component import get_multi_adapter
 from guillotina.files.utils import convert_base64_to_binary
+from guillotina.files.utils import guess_content_type
 from guillotina.interfaces import ICloudFileField
 from guillotina.interfaces import IContentBehavior
 from guillotina.interfaces import IFile
 from guillotina.interfaces import IFileManager
 from guillotina.schema import Object
 from guillotina.schema.fieldproperty import FieldProperty
-from guillotina.utils import get_content_path
 from guillotina.utils import get_current_request
 from guillotina.utils import to_str
 from zope.interface import implementer
 
-import mimetypes
 import uuid
 
 
@@ -74,40 +73,31 @@ def __init__(self, content_type='application/octet-stream',
         self._size = size
         self._md5 = md5
-        self._data = b''
+        self._current_upload = 0
 
     def guess_content_type(self):
-        ct = to_str(self.content_type)
-        if ct == 'application/octet-stream':
-            # try guessing content_type
-            ct, _ = mimetypes.guess_type(self.filename)
-            if ct is None:
-                ct = 'application/octet-stream'
-        return ct
-
-    def generate_key(self, request, context):
-        return '{}{}/{}::{}'.format(
-            request._container_id,
-            get_content_path(context),
-            context._p_oid,
-            uuid.uuid4().hex)
+        return guess_content_type(self.content_type, self.filename)
 
-    def get_actual_size(self):
+    @property
+    def current_upload(self):
         return self._current_upload
 
-    def _set_data(self, data):
-        self._data = data
+    @current_upload.setter
+    def current_upload(self, val):
+        self._current_upload = val
 
-    def _get_data(self):
-        return self._data
-
-    data = property(_get_data, _set_data)
+    def get_actual_size(self):
+        return self._current_upload
 
     @property
     def uri(self):
         if hasattr(self, '_uri'):
             return self._uri
 
+    @uri.setter
+    def uri(self, val):
+        self._uri = val
+
     @property
     def size(self):
         if hasattr(self, '_size'):
@@ -115,6 +105,10 @@ def size(self):
         else:
             return None
 
+    @size.setter
+    def size(self, val):
+        self._size = val
+
     @property
     def md5(self):
         if hasattr(self, '_md5'):
@@ -122,6 +116,10 @@ def md5(self):
         else:
             return None
 
+    @md5.setter
+    def md5(self, val):
+        self._md5 = val
+
     @property
     def extension(self):
         if getattr(self, '_extension', None):
@@ -131,26 +129,9 @@ def extension(self):
             return self.filename.split('.')[-1]
         return None
 
-    async def copy_cloud_file(self, context, new_uri):
-        raise NotImplemented()
-
-    async def rename_cloud_file(self, new_uri):
-        raise NotImplemented()
-
-    async def init_upload(self, context):
-        raise NotImplemented()
-
-    async def append_data(self, data):
-        raise NotImplemented()
-
-    async def finish_upload(self, context):
-        raise NotImplemented()
-
-    async def delete_upload(self, uri=None):
-        raise NotImplemented()
-
-    async def download(self, buf):
-        raise NotImplemented()
+    @extension.setter
+    def extension(self, val):
+        self._extension = val
 
 
 async def _generator(value):
@@ -168,6 +149,5 @@ async def deserialize_cloud_field(field, value, context):
     field = field.bind(context)
     file_manager = get_multi_adapter((context, request, field), IFileManager)
     val = await file_manager.save_file(
-        partial(_generator, value), content_type=value['content_type'],
-        size=len(value['data']))
+        partial(_generator, value), content_type=value['content_type'])
     return val
diff --git a/guillotina/files/manager.py b/guillotina/files/manager.py
index 785a6e42a..8a08114b2 100644
--- a/guillotina/files/manager.py
+++ b/guillotina/files/manager.py
@@ -1,46 +1,267 @@
+from .const import CHUNK_SIZE
+from aiohttp.web import StreamResponse
+from aiohttp.web_exceptions import HTTPConflict
+from aiohttp.web_exceptions import HTTPNotFound
+from aiohttp.web_exceptions import HTTPPreconditionFailed
 from guillotina import configure
 from guillotina._settings import app_settings
+from guillotina.browser import Response
+from guillotina.component import get_adapter
 from guillotina.component import get_multi_adapter
+from guillotina.files.utils import read_request_data
+from guillotina.interfaces import IAbsoluteURL
 from guillotina.interfaces import ICloudFileField
 from guillotina.interfaces import IFileManager
+from guillotina.interfaces import IFileStorageManager
 from guillotina.interfaces import IRequest
 from guillotina.interfaces import IResource
+from guillotina.interfaces import IUploadDataManager
 from guillotina.utils import import_class
 from zope.interface import alsoProvides
 
+import base64
+import uuid
+
 
 @configure.adapter(
     for_=(IResource, IRequest, ICloudFileField),
     provides=IFileManager)
-class CloudFileManager(object):
+class FileManager(object):
 
     def __init__(self, context, request, field):
+        self.context = context
+        self.request = request
+        self.field = field
+
         iface = import_class(app_settings['cloud_storage'])
         alsoProvides(field, iface)
-        self.real_file_manager = get_multi_adapter(
-            (context, request, field), IFileManager)
 
-    async def download(self, *args, **kwargs):
-        return await self.real_file_manager.download(*args, **kwargs)
+        self.file_storage_manager = get_multi_adapter(
+            (context, request, field), IFileStorageManager)
+        self.dm = get_adapter(
+            self.file_storage_manager, IUploadDataManager)
+
+    async def download(self, disposition=None):
+        if disposition is None:
+            disposition = self.request.GET.get('disposition', 'attachment')
+
+        file = self.field.get(self.field.context or self.context)
+        if file is None:
+            # no file set on the field yet; preserve the old 404 behavior
+            return HTTPNotFound(text='No file found')
+        cors_renderer = app_settings['cors_renderer'](self.request)
+        headers = await cors_renderer.get_headers()
+        headers.update({
+            'CONTENT-DISPOSITION': f'{disposition}; filename="%s"' % file.filename
+        })
+
+        download_resp = StreamResponse(headers=headers)
+        download_resp.content_type = file.guess_content_type()
+        if file.size:
+            download_resp.content_length = file.size
+
+        await download_resp.prepare(self.request)
+
+        async for chunk in self.file_storage_manager.iter_data():
+            download_resp.write(chunk)
+            await download_resp.drain()
+        return download_resp
 
     async def tus_options(self, *args, **kwargs):
-        return await self.real_file_manager.tus_options(*args, **kwargs)
+        resp = Response(headers={
+            'Tus-Resumable': '1.0.0',
+            'Tus-Version': '1.0.0',
+            'Tus-Extension': 'creation-defer-length'
+        })
+        return resp
 
     async def tus_head(self, *args, **kwargs):
-        return await self.real_file_manager.tus_head(*args, **kwargs)
+        await self.dm.load()
+        head_response = {
+            'Upload-Offset': str(self.dm.get_offset()),
+            'Tus-Resumable': '1.0.0',
+            'Access-Control-Expose-Headers': 'Upload-Offset,Tus-Resumable'
+        }
+        if self.dm.get('size'):
+            head_response['Upload-Length'] = str(self.dm.get('size'))
+        resp = Response(headers=head_response)
+        return resp
+
+    async def _iterate_request_data(self):
+        self.request._last_read_pos = 0
+        data = await read_request_data(self.request, CHUNK_SIZE)
+
+        while data:
+            yield data
+            data = await read_request_data(self.request, CHUNK_SIZE)
 
     async def tus_patch(self, *args, **kwargs):
-        return await self.real_file_manager.tus_patch(*args, **kwargs)
+        await self.dm.load()
+        to_upload = None
+        if 'CONTENT-LENGTH' in self.request.headers:
+            # header is optional, we'll be okay with unknown lengths...
+            to_upload = int(self.request.headers['CONTENT-LENGTH'])
+
+        if 'UPLOAD-LENGTH' in self.request.headers:
+            if self.dm.get('deferred_length'):
+                size = int(self.request.headers['UPLOAD-LENGTH'])
+                await self.dm.update(size=size)
+
+        if 'UPLOAD-OFFSET' in self.request.headers:
+            offset = int(self.request.headers['UPLOAD-OFFSET'])
+        else:
+            raise HTTPPreconditionFailed(reason='No upload-offset header')
+
+        if offset != self.dm.get('offset'):
+            raise HTTPConflict(reason='Current upload does not match offset')
+
+        read_bytes = await self.file_storage_manager.append(
+            self.dm, self._iterate_request_data(), offset)
+
+        if to_upload and read_bytes != to_upload:
+            # check length matches if provided
+            raise HTTPPreconditionFailed(
+                reason='Upload size does not match what was provided')
+        await self.dm.update(offset=offset + read_bytes)
+
+        if self.dm.get('size') and self.dm.get_offset() >= self.dm.get('size'):
+            await self.file_storage_manager.finish(self.dm)
+            await self.dm.finish()
+
+        resp = Response(headers={
+            'Upload-Offset': str(self.dm.get_offset()),
+            'Tus-Resumable': '1.0.0',
+            'Access-Control-Expose-Headers': 'Upload-Offset,Upload-Expires,Tus-Resumable'
+        })
+        return resp
 
     async def tus_create(self, *args, **kwargs):
-        return await self.real_file_manager.tus_create(*args, **kwargs)
+        await self.dm.load()
+        # This only happens in tus-java-client, redirect this POST to a PATCH
+        if self.request.headers.get('X-HTTP-Method-Override') == 'PATCH':
+            return await self.tus_patch()
+
+        md5 = extension = size = None
+
+        deferred_length = False
+        if self.request.headers.get('Upload-Defer-Length') == '1':
+            deferred_length = True
 
-    async def upload(self, *args, **kwargs):
-        return await self.real_file_manager.upload(*args, **kwargs)
+        if 'UPLOAD-LENGTH' in self.request.headers:
+            size = int(self.request.headers['UPLOAD-LENGTH'])
+        else:
+            if not deferred_length:
+                raise HTTPPreconditionFailed(reason='We need upload-length header')
+
+        if 'UPLOAD-MD5' in self.request.headers:
+            md5 = self.request.headers['UPLOAD-MD5']
+
+        if 'UPLOAD-EXTENSION' in self.request.headers:
+            extension = self.request.headers['UPLOAD-EXTENSION']
+
+        if 'TUS-RESUMABLE' not in self.request.headers:
+            raise HTTPPreconditionFailed(reason='TUS needs a TUS version')
+
+        if 'X-UPLOAD-FILENAME' in self.request.headers:
+            filename = self.request.headers['X-UPLOAD-FILENAME']
+        elif 'UPLOAD-FILENAME' in self.request.headers:
+            filename = self.request.headers['UPLOAD-FILENAME']
+        elif 'UPLOAD-METADATA' not in self.request.headers:
+            filename = uuid.uuid4().hex
+        else:
+            filename = self.request.headers['UPLOAD-METADATA']
+            filename = base64.b64decode(filename.split()[1]).decode('utf-8')
+        if extension is None and '.' in filename:
+            extension = filename.split('.')[-1]
+
+        await self.dm.start()
+        await self.dm.update(
+            content_type=self.request.content_type,
+            md5=md5,
+            filename=filename,
+            extension=extension,
+            size=size,
+            deferred_length=deferred_length,
+            offset=0)
+
+        await self.file_storage_manager.start(self.dm)
+
+        # Location will need to be adapted on aiohttp 1.1.x
+        resp = Response(headers={
+            'Location': IAbsoluteURL(
+                self.context, self.request)() + '/@tusupload/' + self.field.__name__,  # noqa
+            'Tus-Resumable': '1.0.0',
+            'Access-Control-Expose-Headers': 'Location,Tus-Resumable'
+        }, status=201)
+        return resp
+
+    async def upload(self):
+        await self.dm.load()
+        md5 = extension = size = None
+        if 'X-UPLOAD-MD5HASH' in self.request.headers:
+            md5 = self.request.headers['X-UPLOAD-MD5HASH']
+
+        if 'X-UPLOAD-EXTENSION' in self.request.headers:
+            extension = self.request.headers['X-UPLOAD-EXTENSION']
+
+        if 'X-UPLOAD-SIZE' in self.request.headers:
+            size = int(self.request.headers['X-UPLOAD-SIZE'])
+        else:
+            if 'Content-Length' in self.request.headers:
+                size = int(self.request.headers['Content-Length'])
+            else:
+                raise AttributeError('x-upload-size or content-length header needed')
+
+        if 'X-UPLOAD-FILENAME' in self.request.headers:
+            filename = self.request.headers['X-UPLOAD-FILENAME']
+        elif 'X-UPLOAD-FILENAME-B64' in self.request.headers:
+            filename = base64.b64decode(
+                self.request.headers['X-UPLOAD-FILENAME-B64']).decode("utf-8")
+        else:
+            filename = uuid.uuid4().hex
+
+        await self.dm.start()
+        await self.dm.update(
+            content_type=self.request.content_type,
+            md5=md5,
+            filename=filename,
+            extension=extension,
+            size=size)
+        await self.file_storage_manager.start(self.dm)
+
+        read_bytes = await self.file_storage_manager.append(
+            self.dm, self._iterate_request_data(), 0)
+
+        if read_bytes != size:
+            raise HTTPPreconditionFailed(
+                reason='Upload size does not match what was provided')
+
+        await self.file_storage_manager.finish(self.dm)
+        await self.dm.finish()
 
     async def iter_data(self, *args, **kwargs):
-        async for chunk in self.real_file_manager.iter_data(*args, **kwargs):
+        async for chunk in self.file_storage_manager.iter_data():
             yield chunk
 
-    async def save_file(self, generator, *args, **kwargs):
-        return await self.real_file_manager.save_file(generator, *args, **kwargs)
+    async def save_file(self, generator, content_type=None, filename=None,
+                        extension=None):
+        await self.dm.load()
+        await self.dm.start()
+        await self.dm.update(
+            content_type=content_type,
+            filename=filename or uuid.uuid4().hex,
+            extension=extension
+        )
+        await self.file_storage_manager.start(self.dm)
+
+        size = await self.file_storage_manager.append(self.dm, generator(), 0)
+        await self.dm.update(
+            size=size
+        )
+        await self.file_storage_manager.finish(self.dm)
+        await self.dm.finish()
+
+    async def copy(self, to_manager):
+        await to_manager.dm.load()
+        await self.file_storage_manager.copy(
+            to_manager.file_storage_manager, to_manager.dm)
diff --git a/guillotina/files/utils.py b/guillotina/files/utils.py
index 50094c512..4feab135a 100644
--- a/guillotina/files/utils.py
+++ b/guillotina/files/utils.py
@@ -1,10 +1,13 @@
 from .const import MAX_REQUEST_CACHE_SIZE
 from guillotina.exceptions import UnRetryableRequestError
+from guillotina.utils import get_content_path
+from guillotina.utils import to_str
 
 import asyncio
 import base64
 import mimetypes
 import os
+import uuid
 
 
 async def read_request_data(request, chunk_size):
@@ -19,7 +22,9 @@ async def read_request_data(request, chunk_size):
             # so retrying this request is not supported and we need to throw
             # another error
             raise UnRetryableRequestError()
-        data = request._cache_data[request._last_read_pos:request._last_read_pos + chunk_size]
+        start = request._last_read_pos
+        end = request._last_read_pos + chunk_size
+        data = request._cache_data[start:end]
         request._last_read_pos += len(data)
         if request._last_read_pos >= len(request._cache_data):
             # done reading cache data
@@ -72,3 +77,23 @@ def convert_base64_to_binary(b64data):
         'content_type': content_type,
         'data': data
     }
+
+
+def guess_content_type(content_type, filename):
+    ct = to_str(content_type)
+    if not ct or ct == 'application/octet-stream':
+        if not filename:
+            return 'application/octet-stream'
+        # try guessing content_type
+        ct, _ = mimetypes.guess_type(filename)
+        if ct is None:
+            ct = 'application/octet-stream'
+    return ct
+
+
+def generate_key(request, context):
+    return '{}{}/{}::{}'.format(
+        request._container_id,
+        get_content_path(context),
+        context._p_oid,
+        uuid.uuid4().hex)
diff --git a/guillotina/interfaces/__init__.py b/guillotina/interfaces/__init__.py
index aec218dcf..1a13ad90d 100644
--- a/guillotina/interfaces/__init__.py
+++ b/guillotina/interfaces/__init__.py
@@ -54,6 +54,8 @@
 from .files import IFileCleanup  # noqa
 from .files import IFileField  # noqa
 from .files import IFileManager  # noqa
+from .files import IFileStorageManager  # noqa
+from .files import IUploadDataManager  # noqa
 from .json import IFactorySerializeToJson  # noqa
 from .json import IJSONToValue  # noqa
 from .json import IResourceDeserializeFromJson  # noqa
diff --git a/guillotina/interfaces/files.py b/guillotina/interfaces/files.py
index 5d683070a..0b3b7317c 100644
--- a/guillotina/interfaces/files.py
+++ b/guillotina/interfaces/files.py
@@ -5,31 +5,119 @@
 from zope.interface import Interface
 
 
+class IUploadDataManager(Interface):
+    '''
+    Interface to manage upload data
+    '''
+
+    async def load():
+        '''
+        Load the current upload status
+        '''
+
+    async def update(**kwargs):
+        '''
+        update file upload data
+        '''
+
+    async def finish():
+        '''
+        finish upload
+        '''
+
+    async def save():
+        '''
+        save any current operations to db
+        '''
+
+    def get(name, default=None):
+        '''
+        get attribute
+        '''
+
+    def get_offset():
+        '''
+        get current upload offset
+        '''
+
+
+class IFileStorageManager(Interface):
+    '''
+    Manage storing file data
+    '''
+
+    async def start(dm):
+        '''
+        start upload
+        '''
+
+    async def iter_data():
+        '''
+        iterate through data in file
+        '''
+
+    async def append(dm, iterable, offset):
+        '''
+        append data to the file
+        '''
+
+    async def finish(dm):
+        '''
+        finish upload
+        '''
+
+    async def copy(to_storage_manager, dm):
+        '''
+        copy file to another file
+        '''
+
+
 class IFileManager(Interface):
     """Interface to create uploaders and downloaders."""
 
-    async def upload(self):
+    async def upload():
+        '''
+        Upload complete file in one shot
+        '''
+
+    async def download():
+        '''
+        Download file
+        '''
+
+    async def tus_post():
+        '''
+        Start tus upload process
+        '''
+
+    async def tus_patch():
         '''
+        Upload part of file
         '''
 
-    async def download(self):
+    async def tus_options():
         '''
+        Get tus supported version
         '''
 
-    async def tus_post(self):
+    async def tus_head():
         '''
+        Get current tus status
         '''
 
-    async def tus_patch(self):
+    async def iter_data():
         '''
+        Return an async iterator of the file
         '''
 
-    async def tus_options(self):
+    async def save_file(generator):
         '''
+        Save data to a file from an async generator
         '''
 
-    async def tus_head(self):
+    async def copy(other_manager):
         '''
+        Copy current file to new one
         '''
diff --git a/guillotina/tests/test_attachment.py b/guillotina/tests/test_attachment.py
index e859e227e..8dbe1a40a 100644
--- a/guillotina/tests/test_attachment.py
+++ b/guillotina/tests/test_attachment.py
@@ -1,8 +1,11 @@
 from guillotina.behaviors.attachment import IAttachment
+from guillotina.component import get_multi_adapter
+from guillotina.interfaces import IFileManager
 from guillotina.tests import utils
 from guillotina.transactions import managed_transaction
 
 import json
+import random
 
 
 async def test_create_content_with_behavior(container_requester):
@@ -140,6 +143,83 @@ async def test_tus(container_requester):
         assert behavior.file._blob.chunks == 10
 
 
+async def test_tus_unknown_size(container_requester):
+    async with container_requester as requester:
+        response, status = await requester(
+            'POST',
+            '/db/guillotina/',
+            data=json.dumps({
+                '@type': 'Item',
+                '@behaviors': ['guillotina.behaviors.attachment.IAttachment'],
+                'id': 'foobar'
+            })
+        )
+        assert status == 201
+
+        response, status = await requester(
+            'OPTIONS',
+            '/db/guillotina/foobar/@tusupload/file')
+        assert status == 200
+
+        response, status = await requester(
+            'POST',
+            '/db/guillotina/foobar/@tusupload/file',
+            headers={
+                'Upload-Defer-Length': '1',
+                'TUS-RESUMABLE': '1.0.0'
+            }
+        )
+        assert status == 201
+
+        response, status = await requester(
+            'HEAD',
+            '/db/guillotina/foobar/@tusupload/file')
+        assert status == 200
+
+        offset = 0
+        for idx in range(10):
+            # random sizes
+            size = 1024 * random.choice([1024, 1243, 5555, 7777])
+            response, status = await requester(
+                'PATCH',
+                '/db/guillotina/foobar/@tusupload/file',
+                headers={
+                    'TUS-RESUMABLE': '1.0.0',
+                    'upload-offset': str(offset)
+                },
+                data=b'X' * size
+            )
+            offset += size
+            assert status == 200
+
+        response, status = await requester(
+            'PATCH',
+            '/db/guillotina/foobar/@tusupload/file',
+            headers={
+                'TUS-RESUMABLE': '1.0.0',
+                'upload-offset': str(offset),
+                'UPLOAD-LENGTH': str(offset)  # finish it
+            },
+            data=b''
+        )
+
+        response, status = await requester(
+            'GET',
+            '/db/guillotina/foobar/@download/file'
+        )
+        assert status == 200
+        assert len(response) == offset
+
+        request = utils.get_mocked_request(requester.db)
+        root = await utils.get_root(request)
+        async with managed_transaction(request=request, abort_when_done=True):
+            container = await root.async_get('guillotina')
+            obj = await container.async_get('foobar')
+            behavior = IAttachment(obj)
+            await behavior.load()
+            assert behavior.file._blob.size == offset
+
+
 async def test_copy_file_ob(container_requester):
     async with container_requester as requester:
         response, status = await requester(
@@ -170,5 +250,11 @@ async def test_copy_file_ob(container_requester):
             attachment = IAttachment(obj)
             await attachment.load()
             existing_bid = attachment.file._blob.bid
-            await attachment.file.copy_cloud_file(obj)
+            cfm = get_multi_adapter(
+                (obj, request, IAttachment['file'].bind(attachment)), IFileManager
+            )
+            from_cfm = get_multi_adapter(
+                (obj, request, IAttachment['file'].bind(attachment)), IFileManager
+            )
+            await cfm.copy(from_cfm)
             assert existing_bid != attachment.file._blob.bid
diff --git a/guillotina/tests/test_serialize.py b/guillotina/tests/test_serialize.py
index f827c49cf..64fe75b5d 100644
--- a/guillotina/tests/test_serialize.py
+++ b/guillotina/tests/test_serialize.py
@@ -70,13 +70,29 @@ async def test_serialize_omit_main_interface_field(dummy_request):
     assert 'file' in result
 
 
-async def test_serialize_cloud_file(dummy_request):
-    from guillotina.test_package import FileContent
+async def test_serialize_cloud_file(dummy_request, dummy_guillotina):
+    request = dummy_request
+    request._txn = mocks.MockTransaction()
+    from guillotina.test_package import FileContent, IFileContent
+    from guillotina.interfaces import IFileManager
     obj = create_content(FileContent)
-    obj.file = DBFile(filename='foobar.json', size=25, md5='foobar')
+    obj.file = DBFile(filename='foobar.json', md5='foobar')
+
+    fm = get_multi_adapter(
+        (obj, request, IFileContent['file'].bind(obj)),
+        IFileManager)
+    await fm.dm.load()
+    await fm.file_storage_manager.start(fm.dm)
+
+    async def _data():
+        yield b'{"foo": "bar"}'
+
+    await fm.file_storage_manager.append(fm.dm, _data(), 0)
+    await fm.file_storage_manager.finish(fm.dm)
+    await fm.dm.finish()
     value = json_compatible(obj.file)
     assert value['filename'] == 'foobar.json'
-    assert value['size'] == 25
+    assert value['size'] == 14
     assert value['md5'] == 'foobar'
 
 
@@ -84,17 +100,18 @@ async def test_deserialize_cloud_file(dummy_request):
     from guillotina.test_package import IFileContent, FileContent
     request = dummy_request  # noqa
     tm = dummy_request._tm
-    await tm.begin(dummy_request)
+    txn = await tm.begin(dummy_request)
     obj = create_content(FileContent)
+    obj._p_jar = txn
     obj.file = None
-    value = await get_adapter(
-        IFileContent['file'], IJSONToValue,
+    await get_adapter(
+        IFileContent['file'].bind(obj), IJSONToValue,
        args=[
             'data:image/gif;base64,R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7',
             obj
         ])
-    assert isinstance(value, DBFile)
-    assert value.size == 42
+    assert isinstance(obj.file, DBFile)
+    assert obj.file.size == 42
 
 
 class ITestSchema(Interface):
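The split into IFileManager (HTTP-facing), IFileStorageManager (byte storage) and IUploadDataManager (upload bookkeeping) is what makes external backends such as s3/gcloud simpler to integrate: a plugin only has to implement the small IFileStorageManager contract. Below is a minimal sketch of such a backend under stated assumptions: IMemoryFileField, MemoryFileStorageManager and _STORE are illustrative names that are not part of this patch, and a module-level dict stands in for the external object store. The marker-interface hookup mirrors what FileManager.__init__ does with app_settings['cloud_storage'] above.

from guillotina import configure
from guillotina.interfaces import IFileStorageManager
from guillotina.interfaces import IRequest
from guillotina.interfaces import IResource
from zope.interface import Interface

import uuid


class IMemoryFileField(Interface):
    '''Marker interface; point app_settings['cloud_storage'] at this
    dotted name and FileManager.__init__ will alsoProvides() it onto
    the field, routing the multi-adapter lookup to this backend.'''


_STORE = {}  # stand-in for an external object store such as S3/GCloud


@configure.adapter(
    for_=(IResource, IRequest, IMemoryFileField),
    provides=IFileStorageManager)
class MemoryFileStorageManager:
    '''Illustrative backend; same contract as DBFileStorageManagerAdapter.'''

    def __init__(self, context, request, field):
        self.context = context
        self.request = request
        self.field = field

    async def start(self, dm):
        # allocate a key in the external store; the upload data manager
        # persists it the same way the DB adapter persists its _blob
        key = uuid.uuid4().hex
        _STORE[key] = []
        await dm.update(uri=key)

    async def append(self, dm, iterable, offset) -> int:
        size = 0
        async for chunk in iterable:
            size += len(chunk)
            _STORE[dm.get('uri')].append(chunk)
        return size

    async def finish(self, dm):
        pass  # nothing to flush for the in-memory store

    async def iter_data(self):
        file = self.field.get(self.field.context or self.context)
        for chunk in _STORE[file.uri]:
            yield chunk

    async def copy(self, to_storage_manager, dm):
        # same shape as the DB adapter's copy: read from self, write to target
        await to_storage_manager.start(dm)
        await to_storage_manager.append(dm, self.iter_data(), 0)
        await to_storage_manager.finish(dm)
        await dm.finish()

Because DBDataManager is registered for IFileStorageManager in general, upload bookkeeping (offset, filename, size) still lives on the content object even when the bytes themselves go to an external store.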
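For callers, the one-shot entry points stay on IFileManager. A sketch of storing and copying an attachment through the new API, mirroring the adapters and tests above (store_attachment and copy_attachment are illustrative helpers, not part of this patch):

from guillotina.behaviors.attachment import IAttachment
from guillotina.component import get_multi_adapter
from guillotina.interfaces import IFileManager


async def store_attachment(obj, request, payload: bytes):
    attachment = IAttachment(obj)
    await attachment.load()
    fm = get_multi_adapter(
        (obj, request, IAttachment['file'].bind(attachment)), IFileManager)

    async def generator():
        yield payload

    # the size is now computed from the written chunks, so save_file()
    # no longer takes a size argument
    await fm.save_file(generator, content_type='application/octet-stream',
                       filename='payload.bin')


async def copy_attachment(obj, request, to_obj):
    src_attachment = IAttachment(obj)
    await src_attachment.load()
    dst_attachment = IAttachment(to_obj)
    await dst_attachment.load()
    src = get_multi_adapter(
        (obj, request, IAttachment['file'].bind(src_attachment)),
        IFileManager)
    dst = get_multi_adapter(
        (to_obj, request, IAttachment['file'].bind(dst_attachment)),
        IFileManager)
    await src.copy(dst)  # data flows from src's storage into dst's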
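The new creation-defer-length support means a TUS client can open an upload without knowing the total size and close it later by sending Upload-Length once it is known. A sketch of that flow with aiohttp's client, following test_tus_unknown_size above (the URL is illustrative):

import aiohttp


async def tus_upload_unknown_size(
        chunks,
        url='http://localhost:8080/db/guillotina/foobar/@tusupload/file'):
    async with aiohttp.ClientSession() as session:
        # create the upload without declaring a total size
        await session.post(url, headers={
            'TUS-RESUMABLE': '1.0.0',
            'Upload-Defer-Length': '1'})

        offset = 0
        for chunk in chunks:
            # each PATCH must state the offset the server expects,
            # or tus_patch() answers with 409 Conflict
            await session.patch(url, data=chunk, headers={
                'TUS-RESUMABLE': '1.0.0',
                'Upload-Offset': str(offset)})
            offset += len(chunk)

        # an empty PATCH carrying Upload-Length closes the deferred upload:
        # tus_patch() records the size, sees offset >= size and finishes
        await session.patch(url, data=b'', headers={
            'TUS-RESUMABLE': '1.0.0',
            'Upload-Offset': str(offset),
            'Upload-Length': str(offset)})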