-
Notifications
You must be signed in to change notification settings - Fork 64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New and improved qp2p API #206
Conversation
src/endpoint.rs
Outdated
/// Sends a message to a peer. This will attempt to re-use any existing connections | ||
/// with the said peer. If a connection doesn't exist already, a new connection will be created. | ||
pub async fn send_message(&self, msg: Bytes, dest: &SocketAddr) -> Result<()> { | ||
self.connect_to(dest).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not return the connection directly from connect_to
? Then we wouldn't have to look it up again using get_connection
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The connection object is not a part of the public API. It is held internally and handled within this crate.
src/endpoint.rs
Outdated
/// Returns a message received from another peer along with the source address | ||
pub async fn next_incoming_message(&mut self) -> Option<(SocketAddr, Bytes)> { | ||
self.message_queue.lock().await.pop_front() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First of all, you don't need &mut self
because the message queue is behind a Mutex
. But more serious issue is that this seems like it forces us to busy-loop for incoming messages, wasting CPU cycles. Wonder why do we need the message queue at all. Why not just recv
on the message channel receiver instead? This same point applies to next_incoming_connection
and next_disconnected_peer
too.
src/connections.rs
Outdated
}; | ||
if let Some(message) = message { | ||
message_tx | ||
.send((src, message.get_message_data())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why even create the Message
enum when only the data is used here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I'm going to remove the Message enum completely.
Cargo.toml
Outdated
@@ -21,6 +21,7 @@ serde_json = "1.0.59" | |||
structopt = "~0.3.15" | |||
thiserror = "1.0.23" | |||
webpki = "~0.21.3" | |||
flexi_logger = "~0.16.1" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the reason of moving this from dev-dependencies to here?
src/endpoint.rs
Outdated
pub fn disconnect_from(&mut self, peer_addr: &SocketAddr) -> Result<()> { | ||
if let Some((conn, remover)) = self.connection_pool.remove(peer_addr) { | ||
conn.close(0u8.into(), b"User Disconnected"); | ||
remover.remove(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You already removed the connection from the pool by calling ConnectionPool::remove
above, so there is no need to remove it via the remover too.
src/connection_pool.rs
Outdated
.is_some() | ||
} | ||
|
||
pub fn remove(&mut self, addr: &SocketAddr) -> Option<(quinn::Connection, ConnectionRemover)> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no point in returning the Remover
here. The remover is a handle to remove the connection from the pool. But you are doing that manually in this function, so the remover has no purpose (calling remove
on it would have no effect).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, this removes only one connection with that address, but in some cases (*) there can be more than one. Wonder if it wouldn't make more sense to remove all of them instead?
*) if two peers open a connection to each other around the same time, they can end up having two connections to the same peer in the pool (one incoming and one outgoing).
src/endpoint.rs
Outdated
} else { | ||
Err(Error::NoEchoServiceResponse) | ||
log::info!("We are not connected to this peer. Doing nothing"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps the message should be more informative about what it was attempting to do and that it's not doing it since there is not connection, otherwise when reading the logs you cannot understand what is it that "it's not doing".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Of course :) I was saving log clean-up for the end 😄
973c609
to
330ddd2
Compare
Do not bind ever to 0.0.0.0
this can be done by saving the result of the echo service after calling it the first time BREAKING CHANGE
remove chat functionality
to qp2p this allows us to provide a simple API that can be used to send/receive messages
- split out the recievers out of the endpoint object - fix all tests and doc tests to work with the new API
this example is no longer relevant since the echo service is automatically handled within the qp2p crate
and port is provided manually
is created - this simplifies the Endpoint::socket_addr() API removing the need of a mutable borrow
successful bootstrap
8d2b17a
to
479bb18
Compare
src/endpoint.rs
Outdated
} | ||
|
||
/// Sends a message to a peer. This will attempt to re-use any existing connections | ||
/// with the said peer. If a connection doesn't exist already, a new connection will be created. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a connection doesn't exist already, a new connection will be created
This doesn't seem to match the implementation.
event via the streams - this also prevents the creation of a new connection when send_message is called
479bb18
to
860ea5b
Compare
… used across threads. * modifies ConnectionDeduplicator to use Arc<Mutex<_>> instead of Mutex<_> * makes ConnectionDeduplicator cloneable
This PR introduces a new and improved simple qp2p API. This also implements connection and message handling within the qp2p crate itself so that the user of qp2p does not need to manage them.