
Allow the same data to be shared by multiple Upload scalars #92

Merged · 25 commits · Aug 15, 2018

Commits
7b15da6
Upgrade fs-capacitor to 1.0.0-alpha.2
mike-marcacci Jul 22, 2018
a21f10c
Add support for file deduplication
mike-marcacci Jul 22, 2018
51f8447
Add cleanup tests
mike-marcacci Jul 23, 2018
1058294
fs-capacitor@1.0.0-beta.3
mike-marcacci Jul 23, 2018
132188b
More robust cleanup tests
mike-marcacci Jul 23, 2018
5309b93
Only destroy totally unconsumed read streams
mike-marcacci Jul 23, 2018
c3cc78f
Remove setMaxListeners
mike-marcacci Jul 23, 2018
b69f5ef
Release resources on node <10
mike-marcacci Aug 2, 2018
b2d5308
Initial mockup of new API
mike-marcacci Aug 2, 2018
be418a8
Match tests to new behavior
mike-marcacci Aug 2, 2018
1d345e1
Fix error order with node 8
mike-marcacci Aug 2, 2018
c8c49d9
update readme and changelog
mike-marcacci Aug 2, 2018
a2a2072
Reenable max file test
mike-marcacci Aug 2, 2018
902b886
simplify error handling
mike-marcacci Aug 3, 2018
09b2610
Fix test timings on linux
mike-marcacci Aug 3, 2018
5a999d9
Make sure to resume request after unpipe
mike-marcacci Aug 3, 2018
dd9c6b3
Destroy parser before unpiping/resuming
mike-marcacci Aug 3, 2018
ef813ce
Merge branch 'master' into dedup-files
jaydenseric Aug 6, 2018
b0a4da5
Tidy and typo fixes.
jaydenseric Aug 6, 2018
503cdc5
Changelog fixes.
jaydenseric Aug 6, 2018
81f7980
Test simplifications.
jaydenseric Aug 6, 2018
895855c
Use snapshots to better test upload promise resolved enumerable prope…
jaydenseric Aug 6, 2018
6d9ca37
Unskip test.
jaydenseric Aug 6, 2018
c257701
Reordered upload properties.
jaydenseric Aug 6, 2018
2dafb02
Synchronous scripts and tests.
jaydenseric Aug 6, 2018
9 changes: 9 additions & 0 deletions changelog.md
@@ -2,6 +2,15 @@

## 6.0.0-alpha.1

### Major

