From 2035bdb2efd6384843985166072933094c7e176c Mon Sep 17 00:00:00 2001 From: Harris Tzovanakis Date: Sun, 6 Mar 2016 19:19:57 +0100 Subject: [PATCH] WIP: restful: file uploading in chunks addition Signed-off-by: Harris Tzovanakis --- invenio_files_rest/admin.py | 36 ++++++++- invenio_files_rest/config.py | 3 + invenio_files_rest/errors.py | 8 ++ invenio_files_rest/ext.py | 10 ++- invenio_files_rest/models.py | 130 ++++++++++++++++++++++++++++++++- invenio_files_rest/storage.py | 27 ++++++- invenio_files_rest/uploader.py | 59 +++++++++++++++ invenio_files_rest/views.py | 117 +++++++++++++++++++++++++---- setup.py | 2 + 9 files changed, 368 insertions(+), 24 deletions(-) create mode 100644 invenio_files_rest/uploader.py diff --git a/invenio_files_rest/admin.py b/invenio_files_rest/admin.py index 2f859d26..d4b7e1f5 100644 --- a/invenio_files_rest/admin.py +++ b/invenio_files_rest/admin.py @@ -32,7 +32,10 @@ from markupsafe import Markup from wtforms.validators import ValidationError -from .models import Bucket, FileInstance, Location, ObjectVersion, slug_pattern +from .models import ( + Bucket, FileInstance, Location, MultipartObject, ObjectVersion, + slug_pattern +) from .tasks import verify_checksum @@ -230,6 +233,33 @@ def action_verify_checksum(self, ids): 'error') # pragma: no cover +class MultipartObjectModelView(ModelView): + """ModelView for the objects.""" + + filter_converter = FilterConverter() + can_create = False + can_edit = False + can_delete = False + can_view_details = True + column_formatters = dict( + file_instance=link('File', lambda o: url_for( + 'fileinstance.index_view', flt0_0=o.file_id)), + ) + column_labels = dict( + id=_('ID'), + complete=_('Complete'), + file_instance=_('File'), + ) + column_list = ( + 'upload_id', 'complete', 'created', 'updated', 'file_instance', ) + column_details_list = ( + 'upload_id', 'complete', 'created', 'updated', 'file_instance', ) + column_filters = ( + 'upload_id', 'complete', 'created', 'updated', ) + column_default_sort = ('upload_id', True) + page_size = 25 + + location_adminview = dict( modelview=LocationModelView, model=Location, @@ -246,3 +276,7 @@ def action_verify_checksum(self, ids): modelview=FileInstanceModelView, model=FileInstance, category=_('Files')) +multipartobject_adminview = dict( + modelview=MultipartObjectModelView, + model=MultipartObject, + category=_('Files')) diff --git a/invenio_files_rest/config.py b/invenio_files_rest/config.py index 8198c33d..523bdd35 100644 --- a/invenio_files_rest/config.py +++ b/invenio_files_rest/config.py @@ -40,3 +40,6 @@ FILES_REST_STORAGE_FACTORY = None """Import path of factory used to create a storage instance.""" + +FILES_REST_UPLOAD_FACTORY = None +"""Import path of factory used to parse chunked upload parameters.""" diff --git a/invenio_files_rest/errors.py b/invenio_files_rest/errors.py index 1521cdc7..e06f4b0b 100644 --- a/invenio_files_rest/errors.py +++ b/invenio_files_rest/errors.py @@ -41,3 +41,11 @@ class FileInstanceAlreadySetError(FilesException): class InvalidOperationError(FilesException): """Exception raise when an invalid operation is performed.""" + + +class MultipartObjectException(FilesException): + """Exception for multipart objects.""" + + +class MultipartObjectAlreadyCompleted(MultipartObjectException): + """Exception raised when multipart object is already completed.""" diff --git a/invenio_files_rest/ext.py b/invenio_files_rest/ext.py index d0e3593e..34602c72 100644 --- a/invenio_files_rest/ext.py +++ b/invenio_files_rest/ext.py @@ -26,10 +26,11 @@ from __future__ import absolute_import, print_function -from werkzeug.utils import import_string +from werkzeug.utils import cached_property, import_string from . import config from .storage import storage_factory +from .uploader import plupload from .views import blueprint @@ -40,6 +41,7 @@ def __init__(self, app): """Initialize state.""" self.app = app self._storage_factory = None + self._upload_factory = None @property def storage_factory(self): @@ -50,6 +52,12 @@ def storage_factory(self): storage_factory return self._storage_factory + @cached_property + def upload_factory(self): + """Load default permission factory.""" + imp = self.app.config["FILES_REST_UPLOAD_FACTORY"] + return import_string(imp) if imp else plupload + class InvenioFilesREST(object): """Invenio-Files-REST extension.""" diff --git a/invenio_files_rest/models.py b/invenio_files_rest/models.py index 96cf1558..b695168d 100644 --- a/invenio_files_rest/models.py +++ b/invenio_files_rest/models.py @@ -58,7 +58,10 @@ from sqlalchemy.orm.exc import MultipleResultsFound from sqlalchemy_utils.types import UUIDType -from .errors import FileInstanceAlreadySetError, InvalidOperationError +from .errors import ( + FileInstanceAlreadySetError, InvalidOperationError, + MultipartObjectAlreadyCompleted +) slug_pattern = re.compile("^[a-z][a-z0-9-]+$") @@ -417,7 +420,13 @@ def verify_checksum(self, obj=None, progress_callback=None): self.last_check_at = datetime.utcnow() return self.last_check - def set_contents(self, obj, stream, size=None, chunk_size=None): + def update_contents(self, obj, stream, location=None, size=None, + chunk_size=None): + """Update contents of the file.""" + self.set_uri(*self.storage(obj, location=location).update( + stream, size=size, chunk_size=chunk_size), read_only=False) + + def set_contents(self, location, stream, size=None, chunk_size=None): """Save contents of stream to this file. :param obj: ObjectVersion instance from where this file is accessed @@ -426,7 +435,7 @@ def set_contents(self, obj, stream, size=None, chunk_size=None): """ if self.read_only: raise ValueError("FileInstance is read-only.") - self.set_uri(*self.storage(obj).save( + self.set_uri(*self.storage(location).save( stream, size=size, chunk_size=chunk_size), read_only=True) def send_file(self, obj): @@ -732,9 +741,124 @@ def get_by_bucket(cls, bucket, versions=False): return cls.query.filter(*args).order_by(cls.key, cls.created.desc()) +class MultipartObject(db.Model, Timestamp): + """Model for storing files in chunks. + + A multipart object handles the file upload in chunks. + + First it creates an ``upload_id`` and a file instance and updates the file + until the ``finalize()`` called. Then the multipart object will mark the + instance as completed and create a ObjectVersion with the uploaded file. + """ + + __tablename__ = 'files_multipart_object' + + bucket_id = db.Column( + UUIDType, + db.ForeignKey(Bucket.id, ondelete='RESTRICT'), + default=uuid.uuid4, + primary_key=True, ) + """Bucket identifier.""" + + key = db.Column( + db.String(255), + primary_key=True, ) + """Key identifying the object.""" + + upload_id = db.Column( + UUIDType, + unique=True, + default=uuid.uuid4, ) + """Identifier for the specific version of an object.""" + + file_id = db.Column( + UUIDType, + db.ForeignKey(FileInstance.id, ondelete='RESTRICT'), nullable=True) + """File instance for this object version. + + A null value in this column defines that the object has been deleted. + """ + + complete = db.Column(db.Boolean, nullable=False, default=True) + """Defines if object is the completed.""" + + # Relationships definitions + bucket = db.relationship(Bucket, backref='multipart_objects') + """Relationship to buckets.""" + + file = db.relationship(FileInstance, backref='multipart_objects') + """Relationship to file instance.""" + + def __repr__(self): + """Return representation of the multipart object.""" + return "{0}:{2}:{1}".format( + self.bucket_id, self.key, self.upload_id) + + def set_contents(self, stream, size=None, chunk_size=None): + """Save contents of stream to file instance.""" + if self.complete: + raise MultipartObjectAlreadyCompleted() + + self.file.update_contents( + self, stream, location=self.upload_id, + size=size, chunk_size=chunk_size + ) + + return self + + def finalize(self): + """Move the multipart object to a versioned object.""" + if self.complete: + raise MultipartObjectAlreadyCompleted() + # Update bucket size + self.bucket.size += self.file.size + # Make file read_only and proceed + self.file.read_only = True + # Create a Object version with the specific file + obj = ObjectVersion.create(self.bucket, self.key) + obj.set_file(self.file) + db.session.add(obj) + return obj + + @classmethod + def create(cls, bucket, key, **kwargs): + """Create a new object in a bucket.""" + bucket = bucket if isinstance(bucket, Bucket) else Bucket.get(bucket) + with db.session.begin_nested(): + obj = cls( + bucket=bucket, + key=key, + upload_id=uuid.uuid4(), + complete=False, + file=FileInstance(), + ) + # Create the file + db.session.add(obj) + return obj + + @classmethod + def get(cls, upload_id): + """Fetch a specific multipart object.""" + args = [ + cls.complete.is_(False), + cls.upload_id == upload_id, + ] + + return cls.query.filter(*args).one_or_none() + + def serialize(self): + """HELP.""" + return dict( + complete=str(self.complete), + file_id=str(self.file.id), + upload_id=str(self.upload_id), + bucket=str(self.bucket.id) + ) + __all__ = ( 'Bucket', 'FileInstance', 'Location', + 'MultipartObject', 'ObjectVersion', ) diff --git a/invenio_files_rest/storage.py b/invenio_files_rest/storage.py index 39f4ce65..eb652629 100644 --- a/invenio_files_rest/storage.py +++ b/invenio_files_rest/storage.py @@ -36,9 +36,9 @@ from .helpers import send_stream -def storage_factory(obj=None, fileinstance=None): +def storage_factory(obj=None, location=None, fileinstance=None): """Factory function for creating a storage instance.""" - return PyFilesystemStorage(obj, fileinstance) + return PyFilesystemStorage(obj, location, fileinstance) class Storage(object): @@ -47,18 +47,26 @@ class Storage(object): :param object: A dict with data used to initialize the backend. """ - def __init__(self, object, fileinstance): + def __init__(self, object, location, fileinstance): """Storage init.""" self.obj = object self.file = fileinstance + self.location = location def make_path(self): """Make path to file in a given storage location.""" + if self.location: + return str(self.location) + return join( str(self.obj.bucket_id), str(self.obj.version_id), ) + def update(self, incoming_stream, size=None, chunk_size=None): + """Update a file in the storage.""" + raise NotImplementedError + def save(self, incoming_stream, size=None, chunk_size=None): """Create a new file in the storage.""" raise NotImplementedError @@ -118,6 +126,19 @@ def _save_stream(self, src, dst, chunk_size=None): class PyFilesystemStorage(Storage): """File system storage using PyFilesystem.""" + def update(self, incoming_stream, size=None, chunk_size=None): + """Update file in the file system.""" + uri = self.obj.bucket.location.uri + path = self.make_path() + + with opener.opendir(uri) as fs: + dest_file = fs.makeopendir(path, recursive=True).open('data', 'ab') + bytes_written, checksum = self._save_stream( + incoming_stream, dest_file, chunk_size=chunk_size) + dest_file.close() + + return join(uri, path, 'data'), bytes_written, checksum + def save(self, incoming_stream, size=None, chunk_size=None): """Save file in the file system.""" uri = self.obj.bucket.location.uri diff --git a/invenio_files_rest/uploader.py b/invenio_files_rest/uploader.py new file mode 100644 index 00000000..6fe3d3ac --- /dev/null +++ b/invenio_files_rest/uploader.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Invenio. +# Copyright (C) 2016 CERN. +# +# Invenio is free software; you can redistribute it +# and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# Invenio is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Invenio; if not, write to the +# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307, USA. +# +# In applying this license, CERN does not +# waive the privileges and immunities granted to it by virtue of its status +# as an Intergovernmental Organization or submit itself to any jurisdiction. + +"""Uploader factory for REST API. + +The default factory allows you to use chunks with ``plupload`` javascript +plugin. + +The API expects three arguments from the uploader: + * ``total`` - the total number of chunks + * ``current`` - the current chunk + * ``upload_id`` the upload id +""" + + +def plupload(params): + """Plupload factory. + + :param dict params: the request object arguments. + + .. note:: + + Plupload sends chunk either as multipart/form-data (default) or as + binary system, depending on the value of multipart option. Also three + arguments sent with each chunk of data: + + * ``chunks`` - the total number of chunks in the file + * ``chunk`` - the ordinal number of the current chink in the set + * ``name`` - the name of the file + + See Plupload documentation for full details of chunk upload: + http://www.plupload.com/docs/Chunking + """ + return dict( + total=params.get('chunks'), + current=params.get('chunk'), + upload_id=params.get('upload_id') + ) diff --git a/invenio_files_rest/views.py b/invenio_files_rest/views.py index 89f15fcd..19f6db71 100644 --- a/invenio_files_rest/views.py +++ b/invenio_files_rest/views.py @@ -33,7 +33,7 @@ from webargs import fields from webargs.flaskparser import parser, use_kwargs -from .models import Bucket, Location, ObjectVersion +from .models import Bucket, Location, MultipartObject, ObjectVersion from .serializer import json_serializer blueprint = Blueprint( @@ -379,7 +379,7 @@ class ObjectResource(ContentNegotiatedMethodView): ) """GET query arguments.""" - put_args = dict( + post_args = dict( content_length=fields.Int( load_from='Content-Length', location='headers', @@ -388,7 +388,7 @@ class ObjectResource(ContentNegotiatedMethodView): load_from='Content-MD5', location='headers', ), ) - """PUT header arguments.""" + """POST header arguments.""" def __init__(self, serializers=None, *args, **kwargs): """Constructor.""" @@ -443,11 +443,11 @@ def get(self, bucket_id, key, version_id=None, **kwargs): abort(404, 'Object does not exist.') return obj.send_file() - @use_kwargs(put_args) - def put(self, bucket_id, key, content_length=None, content_md5=None): + @use_kwargs(post_args) + def post(self, bucket_id, key, content_length=None, content_md5=None): """Upload object file. - .. http:put:: /files/(uuid:bucket_id) + .. http:post:: /files/(uuid:bucket_id) Uploads a new object file. @@ -493,16 +493,66 @@ def put(self, bucket_id, key, content_length=None, content_md5=None): :statuscode 400: invalid request :statuscode 403: access denied :statuscode 500: failed request - """ - # TODO: Check key is a valid key. - uploaded_file = request.files['file'] - if not uploaded_file: - abort(400, 'File missing in request.') + .. http:post:: /files/(uuid:bucket_id)/?uploads=1 + + Create a new multipart object file. + + **Request**: + + .. sourcecode:: http + + POST /files/14b0b1d3-71f3-4b6b-87a2-0796f1624bb6?uploads=1... + Accept: */* + Accept-Encoding: gzip, deflate + Connection: keep-alive + Content-Length: 15340 + Content-Type: multipart/form-data; boundary=44dea52ee18245e7... + Host: localhost:5000 + + -----------------------------44dea52ee18245e7... + Content-Disposition: form-data; name="file"; filename="f.pdf" + Content-Type: application/pdf + + [binary] + + :reqheader Content-Type: multipart/form-data + :formparam file file: file object. + + **Responses**: + + .. sourcecode:: http + + HTTP/1.0 200 OK + Content-Length: 165 + Content-Type: application/json + + { + "upload_id": "322ea4c6-6650-4143-a328-274eee55f45d", + "uuid": "322ea4c6-6650-4143-a328-274eee55f45d", + "updated": "2015-12-10T14:16:57.202795" + } + + :resheader Content-Type: application/json + :statuscode 200: no error + :statuscode 400: invalid request + :statuscode 403: access denied + :statuscode 500: failed request + + .. note:: + + Proccess to upload a file in chunks: + + * Request an ``upload_id`` by requesting 322e.../f.pdf?uploads=1 + * Start uploading chunks 322e.../f.pdf?uploads=1&upload_id=... + * Finish uploading 322e.../f.pdf?uploads=1&upload_id=... + + """ # Retrieve bucket. bucket = Bucket.get(bucket_id) if bucket is None: abort(404, 'Bucket does not exist.') + # TODO: Check key is a valid key. # TODO: Check access permission on bucket. # TODO: Check quota on bucket using content length @@ -511,10 +561,45 @@ def put(self, bucket_id, key, content_length=None, content_md5=None): # TODO: Don't create a new file if content is identical. try: - # TODO: Pass storage class to get_or_create - obj = ObjectVersion.create(bucket, key) - obj.set_contents(uploaded_file, size=content_length) - db.session.commit() + if request.args.get('uploads'): + params = current_app.extensions[ + 'invenio-files-rest'].upload_factory( + request.args) + # Get the upload_id + upload_id = request.args.get('upload_id') + if upload_id: + # Get the upload object + obj = MultipartObject.get(upload_id) + # If it has chunks proccess them otherwise throw error + if params.get('current'): + # If current chunk less than total chunks + if params.get('current') < params.get('total'): + # Update the file + uploaded_file = request.files['file'] + if not uploaded_file: + abort(400, 'file missing in request.') + obj.update_contents( + uploaded_file, size=content_length + ) + else: + # Otherwise finilize it + obj.finilize() + # Commit the changes + db.session.commit() + # Abort not valid parameters + abort(400, 'Not valid chunk parameters.') + else: + # Requesting a new uplaod_id + obj = MultipartObject.create(bucket, key) + db.session.commit() + else: + uploaded_file = request.files['file'] + if not uploaded_file: + abort(400, 'file missing in request.') + # TODO: Pass storage class to get_or_create + obj = ObjectVersion.create(bucket, key) + obj.set_contents(uploaded_file, size=content_length) + db.session.commit() # TODO: Fix response object to only include headers? return {'json': obj.serialize()} except SQLAlchemyError: @@ -643,5 +728,5 @@ def head(self, bucket_id, filename, **kwargs): blueprint.add_url_rule( '//', view_func=object_view, - methods=['GET', 'PUT', 'DELETE', 'HEAD'] + methods=['GET', 'POST', 'DELETE', 'HEAD'] ) diff --git a/setup.py b/setup.py index 3c675bcb..f9b08e54 100644 --- a/setup.py +++ b/setup.py @@ -163,6 +163,8 @@ def run_tests(self): 'object_adminview = invenio_files_rest.admin:object_adminview', 'fileinstance_adminview ' '= invenio_files_rest.admin:fileinstance_adminview', + 'multipartobject_adminview ' + '= invenio_files_rest.admin:multipartobject_adminview', ], 'invenio_access.actions': [ 'bucket_create = invenio_files_rest.permissions:bucket_create',