Skip to content

Commit

Permalink
fix(onion_client): use async mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
kurnevsky committed Sep 12, 2020
1 parent 8cbc87f commit 94cb300
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 138 deletions.
4 changes: 2 additions & 2 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async fn main() -> Result<(), Error> {
let (lossy_tx, mut lossy_rx) = mpsc::unbounded();

let (friend_request_tx, mut friend_request_sink_rx) = mpsc::unbounded();
onion_client.set_friend_request_sink(friend_request_tx);
onion_client.set_friend_request_sink(friend_request_tx).await;

let net_crypto = NetCrypto::new(NetCryptoNewArgs {
udp_tx: tx,
Expand Down Expand Up @@ -118,7 +118,7 @@ async fn main() -> Result<(), Error> {
let node = PackedNode::new(saddr.parse().unwrap(), &bootstrap_pk);

dht_server.add_initial_bootstrap(node);
onion_client.add_path_node(node);
onion_client.add_path_node(node).await;
}

let tcp_connections_c = tcp_connections.clone();
Expand Down
89 changes: 38 additions & 51 deletions examples/onion_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,41 @@ fn load_keypair() -> (PublicKey, SecretKey) {
(real_pk, real_sk)
}

async fn main_task(
local_addr: SocketAddr,
onion_client: OnionClient,
mut net_rx: mpsc::Receiver<(tox_packet::dht::Packet, SocketAddr)>,
dht_pk_rx: mpsc::UnboundedReceiver<(PublicKey, tox_crypto::curve25519xsalsa20poly1305::PublicKey)>,
) {
#[tokio::main]
async fn main() -> Result<(), Error> {
env_logger::init();

let (dht_pk, dht_sk) = gen_keypair();
let (real_pk, real_sk) = load_keypair();

// Create a channel for server to communicate with network
let (tx, mut rx) = mpsc::channel(32);

let local_addr: SocketAddr = "0.0.0.0:33445".parse().unwrap(); // 0.0.0.0 for IPv4
// let local_addr: SocketAddr = "[::]:33445".parse().unwrap(); // [::] for IPv6

let is_ipv4 = local_addr.is_ipv4();

let (dht_pk_tx, dht_pk_rx) = mpsc::unbounded();
let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();

let dht_server = Server::new(tx, dht_pk, dht_sk.clone());
let tcp_connections = Connections::new(dht_pk, dht_sk, tcp_incoming_tx);
let onion_client = OnionClient::new(dht_server, tcp_connections, real_sk, real_pk);

onion_client.set_dht_pk_sink(dht_pk_tx).await;

for &(pk, saddr) in &common::BOOTSTRAP_NODES {
let node = as_packed_node(pk, saddr);

onion_client.add_path_node(node).await;
}

let friend_pk_bytes: [u8; 32] = hex::FromHex::from_hex(FRIEND_PK).unwrap();
let friend_pk = PublicKey::from_slice(&friend_pk_bytes).unwrap();

onion_client.add_friend(friend_pk).await;

let socket = common::bind_socket(local_addr).await;
let stats = Stats::new();
let codec = DhtCodec::new(stats);
Expand Down Expand Up @@ -95,7 +123,7 @@ async fn main_task(
};

let network_writer = async move {
while let Some((packet, mut addr)) = net_rx.next().await {
while let Some((packet, mut addr)) = rx.next().await {
if is_ipv4 && addr.is_ipv6() { continue }

if !is_ipv4 {
Expand All @@ -122,54 +150,13 @@ async fn main_task(
futures::future::ready(())
});

let res = futures::select! {
info!("Running onion client on {}", local_addr);

futures::select! {
res = network_reader.fuse() => res,
res = network_writer.fuse() => res,
res = onion_client.run().fuse() =>
res.map_err(Error::from),
res = dht_pk_future.fuse() => Ok(res),
};

if let Err(ref e) = res {
error!("Processing ended with error: {:?}", e);
}
}

fn main() {
env_logger::init();

let (dht_pk, dht_sk) = gen_keypair();
let (real_pk, real_sk) = load_keypair();

// Create a channel for server to communicate with network
let (tx, rx) = mpsc::channel(32);

let local_addr: SocketAddr = "0.0.0.0:33445".parse().unwrap(); // 0.0.0.0 for IPv4
// let local_addr: SocketAddr = "[::]:33445".parse().unwrap(); // [::] for IPv6

let (dht_pk_tx, dht_pk_rx) = mpsc::unbounded();
let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();

let dht_server = Server::new(tx, dht_pk, dht_sk.clone());
let tcp_connections = Connections::new(dht_pk, dht_sk, tcp_incoming_tx);
let onion_client = OnionClient::new(dht_server, tcp_connections, real_sk, real_pk);

onion_client.set_dht_pk_sink(dht_pk_tx);

for &(pk, saddr) in &common::BOOTSTRAP_NODES {
let node = as_packed_node(pk, saddr);

onion_client.add_path_node(node);
}

let friend_pk_bytes: [u8; 32] = hex::FromHex::from_hex(FRIEND_PK).unwrap();
let friend_pk = PublicKey::from_slice(&friend_pk_bytes).unwrap();

onion_client.add_friend(friend_pk);

info!("Running onion client on {}", local_addr);
let main_fut = main_task(local_addr, onion_client, rx, dht_pk_rx);

let mut runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(main_fut);
}
28 changes: 14 additions & 14 deletions tox_core/src/friend_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl FriendConnections {
let mut friends = self.friends.write().await;
if let Entry::Vacant(entry) = friends.entry(friend_pk) {
entry.insert(Friend::new(friend_pk));
self.onion_client.add_friend(friend_pk);
self.onion_client.add_friend(friend_pk).await;
self.net_crypto.add_friend(friend_pk).await;
}
}
Expand All @@ -151,7 +151,7 @@ impl FriendConnections {
self.tcp_connections.remove_connection(dht_pk).await.ok();
};
self.net_crypto.remove_friend(friend_pk).await;
self.onion_client.remove_friend(friend_pk);
self.onion_client.remove_friend(friend_pk).await;
self.net_crypto.kill_connection(friend_pk)
.then(|res| future::ready(match res {
Err(ref e)
Expand Down Expand Up @@ -221,7 +221,7 @@ impl FriendConnections {

dht.add_friend(dht_pk).await;
net_crypto.add_connection(real_pk, dht_pk).await;
onion_client.set_friend_dht_pk(real_pk, dht_pk);
onion_client.set_friend_dht_pk(real_pk, dht_pk).await;
}
}
}
Expand Down Expand Up @@ -276,7 +276,7 @@ impl FriendConnections {

if status != friend.connected {
friend.connected = status;
onion_client.set_friend_connected(real_pk, status);
onion_client.set_friend_connected(real_pk, status).await;
if let Some(mut connection_status_tx) = connection_status_tx.read().await.clone() {
connection_status_tx.send((real_pk, status)).await
.map_err(|e| RunError::from(e.context(RunErrorKind::SendToConnectionStatus)))?;
Expand Down Expand Up @@ -389,7 +389,7 @@ impl FriendConnections {
/// modules.
pub async fn run(&self) -> Result<(), RunError> {
let (dht_pk_tx, dht_pk_rx) = mpsc::unbounded();
self.onion_client.set_dht_pk_sink(dht_pk_tx.clone());
self.onion_client.set_dht_pk_sink(dht_pk_tx.clone()).await;
self.net_crypto.set_dht_pk_sink(dht_pk_tx).await;

let (friend_saddr_tx, friend_saddr_rx) = mpsc::unbounded();
Expand Down Expand Up @@ -483,7 +483,7 @@ mod tests {
// add friend to all modules to check later that it will be deleted
// from everywhere
friend_connections.dht.add_friend(friend_dht_pk).await;
friend_connections.onion_client.add_friend(friend_pk);
friend_connections.onion_client.add_friend(friend_pk).await;
friend_connections.net_crypto.add_friend(friend_pk).await;
let (_relay_incoming_rx, _relay_outgoing_rx, relay_pk) = friend_connections.tcp_connections.add_client();
// ignore result future since it spawns the connection which should be
Expand All @@ -504,7 +504,7 @@ mod tests {
friend_connections.remove_friend(friend_pk).await.unwrap();

assert!(!friend_connections.dht.has_friend(&friend_dht_pk).await);
assert!(!friend_connections.onion_client.has_friend(&friend_pk));
assert!(!friend_connections.onion_client.has_friend(&friend_pk).await);
assert!(!friend_connections.net_crypto.has_friend(&friend_pk).await);
assert!(!friend_connections.tcp_connections.has_connection(&friend_dht_pk));

Expand Down Expand Up @@ -536,7 +536,7 @@ mod tests {
friend_connections.friends.write().await.insert(friend_pk, friend);
friend_connections.dht.add_friend(friend_dht_pk).await;
friend_connections.net_crypto.add_friend(friend_pk).await;
friend_connections.onion_client.add_friend(friend_pk);
friend_connections.onion_client.add_friend(friend_pk).await;
let (_relay_incoming_rx, _relay_outgoing_rx, relay_pk) = friend_connections.tcp_connections.add_client();
// ignore result future since it spawns the connection which should be
// executed inside tokio context
Expand Down Expand Up @@ -566,7 +566,7 @@ mod tests {
assert!(!friend_connections.dht.has_friend(&friend_dht_pk).await);
assert!(friend_connections.dht.has_friend(&new_friend_dht_pk).await);
assert_eq!(friend_connections.net_crypto.connection_dht_pk(&friend_pk).await, Some(new_friend_dht_pk));
assert_eq!(friend_connections.onion_client.friend_dht_pk(&friend_pk), Some(new_friend_dht_pk));
assert_eq!(friend_connections.onion_client.friend_dht_pk(&friend_pk).await, Some(new_friend_dht_pk));
assert!(!friend_connections.tcp_connections.has_connection(&friend_dht_pk));

let friend = &friend_connections.friends.read().await[&friend_pk];
Expand Down Expand Up @@ -629,7 +629,7 @@ mod tests {
friend.ping_sent_time = Some(now);
friend.share_relays_time = Some(now);
friend_connections.friends.write().await.insert(friend_pk, friend);
friend_connections.onion_client.add_friend(friend_pk);
friend_connections.onion_client.add_friend(friend_pk).await;

let (mut connnection_status_tx, connnection_status_rx) = mpsc::unbounded();
connnection_status_tx.send((friend_pk, true)).await.unwrap();
Expand All @@ -643,7 +643,7 @@ mod tests {
assert!(friend.ping_sent_time.is_none());
assert!(friend.share_relays_time.is_none());

assert!(friend_connections.onion_client.is_friend_connected(&friend_pk));
assert!(friend_connections.onion_client.is_friend_connected(&friend_pk).await);
}

#[tokio::test]
Expand All @@ -659,8 +659,8 @@ mod tests {
friend.dht_pk_time = Some(now);
friend.connected = true;
friend_connections.friends.write().await.insert(friend_pk, friend);
friend_connections.onion_client.add_friend(friend_pk);
friend_connections.onion_client.set_friend_connected(friend_pk, true);
friend_connections.onion_client.add_friend(friend_pk).await;
friend_connections.onion_client.set_friend_connected(friend_pk, true).await;

let (mut connnection_status_tx, connnection_status_rx) = mpsc::unbounded();
connnection_status_tx.send((friend_pk, false)).await.unwrap();
Expand All @@ -672,7 +672,7 @@ mod tests {
assert!(!friend.connected);
assert_eq!(friend.dht_pk_time, Some(now));

assert!(!friend_connections.onion_client.is_friend_connected(&friend_pk));
assert!(!friend_connections.onion_client.is_friend_connected(&friend_pk).await);
}

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion tox_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Repo: https://github.com/tox-rs/tox
*/

#![type_length_limit="1231994"]
#![type_length_limit="2097152"]
#![forbid(unsafe_code)]
#![doc(html_logo_url = "https://raw.githubusercontent.com/tox-rs/logo/master/logo.png")]
// Remove it when it will be fixed in nom parser
Expand Down
Loading

0 comments on commit 94cb300

Please sign in to comment.