Skip to content
Permalink
Browse files

Auto merge of #24664 - gterzian:fix_port_transfer, r=<try>

[WIP] Fix loophole in messageport transfer

<!-- Please describe your changes on the following line: -->

---
<!-- Thank you for contributing to Servo! Please replace each `[ ]` by `[X]` when the step is complete, and replace `___` with appropriate data: -->
- [ ] `./mach build -d` does not report any errors
- [ ] `./mach test-tidy` does not report any errors
- [ ] These changes fix #24600 (GitHub issue number if applicable)

<!-- Either: -->
- [ ] There are tests for these changes OR
- [ ] These changes do not require tests because ___

<!-- Also, please make sure that "Allow edits from maintainers" checkbox is checked, so that we can help you if you get stuck somewhere along the way.-->

<!-- Pull requests that do not address these steps are welcome, but they will require additional verification as part of the review process. -->
  • Loading branch information...
bors-servo committed Nov 8, 2019
2 parents 8cb6145 + 395faf7 commit c8fa2e8eabbbd545ab17aed735c9e7a710e35b49
@@ -184,6 +184,18 @@ enum TransferState {
/// The port is currently in-transfer,
/// and incoming tasks should be buffered until it becomes managed again.
TransferInProgress(VecDeque<PortMessageTask>),
/// A global has requested the transfer to be completed,
/// it's pending a confirmation of either failure or success to complete the transfer.
CompletionInProgress(MessagePortRouterId),
/// While a completion of a transfer was in progress, the port was shipped,
/// hence the transfer failed to complete.
/// We start buffering incoming messages,
/// while awaiting the return of the previous buffer from the global
/// that failed to complete the transfer.
CompletionFailed(VecDeque<PortMessageTask>),
/// While a completion failed, another global requested to complete the transfer.
/// We are still buffering messages, and awaiting the return of the buffer from the global who failed.
TransferCompletionRequested(MessagePortRouterId, VecDeque<PortMessageTask>),
/// The entangled port has been removed while the port was in-transfer,
/// the current port should be removed as well once it is managed again.
EntangledRemoved,
@@ -1549,6 +1561,15 @@ where
};

match content {
FromScriptMsg::CompleteMessagePortTransfer(router_id, ports) => {
self.complete_message_port_transfer(router_id, ports);
},
FromScriptMsg::MessagePortTransferCompleted(router_id, ports) => {
self.handle_message_port_transfer_completed(router_id, ports);
},
FromScriptMsg::MessagePortTransferFailed(router_id, ports) => {
self.handle_message_port_transfer_failed(router_id, ports);
},
FromScriptMsg::RerouteMessagePort(port_id, task) => {
self.handle_reroute_messageport(port_id, task);
},
@@ -1775,6 +1796,214 @@ where
}
}

fn handle_message_port_transfer_completed(
&mut self,
router_id: MessagePortRouterId,
mut ports: Vec<MessagePortId>,
) {
for port_id in ports.drain(0..) {
match self.message_ports.entry(port_id) {
// If the port was transferred, we must know about it.
Entry::Occupied(mut entry) => {
match entry.get().state {
TransferState::EntangledRemoved => {
// If the entangled port has been removed while this one was in-transfer,
// remove it now.
if let Some(sender) = self.message_port_routers.get(&router_id) {
let _ = sender.send(MessagePortMsg::RemoveMessagePort(port_id));
} else {
warn!("No message-port sender for {:?}", router_id);
}
entry.remove_entry();
continue;
},
TransferState::CompletionInProgress(expected_router_id) => {
// Here, the transfer was normally completed.

if expected_router_id != router_id {
return warn!("Transfer completed by an unexpected router: {:?}", router_id);
}
// Drop the buffer, and update the state to managed.
let new_info = MessagePortInfo {
state: TransferState::Managed(router_id),
entangled_with: entry.get().entangled_with.clone(),
};
entry.insert(new_info);
},
_ => warn!("Constellation received unexpected port transfer completed message"),
}
},
Entry::Vacant(_) => warn!(
"Constellation received a port transfer completed msg for unknown messageport {:?}",
port_id
),
}
}
}

