Skip to content

Commit

Permalink
stream: fix cloned webstreams not being unref correctly
Browse files Browse the repository at this point in the history
PR-URL: #51526
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Vinícius Lourenço Claro Cardoso <contact@viniciusl.com.br>
  • Loading branch information
tsctx authored and targos committed Feb 15, 2024
1 parent 4b1d25b commit 438b7fd
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 14 deletions.
6 changes: 5 additions & 1 deletion lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,11 @@ class ReadableStream {
const transfer = lazyTransfer();
setupReadableStreamDefaultControllerFromSource(
this,
new transfer.CrossRealmTransformReadableSource(port),
// The MessagePort is set to be referenced when reading.
// After two MessagePorts are closed, there is a problem with
// lingering promise not being properly resolved.
// https://github.com/nodejs/node/issues/51486
new transfer.CrossRealmTransformReadableSource(port, true),
0, () => 1);
}
}
Expand Down
36 changes: 26 additions & 10 deletions lib/internal/webstreams/transfer.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,11 @@ function InternalCloneableDOMException() {
InternalCloneableDOMException[kDeserialize] = () => {};

class CrossRealmTransformReadableSource {
constructor(port) {
constructor(port, unref) {
this[kState] = {
port,
controller: undefined,
unref,
};

port.onmessage = ({ data }) => {
Expand Down Expand Up @@ -143,13 +144,19 @@ class CrossRealmTransformReadableSource {
error);
port.close();
};

port.unref();
}

start(controller) {
this[kState].controller = controller;
}

async pull() {
if (this[kState].unref) {
this[kState].unref = false;
this[kState].port.ref();
}
this[kState].port.postMessage({ type: 'pull' });
}

Expand All @@ -170,11 +177,12 @@ class CrossRealmTransformReadableSource {
}

class CrossRealmTransformWritableSink {
constructor(port) {
constructor(port, unref) {
this[kState] = {
port,
controller: undefined,
backpressurePromise: createDeferredPromise(),
unref,
};

port.onmessage = ({ data }) => {
Expand Down Expand Up @@ -211,13 +219,18 @@ class CrossRealmTransformWritableSink {
port.close();
};

port.unref();
}

start(controller) {
this[kState].controller = controller;
}

async write(chunk) {
if (this[kState].unref) {
this[kState].unref = false;
this[kState].port.ref();
}
if (this[kState].backpressurePromise === undefined) {
this[kState].backpressurePromise = {
promise: PromiseResolve(),
Expand Down Expand Up @@ -262,12 +275,12 @@ class CrossRealmTransformWritableSink {
}

function newCrossRealmReadableStream(writable, port) {
const readable =
new ReadableStream(
new CrossRealmTransformReadableSource(port));
// MessagePort should always be unref.
// There is a problem with the process not terminating.
// https://github.com/nodejs/node/issues/44985
const readable = new ReadableStream(new CrossRealmTransformReadableSource(port, false));

const promise =
readableStreamPipeTo(readable, writable, false, false, false);
const promise = readableStreamPipeTo(readable, writable, false, false, false);

setPromiseHandled(promise);

Expand All @@ -278,12 +291,15 @@ function newCrossRealmReadableStream(writable, port) {
}

function newCrossRealmWritableSink(readable, port) {
const writable =
new WritableStream(
new CrossRealmTransformWritableSink(port));
// MessagePort should always be unref.
// There is a problem with the process not terminating.
// https://github.com/nodejs/node/issues/44985
const writable = new WritableStream(new CrossRealmTransformWritableSink(port, false));

const promise = readableStreamPipeTo(readable, writable, false, false, false);

setPromiseHandled(promise);

return {
writable,
promise,
Expand Down
8 changes: 5 additions & 3 deletions lib/internal/webstreams/writablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,6 @@ class WritableStream {
this[kState].transfer.readable = readable;
this[kState].transfer.promise = promise;

setPromiseHandled(this[kState].transfer.promise);

return {
data: { port: this[kState].transfer.port2 },
deserializeInfo:
Expand All @@ -283,7 +281,11 @@ class WritableStream {
const transfer = lazyTransfer();
setupWritableStreamDefaultControllerFromSink(
this,
new transfer.CrossRealmTransformWritableSink(port),
// The MessagePort is set to be referenced when reading.
// After two MessagePorts are closed, there is a problem with
// lingering promise not being properly resolved.
// https://github.com/nodejs/node/issues/51486
new transfer.CrossRealmTransformWritableSink(port, true),
1,
() => 1);
}
Expand Down
16 changes: 16 additions & 0 deletions test/parallel/test-webstreams-clone-unref.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
'use strict';

require('../common');
const { ok } = require('node:assert');

// This test verifies that cloned ReadableStream and WritableStream instances
// do not keep the process alive. The test fails if it timesout (it should just
// exit immediately)

const rs1 = new ReadableStream();
const ws1 = new WritableStream();

const [rs2, ws2] = structuredClone([rs1, ws1], { transfer: [rs1, ws1] });

ok(rs2 instanceof ReadableStream);
ok(ws2 instanceof WritableStream);
11 changes: 11 additions & 0 deletions test/parallel/test-whatwg-webstreams-transfer.js
Original file line number Diff line number Diff line change
Expand Up @@ -464,12 +464,23 @@ const theData = 'hello';
tracker.verify();
});
// We create an interval to keep the event loop alive while
// we wait for the stream read to complete. The reason this is needed is because there's
// otherwise nothing to keep the worker thread event loop alive long enough to actually
// complete the read from the stream. Under the covers the ReadableStream uses an
// unref'd MessagePort to communicate with the main thread. Because the MessagePort
// is unref'd, it's existence would not keep the thread alive on its own. There was previously
// a bug where this MessagePort was ref'd which would block the thread and main thread
// from terminating at all unless the stream was consumed/closed.
const i = setInterval(() => {}, 1000);
parentPort.onmessage = tracker.calls(({ data }) => {
assert(isReadableStream(data));
const reader = data.getReader();
reader.read().then(tracker.calls((result) => {
assert(!result.done);
assert(result.value instanceof Uint8Array);
clearInterval(i);
}));
parentPort.close();
});
Expand Down

0 comments on commit 438b7fd

Please sign in to comment.