Skip to content

Commit

Permalink
refactor: remove unnecessary clones and async move blocks
Browse files Browse the repository at this point in the history
It became possible after migration to async/await.
  • Loading branch information
kurnevsky committed Sep 19, 2020
1 parent 91eb563 commit e9b62ff
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 35 deletions.
2 changes: 1 addition & 1 deletion examples/dht_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn main() -> Result<(), Error> {

let stats = Stats::new();

let lan_discovery_sender =
let mut lan_discovery_sender =
LanDiscoverySender::new(tx.clone(), server_pk, local_addr.is_ipv6());

let mut server = Server::new(tx, server_pk, server_sk);
Expand Down
7 changes: 3 additions & 4 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async fn main() -> Result<(), Error> {
let socket = common::bind_socket(local_addr).await;
let stats = Stats::new();

let lan_discovery_sender = LanDiscoverySender::new(tx.clone(), dht_pk, local_addr.is_ipv6());
let mut lan_discovery_sender = LanDiscoverySender::new(tx.clone(), dht_pk, local_addr.is_ipv6());

let (tcp_incoming_tx, mut tcp_incoming_rx) = mpsc::unbounded();

Expand Down Expand Up @@ -122,10 +122,9 @@ async fn main() -> Result<(), Error> {
onion_client.add_path_node(node).await;
}

let tcp_connections_c = tcp_connections.clone();
let net_crypto_tcp_future = async move {
let net_crypto_tcp_future = async {
while let Some((packet, pk)) = net_crypto_tcp_rx.next().await {
tcp_connections_c.send_data(pk, packet).await?;
tcp_connections.send_data(pk, packet).await?;
}
Result::<(), Error>::Ok(())
};
Expand Down
13 changes: 6 additions & 7 deletions examples/onion_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ extern crate log;
use std::net::{SocketAddr, IpAddr};

use failure::Error;
use futures::future::{FutureExt};
use futures::stream::{StreamExt};
use futures::future::FutureExt;
use futures::stream::StreamExt;
use futures::sink::SinkExt;
use futures::channel::mpsc;
use tokio_util::udp::UdpFramed;
Expand Down Expand Up @@ -84,8 +84,7 @@ async fn main() -> Result<(), Error> {

let (mut sink, mut stream) = UdpFramed::new(socket, codec).split();

let client = onion_client.clone();
let network_reader = async move {
let network_reader = async {
while let Some(event) = stream.next().await {
let (packet, addr) = match event {
Ok(ev) => ev,
Expand All @@ -103,11 +102,11 @@ async fn main() -> Result<(), Error> {
Packet::OnionAnnounceResponse(packet) => {
let is_global = IsGlobal::is_global(&addr.ip());

client.handle_announce_response(&packet, is_global).await
onion_client.handle_announce_response(&packet, is_global).await
.map_err(Error::from)
},
Packet::OnionDataResponse(packet) =>
client.handle_data_response(&packet).await
onion_client.handle_data_response(&packet).await
.map_err(Error::from),
_ => Ok(()),
};
Expand All @@ -122,7 +121,7 @@ async fn main() -> Result<(), Error> {
Ok(())
};

let network_writer = async move {
let network_writer = async {
while let Some((packet, mut addr)) = rx.next().await {
if is_ipv4 && addr.is_ipv6() { continue }

Expand Down
2 changes: 1 addition & 1 deletion examples/tcp_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn main() {
let server = Server::new();

let stats = Stats::new();
let future = async move {
let future = async {
let listener = TcpListener::bind(&addr).await.unwrap();
drop(tcp_run(&server, listener, server_sk, stats, TCP_CONNECTIONS_LIMIT).await);
};
Expand Down
2 changes: 1 addition & 1 deletion tox_core/src/dht/lan_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl LanDiscoverySender {

/// Run LAN discovery periodically. Result future will never be completed
/// successfully.
pub async fn run(mut self) -> Result<(), LanDiscoveryError> {
pub async fn run(&mut self) -> Result<(), LanDiscoveryError> {
let interval = LAN_DISCOVERY_INTERVAL;
let mut wakeups = tokio::time::interval(interval);

Expand Down
38 changes: 19 additions & 19 deletions tox_core/src/friend_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl FriendConnections {
}

/// Handle the stream of found DHT `PublicKey`s.
async fn handle_dht_pk(&self, mut dht_pk_rx: mpsc::UnboundedReceiver<(PublicKey, PublicKey)>) -> Result<(), RunError> {
async fn handle_dht_pk(&self, dht_pk_rx: &mut mpsc::UnboundedReceiver<(PublicKey, PublicKey)>) -> Result<(), RunError> {
let dht = self.dht.clone();
let net_crypto = self.net_crypto.clone();
let onion_client = self.onion_client.clone();
Expand Down Expand Up @@ -230,7 +230,7 @@ impl FriendConnections {
}

/// Handle the stream of found IP addresses.
async fn handle_friend_saddr(&self, mut friend_saddr_rx: mpsc::UnboundedReceiver<PackedNode>) -> Result<(), RunError> {
async fn handle_friend_saddr(&self, friend_saddr_rx: &mut mpsc::UnboundedReceiver<PackedNode>) -> Result<(), RunError> {
let net_crypto = self.net_crypto.clone();
let friends = self.friends.clone();
while let Some(node) = friend_saddr_rx.next().await {
Expand All @@ -255,7 +255,7 @@ impl FriendConnections {
}

/// Handle the stream of connection statuses.
async fn handle_connection_status(&self, mut connnection_status_rx: mpsc::UnboundedReceiver<(PublicKey, bool)>) -> Result<(), RunError> {
async fn handle_connection_status(&self, connnection_status_rx: &mut mpsc::UnboundedReceiver<(PublicKey, bool)>) -> Result<(), RunError> {
let onion_client = self.onion_client.clone();
let friends = self.friends.clone();
let connection_status_tx = self.connection_status_tx.clone();
Expand Down Expand Up @@ -388,19 +388,19 @@ impl FriendConnections {
/// `PublicKey`, IP address and connection status updates to appropriate
/// modules.
pub async fn run(&self) -> Result<(), RunError> {
let (dht_pk_tx, dht_pk_rx) = mpsc::unbounded();
let (dht_pk_tx, mut dht_pk_rx) = mpsc::unbounded();
self.onion_client.set_dht_pk_sink(dht_pk_tx.clone()).await;
self.net_crypto.set_dht_pk_sink(dht_pk_tx).await;

let (friend_saddr_tx, friend_saddr_rx) = mpsc::unbounded();
let (friend_saddr_tx, mut friend_saddr_rx) = mpsc::unbounded();
self.dht.set_friend_saddr_sink(friend_saddr_tx).await;

let (connection_status_tx, connection_status_rx) = mpsc::unbounded();
let (connection_status_tx, mut connection_status_rx) = mpsc::unbounded();
self.net_crypto.set_connection_status_sink(connection_status_tx).await;

let dht_pk_future = self.handle_dht_pk(dht_pk_rx);
let friend_saddr_future = self.handle_friend_saddr(friend_saddr_rx);
let connection_status_future = self.handle_connection_status(connection_status_rx);
let dht_pk_future = self.handle_dht_pk(&mut dht_pk_rx);
let friend_saddr_future = self.handle_friend_saddr(&mut friend_saddr_rx);
let connection_status_future = self.handle_connection_status(&mut connection_status_rx);
let main_loop_future = self.run_main_loop();

futures::select! {
Expand Down Expand Up @@ -554,14 +554,14 @@ mod tests {
friend_connections.net_crypto.set_friend_udp_addr(friend_pk, saddr).await;

let (new_friend_dht_pk, _new_friend_dht_sk) = gen_keypair();
let (mut dht_pk_tx, dht_pk_rx) = mpsc::unbounded();
let (mut dht_pk_tx, mut dht_pk_rx) = mpsc::unbounded();
dht_pk_tx.send((friend_pk, new_friend_dht_pk)).await.unwrap();
drop(dht_pk_tx);

let delay = Duration::from_secs(1);
tokio::time::advance(delay).await;

friend_connections.handle_dht_pk(dht_pk_rx).await.unwrap();
friend_connections.handle_dht_pk(&mut dht_pk_rx).await.unwrap();

assert!(!friend_connections.dht.has_friend(&friend_dht_pk).await);
assert!(friend_connections.dht.has_friend(&new_friend_dht_pk).await);
Expand Down Expand Up @@ -598,14 +598,14 @@ mod tests {
friend_connections.friends.write().await.insert(friend_pk, friend);

let saddr = "127.0.0.1:12345".parse().unwrap();
let (mut friend_saddr_tx, friend_saddr_rx) = mpsc::unbounded();
let (mut friend_saddr_tx, mut friend_saddr_rx) = mpsc::unbounded();
friend_saddr_tx.send(PackedNode::new(saddr, &friend_dht_pk)).await.unwrap();
drop(friend_saddr_tx);

let delay = Duration::from_secs(1);
tokio::time::advance(delay).await;

friend_connections.handle_friend_saddr(friend_saddr_rx).await.unwrap();
friend_connections.handle_friend_saddr(&mut friend_saddr_rx).await.unwrap();
assert_eq!(friend_connections.net_crypto.connection_saddr(&friend_pk).await, Some(saddr));

let friend = &friend_connections.friends.read().await[&friend_pk];
Expand All @@ -631,11 +631,11 @@ mod tests {
friend_connections.friends.write().await.insert(friend_pk, friend);
friend_connections.onion_client.add_friend(friend_pk).await;

let (mut connnection_status_tx, connnection_status_rx) = mpsc::unbounded();
let (mut connnection_status_tx, mut connnection_status_rx) = mpsc::unbounded();
connnection_status_tx.send((friend_pk, true)).await.unwrap();
drop(connnection_status_tx);

friend_connections.handle_connection_status(connnection_status_rx).await.unwrap();
friend_connections.handle_connection_status(&mut connnection_status_rx).await.unwrap();

let friend = &friend_connections.friends.read().await[&friend_pk];
assert!(friend.connected);
Expand All @@ -662,11 +662,11 @@ mod tests {
friend_connections.onion_client.add_friend(friend_pk).await;
friend_connections.onion_client.set_friend_connected(friend_pk, true).await;

let (mut connnection_status_tx, connnection_status_rx) = mpsc::unbounded();
let (mut connnection_status_tx, mut connnection_status_rx) = mpsc::unbounded();
connnection_status_tx.send((friend_pk, false)).await.unwrap();
drop(connnection_status_tx);

friend_connections.handle_connection_status(connnection_status_rx).await.unwrap();
friend_connections.handle_connection_status(&mut connnection_status_rx).await.unwrap();

let friend = &friend_connections.friends.read().await[&friend_pk];
assert!(!friend.connected);
Expand Down Expand Up @@ -963,7 +963,7 @@ mod tests {

let net_crypto = friend_connections.net_crypto.clone();
let dht = friend_connections.dht.clone();
let packets_future = async move {
let packets_future = async {
dht.handle_packet(DhtPacket::CryptoHandshake(handshake), friend_saddr).await.unwrap();

let session_pk = net_crypto.get_session_pk(&friend_pk).await.unwrap();
Expand All @@ -984,7 +984,7 @@ mod tests {
assert_eq!(received_data, vec![PACKET_ID_ALIVE]);
};

let connection_status_future = async move {
let connection_status_future = async {
let packet = connection_status_rx.next().await;
let (pk, status) = packet.unwrap();
assert_eq!(pk, friend_pk);
Expand Down
2 changes: 1 addition & 1 deletion tox_core/src/onion/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ impl OnionClient {
}

/// Run periodical announcements and friends searching.
pub async fn run(self) -> Result<(), RunError> {
pub async fn run(&self) -> Result<(), RunError> {
let interval = Duration::from_secs(1);
let mut wakeups = tokio::time::interval(interval);

Expand Down
2 changes: 1 addition & 1 deletion tox_core/src/relay/client/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ impl Connections {

/// Run TCP periodical tasks. Result future will never be completed
/// successfully.
pub async fn run(self) -> Result<(), ConnectionError> {
pub async fn run(&self) -> Result<(), ConnectionError> {
let mut wakeups = tokio::time::interval(CONNECTIONS_INTERVAL);

while wakeups.next().await.is_some() {
Expand Down

0 comments on commit e9b62ff

Please sign in to comment.