fn handle_message_port_transfer_failed(
&mut self,
router_id: MessagePortRouterId,
mut ports: HashMap<MessagePortId, VecDeque<PortMessageTask>>,
) {
for (port_id, mut previous_buffer) in ports.drain() {
let new_info = match self.message_ports.remove(&port_id) {
// If the port was transferred, we must know about it.
Some(entry) => {
match entry.state {
TransferState::EntangledRemoved => {
// If the entangled port has been removed while this one was in-transfer,
// just drop it.
continue;
},
TransferState::CompletionFailed(mut current_buffer) => {
// The transfer failed,
// and now the global has returned us the buffer we previously sent.
// So the next update is back to a "normal" transfer in progress.

// Tasks in the previous buffer are older,
// hence need to be added to the front of the current one.
while let Some(task) = previous_buffer.pop_back() {
current_buffer.push_front(task);
}
// Update the state to transfer-in-progress.
MessagePortInfo {
state: TransferState::TransferInProgress(current_buffer),
entangled_with: entry.entangled_with,
}
},
TransferState::TransferCompletionRequested(
target_router_id,
mut current_buffer,
) => {
// Here, before the global who failed the last transfer could return us the buffer,
// another global already sent us a request to complete a new transfer.
// So we use the returned buffer to update
// the current-buffer(of new incoming messages),
// and we sent everything to the global
// who is waiting for completion of the current transfer.

// Tasks in the previous buffer are older,
// hence need to be added to the front of the current one.
while let Some(task) = previous_buffer.pop_back() {
current_buffer.push_front(task);
}
// Forward the buffered message-queue to complete the current transfer.
if let Some(sender) = self.message_port_routers.get(&target_router_id) {
if sender
.send(MessagePortMsg::CompletePendingTransfer(
port_id,
current_buffer,
))
.is_err()
{
warn!("Constellation failed to send complete port transfer response.");
}
} else {
warn!("No message-port sender for {:?}", router_id);
}

// Update the state to completion-in-progress.
MessagePortInfo {
state: TransferState::CompletionInProgress(target_router_id),
entangled_with: entry.entangled_with,
}
},
_ => {
warn!("Unexpected port transfer failed message received");
continue;
},
}
},
None => {
warn!(
"Constellation received a port transfer completed msg for unknown messageport {:?}",
port_id
);
continue;
},
};
self.message_ports.insert(port_id, new_info);
}
}

fn complete_message_port_transfer(
&mut self,
router_id: MessagePortRouterId,
mut ports: Vec<MessagePortId>,
) {
let mut response = HashMap::new();
for port_id in ports.drain(0..) {
let new_info = match self.message_ports.remove(&port_id) {
// If the port was transferred, we must know about it.
Some(entry) => {
match entry.state {
TransferState::EntangledRemoved => {
// If the entangled port has been removed while this one was in-transfer,
// remove it now.
if let Some(sender) = self.message_port_routers.get(&router_id) {
let _ = sender.send(MessagePortMsg::RemoveMessagePort(port_id));
} else {
warn!("No message-port sender for {:?}", router_id);
}
continue;
},
TransferState::TransferInProgress(buffer) => {
response.insert(port_id, buffer);

// If the port was in transfer, and a global is requesting completion,
// we note the start of the completion.
MessagePortInfo {
state: TransferState::CompletionInProgress(router_id),
entangled_with: entry.entangled_with,
}
},
TransferState::CompletionFailed(buffer) => {
// If the completion had already failed,
// this is a request coming from a global to complete a new transfer,
// but we're still awaiting the return of the buffer
// from the first global who failed.
//
// So we note the request from the new global,
// and continue to buffer incoming messages
// and wait for the buffer used in the previous transfer to be returned.
MessagePortInfo {
state: TransferState::TransferCompletionRequested(
router_id,
buffer,
),
entangled_with: entry.entangled_with,
}
},
_ => {
warn!("Unexpected complete port transfer message received");
continue;
},
}
},
None => {
warn!(
"Constellation asked to complete transfer for unknown messageport {:?}",
port_id
);
continue;
},
};
self.message_ports.insert(port_id, new_info);
}
// Forward the buffered message-queue.
if let Some(sender) = self.message_port_routers.get(&router_id) {
if sender
.send(MessagePortMsg::CompleteTransfer(response))
.is_err()
{
warn!("Constellation failed to send complete port transfer response.");
}
} else {
warn!("No message-port sender for {:?}", router_id);
}
}

