Skip to content

Commit

Permalink
refactor: more async fns
Browse files Browse the repository at this point in the history
  • Loading branch information
kurnevsky committed Sep 9, 2020
1 parent e18fd41 commit 98e4c5d
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 94 deletions.
2 changes: 1 addition & 1 deletion tox_core/src/dht/server_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::pin::Pin;

use futures::{Future, FutureExt, SinkExt, StreamExt};
use futures::channel::mpsc::Receiver;
use tokio::net::{UdpSocket};
use tokio::net::UdpSocket;
use failure::Fail;

use crate::dht::codec::*;
Expand Down
126 changes: 53 additions & 73 deletions tox_core/src/friend_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,21 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use failure::Fail;
use futures::{Future, FutureExt, TryFutureExt, StreamExt, TryStreamExt, SinkExt, future};
use futures::future::Either;
use futures::{FutureExt, StreamExt, SinkExt, future};
use futures::channel::mpsc;
use parking_lot::RwLock;

use tox_binary_io::*;
use tox_crypto::*;
use crate::dht::dht_node::BAD_NODE_TIMEOUT;
use tox_packet::dht::packed_node::PackedNode;
use crate::dht::server::{Server as DhtServer};
use crate::dht::server::Server as DhtServer;
use crate::friend_connection::errors::*;
use tox_packet::friend_connection::*;
use crate::net_crypto::NetCrypto;
use crate::net_crypto::errors::KillConnectionErrorKind;
use crate::onion::client::OnionClient;
use crate::relay::client::{Connections as TcpConnections};
use crate::relay::client::Connections as TcpConnections;
use crate::time::*;

/// Shorthand for the transmit half of the message channel for sending a
Expand Down Expand Up @@ -167,26 +166,17 @@ impl FriendConnections {
}

