TQ: Support proxying Nexus-related API requests #9403
```diff
@@ -5,13 +5,15 @@
 //! A mechanism for maintaining a full mesh of trust quorum node connections
 
 use crate::established_conn::EstablishedConn;
+use crate::proxy;
 use trust_quorum_protocol::{BaseboardId, Envelope, PeerMsg};
 
 // TODO: Move to this crate
 // https://github.com/oxidecomputer/omicron/issues/9311
 use bootstore::schemes::v0::NetworkConfig;
 
 use camino::Utf8PathBuf;
+use derive_more::From;
 use iddqd::{
     BiHashItem, BiHashMap, TriHashItem, TriHashMap, bi_upcast, tri_upcast,
 };
@@ -60,7 +62,7 @@ pub enum MainToConnMsg {
 ///
 /// All `WireMsg`s sent between nodes is prefixed with a 4 byte size header used
 /// for framing.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize, From)]
 pub enum WireMsg {
     /// Used for connection keep alive
     Ping,
@@ -79,6 +81,12 @@
     /// of tiny information layered on top of trust quorum. You can still think
     /// of it as a bootstore, although, we no longer use that name.
     NetworkConfig(NetworkConfig),
+
+    /// Requests proxied to other nodes
+    ProxyRequest(proxy::WireRequest),
+
+    /// Responses to proxy requests
+    ProxyResponse(proxy::WireResponse),
 }
 
 /// Messages sent from connection managing tasks to the main peer task
@@ -99,6 +107,8 @@ pub enum ConnToMainMsgInner {
     Received { from: BaseboardId, msg: PeerMsg },
     ReceivedNetworkConfig { from: BaseboardId, config: NetworkConfig },
     Disconnected { peer_id: BaseboardId },
+    ProxyRequestReceived { from: BaseboardId, req: proxy::WireRequest },
+    ProxyResponseReceived { from: BaseboardId, rsp: proxy::WireResponse },
 }
 
 pub struct TaskHandle {
```
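The doc comment in the hunks above notes that every `WireMsg` on the wire is prefixed with a 4 byte size header used for framing. For readers new to this code, here is a self-contained sketch of that framing shape only; the payload encoding (JSON here) and the byte order of the header are stand-in assumptions, since the actual serialization used by the connection tasks is not part of this diff.

```rust
// Length-prefixed framing sketch: a 4-byte size header (big-endian here,
// purely as an assumption) followed by the encoded message. serde_json stands
// in for whatever encoding the real connection tasks use.
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, PartialEq)]
enum WireMsg {
    Ping,
}

fn encode_frame(msg: &WireMsg) -> Vec<u8> {
    let body = serde_json::to_vec(msg).expect("serializable");
    let mut frame = (body.len() as u32).to_be_bytes().to_vec();
    frame.extend_from_slice(&body);
    frame
}

fn decode_frame(frame: &[u8]) -> WireMsg {
    let len = u32::from_be_bytes(frame[..4].try_into().unwrap()) as usize;
    serde_json::from_slice(&frame[4..4 + len]).expect("valid frame")
}

fn main() {
    let frame = encode_frame(&WireMsg::Ping);
    assert_eq!(decode_frame(&frame), WireMsg::Ping);
}
```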
```diff
@@ -120,15 +130,11 @@ impl TaskHandle {
         self.abort_handle.abort()
     }
 
-    pub async fn send(&self, msg: PeerMsg) {
-        let _ = self.tx.send(MainToConnMsg::Msg(WireMsg::Tq(msg))).await;
-    }
-
-    pub async fn send_network_config(&self, config: NetworkConfig) {
-        let _ = self
-            .tx
-            .send(MainToConnMsg::Msg(WireMsg::NetworkConfig(config)))
-            .await;
+    pub async fn send<T>(&self, msg: T)
+    where
+        T: Into<WireMsg>,
+    {
+        let _ = self.tx.send(MainToConnMsg::Msg(msg.into())).await;
     }
 }
 
@@ -172,7 +178,10 @@ impl EstablishedTaskHandle {
         self.task_handle.abort();
     }
 
-    pub async fn send(&self, msg: PeerMsg) {
+    pub async fn send<T>(&self, msg: T)
+    where
+        T: Into<WireMsg>,
+    {
        let _ = self.task_handle.send(msg).await;
     }
 }
```
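The `From` derive added above is what makes the new generic `send` work: `derive_more::From` generates a `From<T> for WireMsg` impl for each single-field variant, so any payload convertible into a `WireMsg` can be handed to `send<T: Into<WireMsg>>`. A minimal self-contained sketch with stand-in payload types (not the real protocol types):

```rust
// Minimal sketch of the derive-based conversion; PeerMsg and NetworkConfig
// are empty stand-ins here, not the real trust quorum types.
use derive_more::From;

#[derive(Debug)]
struct PeerMsg;
#[derive(Debug)]
struct NetworkConfig;

#[derive(Debug, From)]
enum WireMsg {
    Tq(PeerMsg),
    NetworkConfig(NetworkConfig),
}

// Mirrors the TaskHandle::send change: one method accepts any payload that
// converts into a WireMsg.
fn send<T: Into<WireMsg>>(msg: T) -> WireMsg {
    msg.into()
}

fn main() {
    // Both payload types now go through the same method.
    println!("{:?}", send(PeerMsg));
    println!("{:?}", send(NetworkConfig));
}
```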
```diff
@@ -235,6 +244,12 @@ pub struct ConnMgrStatus {
     pub total_tasks_spawned: u64,
 }
 
+/// The state of a proxy connection
+pub enum ProxyConnState {
+    Connected,
+    Disconnected,
+}
+
 /// A structure to manage all sprockets connections to peer nodes
 ///
 /// Each sprockets connection runs in its own task which communicates with the
@@ -399,7 +414,7 @@ impl ConnMgr {
                 "peer_id" => %h.baseboard_id,
                 "generation" => network_config.generation
             );
-            h.task_handle.send_network_config(network_config.clone()).await;
+            h.send(network_config.clone()).await;
         }
     }
 
@@ -415,7 +430,42 @@ impl ConnMgr {
                 "peer_id" => %h.baseboard_id,
                 "generation" => network_config.generation
             );
-            h.task_handle.send_network_config(network_config.clone()).await;
+            h.send(network_config.clone()).await;
         }
     }
 
+    /// Forward an API request to another node
+    ///
+    /// Return the state of the connection at this point in time so that the
+    /// [`proxy::Tracker`] can manage the outstanding request on behalf of the
+    /// user.
+    pub async fn proxy_request(
+        &mut self,
+        destination: &BaseboardId,
+        req: proxy::WireRequest,
+    ) -> ProxyConnState {
+        if let Some(h) = self.established.get1(destination) {
+            info!(self.log, "Sending {req:?}"; "peer_id" => %destination);
+            h.send(req).await;
+            ProxyConnState::Connected
```
Comment on lines +449 to +450

Member

In a previous PR I noted that the `send` method silently drops a message when the channel to the connection task is full. The proxy APIs defined in another file expect either a response from the server or a disconnection notification. The failure case I still see is when the channel is busy (with 10 pending requests) and the send method silently discards the message. In that case, we return a `ProxyConnState::Connected` even though the request was never actually queued, so the caller waits for a response that will never arrive. The code as is could be fine if the caller to any proxy method takes care of adding timeouts everywhere, but this feels like a problem waiting to happen. I'd feel way more comfortable if this returned an error (or `Disconnected`) whenever the message cannot actually be sent.
Contributor (Author)

This is a great analysis @pietroalbini. Thank you so much for taking the time to go through this process and understand the code and design. I actually think that if this was a problem, it would also be somewhat of a problem for the normal trust quorum protocol, as some messages from peers are never resent on a timer. For example: if a coordinator is sending a prepare message and that message were silently dropped, it would only be resent when the connection state changes, not on a timeout. With all that being said, I don't actually think what you pointed out is entirely true, and therefore this isn't actually a problem here. However, this analysis is also non-trivial. It almost makes me question whether using connection state for retries instead of timers is the right move. So far, I think it continues to work and has the benefit of not re-sending messages already sent over a reliable stream. Ok, so back to the problem.

The channel being used in `send` is bounded. To help ensure that the disconnect callback occurs when buffers start filling up, there is also a `MSG_WRITE_QUEUE_CAPACITY` for each established connection that will disconnect if too many messages are pulled off the channel and serialized before they can be sent. Somewhat importantly, this channel is sized smaller than the queue, so if the queue is full it means that the TCP connection (or serialization) is too slow to move things along. We get backpressure, and eventually a disconnection that should allow things to clear up on a reconnect. I should clean up the comments around these capacities to make that relationship clearer.
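To illustrate the arrangement described above, here is a rough, self-contained sketch. Only the stated relationship (a bounded per-connection channel that is smaller than the write queue, and a disconnect when the queue overflows) is taken from the comment; the constant values are invented and the socket side is not modeled.

```rust
// Backpressure sketch: the main task sends over a small bounded channel; the
// connection task drains it into a write queue and disconnects if the queue
// grows past MSG_WRITE_QUEUE_CAPACITY. Values are illustrative only.
use std::collections::VecDeque;
use tokio::sync::mpsc;

const CHANNEL_CAPACITY: usize = 10; // hypothetical, smaller than the queue
const MSG_WRITE_QUEUE_CAPACITY: usize = 20; // hypothetical

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(CHANNEL_CAPACITY);

    // Connection task: in the real code the front of the queue would be
    // serialized and written to the sprockets stream; that part is elided.
    let conn_task = tokio::spawn(async move {
        let mut write_queue: VecDeque<String> = VecDeque::new();
        while let Some(msg) = rx.recv().await {
            write_queue.push_back(msg);
            if write_queue.len() > MSG_WRITE_QUEUE_CAPACITY {
                eprintln!("write queue over capacity; disconnecting");
                return;
            }
        }
    });

    // Main task side: because the channel is bounded and smaller than the
    // write queue, `send` applies backpressure here before the queue can
    // grow without limit.
    for i in 0..5 {
        let _ = tx.send(format!("msg {i}")).await;
    }
    drop(tx);
    conn_task.await.unwrap();
}
```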
Contributor (Author)

It appears there is actually one place where a queued message can be dropped without the disconnect callback ever firing for it. However, this has me wondering if instead I should signal to the main task when that happens so that it gets treated as a disconnect. Unfortunately, I also realized that there is another race condition that may get worse if I do this. If a new connection is accepted for the same peer it will trigger an abort of the old connection. In this case the old disconnect will occur after the new connection is established. That could cause problems for the protocol, and I should gate it on the task id of the old connection. Another option to solve the latter problem is to always reject a new accepted connection for the same peer if one is already established. Eventually the old one will go away, and the remote peer will retry. I need to think about this a bit more, but will likely add a few small cleanup patches.
Member

IIUC, part of Emily's concern is that the request tracker entry inserted for an outstanding request may never be resolved: if the message is silently dropped while the connection stays up, neither a response nor a disconnect ever completes it.
Contributor (Author)

That behavior is intentional. There is an inherent TOCTTOU where the message can be put on the channel and then the socket can disconnect. In this case we return the `Connected` state even though the request will never be delivered. What makes this work is that the disconnect callback always fires after the tracked socket is recorded. We know it hasn't fired yet because the outstanding request is recorded before the main task gets a chance to process any later disconnect notification for that connection.
```diff
+        } else {
+            ProxyConnState::Disconnected
+        }
+    }
+
+    /// Return a response to a proxied request to another node
+    ///
+    /// There is no need to track whether this succeeds or fails. If the
+    /// connection goes away the client on the other side will notice it and
+    /// retry if needed.
+    pub async fn proxy_response(
+        &mut self,
+        destination: &BaseboardId,
+        rsp: proxy::WireResponse,
+    ) {
+        if let Some(h) = self.established.get1(destination) {
+            info!(self.log, "Sending {rsp:?}"; "peer_id" => %destination);
+            h.send(rsp).await;
+        }
+    }
```
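To make the division of labor between `proxy_request` and the tracker concrete, here is a hypothetical caller-side sketch. The `Tracker`, `RequestId`, and `ProxyError` names are illustrative stand-ins, not the real `proxy` module API; the point is the ordering the author describes: record the request first, fail it immediately on `Disconnected`, and let a later disconnect callback fail anything still outstanding for that peer.

```rust
// Hypothetical caller-side sketch; Tracker, RequestId, and ProxyError are
// invented for illustration and are not the real proxy module types.
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct RequestId(u64);

enum ProxyError {
    Disconnected,
}

enum ProxyConnState {
    Connected,
    Disconnected,
}

#[derive(Default)]
struct Tracker {
    // Outstanding requests, keyed by id, remembering the destination peer.
    outstanding: HashMap<RequestId, String>,
}

impl Tracker {
    // Record the request *before* handing it to the connection manager, so a
    // later disconnect callback is guaranteed to observe it.
    fn insert(&mut self, id: RequestId, peer: String) {
        self.outstanding.insert(id, peer);
    }

    // Fail one request (e.g. proxy_request returned Disconnected).
    fn fail(&mut self, id: RequestId, _err: ProxyError) {
        self.outstanding.remove(&id);
        // ... notify the waiting caller, e.g. via a oneshot channel ...
    }

    // Called from the disconnect callback: fail everything still in flight to
    // the peer that went away. This is what closes the TOCTTOU window.
    fn on_disconnected(&mut self, peer: &str) {
        self.outstanding.retain(|_, p| p.as_str() != peer);
        // ... notify each affected caller ...
    }
}

fn main() {
    let mut tracker = Tracker::default();
    let id = RequestId(1);
    tracker.insert(id, "peer-a".to_string());

    // Pretend ConnMgr::proxy_request returned a state for this request.
    match ProxyConnState::Connected {
        // Connected: leave the entry; it resolves on a ProxyResponse or a
        // later disconnect callback.
        ProxyConnState::Connected => {}
        // Disconnected: fail fast so the caller can retry.
        ProxyConnState::Disconnected => tracker.fail(id, ProxyError::Disconnected),
    }

    // Later, if the connection drops, the callback cleans up.
    tracker.on_disconnected("peer-a");
    assert!(tracker.outstanding.is_empty());
}
```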
The second changed file wires the new variants into the connection task's receive path:
```diff
@@ -233,6 +233,36 @@
                         panic!("Connection to main task channnel full");
                     }
                 }
+                WireMsg::ProxyRequest(req) => {
```
Member

Nit: all variants of this enum except for `Ping` get handled the same way here (build a `ConnToMainMsgInner` and `try_send` it to the main task), so this match could probably be collapsed to something like:

```rust
let msg = match msg {
    WireMsg::Ping => continue,
    WireMsg::NetworkConfig(config) => ConnToMainMsgInner::ReceivedNetworkConfig { ... },
    // ...and likewise for the Tq / ProxyRequest / ProxyResponse variants...
};
if let Err(_) = ... {
    ...
}
```
```diff
+                    if let Err(_) = self.main_tx.try_send(ConnToMainMsg {
+                        task_id: self.task_id,
+                        msg: ConnToMainMsgInner::ProxyRequestReceived {
+                            from: self.peer_id.clone(),
+                            req,
+                        },
+                    }) {
+                        error!(
+                            self.log,
+                            "Failed to send received proxy msg to the main task"
+                        );
+                        panic!("Connection to main task channel full");
+                    }
+                }
+                WireMsg::ProxyResponse(rsp) => {
+                    if let Err(_) = self.main_tx.try_send(ConnToMainMsg {
+                        task_id: self.task_id,
+                        msg: ConnToMainMsgInner::ProxyResponseReceived {
+                            from: self.peer_id.clone(),
+                            rsp,
+                        },
+                    }) {
+                        error!(
+                            self.log,
+                            "Failed to send received proxy msg to the main task"
+                        );
+                        panic!("Connection to main task channel full");
+                    }
```
Comment on lines +244 to +264

Member

hmm, i think that the log line and panic messages here should probably include whether the error indicates that the channel is full or was disconnected because the main task exited. Assuming that both failures are possible here, it would be useful to know which one actually happened when reading the log.
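For what it's worth, tokio's bounded sender already makes that distinction: `try_send` returns a `TrySendError` with separate `Full` and `Closed` variants (this assumes `main_tx` is a `tokio::sync::mpsc::Sender`, which the diff suggests but does not show). A small self-contained demonstration:

```rust
// Demonstrates the two try_send failure modes the reviewer is asking about.
use tokio::sync::mpsc::{self, error::TrySendError};

fn main() {
    // Full: the bounded channel has no free capacity but is still open.
    let (tx, _rx) = mpsc::channel::<u32>(1);
    tx.try_send(1).unwrap();
    assert!(matches!(tx.try_send(2), Err(TrySendError::Full(2))));

    // Closed: the receiver is gone, e.g. the main task exited.
    let (tx, rx) = mpsc::channel::<u32>(1);
    drop(rx);
    assert!(matches!(tx.try_send(3), Err(TrySendError::Closed(3))));
}
```

Matching on those variants in the new arms would let the log line and panic message name the actual failure.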
```diff
+                }
             }
         }
     }
```
woah 👀