fn handle_reroute_messageport(&mut self, port_id: MessagePortId, task: PortMessageTask) {
let info = match self.message_ports.get_mut(&port_id) {
Some(info) => info,
@@ -1786,14 +2015,19 @@ where
},
};
match &mut info.state {
TransferState::Managed(router_id) => {
TransferState::Managed(router_id) | TransferState::CompletionInProgress(router_id) => {
// In both the managed and completion of a transfer case, we forward the message.
// Note that in both cases, if the port is transferred before the message is handled,
// it will be sent back here and buffered while the transfer is ongoing.
if let Some(sender) = self.message_port_routers.get(&router_id) {
let _ = sender.send(MessagePortMsg::NewTask(port_id, task));
} else {
warn!("No message-port sender for {:?}", router_id);
}
},
TransferState::TransferInProgress(queue) => queue.push_back(task),
TransferState::CompletionFailed(queue) => queue.push_back(task),
TransferState::TransferCompletionRequested(_, queue) => queue.push_back(task),
TransferState::EntangledRemoved => warn!(
"Messageport received a message, but entangled has alread been removed {:?}",
port_id
@@ -1803,8 +2037,19 @@ where

fn handle_messageport_shipped(&mut self, port_id: MessagePortId) {
if let Some(info) = self.message_ports.get_mut(&port_id) {
if let TransferState::Managed(_) = info.state {
info.state = TransferState::TransferInProgress(VecDeque::new());
match info.state {
TransferState::Managed(_) => {
// If shipped while managed, note the start of a transfer.
info.state = TransferState::TransferInProgress(VecDeque::new());
},
TransferState::CompletionInProgress(_) => {
// If shipped while completion of a transfer was in progress,
// the completion failed.
// This will be followed by a MessagePortTransferFailed message,
// containing the buffer we previously sent.
info.state = TransferState::CompletionFailed(VecDeque::new());
},
_ => warn!("Unexpected messageport shipped received"),
}
} else {
warn!(
@@ -1828,37 +2073,11 @@ where

fn handle_new_messageport(&mut self, router_id: MessagePortRouterId, port_id: MessagePortId) {
match self.message_ports.entry(port_id) {
// If we know about this port, it means it was transferred.
Entry::Occupied(mut entry) => {
if let TransferState::EntangledRemoved = entry.get().state {
// If the entangled port has been removed while this one was in-transfer,
// remove it now.
if let Some(sender) = self.message_port_routers.get(&router_id) {
let _ = sender.send(MessagePortMsg::RemoveMessagePort(port_id));
} else {
warn!("No message-port sender for {:?}", router_id);
}
entry.remove_entry();
return;
}
let new_info = MessagePortInfo {
state: TransferState::Managed(router_id),
entangled_with: entry.get().entangled_with.clone(),
};
let old_info = entry.insert(new_info);
let buffer = match old_info.state {
TransferState::TransferInProgress(buffer) => buffer,
_ => {
return warn!("Completing transfer of a port that did not have a transfer in progress.");
},
};
// Forward the buffered message-queue.
if let Some(sender) = self.message_port_routers.get(&router_id) {
let _ = sender.send(MessagePortMsg::CompleteTransfer(port_id.clone(), buffer));
} else {
warn!("No message-port sender for {:?}", router_id);
}
},
// If it's a new port, we should not know about it.
Entry::Occupied(_) => warn!(
"Constellation asked to start tracking an existing messageport {:?}",
port_id
),
Entry::Vacant(entry) => {
let info = MessagePortInfo {
state: TransferState::Managed(router_id),
@@ -1897,7 +2116,10 @@ where
"Constellation asked to remove entangled messageport by a port that was already removed {:?}",
port_id
),
TransferState::TransferInProgress(_) => {
TransferState::TransferInProgress(_) |
TransferState::CompletionInProgress(_) |
TransferState::CompletionFailed(_) |
TransferState::TransferCompletionRequested(_, _) => {
// Note: since the port is in-transer, we don't have a router to send it a message
// to let it know that its entangled port has been removed.
// Hence we mark it so that it will be messaged and removed once the transfer completes.

0 comments on commit c8fa2e8

Please sign in to comment.
You can’t perform that action at this time.