/// Handle received `ShareRelays` packet.
pub fn handle_share_relays(&self, friend_pk: PublicKey, share_relays: ShareRelays) -> impl Future<Output = Result<(), HandleShareRelaysError>> + Send {
pub async fn handle_share_relays(&self, friend_pk: PublicKey, share_relays: ShareRelays) -> Result<(), HandleShareRelaysError> {
if let Some(friend) = self.friends.read().get(&friend_pk) {
if let Some(dht_pk) = friend.dht_pk {
let futures = share_relays.relays
.iter()
.map(|node| self.tcp_connections.add_relay_connection(node.saddr, node.pk, dht_pk))
.collect::<Vec<_>>();
Either::Left(
future::try_join_all(futures)
.map_ok(drop)
.map_err(|e|
e.context(HandleShareRelaysErrorKind::AddTcpConnection).into()
)
)
} else {
Either::Right(future::ok(()))
for node in share_relays.relays {
self.tcp_connections.add_relay_connection(node.saddr, node.pk, dht_pk).await
.map_err(|e| HandleShareRelaysError::from(e.context(HandleShareRelaysErrorKind::AddTcpConnection)))?;
}
}
} else {
Either::Right(future::ok(()))
}

Ok(())
}

/// Handle received ping packet.
Expand Down Expand Up @@ -240,70 +230,62 @@ impl FriendConnections {
}

/// Handle the stream of found IP addresses.
fn handle_friend_saddr(&self, friend_saddr_rx: mpsc::UnboundedReceiver<PackedNode>) -> impl Future<Output = Result<(), RunError>> + Send {
async fn handle_friend_saddr(&self, mut friend_saddr_rx: mpsc::UnboundedReceiver<PackedNode>) -> Result<(), RunError> {
let net_crypto = self.net_crypto.clone();
let friends = self.friends.clone();
friend_saddr_rx
.map(Ok)
.try_for_each(move |node| {
let mut friends = friends.write();
let friend = friends.values_mut()
.find(|friend| friend.dht_pk == Some(node.pk));
if let Some(friend) = friend {
friend.saddr_time = Some(clock_now());

if friend.saddr != Some(node.saddr) {
info!("Found a friend's IP address");

friend.saddr = Some(node.saddr);

net_crypto.add_connection(friend.real_pk, node.pk);
net_crypto.set_friend_udp_addr(friend.real_pk, node.saddr);
}
while let Some(node) = friend_saddr_rx.next().await {
let mut friends = friends.write();
let friend = friends.values_mut()
.find(|friend| friend.dht_pk == Some(node.pk));
if let Some(friend) = friend {
friend.saddr_time = Some(clock_now());

if friend.saddr != Some(node.saddr) {
info!("Found a friend's IP address");

friend.saddr = Some(node.saddr);

net_crypto.add_connection(friend.real_pk, node.pk);
net_crypto.set_friend_udp_addr(friend.real_pk, node.saddr);
}
}
}

future::ok(())
})
Ok(())
}

/// Handle the stream of connection statuses.
fn handle_connection_status(&self, connnection_status_rx: mpsc::UnboundedReceiver<(PublicKey, bool)>) -> impl Future<Output = Result<(), RunError>> + Send {
async fn handle_connection_status(&self, mut connnection_status_rx: mpsc::UnboundedReceiver<(PublicKey, bool)>) -> Result<(), RunError> {
let onion_client = self.onion_client.clone();
let friends = self.friends.clone();
let connection_status_tx = self.connection_status_tx.clone();
connnection_status_rx
.map(Ok)
.try_for_each(move |(real_pk, status)| {
if let Some(friend) = friends.write().get_mut(&real_pk) {
if status && !friend.connected {
info!("Connection with a friend is established");

friend.ping_received_time = Some(clock_now());
friend.ping_sent_time = None;
friend.share_relays_time = None;
} else if !status && friend.connected {
info!("Connection with a friend is lost");

// update dht_pk_time right after it went offline to enforce attemts to reconnect
friend.dht_pk_time = Some(clock_now());
}
while let Some((real_pk, status)) = connnection_status_rx.next().await {
if let Some(friend) = friends.write().get_mut(&real_pk) {
if status && !friend.connected {
info!("Connection with a friend is established");

friend.ping_received_time = Some(clock_now());
friend.ping_sent_time = None;
friend.share_relays_time = None;
} else if !status && friend.connected {
info!("Connection with a friend is lost");

// update dht_pk_time right after it went offline to enforce attemts to reconnect
friend.dht_pk_time = Some(clock_now());
}

if status != friend.connected {
friend.connected = status;
onion_client.set_friend_connected(real_pk, status);
if let Some(mut connection_status_tx) = connection_status_tx.read().clone() {
let res = async move {
connection_status_tx.send((real_pk, status)).await
.map_err(|e| e.context(RunErrorKind::SendToConnectionStatus).into())
};

return Either::Left(res);
}
if status != friend.connected {
friend.connected = status;
onion_client.set_friend_connected(real_pk, status);
if let Some(mut connection_status_tx) = connection_status_tx.read().clone() {
connection_status_tx.send((real_pk, status)).await
.map_err(|e| RunError::from(e.context(RunErrorKind::SendToConnectionStatus)))?;
}
}
}
}

Either::Right(future::ok(()))
})
Ok(())
}

/// Send some of our relays to a friend and start using these relays to
Expand Down Expand Up @@ -917,9 +899,7 @@ mod tests {
relays: vec![PackedNode::new(relay_saddr, &relay_pk)],
};

// ignore result future since it spawns the connection which should be
// executed inside tokio context
let _ = friend_connections.handle_share_relays(friend_pk, share_relays);
friend_connections.handle_share_relays(friend_pk, share_relays).await.unwrap();

assert!(friend_connections.tcp_connections.has_relay(&relay_pk));
assert!(friend_connections.tcp_connections.has_connection(&friend_dht_pk));
Expand Down
18 changes: 8 additions & 10 deletions tox_core/src/net_crypto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::time::{Duration, Instant};
use std::u16;

use failure::Fail;
use futures::{Future, TryFutureExt, StreamExt, SinkExt};
use futures::{TryFutureExt, StreamExt, SinkExt};
use futures::future;
use futures::future::Either;
use futures::channel::mpsc;
Expand All @@ -36,7 +36,7 @@ use tox_crypto::*;
use tox_packet::dht::{Packet as DhtPacket, *};
use crate::dht::precomputed_cache::*;
use crate::io_tokio::*;
use tox_packet::relay::{DataPayload as TcpDataPayload};
use tox_packet::relay::DataPayload as TcpDataPayload;
use crate::time::*;

/// Maximum size of `Packet` when we try to send it to UDP address even if
Expand Down Expand Up @@ -740,18 +740,15 @@ impl NetCrypto {

/// Send received lossless packets from the beginning of the receiving
/// buffer to lossless sink and delete them
fn process_ready_lossless_packets(&self, recv_array: &mut PacketsArray<RecvPacket>, pk: PublicKey)
-> impl Future<Output = Result<(), mpsc::SendError>> + Send {
let tx = self.lossless_tx.clone();
async fn process_ready_lossless_packets(&self, recv_array: &mut PacketsArray<RecvPacket>, pk: PublicKey)
-> Result<(), mpsc::SendError> {
let mut tx = self.lossless_tx.clone();

while let Some(packet) = recv_array.pop_front() {
let res = tx.unbounded_send((pk, packet.data))
.map_err(|e| e.into_send_error());

if res.is_err() { return future::ready(res) }
tx.send((pk, packet.data)).await?;
}

future::ok(())
Ok(())
}

/// Find the time when the last acknowledged packet was sent. This time is
Expand Down Expand Up @@ -1136,6 +1133,7 @@ impl NetCrypto {
mod tests {
// https://github.com/rust-lang/rust/issues/61520
use super::{*, Packet};
use futures::Future;

impl NetCrypto {
pub fn has_friend(&self, pk: &PublicKey) -> bool {
Expand Down
4 changes: 2 additions & 2 deletions tox_core/src/onion/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::dht::ip_port::IsGlobal;
use tox_packet::dht::packed_node::PackedNode;
use tox_packet::dht::*;
use crate::dht::request_queue::RequestQueue;
use crate::dht::server::{Server as DhtServer};
use crate::dht::server::Server as DhtServer;
use crate::dht::kbucket::*;
use tox_packet::ip_port::*;
use crate::onion::client::errors::*;
Expand All @@ -29,7 +29,7 @@ use crate::onion::client::paths_pool::*;
use crate::onion::onion_announce::initial_ping_id;
use tox_packet::onion::*;
use tox_packet::packed_node::*;
use crate::relay::client::{Connections as TcpConnections};
use crate::relay::client::Connections as TcpConnections;
use crate::time::*;
use crate::io_tokio::*;

Expand Down
4 changes: 2 additions & 2 deletions tox_core/src/onion/client/paths_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use std::time::{Duration, Instant};

use tox_crypto::*;
use tox_packet::dht::packed_node::PackedNode;
use crate::dht::server::{Server as DhtServer};
use crate::dht::server::Server as DhtServer;
use crate::onion::client::nodes_pool::*;
use crate::onion::client::onion_path::*;
use crate::time::*;
use crate::onion::client::TIME_TO_STABLE;
use crate::relay::client::{Connections as TcpConnections};
use crate::relay::client::Connections as TcpConnections;

/// Onion path is considered invalid after this number of unsuccessful attempts
/// to use it.
Expand Down
2 changes: 0 additions & 2 deletions tox_core/src/relay/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ impl Server {
handshaked client.
*/
pub fn handle_packet(&self, pk: &PublicKey, packet: Packet) -> impl Future<Output = Result<(), Error>> + Send {
// TODO: use anonymous sum types when rust has them
// https://github.com/rust-lang/rfcs/issues/294
match packet {
Packet::RouteRequest(packet) => self.handle_route_request(pk, &packet).boxed(),
Packet::RouteResponse(packet) => self.handle_route_response(pk, &packet).boxed(),
Expand Down
7 changes: 3 additions & 4 deletions tox_core/src/relay/server/server_ext.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
/*! Extension trait for run TCP server on `TcpStream` and ping sender
*/

use std::io::{Error as IoError};
use std::io::Error as IoError;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration};
use std::time::Duration;
use std::pin::Pin;

use failure::Fail;
use futures::{future, Future, FutureExt, TryFutureExt, SinkExt, StreamExt, TryStreamExt};
use futures::channel::mpsc;
use tokio::net::{TcpStream, TcpListener};
use tokio_util::codec::Framed;
use tokio::time::{Error as TimerError};
// use tokio_timer::timeout::{Error as TimeoutError};
use tokio::time::Error as TimerError;

use tox_crypto::*;
use crate::relay::codec::{DecodeError, EncodeError, Codec};
Expand Down

0 comments on commit 98e4c5d

Please sign in to comment.