Skip to content

Commit

Permalink
WIP restful: file uploading in chunks addition
Browse files Browse the repository at this point in the history
Signed-off-by: Harris Tzovanakis <me@drjova.com>
  • Loading branch information
drjova committed Apr 6, 2016
1 parent eae5edc commit 8401658
Show file tree
Hide file tree
Showing 10 changed files with 458 additions and 23 deletions.
34 changes: 33 additions & 1 deletion invenio_files_rest/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
from markupsafe import Markup
from wtforms.validators import ValidationError

from .models import Bucket, FileInstance, Location, ObjectVersion, slug_pattern
from .models import Bucket, FileInstance, Location, MultipartObject, \
ObjectVersion, slug_pattern
from .tasks import verify_checksum


Expand Down Expand Up @@ -233,6 +234,33 @@ def action_verify_checksum(self, ids):
'error') # pragma: no cover


class MultipartObjectModelView(ModelView):
    """Read-only admin view for ``MultipartObject`` (chunked uploads)."""

    filter_converter = FilterConverter()
    # Uploads are created and driven by the REST API, so the admin UI is
    # read-only (details view only).
    can_create = False
    can_edit = False
    can_delete = False
    can_view_details = True
    column_formatters = dict(
        # Render the file instance column as a link to the file admin view.
        file_instance=link('File', lambda o: url_for(
            'fileinstance.index_view', flt0_0=o.file_id)),
    )
    column_labels = dict(
        id=_('ID'),
        complete=_('Complete'),
        file_instance=_('File'),
    )
    column_list = (
        'upload_id', 'complete', 'created', 'updated', 'file_instance', )
    column_details_list = (
        'upload_id', 'complete', 'created', 'updated', 'file_instance', )
    column_filters = (
        'upload_id', 'complete', 'created', 'updated', )
    column_default_sort = ('upload_id', True)
    page_size = 25


location_adminview = dict(
modelview=LocationModelView,
model=Location,
Expand All @@ -249,3 +277,7 @@ def action_verify_checksum(self, ids):
modelview=FileInstanceModelView,
model=FileInstance,
category=_('Files'))
# Flask-Admin registration entry for the multipart (chunked upload) view.
multipartobject_adminview = dict(
    modelview=MultipartObjectModelView,
    model=MultipartObject,
    category=_('Files'))
7 changes: 5 additions & 2 deletions invenio_files_rest/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,8 @@
FILES_REST_STORAGE_FACTORY = None
"""Import path of factory used to create a storage instance."""

FILES_REST_PERMISSION_FACTORY = None
"""Import path of permission factory."""
# Import path of the default permission factory.
FILES_REST_DEFAULT_PERMISSION_FACTORY = "invenio_files_rest.permissions" \
    ":permission_factory"

FILES_REST_UPLOAD_FACTORY = None
"""Import path of factory used to parse chunked upload parameters."""
8 changes: 8 additions & 0 deletions invenio_files_rest/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,11 @@ class FileInstanceAlreadySetError(FilesException):

class InvalidOperationError(FilesException):
"""Exception raise when an invalid operation is performed."""


class MultipartObjectException(FilesException):
    """Base exception for multipart object errors."""


class MultipartObjectAlreadyCompleted(MultipartObjectException):
    """Exception raised when a multipart object is already completed.

    E.g. raised when writing to or finalizing the same upload twice.
    """
7 changes: 7 additions & 0 deletions invenio_files_rest/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

from . import config
from .storage import pyfs_storage_factory
from .uploader import ng_file_upload
from .views import blueprint


Expand Down Expand Up @@ -81,6 +82,12 @@ def file_size_limiter(self):
from invenio_files_rest.helpers import file_size_limiter
return file_size_limiter

@cached_property
def upload_factory(self):
    """Load the chunked-upload parameter factory.

    Imports the path configured in ``FILES_REST_UPLOAD_FACTORY`` if set,
    otherwise falls back to the default ``ng_file_upload`` factory.
    """
    imp = self.app.config["FILES_REST_UPLOAD_FACTORY"]
    return import_string(imp) if imp else ng_file_upload


