Skip to content

Commit

Permalink
WIP restful: file uploading in chunks addition
Browse files Browse the repository at this point in the history
Signed-off-by: Harris Tzovanakis <me@drjova.com>
  • Loading branch information
drjova committed Mar 15, 2016
1 parent ec9e53b commit de14115
Show file tree
Hide file tree
Showing 10 changed files with 440 additions and 12 deletions.
34 changes: 33 additions & 1 deletion invenio_files_rest/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
from markupsafe import Markup
from wtforms.validators import ValidationError

from .models import Bucket, FileInstance, Location, ObjectVersion, slug_pattern
from .models import Bucket, FileInstance, Location, MultipartObject, \
ObjectVersion, slug_pattern
from .tasks import verify_checksum


Expand Down Expand Up @@ -233,6 +234,33 @@ def action_verify_checksum(self, ids):
'error') # pragma: no cover


class MultipartObjectModelView(ModelView):
    """Read-only admin view listing multipart (chunked upload) objects."""

    filter_converter = FilterConverter()

    # Records are only inspected through the admin, never modified.
    can_create = False
    can_edit = False
    can_delete = False
    can_view_details = True

    column_formatters = {
        # Render the file column as a link to the file instance list,
        # filtered by this object's file id.
        'file_instance': link(
            'File',
            lambda o: url_for('fileinstance.index_view', flt0_0=o.file_id),
        ),
    }
    column_labels = {
        'id': _('ID'),
        'complete': _('Complete'),
        'file_instance': _('File'),
    }
    column_list = (
        'upload_id',
        'complete',
        'created',
        'updated',
        'file_instance',
    )
    # The details page shows the same columns as the list page.
    column_details_list = column_list
    column_filters = ('upload_id', 'complete', 'created', 'updated')
    column_default_sort = ('upload_id', True)
    page_size = 25


location_adminview = dict(
modelview=LocationModelView,
model=Location,
Expand All @@ -249,3 +277,7 @@ def action_verify_checksum(self, ids):
modelview=FileInstanceModelView,
model=FileInstance,
category=_('Files'))
# Admin view registration for multipart objects.
multipartobject_adminview = {
    'modelview': MultipartObjectModelView,
    'model': MultipartObject,
    'category': _('Files'),
}
3 changes: 3 additions & 0 deletions invenio_files_rest/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,6 @@

FILES_REST_DEFAULT_PERMISSION_FACTORY = "invenio_files_rest.permissions" \
":permission_factory"

# When left as ``None`` the extension state falls back to the bundled
# ``ng_file_upload`` parser instead of importing a custom factory.
FILES_REST_UPLOAD_FACTORY = None
"""Import path of factory used to parse chunked upload parameters."""
8 changes: 8 additions & 0 deletions invenio_files_rest/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ class FileInstanceAlreadySetError(FilesException):

class InvalidOperationError(FilesException):
"""Exception raise when an invalid operation is performed."""


class MultipartObjectException(FilesException):
    """Base exception for errors related to multipart objects."""


class MultipartObjectAlreadyCompleted(MultipartObjectException):
    """Exception raised when a multipart object is already completed.

    Raised by ``MultipartObject.set_contents()`` and
    ``MultipartObject.finalize()`` when the object's ``complete`` flag is set.
    """
7 changes: 7 additions & 0 deletions invenio_files_rest/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from . import config
from .permissions import permission_factory
from .storage import pyfs_storage_factory
from .uploader import ng_file_upload
from .views import blueprint


Expand All @@ -53,6 +54,12 @@ def permission_factory(self):
imp = self.app.config.get("FILES_REST_DEFAULT_PERMISSION_FACTORY")
return import_string(imp) if imp else permission_factory