- API change: `processRequest` now requires a [`http.ServerResponse`](https://nodejs.org/api/http.html#http_class_http_serverresponse) as its second argument
- API change: upload promises now resolve with a `createReadStream` method instead of a `stream` property
- An `Upload` variable can now be used by multiple resolvers
- Multiple `Upload` variables can now use the same multipart data

## 6.0.0-alpha.1

Big thanks to new collaborator [@mike-marcacci](https://github.com/mike-marcacci) for his help solving tricky bugs and edge-cases!

### Major
2 changes: 1 addition & 1 deletion package.json
@@ -37,7 +37,7 @@
},
"dependencies": {
"busboy": "^0.2.14",
"fs-capacitor": "^0.0.3",
"fs-capacitor": "^1.0.0",
"object-path": "^0.11.4"
},
"devDependencies": {
10 changes: 5 additions & 5 deletions readme.md
@@ -79,7 +79,7 @@ app.use(

### Custom middleware

Middleware wraps the async function `processRequest` which accepts a Node.js request and an optional [options object](#options) as arguments. It returns a promise that resolves an operations object for a GraphQL server to consume (usually as the request body). Import it to create custom middleware:
Middleware wraps the async function `processRequest` which accepts a required [http.IncomingMessage](https://nodejs.org/api/http.html#http_class_http_incomingmessage), a required [http.ServerResponse](https://nodejs.org/api/http.html#http_class_http_serverresponse), and an optional [options object](#options) as arguments. It returns a promise that resolves an operations object for a GraphQL server to consume (usually as the request body). Import it to create custom middleware:

```js
import { processRequest } from 'apollo-upload-server'
@@ -89,10 +89,10 @@ import { processRequest } from 'apollo-upload-server'

A file upload promise that resolves an object containing:

- `stream`
- `filename`
- `mimetype`
- `encoding`
- `filename`: `string` - the filename of the upload
- `mimetype`: `string` - the mimetype of the upload
- `encoding`: `string` - the encoding of the upload
- `createReadStream`: `() => ReadStream` - calling this method returns a readable stream of the upload's contents. Calling `createReadStream` multiple times creates multiple streams that can be read independently of each other. Note that `createReadStream` will throw if called after all resolvers have finished, or after an error has interrupted the request.

It must be added to your types and resolvers:

3 changes: 1 addition & 2 deletions src/errors.mjs
@@ -23,5 +23,4 @@ export class MaxFilesUploadError extends UploadError {}
export class MapBeforeOperationsUploadError extends UploadError {}
export class FilesBeforeMapUploadError extends UploadError {}
export class FileMissingUploadError extends UploadError {}
export class UploadPromiseDisconnectUploadError extends UploadError {}
export class FileStreamDisconnectUploadError extends UploadError {}
export class DisconnectUploadError extends UploadError {}
146 changes: 83 additions & 63 deletions src/middleware.mjs
@@ -1,15 +1,14 @@
import Busboy from 'busboy'
import objectPath from 'object-path'
import Capacitor from 'fs-capacitor'
import WriteStream from 'fs-capacitor'
import {
SPEC_URL,
MaxFileSizeUploadError,
MaxFilesUploadError,
MapBeforeOperationsUploadError,
FilesBeforeMapUploadError,
FileMissingUploadError,
UploadPromiseDisconnectUploadError,
FileStreamDisconnectUploadError
DisconnectUploadError
} from './errors'

class Upload {
@@ -30,9 +29,16 @@ class Upload {

export const processRequest = (
request,
response,
{ maxFieldSize, maxFileSize, maxFiles } = {}
) =>
new Promise((resolve, reject) => {
let operations
let operationsPath
let map
let currentStream
let error

const parser = new Busboy({
headers: request.headers,
limits: {
@@ -43,16 +49,36 @@
}
})

let operations
let operationsPath
let map
let currentStream
const exit = exitError => {
if (error) return
error = exitError

const exit = error => {
reject(error)
parser.destroy(error)

parser.destroy()

if (currentStream) currentStream.destroy(error)

if (map)
for (const upload of map.values())
if (!upload.file) upload.reject(error)

request.unpipe(parser)
request.resume()
}

let isReleased = false
const release = () => {
if (isReleased) return
isReleased = true

if (map)
for (const upload of map.values())
if (upload.file) upload.file.capacitor.destroy()

> Owner comment: I thought capacitor was only going to be used in tests?

}

// Parser Events
// -------------
parser.on('field', (fieldName, value) => {
switch (fieldName) {
case 'operations':
@@ -93,8 +119,6 @@
for (const [fieldName, paths] of mapEntries) {
map.set(fieldName, new Upload())

// Repopulate operations with the promise wherever the file occurred
// for use by the Upload scalar.
for (const path of paths)
operationsPath.set(path, map.get(fieldName).promise)
}
@@ -108,7 +132,6 @@
if (!map) {
// Prevent an unhandled error from crashing the process.
stream.on('error', () => {})

stream.resume()

return exit(
@@ -120,56 +143,57 @@
}

currentStream = stream

stream.on('end', () => {
if (currentStream === stream) currentStream = null
})

if (map.has(fieldName)) {
const capacitor = new Capacitor()
const upload = map.get(fieldName)
if (upload) {
const capacitor = new WriteStream()

capacitor.on('error', () => {
stream.unpipe()
stream.resume()
})

stream.on('limit', () =>
stream.on('limit', () => {
if (currentStream === stream) currentStream = null
stream.unpipe()
capacitor.destroy(
new MaxFileSizeUploadError(
'File truncated as it exceeds the size limit.',
413
)
)
)

stream.on('error', error => {
if (capacitor.finished || capacitor.destroyed) return

// A terminated connection may cause the request to emit a 'close'
// event either before or after the parser encounters an error,
// depending on the Node.js version and the state of stream buffers.
})

if (
error.message ===
// https://github.com/mscdex/dicer/blob/v0.2.5/lib/Dicer.js#L62
'Unexpected end of multipart data' ||
error.message ===
// https://github.com/mscdex/dicer/blob/v0.2.5/lib/Dicer.js#L65
'Part terminated early due to unexpected end of multipart data'
)
error = new FileStreamDisconnectUploadError(error.message)
stream.on('error', streamError => {
if (currentStream === stream) currentStream = null

capacitor.destroy(error)
stream.unpipe()
capacitor.destroy(error || streamError)
})

stream.pipe(capacitor)

map.get(fieldName).resolve({
stream: capacitor,
filename,
mimetype,
encoding
})
upload.resolve(
Object.create(null, {
capacitor: { value: capacitor, enumerable: false },
createReadStream: {
value() {
const createReadStreamError =
capacitor.error || (isReleased ? error : null)

if (createReadStreamError) throw createReadStreamError
return capacitor.createReadStream()
},
enumerable: true
},
encoding: { value: encoding, enumerable: true },
filename: { value: filename, enumerable: true },
mimetype: { value: mimetype, enumerable: true }
})
)
}
// Discard the unexpected file.
else {
@@ -196,30 +220,26 @@
)
})

parser.on('error', error => {
request.unpipe(parser)
request.resume()
parser.once('error', error => {
exit(error)
})

if (map)
for (const upload of map.values())
if (!upload.file) upload.reject(error)
// Response Events
// ---------------
response.once('finish', release)
response.once('close', release)

if (currentStream) currentStream.destroy(error)
// Request Events
// --------------
let requestEnded = false
request.once('end', () => {
requestEnded = true
})

request.on('close', () => {
if (map)
for (const upload of map.values())
if (!upload.file)
upload.reject(
new UploadPromiseDisconnectUploadError(
'Request disconnected before file upload stream parsing.'
)
)

if (!parser._finished)
parser.destroy(
new FileStreamDisconnectUploadError(
request.once('close', () => {
if (!requestEnded)
exit(
new DisconnectUploadError(
'Request disconnected during file upload stream parsing.'
)
)
@@ -234,7 +254,7 @@ export const apolloUploadKoa = options => async (ctx, next) => {
const finished = new Promise(resolve => ctx.req.on('end', resolve))

try {
ctx.request.body = await processRequest(ctx.req, options)
ctx.request.body = await processRequest(ctx.req, ctx.res, options)
await next()
} finally {
await finished
@@ -245,16 +265,16 @@ export const apolloUploadExpress = options => (request, response, next) => {
if (!request.is('multipart/form-data')) return next()

const finished = new Promise(resolve => request.on('end', resolve))
const { send } = response

const { send } = response
response.send = (...args) => {
finished.then(() => {
response.send = send
response.send(...args)
})
}

processRequest(request, options)
processRequest(request, response, options)
.then(body => {
request.body = body
next()