Skip to content

Commit

Permalink
WIP: restful: file uploading in chunks addition
Browse files Browse the repository at this point in the history
Signed-off-by: Harris Tzovanakis <me@drjova.com>
  • Loading branch information
drjova committed Mar 9, 2016
1 parent 400f6b2 commit 2035bdb
Show file tree
Hide file tree
Showing 9 changed files with 368 additions and 24 deletions.
36 changes: 35 additions & 1 deletion invenio_files_rest/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
from markupsafe import Markup
from wtforms.validators import ValidationError

from .models import Bucket, FileInstance, Location, ObjectVersion, slug_pattern
from .models import (
Bucket, FileInstance, Location, MultipartObject, ObjectVersion,
slug_pattern
)
from .tasks import verify_checksum


Expand Down Expand Up @@ -230,6 +233,33 @@ def action_verify_checksum(self, ids):
'error') # pragma: no cover


class MultipartObjectModelView(ModelView):
"""ModelView for the objects."""

filter_converter = FilterConverter()
can_create = False
can_edit = False
can_delete = False
can_view_details = True
column_formatters = dict(
file_instance=link('File', lambda o: url_for(
'fileinstance.index_view', flt0_0=o.file_id)),
)
column_labels = dict(
id=_('ID'),
complete=_('Complete'),
file_instance=_('File'),
)
column_list = (
'upload_id', 'complete', 'created', 'updated', 'file_instance', )
column_details_list = (
'upload_id', 'complete', 'created', 'updated', 'file_instance', )
column_filters = (
'upload_id', 'complete', 'created', 'updated', )
column_default_sort = ('upload_id', True)
page_size = 25


location_adminview = dict(
modelview=LocationModelView,
model=Location,
Expand All @@ -246,3 +276,7 @@ def action_verify_checksum(self, ids):
modelview=FileInstanceModelView,
model=FileInstance,
category=_('Files'))
multipartobject_adminview = dict(
modelview=MultipartObjectModelView,
model=MultipartObject,
category=_('Files'))
3 changes: 3 additions & 0 deletions invenio_files_rest/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,6 @@

FILES_REST_STORAGE_FACTORY = None
"""Import path of factory used to create a storage instance."""

FILES_REST_UPLOAD_FACTORY = None
"""Import path of factory used to parse chunked upload parameters."""
8 changes: 8 additions & 0 deletions invenio_files_rest/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ class FileInstanceAlreadySetError(FilesException):

class InvalidOperationError(FilesException):
"""Exception raise when an invalid operation is performed."""


class MultipartObjectException(FilesException):
"""Exception for multipart objects."""


class MultipartObjectAlreadyCompleted(MultipartObjectException):
"""Exception raised when multipart object is already completed."""
10 changes: 9 additions & 1 deletion invenio_files_rest/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@

from __future__ import absolute_import, print_function

from werkzeug.utils import import_string
from werkzeug.utils import cached_property, import_string

from . import config
from .storage import storage_factory
from .uploader import plupload
from .views import blueprint


Expand All @@ -40,6 +41,7 @@ def __init__(self, app):
"""Initialize state."""
self.app = app
self._storage_factory = None
self._upload_factory = None

@property
def storage_factory(self):
Expand All @@ -50,6 +52,12 @@ def storage_factory(self):
storage_factory
return self._storage_factory

@cached_property
def upload_factory(self):
"""Load default permission factory."""
imp = self.app.config["FILES_REST_UPLOAD_FACTORY"]
return import_string(imp) if imp else plupload


class InvenioFilesREST(object):
"""Invenio-Files-REST extension."""
Expand Down
130 changes: 127 additions & 3 deletions invenio_files_rest/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@
from sqlalchemy.orm.exc import MultipleResultsFound
from sqlalchemy_utils.types import UUIDType

from .errors import FileInstanceAlreadySetError, InvalidOperationError
from .errors import (
FileInstanceAlreadySetError, InvalidOperationError,
MultipartObjectAlreadyCompleted
)

slug_pattern = re.compile("^[a-z][a-z0-9-]+$")

Expand Down Expand Up @@ -417,7 +420,13 @@ def verify_checksum(self, obj=None, progress_callback=None):
self.last_check_at = datetime.utcnow()
return self.last_check

def set_contents(self, obj, stream, size=None, chunk_size=None):
def update_contents(self, obj, stream, location=None, size=None,
chunk_size=None):
"""Update contents of the file."""
self.set_uri(*self.storage(obj, location=location).update(
stream, size=size, chunk_size=chunk_size), read_only=False)

