Skip to content

Commit

Permalink
core/: Remove DisconnectedPeer::set_connected and Pool::add (#2195)
Browse files Browse the repository at this point in the history
This logic seems to be a leftover of
#889 and unused today.
  • Loading branch information
mxinden authored Aug 19, 2021
1 parent f2905c0 commit 1e9fcf9
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 129 deletions.
4 changes: 4 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
- Require `ConnectionHandler::{InEvent,OutEvent,Error}` to implement `Debug`
(see [PR 2183]).

- Remove `DisconnectedPeer::set_connected` and `Pool::add` (see [PR 2195]).


[PR 2145]: https://github.com/libp2p/rust-libp2p/pull/2145
[PR 2142]: https://github.com/libp2p/rust-libp2p/pull/2142
[PR 2137]: https://github.com/libp2p/rust-libp2p/pull/2137
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183
[PR 2195]: https://github.com/libp2p/rust-libp2p/pull/2195

# 0.29.0 [2021-07-12]

Expand Down
38 changes: 2 additions & 36 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

use super::{
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
Connected, ConnectedPoint, Connection, ConnectionError, ConnectionHandler,
IntoConnectionHandler, PendingConnectionError, Substream,
Connected, ConnectedPoint, ConnectionError, ConnectionHandler, IntoConnectionHandler,
PendingConnectionError, Substream,
};
use crate::{muxing::StreamMuxer, Executor};
use fnv::FnvHashMap;
Expand Down Expand Up @@ -276,40 +276,6 @@ impl<H: IntoConnectionHandler, TE> Manager<H, TE> {
ConnectionId(task_id)
}

/// Adds an existing connection to the manager.
pub fn add<M>(&mut self, conn: Connection<M, H::Handler>, info: Connected) -> ConnectionId
where
H: IntoConnectionHandler + Send + 'static,
H::Handler: ConnectionHandler<Substream = Substream<M>> + Send + 'static,
<H::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TE: error::Error + Send + 'static,
M: StreamMuxer + Send + Sync + 'static,
M::OutboundSubstream: Send + 'static,
{
let task_id = self.next_task_id;
self.next_task_id.0 += 1;

let (tx, rx) = mpsc::channel(self.task_command_buffer_size);
self.tasks.insert(
task_id,
TaskInfo {
sender: tx,
state: TaskState::Established(info),
},
);

let task: Pin<Box<Task<Pin<Box<future::Pending<_>>>, _, _, _>>> =
Box::pin(Task::established(task_id, self.events_tx.clone(), rx, conn));

if let Some(executor) = &mut self.executor {
executor.exec(task);
} else {
self.local_spawns.push(task);
}

ConnectionId(task_id)
}

/// Gets an entry for a managed connection, if it exists.
pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, THandlerInEvent<H>>> {
if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) {
Expand Down
18 changes: 0 additions & 18 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,24 +130,6 @@ where
},
}
}

/// Create a task for an existing node we are already connected to.
pub fn established(
id: TaskId,
events: mpsc::Sender<Event<H, E>>,
commands: mpsc::Receiver<Command<THandlerInEvent<H>>>,
connection: Connection<M, H::Handler>,
) -> Self {
Task {
id,
events,
commands: commands.fuse(),
state: State::Established {
connection,
event: None,
},
}
}
}

/// The state associated with the `Task` of a connection.
Expand Down
36 changes: 2 additions & 34 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@

use crate::{
connection::{
self,
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
manager::{self, Manager, ManagerConfig},
Connected, Connection, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit,
IncomingInfo, IntoConnectionHandler, OutgoingInfo, PendingConnectionError, Substream,
Connected, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit, IncomingInfo,
IntoConnectionHandler, OutgoingInfo, PendingConnectionError, Substream,
},
muxing::StreamMuxer,
ConnectedPoint, PeerId,
Expand Down Expand Up @@ -313,37 +312,6 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
id
}

/// Adds an existing established connection to the pool.
///
/// Returns the assigned connection ID on success. An error is returned
/// if the configured maximum number of established connections for the
/// connected peer has been reached.
pub fn add<TMuxer>(
&mut self,
c: Connection<TMuxer, THandler::Handler>,
i: Connected,
) -> Result<ConnectionId, ConnectionLimit>
where
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler:
ConnectionHandler<Substream = connection::Substream<TMuxer>> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TTransErr: error::Error + Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
self.counters.check_max_established(&i.endpoint)?;
self.counters
.check_max_established_per_peer(self.num_peer_established(&i.peer_id))?;
let id = self.manager.add(c, i.clone());
self.counters.inc_established(&i.endpoint);
self.established
.entry(i.peer_id)
.or_default()
.insert(id, i.endpoint);
Ok(id)
}

/// Gets an entry representing a connection in the pool.
///
/// Returns `None` if the pool has no connection with the given ID.
Expand Down
44 changes: 3 additions & 41 deletions core/src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
use super::{DialError, DialingOpts, Network};
use crate::{
connection::{
handler::THandlerInEvent, pool::Pool, Connected, ConnectedPoint, Connection,
ConnectionHandler, ConnectionId, ConnectionLimit, EstablishedConnection,
EstablishedConnectionIter, IntoConnectionHandler, PendingConnection, Substream,
handler::THandlerInEvent, pool::Pool, ConnectedPoint, ConnectionHandler, ConnectionId,
ConnectionLimit, EstablishedConnection, EstablishedConnectionIter, IntoConnectionHandler,
PendingConnection, Substream,
},
Multiaddr, PeerId, StreamMuxer, Transport,
};
Expand Down Expand Up @@ -472,44 +472,6 @@ where
pub fn into_peer(self) -> Peer<'a, TTrans, THandler> {
Peer::Disconnected(self)
}

/// Moves the peer into a connected state by supplying an existing
/// established connection.
///
/// No event is generated for this action.
///
/// # Panics
///
/// Panics if `connected.peer_id` does not identify the current peer.
pub fn set_connected<TMuxer>(
self,
connected: Connected,
connection: Connection<TMuxer, THandler::Handler>,
) -> Result<ConnectedPeer<'a, TTrans, THandler>, ConnectionLimit>
where
THandler: Send + 'static,
TTrans::Error: Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>> + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
{
if connected.peer_id != self.peer_id {
panic!(
"Invalid peer ID given: {:?}. Expected: {:?}",
connected.peer_id, self.peer_id
)
}

self.network
.pool
.add(connection, connected)
.map(move |_id| ConnectedPeer {
network: self.network,
peer_id: self.peer_id,
})
}
}

/// The (internal) state of a `DialingAttempt`, tracking the
Expand Down

0 comments on commit 1e9fcf9

Please sign in to comment.