src/config.rs (10 changes: 5 additions & 5 deletions)

```diff
@@ -93,7 +93,7 @@ pub struct ConfigBuilder {
     identify: Option<identify::Config>,
 
     /// Kademlia protocol config.
-    kademlia: Option<kademlia::Config>,
+    kademlia: Vec<kademlia::Config>,
 
     /// Bitswap protocol config.
     bitswap: Option<bitswap::Config>,
@@ -149,7 +149,7 @@ impl ConfigBuilder {
             keypair: None,
             ping: None,
             identify: None,
-            kademlia: None,
+            kademlia: Vec::new(),
             bitswap: None,
             mdns: None,
             executor: None,
@@ -219,7 +219,7 @@ impl ConfigBuilder {
     /// Enable IPFS Kademlia protocol.
     pub fn with_libp2p_kademlia(mut self, config: kademlia::Config) -> Self {
-        self.kademlia = Some(config);
+        self.kademlia.push(config);
         self
     }
@@ -307,7 +307,7 @@ impl ConfigBuilder {
             websocket: self.websocket.take(),
             ping: self.ping.take(),
             identify: self.identify.take(),
-            kademlia: self.kademlia.take(),
+            kademlia: self.kademlia,
             bitswap: self.bitswap.take(),
             max_parallel_dials: self.max_parallel_dials,
             executor: self.executor.map_or(Arc::new(DefaultExecutor {}), |executor| executor),
@@ -349,7 +349,7 @@ pub struct Litep2pConfig {
     pub(crate) identify: Option<identify::Config>,
 
     /// Kademlia protocol configuration, if enabled.
-    pub(crate) kademlia: Option<kademlia::Config>,
+    pub(crate) kademlia: Vec<kademlia::Config>,
```
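The practical effect of this change is that `with_libp2p_kademlia` becomes additive instead of overwriting. A minimal usage sketch follows; only `with_libp2p_kademlia` appears in the diff, so the `kademlia::ConfigBuilder` constructor used here is an assumption for illustration:

```rust
// Hypothetical sketch: registering two independent Kademlia instances,
// e.g. two DHTs with distinct protocol names. Before this change the
// second call would silently replace the first config; now both are
// pushed into the `Vec` and each gets its own event loop.
// `kademlia::ConfigBuilder::new().build()` is assumed, not verified.
let (kad_config_a, _kad_handle_a) = kademlia::ConfigBuilder::new().build();
let (kad_config_b, _kad_handle_b) = kademlia::ConfigBuilder::new().build();

let config = ConfigBuilder::new()
    .with_libp2p_kademlia(kad_config_a) // pushes into `self.kademlia`
    .with_libp2p_kademlia(kad_config_b) // second instance, previously impossible
    .build();
```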
src/lib.rs (4 changes: 2 additions & 2 deletions)

```diff
@@ -259,8 +259,8 @@ impl Litep2p {
             }));
         }
 
-        // start kademlia protocol event loop if enabled
-        if let Some(kademlia_config) = litep2p_config.kademlia.take() {
+        // start kademlia protocol event loops
+        for kademlia_config in litep2p_config.kademlia.into_iter() {
             tracing::debug!(
                 target: LOG_TARGET,
                 protocol_names = ?kademlia_config.protocol_names,
```
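A behavioral note: an empty `Vec` means the loop body never runs, so the default (no Kademlia configured) is unchanged from the old `None` case. A standalone toy sketch of the `Option` to `Vec` migration pattern, using placeholder types rather than litep2p code:

```rust
// Stand-in for `kademlia::Config`, purely for illustration.
struct FakeConfig(&'static str);

fn main() {
    // Old shape: at most one instance could ever be configured.
    let old: Option<FakeConfig> = Some(FakeConfig("/dht/main"));
    if let Some(cfg) = old {
        println!("starting single event loop for {}", cfg.0);
    }

    // New shape: zero, one, or many instances. An empty Vec behaves
    // exactly like the old `None`: the loop body never runs.
    let new: Vec<FakeConfig> = vec![FakeConfig("/dht/main"), FakeConfig("/dht/fork")];
    for cfg in new.into_iter() {
        println!("starting event loop for {}", cfg.0);
    }
}
```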
src/protocol/libp2p/kademlia/mod.rs (9 changes: 8 additions & 1 deletion)

```diff
@@ -240,7 +240,9 @@ impl Kademlia {
         self.routing_table.on_connection_established(Key::from(peer), endpoint);
 
         let Some(actions) = self.pending_dials.remove(&peer) else {
-            entry.insert(PeerContext::new());
+            // Note that we do not add a peer entry if we don't have any pending actions.
+            // This is done to not populate `self.peers` with peers that don't support
+            // our Kademlia protocol.
             return Ok(());
         };
 
```

**Collaborator:** dq: This won't affect networks with a single DHT started? Maybe we could add a trace here in case some issues pop up in the future?

**Collaborator:** Thinking out loud: if that's the case, we could maybe add a new builder method on the `KadConfig` to signal that we run in a multi-DHT world? And return none only on multi-DHT?

**Author (Collaborator):** This won't affect a single-DHT network, as the entry is always inserted anyway when a substream is opened. Strictly speaking, it doesn't make sense to consider transport-level connected peers as connected, because they might not speak the Kademlia protocol (even in the single-DHT case). We are interested in substreams over a specific protocol.

**Author (Collaborator):**

> Thinking out loud: if that's the case, we could maybe add a new builder method on the `KadConfig` to signal that we run in a multi-DHT world? And return none only on multi-DHT?

The logic shouldn't be different for the single-DHT and multi-DHT cases.

```diff
@@ -343,6 +345,7 @@ impl Kademlia {
         let pending_action = &mut self
             .peers
             .get_mut(&peer)
+            // If we opened an outbound substream, we must have pending actions for the peer.
             .ok_or(Error::PeerDoesntExist(peer))?
             .pending_actions
             .remove(&substream_id);
```

**lexnv (Collaborator), Nov 13, 2025:** Couldn't we get into a race here between the following timelines?

  • T0: pending to open an outbound substream
  • T1: outbound substream opened and queued for reporting
  • T2: `disconnect_peer` - the peer is disconnected and reported
  • T3: the opened outbound substream is reported only now

**Author (Collaborator):** This might happen when there was an error reading an inbound request at the same time as we sent an outbound request. The worst that can happen is that we won't process pending actions for a peer, but this is not much different from an error during an outbound request.

As a side note, the PR doesn't change the way this race can happen.
```diff
@@ -412,6 +415,10 @@ impl Kademlia {
     async fn on_inbound_substream(&mut self, peer: PeerId, substream: Substream) {
         tracing::trace!(target: LOG_TARGET, ?peer, "inbound substream opened");
 
+        // Ensure a peer entry exists to treat the peer as [`ConnectionType::Connected`]
+        // when inserting into the routing table.
+        self.peers.entry(peer).or_default();
+
         self.executor.read_message(peer, None, substream);
     }
 
```
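The added lines rely on the `entry(..).or_default()` idiom, which requires the peer context type to implement `Default`. A self-contained sketch of the pattern, with a simplified stand-in for the crate's `PeerContext`:

```rust
use std::collections::HashMap;

// Simplified stand-in for the crate's per-peer state; the real type
// also tracks pending actions and substream bookkeeping.
#[derive(Default)]
struct PeerContext {
    pending_actions: Vec<&'static str>,
}

fn main() {
    let mut peers: HashMap<u32, PeerContext> = HashMap::new();

    // Inserts a default context only if the peer is not tracked yet,
    // so repeated inbound substreams from the same peer are idempotent.
    peers.entry(42).or_default();
    peers.entry(42).or_default(); // no-op: entry already exists

    assert_eq!(peers.len(), 1);
    assert!(peers[&42].pending_actions.is_empty());
}
```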