Skip to content

Commit

Permalink
fix: premature close with chunked transfer encoding and for async ite…
Browse files Browse the repository at this point in the history
…rators in Node 12 (node-fetch#1172)

* fix: premature close with chunked transfer encoding and for async iterators in Node 12

This PR backports the fix from node-fetch#1064 to the `2.x.x` branch following
the [comment here](node-fetch#1064 (comment)).

I had to add some extra babel config to allow using the `for await..of`
syntax in the tests.  The config is only needed for the tests as
this syntax is not used in the implementation.

* chore: fix up tests for node 6+

* chore: codecov dropped support for node < 8 without shipping major

* chore: npm7 strips empty dependencies hash during install

* chore: pin deps to versions that work on node 4

* chore: do not emit close error after aborting a request

* chore: test on node 4-16

* chore: simplify chunked transer encoding bad ending

* chore: avoid calling .destroy as it is not in every node.js release

* chore: listen for response close as socket is reused and shows warnings
  • Loading branch information
achingbrain committed Jul 16, 2022
1 parent 838d971 commit 50536d1
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 4 deletions.
6 changes: 4 additions & 2 deletions .babelrc
Expand Up @@ -14,7 +14,8 @@
} ]
],
plugins: [
'./build/babel-plugin'
'./build/babel-plugin',
'transform-async-generator-functions'
]
},
coverage: {
Expand All @@ -31,7 +32,8 @@
],
plugins: [
[ 'istanbul', { exclude: [ 'src/blob.js', 'build', 'test' ] } ],
'./build/babel-plugin'
'./build/babel-plugin',
'transform-async-generator-functions'
]
},
rollup: {
Expand Down
2 changes: 2 additions & 0 deletions .travis.yml
Expand Up @@ -4,6 +4,8 @@ node_js:
- "6"
- "8"
- "10"
- "12"
- "14"
- "node"
env:
- FORMDATA_VERSION=1.0.0
Expand Down
43 changes: 43 additions & 0 deletions README.md
Expand Up @@ -188,6 +188,49 @@ fetch('https://assets-cdn.github.com/images/modules/logos_page/Octocat.png')
});
```

In Node.js 14 you can also use async iterators to read `body`; however, be careful to catch
errors -- the longer a response runs, the more likely it is to encounter an error.

```js
const fetch = require('node-fetch');
const response = await fetch('https://httpbin.org/stream/3');
try {
for await (const chunk of response.body) {
console.dir(JSON.parse(chunk.toString()));
}
} catch (err) {
console.error(err.stack);
}
```

In Node.js 12 you can also use async iterators to read `body`; however, async iterators with streams
did not mature until Node.js 14, so you need to do some extra work to ensure you handle errors
directly from the stream and wait on it response to fully close.

```js
const fetch = require('node-fetch');
const read = async body => {
let error;
body.on('error', err => {
error = err;
});
for await (const chunk of body) {
console.dir(JSON.parse(chunk.toString()));
}
return new Promise((resolve, reject) => {
body.on('close', () => {
error ? reject(error) : resolve();
});
});
};
try {
const response = await fetch('https://httpbin.org/stream/3');
await read(response.body);
} catch (err) {
console.error(err.stack);
}
```

#### Buffer
If you prefer to cache binary data in full, use buffer(). (NOTE: `buffer()` is a `node-fetch`-only API)

Expand Down
4 changes: 3 additions & 1 deletion package.json
Expand Up @@ -53,7 +53,9 @@
"abortcontroller-polyfill": "^1.3.0",
"babel-core": "^6.26.3",
"babel-plugin-istanbul": "^4.1.6",
"babel-preset-env": "^1.6.1",
"babel-plugin-transform-async-generator-functions": "^6.24.1",
"babel-polyfill": "^6.26.0",
"babel-preset-env": "1.4.0",
"babel-register": "^6.16.3",
"chai": "^3.5.0",
"chai-as-promised": "^7.1.1",
Expand Down
68 changes: 67 additions & 1 deletion src/index.js
Expand Up @@ -67,7 +67,7 @@ export default function fetch(url, opts) {
let error = new AbortError('The user aborted a request.');
reject(error);
if (request.body && request.body instanceof Stream.Readable) {
request.body.destroy(error);
destroyStream(request.body, error);
}
if (!response || !response.body) return;
response.body.emit('error', error);
Expand Down Expand Up @@ -108,9 +108,41 @@ export default function fetch(url, opts) {

req.on('error', err => {
reject(new FetchError(`request to ${request.url} failed, reason: ${err.message}`, 'system', err));

if (response && response.body) {
destroyStream(response.body, err);
}

finalize();
});

fixResponseChunkedTransferBadEnding(req, err => {
if (signal && signal.aborted) {
return
}

destroyStream(response.body, err);
});

/* c8 ignore next 18 */
if (parseInt(process.version.substring(1)) < 14) {
// Before Node.js 14, pipeline() does not fully support async iterators and does not always
// properly handle when the socket close/end events are out of order.
req.on('socket', s => {
s.addListener('close', hadError => {
// if a data listener is still present we didn't end cleanly
const hasDataListener = s.listenerCount('data') > 0

// if end happened before close but the socket didn't emit an error, do it now
if (response && hasDataListener && !hadError && !(signal && signal.aborted)) {
const err = new Error('Premature close');
err.code = 'ERR_STREAM_PREMATURE_CLOSE';
response.body.emit('error', err);
}
});
});
}

req.on('response', res => {
clearTimeout(reqTimeout);

Expand Down Expand Up @@ -303,6 +335,40 @@ export default function fetch(url, opts) {

};

function fixResponseChunkedTransferBadEnding(request, errorCallback) {
let socket;

request.on('socket', s => {
socket = s;
});

request.on('response', response => {
const {headers} = response;
if (headers['transfer-encoding'] === 'chunked' && !headers['content-length']) {
response.once('close', hadError => {
// if a data listener is still present we didn't end cleanly
const hasDataListener = socket.listenerCount('data') > 0;

if (hasDataListener && !hadError) {
const err = new Error('Premature close');
err.code = 'ERR_STREAM_PREMATURE_CLOSE';
errorCallback(err);
}
});
}
});
}

function destroyStream (stream, err) {
if (stream.destroy) {
stream.destroy(err);
} else {
// node < 8
stream.emit('error', err);
stream.end();
}
}

/**
* Redirect code matching
*
Expand Down
28 changes: 28 additions & 0 deletions test/server.js
Expand Up @@ -329,6 +329,34 @@ export default class TestServer {
res.destroy();
}

if (p === '/error/premature/chunked') {
res.writeHead(200, {
'Content-Type': 'application/json',
'Transfer-Encoding': 'chunked'
});

// Transfer-Encoding: 'chunked' sends chunk sizes followed by the
// chunks - https://en.wikipedia.org/wiki/Chunked_transfer_encoding
const sendChunk = (obj) => {
const data = JSON.stringify(obj)

res.write(`${data.length}\r\n`)
res.write(`${data}\r\n`)
}

sendChunk({data: 'hi'})

setTimeout(() => {
sendChunk({data: 'bye'})
}, 200);

setTimeout(() => {
// should send '0\r\n\r\n' to end the response properly but instead
// just close the connection
res.destroy();
}, 400);
}

if (p === '/error/json') {
res.statusCode = 200;
res.setHeader('Content-Type', 'application/json');
Expand Down
74 changes: 74 additions & 0 deletions test/test.js
@@ -1,4 +1,7 @@

import 'babel-core/register'
import 'babel-polyfill'

// test tools
import chai from 'chai';
import chaiPromised from 'chai-as-promised';
Expand Down Expand Up @@ -552,6 +555,77 @@ describe('node-fetch', () => {
.and.have.property('code', 'ECONNRESET');
});

it('should handle network-error in chunked response', () => {
const url = `${base}error/premature/chunked`;
return fetch(url).then(res => {
expect(res.status).to.equal(200);
expect(res.ok).to.be.true;

return expect(new Promise((resolve, reject) => {
res.body.on('error', reject);
res.body.on('close', resolve);
})).to.eventually.be.rejectedWith(Error, 'Premature close')
.and.have.property('code', 'ERR_STREAM_PREMATURE_CLOSE');
});
});

// Skip test if streams are not async iterators (node < 10)
const itAsyncIterator = Boolean(new stream.PassThrough()[Symbol.asyncIterator]) ? it : it.skip;

itAsyncIterator('should handle network-error in chunked response async iterator', () => {
const url = `${base}error/premature/chunked`;
return fetch(url).then(res => {
expect(res.status).to.equal(200);
expect(res.ok).to.be.true;

const read = async body => {
const chunks = [];

if (process.version < 'v14') {
// In Node.js 12, some errors don't come out in the async iterator; we have to pick
// them up from the event-emitter and then throw them after the async iterator
let error;
body.on('error', err => {
error = err;
});

for await (const chunk of body) {
chunks.push(chunk);
}

if (error) {
throw error;
}

return new Promise(resolve => {
body.on('close', () => resolve(chunks));
});
}

for await (const chunk of body) {
chunks.push(chunk);
}

return chunks;
};

return expect(read(res.body))
.to.eventually.be.rejectedWith(Error, 'Premature close')
.and.have.property('code', 'ERR_STREAM_PREMATURE_CLOSE');
});
});

it('should handle network-error in chunked response in consumeBody', () => {
const url = `${base}error/premature/chunked`;
return fetch(url).then(res => {
expect(res.status).to.equal(200);
expect(res.ok).to.be.true;

return expect(res.text())
.to.eventually.be.rejectedWith(Error, 'Premature close');
});
});

it('should handle DNS-error response', function() {
const url = 'http://domain.invalid';
return expect(fetch(url)).to.eventually.be.rejected
Expand Down

0 comments on commit 50536d1

Please sign in to comment.