Skip to content

Commit 91129c2

Browse files
authored
Merge 414a5d5 into 378fc0f
2 parents 378fc0f + 414a5d5 commit 91129c2

File tree

15 files changed

+4394
-40
lines changed

15 files changed

+4394
-40
lines changed

Cargo.lock

Lines changed: 205 additions & 39 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ chrono = "0.4.31"
2121
derive_more = { version = "1.0.0", features = ["debug", "display", "deref", "deref_mut", "from", "try_into", "into"] }
2222
futures-buffered = "0.2.4"
2323
futures-lite = "2.3"
24+
futures-util = { version = "0.3.30", optional = true }
2425
genawaiter = { version = "0.99.1", features = ["futures03"] }
2526
hashlink = { version = "0.9.0", optional = true }
2627
hex = "0.4.3"
@@ -29,27 +30,34 @@ iroh-io = { version = "0.6.0", features = ["stats"] }
2930
iroh-metrics = { version = "0.27.0", default-features = false }
3031
iroh-net = { version = "0.27.0" }
3132
iroh-router = "0.27.0"
33+
nested_enum_utils = { version = "0.1.0", optional = true }
3234
num_cpus = "1.15.0"
3335
oneshot = "0.1.8"
3436
parking_lot = { version = "0.12.1", optional = true }
3537
pin-project = "1.1.5"
38+
portable-atomic = { version = "1", optional = true }
3639
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
40+
quic-rpc = { version = "0.13.0", optional = true }
41+
quic-rpc-derive = { version = "0.13.0", optional = true }
3742
quinn = { package = "iroh-quinn", version = "0.11", features = ["ring"] }
3843
rand = "0.8"
3944
range-collections = "0.4.0"
4045
redb = { version = "2.0.0", optional = true }
4146
redb_v1 = { package = "redb", version = "1.5.1", optional = true }
47+
ref-cast = { version = "1.0.23", optional = true }
4248
reflink-copy = { version = "0.1.8", optional = true }
4349
self_cell = "1.0.1"
4450
serde = { version = "1", features = ["derive"] }
4551
serde-error = "0.1.3"
4652
smallvec = { version = "1.10.0", features = ["serde", "const_new"] }
53+
strum = { version = "0.26.3", optional = true }
4754
tempfile = { version = "3.10.0", optional = true }
4855
thiserror = "1"
4956
tokio = { version = "1", features = ["fs"] }
5057
tokio-util = { version = "0.7", features = ["io-util", "io"] }
5158
tracing = "0.1"
5259
tracing-futures = "0.2.5"
60+
walkdir = { version = "2.5.0", optional = true }
5361

5462
[dev-dependencies]
5563
http-body = "0.4.5"
@@ -65,13 +73,18 @@ rcgen = "0.12.0"
6573
rustls = { version = "0.23", default-features = false, features = ["ring"] }
6674
tempfile = "3.10.0"
6775
futures-util = "0.3.30"
76+
testdir = "0.9.1"
6877

6978
[features]
70-
default = ["fs-store"]
79+
default = ["fs-store", "rpc"]
7180
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
7281
fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"]
7382
metrics = ["iroh-metrics/metrics"]
7483
redb = ["dep:redb"]
84+
rpc = ["dep:quic-rpc", "dep:quic-rpc-derive", "dep:nested_enum_utils", "dep:strum", "dep:futures-util", "dep:ref-cast", "dep:portable-atomic", "dep:walkdir", "downloader"]
85+
ref-cast = ["dep:ref-cast"]
86+
portable-atomic = ["dep:portable-atomic"]
87+
walkdir = ["dep:walkdir"]
7588

7689
[package.metadata.docs.rs]
7790
all-features = true

src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,16 @@ pub mod metrics;
4040
pub mod net_protocol;
4141
pub mod protocol;
4242
pub mod provider;
43+
#[cfg(feature = "rpc")]
44+
#[cfg_attr(iroh_docsrs, doc(cfg(feature = "rpc")))]
45+
pub mod rpc;
4346
pub mod store;
4447
pub mod util;
4548

49+
#[cfg(feature = "rpc")]
50+
#[cfg_attr(iroh_docsrs, doc(cfg(feature = "rpc")))]
51+
pub mod node;
52+
4653
use bao_tree::BlockSize;
4754
pub use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
4855

src/net_protocol.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ pub struct Blobs<S> {
7373
events: EventSender,
7474
downloader: Downloader,
7575
batches: tokio::sync::Mutex<BlobBatches>,
76+
endpoint: Endpoint,
7677
}
7778

7879
/// Name used for logging when new node addresses are added from gossip.
@@ -135,12 +136,14 @@ impl<S: crate::store::Store> Blobs<S> {
135136
rt: LocalPoolHandle,
136137
events: EventSender,
137138
downloader: Downloader,
139+
endpoint: Endpoint,
138140
) -> Self {
139141
Self {
140142
rt,
141143
store,
142144
events,
143145
downloader,
146+
endpoint,
144147
batches: Default::default(),
145148
}
146149
}
@@ -149,6 +152,14 @@ impl<S: crate::store::Store> Blobs<S> {
149152
&self.store
150153
}
151154

155+
pub(crate) fn rt(&self) -> LocalPoolHandle {
156+
self.rt.clone()
157+
}
158+
159+
pub(crate) fn endpoint(&self) -> &Endpoint {
160+
&self.endpoint
161+
}
162+
152163
pub async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> {
153164
self.batches.lock().await
154165
}

