Skip to content

Commit

Permalink
fix(iroh): use two stage accept from quic-rpc (#2416)
Browse files Browse the repository at this point in the history
## Description

fix(iroh): use two stage accept from quic-rpc to make the accept process
cancel-safe

Needs n0-computer/quic-rpc#87

## Breaking Changes

<!-- Optional, if there are any breaking changes document them,
including how to migrate older code. -->

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
- [x] All breaking changes documented.

---------

Co-authored-by: dignifiedquire <me@dignifiedquire.com>
  • Loading branch information
rklaehn and dignifiedquire committed Jun 26, 2024
1 parent 38e8ce0 commit 83b01ad
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 11 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iroh-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ parking_lot = "0.12.1"
pkarr = { version = "1.1.5", default-features = false }
portable-atomic = "1"
postcard = "1.0.8"
quic-rpc = { version = "0.10.2", features = ["flume-transport", "quinn-transport"] }
quic-rpc = { version = "0.11", features = ["flume-transport", "quinn-transport"] }
rand = "0.8.5"
ratatui = "0.26.2"
reqwest = { version = "0.12.4", default-features = false, features = ["json", "rustls-tls"] }
Expand Down
2 changes: 1 addition & 1 deletion iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ iroh-docs = { version = "0.18.0", path = "../iroh-docs" }
iroh-gossip = { version = "0.18.0", path = "../iroh-gossip" }
parking_lot = "0.12.1"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
quic-rpc = { version = "0.10.2", default-features = false, features = ["flume-transport", "quinn-transport"] }
quic-rpc = { version = "0.11", default-features = false, features = ["flume-transport", "quinn-transport"] }
quinn = { package = "iroh-quinn", version = "0.10" }
rand = "0.8"
serde = { version = "1", features = ["derive"] }
Expand Down
8 changes: 4 additions & 4 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
// accept is just a pending future.
request = external_rpc.accept() => {
match request {
Ok((msg, chan)) => {
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan);
Ok(accepting) => {
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, accepting);
}
Err(e) => {
info!("rpc request error: {:?}", e);
Expand All @@ -284,8 +284,8 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
// handle internal rpc requests.
request = internal_rpc.accept() => {
match request {
Ok((msg, chan)) => {
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan);
Ok(accepting) => {
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, accepting);
}
Err(e) => {
info!("internal rpc request error: {:?}", e);
Expand Down
2 changes: 1 addition & 1 deletion iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ where
.await?;

// Initialize the internal RPC connection.
let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1);
let (internal_rpc, controller) = quic_rpc::transport::flume::connection(32);
// box the controller. Boxing has a special case for the flume channel that avoids allocations,
// so this has zero overhead.
let controller = quic_rpc::transport::boxed::Connection::new(controller);
Expand Down
4 changes: 2 additions & 2 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ impl<D: BaoStore> Handler<D> {
pub(crate) fn spawn_rpc_request<E: ServiceEndpoint<RpcService>>(
inner: Arc<NodeInner<D>>,
join_set: &mut JoinSet<anyhow::Result<()>>,
msg: Request,
chan: RpcChannel<RpcService, E>,
accepting: quic_rpc::server::Accepting<RpcService, E>,
) {
let handler = Self::new(inner);
join_set.spawn(async move {
let (msg, chan) = accepting.read_first().await?;
if let Err(err) = handler.handle_rpc_request(msg, chan).await {
warn!("rpc request handler error: {err:?}");
}
Expand Down

0 comments on commit 83b01ad

Please sign in to comment.