Skip to content

Commit

Permalink
fix(net_crypto): use tokio rw lock
Browse files Browse the repository at this point in the history
  • Loading branch information
kurnevsky committed Sep 12, 2020
1 parent cd3a100 commit 8cbc87f
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 194 deletions.
2 changes: 1 addition & 1 deletion examples/echo.rs
Expand Up @@ -94,7 +94,7 @@ async fn main() -> Result<(), Error> {
});

let (net_crypto_tcp_tx, mut net_crypto_tcp_rx) = mpsc::channel(32);
net_crypto.set_tcp_sink(net_crypto_tcp_tx);
net_crypto.set_tcp_sink(net_crypto_tcp_tx).await;

dht_server.set_net_crypto(net_crypto.clone());
dht_server.set_onion_client(onion_client.clone());
Expand Down
50 changes: 25 additions & 25 deletions tox_core/src/friend_connection/mod.rs
Expand Up @@ -138,7 +138,7 @@ impl FriendConnections {
if let Entry::Vacant(entry) = friends.entry(friend_pk) {
entry.insert(Friend::new(friend_pk));
self.onion_client.add_friend(friend_pk);
self.net_crypto.add_friend(friend_pk);
self.net_crypto.add_friend(friend_pk).await;
}
}

Expand All @@ -150,7 +150,7 @@ impl FriendConnections {
// TODO: handle error properly after migrating the TCP client to failure
self.tcp_connections.remove_connection(dht_pk).await.ok();
};
self.net_crypto.remove_friend(friend_pk);
self.net_crypto.remove_friend(friend_pk).await;
self.onion_client.remove_friend(friend_pk);
self.net_crypto.kill_connection(friend_pk)
.then(|res| future::ready(match res {
Expand Down Expand Up @@ -220,7 +220,7 @@ impl FriendConnections {
friend.dht_pk = Some(dht_pk);

dht.add_friend(dht_pk).await;
net_crypto.add_connection(real_pk, dht_pk);
net_crypto.add_connection(real_pk, dht_pk).await;
onion_client.set_friend_dht_pk(real_pk, dht_pk);
}
}
Expand All @@ -245,8 +245,8 @@ impl FriendConnections {

friend.saddr = Some(node.saddr);

net_crypto.add_connection(friend.real_pk, node.pk);
net_crypto.set_friend_udp_addr(friend.real_pk, node.saddr);
net_crypto.add_connection(friend.real_pk, node.pk).await;
net_crypto.set_friend_udp_addr(friend.real_pk, node.saddr).await;
}
}
}
Expand Down Expand Up @@ -352,9 +352,9 @@ impl FriendConnections {
}