class InvenioFilesREST(object):
"""Invenio-Files-REST extension."""
Expand Down
56 changes: 53 additions & 3 deletions invenio_files_rest/helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015, 2016 CERN.
#
Expand Down Expand Up @@ -29,10 +27,17 @@
import mimetypes
from time import time

from flask import current_app, request
from flask import abort, current_app, request
from invenio_db import db
from werkzeug.datastructures import Headers
from werkzeug.local import LocalProxy
from werkzeug.wsgi import FileWrapper

from .models import MultipartObject

#: Proxy to the current Invenio-Files-REST extension state.
current_files_rest = LocalProxy(
    lambda: current_app.extensions['invenio-files-rest'])


def send_stream(stream, filename, size, mtime, mimetype=None, restricted=False,
as_attachment=False, etag=None, content_md5=None,
Expand Down Expand Up @@ -88,3 +93,48 @@ def file_size_limiter(bucket):
'Bucket quota is {0} bytes. {1} bytes are currently '
'used.'.format(bucket.quota_size, bucket.size))
return (None, None)


def proccess_chunked_upload(bucket, key, content_length=None):
    """Process a chunked upload request.

    Dispatches on the current request: a ``?uploads`` query argument
    initiates a new multipart upload, while a form ``upload_id`` submits
    one chunk of an existing upload.

    :param bucket: The bucket.
    :param key: The filename (object key).
    :param content_length: Size of the current chunk, if known.
    :returns: The :class:`~invenio_files_rest.models.MultipartObject`.
    """
    if request.args.get('uploads') is not None:
        size = int(request.headers.get('Uploader-File-Size', 0))
        # Check the declared total size against the bucket's size limit.
        size_limit, size_limit_reason = current_files_rest.file_size_limiter(
            bucket=bucket)
        if size_limit is not None and size > size_limit:
            abort(400, size_limit_reason)
        # Client is requesting a new upload_id.
        obj = MultipartObject.create(bucket, key, size)
        db.session.commit()
        return obj
    elif request.form.get('upload_id') is not None:
        # Parse chunk parameters (current/total/size) from the form data.
        params = current_app.extensions[
            'invenio-files-rest'].upload_factory(
                request.form)
        # Get the upload_id
        upload_id = request.form.get('upload_id')
        if upload_id:
            # Fetch the (still incomplete) upload object.
            # NOTE(review): ``MultipartObject.get`` may return ``None`` for
            # an unknown or completed id, which would make the calls below
            # fail with AttributeError — consider aborting 404. TODO confirm.
            obj = MultipartObject.get(upload_id)
            # If it has chunk parameters, process them; otherwise 400 below.
            if params.get('current'):
                # Update the file with the uploaded chunk.
                uploaded_file = request.files['file']
                if not uploaded_file:
                    abort(400, 'file missing in request.')
                # Append the chunk while current chunk <= total chunks.
                if params.get('current') <= params.get('total'):
                    obj.set_contents(
                        uploaded_file, size=content_length
                    )
                # NOTE(review): finalizing when current chunk < chunk size
                # looks wrong — presumably the last chunk should be detected
                # via ``current == total``. TODO confirm before merging.
                if params.get('current') < params.get('size'):
                    obj.finalize()
                db.session.commit()
                return obj
    abort(400, 'Not valid chunk parameters.')
153 changes: 152 additions & 1 deletion invenio_files_rest/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@
from sqlalchemy.orm.exc import MultipleResultsFound
from sqlalchemy_utils.types import UUIDType

from .errors import FileInstanceAlreadySetError, InvalidOperationError
from .errors import FileInstanceAlreadySetError, InvalidOperationError, \
MultipartObjectAlreadyCompleted

slug_pattern = re.compile("^[a-z][a-z0-9-]+$")

Expand Down Expand Up @@ -456,6 +457,22 @@ def verify_checksum(self, progress_callback=None, **kwargs):
self.last_check_at = datetime.utcnow()
return self.last_check

