Skip to content
Permalink
Browse files

async flow with confirmations and robust to further transfer

  • Loading branch information...
gterzian committed Nov 8, 2019
1 parent 2ac6e1c commit 5ac1d3ccf91182c5c0c72a3552acbffeba3900a5
@@ -186,6 +186,15 @@ enum TransferState {
/// The port is currently in-transfer,
/// and incoming tasks should be buffered until it becomes managed again.
TransferInProgress(VecDeque<PortMessageTask>),
/// A global has requested the transfer to be completed, we wait for confirmation it completed.
CompletionInProgress(MessagePortRouterId),
/// A transfer failed to complete, we start buffering messages,
/// while awaiting the return of the previous buffer from the global
/// that failed to complete the transfer.
CompletionFailed(VecDeque<PortMessageTask>),
/// While a completion failed, another global requested to complete the transfer.
/// We are still buffering messages, and awaiting the return of the buffer from the global who failed.
TransferCompletionRequested(MessagePortRouterId, VecDeque<PortMessageTask>),
/// The entangled port has been removed while the port was in-transfer,
/// the current port should be removed as well once it is managed again.
EntangledRemoved,
@@ -1559,6 +1568,9 @@ where
FromScriptMsg::MessagePortTransferCompleted(router_id, ports) => {
self.handle_message_port_transfer_completed(router_id, ports);
},
FromScriptMsg::MessagePortTransferFailed(router_id, ports) => {
self.handle_message_port_transfer_failed(router_id, ports);
},
FromScriptMsg::RerouteMessagePort(port_id, task) => {
self.handle_reroute_messageport(port_id, task);
},
@@ -1806,18 +1818,18 @@ where
entry.remove_entry();
continue;
},
TransferState::TransferInProgress(_) => {
TransferState::CompletionInProgress(expected_router_id) => {
if expected_router_id != router_id {
return warn!("Transfer completed by an unexpected router: {:?}", router_id);
}
// Drop the buffer, and update the state to managed.
let new_info = MessagePortInfo {
state: TransferState::Managed(router_id),
entangled_with: entry.get().entangled_with.clone(),
};
entry.insert(new_info);
},
TransferState::Managed(_) => {
warn!("Constellation received unexpected port transfer completed message: {:?}", port_id);
continue;
}
_ => warn!("Constellation received unexpected port transfer completed message"),
}
},
Entry::Vacant(_) => warn!(
@@ -1828,17 +1840,91 @@ where
}
}

/// Handle a router reporting that it failed to complete the transfer of `ports`
/// (for example because the ports were transferred again before the
/// CompleteTransfer message was handled). Each returned buffer contains the
/// tasks that were sent while the failed transfer was pending; they are older
/// than anything buffered since, so they are merged back at the front, and the
/// port moves to the appropriate next state.
fn handle_message_port_transfer_failed(
    &mut self,
    router_id: MessagePortRouterId,
    mut ports: HashMap<MessagePortId, VecDeque<PortMessageTask>>,
) {
    for (port_id, mut previous_buffer) in ports.drain() {
        let new_info = match self.message_ports.remove(&port_id) {
            // If the port was transferred, we must know about it.
            Some(entry) => {
                match entry.state {
                    TransferState::EntangledRemoved => {
                        // If the entangled port has been removed while this one was in-transfer,
                        // just drop it.
                        continue;
                    },
                    TransferState::CompletionFailed(mut current_buffer) => {
                        // Tasks in the previous buffer are older,
                        // hence need to be added to the front of the current one.
                        while let Some(task) = previous_buffer.pop_back() {
                            current_buffer.push_front(task);
                        }
                        // Update the state to transfer-in-progress.
                        MessagePortInfo {
                            state: TransferState::TransferInProgress(current_buffer),
                            entangled_with: entry.entangled_with,
                        }
                    },
                    TransferState::TransferCompletionRequested(
                        target_router_id,
                        mut current_buffer,
                    ) => {
                        // Tasks in the previous buffer are older,
                        // hence need to be added to the front of the current one.
                        while let Some(task) = previous_buffer.pop_back() {
                            current_buffer.push_front(task);
                        }
                        // Forward the buffered message-queue to the router that requested
                        // completion while the previous attempt was still failing.
                        if let Some(sender) = self.message_port_routers.get(&target_router_id) {
                            if sender
                                .send(MessagePortMsg::CompletePendingTransfer(
                                    port_id,
                                    current_buffer,
                                ))
                                .is_err()
                            {
                                warn!("Constellation failed to send complete port transfer response.");
                            }
                        } else {
                            // The lookup above was for the requesting router, so log that id
                            // (not the router whose transfer failed).
                            warn!("No message-port sender for {:?}", target_router_id);
                        }
                        // Update the state to completion-in-progress.
                        MessagePortInfo {
                            state: TransferState::CompletionInProgress(target_router_id),
                            entangled_with: entry.entangled_with,
                        }
                    },
                    _ => {
                        warn!("Unexpected port transfer failed message received");
                        continue;
                    },
                }
            },
            None => {
                // This handler deals with failed transfers; the previous message
                // mentioned "transfer completed", copied from the sibling handler.
                warn!(
                    "Constellation received a port transfer failed msg from router {:?} for unknown messageport {:?}",
                    router_id, port_id
                );
                continue;
            },
        };
        self.message_ports.insert(port_id, new_info);
    }
}

