diff --git a/src/node_messaging.cc b/src/node_messaging.cc index 7ddd3c4165c511..712add06d3e2bc 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -144,7 +144,7 @@ void Message::AddMessagePort(std::unique_ptr&& data) { namespace { -void ThrowDataCloneError(Environment* env, Local message) { +void ThrowDataCloneException(Environment* env, Local message) { Local argv[] = { message, FIXED_ONE_BYTE_STRING(env->isolate(), "DataCloneError") @@ -168,7 +168,7 @@ class SerializerDelegate : public ValueSerializer::Delegate { : env_(env), context_(context), msg_(m) {} void ThrowDataCloneError(Local message) override { - ThrowDataCloneError(env_, message); + ThrowDataCloneException(env_, message); } Maybe WriteHostObject(Isolate* isolate, Local object) override { @@ -239,7 +239,8 @@ class SerializerDelegate : public ValueSerializer::Delegate { Maybe Message::Serialize(Environment* env, Local context, Local input, - Local transfer_list_v) { + Local transfer_list_v, + Local source_port) { HandleScope handle_scope(env->isolate()); Context::Scope context_scope(context); @@ -273,8 +274,23 @@ Maybe Message::Serialize(Environment* env, continue; } else if (env->message_port_constructor_template() ->HasInstance(entry)) { + // Check if the source MessagePort is being transferred. + if (!source_port.IsEmpty() && entry == source_port) { + ThrowDataCloneException( + env, + FIXED_ONE_BYTE_STRING(env->isolate(), + "Transfer list contains source port")); + return Nothing(); + } MessagePort* port = Unwrap(entry.As()); - CHECK_NE(port, nullptr); + if (port == nullptr || port->IsDetached()) { + ThrowDataCloneException( + env, + FIXED_ONE_BYTE_STRING( + env->isolate(), + "MessagePort in transfer list is already detached")); + return Nothing(); + } delegate.ports_.push_back(port); continue; } @@ -410,6 +426,10 @@ uv_async_t* MessagePort::async() { return reinterpret_cast(GetHandle()); } +bool MessagePort::IsDetached() const { + return data_ == nullptr || IsHandleClosing(); +} + void MessagePort::TriggerAsync() { if (IsHandleClosing()) return; CHECK_EQ(uv_async_send(async()), 0); @@ -552,36 +572,69 @@ std::unique_ptr MessagePort::Detach() { } -void MessagePort::Send(Message&& message) { - Mutex::ScopedLock lock(*data_->sibling_mutex_); - if (data_->sibling_ == nullptr) - return; - data_->sibling_->AddToIncomingQueue(std::move(message)); -} +Maybe MessagePort::PostMessage(Environment* env, + Local message_v, + Local transfer_v) { + Isolate* isolate = env->isolate(); + Local obj = object(isolate); + Local context = obj->CreationContext(); -void MessagePort::Send(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - Local context = object(env->isolate())->CreationContext(); Message msg; - if (msg.Serialize(env, context, args[0], args[1]) - .IsNothing()) { - return; + + // Per spec, we need to both check if transfer list has the source port, and + // serialize the input message, even if the MessagePort is closed or detached. + + Maybe serialization_maybe = + msg.Serialize(env, context, message_v, transfer_v, obj); + if (data_ == nullptr) { + return serialization_maybe; + } + if (serialization_maybe.IsNothing()) { + return Nothing(); + } + + Mutex::ScopedLock lock(*data_->sibling_mutex_); + bool doomed = false; + + // Check if the target port is posted to itself. + if (data_->sibling_ != nullptr) { + for (const auto& port_data : msg.message_ports()) { + if (data_->sibling_ == port_data.get()) { + doomed = true; + ProcessEmitWarning(env, "The target port was posted to itself, and " + "the communication channel was lost"); + break; + } + } } - Send(std::move(msg)); + + if (data_->sibling_ == nullptr || doomed) + return Just(true); + + data_->sibling_->AddToIncomingQueue(std::move(msg)); + return Just(true); } void MessagePort::PostMessage(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); - MessagePort* port; - ASSIGN_OR_RETURN_UNWRAP(&port, args.This()); - if (!port->data_) { - return THROW_ERR_CLOSED_MESSAGE_PORT(env); - } if (args.Length() == 0) { return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to " "MessagePort.postMessage"); } - port->Send(args); + + MessagePort* port = Unwrap(args.This()); + // Even if the backing MessagePort object has already been deleted, we still + // want to serialize the message to ensure spec-compliant behavior w.r.t. + // transfers. + if (port == nullptr) { + Message msg; + Local obj = args.This(); + Local context = obj->CreationContext(); + USE(msg.Serialize(env, context, args[0], args[1], obj)); + return; + } + + port->PostMessage(env, args[0], args[1]); } void MessagePort::Start() { diff --git a/src/node_messaging.h b/src/node_messaging.h index 28122c526cccea..62ae633b9e0b8d 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -32,10 +32,14 @@ class Message { // Serialize a JS value, and optionally transfer objects, into this message. // The Message object retains ownership of all transferred objects until // deserialization. + // The source_port parameter, if provided, will make Serialize() throw a + // "DataCloneError" DOMException if source_port is found in transfer_list. v8::Maybe Serialize(Environment* env, v8::Local context, v8::Local input, - v8::Local transfer_list); + v8::Local transfer_list, + v8::Local source_port = + v8::Local()); // Internal method of Message that is called when a new SharedArrayBuffer // object is encountered in the incoming value's structure. @@ -44,6 +48,13 @@ class Message { // and that transfers ownership of `data` to this message. void AddMessagePort(std::unique_ptr&& data); + // The MessagePorts that will be transferred, as recorded by Serialize(). + // Used for warning user about posting the target MessagePort to itself, + // which will as a side effect destroy the communication channel. + const std::vector>& message_ports() const { + return message_ports_; + } + private: MallocedBuffer main_message_buf_; std::vector> array_buffer_contents_; @@ -122,10 +133,11 @@ class MessagePort : public HandleWrap { std::unique_ptr data = nullptr); // Send a message, i.e. deliver it into the sibling's incoming queue. - // If there is no sibling, i.e. this port is closed, - // this message is silently discarded. - void Send(Message&& message); - void Send(const v8::FunctionCallbackInfo& args); + // If this port is closed, or if there is no sibling, this message is + // serialized with transfers, then silently discarded. + v8::Maybe PostMessage(Environment* env, + v8::Local message, + v8::Local transfer); // Deliver a single message into this port's incoming queue. void AddToIncomingQueue(Message&& message); @@ -157,6 +169,15 @@ class MessagePort : public HandleWrap { void Close( v8::Local close_callback = v8::Local()) override; + // Returns true if either data_ has been freed, or if the handle is being + // closed. Equivalent to the [[Detached]] internal slot in the HTML Standard. + // + // If checking if a JavaScript MessagePort object is detached, this method + // alone is often not enough, since the backing C++ MessagePort object may + // have been deleted already. For all intents and purposes, an object with a + // NULL pointer to the C++ MessagePort object is also detached. + inline bool IsDetached() const; + size_t self_size() const override; private: diff --git a/test/parallel/test-worker-message-port-transfer-closed.js b/test/parallel/test-worker-message-port-transfer-closed.js new file mode 100644 index 00000000000000..435a3789fca724 --- /dev/null +++ b/test/parallel/test-worker-message-port-transfer-closed.js @@ -0,0 +1,54 @@ +// Flags: --experimental-worker +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { MessageChannel } = require('worker_threads'); + +// This tests various behaviors around transferring MessagePorts with closing +// or closed handles. + +const { port1, port2 } = new MessageChannel(); + +const arrayBuf = new ArrayBuffer(10); +port1.onmessage = common.mustNotCall(); +port2.onmessage = common.mustNotCall(); + +function testSingle(closedPort, potentiallyOpenPort) { + assert.throws(common.mustCall(() => { + potentiallyOpenPort.postMessage(null, [arrayBuf, closedPort]); + }), common.mustCall((err) => { + assert.strictEqual(err.name, 'DataCloneError'); + assert.strictEqual(err.message, + 'MessagePort in transfer list is already detached'); + assert.strictEqual(err.code, 25); + assert.ok(err instanceof Error); + + const DOMException = err.constructor; + assert.ok(err instanceof DOMException); + assert.strictEqual(DOMException.name, 'DOMException'); + + return true; + })); + + // arrayBuf must not be transferred, even though it is present earlier in the + // transfer list than the closedPort. + assert.strictEqual(arrayBuf.byteLength, 10); +} + +function testBothClosed() { + testSingle(port1, port2); + testSingle(port2, port1); +} + +// Even though the port handles may not be completely closed in C++ land, the +// observable behavior must be that the closing/detachment is synchronous and +// instant. + +port1.close(common.mustCall(testBothClosed)); +testSingle(port1, port2); +port2.close(common.mustCall(testBothClosed)); +testBothClosed(); + +setTimeout(common.mustNotCall('The communication channel is still open'), + common.platformTimeout(1000)).unref(); diff --git a/test/parallel/test-worker-message-port-transfer-self.js b/test/parallel/test-worker-message-port-transfer-self.js new file mode 100644 index 00000000000000..1855023cdfae04 --- /dev/null +++ b/test/parallel/test-worker-message-port-transfer-self.js @@ -0,0 +1,33 @@ +// Flags: --experimental-worker +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { MessageChannel } = require('worker_threads'); + +const { port1, port2 } = new MessageChannel(); + +assert.throws(common.mustCall(() => { + port1.postMessage(null, [port1]); +}), common.mustCall((err) => { + assert.strictEqual(err.name, 'DataCloneError'); + assert.strictEqual(err.message, 'Transfer list contains source port'); + assert.strictEqual(err.code, 25); + assert.ok(err instanceof Error); + + const DOMException = err.constructor; + assert.ok(err instanceof DOMException); + assert.strictEqual(DOMException.name, 'DOMException'); + + return true; +})); + +// The failed transfer should not affect the ports in anyway. +port2.onmessage = common.mustCall((message) => { + assert.strictEqual(message, 2); + port1.close(); + + setTimeout(common.mustNotCall('The communication channel is still open'), + common.platformTimeout(1000)).unref(); +}); +port1.postMessage(2); diff --git a/test/parallel/test-worker-message-port-transfer-target.js b/test/parallel/test-worker-message-port-transfer-target.js new file mode 100644 index 00000000000000..8e6354d8269771 --- /dev/null +++ b/test/parallel/test-worker-message-port-transfer-target.js @@ -0,0 +1,24 @@ +// Flags: --experimental-worker +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { MessageChannel } = require('worker_threads'); + +const { port1, port2 } = new MessageChannel(); + +const arrayBuf = new ArrayBuffer(10); + +common.expectWarning('Warning', + 'The target port was posted to itself, and the ' + + 'communication channel was lost', + common.noWarnCode); +port2.onmessage = common.mustNotCall(); +port2.postMessage(null, [port1, arrayBuf]); + +// arrayBuf must be transferred, despite the fact that port2 never received the +// message. +assert.strictEqual(arrayBuf.byteLength, 0); + +setTimeout(common.mustNotCall('The communication channel is still open'), + common.platformTimeout(1000)).unref();