src/node.rs

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
//! An iroh node that just has the blobs transport
2+
use std::{path::Path, sync::Arc};
3+
4+
use iroh_net::{NodeAddr, NodeId};
5+
use quic_rpc::client::BoxedServiceConnection;
6+
use tokio_util::task::AbortOnDropHandle;
7+
8+
use crate::{
9+
provider::{CustomEventSender, EventSender},
10+
rpc::client::{blobs, tags},
11+
util::local_pool::LocalPool,
12+
};
13+
14+
type RpcClient = quic_rpc::RpcClient<
15+
crate::rpc::proto::RpcService,
16+
BoxedServiceConnection<crate::rpc::proto::RpcService>,
17+
crate::rpc::proto::RpcService,
18+
>;
19+
20+
/// An iroh node that just has the blobs transport
21+
#[derive(Debug)]
22+
pub struct Node {
23+
router: iroh_router::Router,
24+
client: RpcClient,
25+
_local_pool: LocalPool,
26+
_rpc_task: AbortOnDropHandle<()>,
27+
}
28+
29+
/// An iroh node builder
30+
#[derive(Debug)]
31+
pub struct Builder<S> {
32+
store: S,
33+
events: EventSender,
34+
}
35+
36+
impl<S: crate::store::Store> Builder<S> {
37+
/// Sets the event sender
38+
pub fn blobs_events(self, events: impl CustomEventSender) -> Self {
39+
Builder {
40+
store: self.store,
41+
events: events.into(),
42+
}
43+
}
44+
45+
/// Spawns the node
46+
pub async fn spawn(self) -> anyhow::Result<Node> {
47+
let (client, router, rpc_task, _local_pool) = setup_router(self.store, self.events).await?;
48+
Ok(Node {
49+
router,
50+
client,
51+
_rpc_task: AbortOnDropHandle::new(rpc_task),
52+
_local_pool,
53+
})
54+
}
55+
}
56+
57+
impl Node {
58+
/// Creates a new node with memory storage
59+
pub fn memory() -> Builder<crate::store::mem::Store> {
60+
Builder {
61+
store: crate::store::mem::Store::new(),
62+
events: Default::default(),
63+
}
64+
}
65+
66+
/// Creates a new node with persistent storage
67+
pub async fn persistent(
68+
path: impl AsRef<Path>,
69+
) -> anyhow::Result<Builder<crate::store::fs::Store>> {
70+
Ok(Builder {
71+
store: crate::store::fs::Store::load(path).await?,
72+
events: Default::default(),
73+
})
74+
}
75+
76+
/// Returns the node id
77+
pub fn node_id(&self) -> NodeId {
78+
self.router.endpoint().node_id()
79+
}
80+
81+
/// Returns the node address
82+
pub async fn node_addr(&self) -> anyhow::Result<NodeAddr> {
83+
self.router.endpoint().node_addr().await
84+
}
85+
86+
/// Shuts down the node
87+
pub async fn shutdown(self) -> anyhow::Result<()> {
88+
self.router.shutdown().await
89+
}
90+
91+
/// Returns an in-memory blobs client
92+
pub fn blobs(&self) -> blobs::Client {
93+
blobs::Client::new(self.client.clone())
94+
}
95+
96+
/// Returns an in-memory tags client
97+
pub fn tags(&self) -> tags::Client {
98+
tags::Client::new(self.client.clone())
99+
}
100+
}
101+
102+
async fn setup_router<S: crate::store::Store>(
103+
store: S,
104+
events: EventSender,
105+
) -> anyhow::Result<(
106+
RpcClient,
107+
iroh_router::Router,
108+
tokio::task::JoinHandle<()>,
109+
LocalPool,
110+
)> {
111+
let endpoint = iroh_net::Endpoint::builder().discovery_n0().bind().await?;
112+
let local_pool = LocalPool::single();
113+
let mut router = iroh_router::Router::builder(endpoint.clone());
114+
115+
// Setup blobs
116+
let downloader = crate::downloader::Downloader::new(
117+
store.clone(),
118+
endpoint.clone(),
119+
local_pool.handle().clone(),
120+
);
121+
let blobs = Arc::new(crate::net_protocol::Blobs::new_with_events(
122+
store.clone(),
123+
local_pool.handle().clone(),
124+
events,
125+
downloader,
126+
endpoint.clone(),
127+
));
128+
router = router.accept(crate::protocol::ALPN.to_vec(), blobs.clone());
129+
130+
// Build the router
131+
let router = router.spawn().await?;
132+
133+
// Setup RPC
134+
let (internal_rpc, controller) =
135+
quic_rpc::transport::flume::service_connection::<crate::rpc::proto::RpcService>(32);
136+
let controller = quic_rpc::transport::boxed::Connection::new(controller);
137+
let internal_rpc = quic_rpc::transport::boxed::ServerEndpoint::new(internal_rpc);
138+
let internal_rpc = quic_rpc::RpcServer::new(internal_rpc);
139+
140+
let rpc_server_task: tokio::task::JoinHandle<()> = tokio::task::spawn(async move {
141+
loop {
142+
let request = internal_rpc.accept().await;
143+
match request {
144+
Ok(accepting) => {
145+
let blobs = blobs.clone();
146+
tokio::task::spawn(async move {
147+
let (msg, chan) = accepting.read_first().await.unwrap();
148+
blobs.handle_rpc_request(msg, chan).await.unwrap();
149+
});
150+
}
151+
Err(err) => {
152+
tracing::warn!("rpc error: {:?}", err);
153+
}
154+
}
155+
}
156+
});
157+
158+
let client = quic_rpc::RpcClient::new(controller);
159+
160+
Ok((client, router, rpc_server_task, local_pool))
161+
}

0 commit comments

Comments
 (0)