960 changes: 271 additions & 689 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions Cargo.toml
@@ -31,13 +31,13 @@ futures-buffered = "0.2.4"
 futures-lite = "2.3.0"
 futures-util = { version = "0.3.25" }
 hex = "0.4"
-iroh = { version = "0.93" }
-iroh-base = { version = "0.93", features = ["ticket"] }
-iroh-blobs = { version = "0.95" }
-iroh-gossip = { version = "0.93", features = ["net"] }
+iroh = { version = "0.94" }
+iroh-tickets = { version = "0.1"}
+iroh-blobs = { version = "0.96" }
+iroh-gossip = { version = "0.94", features = ["net"] }
 iroh-metrics = { version = "0.36", default-features = false }
-irpc = { version = "0.9.0" }
-n0-future = "0.1.3"
+irpc = { version = "0.10.0" }
+n0-future = "0.3"
 num_enum = "0.7"
 postcard = { version = "1", default-features = false, features = [
     "alloc",
@@ -60,8 +60,8 @@ tracing = "0.1"

 [dev-dependencies]
 data-encoding = "2.6.0"
-iroh = { version = "0.93", features = ["test-utils"] }
-nested_enum_utils = "0.1.0"
+iroh = { version = "0.94", features = ["test-utils"] }
+nested_enum_utils = "0.2"
 parking_lot = "0.12.3"
 proptest = "1.2.0"
 rand_chacha = "0.9"
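The notable swap here: `iroh-base` with its `ticket` feature is dropped in favor of the new standalone `iroh-tickets` crate. A sketch of the import change downstream code would need, assuming the `Ticket` trait is exposed at the new crate's root (the exact path and the `serialize` method are assumptions, not confirmed by this diff):

```rust
// Before (iroh-base 0.93 with the "ticket" feature):
// use iroh_base::ticket::Ticket;

// After (assumption: the trait moved to the root of iroh-tickets 0.1):
use iroh_tickets::Ticket;

fn share<T: Ticket>(ticket: &T) -> String {
    // Tickets keep their serializable, shareable string form across the move.
    ticket.serialize()
}
```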
2 changes: 1 addition & 1 deletion README.md
@@ -51,7 +51,7 @@ use iroh_gossip::{net::Gossip, ALPN as GOSSIP_ALPN};
 async fn main() -> anyhow::Result<()> {
     // create an iroh endpoint that includes the standard discovery mechanisms
     // we've built at number0
-    let endpoint = Endpoint::builder().discovery_n0().bind().await?;
+    let endpoint = Endpoint::builder().bind().await?;

     // build the blobs protocol
     let blobs = MemStore::default();
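The same one-line change lands in `examples/setup.rs` below: `discovery_n0()` disappears from the builder chain. A minimal standalone sketch of the new setup, assuming iroh 0.94 wires in the standard n0 discovery services by default when none are configured explicitly:

```rust
use iroh::Endpoint;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // No explicit discovery_n0() call; the builder is assumed to install
    // the default discovery mechanisms on its own in iroh 0.94.
    let endpoint = Endpoint::builder().bind().await?;
    println!("endpoint id: {}", endpoint.id());
    Ok(())
}
```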
2 changes: 1 addition & 1 deletion examples/setup.rs
@@ -7,7 +7,7 @@ use iroh_gossip::{net::Gossip, ALPN as GOSSIP_ALPN};
 async fn main() -> anyhow::Result<()> {
     // create an iroh endpoint that includes the standard discovery mechanisms
     // we've built at number0
-    let endpoint = Endpoint::builder().discovery_n0().bind().await?;
+    let endpoint = Endpoint::builder().bind().await?;

     // build the blobs protocol
     let blobs = MemStore::default();
4 changes: 2 additions & 2 deletions src/api.rs
@@ -16,7 +16,7 @@ use std::{

 use anyhow::{Context, Result};
 use bytes::Bytes;
-use iroh::NodeAddr;
+use iroh::EndpointAddr;
 use iroh_blobs::{
     api::blobs::{AddPathOptions, AddProgressItem, ExportMode, ExportOptions, ExportProgress},
     Hash,
@@ -433,7 +433,7 @@ impl Doc {
     }

     /// Starts to sync this document with a list of peers.
-    pub async fn start_sync(&self, peers: Vec<NodeAddr>) -> Result<()> {
+    pub async fn start_sync(&self, peers: Vec<EndpointAddr>) -> Result<()> {
         self.ensure_open()?;
         self.inner
             .rpc(StartSyncRequest {
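Callers of `Doc::start_sync` now pass the renamed address type. A usage sketch with a hypothetical `doc` handle and peer id, using `EndpointAddr::new` as it appears elsewhere in this PR; a bare id leaves address resolution to discovery:

```rust
use iroh::{EndpointAddr, EndpointId};
use iroh_docs::api::Doc; // assumption: Doc is exported from the api module

// Hypothetical helper: start syncing a doc with one peer known only by id.
async fn sync_with(doc: &Doc, peer: EndpointId) -> anyhow::Result<()> {
    doc.start_sync(vec![EndpointAddr::new(peer)]).await
}
```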
2 changes: 1 addition & 1 deletion src/api/actor.rs
@@ -389,7 +389,7 @@ impl RpcActor {
             mode,
             addr_options,
         } = req;
-        let me = self.endpoint.node_addr();
+        let me = self.endpoint.addr();
         let me = addr_options.apply(&me);

         let capability = match mode {
49 changes: 23 additions & 26 deletions src/api/protocol.rs
@@ -3,7 +3,7 @@
 use std::path::PathBuf;

 use bytes::Bytes;
-use iroh::NodeAddr;
+use iroh::EndpointAddr;
 use iroh_blobs::{api::blobs::ExportMode, Hash};
 use irpc::{
     channel::{mpsc, oneshot},
@@ -179,7 +179,7 @@ pub struct DelResponse {
 #[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct StartSyncRequest {
     pub doc_id: NamespaceId,
-    pub peers: Vec<NodeAddr>,
+    pub peers: Vec<EndpointAddr>,
 }

 #[derive(Debug, Serialize, Deserialize, Clone)]
@@ -365,7 +365,7 @@ pub enum DocsProtocol {
     AuthorDelete(AuthorDeleteRequest),
 }

-/// Options to configure what is included in a [`iroh::NodeAddr`].
+/// Options to configure what is included in a [`iroh::EndpointAddr`].
 #[derive(
     Copy,
     Clone,
@@ -396,32 +396,29 @@ impl AddrInfoOptions {
     /// Apply the options to the given address.
     pub fn apply(
         &self,
-        iroh::NodeAddr {
-            node_id,
-            relay_url,
-            direct_addresses,
-        }: &iroh::NodeAddr,
-    ) -> iroh::NodeAddr {
+        iroh::EndpointAddr { id, addrs }: &iroh::EndpointAddr,
+    ) -> iroh::EndpointAddr {
         match self {
-            Self::Id => iroh::NodeAddr {
-                node_id: *node_id,
-                relay_url: None,
-                direct_addresses: Default::default(),
+            Self::Id => iroh::EndpointAddr::new(*id),
+            Self::Relay => iroh::EndpointAddr {
+                id: *id,
+                addrs: addrs
+                    .iter()
+                    .filter(|addr| matches!(addr, iroh::TransportAddr::Relay(_)))
+                    .cloned()
+                    .collect(),
             },
-            Self::Relay => iroh::NodeAddr {
-                node_id: *node_id,
-                relay_url: relay_url.clone(),
-                direct_addresses: Default::default(),
+            Self::Addresses => iroh::EndpointAddr {
+                id: *id,
+                addrs: addrs
+                    .iter()
+                    .filter(|addr| matches!(addr, iroh::TransportAddr::Ip(_)))
+                    .cloned()
+                    .collect(),
             },
-            Self::Addresses => iroh::NodeAddr {
-                node_id: *node_id,
-                relay_url: None,
-                direct_addresses: direct_addresses.clone(),
-            },
-            Self::RelayAndAddresses => iroh::NodeAddr {
-                node_id: *node_id,
-                relay_url: relay_url.clone(),
-                direct_addresses: direct_addresses.clone(),
+            Self::RelayAndAddresses => iroh::EndpointAddr {
+                id: *id,
+                addrs: addrs.clone(),
             },
         }
     }
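Instead of zeroing out separate `relay_url` and `direct_addresses` fields, `apply` now filters one unified `addrs` set by `TransportAddr` variant. The same filtering logic in isolation (a sketch; `TransportAddr::Relay` and `TransportAddr::Ip` are used exactly as in the match arms above):

```rust
use iroh::{EndpointAddr, TransportAddr};

// Keep only a peer's relay transport addresses, mirroring the Self::Relay arm.
fn relay_only(addr: &EndpointAddr) -> EndpointAddr {
    EndpointAddr {
        id: addr.id,
        addrs: addr
            .addrs
            .iter()
            .filter(|a| matches!(a, TransportAddr::Relay(_)))
            .cloned()
            .collect(),
    }
}
```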
6 changes: 3 additions & 3 deletions src/engine.rs
@@ -10,7 +10,7 @@ use std::{

 use anyhow::{bail, Context, Result};
 use futures_lite::{Stream, StreamExt};
-use iroh::{Endpoint, NodeAddr, PublicKey};
+use iroh::{Endpoint, EndpointAddr, PublicKey};
 use iroh_blobs::{
     api::{blobs::BlobStatus, downloader::Downloader, Store},
     store::fs::options::{ProtectCb, ProtectOutcome},
@@ -75,7 +75,7 @@ impl Engine {
         protect_cb: Option<ProtectCallbackHandler>,
     ) -> anyhow::Result<Self> {
         let (live_actor_tx, to_live_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
-        let me = endpoint.node_id().fmt_short().to_string();
+        let me = endpoint.id().fmt_short().to_string();

         let content_status_cb: ContentStatusCallback = {
             let blobs = bao_store.blobs().clone();
@@ -173,7 +173,7 @@ impl Engine {
     ///
     /// If `peers` is non-empty, it will both do an initial set-reconciliation sync with each peer,
     /// and join an iroh-gossip swarm with these peers to receive and broadcast document updates.
-    pub async fn start_sync(&self, namespace: NamespaceId, peers: Vec<NodeAddr>) -> Result<()> {
+    pub async fn start_sync(&self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
         let (reply, reply_rx) = oneshot::channel();
         self.to_live_actor
             .send(ToLiveActor::StartSync {
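Alongside the type renames, the endpoint accessors lose their `node_` prefix. Both renamed calls from this PR, wrapped in a sketch:

```rust
use iroh::{Endpoint, EndpointAddr, EndpointId};

// iroh 0.94 renames: id() was node_id(), addr() was node_addr().
fn who_am_i(endpoint: &Endpoint) -> (EndpointId, EndpointAddr) {
    (endpoint.id(), endpoint.addr())
}
```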
4 changes: 2 additions & 2 deletions src/engine/gossip.rs
@@ -4,7 +4,7 @@ use anyhow::{Context, Result};
 use bytes::Bytes;
 use futures_lite::StreamExt;
 use futures_util::FutureExt;
-use iroh::NodeId;
+use iroh::EndpointId;
 use iroh_gossip::{
     api::{Event, GossipReceiver, GossipSender, JoinOptions},
     net::Gossip,
@@ -44,7 +44,7 @@ impl GossipState {
         }
     }

-    pub async fn join(&mut self, namespace: NamespaceId, bootstrap: Vec<NodeId>) -> Result<()> {
+    pub async fn join(&mut self, namespace: NamespaceId, bootstrap: Vec<EndpointId>) -> Result<()> {
         match self.active.entry(namespace) {
             hash_map::Entry::Occupied(mut entry) => {
                 if !bootstrap.is_empty() {
49 changes: 24 additions & 25 deletions src/engine/live.rs
@@ -8,7 +8,9 @@ use std::{

 use anyhow::{Context, Result};
 use futures_lite::FutureExt;
-use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
+use iroh::{
+    discovery::static_provider::StaticProvider, Endpoint, EndpointAddr, EndpointId, PublicKey,
+};
 use iroh_blobs::{
     api::{
         blobs::BlobStatus,
@@ -38,9 +40,6 @@ use crate::{
     AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
 };

-/// Name used for logging when new node addresses are added from the docs engine.
-const SOURCE_NAME: &str = "docs_engine";
-
 /// An iroh-docs operation
 ///
 /// This is the message that is broadcast over iroh-gossip.
@@ -67,7 +66,7 @@ pub struct SyncReport {
 pub enum ToLiveActor {
     StartSync {
         namespace: NamespaceId,
-        peers: Vec<NodeAddr>,
+        peers: Vec<EndpointAddr>,
         #[debug("onsehot::Sender")]
         reply: sync::oneshot::Sender<anyhow::Result<()>>,
     },
@@ -157,6 +156,7 @@ pub struct LiveActor {
     endpoint: Endpoint,
     bao_store: Store,
     downloader: Downloader,
+    static_provider: StaticProvider,
     replica_events_tx: async_channel::Sender<crate::Event>,
     replica_events_rx: async_channel::Receiver<crate::Event>,

@@ -201,12 +201,15 @@ impl LiveActor {
     ) -> Self {
         let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
         let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
+        let static_provider = StaticProvider::new();
+        endpoint.discovery().add(static_provider.clone());
         Self {
             inbox,
             sync,
             replica_events_rx,
             replica_events_tx,
             endpoint,
+            static_provider,
             gossip: gossip_state,
             bao_store,
             downloader,
@@ -376,7 +379,7 @@ impl LiveActor {
                 &endpoint,
                 &sync,
                 namespace,
-                NodeAddr::new(peer),
+                EndpointAddr::new(peer),
                 Some(&metrics),
             )
             .await;
@@ -401,7 +404,11 @@
         Ok(())
     }

-    async fn start_sync(&mut self, namespace: NamespaceId, mut peers: Vec<NodeAddr>) -> Result<()> {
+    async fn start_sync(
+        &mut self,
+        namespace: NamespaceId,
+        mut peers: Vec<EndpointAddr>,
+    ) -> Result<()> {
         debug!(?namespace, peers = peers.len(), "start sync");
         // update state to allow sync
         if !self.state.is_syncing(&namespace) {
@@ -421,7 +428,7 @@
             // peers are stored as bytes, don't fail the operation if they can't be
             // decoded: simply ignore the peer
             match PublicKey::from_bytes(&peer_id_bytes) {
-                Ok(public_key) => Some(NodeAddr::new(public_key)),
+                Ok(public_key) => Some(EndpointAddr::new(public_key)),
                 Err(_signing_error) => {
                     warn!("potential db corruption: peers per doc can't be decoded");
                     None
@@ -459,26 +466,18 @@
         Ok(())
     }

-    async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<NodeAddr>) -> Result<()> {
+    async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
         let mut peer_ids = Vec::new();

         // add addresses of peers to our endpoint address book
         for peer in peers.into_iter() {
-            let peer_id = peer.node_id;
+            let peer_id = peer.id;
             // adding a node address without any addressing info fails with an error,
             // but we still want to include those peers because node discovery might find addresses for them
-            if peer.is_empty() {
-                peer_ids.push(peer_id)
-            } else {
-                match self.endpoint.add_node_addr_with_source(peer, SOURCE_NAME) {
-                    Ok(()) => {
-                        peer_ids.push(peer_id);
-                    }
-                    Err(err) => {
-                        warn!(peer = %peer_id.fmt_short(), "failed to add known addrs: {err:?}");
-                    }
-                }
+            if !peer.is_empty() {
+                self.static_provider.add_endpoint_info(peer);
             }
+            peer_ids.push(peer_id);
         }

         // tell gossip to join
@@ -679,7 +678,7 @@ impl LiveActor {
     async fn on_neighbor_content_ready(
         &mut self,
         namespace: NamespaceId,
-        node: NodeId,
+        node: EndpointId,
         hash: Hash,
     ) {
         self.start_download(namespace, hash, node, true).await;
@@ -826,7 +825,7 @@ impl LiveActor {
         peer: PublicKey,
     ) -> AcceptOutcome {
         self.state
-            .accept_request(&self.endpoint.node_id(), &namespace, peer)
+            .accept_request(&self.endpoint.id(), &namespace, peer)
     }
 }

@@ -898,10 +897,10 @@ struct QueuedHashes {
 }

 #[derive(Debug, Clone, Default)]
-struct ProviderNodes(Arc<std::sync::Mutex<HashMap<Hash, HashSet<NodeId>>>>);
+struct ProviderNodes(Arc<std::sync::Mutex<HashMap<Hash, HashSet<EndpointId>>>>);

 impl ContentDiscovery for ProviderNodes {
-    fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<NodeId> {
+    fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<EndpointId> {
         let nodes = self
             .0
             .lock()
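The substantive change in this file: the removed `Endpoint::add_node_addr_with_source` (and its `SOURCE_NAME` label) is replaced by a `StaticProvider` registered once as a discovery source, which is then fed peer addresses. A condensed sketch of that flow, using only calls that appear in the diff above:

```rust
use iroh::{discovery::static_provider::StaticProvider, Endpoint, EndpointAddr, EndpointId};

// Condensed from LiveActor::new and join_peers above: register the provider
// once, then record addresses for any peer that has them.
fn collect_peers(endpoint: &Endpoint, peers: Vec<EndpointAddr>) -> Vec<EndpointId> {
    let static_provider = StaticProvider::new();
    // Discovery consults this provider whenever the endpoint dials a peer.
    endpoint.discovery().add(static_provider.clone());

    let mut peer_ids = Vec::new();
    for peer in peers {
        let peer_id = peer.id;
        // Peers without addressing info are kept too; discovery may still
        // find addresses for them later.
        if !peer.is_empty() {
            static_provider.add_endpoint_info(peer);
        }
        peer_ids.push(peer_id);
    }
    peer_ids
}
```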