Skip to content

Commit

Permalink
kad tests and inturns identify tests now pass
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubDoka committed Dec 22, 2023
1 parent e54bb99 commit c616e02
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 28 deletions.
65 changes: 42 additions & 23 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use libp2p_swarm::{
};
use smallvec::SmallVec;
use std::collections::HashSet;
use std::task::Waker;
use std::{task::Context, task::Poll, time::Duration};
use tracing::Level;

Expand Down Expand Up @@ -91,6 +92,8 @@ pub struct Handler {
local_supported_protocols: SupportedProtocols,
remote_supported_protocols: HashSet<StreamProtocol>,
external_addresses: HashSet<Multiaddr>,

waker: Option<Waker>,
}

/// An event from `Behaviour` with the information requested by the `Handler`.
Expand Down Expand Up @@ -143,9 +146,28 @@ impl Handler {
remote_supported_protocols: HashSet::default(),
remote_info: Default::default(),
external_addresses,
waker: None,
}
}

fn wake(&mut self) {
if let Some(waker) = self.waker.take() {
waker.wake();
}
}

fn push_event(
&mut self,
event: ConnectionHandlerEvent<
Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>,
(),
Event,
>,
) {
self.events.push(event);
self.wake();
}

fn on_fully_negotiated_inbound(
&mut self,
FullyNegotiatedInbound {
Expand Down Expand Up @@ -253,17 +275,15 @@ impl Handler {
.collect::<HashSet<_>>();

if !remote_added_protocols.is_empty() {
self.events
.push(ConnectionHandlerEvent::ReportRemoteProtocols(
ProtocolSupport::Added(remote_added_protocols),
));
self.push_event(ConnectionHandlerEvent::ReportRemoteProtocols(
ProtocolSupport::Added(remote_added_protocols),
));
}

if !remote_removed_protocols.is_empty() {
self.events
.push(ConnectionHandlerEvent::ReportRemoteProtocols(
ProtocolSupport::Removed(remote_removed_protocols),
));
self.push_event(ConnectionHandlerEvent::ReportRemoteProtocols(
ProtocolSupport::Removed(remote_removed_protocols),
));
}

self.remote_supported_protocols = new_remote_protocols;
Expand Down Expand Up @@ -303,13 +323,12 @@ impl ConnectionHandler for Handler {
self.external_addresses = addresses;
}
InEvent::Push => {
self.events
.push(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)),
(),
),
});
self.push_event(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)),
(),
),
});
}
}
}
Expand Down Expand Up @@ -376,6 +395,7 @@ impl ConnectionHandler for Handler {
Poll::Pending => {}
}

self.waker = Some(cx.waker().clone());
Poll::Pending
}

Expand All @@ -396,7 +416,7 @@ impl ConnectionHandler for Handler {
self.on_fully_negotiated_outbound(fully_negotiated_outbound)
}
ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
self.push_event(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(
error.map_upgrade_err(|e| void::unreachable(e.into_inner())),
),
Expand All @@ -420,13 +440,12 @@ impl ConnectionHandler for Handler {
"Supported listen protocols changed, pushing to peer"
);

self.events
.push(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)),
(),
),
});
self.push_event(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)),
(),
),
});
}
}
_ => {}
Expand Down
4 changes: 4 additions & 0 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,10 @@ where
.push_back(ToSwarm::GenerateEvent(Event::ModeChanged {
new_mode: self.mode,
}));

if let Some(waker) = self.no_events_waker.take() {
waker.wake();
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,9 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
/// the routing table with `BucketInserts::Manual`.
#[test]
fn manual_bucket_inserts() {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();
let mut cfg = Config::default();
cfg.set_kbucket_inserts(BucketInserts::Manual);
// 1 -> 2 -> [3 -> ...]
Expand Down
9 changes: 7 additions & 2 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ impl Handler {

fn wake(&mut self) {
if let Some(waker) = self.waker.take() {
tracing::info!("Waking up handler");
waker.wake();
}
}
Expand All @@ -492,6 +493,7 @@ impl Handler {
if let Some(sender) = self.pending_streams.pop_front() {
let _ = sender.send(Ok(stream));
}

self.lazy_init_protocol_status();
}

Expand Down Expand Up @@ -553,7 +555,7 @@ impl Handler {
// remote is configured with the same protocol name and we want
// the behaviour to add this peer to the routing table, if possible.
self.protocol_status = Some(ProtocolStatus {
supported: false,
supported: true,
reported: false,
});
self.wake();
Expand All @@ -565,7 +567,8 @@ impl Handler {
let (sender, receiver) = oneshot::channel();

self.pending_streams.push_back(sender);
// self.wake(); // the poll is called again after this so outbound_streams is polled
// self.wake(); // the poll is called again after this so outbound_streams is polled after
// this push
let result = self.outbound_substreams.try_push(
async move {
let mut stream = receiver
Expand Down Expand Up @@ -715,6 +718,7 @@ impl ConnectionHandler for Handler {
}
}

self.wake();
self.mode = new_mode;
}
}
Expand Down Expand Up @@ -785,6 +789,7 @@ impl ConnectionHandler for Handler {
}
}

tracing::info!("Handler is pending");
self.waker = Some(cx.waker().clone());
Poll::Pending
}
Expand Down
2 changes: 1 addition & 1 deletion protocols/kad/tests/client_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ async fn set_client_to_server_mode() {
#[derive(libp2p_swarm::NetworkBehaviour)]
#[behaviour(prelude = "libp2p_swarm::derive_prelude")]
struct MyBehaviour {
identify: identify::Behaviour,
kad: Behaviour<MemoryStore>,
identify: identify::Behaviour,
}

impl MyBehaviour {
Expand Down
5 changes: 4 additions & 1 deletion swarm/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,10 @@ where

// we do polling first so that handler can update its waker
let mut hcx = handler_waker.guard();
while let Poll::Ready(event) = hcx.with(|cx| handler.poll(cx)) {
while let Poll::Ready(event) = hcx.with(|cx| {
handler_mutated = true; // edge case: handler woken up form behaviour event
handler.poll(cx)
}) {
match event {
ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => {
let timeout = *protocol.timeout();
Expand Down
2 changes: 1 addition & 1 deletion swarm/src/connection/supported_protocols.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ impl SupportedProtocols {
let mut changed = false;

for p in added {
changed |= self.protocols.insert(p.clone());
changed |= self.protocols.insert(p);
}

changed
Expand Down

0 comments on commit c616e02

Please sign in to comment.