def update_contents(self, stream, chunk_size=None,
                    progress_callback=None, **kwargs):
    """Append the contents of ``stream`` to this file.

    Delegates to the storage interface's ``update()`` and stores the
    returned URI, size and checksum via ``set_uri``.  The file is left
    ``readable=False, writable=True`` so further chunks can be appended.

    :param stream: File-like stream.
    :param chunk_size: Desired chunk size to read the stream in; it is up
        to the storage interface whether it respects this value.
    :param progress_callback: Optional callback reporting write progress.
    :param kwargs: Extra arguments forwarded to the storage factory.
    :raises ValueError: If this file instance is not writable.
    """
    if not self.writable:
        raise ValueError("File instance is not writable.")
    self.set_uri(
        *self.storage(**kwargs).update(
            stream, chunk_size=chunk_size,
            progress_callback=progress_callback
        ), readable=False, writable=True)

def set_contents(self, stream, chunk_size=None,
progress_callback=None, **kwargs):
"""Save contents of stream to this file.
Expand Down Expand Up @@ -803,10 +820,144 @@ def relink_all(cls, old_file, new_file):
ObjectVersion.query.filter_by(file_id=str(old_file.id)).update({
ObjectVersion.file_id: str(new_file.id)})

def serialize(self):
    """Return a plain ``dict`` of identifiers (dummy serialization)."""
    return {
        'file_id': str(self.file.id),
        'version_id': str(self.version_id),
        'bucket': str(self.bucket.id),
    }


class MultipartObject(db.Model, Timestamp):
    """Model for storing files in chunks.

    A multipart object handles a file upload in chunks.  It first creates
    an ``upload_id`` and a file instance, and updates the file chunk by
    chunk until ``finalize()`` is called, which marks the upload as
    completed and creates an ``ObjectVersion`` for the uploaded file.
    """

    __tablename__ = 'files_multipart_object'

    bucket_id = db.Column(
        UUIDType,
        db.ForeignKey(Bucket.id, ondelete='RESTRICT'),
        default=uuid.uuid4,
        primary_key=True, )
    """Bucket identifier."""

    key = db.Column(
        db.String(255),
        primary_key=True, )
    """Key identifying the object."""

    upload_id = db.Column(
        UUIDType,
        default=uuid.uuid4, )
    """Identifier for this multipart upload."""

    file_id = db.Column(
        UUIDType,
        db.ForeignKey(FileInstance.id, ondelete='RESTRICT'), nullable=True)
    """File instance for this multipart object.

    A null value in this column defines that the object has been deleted.
    """

    # NOTE(review): ``default=True`` means a directly-constructed row is
    # born "completed"; ``create()`` works around it by passing
    # ``complete=False`` explicitly — consider defaulting to ``False``.
    complete = db.Column(db.Boolean, nullable=False, default=True)
    """Defines if the upload is completed."""

    # Relationships definitions
    bucket = db.relationship(Bucket, backref='multipart_objects')
    """Relationship to buckets."""

    file = db.relationship(FileInstance, backref='multipart_objects')
    """Relationship to file instance."""

    def __repr__(self):
        """Return representation of the multipart object."""
        return "{0}:{2}:{1}".format(
            self.bucket_id, self.key, self.upload_id)

    def set_contents(self, stream, size=None, chunk_size=None,
                     progress_callback=None):
        """Append the contents of ``stream`` to the upload's file instance.

        :param stream: File-like stream.
        :param size: Size of stream if known.
        :param chunk_size: Desired chunk size to read stream in.  It is up
            to the storage interface if it respects this value.
        :param progress_callback: Optional callback reporting progress.
        :raises MultipartObjectAlreadyCompleted: If the upload has already
            been completed.
        :returns: Self, for chaining.
        """
        if self.complete:
            raise MultipartObjectAlreadyCompleted()

        self.file.update_contents(
            stream, size=size, chunk_size=chunk_size,
            progress_callback=progress_callback, objectversion=self)

        return self

    def finalize(self):
        """Complete the upload and create a versioned object for the file.

        Marks the upload as completed, makes the file instance read-only
        and creates an ``ObjectVersion`` pointing to the uploaded file.

        :raises MultipartObjectAlreadyCompleted: If already finalized.
        :returns: The created :class:`ObjectVersion`.
        """
        if self.complete:
            raise MultipartObjectAlreadyCompleted()

        # Mark the upload as completed so it no longer matches ``get()``
        # and cannot be written to or finalized a second time.  (Without
        # this, ``MultipartObjectAlreadyCompleted`` could never trigger.)
        self.complete = True
        self.file.writable = False
        self.file.readable = True
        # Create an object version pointing at the uploaded file.
        obj = ObjectVersion.create(
            self.bucket, self.key
        )
        obj.set_file(self.file)
        db.session.add(obj)
        return obj

    @classmethod
    def create(cls, bucket, key, size, **kwargs):
        """Create a new multipart object in a bucket.

        :param bucket: A :class:`Bucket` instance or a bucket id.
        :param key: Object key (filename).
        :param size: Declared total size of the upload in bytes.
        """
        bucket = bucket if isinstance(bucket, Bucket) else Bucket.get(bucket)
        with db.session.begin_nested():
            obj = cls(
                bucket=bucket,
                key=key,
                upload_id=uuid.uuid4(),
                complete=False,
                file=FileInstance(),
            )
            # Reserve the declared size in the bucket's usage.
            # NOTE(review): the size is counted before any byte is uploaded
            # and is not rolled back for abandoned uploads — confirm.
            bucket.size += size
            db.session.add(obj)
        return obj

    @classmethod
    def get(cls, upload_id):
        """Fetch a specific, still-incomplete multipart object.

        :param upload_id: The upload identifier.
        :returns: The matching instance or ``None``.
        """
        args = [
            cls.complete.is_(False),
            cls.upload_id == upload_id,
        ]

        return cls.query.filter(*args).one_or_none()

    def serialize(self):
        """Dummy serialization."""
        return dict(
            complete=str(self.complete),
            file_id=str(self.file.id),
            upload_id=str(self.upload_id),
            bucket=str(self.bucket.id)
        )

# Public API of this module.
__all__ = (
    'Bucket',
    'FileInstance',
    'Location',
    'MultipartObject',
    'ObjectVersion',
)
21 changes: 21 additions & 0 deletions invenio_files_rest/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ def send_file(self, **kwargs):
"""Send the file to the client."""
raise NotImplementedError

def update(self, incoming_stream, chunk_size=None, progress_callback=None):
    """Update (append to) an existing file in the storage.

    :raises NotImplementedError: Always; storage subclasses must override.
    """
    raise NotImplementedError

def save(self, incoming_stream, chunk_size=None, progress_callback=None):
"""Create a new file in the storage."""
raise NotImplementedError
Expand Down Expand Up @@ -175,6 +179,23 @@ def open(self):
"""
return opener.open(self.file.uri, mode='rb')

