Skip to content

Commit

Permalink
refactor: use async fns
Browse files Browse the repository at this point in the history
  • Loading branch information
kurnevsky committed Sep 12, 2020
1 parent 37f6cfe commit 4a1ecea
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 41 deletions.
20 changes: 5 additions & 15 deletions tox_core/src/net_crypto/mod.rs
Expand Up @@ -27,7 +27,6 @@ use std::u16;
use failure::Fail;
use futures::{TryFutureExt, StreamExt, SinkExt};
use futures::future;
use futures::future::Either;
use futures::channel::mpsc;
use tokio::sync::RwLock;

Expand Down Expand Up @@ -938,7 +937,7 @@ impl NetCrypto {
// TODO: can backpressure be used instead of congestion control? It
// seems it's possible to implement wrapper for bounded sender with
// priority queue and just send packets there
let udp_future = if let Some(addr) = connection.get_udp_addr() {
if let Some(addr) = connection.get_udp_addr() {
if connection.is_udp_alive() {
return self.send_to_udp(addr, packet.into()).await
.map_err(|e| e.context(SendPacketErrorKind::Udp).into())
Expand All @@ -953,23 +952,14 @@ impl NetCrypto {

if udp_attempt_should_be_made {
connection.update_udp_send_attempt_time();
Either::Left(self.send_to_udp(addr, dht_packet))
} else {
Either::Right(future::ok(()))
self.send_to_udp(addr, dht_packet).await
.map_err(|e| e.context(SendPacketErrorKind::Udp))?;
}
} else {
Either::Right(future::ok(()))
};

let udp_future = udp_future
.map_err(|e| e.context(SendPacketErrorKind::Udp).into());

let tcp_tx = self.tcp_tx.read().await.clone();
let tcp_future = maybe_send_bounded(tcp_tx, (packet.into(), connection.peer_dht_pk))
.map_err(|e| e.context(SendPacketErrorKind::Tcp).into());

future::try_join(udp_future, tcp_future)
.map_ok(drop).await
maybe_send_bounded(tcp_tx, (packet.into(), connection.peer_dht_pk)).await
.map_err(|e| e.context(SendPacketErrorKind::Tcp).into())
}

/// Send `CookieRequest` or `CryptoHandshake` packet if needed depending on
Expand Down
42 changes: 16 additions & 26 deletions tox_core/src/relay/client/client.rs
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use std::time::Instant;

use failure::Fail;
use futures::{future, Future, FutureExt, TryFutureExt, StreamExt, SinkExt};
use futures::{FutureExt, TryFutureExt, StreamExt, SinkExt};
use futures::channel::mpsc;
use tokio_util::codec::Framed;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -131,10 +131,8 @@ impl Client {
}
}

fn handle_route_request(&self, _packet: &RouteRequest) -> impl Future<Output = Result<(), HandlePacketError>> + Send {
future::err(
HandlePacketErrorKind::MustNotSend.into()
)
async fn handle_route_request(&self, _packet: &RouteRequest) -> Result<(), HandlePacketError> {
Err(HandlePacketErrorKind::MustNotSend.into())
}

async fn handle_route_response(&self, packet: &RouteResponse) -> Result<(), HandlePacketError> {
Expand Down Expand Up @@ -192,28 +190,24 @@ impl Client {
)).await.map_err(|e| e.context(HandlePacketErrorKind::SendTo).into())
}

fn handle_pong_response(&self, _packet: &PongResponse) -> impl Future<Output = Result<(), HandlePacketError>> + Send {
async fn handle_pong_response(&self, _packet: &PongResponse) -> Result<(), HandlePacketError> {
// TODO check ping_id
future::ok(())
Ok(())
}

fn handle_oob_send(&self, _packet: &OobSend) -> impl Future<Output = Result<(), HandlePacketError>> + Send {
future::err(
HandlePacketErrorKind::MustNotSend.into()
)
async fn handle_oob_send(&self, _packet: &OobSend) -> Result<(), HandlePacketError> {
Err(HandlePacketErrorKind::MustNotSend.into())
}

fn handle_oob_receive(&self, packet: OobReceive) -> impl Future<Output = Result<(), HandlePacketError>> + Send {
async fn handle_oob_receive(&self, packet: OobReceive) -> Result<(), HandlePacketError> {
let mut tx = self.incoming_tx.clone();
let msg = (
self.pk,
IncomingPacket::Oob(packet.sender_pk, packet.data)
);

async move {
tx.send(msg).await
.map_err(|e| e.context(HandlePacketErrorKind::SendTo).into())
}
tx.send(msg).await
.map_err(|e| e.context(HandlePacketErrorKind::SendTo).into())
}

async fn handle_data(&self, packet: Data) -> Result<(), HandlePacketError> {
Expand All @@ -238,23 +232,19 @@ impl Client {
}
}

fn handle_onion_request(&self, _packet: &OnionRequest) -> impl Future<Output = Result<(), HandlePacketError>> + Send {
future::err(
HandlePacketErrorKind::MustNotSend.into()
)
async fn handle_onion_request(&self, _packet: &OnionRequest) -> Result<(), HandlePacketError> {
Err(HandlePacketErrorKind::MustNotSend.into())
}

fn handle_onion_response(&self, packet: OnionResponse) -> impl Future<Output = Result<(), HandlePacketError>> + Send {
async fn handle_onion_response(&self, packet: OnionResponse) -> Result<(), HandlePacketError> {
let mut tx = self.incoming_tx.clone();
let msg = (
self.pk,
IncomingPacket::Onion(packet.payload)
);

async move {
tx.send(msg).await
.map_err(|e| e.context(HandlePacketErrorKind::SendTo).into())
}
tx.send(msg).await
.map_err(|e| e.context(HandlePacketErrorKind::SendTo).into())
}

/// Spawn a connection to this TCP relay if it is not connected already. The
Expand Down Expand Up @@ -1262,7 +1252,7 @@ pub mod tests {
let client_future = client.run(client_sk_1, client_pk_1)
.map_err(Error::from);

let (server_res, client_res) = future::join(server_future.boxed(), client_future.boxed()).await;
let (server_res, client_res) = futures::join!(server_future, client_future);
assert!(server_res.is_err()); // fail to process handshake
assert!(client_res.is_err()); // fail to process handshake

Expand Down

0 comments on commit 4a1ecea

Please sign in to comment.