Add the Tus-Max-Size extension #310

Closed · wants to merge 1 commit
20 changes: 17 additions & 3 deletions lib/handlers/PatchHandler.js
@@ -3,8 +3,11 @@
const BaseHandler = require('./BaseHandler');
const File = require('../models/File');
const { ERRORS, EVENTS } = require('../constants');
const stream = require('stream');
const debug = require('debug');
const { StreamDuplexLimiter: StreamLimiter } = require('../models/StreamLimiter');
const log = debug('tus-node-server:handlers:patch');

class PatchHandler extends BaseHandler {
/**
* Write data to the DataStore and return the new offset.
@@ -61,14 +64,25 @@ class PatchHandler extends BaseHandler {
             file.upload_length = upload_length;
         }

-        const new_offset = await this.store.write(req, file_id, offset);
-        if (new_offset === parseInt(file.upload_length, 10)) {
+        let max_bytes = Number.MAX_SAFE_INTEGER;
+        if (req.headers['content-length']) {
+            max_bytes = Math.min(max_bytes, req.headers['content-length']);
+        }
+        if (file.upload_length !== undefined) {
+            const remaining_bytes = parseInt(file.upload_length, 10) - file.size;
+            max_bytes = Math.min(max_bytes, remaining_bytes);
+        }
+
+        const limiter = stream.pipeline(req, new StreamLimiter(max_bytes), () => {});
+
+        file.size = await this.store.write(limiter, file_id, offset);
+        if (file.size === parseInt(file.upload_length, 10)) {
             this.emit(EVENTS.EVENT_UPLOAD_COMPLETE, { file: new File(file_id, file.upload_length, file.upload_defer_length, file.upload_metadata) });
         }

         // It MUST include the Upload-Offset header containing the new offset.
         const headers = {
-            'Upload-Offset': new_offset,
+            'Upload-Offset': file.size,
         };

         // The Server MUST acknowledge successful PATCH requests with the 204
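To make the cap arithmetic above concrete, here is a small standalone sketch with illustrative numbers (not part of the diff); the PostHandler hunk below applies the same computation at creation time, with the write starting at offset 0:

// Illustrative numbers: a 100-byte upload with 60 bytes already stored,
// receiving a PATCH that declares Content-Length: 50.
const upload_length = 100;   // total size declared at creation
const stored_bytes = 60;     // file.size before this PATCH
const content_length = 50;   // body size declared by the client

let max_bytes = Number.MAX_SAFE_INTEGER;
max_bytes = Math.min(max_bytes, content_length);               // 50
max_bytes = Math.min(max_bytes, upload_length - stored_bytes); // 40
// max_bytes is now 40: the limiter truncates the request body to the
// 40 bytes that still fit in the upload, whatever the client sends.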
19 changes: 16 additions & 3 deletions lib/handlers/PostHandler.js
@@ -5,7 +5,9 @@ const File = require('../models/File');
const Uid = require('../models/Uid');
const RequestValidator = require('../validators/RequestValidator');
const { EVENTS, ERRORS } = require('../constants');
const stream = require('stream');
const debug = require('debug');
const { StreamDuplexLimiter: StreamLimiter } = require('../models/StreamLimiter');
const log = debug('tus-node-server:handlers:post');
class PostHandler extends BaseHandler {

@@ -70,12 +72,23 @@ class PostHandler extends BaseHandler {

         // The request MIGHT include a Content-Type header when using the creation-with-upload extension
         if (!RequestValidator.isInvalidHeader('content-type', req.headers['content-type'])) {
-            const new_offset = await this.store.write(req, file.id, 0);
-            optional_headers['Upload-Offset'] = new_offset;
-
-            if (new_offset === parseInt(upload_length, 10)) {
+            let max_bytes = Number.MAX_SAFE_INTEGER;
+            if (req.headers['content-length']) {
+                max_bytes = Math.min(max_bytes, req.headers['content-length']);
+            }
+            if (file.upload_length !== undefined) {
+                const remaining_bytes = parseInt(file.upload_length, 10) - (file.size ?? 0);
+                max_bytes = Math.min(max_bytes, remaining_bytes);
+            }
+
+            const limiter = stream.pipeline(req, new StreamLimiter(max_bytes), () => {});
+
+            file.size = await this.store.write(limiter, file.id, 0);
+            if (file.size === parseInt(file.upload_length, 10)) {
                 this.emit(EVENTS.EVENT_UPLOAD_COMPLETE, { file: new File(file.id, file.upload_length, file.upload_defer_length, file.upload_metadata) });
             }
+
+            optional_headers['Upload-Offset'] = file.size;
         }

         return this.write(res, 201, { Location: url, ...optional_headers });
96 changes: 96 additions & 0 deletions lib/models/StreamLimiter.js
@@ -0,0 +1,96 @@
const stream = require('stream');

// Transform-based limiter: forwards at most `limitBytes` bytes and silently
// drops anything past the limit.
class StreamTransformLimiter extends stream.Transform {
    constructor(limitBytes, options = {}) {
        super(options);

        this.remainingBytes = limitBytes;

        if (options.objectMode) {
            throw new Error('Object mode is not supported');
        }
    }

    _transform(chunk, encoding, callback) {
        if (!Buffer.isBuffer(chunk)) {
            callback(new Error('Only buffers are supported'));
            return;
        }

        // Forward only as many bytes as the limit still allows.
        const bytes = Math.min(chunk.length, this.remainingBytes);
        const data = chunk.subarray(0, bytes);

        this.remainingBytes -= data.length;

        callback(null, data);
    }
}

// Duplex-based limiter: forwards at most `limitBytes` bytes, keeps consuming
// (and discarding) any excess input, and ends its readable side as soon as
// the limit is reached.
class StreamDuplexLimiter extends stream.Duplex {
    constructor(limitBytes, options = {}) {
        super(options);

        this.receivedBytes = 0;
        this.remainingBytes = limitBytes;
        this.ended = false;

        if (options.objectMode) {
            throw new Error('Object mode is not supported');
        }
    }

    _write(chunk, encoding, callback) {
        if (!Buffer.isBuffer(chunk)) {
            callback(new Error('Only buffers are supported'));
            return;
        }

        this.receivedBytes += chunk.length;

        // The limit has already been reached: swallow the excess input.
        if (this.remainingBytes === 0) {
            callback();
            return;
        }

        const bytes = Math.min(chunk.length, this.remainingBytes);
        const data = chunk.subarray(0, bytes);

        this.remainingBytes -= data.length;

        const final = this.remainingBytes === 0;

        if (final) {
            // Last permitted bytes: emit them and end the readable side.
            this.push(data, encoding);
            this.push(null);
            this.ended = true;

            callback();
        }
        else if (this.push(data, encoding)) {
            callback();
        }
        else {
            // Backpressure: hold the write callback until _read() drains us.
            this.callback = callback;
        }
    }

    _read(size) {
        if (this.callback) {
            const callback = this.callback;
            this.callback = null;
            callback();
        }
    }

    _final(callback) {
        // End the readable side unless _write() already did so; pushing null
        // twice would raise ERR_STREAM_PUSH_AFTER_EOF.
        if (!this.ended) {
            this.push(null);
            this.ended = true;
        }

        callback();
    }

    _destroy(err, callback) {
        callback(err);
    }
}

module.exports = { StreamTransformLimiter, StreamDuplexLimiter };
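A minimal usage sketch of the duplex limiter (not part of the PR; the require path is assumed):

const stream = require('stream');
const { StreamDuplexLimiter } = require('./lib/models/StreamLimiter');

// 11 bytes in, capped at 8: the limiter forwards 'hello wo' and discards
// the rest while still consuming the whole source.
const source = stream.Readable.from([Buffer.from('hello '), Buffer.from('world')]);
const limiter = stream.pipeline(source, new StreamDuplexLimiter(8), () => {});

const chunks = [];
limiter.on('data', (chunk) => chunks.push(chunk));
limiter.on('end', () => {
    console.log(Buffer.concat(chunks).toString()); // 'hello wo'
});

Note that receivedBytes still ends up at 11, so a caller can tell that the source exceeded the cap even though only 8 bytes were forwarded.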
119 changes: 118 additions & 1 deletion package-lock.json
Some generated files are not rendered by default.