def update(self, incoming_stream, size=None, chunk_size=None,
           progress_callback=None):
    """Append ``incoming_stream`` to an existing file in the file system.

    Opens the target file in append mode (``'ab'``) and writes the stream
    into it.

    :param incoming_stream: File-like stream with the new chunk.
    :param size: Size of the chunk if known.  NOTE(review): currently
        unused — TODO confirm whether it should bound the write.
    :param chunk_size: Desired chunk size to read the stream in.
    :param progress_callback: Optional callback reporting write progress.
    :returns: Tuple ``(uri, bytes_written, checksum)``.
        NOTE(review): ``bytes_written`` and ``checksum`` presumably cover
        only the appended chunk, not the whole file — verify that callers
        expect this.
    """
    fs = opener.opendir(self.make_path(), create_dir=True)
    fp = fs.open(self.filename, 'ab')
    try:
        bytes_written, checksum = self._write_stream(
            incoming_stream, fp, chunk_size=chunk_size,
            progress_callback=progress_callback)
    finally:
        fp.close()

    uri = fs.getpathurl(self.filename, allow_none=True) or \
        fs.getsyspath(self.filename, allow_none=True)

    return uri, bytes_written, checksum

def save(self, incoming_stream, size=None, chunk_size=None,
progress_callback=None):
"""Save file in the file system."""
Expand Down

0 comments on commit 8401658

Please sign in to comment.