Skip to content

Commit

Permalink
Remove stream lock - unreliable since Node 17.3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
szmarczak committed Apr 24, 2022
1 parent e393662 commit bb8eca9
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 250 deletions.
68 changes: 7 additions & 61 deletions source/core/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import process from 'node:process';
import {Buffer} from 'node:buffer';
import {Duplex, Writable, Readable} from 'node:stream';
import {Duplex, Readable} from 'node:stream';
import {URL, URLSearchParams} from 'node:url';
import http, {ServerResponse} from 'node:http';
import type {ClientRequest, RequestOptions} from 'node:http';
Expand Down Expand Up @@ -197,30 +197,6 @@ export default class Request extends Duplex implements RequestEvents<Request> {

this._stopRetry = noop;

const unlockWrite = (): void => {
this._unlockWrite();
};

const lockWrite = (): void => {
this._lockWrite();
};

this.on('pipe', (source: Writable) => {
source.prependListener('data', unlockWrite);
source.on('data', lockWrite);

source.prependListener('end', unlockWrite);
source.on('end', lockWrite);
});

this.on('unpipe', (source: Writable) => {
source.off('data', unlockWrite);
source.off('data', lockWrite);

source.off('end', unlockWrite);
source.off('end', lockWrite);
});

this.on('pipe', source => {
if (source.headers) {
Object.assign(this.options.headers, source.headers);
Expand Down Expand Up @@ -259,13 +235,9 @@ export default class Request extends Duplex implements RequestEvents<Request> {
return;
}

const {json, body, form} = this.options;
if (json || body || form) {
this._lockWrite();
}

// Important! If you replace `body` in a handler with another stream, make sure it's readable first.
// The below is run only once.
const {body} = this.options;
if (is.nodeStream(body)) {
body.once('error', error => {
if (this._flushed) {
Expand Down Expand Up @@ -553,20 +525,6 @@ export default class Request extends Duplex implements RequestEvents<Request> {
return this;
}

private _lockWrite(): void {
const onLockedWrite = (): never => {
throw new TypeError('The payload has been already provided');
};

this.write = onLockedWrite;
this.end = onLockedWrite;
}

private _unlockWrite(): void {
this.write = super.write;
this.end = super.end;
}

private async _finalizeBody(): Promise<void> {
const {options} = this;
const {headers} = options;
Expand Down Expand Up @@ -639,10 +597,6 @@ export default class Request extends Duplex implements RequestEvents<Request> {
if (is.undefined(headers['content-length']) && is.undefined(headers['transfer-encoding']) && !cannotHaveBody && !is.undefined(uploadBodySize)) {
headers['content-length'] = String(uploadBodySize);
}
} else if (cannotHaveBody) {
this._lockWrite();
} else {
this._unlockWrite();
}

if (options.responseType === 'json' && !('accept' in options.headers)) {
Expand Down Expand Up @@ -974,19 +928,11 @@ export default class Request extends Duplex implements RequestEvents<Request> {
this._beforeError(error);
}
})();
} else {
this._unlockWrite();

if (!is.undefined(body)) {
this._writeRequest(body, undefined, () => {});
currentRequest.end();

this._lockWrite();
} else if (this._cannotHaveBody || this._noPipe) {
currentRequest.end();

this._lockWrite();
}
} else if (!is.undefined(body)) {
this._writeRequest(body, undefined, () => {});
currentRequest.end();
} else if (this._cannotHaveBody || this._noPipe) {
currentRequest.end();
}
}

Expand Down
155 changes: 0 additions & 155 deletions test/progress.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import {Buffer} from 'buffer';
import {promisify} from 'util';
import stream from 'stream';
import fs from 'fs';
// @ts-expect-error Fails to find slow-stream/index.d.ts
import SlowStream from 'slow-stream';
import toReadableStream from 'to-readable-stream';
import getStream from 'get-stream';
import FormData from 'form-data';
import tempy from 'tempy';
import is from '@sindresorhus/is';
import test, {ExecutionContext} from 'ava';
import {Handler} from 'express';
Expand Down Expand Up @@ -45,24 +41,6 @@ const checkEvents = (t: ExecutionContext, events: Progress[], bodySize?: number)

const file = Buffer.alloc(1024 * 1024 * 2);

const downloadEndpoint: Handler = (_request, response) => {
response.setHeader('content-length', file.length);

stream.pipeline(
toReadableStream(file),
new SlowStream({maxWriteInterval: 50}),
response,
() => {
response.end();
},
);
};

const noTotalEndpoint: Handler = (_request, response) => {
response.write('hello');
response.end();
};

const uploadEndpoint: Handler = (request, response) => {
stream.pipeline(
request,
Expand All @@ -73,118 +51,6 @@ const uploadEndpoint: Handler = (request, response) => {
);
};

test('download progress', withServer, async (t, server, got) => {
server.get('/', downloadEndpoint);

const events: Progress[] = [];

const {body} = await got({responseType: 'buffer'})
.on('downloadProgress', event => events.push(event));

checkEvents(t, events, body.length);
});

test('download progress - missing total size', withServer, async (t, server, got) => {
server.get('/', noTotalEndpoint);

const events: Progress[] = [];

await got('').on('downloadProgress', (event: Progress) => events.push(event));

t.is(events[0]?.total, undefined);
checkEvents(t, events);
});

test('download progress - stream', withServer, async (t, server, got) => {
server.get('/', downloadEndpoint);

const events: Progress[] = [];

const stream = got.stream({responseType: 'buffer'})
.on('downloadProgress', event => events.push(event));

await getStream(stream);

checkEvents(t, events, file.length);
});

test('upload progress - file', withServer, async (t, server, got) => {
server.post('/', uploadEndpoint);

const events: Progress[] = [];

await got.post({body: file}).on('uploadProgress', (event: Progress) => events.push(event));

checkEvents(t, events, file.length);
});

test('upload progress - file stream', withServer, async (t, server, got) => {
server.post('/', uploadEndpoint);

const path = tempy.file();
fs.writeFileSync(path, file);

const {size} = await promisify(fs.stat)(path);

const events: Progress[] = [];

await got.post({
body: fs.createReadStream(path),
headers: {
'content-length': size.toString(),
},
})
.on('uploadProgress', (event: Progress) => events.push(event));

checkEvents(t, events, file.length);
});

test('upload progress - form data', withServer, async (t, server, got) => {
server.post('/', uploadEndpoint);

const events: Progress[] = [];

const body = new FormData();
body.append('key', 'value');
body.append('file', file);

const size = await promisify(body.getLength.bind(body))();

await got.post({body}).on('uploadProgress', (event: Progress) => events.push(event));

checkEvents(t, events, size);
});

test('upload progress - json', withServer, async (t, server, got) => {
server.post('/', uploadEndpoint);

const body = JSON.stringify({key: 'value'});
const size = Buffer.byteLength(body);
const events: Progress[] = [];

await got.post({body}).on('uploadProgress', (event: Progress) => events.push(event));

checkEvents(t, events, size);
});

test('upload progress - stream with known body size', withServer, async (t, server, got) => {
server.post('/', uploadEndpoint);

const events: Progress[] = [];
const options = {
headers: {'content-length': file.length.toString()},
};

const request = got.stream.post(options)
.on('uploadProgress', event => events.push(event));

await getStream(
stream.pipeline(toReadableStream(file), request, () => {}),
);

checkEvents(t, events, file.length);
});

test('upload progress - stream with unknown body size', withServer, async (t, server, got) => {
server.post('/', uploadEndpoint);

Expand All @@ -200,24 +66,3 @@ test('upload progress - stream with unknown body size', withServer, async (t, se
t.is(events[0]?.total, undefined);
checkEvents(t, events);
});

test('upload progress - no body', withServer, async (t, server, got) => {
server.post('/', uploadEndpoint);

const events: Progress[] = [];

await got.post('').on('uploadProgress', (event: Progress) => events.push(event));

t.deepEqual(events, [
{
percent: 0,
transferred: 0,
total: undefined,
},
{
percent: 1,
transferred: 0,
total: 0,
},
]);
});
34 changes: 0 additions & 34 deletions test/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,26 +95,6 @@ test('returns writeable stream', withServer, async (t, server, got) => {
t.is(await promise, 'wow');
});

test('throws on write if body is specified', withServer, (t, server, got) => {
server.post('/', postHandler);

const streams = [
got.stream.post({body: 'wow'}),
got.stream.post({json: {}}),
got.stream.post({form: {}}),
];

for (const stream of streams) {
t.throws(() => {
stream.end('wow');
}, {
message: 'The payload has been already provided',
});

stream.destroy();
}
});

test('does not throw if using stream and passing a json option', withServer, async (t, server, got) => {
server.post('/', postHandler);

Expand All @@ -127,20 +107,6 @@ test('does not throw if using stream and passing a form option', withServer, asy
await t.notThrowsAsync(getStream(got.stream.post({form: {}})));
});

test('throws on write if no payload method is present', withServer, (t, server, got) => {
server.post('/', postHandler);

const stream = got.stream.get('');

t.throws(() => {
stream.end('wow');
}, {
message: 'The payload has been already provided',
});

stream.destroy();
});

test('has request event', withServer, async (t, server, got) => {
server.get('/', defaultHandler);

Expand Down

0 comments on commit bb8eca9

Please sign in to comment.