if let Some(dht_pk) = friend.dht_pk {
self.net_crypto.add_connection(friend.real_pk, dht_pk);
self.net_crypto.add_connection(friend.real_pk, dht_pk).await;
if let Some(saddr) = friend.saddr {
self.net_crypto.set_friend_udp_addr(friend.real_pk, saddr);
self.net_crypto.set_friend_udp_addr(friend.real_pk, saddr).await;
}
}
}
Expand Down Expand Up @@ -390,13 +390,13 @@ impl FriendConnections {
pub async fn run(&self) -> Result<(), RunError> {
let (dht_pk_tx, dht_pk_rx) = mpsc::unbounded();
self.onion_client.set_dht_pk_sink(dht_pk_tx.clone());
self.net_crypto.set_dht_pk_sink(dht_pk_tx);
self.net_crypto.set_dht_pk_sink(dht_pk_tx).await;

let (friend_saddr_tx, friend_saddr_rx) = mpsc::unbounded();
self.dht.set_friend_saddr_sink(friend_saddr_tx).await;

let (connection_status_tx, connection_status_rx) = mpsc::unbounded();
self.net_crypto.set_connection_status_sink(connection_status_tx);
self.net_crypto.set_connection_status_sink(connection_status_tx).await;

let dht_pk_future = self.handle_dht_pk(dht_pk_rx);
let friend_saddr_future = self.handle_friend_saddr(friend_saddr_rx);
Expand Down Expand Up @@ -484,7 +484,7 @@ mod tests {
// from everywhere
friend_connections.dht.add_friend(friend_dht_pk).await;
friend_connections.onion_client.add_friend(friend_pk);
friend_connections.net_crypto.add_friend(friend_pk);
friend_connections.net_crypto.add_friend(friend_pk).await;
let (_relay_incoming_rx, _relay_outgoing_rx, relay_pk) = friend_connections.tcp_connections.add_client();
// ignore result future since it spawns the connection which should be
// executed inside tokio context
Expand All @@ -498,14 +498,14 @@ mod tests {
sent_nonce,
gen_nonce(),
session_precomputed_key.clone()
);
friend_connections.net_crypto.set_friend_udp_addr(friend_pk, saddr);
).await;
friend_connections.net_crypto.set_friend_udp_addr(friend_pk, saddr).await;

friend_connections.remove_friend(friend_pk).await.unwrap();

assert!(!friend_connections.dht.has_friend(&friend_dht_pk).await);
assert!(!friend_connections.onion_client.has_friend(&friend_pk));
assert!(!friend_connections.net_crypto.has_friend(&friend_pk));
assert!(!friend_connections.net_crypto.has_friend(&friend_pk).await);
assert!(!friend_connections.tcp_connections.has_connection(&friend_dht_pk));

let (received, _udp_rx) = udp_rx.into_future().await;
Expand Down Expand Up @@ -535,7 +535,7 @@ mod tests {
friend.saddr_time = Some(now);
friend_connections.friends.write().await.insert(friend_pk, friend);
friend_connections.dht.add_friend(friend_dht_pk).await;
friend_connections.net_crypto.add_friend(friend_pk);
friend_connections.net_crypto.add_friend(friend_pk).await;
friend_connections.onion_client.add_friend(friend_pk);
let (_relay_incoming_rx, _relay_outgoing_rx, relay_pk) = friend_connections.tcp_connections.add_client();
// ignore result future since it spawns the connection which should be
Expand All @@ -550,8 +550,8 @@ mod tests {
sent_nonce,
gen_nonce(),
session_precomputed_key.clone()
);
friend_connections.net_crypto.set_friend_udp_addr(friend_pk, saddr);
).await;
friend_connections.net_crypto.set_friend_udp_addr(friend_pk, saddr).await;

let (new_friend_dht_pk, _new_friend_dht_sk) = gen_keypair();
let (mut dht_pk_tx, dht_pk_rx) = mpsc::unbounded();
Expand All @@ -565,7 +565,7 @@ mod tests {

assert!(!friend_connections.dht.has_friend(&friend_dht_pk).await);
assert!(friend_connections.dht.has_friend(&new_friend_dht_pk).await);
assert_eq!(friend_connections.net_crypto.connection_dht_pk(&friend_pk), Some(new_friend_dht_pk));
assert_eq!(friend_connections.net_crypto.connection_dht_pk(&friend_pk).await, Some(new_friend_dht_pk));
assert_eq!(friend_connections.onion_client.friend_dht_pk(&friend_pk), Some(new_friend_dht_pk));
assert!(!friend_connections.tcp_connections.has_connection(&friend_dht_pk));

Expand Down Expand Up @@ -606,7 +606,7 @@ mod tests {
tokio::time::advance(delay).await;

friend_connections.handle_friend_saddr(friend_saddr_rx).await.unwrap();
assert_eq!(friend_connections.net_crypto.connection_saddr(&friend_pk), Some(saddr));
assert_eq!(friend_connections.net_crypto.connection_saddr(&friend_pk).await, Some(saddr));

let friend = &friend_connections.friends.read().await[&friend_pk];
assert_eq!(friend.dht_pk, Some(friend_dht_pk));
Expand Down Expand Up @@ -702,8 +702,8 @@ mod tests {
sent_nonce,
gen_nonce(),
session_precomputed_key.clone()
);
friend_connections.net_crypto.set_friend_udp_addr(friend_pk, saddr);
).await;
friend_connections.net_crypto.set_friend_udp_addr(friend_pk, saddr).await;

tokio::time::pause();
tokio::time::advance(FRIEND_CONNECTION_TIMEOUT + Duration::from_secs(1)).await;
Expand Down Expand Up @@ -748,8 +748,8 @@ mod tests {
sent_nonce,
gen_nonce(),
session_precomputed_key.clone()
);
friend_connections.net_crypto.set_friend_udp_addr(friend_pk, saddr);
).await;
friend_connections.net_crypto.set_friend_udp_addr(friend_pk, saddr).await;

let delay = Duration::from_secs(1);
tokio::time::advance(delay).await;
Expand Down Expand Up @@ -797,8 +797,8 @@ mod tests {
sent_nonce,
gen_nonce(),
session_precomputed_key.clone()
);
friend_connections.net_crypto.set_friend_udp_addr(friend_pk, saddr);
).await;
friend_connections.net_crypto.set_friend_udp_addr(friend_pk, saddr).await;

let (_relay_incoming_rx, _relay_outgoing_rx, relay_pk) = friend_connections.tcp_connections.add_client();

Expand Down Expand Up @@ -966,7 +966,7 @@ mod tests {
let packets_future = async move {
dht.handle_packet(DhtPacket::CryptoHandshake(handshake), friend_saddr).await.unwrap();

let session_pk = net_crypto.get_session_pk(&friend_pk).unwrap();
let session_pk = net_crypto.get_session_pk(&friend_pk).await.unwrap();
let session_precomputed_key = precompute(&session_pk, &friend_session_sk);

let crypto_data_payload = CryptoDataPayload {
Expand Down
1 change: 1 addition & 0 deletions tox_core/src/lib.rs
Expand Up @@ -5,6 +5,7 @@ Repo: https://github.com/tox-rs/tox
*/

#![type_length_limit="1231994"]
#![forbid(unsafe_code)]
#![doc(html_logo_url = "https://raw.githubusercontent.com/tox-rs/logo/master/logo.png")]
// Remove it when it will be fixed in nom parser
Expand Down

0 comments on commit 8cbc87f

Please sign in to comment.