fn complete_message_port_transfer(
&mut self,
router_id: MessagePortRouterId,
mut ports: Vec<MessagePortId>,
) {
let mut response = HashMap::new();
for port_id in ports.drain(0..) {
match self.message_ports.entry(port_id) {
let new_info = match self.message_ports.remove(&port_id) {
// If the port was transferred, we must know about it.
Entry::Occupied(entry) => {
match &entry.get().state {
Some(entry) => {
match entry.state {
TransferState::EntangledRemoved => {
// If the entangled port has been removed while this one was in-transfer,
// remove it now.
@@ -1847,27 +1933,44 @@ where
} else {
warn!("No message-port sender for {:?}", router_id);
}
entry.remove_entry();
continue;
},
TransferState::TransferInProgress(buffer) => {
// Clone the buffer, and leave the state as it is,
// since we're not sure if the transfer will be completed successfully,
// for example if a port is transferred again
// before the CompleteTransfer msg is handled by the global.
response.insert(port_id, buffer.clone());
response.insert(port_id, buffer);

// Update the state to completion-in-progress.
MessagePortInfo {
state: TransferState::CompletionInProgress(router_id),
entangled_with: entry.entangled_with,
}
},
TransferState::CompletionFailed(buffer) => {
response.insert(port_id, buffer);

// Update the state to completion-requested.
MessagePortInfo {
state: TransferState::TransferCompletionRequested(
router_id,
VecDeque::new(),
),
entangled_with: entry.entangled_with,
}
},
TransferState::Managed(_) => {
warn!("Constellation received complete message port msg for a managed port: {:?}", port_id);
_ => {
warn!("Unexpected complete port transfer message received");
continue;
},
}
},
Entry::Vacant(_) => warn!(
"Constellation asked to complete transfer for unknown messageport {:?}",
port_id
),
}
None => {
warn!(
"Constellation asked to complete transfer for unknown messageport {:?}",
port_id
);
continue;
},
};
self.message_ports.insert(port_id, new_info);
}
// Forward the buffered message-queue.
if let Some(sender) = self.message_port_routers.get(&router_id) {
@@ -1893,14 +1996,16 @@ where
},
};
match &mut info.state {
TransferState::Managed(router_id) => {
TransferState::Managed(router_id) | TransferState::CompletionInProgress(router_id) => {
if let Some(sender) = self.message_port_routers.get(&router_id) {
let _ = sender.send(MessagePortMsg::NewTask(port_id, task));
} else {
warn!("No message-port sender for {:?}", router_id);
}
},
TransferState::TransferInProgress(queue) => queue.push_back(task),
TransferState::CompletionFailed(queue) => queue.push_back(task),
TransferState::TransferCompletionRequested(_, queue) => queue.push_back(task),
TransferState::EntangledRemoved => warn!(
"Messageport received a message, but entangled has alread been removed {:?}",
port_id
@@ -1910,8 +2015,14 @@ where

fn handle_messageport_shipped(&mut self, port_id: MessagePortId) {
if let Some(info) = self.message_ports.get_mut(&port_id) {
if let TransferState::Managed(_) = info.state {
info.state = TransferState::TransferInProgress(VecDeque::new());
match info.state {
TransferState::Managed(_) => {
info.state = TransferState::TransferInProgress(VecDeque::new());
},
TransferState::CompletionInProgress(_) => {
info.state = TransferState::CompletionFailed(VecDeque::new());
},
_ => warn!("Unexpected messageport shipped received"),
}
} else {
warn!(
@@ -1978,7 +2089,7 @@ where
"Constellation asked to remove entangled messageport by a port that was already removed {:?}",
port_id
),
TransferState::TransferInProgress(_) => {
TransferState::TransferInProgress(_) | TransferState::CompletionInProgress(_) | TransferState::CompletionFailed(_) | TransferState::TransferCompletionRequested(_, _) => {
// Note: since the port is in-transfer, we don't have a router to send it a message
// to let it know that its entangled port has been removed.
// Hence we mark it so that it will be messaged and removed once the transfer completes.
@@ -227,20 +227,36 @@ impl MessageListener {
// the global might not have a router-id anymore,
// if all ports are transferred out in the tasks.
let router_id = global.port_router_id();
let ports = ports
.into_iter()
.filter_map(|(id, buffer)| {
global.complete_port_transfer(id, buffer).ok()
})
.collect();
if let Some(router_id) = router_id {
let _ =
global.script_to_constellation_chan()
.send(ScriptMsg::MessagePortTransferCompleted(
router_id,
ports,
));

let mut succeeded: Vec<MessagePortId> = vec![];
let mut failed = HashMap::new();

for (id, buffer) in ports.into_iter() {
if global.is_managing_port(&id) {
succeeded.push(id.clone());
global.complete_port_transfer(id, buffer);
} else {
failed.insert(id, buffer);
}
}

let router = router_id.expect("Global to have been managing ports");
let _ = global
.script_to_constellation_chan()
.send(ScriptMsg::MessagePortTransferCompleted(router.clone(), succeeded));
let _ = global
.script_to_constellation_chan()
.send(ScriptMsg::MessagePortTransferFailed(router, failed));
}),
&self.canceller,
);
},
MessagePortMsg::CompletePendingTransfer(port_id, buffer) => {
let context = self.context.clone();
let _ = self.task_source.queue_with_canceller(
task!(complete_pending: move || {
let global = context.root();
global.complete_port_transfer(port_id, buffer);
}),
&self.canceller,
);
@@ -320,19 +336,24 @@ impl GlobalScope {
}
}

/// Returns whether this global is currently managing the port with the given id,
/// i.e. whether it is present in the managed message-port map.
fn is_managing_port(&self, port_id: &MessagePortId) -> bool {
    match &*self.message_port_state.borrow() {
        MessagePortState::Managed(_router_id, message_ports) => {
            message_ports.contains_key(port_id)
        },
        _ => false,
    }
}

/// Complete the transfer of a message-port.
fn complete_port_transfer(
&self,
port_id: MessagePortId,
tasks: VecDeque<PortMessageTask>,
) -> Result<MessagePortId, ()> {
fn complete_port_transfer(&self, port_id: MessagePortId, tasks: VecDeque<PortMessageTask>) {
let should_start = if let MessagePortState::Managed(_id, message_ports) =
&mut *self.message_port_state.borrow_mut()
{
match message_ports.get_mut(&port_id) {
None => {
// Port has been transferred-out already.
return Err(());
panic!("complete_port_transfer called for an unknown port.");
},
Some(ManagedMessagePort::Pending(_, _)) => {
panic!("CompleteTransfer msg received for a pending port.");
@@ -343,12 +364,11 @@ impl GlobalScope {
},
}
} else {
return Err(());
panic!("complete_port_transfer called for an unknown port.");
};
if should_start {
self.start_message_port(&port_id);
}
Ok(port_id)
}

/// Update our state to un-managed,
@@ -1038,6 +1038,8 @@ pub struct PortMessageTask {
pub enum MessagePortMsg {
/// Enables a port to catch-up on messages that were sent while the transfer was ongoing.
CompleteTransfer(HashMap<MessagePortId, VecDeque<PortMessageTask>>),
/// Complete a transfer for a port whose previous transfer had failed.
CompletePendingTransfer(MessagePortId, VecDeque<PortMessageTask>),
/// Remove a port, the entangled one doesn't exist anymore.
RemoveMessagePort(MessagePortId),
/// Handle a new port-message-task.
@@ -30,6 +30,7 @@ use net_traits::storage_thread::StorageType;
use net_traits::CoreResourceMsg;
use servo_url::ImmutableOrigin;
use servo_url::ServoUrl;
use std::collections::{HashMap, VecDeque};
use std::fmt;
use style_traits::viewport::ViewportConstraints;
use style_traits::CSSPixel;
@@ -117,8 +118,13 @@ pub enum ScriptMsg {
/// Complete the transfer of a set of ports to a router,
/// subject to the port still being managed by the router when the message is handled by it.
CompleteMessagePortTransfer(MessagePortRouterId, Vec<MessagePortId>),
/// The transfer of a list of ports message has been successfully completed by a router.
/// A transfer of ports was successfully completed.
MessagePortTransferCompleted(MessagePortRouterId, Vec<MessagePortId>),
/// The transfer of a list of ports message has failed.
MessagePortTransferFailed(
MessagePortRouterId,
HashMap<MessagePortId, VecDeque<PortMessageTask>>,
),
/// A new message-port was created or transferred, with corresponding control-sender.
NewMessagePort(MessagePortRouterId, MessagePortId),
/// A global has started managing message-ports
@@ -255,6 +261,7 @@ impl fmt::Debug for ScriptMsg {
let variant = match *self {
CompleteMessagePortTransfer(..) => "CompleteMessagePortTransfer",
MessagePortTransferCompleted(..) => "MessagePortTransferCompleted",
MessagePortTransferFailed(..) => "MessagePortTransferFailed",
NewMessagePortRouter(..) => "NewMessagePortRouter",
RemoveMessagePortRouter(..) => "RemoveMessagePortRouter",
NewMessagePort(..) => "NewMessagePort",

1 comment on commit 5ac1d3c

@community-tc-integration

This comment has been minimized.

Copy link

community-tc-integration bot commented on 5ac1d3c Nov 8, 2019

Submitting the task to Taskcluster failed. Details

Taskcluster-GitHub attempted to create a task for this event with the following scopes:

[
  "assume:repo:github.com/servo/servo:pull-request",
  "queue:route:statuses",
  "queue:scheduler-id:taskcluster-github"
]

The expansion of these scopes is not sufficient to create the task, leading to the following:

Client ID static/taskcluster/github does not have sufficient scopes and is missing the following scopes:

{
  "AnyOf": [
    {
      "AnyOf": [
        "queue:create-task:highest:aws-provisioner-v1/servo-docker-untrusted",
        "queue:create-task:very-high:aws-provisioner-v1/servo-docker-untrusted",
        "queue:create-task:high:aws-provisioner-v1/servo-docker-untrusted",
        "queue:create-task:medium:aws-provisioner-v1/servo-docker-untrusted",
        "queue:create-task:low:aws-provisioner-v1/servo-docker-untrusted",
        "queue:create-task:very-low:aws-provisioner-v1/servo-docker-untrusted",
        "queue:create-task:lowest:aws-provisioner-v1/servo-docker-untrusted"
      ]
    },
    {
      "AnyOf": [
        "queue:create-task:aws-provisioner-v1/servo-docker-untrusted",
        {
          "AllOf": [
            "queue:define-task:aws-provisioner-v1/servo-docker-untrusted",
            "queue:task-group-id:taskcluster-github/D8gzPAhNRuqUY_lbWO5vdg",
            "queue:schedule-task:taskcluster-github/D8gzPAhNRuqUY_lbWO5vdg/D8gzPAhNRuqUY_lbWO5vdg"
          ]
        }
      ]
    }
  ]
}

This request requires the client to satisfy the following scope expression:

{
  "AllOf": [
    "assume:repo:github.com/servo/servo:pull-request",
    "queue:route:tc-treeherder.v2._/servo-prs.5ac1d3ccf91182c5c0c72a3552acbffeba3900a5",
    "queue:route:tc-treeherder-staging.v2._/servo-prs.5ac1d3ccf91182c5c0c72a3552acbffeba3900a5",
    "queue:route:statuses",
    {
      "AnyOf": [
        {
          "AllOf": [
            "queue:scheduler-id:taskcluster-github",
            {
              "AnyOf": [
                "queue:create-task:highest:aws-provisioner-v1/servo-docker-untrusted",
                "queue:create-task:very-high:aws-provisioner-v1/servo-docker-untrusted",
                "queue:create-task:high:aws-provisioner-v1/servo-docker-untrusted",
                "queue:create-task:medium:aws-provisioner-v1/servo-docker-untrusted",
                "queue:create-task:low:aws-provisioner-v1/servo-docker-untrusted",
                "queue:create-task:very-low:aws-provisioner-v1/servo-docker-untrusted",
                "queue:create-task:lowest:aws-provisioner-v1/servo-docker-untrusted"
              ]
            }
          ]
        },
        {
          "AnyOf": [
            "queue:create-task:aws-provisioner-v1/servo-docker-untrusted",
            {
              "AllOf": [
                "queue:define-task:aws-provisioner-v1/servo-docker-untrusted",
                "queue:task-group-id:taskcluster-github/D8gzPAhNRuqUY_lbWO5vdg",
                "queue:schedule-task:taskcluster-github/D8gzPAhNRuqUY_lbWO5vdg/D8gzPAhNRuqUY_lbWO5vdg"
              ]
            }
          ]
        }
      ]
    }
  ]
}

  • method: createTask
  • errorCode: InsufficientScopes
  • statusCode: 403
  • time: 2019-11-08T12:14:48.578Z
Please sign in to comment.
You can’t perform that action at this time.