Skip to content

Commit

Permalink
@tus/server: support Tus-Max-Size (#517)
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Dec 21, 2023
1 parent ba8ef31 commit aafb40d
Show file tree
Hide file tree
Showing 11 changed files with 489 additions and 16 deletions.
5 changes: 5 additions & 0 deletions packages/server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ Creates a new tus server with options.

The route to accept requests (`string`).

#### `options.maxSize`

Max file size (in bytes) allowed when uploading (`number` | (`(req, id: string | null) => Promise<number> | number`)).
When providing a function during the OPTIONS request the id will be `null`.

#### `options.relativeLocation`

Return a relative URL as the `Location` header to the client (`boolean`).
Expand Down
8 changes: 8 additions & 0 deletions packages/server/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ export const ERRORS = {
status_code: 410,
body: 'The file for this url no longer exists\n',
},
ERR_SIZE_EXCEEDED: {
status_code: 413,
body: "upload's size exceeded\n",
},
ERR_MAX_SIZE_EXCEEDED: {
status_code: 413,
body: 'Maximum size exceeded\n',
},
INVALID_LENGTH: {
status_code: 400,
body: 'Upload-Length or Upload-Defer-Length header required\n',
Expand Down
85 changes: 76 additions & 9 deletions packages/server/src/handlers/BaseHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ import EventEmitter from 'node:events'
import type {ServerOptions} from '../types'
import type {DataStore, CancellationContext} from '../models'
import type http from 'node:http'
import stream from 'node:stream'
import {Upload} from '../models'
import {ERRORS} from '../constants'
import stream from 'node:stream/promises'
import {addAbortSignal, PassThrough} from 'stream'
import {StreamLimiter} from '../models/StreamLimiter'

const reExtractFileID = /([^/]+)\/?$/
const reForwardedHost = /host="?([^";]+)/
Expand Down Expand Up @@ -132,24 +135,24 @@ export class BaseHandler extends EventEmitter {
req: http.IncomingMessage,
id: string,
offset: number,
maxFileSize: number,
context: CancellationContext
) {
return new Promise<number>(async (resolve, reject) => {
// Abort early if the operation has been cancelled.
if (context.signal.aborted) {
reject(ERRORS.ABORTED)
return
}

const proxy = new stream.PassThrough()
stream.addAbortSignal(context.signal, proxy)
// Create a PassThrough stream as a proxy to manage the request stream.
// This allows for aborting the write process without affecting the incoming request stream.
const proxy = new PassThrough()
addAbortSignal(context.signal, proxy)

proxy.on('error', (err) => {
req.unpipe(proxy)
if (err.name === 'AbortError') {
reject(ERRORS.ABORTED)
} else {
reject(err)
}
reject(err.name === 'AbortError' ? ERRORS.ABORTED : err)
})

req.on('error', (err) => {
Expand All @@ -158,7 +161,71 @@ export class BaseHandler extends EventEmitter {
}
})

this.store.write(req.pipe(proxy), id, offset).then(resolve).catch(reject)
// Pipe the request stream through the proxy. We use the proxy instead of the request stream directly
// to ensure that errors in the pipeline do not cause the request stream to be destroyed,
// which would result in a socket hangup error for the client.
stream
.pipeline(req.pipe(proxy), new StreamLimiter(maxFileSize), async (stream) => {
return this.store.write(stream as StreamLimiter, id, offset)
})
.then(resolve)
.catch(reject)
})
}

getConfiguredMaxSize(req: http.IncomingMessage, id: string | null) {
if (typeof this.options.maxSize === 'function') {
return this.options.maxSize(req, id)
}
return this.options.maxSize ?? 0
}