def set_contents(self, location, stream, size=None, chunk_size=None):
"""Save contents of stream to this file.
:param obj: ObjectVersion instance from where this file is accessed
Expand All @@ -426,7 +435,7 @@ def set_contents(self, obj, stream, size=None, chunk_size=None):
"""
if self.read_only:
raise ValueError("FileInstance is read-only.")
self.set_uri(*self.storage(obj).save(
self.set_uri(*self.storage(location).save(
stream, size=size, chunk_size=chunk_size), read_only=True)

def send_file(self, obj):
Expand Down Expand Up @@ -732,9 +741,124 @@ def get_by_bucket(cls, bucket, versions=False):
return cls.query.filter(*args).order_by(cls.key, cls.created.desc())


class MultipartObject(db.Model, Timestamp):
"""Model for storing files in chunks.
A multipart object handles the file upload in chunks.
First it creates an ``upload_id`` and a file instance and updates the file
until the ``finalize()`` called. Then the multipart object will mark the
instance as completed and create a ObjectVersion with the uploaded file.
"""

__tablename__ = 'files_multipart_object'

bucket_id = db.Column(
UUIDType,
db.ForeignKey(Bucket.id, ondelete='RESTRICT'),
default=uuid.uuid4,
primary_key=True, )
"""Bucket identifier."""

key = db.Column(
db.String(255),
primary_key=True, )
"""Key identifying the object."""

upload_id = db.Column(
UUIDType,
unique=True,
default=uuid.uuid4, )
"""Identifier for the specific version of an object."""

file_id = db.Column(
UUIDType,
db.ForeignKey(FileInstance.id, ondelete='RESTRICT'), nullable=True)
"""File instance for this object version.
A null value in this column defines that the object has been deleted.
"""

complete = db.Column(db.Boolean, nullable=False, default=True)
"""Defines if object is the completed."""

# Relationships definitions
bucket = db.relationship(Bucket, backref='multipart_objects')
"""Relationship to buckets."""

file = db.relationship(FileInstance, backref='multipart_objects')
"""Relationship to file instance."""

def __repr__(self):
"""Return representation of the multipart object."""
return "{0}:{2}:{1}".format(
self.bucket_id, self.key, self.upload_id)

def set_contents(self, stream, size=None, chunk_size=None):
"""Save contents of stream to file instance."""
if self.complete:
raise MultipartObjectAlreadyCompleted()

self.file.update_contents(
self, stream, location=self.upload_id,
size=size, chunk_size=chunk_size
)

return self

def finalize(self):
"""Move the multipart object to a versioned object."""
if self.complete:
raise MultipartObjectAlreadyCompleted()
# Update bucket size
self.bucket.size += self.file.size
# Make file read_only and proceed
self.file.read_only = True
# Create a Object version with the specific file
obj = ObjectVersion.create(self.bucket, self.key)
obj.set_file(self.file)
db.session.add(obj)
return obj

@classmethod
def create(cls, bucket, key, **kwargs):
"""Create a new object in a bucket."""
bucket = bucket if isinstance(bucket, Bucket) else Bucket.get(bucket)
with db.session.begin_nested():
obj = cls(
bucket=bucket,
key=key,
upload_id=uuid.uuid4(),
complete=False,
file=FileInstance(),
)
# Create the file
db.session.add(obj)
return obj

@classmethod
def get(cls, upload_id):
"""Fetch a specific multipart object."""
args = [
cls.complete.is_(False),
cls.upload_id == upload_id,
]

return cls.query.filter(*args).one_or_none()

def serialize(self):
"""HELP."""
return dict(
complete=str(self.complete),
file_id=str(self.file.id),
upload_id=str(self.upload_id),
bucket=str(self.bucket.id)
)

__all__ = (
'Bucket',
'FileInstance',
'Location',
'MultipartObject',
'ObjectVersion',
)
27 changes: 24 additions & 3 deletions invenio_files_rest/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
from .helpers import send_stream


def storage_factory(obj=None, fileinstance=None):
def storage_factory(obj=None, location=None, fileinstance=None):
"""Factory function for creating a storage instance."""
return PyFilesystemStorage(obj, fileinstance)
return PyFilesystemStorage(obj, location, fileinstance)


class Storage(object):
Expand All @@ -47,18 +47,26 @@ class Storage(object):
:param object: A dict with data used to initialize the backend.
"""

def __init__(self, object, fileinstance):
def __init__(self, object, location, fileinstance):
"""Storage init."""
self.obj = object
self.file = fileinstance
self.location = location

def make_path(self):
"""Make path to file in a given storage location."""
if self.location:
return str(self.location)

return join(
str(self.obj.bucket_id),
str(self.obj.version_id),
)

def update(self, incoming_stream, size=None, chunk_size=None):
"""Update a file in the storage."""
raise NotImplementedError

def save(self, incoming_stream, size=None, chunk_size=None):
"""Create a new file in the storage."""
raise NotImplementedError
Expand Down Expand Up @@ -118,6 +126,19 @@ def _save_stream(self, src, dst, chunk_size=None):
class PyFilesystemStorage(Storage):
"""File system storage using PyFilesystem."""

def update(self, incoming_stream, size=None, chunk_size=None):
"""Update file in the file system."""
uri = self.obj.bucket.location.uri
path = self.make_path()

with opener.opendir(uri) as fs:
dest_file = fs.makeopendir(path, recursive=True).open('data', 'ab')
bytes_written, checksum = self._save_stream(
incoming_stream, dest_file, chunk_size=chunk_size)
dest_file.close()

return join(uri, path, 'data'), bytes_written, checksum

def save(self, incoming_stream, size=None, chunk_size=None):
"""Save file in the file system."""
uri = self.obj.bucket.location.uri
Expand Down
59 changes: 59 additions & 0 deletions invenio_files_rest/uploader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.

"""Uploader factory for REST API.
The default factory allows you to use chunks with ``plupload`` javascript
plugin.
The API expects three arguments from the uploader:
* ``total`` - the total number of chunks
* ``current`` - the current chunk
* ``upload_id`` the upload id
"""


def plupload(params):
"""Plupload factory.
:param dict params: the request object arguments.
.. note::
Plupload sends chunk either as multipart/form-data (default) or as
binary system, depending on the value of multipart option. Also three
arguments sent with each chunk of data:
* ``chunks`` - the total number of chunks in the file
* ``chunk`` - the ordinal number of the current chink in the set
* ``name`` - the name of the file
See Plupload documentation for full details of chunk upload:
http://www.plupload.com/docs/Chunking
"""
return dict(
total=params.get('chunks'),
current=params.get('chunk'),
upload_id=params.get('upload_id')
)

0 comments on commit 2035bdb

Please sign in to comment.