@cached_property
def upload_factory(self):
    """Load the factory used to parse chunked upload parameters.

    The import path is read from the ``FILES_REST_UPLOAD_FACTORY``
    configuration variable. When the variable is missing or empty, the
    bundled ``ng_file_upload`` parser is returned instead.
    """
    # Use ``.get`` (like ``permission_factory``) so that a missing key falls
    # back to the default instead of raising ``KeyError``.
    imp = self.app.config.get("FILES_REST_UPLOAD_FACTORY")
    return import_string(imp) if imp else ng_file_upload


class InvenioFilesREST(object):
"""Invenio-Files-REST extension."""
Expand Down
44 changes: 43 additions & 1 deletion invenio_files_rest/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
import mimetypes
from time import time

from flask import current_app, request
from flask import abort, current_app, request
from invenio_db import db
from werkzeug.datastructures import Headers
from werkzeug.wsgi import FileWrapper

from .models import MultipartObject


def send_stream(stream, filename, size, mtime, mimetype=None, restricted=False,
as_attachment=False, etag=None, content_md5=None,
Expand Down Expand Up @@ -79,3 +82,42 @@ def send_stream(stream, filename, size, mtime, mimetype=None, restricted=False,
rv = rv.make_conditional(request)

return rv


def proccess_chunked_upload(bucket_id, key, content_length=None):
    """Process one request of a chunked (multipart) upload.

    Two kinds of requests are handled:

    * A request carrying a non-empty ``uploads`` query argument starts a new
      upload: a ``MultipartObject`` is created and committed.
    * A request carrying an ``upload_id`` form field uploads one chunk: the
      chunk parameters are parsed by the configured upload factory and the
      file found in the ``file`` form field is written to the object.

    Any other request is rejected with a ``400`` error.

    :param bucket_id: The bucket identifier.
    :param key: The filename.
    :param content_length: Size of the uploaded chunk, if known.
    :returns: The created or updated ``MultipartObject``.
    """
    if request.args.get('uploads', ''):
        # Requesting a new upload_id.
        obj = MultipartObject.create(bucket_id, key)
        db.session.commit()
        return obj

    upload_id = request.form.get('upload_id')
    if upload_id:
        params = current_app.extensions[
            'invenio-files-rest'].upload_factory(request.form)
        # Only uploads that are not yet completed are returned here.
        obj = MultipartObject.get(upload_id)
        current = params.get('current')
        # Without chunk information the request falls through to the 400.
        if current:
            # ``.get`` keeps the explicit check below reachable; indexing
            # with ``['file']`` would raise before it could run.
            uploaded_file = request.files.get('file')
            if not uploaded_file:
                abort(400, 'file missing in request.')
            if obj is None:
                # Unknown or already completed upload: answer with a client
                # error instead of failing on ``None`` below.
                abort(404, 'Multipart upload not found or already completed.')
            # NOTE(review): the meaning of ``current``/``total``/``size`` is
            # defined by the upload factory, which is not visible here; the
            # comparisons are kept exactly as they were - confirm.
            if current <= params.get('total'):
                obj.set_contents(uploaded_file, size=content_length)
                # A chunk below the regular chunk size is taken to be the
                # last one, so the upload is finalized.
                if current < params.get('size'):
                    obj.finalize()
                db.session.commit()
                return obj
    abort(400, 'Not valid chunk parameters.')
153 changes: 152 additions & 1 deletion invenio_files_rest/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@
from sqlalchemy.orm.exc import MultipleResultsFound
from sqlalchemy_utils.types import UUIDType

from .errors import FileInstanceAlreadySetError, InvalidOperationError
from .errors import FileInstanceAlreadySetError, InvalidOperationError, \
MultipartObjectAlreadyCompleted

slug_pattern = re.compile("^[a-z][a-z0-9-]+$")

Expand Down Expand Up @@ -454,6 +455,22 @@ def verify_checksum(self, progress_callback=None, **kwargs):
self.last_check_at = datetime.utcnow()
return self.last_check

def update_contents(self, stream, chunk_size=None,
                    progress_callback=None, **kwargs):
    """Update this file with the contents of a stream.

    The stream is handed to the ``update()`` method of the storage
    obtained from ``self.storage(**kwargs)``; whatever that call returns is
    forwarded positionally to ``set_uri()``.

    :param stream: File-like stream.
    :param chunk_size: Chunk size passed on to the storage.
    :param progress_callback: Callback passed on to the storage.
    :raises ValueError: If the file instance is not writable.
    """
    if not self.writable:
        raise ValueError("File instance is not writable.")

    backend = self.storage(**kwargs)
    outcome = backend.update(
        stream,
        chunk_size=chunk_size,
        progress_callback=progress_callback,
    )
    # The file stays writable and unreadable after the update.
    self.set_uri(*outcome, readable=False, writable=True)

def set_contents(self, stream, chunk_size=None,
progress_callback=None, **kwargs):
"""Save contents of stream to this file.
Expand Down Expand Up @@ -797,10 +814,144 @@ def relink_all(cls, old_file, new_file):
ObjectVersion.query.filter_by(file_id=str(old_file.id)).update({
ObjectVersion.file_id: str(new_file.id)})

def serialize(self):
    """Return a minimal dictionary representation of the object version.

    All values are converted to strings.
    """
    return {
        'file_id': str(self.file.id),
        'version_id': str(self.version_id),
        'bucket': str(self.bucket.id),
    }


class MultipartObject(db.Model, Timestamp):
    """Model for storing files in chunks.

    A multipart object handles the file upload in chunks.

    First it creates an ``upload_id`` and a file instance and updates the file
    until ``finalize()`` is called. Then the multipart object is marked as
    completed and an ObjectVersion is created with the uploaded file.
    """

    __tablename__ = 'files_multipart_object'

    bucket_id = db.Column(
        UUIDType,
        db.ForeignKey(Bucket.id, ondelete='RESTRICT'),
        default=uuid.uuid4,
        primary_key=True, )
    """Bucket identifier."""

    key = db.Column(
        db.String(255),
        primary_key=True, )
    """Key identifying the object."""

    upload_id = db.Column(
        UUIDType,
        default=uuid.uuid4, )
    """Identifier of the chunked upload."""

    file_id = db.Column(
        UUIDType,
        db.ForeignKey(FileInstance.id, ondelete='RESTRICT'), nullable=True)
    """File instance the uploaded chunks are written to."""

    # NOTE(review): the column default is ``True`` while ``create()`` always
    # passes ``complete=False`` explicitly - confirm the default is intended.
    complete = db.Column(db.Boolean, nullable=False, default=True)
    """Defines if the upload is completed."""

    # Relationships definitions
    bucket = db.relationship(Bucket, backref='multipart_objects')
    """Relationship to buckets."""

    file = db.relationship(FileInstance, backref='multipart_objects')
    """Relationship to file instance."""

    def __repr__(self):
        """Return representation of the multipart object."""
        return "{0}:{2}:{1}".format(
            self.bucket_id, self.key, self.upload_id)

    def set_contents(self, stream, size=None, chunk_size=None,
                     progress_callback=None):
        """Write the contents of a stream to the file instance.

        :param stream: File-like stream.
        :param size: Size of stream if known.
        :param chunk_size: Desired chunk size to read stream in. It is up to
            the storage interface if it respects this value.
        :param progress_callback: Callback passed on to the file instance.
        :returns: The multipart object itself.
        :raises MultipartObjectAlreadyCompleted: If the upload has already
            been completed.
        """
        if self.complete:
            raise MultipartObjectAlreadyCompleted()

        self.file.update_contents(
            stream, size=size, chunk_size=chunk_size,
            progress_callback=progress_callback, objectversion=self)

        return self

    def finalize(self):
        """Move the multipart object to a versioned object.

        The file instance becomes read-only, the bucket size is increased by
        the file size, an ``ObjectVersion`` pointing to the file is created
        and the multipart object is marked as completed.

        :returns: The created ``ObjectVersion``.
        :raises MultipartObjectAlreadyCompleted: If the upload has already
            been completed.
        """
        if self.complete:
            raise MultipartObjectAlreadyCompleted()

        self.file.writable = False
        self.file.readable = True
        # Update bucket size
        self.bucket.size += self.file.size
        # Create an object version with the specific file
        obj = ObjectVersion.create(
            self.bucket, self.key
        )
        obj.set_file(self.file)
        db.session.add(obj)
        # Mark the upload as completed so that it can neither receive more
        # chunks nor be finalized (and counted in the bucket size) again.
        self.complete = True
        return obj

    @classmethod
    def create(cls, bucket, key, **kwargs):
        """Create a new, not yet completed, multipart object in a bucket.

        :param bucket: A ``Bucket`` instance or a bucket identifier.
        :param key: Key identifying the object.
        :returns: The created multipart object, with a fresh ``upload_id``
            and a new file instance.
        """
        bucket = bucket if isinstance(bucket, Bucket) else Bucket.get(bucket)
        with db.session.begin_nested():
            obj = cls(
                bucket=bucket,
                key=key,
                upload_id=uuid.uuid4(),
                complete=False,
                file=FileInstance(),
            )
            db.session.add(obj)
        return obj

    @classmethod
    def get(cls, upload_id):
        """Fetch a not yet completed multipart object by its upload id.

        :param upload_id: Identifier of the chunked upload.
        :returns: The multipart object or ``None`` if there is no
            uncompleted upload with this identifier.
        """
        args = [
            cls.complete.is_(False),
            cls.upload_id == upload_id,
        ]

        return cls.query.filter(*args).one_or_none()

    def serialize(self):
        """Dummy serialization; all values are converted to strings."""
        return dict(
            complete=str(self.complete),
            file_id=str(self.file.id),
            upload_id=str(self.upload_id),
            bucket=str(self.bucket.id)
        )

__all__ = (
'Bucket',
'FileInstance',
'Location',
'MultipartObject',
'ObjectVersion',
)
20 changes: 20 additions & 0 deletions invenio_files_rest/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ def send_file(self, **kwargs):
"""Send the file to the client."""
raise NotImplementedError

def update(self, incoming_stream, chunk_size=None, progress_callback=None):
    """Update an existing file in the storage.

    This base implementation always raises ``NotImplementedError``.
    """
    raise NotImplementedError

def save(self, incoming_stream, chunk_size=None, progress_callback=None):
"""Create a new file in the storage."""
raise NotImplementedError
Expand Down Expand Up @@ -168,6 +172,22 @@ def open(self):
"""
return opener.open(self.file.uri, mode='rb')

def update(self, incoming_stream, chunk_size=None, progress_callback=None):
    """Append the incoming stream to the file in the file system.

    The target directory is created when missing and the file is opened in
    binary append mode.

    :returns: Tuple ``(uri, bytes_written, checksum)``.
    """
    directory = opener.opendir(self.make_path(), create_dir=True)
    target = directory.open(self.filename, 'ab')
    try:
        # NOTE(review): size and checksum come from this single write call,
        # so they presumably describe only the appended data - confirm.
        written, digest = self._write_stream(
            incoming_stream,
            target,
            chunk_size=chunk_size,
            progress_callback=progress_callback,
        )
    finally:
        target.close()

    # Prefer the path URL and fall back to the system path.
    location = directory.getpathurl(self.filename, allow_none=True)
    if not location:
        location = directory.getsyspath(self.filename, allow_none=True)

    return location, written, digest

def save(self, incoming_stream, chunk_size=None, progress_callback=None):
"""Save file in the file system."""
fs = opener.opendir(self.file.uri or self.make_path(), create_dir=True)
Expand Down
Loading

0 comments on commit de14115

Please sign in to comment.