/**
* Calculates the maximum allowed size for the body of an upload request.
* This function considers both the server's configured maximum size and
* the specifics of the upload, such as whether the size is deferred or fixed.
*/
async calculateMaxBodySize(
req: http.IncomingMessage,
file: Upload,
configuredMaxSize?: number
) {
// Use the server-configured maximum size if it's not explicitly provided.
configuredMaxSize ??= await this.getConfiguredMaxSize(req, file.id)

// Parse the Content-Length header from the request (default to 0 if not set).
const length = parseInt(req.headers['content-length'] || '0', 10)
const offset = file.offset

const hasContentLengthSet = req.headers['content-length'] !== undefined
const hasConfiguredMaxSizeSet = configuredMaxSize > 0

if (file.sizeIsDeferred) {
// For deferred size uploads, if it's not a chunked transfer, check against the configured maximum size.
if (
hasContentLengthSet &&
hasConfiguredMaxSizeSet &&
offset + length > configuredMaxSize
) {
throw ERRORS.ERR_SIZE_EXCEEDED
}

if (hasConfiguredMaxSizeSet) {
return configuredMaxSize - offset
} else {
return Number.MAX_SAFE_INTEGER
}
}

// Check if the upload fits into the file's size when the size is not deferred.
if (offset + length > (file.size || 0)) {
throw ERRORS.ERR_SIZE_EXCEEDED
}

if (hasContentLengthSet) {
return length
}

return (file.size || 0) - offset
}
}
8 changes: 7 additions & 1 deletion packages/server/src/handlers/OptionsHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ import type http from 'node:http'
// A successful response indicated by the 204 No Content status MUST contain
// the Tus-Version header. It MAY include the Tus-Extension and Tus-Max-Size headers.
export class OptionsHandler extends BaseHandler {
async send(_: http.IncomingMessage, res: http.ServerResponse) {
async send(req: http.IncomingMessage, res: http.ServerResponse) {
const maxSize = await this.getConfiguredMaxSize(req, null)

if (maxSize) {
res.setHeader('Tus-Max-Size', maxSize)
}

const allowedHeaders = [...HEADERS, ...(this.options.allowedHeaders ?? [])]

res.setHeader('Access-Control-Allow-Methods', ALLOWED_METHODS)
Expand Down
9 changes: 8 additions & 1 deletion packages/server/src/handlers/PatchHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ export class PatchHandler extends BaseHandler {
await this.options.onIncomingRequest(req, res, id)
}

const maxFileSize = await this.getConfiguredMaxSize(req, id)

const lock = await this.acquireLock(req, id, context)

let upload: Upload
Expand Down Expand Up @@ -90,11 +92,16 @@ export class PatchHandler extends BaseHandler {
throw ERRORS.INVALID_LENGTH
}

if (maxFileSize > 0 && size > maxFileSize) {
throw ERRORS.ERR_MAX_SIZE_EXCEEDED
}

await this.store.declareUploadLength(id, size)
upload.size = size
}

newOffset = await this.writeToStore(req, id, offset, context)
const maxBodySize = await this.calculateMaxBodySize(req, upload, maxFileSize)
newOffset = await this.writeToStore(req, id, offset, maxBodySize, context)
} finally {
await lock.unlock()
}
Expand Down
15 changes: 14 additions & 1 deletion packages/server/src/handlers/PostHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ export class PostHandler extends BaseHandler {
throw ERRORS.FILE_WRITE_ERROR
}

const maxFileSize = await this.getConfiguredMaxSize(req, id)

if (
upload_length &&
maxFileSize > 0 &&
Number.parseInt(upload_length, 10) > maxFileSize
) {
throw ERRORS.ERR_MAX_SIZE_EXCEEDED
}

let metadata
if ('upload-metadata' in req.headers) {
try {
Expand Down Expand Up @@ -92,12 +102,14 @@ export class PostHandler extends BaseHandler {
}

const lock = await this.acquireLock(req, id, context)

let isFinal: boolean
let url: string
let headers: {
'Upload-Offset'?: string
'Upload-Expires'?: string
}

try {
await this.store.create(upload)
url = this.generateUrl(req, upload.id)
Expand All @@ -109,7 +121,8 @@ export class PostHandler extends BaseHandler {

// The request MIGHT include a Content-Type header when using creation-with-upload extension
if (validateHeader('content-type', req.headers['content-type'])) {
const newOffset = await this.writeToStore(req, id, 0, context)
const bodyMaxSize = await this.calculateMaxBodySize(req, upload, maxFileSize)
const newOffset = await this.writeToStore(req, id, 0, bodyMaxSize, context)

headers['Upload-Offset'] = newOffset.toString()
isFinal = newOffset === Number.parseInt(upload_length as string, 10)
Expand Down
34 changes: 34 additions & 0 deletions packages/server/src/models/StreamLimiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import {Transform, TransformCallback} from 'stream'
import {ERRORS} from '../constants'

// TODO: create HttpError and use it everywhere instead of throwing objects
export class MaxFileExceededError extends Error {
status_code: number
body: string

constructor() {
super(ERRORS.ERR_MAX_SIZE_EXCEEDED.body)
this.status_code = ERRORS.ERR_MAX_SIZE_EXCEEDED.status_code
this.body = ERRORS.ERR_MAX_SIZE_EXCEEDED.body
Object.setPrototypeOf(this, MaxFileExceededError.prototype)
}
}

export class StreamLimiter extends Transform {
private maxSize: number
private currentSize = 0

constructor(maxSize: number) {
super()
this.maxSize = maxSize
}

_transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
this.currentSize += chunk.length
if (this.currentSize > this.maxSize) {
callback(new MaxFileExceededError())
} else {
callback(null, chunk)
}
}
}
7 changes: 7 additions & 0 deletions packages/server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ export type ServerOptions = {
*/
path: string

/**
* Max file size allowed when uploading
*/
maxSize?:
| number
| ((req: http.IncomingMessage, uploadId: string | null) => Promise<number> | number)

/**
* Return a relative URL as the `Location` header.
*/
Expand Down
64 changes: 64 additions & 0 deletions packages/server/test/PatchHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import {EVENTS} from '../src/constants'
import {EventEmitter} from 'node:events'
import {addPipableStreamBody} from './utils'
import {MemoryLocker} from '../src'
import streamP from 'node:stream/promises'
import stream from 'node:stream'

describe('PatchHandler', () => {
const path = '/test/output'
Expand Down Expand Up @@ -182,4 +184,66 @@ describe('PatchHandler', () => {
assert.ok(spy.args[0][1])
assert.equal(spy.args[0][2].offset, 10)
})

it('should throw max size exceeded error when upload-length is higher then the maxSize', async () => {
handler = new PatchHandler(store, {path, maxSize: 5, locker: new MemoryLocker()})
req.headers = {
'upload-offset': '0',
'upload-length': '10',
'content-type': 'application/offset+octet-stream',
}
req.url = `${path}/file`

store.hasExtension.withArgs('creation-defer-length').returns(true)
store.getUpload.resolves(new Upload({id: '1234', offset: 0}))
store.write.resolves(5)
store.declareUploadLength.resolves()

try {
await handler.send(req, res, context)
throw new Error('failed test')
} catch (e) {
assert.equal('body' in e, true)
assert.equal('status_code' in e, true)
assert.equal(e.body, 'Maximum size exceeded\n')
assert.equal(e.status_code, 413)
}
})

it('should throw max size exceeded error when the request body is bigger then the maxSize', async () => {
handler = new PatchHandler(store, {path, maxSize: 5, locker: new MemoryLocker()})
const req = addPipableStreamBody(
httpMocks.createRequest({
method: 'PATCH',
url: `${path}/1234`,
body: Buffer.alloc(30),
})
)
const res = httpMocks.createResponse({req})
req.headers = {
'upload-offset': '0',
'content-type': 'application/offset+octet-stream',
}
req.url = `${path}/file`

store.getUpload.resolves(new Upload({id: '1234', offset: 0}))
store.write.callsFake(async (readable: http.IncomingMessage | stream.Readable) => {
const writeStream = new stream.PassThrough()
await streamP.pipeline(readable, writeStream)
return writeStream.readableLength
})
store.declareUploadLength.resolves()

try {
await handler.send(req, res, context)
throw new Error('failed test')
} catch (e) {
assert.equal(e.message !== 'failed test', true, 'failed test')
assert.equal('body' in e, true)
assert.equal('status_code' in e, true)
assert.equal(e.body, 'Maximum size exceeded\n')
assert.equal(e.status_code, 413)
assert.equal(context.signal.aborted, true)
}
})
})
18 changes: 14 additions & 4 deletions packages/server/test/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,27 @@ export function addPipableStreamBody<T extends httpMocks.MockRequest<unknown>>(
mockRequest: T
) {
// Create a Readable stream that simulates the request body
const bodyStream = new stream.Readable({
const bodyStream = new stream.Duplex({
read() {
this.push(JSON.stringify(mockRequest.body))
this.push(
mockRequest.body instanceof Buffer
? mockRequest.body
: JSON.stringify(mockRequest.body)
)
this.push(null)
},
})

// Add the pipe method to the mockRequest
// @ts-expect-error pipe exists
// @ts-ignore

Check warning on line 20 in packages/server/test/utils.ts

View workflow job for this annotation

GitHub Actions / lts/hydrogen

Do not use "@ts-ignore" because it alters compilation errors

Check warning on line 20 in packages/server/test/utils.ts

View workflow job for this annotation

GitHub Actions / node

Do not use "@ts-ignore" because it alters compilation errors
mockRequest.pipe = function (dest: stream.Writable) {
bodyStream.pipe(dest)
return bodyStream.pipe(dest)
}

// Add the unpipe method to the mockRequest
// @ts-ignore

Check warning on line 26 in packages/server/test/utils.ts

View workflow job for this annotation

GitHub Actions / lts/hydrogen

Do not use "@ts-ignore" because it alters compilation errors

Check warning on line 26 in packages/server/test/utils.ts

View workflow job for this annotation

GitHub Actions / node

Do not use "@ts-ignore" because it alters compilation errors
mockRequest.unpipe = function (dest: stream.Writable) {
return bodyStream.unpipe(dest)
}
return mockRequest
}

0 comments on commit aafb40d

Please sign in to comment.