fix complete messageport transfer

gterzian committed Nov 5, 2019
1 parent 4cdfe23 commit 2ac6e1cc2eed7152c91e00ba7e4478aaf91a259d
@@ -1553,6 +1553,12 @@ where
};

match content {
FromScriptMsg::CompleteMessagePortTransfer(router_id, ports) => {
self.complete_message_port_transfer(router_id, ports);
},
FromScriptMsg::MessagePortTransferCompleted(router_id, ports) => {
self.handle_message_port_transfer_completed(router_id, ports);
},
FromScriptMsg::RerouteMessagePort(port_id, task) => {
self.handle_reroute_messageport(port_id, task);
},
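
For orientation, here is a minimal, self-contained sketch of the round-trip these two new messages set up, using hypothetical simplified types rather than Servo's actual ones: a router asks the constellation to complete a batch of transfers, receives the buffered tasks, and acknowledges with MessagePortTransferCompleted once it has taken the ports over.

use std::collections::{HashMap, VecDeque};

type PortId = u32;
type RouterId = u8;
type Task = String;

// Script/router -> constellation (simplified stand-ins for the ScriptMsg variants).
enum ToConstellation {
    CompleteMessagePortTransfer(RouterId, Vec<PortId>),
    MessagePortTransferCompleted(RouterId, Vec<PortId>),
}

// Constellation -> router (simplified stand-in for MessagePortMsg::CompleteTransfer).
enum ToRouter {
    CompleteTransfer(HashMap<PortId, VecDeque<Task>>),
}

fn main() {
    // Router 1 reports that it now holds ports 7 and 8.
    let request = ToConstellation::CompleteMessagePortTransfer(1, vec![7, 8]);
    if let ToConstellation::CompleteMessagePortTransfer(router, ports) = request {
        // The constellation would look up each port's buffered tasks here.
        let buffers: HashMap<PortId, VecDeque<Task>> =
            ports.iter().map(|p| (*p, VecDeque::new())).collect();
        let _reply = ToRouter::CompleteTransfer(buffers);
        // After replaying the buffers, the router confirms; only then does the
        // constellation drop its copies.
        let _ack = ToConstellation::MessagePortTransferCompleted(router, ports);
    }
}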
@@ -1779,6 +1785,103 @@ where
}
}

fn handle_message_port_transfer_completed(
&mut self,
router_id: MessagePortRouterId,
mut ports: Vec<MessagePortId>,
) {
for port_id in ports.drain(0..) {
match self.message_ports.entry(port_id) {
// If the port was transferred, we must know about it.
Entry::Occupied(mut entry) => {
match entry.get().state {
TransferState::EntangledRemoved => {
// If the entangled port has been removed while this one was in-transfer,
// remove it now.
if let Some(sender) = self.message_port_routers.get(&router_id) {
let _ = sender.send(MessagePortMsg::RemoveMessagePort(port_id));
} else {
warn!("No message-port sender for {:?}", router_id);
}
entry.remove_entry();
continue;
},
TransferState::TransferInProgress(_) => {
// Drop the buffer, and update the state to managed.
let new_info = MessagePortInfo {
state: TransferState::Managed(router_id),
entangled_with: entry.get().entangled_with.clone(),
};
entry.insert(new_info);
},
TransferState::Managed(_) => {
warn!("Constellation received unexpected port transfer completed message: {:?}", port_id);
continue;
}
}
},
Entry::Vacant(_) => warn!(
"Constellation received a port transfer completed msg for unknown messageport {:?}",
port_id
),
}
}
}

fn complete_message_port_transfer(
&mut self,
router_id: MessagePortRouterId,
mut ports: Vec<MessagePortId>,
) {
let mut response = HashMap::new();
for port_id in ports.drain(0..) {
match self.message_ports.entry(port_id) {
// If the port was transferred, we must know about it.
Entry::Occupied(entry) => {
match &entry.get().state {
TransferState::EntangledRemoved => {
// If the entangled port has been removed while this one was in-transfer,
// remove it now.
if let Some(sender) = self.message_port_routers.get(&router_id) {
let _ = sender.send(MessagePortMsg::RemoveMessagePort(port_id));
} else {
warn!("No message-port sender for {:?}", router_id);
}
entry.remove_entry();
continue;
},
TransferState::TransferInProgress(buffer) => {
// Clone the buffer, and leave the state as it is,
// since we're not sure if the transfer will be completed successfully,
// for example if a port is transferred again
// before the CompleteTransfer msg is handled by the global.
response.insert(port_id, buffer.clone());
},
TransferState::Managed(_) => {
warn!("Constellation received complete message port msg for a managed port: {:?}", port_id);
continue;
},
}
},
Entry::Vacant(_) => warn!(
"Constellation asked to complete transfer for unknown messageport {:?}",
port_id
),
}
}
// Forward the buffered message-queues.
if let Some(sender) = self.message_port_routers.get(&router_id) {
if sender
.send(MessagePortMsg::CompleteTransfer(response))
.is_err()
{
warn!("Constellation failed to send complete port transfer response.");
}
} else {
warn!("No message-port sender for {:?}", router_id);
}
}
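
A minimal sketch, under simplified hypothetical types, of why the buffer is cloned rather than removed above: the state stays TransferInProgress until a confirmation arrives, so a second transfer request for the same port can still be served from the same buffer.

use std::collections::VecDeque;

struct Port {
    // Some(buffer) while a transfer is in progress; None once it is confirmed.
    buffer: Option<VecDeque<String>>,
}

impl Port {
    fn complete_transfer(&self) -> Option<VecDeque<String>> {
        // Clone: the state is left as "in progress" until a confirmation arrives.
        self.buffer.clone()
    }

    fn transfer_completed(&mut self) {
        // Only a confirmed completion drops the buffer.
        self.buffer = None;
    }
}

fn main() {
    let mut port = Port {
        buffer: Some(VecDeque::from(vec!["task".to_string()])),
    };
    let first = port.complete_transfer(); // served to a first router
    let second = port.complete_transfer(); // port transferred again: still servable
    assert!(first.is_some() && second.is_some());
    port.transfer_completed(); // confirmation from the final destination
    assert!(port.buffer.is_none());
}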

fn handle_reroute_messageport(&mut self, port_id: MessagePortId, task: PortMessageTask) {
let info = match self.message_ports.get_mut(&port_id) {
Some(info) => info,
@@ -1832,37 +1935,11 @@ where

fn handle_new_messageport(&mut self, router_id: MessagePortRouterId, port_id: MessagePortId) {
match self.message_ports.entry(port_id) {
// If we know about this port, it means it was transferred.
Entry::Occupied(mut entry) => {
if let TransferState::EntangledRemoved = entry.get().state {
// If the entangled port has been removed while this one was in-transfer,
// remove it now.
if let Some(sender) = self.message_port_routers.get(&router_id) {
let _ = sender.send(MessagePortMsg::RemoveMessagePort(port_id));
} else {
warn!("No message-port sender for {:?}", router_id);
}
entry.remove_entry();
return;
}
let new_info = MessagePortInfo {
state: TransferState::Managed(router_id),
entangled_with: entry.get().entangled_with.clone(),
};
let old_info = entry.insert(new_info);
let buffer = match old_info.state {
TransferState::TransferInProgress(buffer) => buffer,
_ => {
return warn!("Completing transfer of a port that did not have a transfer in progress.");
},
};
// Forward the buffered message-queue.
if let Some(sender) = self.message_port_routers.get(&router_id) {
let _ = sender.send(MessagePortMsg::CompleteTransfer(port_id.clone(), buffer));
} else {
warn!("No message-port sender for {:?}", router_id);
}
},
// If it's a new port, we should not know about it.
Entry::Occupied(_) => warn!(
"Constellation asked to start tracking an existing messageport {:?}",
port_id
),
Entry::Vacant(entry) => {
let info = MessagePortInfo {
state: TransferState::Managed(router_id),
@@ -218,12 +218,29 @@ impl MessageListener {
/// and we can only access the root from the event-loop.
fn notify(&self, msg: MessagePortMsg) {
match msg {
MessagePortMsg::CompleteTransfer(port_id, tasks) => {
MessagePortMsg::CompleteTransfer(ports) => {
let context = self.context.clone();
let _ = self.task_source.queue_with_canceller(
task!(process_complete_transfer: move || {
let global = context.root();
global.complete_port_transfer(port_id, tasks);
// Note: the router-id is captured upfront because, by the end of the
// "complete_port_transfer" calls, the global might not have one anymore,
// if all ports are transferred out in the tasks.
let router_id = global.port_router_id();
let ports = ports
.into_iter()
.filter_map(|(id, buffer)| {
global.complete_port_transfer(id, buffer).ok()
})
.collect();
if let Some(router_id) = router_id {
let _ =
global.script_to_constellation_chan()
.send(ScriptMsg::MessagePortTransferCompleted(
router_id,
ports,
));
}
}),
&self.canceller,
);
@@ -294,14 +311,28 @@ impl GlobalScope {
}
}

/// The message-port router Id of the global, if any
fn port_router_id(&self) -> Option<MessagePortRouterId> {
if let MessagePortState::Managed(id, _message_ports) = &*self.message_port_state.borrow() {
Some(id.clone())
} else {
None
}
}

/// Complete the transfer of a message-port.
fn complete_port_transfer(&self, port_id: MessagePortId, tasks: VecDeque<PortMessageTask>) {
fn complete_port_transfer(
&self,
port_id: MessagePortId,
tasks: VecDeque<PortMessageTask>,
) -> Result<MessagePortId, ()> {
let should_start = if let MessagePortState::Managed(_id, message_ports) =
&mut *self.message_port_state.borrow_mut()
{
match message_ports.get_mut(&port_id) {
None => {
panic!("CompleteTransfer msg received in a global not managing the port.");
// Port has been transferred-out already.
return Err(());
},
Some(ManagedMessagePort::Pending(_, _)) => {
panic!("CompleteTransfer msg received for a pending port.");
@@ -312,11 +343,12 @@ impl GlobalScope {
},
}
} else {
return warn!("CompleteTransfer msg received in a global not managing any ports.");
return Err(());
};
if should_start {
self.start_message_port(&port_id);
}
Ok(port_id)
}
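
A minimal sketch (hypothetical names, not Servo's API) of how a Result-returning completion step composes with filter_map, as in the notify handler above: only the ports that were still managed locally end up in the acknowledgment list sent back to the constellation.

// Hypothetical stand-in for GlobalScope::complete_port_transfer.
fn complete(port: u32, still_managed: bool) -> Result<u32, ()> {
    if still_managed { Ok(port) } else { Err(()) }
}

fn main() {
    let incoming = vec![(1, true), (2, false), (3, true)];
    let acknowledged: Vec<u32> = incoming
        .into_iter()
        .filter_map(|(id, managed)| complete(id, managed).ok())
        .collect();
    // Port 2 was already transferred out again, so it is not acknowledged.
    assert_eq!(acknowledged, vec![1, 3]);
}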

/// Update our state to un-managed,
@@ -548,28 +580,33 @@ impl GlobalScope {
&mut *self.message_port_state.borrow_mut()
{
let to_be_added: Vec<MessagePortId> = message_ports
.iter()
.filter_map(|(id, port_info)| match port_info {
ManagedMessagePort::Pending(_, _) => Some(id.clone()),
_ => None,
.iter_mut()
.filter_map(|(id, port_info)| {
if let ManagedMessagePort::Pending(_port_impl, _dom_port) = port_info {
return Some(id.clone());
}
None
})
.collect();
for id in to_be_added {
for id in to_be_added.iter() {
let (id, port_info) = message_ports
.remove_entry(&id)
.expect("Collected port-id to match an entry");
if let ManagedMessagePort::Pending(port_impl, dom_port) = port_info {
let _ = self
.script_to_constellation_chan()
.send(ScriptMsg::NewMessagePort(
router_id.clone(),
port_impl.message_port_id().clone(),
));
let new_port_info = ManagedMessagePort::Added(port_impl, dom_port);
let present = message_ports.insert(id, new_port_info);
assert!(present.is_none());
match port_info {
ManagedMessagePort::Pending(port_impl, dom_port) => {
let new_port_info = ManagedMessagePort::Added(port_impl, dom_port);
let present = message_ports.insert(id, new_port_info);
assert!(present.is_none());
},
_ => panic!("Only pending ports should be found in to_be_added"),
}
}
let _ =
self.script_to_constellation_chan()
.send(ScriptMsg::CompleteMessagePortTransfer(
router_id.clone(),
to_be_added,
));
} else {
warn!("maybe_add_pending_ports called on a global not managing any ports.");
}
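
A minimal sketch (hypothetical simplified types) of the batching pattern used above: promote every pending port locally, then notify the constellation once with the whole list instead of sending one NewMessagePort message per port.

use std::collections::HashMap;

#[derive(PartialEq)]
enum Port {
    Pending,
    Added,
}

fn main() {
    let mut ports: HashMap<u32, Port> = HashMap::from([(1, Port::Pending), (2, Port::Added)]);
    // Collect the ids of all pending ports.
    let to_be_added: Vec<u32> = ports
        .iter()
        .filter_map(|(id, p)| if *p == Port::Pending { Some(*id) } else { None })
        .collect();
    // Mark them as added.
    for id in to_be_added.iter() {
        ports.insert(*id, Port::Added);
    }
    // A single message covers the whole batch (the send itself is stubbed out here).
    println!("CompleteMessagePortTransfer({:?})", to_be_added);
}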
@@ -1016,7 +1016,7 @@ impl ScriptToConstellationChan {

/// A data-holder for serialized data and transferred objects.
/// <https://html.spec.whatwg.org/multipage/#structuredserializewithtransfer>
#[derive(Debug, Deserialize, MallocSizeOf, Serialize)]
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
pub struct StructuredSerializedData {
/// Data serialized by SpiderMonkey.
pub serialized: Vec<u8>,
@@ -1025,7 +1025,7 @@ pub struct StructuredSerializedData {
}

/// A task on the https://html.spec.whatwg.org/multipage/#port-message-queue
#[derive(Debug, Deserialize, MallocSizeOf, Serialize)]
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
pub struct PortMessageTask {
/// The origin of this task.
pub origin: ImmutableOrigin,
@@ -1037,7 +1037,7 @@ pub struct PortMessageTask {
#[derive(Debug, Deserialize, Serialize)]
pub enum MessagePortMsg {
/// Enables a port to catch up on messages that were sent while the transfer was ongoing.
CompleteTransfer(MessagePortId, VecDeque<PortMessageTask>),
CompleteTransfer(HashMap<MessagePortId, VecDeque<PortMessageTask>>),
/// Remove a port, the entangled one doesn't exist anymore.
RemoveMessagePort(MessagePortId),
/// Handle a new port-message-task.
@@ -114,6 +114,11 @@ pub enum HistoryEntryReplacement {
/// Messages from the script to the constellation.
#[derive(Deserialize, Serialize)]
pub enum ScriptMsg {
/// Complete the transfer of a set of ports to a router,
/// subject to the ports still being managed by the router when the message is handled by it.
CompleteMessagePortTransfer(MessagePortRouterId, Vec<MessagePortId>),
/// The transfer of a list of ports has been successfully completed by a router.
MessagePortTransferCompleted(MessagePortRouterId, Vec<MessagePortId>),
/// A new message-port was created or transferred, with corresponding control-sender.
NewMessagePort(MessagePortRouterId, MessagePortId),
/// A global has started managing message-ports
@@ -248,6 +253,8 @@ impl fmt::Debug for ScriptMsg {
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
use self::ScriptMsg::*;
let variant = match *self {
CompleteMessagePortTransfer(..) => "CompleteMessagePortTransfer",
MessagePortTransferCompleted(..) => "MessagePortTransferCompleted",
NewMessagePortRouter(..) => "NewMessagePortRouter",
RemoveMessagePortRouter(..) => "RemoveMessagePortRouter",
NewMessagePort(..) => "NewMessagePort",
@@ -11,7 +11,7 @@ use crate::PortMessageTask;
use msg::constellation_msg::MessagePortId;
use std::collections::VecDeque;

#[derive(Debug, Deserialize, MallocSizeOf, Serialize)]
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
enum MessagePortState {
/// <https://html.spec.whatwg.org/multipage/#detached>
Detached,
@@ -25,7 +25,7 @@ enum MessagePortState {
Disabled(bool),
}

#[derive(Debug, Deserialize, MallocSizeOf, Serialize)]
#[derive(Clone, Debug, Deserialize, MallocSizeOf, Serialize)]
/// The data and logic backing the DOM managed MessagePort.
pub struct MessagePortImpl {
/// The current state of the port.
@@ -11281,6 +11281,19 @@
{}
]
],
"mozilla/Channel_postMessage_with_second_transfer_in_timeout.window.js": [
[
"mozilla/Channel_postMessage_with_second_transfer_in_timeout.window.html",
{
"script_metadata": [
[
"script",
"/common/get-host-info.sub.js"
]
]
}
]
],
"mozilla/DOMParser.html": [
[
"mozilla/DOMParser.html",
@@ -18409,6 +18422,10 @@
"276791c4348ada7e1da71041f2ccd383305e209c",
"support"
],
"mozilla/Channel_postMessage_with_second_transfer_in_timeout.window.js": [
"fffb91c5b89490f69fdb65845487d98b914a31b2",
"testharness"
],
"mozilla/DOMParser.html": [
"f386a3e0191af2c70dcb05790ce7db15dd5ccbf1",
"testharness"

1 comment on commit 2ac6e1c

community-tc-integration bot commented on 2ac6e1c Nov 7, 2019

Submitting the task to Taskcluster failed.

Taskcluster-GitHub attempted to create a task for this event with the following scopes:

[
  "assume:repo:github.com/servo/servo:pull-request",
  "queue:route:statuses",
  "queue:scheduler-id:taskcluster-github"
]

The expansion of these scopes is not sufficient to create the task, leading to the following:

Client ID static/taskcluster/github does not have sufficient scopes and is missing the following scopes:

{
  "AnyOf": [
    {
      "AnyOf": [
        "queue:create-task:highest:aws-provisioner-v1/servo-docker-untrusted",
        "queue:create-task:very-high:aws-provisioner-v1/servo-docker-untrusted",
        "queue:create-task:high:aws-provisioner-v1/servo-docker-untrusted",
        "queue:create-task:medium:aws-provisioner-v1/servo-docker-untrusted",
        "queue:create-task:low:aws-provisioner-v1/servo-docker-untrusted",
        "queue:create-task:very-low:aws-provisioner-v1/servo-docker-untrusted",
        "queue:create-task:lowest:aws-provisioner-v1/servo-docker-untrusted"
      ]
    },
    {
      "AnyOf": [
        "queue:create-task:aws-provisioner-v1/servo-docker-untrusted",
        {
          "AllOf": [
            "queue:define-task:aws-provisioner-v1/servo-docker-untrusted",
            "queue:task-group-id:taskcluster-github/QgKbGeUoSdCAIKxNtWw0JQ",
            "queue:schedule-task:taskcluster-github/QgKbGeUoSdCAIKxNtWw0JQ/QgKbGeUoSdCAIKxNtWw0JQ"
          ]
        }
      ]
    }
  ]
}

This request requires the client to satisfy the following scope expression:

{
  "AllOf": [
    "assume:repo:github.com/servo/servo:pull-request",
    "queue:route:tc-treeherder.v2._/servo-prs.2ac6e1cc2eed7152c91e00ba7e4478aaf91a259d",
    "queue:route:tc-treeherder-staging.v2._/servo-prs.2ac6e1cc2eed7152c91e00ba7e4478aaf91a259d",
    "queue:route:statuses",
    {
      "AnyOf": [
        {
          "AllOf": [
            "queue:scheduler-id:taskcluster-github",
            {
              "AnyOf": [
                "queue:create-task:highest:aws-provisioner-v1/servo-docker-untrusted",
                "queue:create-task:very-high:aws-provisioner-v1/servo-docker-untrusted",
                "queue:create-task:high:aws-provisioner-v1/servo-docker-untrusted",
                "queue:create-task:medium:aws-provisioner-v1/servo-docker-untrusted",
                "queue:create-task:low:aws-provisioner-v1/servo-docker-untrusted",
                "queue:create-task:very-low:aws-provisioner-v1/servo-docker-untrusted",
                "queue:create-task:lowest:aws-provisioner-v1/servo-docker-untrusted"
              ]
            }
          ]
        },
        {
          "AnyOf": [
            "queue:create-task:aws-provisioner-v1/servo-docker-untrusted",
            {
              "AllOf": [
                "queue:define-task:aws-provisioner-v1/servo-docker-untrusted",
                "queue:task-group-id:taskcluster-github/QgKbGeUoSdCAIKxNtWw0JQ",
                "queue:schedule-task:taskcluster-github/QgKbGeUoSdCAIKxNtWw0JQ/QgKbGeUoSdCAIKxNtWw0JQ"
              ]
            }
          ]
        }
      ]
    }
  ]
}

  • method: createTask
  • errorCode: InsufficientScopes
  • statusCode: 403
  • time: 2019-11-07T15:54:58.161Z