diff --git a/examples/dht_node_new.rs b/examples/dht_node_new.rs new file mode 100644 index 000000000..85a6e0de0 --- /dev/null +++ b/examples/dht_node_new.rs @@ -0,0 +1,114 @@ +/* + Copyright (C) 2013 Tox project All Rights Reserved. + Copyright © 2017 Zetok Zalbavar + Copyright © 2018 Namsoo CHO + + This file is part of Tox. + + Tox is libre software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + Tox is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with Tox. If not, see . +*/ + +// an example of DHT node with current code +// +extern crate tox; +extern crate futures; +extern crate tokio_core; +extern crate tokio_io; +extern crate tokio_timer; + +#[macro_use] +extern crate log; +extern crate env_logger; + +use futures::*; +use futures::sync::mpsc; +use tokio_core::reactor::Core; +use tokio_core::net::UdpSocket; +//use tokio_timer; + +use std::cell::RefCell; +use std::net::SocketAddr; +use std::rc::Rc; +use std::io::{ErrorKind, Error}; +use std::time; + +use tox::toxcore::dht_new::codec::*; +use tox::toxcore::dht_new::server::*; +use tox::toxcore::crypto_core::*; + +fn main() { + env_logger::init(); + + if !crypto_init() { + error!("Crypto initialization failed."); + return; + } + + let (pk, sk) = gen_keypair(); + + let local: SocketAddr = "127.0.0.1:12345".parse().unwrap(); + + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + // Bind a UDP listener to the socket address. + let listener = UdpSocket::bind(&local, &handle).unwrap(); + + // Create a channel for this socket + let (tx, rx) = mpsc::unbounded::(); + let server = Rc::new(RefCell::new(Server::new(tx, pk, sk))); + + let (sink, stream) = listener.framed(DhtCodec).split(); + // The server task asynchronously iterates over and processes each + // incoming packet. + let handler = stream.for_each(move |(addr, packet)| { + let _ = server.borrow_mut().handle_packet((addr, packet.unwrap())); + Ok(()) + }) + .map_err(|err| { + // All tasks must have an `Error` type of `()`. This forces error + // handling and helps avoid silencing failures. + // + error!("packet receive error = {:?}", err); + Error::new(ErrorKind::Other, "udp receive error") + }); + + let writer_timer = tokio_timer::wheel() + .tick_duration(time::Duration::from_secs(1)) + .build() + ; + + let writer = rx + .map_err(|_| Error::new(ErrorKind::Other, "rx error")) + .fold(sink, move |sink, packet| { + debug!("Send {:?} => {:?}", packet.0, packet.1); + let sending_future = sink.send(packet); + let duration = time::Duration::from_secs(30); + let timeout = writer_timer.timeout(sending_future, duration); + timeout + }) + // drop sink when rx stream is exhausted + .map(|_sink| ()) + ; + + let server = writer.select(handler) + .map(|_| ()) + .map_err(move |(err, _select_next)| { + error!("Processing ended with error: {:?}", err); + err + }); + + info!("server running on localhost:12345"); + core.run(server).unwrap(); +} diff --git a/src/toxcore/dht_new/client.rs b/src/toxcore/dht_new/client.rs new file mode 100644 index 000000000..4dae59384 --- /dev/null +++ b/src/toxcore/dht_new/client.rs @@ -0,0 +1,257 @@ +/* + Copyright © 2017 Zetok Zalbavar + Copyright © 2018 Namsoo CHO + + This file is part of Tox. + + Tox is libre software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + Tox is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with Tox. If not, see . +*/ + + +/*! +The object of this struct is an object per a peer. +*/ + +use futures::{Sink, Future}; +use futures::sync::mpsc; +use tokio_io::IoFuture; + +//use std::collections::VecDeque; +use std::io::{ErrorKind, Error}; +use std::net::SocketAddr; + +use toxcore::crypto_core::*; +use toxcore::dht_new::packet::*; +use toxcore::dht_new::codec::*; + +/// Shorthand for the transmit half of the message channel. +type Tx = mpsc::UnboundedSender; + +/// peer info. +#[derive(Clone, Debug)] +pub struct Client { + /// Public key of peer + pub pk: PublicKey, + /// socket address of peer + pub addr: SocketAddr, + /// last sent ping_id to check PingResponse is correct + pub ping_id: u64, + /// precomputed key for this peer + pub precomputed_key: PrecomputedKey, + /// shaed mpsc tx part + pub tx: Tx, +} + +impl Client { + /// create Client object + pub fn new(precomputed_key: PrecomputedKey, pk: PublicKey, addr: SocketAddr, tx: Tx) -> Client { + Client { + pk: pk, + addr: addr, + precomputed_key: precomputed_key, + ping_id: 0, + tx: tx, + } + } + /// actual send method + pub fn send_to(&self, addr: SocketAddr, packet: DhtPacket) -> IoFuture<()> { + Box::new(self.tx.clone() // clone tx sender for 1 send only + .send((addr, packet)) + .map(|_tx| ()) // ignore tx because it was cloned + .map_err(|e| { + // This may only happen if rx is gone + // So cast SendError to a corresponding std::io::Error + error!("send to peer error {:?}", e); + Error::from(ErrorKind::UnexpectedEof) + }) + ) + } + fn send(&self, packet: DhtPacket) -> IoFuture<()> { + self.send_to(self.addr.clone(), packet) + } + /// respond with PingResponse to peer + pub fn send_ping_response(&self, resp_payload: PingResponsePayload) -> IoFuture<()> { + let ping_resp = DhtPacket::PingResponse(PingResponse::new(&self.precomputed_key.clone(), &self.pk, resp_payload)); + debug!("PingResp made {:?}", ping_resp); + self.send(ping_resp) + } + /// respond with NodesResponse to peer + pub fn send_nodes_response(&self, resp_payload: NodesResponsePayload) -> IoFuture<()> { + let nodes_resp = DhtPacket::NodesResponse(NodesResponse::new(&self.precomputed_key.clone(), &self.pk, resp_payload)); + debug!("NodesResp made {:?}", nodes_resp); + self.send(nodes_resp) + } + /// respond with NatPingResponse to peer + pub fn send_nat_ping_response(&self, rpk: &PublicKey, resp_payload: NatPingResponse) -> IoFuture<()> { + let payload = DhtRequestPayload::NatPingResponse(resp_payload); + let nat_ping_resp = DhtPacket::DhtRequest(DhtRequest::new(&self.precomputed_key.clone(), rpk, &self.pk, payload)); + self.send(nat_ping_resp) + } + /// send NatPingRequest/NatPingResponse to target peer + pub fn send_nat_ping_packet(&self, addr: &SocketAddr, request: DhtRequest) -> IoFuture<()> { + let packet = DhtPacket::DhtRequest(request); + self.send_to(*addr, packet) + } + /// create and send PingRequest packet + pub fn send_ping_request(&mut self) -> IoFuture<()> { + let payload = PingRequestPayload{ id: random_u64(), }; + self.ping_id = payload.id; + let ping_req = DhtPacket::PingRequest(PingRequest::new(&self.precomputed_key.clone(), &self.pk, payload)); + self.send(ping_req) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::*; + + use std::net::SocketAddr; + use toxcore::dht_new::packed_node::*; + + fn create_client() -> (Client, SecretKey, mpsc::UnboundedReceiver) { + let addr: SocketAddr = "127.0.0.1:12345".parse().unwrap(); + let (tx, rx) = mpsc::unbounded(); + let (alice_pk, alice_sk) = gen_keypair(); + let (bob_pk, bob_sk) = gen_keypair(); + let precomp = precompute(&alice_pk, &bob_sk); + let client = Client::new(precomp, bob_pk, addr, tx); + (client, alice_sk, rx) + } + // send() + #[test] + fn client_send_test() { + let (client, _sk, rx) = create_client(); + let payload = PingRequestPayload { id: random_u64() }; + let packet = DhtPacket::PingRequest(PingRequest::new(&client.precomputed_key.clone(), &client.pk, payload)); + client.send(packet.clone()).wait().unwrap(); + let rx = + if let (Some((_addr, received_packet)), rx1) = rx.into_future().wait().unwrap() { + assert_eq!(packet, received_packet); + rx1 + } else { + unreachable!("can not occur"); + }; + drop(rx); + assert!(!client.send(packet).wait().is_ok()); + } + // send_to() + #[test] + fn client_send_to_test() { + let (client, _sk, rx) = create_client(); + let payload = PingRequestPayload { id: random_u64() }; + let packet = DhtPacket::PingRequest(PingRequest::new(&client.precomputed_key.clone(), &client.pk, payload)); + client.send_to(client.addr, packet.clone()).wait().unwrap(); + if let (Some((_addr, received_packet)), _rx) = rx.into_future().wait().unwrap() { + assert_eq!(packet, received_packet); + } else { + unreachable!("can not occur"); + } + } + // send_ping_response() + #[test] + fn client_send_ping_response_test() { + let (client, sk, rx) = create_client(); + let payload = PingResponsePayload { id: random_u64() }; + client.send_ping_response(payload).wait().unwrap(); + if let (Some((_addr, received_packet)), _rx) = rx.into_future().wait().unwrap() { + if let DhtPacket::PingResponse(packet) = received_packet { + let ping_resp_payload = packet.get_payload(&sk).unwrap(); + assert_eq!(ping_resp_payload.id, payload.id); + } else { + unreachable!("can not occur"); + } + } else { + unreachable!("can not occur"); + } + } + // send_nodes_response() + #[test] + fn client_send_nodes_response_test() { + let (client, sk, rx) = create_client(); + let payload = NodesResponsePayload { nodes: vec![ + PackedNode::new(false, SocketAddr::V4("127.0.0.1:12345".parse().unwrap()), &gen_keypair().0) + ], id: 38 }; + client.send_nodes_response(payload.clone()).wait().unwrap(); + if let (Some((_addr, received_packet)), _rx) = rx.into_future().wait().unwrap() { + if let DhtPacket::NodesResponse(packet) = received_packet { + let nodes_resp_payload = packet.get_payload(&sk).unwrap(); + assert_eq!(nodes_resp_payload.id, payload.id); + assert_eq!(nodes_resp_payload.nodes, payload.nodes); + } else { + unreachable!("can not occur"); + } + } else { + unreachable!("can not occur"); + } + } + // send_nat_ping_response() + #[test] + fn client_send_nat_ping_response_test() { + let (client, sk, rx) = create_client(); + let payload = NatPingResponse { id: random_u64() }; + client.send_nat_ping_response(&client.pk, payload).wait().unwrap(); + if let (Some((_addr, received_packet)), _rx) = rx.into_future().wait().unwrap() { + if let DhtPacket::DhtRequest(packet) = received_packet { + if let DhtRequestPayload::NatPingResponse(nat_ping_resp_payload) = packet.get_payload(&sk).unwrap() { + assert_eq!(nat_ping_resp_payload.id, payload.id); + } else { + unreachable!("can not occur"); + } + } else { + unreachable!("can not occur"); + } + } else { + unreachable!("can not occur"); + } + } + // send_nat_ping_packet() + #[test] + fn client_send_nat_ping_packet_test() { + let (client, sk, rx) = create_client(); + let nat_res = NatPingResponse { id: random_u64() }; + let nat_payload = DhtRequestPayload::NatPingResponse(nat_res); + let dht_req = DhtRequest::new(&client.precomputed_key, &client.pk, &client.pk, nat_payload.clone()); + client.send_nat_ping_packet(&client.addr, dht_req).wait().unwrap(); + if let (Some((_addr, received_packet)), _rx) = rx.into_future().wait().unwrap() { + if let DhtPacket::DhtRequest(packet) = received_packet { + if let DhtRequestPayload::NatPingResponse(nat_ping_resp_payload) = packet.get_payload(&sk).unwrap() { + assert_eq!(nat_ping_resp_payload.id, nat_res.id); + } else { + unreachable!("can not occur"); + } + } else { + unreachable!("can not occur"); + } + } else { + unreachable!("can not occur"); + } + } + // send_ping_request() + #[test] + fn client_send_ping_request_test() { + let (mut client, sk, rx) = create_client(); + client.send_ping_request().wait().unwrap(); + if let (Some((_addr, received_packet)), _rx) = rx.into_future().wait().unwrap() { + if let DhtPacket::PingRequest(packet) = received_packet { + let ping_req_payload = packet.get_payload(&sk).unwrap(); + assert_eq!(ping_req_payload.id, client.ping_id); + } else { + unreachable!("can not occur"); + } + } else { + unreachable!("can not occur"); + } + } +} diff --git a/src/toxcore/dht_new/codec.rs b/src/toxcore/dht_new/codec.rs index 8fd06e374..2fbf28522 100644 --- a/src/toxcore/dht_new/codec.rs +++ b/src/toxcore/dht_new/codec.rs @@ -54,6 +54,7 @@ Because size of SendNodes is largest in DHT related packets pub const MAX_DHT_PACKET_SIZE: usize = 512; /// Struct to use for {de-,}serializing DHT UDP packets. +#[derive(Clone, Debug, Eq, PartialEq)] pub struct DhtCodec; impl UdpCodec for DhtCodec { diff --git a/src/toxcore/dht_new/kbucket.rs b/src/toxcore/dht_new/kbucket.rs index 59d638287..290615ca6 100644 --- a/src/toxcore/dht_new/kbucket.rs +++ b/src/toxcore/dht_new/kbucket.rs @@ -42,6 +42,7 @@ PK; and additionally used to store nodes closest to friends. use toxcore::crypto_core::*; use toxcore::dht_new::packed_node::PackedNode; use std::cmp::{Ord, Ordering}; +use std::net::SocketAddr; /** Calculate the [`k-bucket`](./struct.Kbucket.html) index of a PK compared to "own" PK. @@ -108,9 +109,9 @@ PK; and additionally used to store nodes closest to friends. #[derive(Clone, Debug, Eq, PartialEq)] pub struct Bucket { /// Amount of nodes it can hold. - capacity: u8, + pub capacity: u8, /// Nodes that bucket has, sorted by distance to PK. - nodes: Vec + pub nodes: Vec } /// Default number of nodes that bucket can hold. @@ -145,7 +146,6 @@ impl Bucket { } } - #[cfg(test)] fn find(&self, pk: &PublicKey) -> Option { for (n, node) in self.nodes.iter().enumerate() { if &node.pk == pk { @@ -301,7 +301,7 @@ pub struct Kbucket { pk: PublicKey, /// List of [`Bucket`](./struct.Bucket.html)s. - buckets: Vec>, + pub buckets: Vec>, } /** Maximum number of [`Bucket`](./struct.Bucket.html)s that [`Kbucket`] @@ -335,6 +335,7 @@ impl Kbucket { self.buckets.len() as u8 } + /// find peer which has pk #[cfg(test)] fn find(&self, pk: &PublicKey) -> Option<(usize, usize)> { for (bucket_index, bucket) in self.buckets.iter().enumerate() { @@ -346,6 +347,17 @@ impl Kbucket { None } + /// find peer which has pk + pub fn get_node(&self, pk: &PublicKey) -> Option { + for (_, bucket) in self.buckets.iter().enumerate() { + match bucket.find(pk) { + None => {}, + Some(node_index) => return Some((*bucket).nodes[node_index].saddr.clone()) + } + } + None + } + /** Return the possible internal index of [`Bucket`](./struct.Bucket.html) where the key could be inserted/removed. diff --git a/src/toxcore/dht_new/mod.rs b/src/toxcore/dht_new/mod.rs index 6b557e058..9e3eab27d 100644 --- a/src/toxcore/dht_new/mod.rs +++ b/src/toxcore/dht_new/mod.rs @@ -27,3 +27,5 @@ pub mod packet; pub mod kbucket; pub mod packed_node; pub mod codec; +pub mod server; +pub mod client; diff --git a/src/toxcore/dht_new/packed_node.rs b/src/toxcore/dht_new/packed_node.rs index 268e91d53..2728cfb45 100644 --- a/src/toxcore/dht_new/packed_node.rs +++ b/src/toxcore/dht_new/packed_node.rs @@ -63,7 +63,7 @@ solely on the UDP. #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct PackedNode { /// Socket addr of node. - saddr: SocketAddr, + pub saddr: SocketAddr, /// Public Key of the node. pub pk: PublicKey, } @@ -154,7 +154,7 @@ mod tests { use super::*; use std::net::{SocketAddrV4, SocketAddrV6}; - use quickcheck::{Arbitrary, Gen}; + use quickcheck::{Arbitrary, Gen, quickcheck}; // PackedNode:: /// Valid, random `PackedNode`. @@ -180,4 +180,51 @@ mod tests { } } } + + #[test] + fn packed_node_new_test() { + fn with_params(saddr: SocketAddr) { + let (pk, _sk) = gen_keypair(); + + let a = PackedNode::new(true, saddr.clone(), &pk.clone()); + let b = PackedNode { + saddr: saddr, + pk: pk, + }; + assert_eq!(a, b); + } + quickcheck(with_params as fn(SocketAddr)); + } + #[test] + fn packed_node_ip_type_test() { + fn with_packed_node(pnode: PackedNode) { + let a = pnode.ip_type(); + let b = + if pnode.saddr.is_ipv4() { + 2 + } else { + 10 + }; + assert_eq!(a, b); + } + quickcheck(with_packed_node as fn(PackedNode)); + } + #[test] + fn packed_node_ip_test() { + fn with_packed_node(pnode: PackedNode) { + let a = pnode.ip(); + let b = pnode.saddr.ip(); + assert_eq!(a, b); + } + quickcheck(with_packed_node as fn(PackedNode)); + } + #[test] + fn packed_node_socket_addr_test() { + fn with_packed_node(pnode: PackedNode) { + let a = pnode.socket_addr(); + let b = pnode.saddr; + assert_eq!(a, b); + } + quickcheck(with_packed_node as fn(PackedNode)); + } } diff --git a/src/toxcore/dht_new/packet.rs b/src/toxcore/dht_new/packet.rs index 513ebce9f..7312fdd0d 100644 --- a/src/toxcore/dht_new/packet.rs +++ b/src/toxcore/dht_new/packet.rs @@ -33,6 +33,7 @@ use toxcore::binary_io::*; use toxcore::crypto_core::*; use toxcore::dht_new::packed_node::PackedNode; use toxcore::onion::packet::*; +use toxcore::dht_new::codec::*; /// Length in bytes of [`PingRequest`](./struct.PingRequest.html) and /// [`PingResponse`](./struct.PingResponse.html) when serialized into bytes. @@ -172,6 +173,19 @@ impl FromBytes for PingRequest { } impl PingRequest { + /// create new PingRequest object + pub fn new(shared_secret: &PrecomputedKey, pk: &PublicKey, payload: PingRequestPayload) -> PingRequest { + let nonce = &gen_nonce(); + let mut buf = [0; MAX_DHT_PACKET_SIZE]; + let (_, size) = payload.to_bytes((&mut buf, 0)).unwrap(); + let payload = seal_precomputed(&buf[..size] , nonce, shared_secret); + + PingRequest { + pk: *pk, + nonce: *nonce, + payload: payload, + } + } /** Decrypt payload and try to parse it as `PingRequestPayload`. Returns `Error` in case of failure: @@ -291,6 +305,19 @@ impl FromBytes for PingResponse { } impl PingResponse { + /// create new PingResponse object + pub fn new(shared_secret: &PrecomputedKey, pk: &PublicKey, payload: PingResponsePayload) -> PingResponse { + let nonce = &gen_nonce(); + let mut buf = [0; MAX_DHT_PACKET_SIZE]; + let (_, size) = payload.to_bytes((&mut buf, 0)).unwrap(); + let payload = seal_precomputed(&buf[..size] , nonce, shared_secret); + + PingResponse { + pk: *pk, + nonce: *nonce, + payload: payload, + } + } /** Decrypt payload and try to parse it as `PingResponsePayload`. Returns `Error` in case of failure: @@ -410,6 +437,19 @@ impl FromBytes for NodesRequest { } impl NodesRequest { + /// create new NodesRequest object + pub fn new(shared_secret: &PrecomputedKey, pk: &PublicKey, payload: NodesRequestPayload) -> NodesRequest { + let nonce = &gen_nonce(); + let mut buf = [0; MAX_DHT_PACKET_SIZE]; + let (_, size) = payload.to_bytes((&mut buf, 0)).unwrap(); + let payload = seal_precomputed(&buf[..size] , nonce, shared_secret); + + NodesRequest { + pk: *pk, + nonce: *nonce, + payload: payload, + } + } /** Decrypt payload and try to parse it as `NodesRequestPayload`. Returns `Error` in case of failure: @@ -521,6 +561,19 @@ impl FromBytes for NodesResponse { } impl NodesResponse { + /// create new NodesResponse object + pub fn new(shared_secret: &PrecomputedKey, pk: &PublicKey, payload: NodesResponsePayload) -> NodesResponse { + let nonce = &gen_nonce(); + let mut buf = [0; MAX_DHT_PACKET_SIZE]; + let (_, size) = payload.to_bytes((&mut buf, 0)).unwrap(); + let payload = seal_precomputed(&buf[..size] , nonce, shared_secret); + + NodesResponse { + pk: *pk, + nonce: *nonce, + payload: payload, + } + } /** Decrypt payload and try to parse it as `NodesResponsePayload`. Returns `Error` in case of failure: @@ -791,6 +844,21 @@ impl FromBytes for DhtRequest { } impl DhtRequest { + /// create new DhtRequest object + pub fn new(shared_secret: &PrecomputedKey, rpk: &PublicKey, spk: &PublicKey, dp: DhtRequestPayload) -> DhtRequest { + let nonce = &gen_nonce(); + + let mut buf = [0; MAX_DHT_PACKET_SIZE]; + let (_, size) = dp.to_bytes((&mut buf, 0)).unwrap(); + let payload = seal_precomputed(&buf[..size], nonce, shared_secret); + + DhtRequest { + rpk: *rpk, + spk: *spk, + nonce: *nonce, + payload: payload, + } + } /** Decrypt payload and try to parse it as packet type. @@ -859,6 +927,12 @@ impl FromBytes for DhtRequestPayload { } /** NatPing request of DHT Request packet. + +size | description +1 | nat ping req (0xfe) +1 | Request or Response flag +8 | Request Id (Ping Id) + */ #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct NatPingRequest { @@ -1007,140 +1081,48 @@ impl FromBytes for BootstrapInfo { } -#[cfg(test)] -mod tests { - use super::*; - use std::net::SocketAddr; - use toxcore::dht_new::codec::*; - - const ONION_RETURN_1_PAYLOAD_SIZE: usize = ONION_RETURN_1_SIZE - NONCEBYTES; - const ONION_RETURN_2_PAYLOAD_SIZE: usize = ONION_RETURN_2_SIZE - NONCEBYTES; - const ONION_RETURN_3_PAYLOAD_SIZE: usize = ONION_RETURN_3_SIZE - NONCEBYTES; - - use quickcheck::{Arbitrary, Gen, quickcheck}; - - impl PingRequest { - pub fn new(shared_secret: &PrecomputedKey, pk: &PublicKey, payload: PingRequestPayload) -> PingRequest { - let nonce = &gen_nonce(); - let mut buf = [0; MAX_DHT_PACKET_SIZE]; - let (_, size) = payload.to_bytes((&mut buf, 0)).unwrap(); - let payload = seal_precomputed(&buf[..size] , nonce, shared_secret); - - PingRequest { - pk: *pk, - nonce: *nonce, - payload: payload, - } - } - } - - impl PingResponse { - pub fn new(shared_secret: &PrecomputedKey, pk: &PublicKey, payload: PingResponsePayload) -> PingResponse { - let nonce = &gen_nonce(); - let mut buf = [0; MAX_DHT_PACKET_SIZE]; - let (_, size) = payload.to_bytes((&mut buf, 0)).unwrap(); - let payload = seal_precomputed(&buf[..size] , nonce, shared_secret); - - PingResponse { - pk: *pk, - nonce: *nonce, - payload: payload, - } - } - } - - impl NodesRequest { - pub fn new(shared_secret: &PrecomputedKey, pk: &PublicKey, payload: NodesRequestPayload) -> NodesRequest { - let nonce = &gen_nonce(); - let mut buf = [0; MAX_DHT_PACKET_SIZE]; - let (_, size) = payload.to_bytes((&mut buf, 0)).unwrap(); - let payload = seal_precomputed(&buf[..size] , nonce, shared_secret); - - NodesRequest { - pk: *pk, - nonce: *nonce, - payload: payload, - } - } - } - - impl NodesResponse { - pub fn new(shared_secret: &PrecomputedKey, pk: &PublicKey, payload: NodesResponsePayload) -> NodesResponse { - let nonce = &gen_nonce(); - let mut buf = [0; MAX_DHT_PACKET_SIZE]; - let (_, size) = payload.to_bytes((&mut buf, 0)).unwrap(); - let payload = seal_precomputed(&buf[..size] , nonce, shared_secret); - - NodesResponse { - pk: *pk, - nonce: *nonce, - payload: payload, - } - } +impl PingRequestPayload { + /// Create new ping request with a randomly generated `request id`. + pub fn new() -> Self { + trace!("Creating new Ping."); + PingRequestPayload { id: random_u64() } } - impl DhtRequest { - /// create new DhtRequest object - pub fn new(shared_secret: &PrecomputedKey, rpk: &PublicKey, spk: &PublicKey, dp: DhtRequestPayload) -> DhtRequest { - let nonce = &gen_nonce(); - - let mut buf = [0; MAX_DHT_PACKET_SIZE]; - let (_, size) = dp.to_bytes((&mut buf, 0)).unwrap(); - let payload = seal_precomputed(&buf[..size], nonce, shared_secret); - - DhtRequest { - rpk: *rpk, - spk: *spk, - nonce: *nonce, - payload: payload, - } - } + /// An ID of the request / response. + pub fn id(&self) -> u64 { + self.id } +} - impl PingRequestPayload { - /// Create new ping request with a randomly generated `request id`. - pub fn new() -> Self { - trace!("Creating new Ping."); - PingRequestPayload { id: random_u64() } - } - - /// An ID of the request / response. - pub fn id(&self) -> u64 { - self.id - } +impl PingResponsePayload { + /// An ID of the request / response. + pub fn id(&self) -> u64 { + self.id } +} - impl PingResponsePayload { - /// An ID of the request / response. - pub fn id(&self) -> u64 { - self.id - } +impl NatPingRequest { + /// Create new ping request with a randomly generated `request id`. + pub fn new() -> Self { + trace!("Creating new Ping."); + NatPingRequest { id: random_u64() } } - impl NatPingRequest { - /// Create new ping request with a randomly generated `request id`. - pub fn new() -> Self { - trace!("Creating new Ping."); - NatPingRequest { id: random_u64() } - } - - /// An ID of the request / response. - pub fn id(&self) -> u64 { - self.id - } + /// An ID of the request / response. + pub fn id(&self) -> u64 { + self.id } +} - impl From for PingResponsePayload { - fn from(p: PingRequestPayload) -> Self { - PingResponsePayload { id: p.id } - } - } +#[cfg(test)] +mod tests { + use super::*; + use std::net::SocketAddr; + use quickcheck::{Arbitrary, Gen, quickcheck}; - impl From for NatPingResponse { - fn from(p: NatPingRequest) -> Self { - NatPingResponse { id: p.id } - } - } + const ONION_RETURN_1_PAYLOAD_SIZE: usize = ONION_RETURN_1_SIZE - NONCEBYTES; + const ONION_RETURN_2_PAYLOAD_SIZE: usize = ONION_RETURN_2_SIZE - NONCEBYTES; + const ONION_RETURN_3_PAYLOAD_SIZE: usize = ONION_RETURN_3_SIZE - NONCEBYTES; impl Arbitrary for DhtPacket { fn arbitrary(g: &mut G) -> Self { diff --git a/src/toxcore/dht_new/server.rs b/src/toxcore/dht_new/server.rs new file mode 100644 index 000000000..745e844ea --- /dev/null +++ b/src/toxcore/dht_new/server.rs @@ -0,0 +1,688 @@ +/* + Copyright © 2017 Zetok Zalbavar + Copyright © 2018 Namsoo CHO + + This file is part of Tox. + + Tox is libre software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + Tox is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with Tox. If not, see . +*/ + + +/*! +Functionality needed to work as a DHT node. +This module works as a coordinator of other modules. +*/ + +use futures::sync::mpsc; +use futures::future; +use tokio_io::IoFuture; + +use std::io::{ErrorKind, Error}; +use std::net::SocketAddr; +use std::collections::HashMap; + +use toxcore::crypto_core::*; +use toxcore::dht_new::packet::*; +use toxcore::dht_new::packed_node::*; +use toxcore::dht_new::kbucket::*; +use toxcore::dht_new::client::*; +use toxcore::dht_new::codec::*; + +/// Type representing Dht UDP packets. +//pub type DhtUdpPacket = (SocketAddr, DhtPacket); +/// Shorthand for the transmit half of the message channel. +type Tx = mpsc::UnboundedSender; + +/** +Own DHT node data. + +Contains: + +- DHT public key +- DHT secret key +- Close List ([`Kbucket`] with nodes close to own DHT public key) +- ping timeout lists ([`TimeoutQueue`]) + +# Adding node to Close List + +Before a [`PackedNode`] is added to the Close List, it needs to be +checked whether: + +- it can be added to [`Kbucket`] \(using [`Kbucket::can_add()`]) +- [`PackedNode`] is actually online + +Once the first check passes node is added to the temporary list, and +a [`NodesRequest`] request is sent to it in order to check whether it's +online. If the node responds correctly within [`PING_TIMEOUT`], it's +removed from temporary list and added to the Close List. + +[`NodesRequest`]: ../dht/struct.NodesRequest.html +[`Kbucket`]: ../dht/struct.Kbucket.html +[`Kbucket::can_add()`]: ../dht/struct.Kbucket.html#method.can_add +[`PackedNode`]: ../dht/struct.PackedNode.html +*/ +#[derive(Clone)] +pub struct Server { + /// secret key + pub sk: SecretKey, + /// public key + pub pk: PublicKey, + /// Close List (contains nodes close to own DHT PK) + pub kbucket: Kbucket, + /// tx split of channel to send packet to this peer via udp socket + pub tx: Tx, + /// store client object which has send request to peer + pub peers_cache: HashMap, +} + +impl Server { + /** + Create new `Server` instance. + + Note: a new instance generates new DHT public and secret keys. + + DHT `PublicKey` and `SecretKey` are supposed to be ephemeral. + */ + pub fn new(tx: Tx, pk: PublicKey, sk: SecretKey) -> Server { + let kbucket = Kbucket::new(KBUCKET_BUCKETS, &pk); + + debug!("Created new Server instance"); + Server { + sk: sk, + pk: pk, + kbucket: kbucket, + tx: tx, + peers_cache: HashMap::new(), + } + } + + /// create new client + pub fn create_client(&mut self, addr: &SocketAddr, pk: PublicKey) -> Client { + let precomputed_key = encrypt_precompute(&pk, &self.sk); + Client::new(precomputed_key, pk, addr.clone(), self.tx.clone()) + } + /// get client from cache + pub fn get_client(&self, pk: &PublicKey) -> Option { + // Client entry is inserted before sending *Request. + if let Some(client) = self.peers_cache.get(pk) { + Some(client.clone()) + } + else { + None + } + } + /** + Function to handle incoming packets. If there is a response packet, + send back it to the peer. + */ + pub fn handle_packet(&mut self, (addr, packet): DhtUdpPacket) -> IoFuture<()> + { + match packet { + DhtPacket::PingRequest(packet) => { + debug!("Received ping request"); + let client = self.create_client(&addr, packet.pk); + self.handle_ping_req(client, packet) + }, + DhtPacket::PingResponse(packet) => { + debug!("Received ping response"); + let client = self.create_client(&addr, packet.pk); + self.handle_ping_resp(client, packet) + }, + DhtPacket::NodesRequest(packet) => { + debug!("Received NodesRequest"); + let client = self.create_client(&addr, packet.pk); + self.handle_nodes_req(client, packet) + }, + DhtPacket::NodesResponse(packet) => { + debug!("Received NodesResponse"); + let client = self.create_client(&addr, packet.pk); + self.handle_nodes_resp(client, packet) + }, + DhtPacket::DhtRequest(dr) => { + // The packet kind of DhtRequest is in encrypted payload, + // so decrypting is needed first. + let payload = dr.get_payload(&self.sk) + .map(|p| p) + .map_err(|e| { + // error!("deserialize DhtRequest payload fail {:?}", e); + e + }); + match payload { + Ok(DhtRequestPayload::NatPingRequest(pl)) => { + debug!("Received nat ping request"); + let client = self.create_client(&addr, dr.spk); + self.handle_nat_ping_req(client, dr, pl) + }, + Ok(DhtRequestPayload::NatPingResponse(pl)) => { + debug!("Received nat ping response"); + let client = self.create_client(&addr, dr.spk); + self.handle_nat_ping_resp(client, dr, pl) + }, + _p => { + // error!("received packet are not handled {:?}", p); + Box::new( future::err( + Error::new(ErrorKind::Other, + "received packet are not handled" + ))) + }, + } + }, + ref p => { + error!("received packet are not handled {:?}", p); + Box::new( future::err( + Error::new(ErrorKind::Other, + "received packet are not handled" + ))) + } + } + } + + /** + handle received PingRequest packet, then create PingResponse packet + and send back it to the peer. + */ + fn handle_ping_req(&mut self, client: Client, request: PingRequest) -> IoFuture<()> + { + if let Ok(payload) = request.get_payload(&self.sk) { + let resp_payload = PingResponsePayload { + id: payload.id, + }; + client.send_ping_response(resp_payload) + } + else { + error!("get_payload() fail upon PingRequest"); + Box::new( future::err( + Error::new(ErrorKind::Other, + "get_payload() fail upon PingRequest" + ))) + } + } + /** + handle received PingResponse packet. If ping_id is correct, try_add peer to kbucket. + */ + fn handle_ping_resp(&mut self, client: Client, request: PingResponse) -> IoFuture<()> + { + if let Ok(payload) = request.get_payload(&self.sk) { + if payload.id == 0 { + return Box::new( future::err( + Error::new(ErrorKind::Other, + "PingResponse.ping_id == 0" + ))) + } + if client.ping_id == payload.id { + let packed_node = PackedNode { + saddr: client.addr.clone(), + pk: request.pk.clone(), + }; + self.kbucket.try_add(&packed_node); + Box::new( future::ok(()) ) + } + else { + Box::new( future::err( + Error::new(ErrorKind::Other, "PingResponse.ping_id does not match") + )) + } + } + else { + error!("get_payload() fail upon PingResponse"); + Box::new( future::err( + Error::new(ErrorKind::Other, + "get_payload() fail upon PingResponse" + ))) + } + } + /** + handle received NodesRequest packet, responds with NodesResponse + */ + fn handle_nodes_req(&mut self, client: Client, request: NodesRequest) -> IoFuture<()> { + if let Ok(payload) = request.get_payload(&self.sk) { + let close_nodes = self.kbucket.get_closest(&self.pk); + if !close_nodes.is_empty() { + let resp_payload = NodesResponsePayload { + nodes: close_nodes, + id: payload.id, + }; + client.send_nodes_response(resp_payload) + } else { + error!("get_closest() return nothing"); + Box::new( future::err( + Error::new(ErrorKind::Other, + "get_closest() return nothing" + ))) + } + } + else { + error!("get_payload() fail upon NodesRequest"); + Box::new( future::err( + Error::new(ErrorKind::Other, + "get_payload() fail upon NodesRequest" + ))) + } + } + /** + handle received NodesResponse from peer. + */ + fn handle_nodes_resp(&mut self, client: Client, request: NodesResponse) -> IoFuture<()> { + if let Ok(payload) = request.get_payload(&self.sk) { + if payload.id == 0 { + return Box::new( future::err( + Error::new(ErrorKind::Other, + "NodesResponse.ping_id == 0" + ))) + } + if client.ping_id == payload.id { + for node in &payload.nodes { + self.kbucket.try_add(node); + } + Box::new( future::ok(()) ) + } + else { + Box::new( future::err( + Error::new(ErrorKind::Other, "NodesResponse.ping_id does not match") + )) + } + } + else { + error!("get_payload() fail upon NodesResponse"); + Box::new( future::err( + Error::new(ErrorKind::Other, + "get_payload() fail upon NodesResponse" + ))) + } + } + + /** + handle received NatPingRequest packet, respond with NatPingResponse + */ + pub fn handle_nat_ping_req(&mut self, client: Client, request: DhtRequest, payload: NatPingRequest) -> IoFuture<()> { + if request.rpk == self.pk { // the target peer is me + let resp_payload = NatPingResponse { + id: payload.id, + }; + client.send_nat_ping_response(&request.spk, resp_payload) + } else { // search kbucket to find target peer + if let Some(addr) = self.kbucket.get_node(&request.rpk) { + client.send_nat_ping_packet(&addr, request.clone()) + } + else { // do nothing + Box::new( future::ok(()) ) + } + } + } + + /** + handle received NatPingResponse packet, start hole-punching + */ + pub fn handle_nat_ping_resp(&mut self, client: Client, request: DhtRequest, payload: NatPingResponse) -> IoFuture<()> { + if request.rpk == self.pk { // the target peer is me + if payload.id == 0 { + return Box::new( future::err( + Error::new(ErrorKind::Other, + "NodesResponse.ping_id == 0" + ))) + } + if client.ping_id == payload.id { + // TODO: start hole-punching + Box::new( future::ok(()) ) + } + else { + Box::new( future::err( + Error::new(ErrorKind::Other, "NatPingResponse.ping_id does not match") + )) + } + } else { // search kbucket to find target peer + if let Some(addr) = self.kbucket.get_node(&request.rpk) { + client.send_nat_ping_packet(&addr, request.clone()) + } + else { // do nothing + Box::new( future::ok(()) ) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::*; + + use quickcheck::TestResult; + use std::net::SocketAddr; + + fn create_node() -> (Server, PrecomputedKey, PublicKey, + mpsc::UnboundedReceiver, SocketAddr) { + if !crypto_init() { + error!("Crypto initialization failed."); + assert!(false); + } + + let (pk, sk) = gen_keypair(); + let (tx, rx) = mpsc::unbounded::(); + let alice = Server::new(tx, pk, sk); + let (bob_pk, bob_sk) = gen_keypair(); + let precomp = precompute(&alice.pk, &bob_sk); + + let addr: SocketAddr = "127.0.0.1:12346".parse().unwrap(); + (alice, precomp, bob_pk, rx, addr) + } + // new() + #[test] + fn server_new_test() { + if !crypto_init() { + error!("Crypto initialization failed."); + assert!(false); + } + + let (pk, sk) = gen_keypair(); + let tx: Tx = mpsc::unbounded().0; + let _ = Server::new(tx, pk, sk); + } + // create_client() + quickcheck! { + fn server_create_client_test(packet: PingRequest) -> TestResult { + if !crypto_init() { + error!("Crypto initialization failed."); + assert!(false); + } + + let (pk, sk) = gen_keypair(); + let(tx, _) = mpsc::unbounded(); + let mut alice = Server::new(tx, pk, sk); + let addr1: SocketAddr = "127.0.0.1:12345".parse().unwrap(); + let client1 = alice.create_client(&addr1.clone(), packet.pk.clone()); + // try one more time + let client2 = alice.create_client(&addr1, packet.pk.clone()); + assert_eq!(client1.pk, client2.pk); + assert_eq!(client1.precomputed_key, client2.precomputed_key); + let addr2: SocketAddr = "127.0.0.2:54321".parse().unwrap(); + let client3 = alice.create_client(&addr2, packet.pk); + assert_eq!(client1.precomputed_key, client3.precomputed_key); + assert_ne!(client1.addr, client3.addr); + TestResult::passed() + } + } + // get_client() + #[test] + fn server_get_client_test() { + let (mut alice, _precomp, bob_pk, _rx, addr) = create_node(); + let client = alice.create_client(&addr, bob_pk); + alice.peers_cache.insert(bob_pk, client.clone()); + assert_eq!(client.pk, alice.get_client(&bob_pk).unwrap().pk); + } + // handle_packet() + quickcheck! { + fn server_handle_packet_test(prq: PingRequestPayload) -> TestResult + // prs: PingResponsePayload, + // nrq: NodesRequestPayload, + // nrs: NodesResponsePayload, + // nat_req: NatPingRequest, + // nat_res: NatPingResponse) -> TestResult + { + let (mut alice, precomp, bob_pk, rx, addr) = create_node(); + // handle ping request, request from bob peer + let ping_req = DhtPacket::PingRequest(PingRequest::new(&precomp, &bob_pk, prq)); + alice.handle_packet((addr, ping_req)).wait().unwrap(); + if let (Some((_addr, packet)), _rx) = rx.into_future().wait().unwrap() { + debug!("received packet {:?}", packet); + if let DhtPacket::PingResponse(packet) = packet { + let ping_resp_payload = packet.get_payload(&alice.sk).unwrap(); + assert_eq!(ping_resp_payload.id, prq.id); + } else { + unreachable!("can not occur"); + } + } else { + unreachable!("can not occur"); + } + + // handle ping response + // let ping_res = DhtPacket::PingResponse(PingResponse::new(&precomp, &bob_pk, prs)); + // alice.kbucket = Kbucket::new(KBUCKET_BUCKETS, &alice.pk); + // let mut client = alice.create_client(&addr, bob_pk); + // client.ping_id = prs.id; + // alice.peers_cache.insert(bob_pk.clone(), client); + // alice.handle_packet((addr, ping_res)).wait().unwrap(); + // assert!(alice.kbucket.contains(&bob_pk)); + + // handle nodes request from bob + // let (tx, rx) = mpsc::unbounded::(); + // let node_pk = gen_keypair().0; + // let packed_node = PackedNode::new(false, SocketAddr::V4("127.0.0.1:12345".parse().unwrap()), &node_pk); + // alice.kbucket.try_add(&packed_node); + // let nodes_req = DhtPacket::NodesRequest(NodesRequest::new(&precomp, &bob_pk, nrq)); + // alice.handle_packet((addr, nodes_req)).wait().unwrap(); + // let rx = + // if let (Some((_addr, packet)), rx1) = rx.into_future().wait().unwrap() { + // debug!("received packet {:?}", packet); + // if let DhtPacket::NodesResponse(packet) = packet { + // let nodes_resp_payload = packet.get_payload(&alice.sk).unwrap(); + // assert_eq!(nodes_resp_payload.id, nrq.id); + // rx1 + // } else { + // unreachable!("can not occur"); + // } + // } else { + // unreachable!("can not occur"); + // }; + + // handle nodes response + // let nodes_res = DhtPacket::NodesResponse(NodesResponse::new(&precomp, &bob_pk, nrs.clone())); + // let mut client = alice.create_client(&addr, bob_pk); + // client.ping_id = nrs.id; + // alice.peers_cache.insert(bob_pk.clone(), client); + // alice.kbucket = Kbucket::new(KBUCKET_BUCKETS, &alice.pk); + // let mut kbuc = Kbucket::new(KBUCKET_BUCKETS, &alice.pk); + // for pn in &nrs.nodes { + // kbuc.try_add(pn); + // } + // alice.handle_packet((addr, nodes_res)).wait().unwrap(); + // assert_eq!(alice.kbucket, kbuc); + + // handle nat ping request + // let nat_payload = DhtRequestPayload::NatPingRequest(nat_req); + // let nat_ping_req = DhtPacket::DhtRequest(DhtRequest::new(&precomp, &alice.pk, &bob_pk, nat_payload.clone())); + // alice.handle_packet((addr, nat_ping_req)).wait().unwrap(); + // if let (Some((_addr, packet)), _rx1) = rx.into_future().wait().unwrap() { + // debug!("received packet {:?}", packet); + // if let DhtPacket::DhtRequest(packet) = packet { + // if let DhtRequestPayload::NatPingResponse(nat_ping_resp_payload) = packet.get_payload(&alice.sk).unwrap() { + // assert_eq!(nat_ping_resp_payload.id, nat_req.id); + // } else { + // unreachable!("can not occur"); + // } + // } else { + // unreachable!("can not occur"); + // } + // } else { + // unreachable!("can not occur"); + // } + + // let nat_ping_req = DhtPacket::DhtRequest(DhtRequest::new(&precomp, &alice.pk, &alice.pk, nat_payload)); + // assert!(!alice.handle_packet((addr, nat_ping_req)).wait().is_ok()); + + // handle nat ping response + // let nat_payload = DhtRequestPayload::NatPingResponse(nat_res); + // let nat_ping_res = DhtPacket::DhtRequest(DhtRequest::new(&precomp, &alice.pk, &bob_pk, nat_payload.clone())); + // let mut client = alice.create_client(&addr, bob_pk); + // client.ping_id = nat_res.id; + // alice.peers_cache.insert(bob_pk.clone(), client); + // assert!(alice.handle_packet((addr, nat_ping_res)).wait().is_ok()); + + // let nat_ping_res = DhtPacket::DhtRequest(DhtRequest::new(&precomp, &alice.pk, &alice.pk, nat_payload)); + // assert!(!alice.handle_packet((addr, nat_ping_res)).wait().is_ok()); + + TestResult::passed() + } + } + + // handle_ping_req() + #[test] + fn server_handle_ping_req_test() { + let (mut alice, precomp, bob_pk, rx, addr) = create_node(); + // handle ping request, request from bob peer + let prq = PingRequestPayload { id: random_u64() }; + let ping_req = PingRequest::new(&precomp, &bob_pk, prq); + let client = alice.create_client(&addr, bob_pk); + alice.handle_ping_req(client, ping_req).wait().unwrap(); + if let (Some((_addr, packet)), _rx1) = rx.into_future().wait().unwrap() { + debug!("received packet {:?}", packet); + if let DhtPacket::PingResponse(packet) = packet { + let ping_resp_payload = packet.get_payload(&alice.sk).unwrap(); + assert_eq!(ping_resp_payload.id, prq.id); + } else { + unreachable!("can not occur"); + } + } else { + unreachable!("can not occur"); + } + let prq = PingRequestPayload { id: random_u64() }; + let ping_req = PingRequest::new(&precomp, &alice.pk, prq); + let pk = alice.pk.clone(); + let client = alice.create_client(&addr, pk); + assert!(!alice.handle_ping_req(client, ping_req).wait().is_ok()); + } + + // handle_ping_resp() + #[test] + fn server_handle_ping_resp_test() { + let (mut alice, precomp, bob_pk, _rx, addr) = create_node(); + // handle ping response, request from bob peer + let prs = PingResponsePayload { id: random_u64() }; + let ping_resp = PingResponse::new(&precomp, &bob_pk, prs); + let mut client = alice.create_client(&addr, bob_pk); + client.ping_id = prs.id; + assert!(alice.handle_ping_resp(client, ping_resp).wait().is_ok()); + + let prs = PingResponsePayload { id: random_u64() }; + let ping_resp = PingResponse::new(&precomp, &alice.pk, prs); + let mut client = alice.create_client(&addr, bob_pk); + client.ping_id = prs.id; + assert!(!alice.handle_ping_resp(client, ping_resp).wait().is_ok()); + + let prs = PingResponsePayload { id: random_u64() }; + let ping_resp = PingResponse::new(&precomp, &bob_pk, prs); + let mut client = alice.create_client(&addr, bob_pk); + client.ping_id = 0; + assert!(!alice.handle_ping_resp(client.clone(), ping_resp.clone()).wait().is_ok()); + client.ping_id = prs.id + 1; + assert!(!alice.handle_ping_resp(client, ping_resp).wait().is_ok()); + } + + // handle_nodes_req() + #[test] + fn server_handle_nodes_req_test() { + let (mut alice, precomp, bob_pk, rx, addr) = create_node(); + // handle nodes request, request from bob peer + let node_pk = gen_keypair().0; + let packed_node = PackedNode::new(false, SocketAddr::V4("127.0.0.1:12345".parse().unwrap()), &node_pk); + alice.kbucket.try_add(&packed_node); + let nrq = NodesRequestPayload { pk: node_pk, id: random_u64() }; + let nodes_req = NodesRequest::new(&precomp, &bob_pk, nrq.clone()); + let client = alice.create_client(&addr, bob_pk); + alice.handle_nodes_req(client, nodes_req).wait().unwrap(); + if let (Some((_addr, packet)), _rx1) = rx.into_future().wait().unwrap() { + debug!("received packet {:?}", packet); + if let DhtPacket::NodesResponse(packet) = packet { + let nodes_resp_payload = packet.get_payload(&alice.sk).unwrap(); + assert_eq!(nodes_resp_payload.id, nrq.id); + } else { + unreachable!("can not occur") + } + } else { + unreachable!("can not occur"); + } + let nodes_req = NodesRequest::new(&precomp, &alice.pk, nrq); + let pk = alice.pk.clone(); + let client = alice.create_client(&addr, pk); + assert!(!alice.handle_nodes_req(client, nodes_req).wait().is_ok()); + } + + // handle_nodes_resp() + #[test] + fn server_handle_nodes_resp_test() { + let (mut alice, precomp, bob_pk, _rx, addr) = create_node(); + // handle nodes response, request from bob peer + let nrs = NodesResponsePayload { nodes: vec![ + PackedNode::new(false, SocketAddr::V4("127.0.0.1:12345".parse().unwrap()), &gen_keypair().0) + ], id: 38 }; + + let nodes_resp = NodesResponse::new(&precomp, &bob_pk, nrs.clone()); + let mut client = alice.create_client(&addr, bob_pk); + client.ping_id = 38; + alice.handle_nodes_resp(client, nodes_resp).wait().unwrap(); + let mut kbuc = Kbucket::new(KBUCKET_BUCKETS, &alice.pk); + for pn in &nrs.nodes { + kbuc.try_add(pn); + } + assert_eq!(alice.kbucket, kbuc); + + let nodes_resp = NodesResponse::new(&precomp, &alice.pk, nrs.clone()); + let pk = alice.pk.clone(); + let mut client = alice.create_client(&addr, pk); + client.ping_id = 38; + assert!(!alice.handle_nodes_resp(client, nodes_resp).wait().is_ok()); + + let nodes_resp = NodesResponse::new(&precomp, &bob_pk, nrs.clone()); + let mut client = alice.create_client(&addr, bob_pk); + client.ping_id = 0; + assert!(!alice.handle_nodes_resp(client.clone(), nodes_resp.clone()).wait().is_ok()); + client.ping_id = 38 + 1; + assert!(!alice.handle_nodes_resp(client, nodes_resp).wait().is_ok()); + } + + // handle nat ping request + #[test] + fn server_handle_nat_ping_req_test() { + let (mut alice, precomp, bob_pk, rx, addr) = create_node(); + let nat_req = NatPingRequest { id: random_u64() }; + let nat_payload = DhtRequestPayload::NatPingRequest(nat_req); + let dht_req = DhtRequest::new(&precomp, &alice.pk, &bob_pk, nat_payload.clone()); + let client = alice.create_client(&addr.clone(), bob_pk); + alice.handle_nat_ping_req(client, dht_req, nat_req).wait().unwrap(); + if let (Some((_addr, packet)), _rx1) = rx.into_future().wait().unwrap() { + debug!("received packet {:?}", packet); + if let DhtPacket::DhtRequest(packet) = packet { + if let DhtRequestPayload::NatPingResponse(nat_ping_resp_payload) = packet.get_payload(&alice.sk).unwrap() { + assert_eq!(nat_ping_resp_payload.id, nat_req.id); + } else { + unreachable!("can not occur"); + } + } else { + unreachable!("can not occur"); + } + } else { + unreachable!("can not occur"); + } + + let dht_req = DhtRequest::new(&precomp, &alice.pk, &alice.pk, nat_payload.clone()); + let pk = alice.pk.clone(); + let client = alice.create_client(&addr, pk); + assert!(!alice.handle_nat_ping_req(client, dht_req, nat_req).wait().is_ok()); + } + + // handle nat ping response + #[test] + fn server_handle_nat_ping_resp_test() { + let (mut alice, precomp, bob_pk, _rx, addr) = create_node(); + let nat_res = NatPingResponse { id: random_u64() }; + let nat_payload = DhtRequestPayload::NatPingResponse(nat_res); + let dht_req = DhtRequest::new(&precomp, &alice.pk, &bob_pk, nat_payload.clone()); + let mut client = alice.create_client(&addr.clone(), bob_pk); + client.ping_id = nat_res.id; + assert!(alice.handle_nat_ping_resp(client.clone(), dht_req.clone(), nat_res.clone()).wait().is_ok()); + client.ping_id = nat_res.id + 1; + assert!(!alice.handle_nat_ping_resp(client.clone(), dht_req.clone(), nat_res.clone()).wait().is_ok()); + client.ping_id = 0; + assert!(!alice.handle_nat_ping_resp(client, dht_req, nat_res).wait().is_ok()); + } +}