diff --git a/src/lib.rs b/src/lib.rs index b41239ccd..fbfe9213f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -125,6 +125,9 @@ fn main() { // Too many false positives in tests #![cfg_attr(feature = "clippy", allow(needless_pass_by_value))] +// FIXME update to nom 4 and remove this rule +#![allow(unused_parens)] + extern crate bytes; extern crate byteorder; extern crate futures; diff --git a/src/toxcore/dht_new/codec.rs b/src/toxcore/dht_new/codec.rs new file mode 100644 index 000000000..1c3278ad8 --- /dev/null +++ b/src/toxcore/dht_new/codec.rs @@ -0,0 +1,152 @@ +/* + Copyright (C) 2013 Tox project All Rights Reserved. + Copyright © 2016-2017 Zetok Zalbavar + Copyright © 2018 Namsoo CHO + + This file is part of Tox. + + Tox is libre software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + Tox is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with Tox. If not, see . +*/ + +/*! Codec for encoding/decoding DHT Packets & DHT Request packets using tokio-io +*/ + +use toxcore::dht_new::packet::*; +use toxcore::dht_new::binary_io::*; + +use std::io; +use std::io::{Error, ErrorKind}; +use tokio_core::net::UdpCodec; +use std::net::SocketAddr; + +/// Type representing Dht UDP packets. +pub type DhtUdpPacket = (SocketAddr, DhtBase); + +/// Type representing received Dht UDP packets. +pub type DhtRecvUdpPacket = (SocketAddr, Option); + +/** +SendNodes +size | description +1 | packet type +32 | public key +24 | nonce +1 | number of response nodes +[39,204]| packed nodes +8 | Request Id (Ping Id) +--------------------------------- +270 bytes maximun. +Because size of SendNodes is largest in DHT related packets +512 is enough for DhtPacket +*/ +pub const MAX_DHT_PACKET_SIZE: usize = 512; + +/// Struct to use for {de-,}serializing DHT UDP packets. +pub struct DhtCodec; + +impl UdpCodec for DhtCodec { + type In = DhtRecvUdpPacket; + type Out = DhtUdpPacket; + + fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result + { + match DhtBase::from_bytes(buf) { + IResult::Incomplete(_) => { + Err(Error::new(ErrorKind::Other, + "DhtBase packet should not be incomplete")) + }, + IResult::Error(e) => { + Err(Error::new(ErrorKind::Other, + format!("deserialize DhtBase packet error: {:?}", e))) + }, + IResult::Done(_, encrypted_packet) => { + Ok((*src, Some(encrypted_packet))) + } + } + } + + fn encode(&mut self, (addr, dp): Self::Out, into: &mut Vec) -> SocketAddr { + let mut buf = [0; MAX_DHT_PACKET_SIZE]; + if let Ok((_, size)) = dp.to_bytes((&mut buf, 0)) { + into.extend(&buf[..size]); + } else { + // TODO: move from tokio-core to tokio and return error instead of panic + panic!("DhtBase to_bytes error {:?}", dp); + } + addr + } +} + +#[cfg(test)] +mod test { + use tokio_core::net::UdpCodec; + use std::net::SocketAddr; + + use super::*; + use toxcore::dht_new::packet_kind::*; + + use quickcheck::{quickcheck, TestResult}; + + #[test] + fn dht_codec_decode_test() { + fn with_dp(dp: DhtPacket) -> TestResult { + // need an invalid PacketKind for DhtPacket + if dp.packet_kind as u8 <= PacketKind::SendNodes as u8 { + return TestResult::discard() + } + + let kind = dp.packet_kind.clone() as u8; + // TODO: random SocketAddr + let addr = SocketAddr::V4("0.1.2.3:4".parse().unwrap()); + let mut tc = DhtCodec; + + let mut buf = [0; 1024]; + let bytes = dp.to_bytes((&mut buf, 0)).unwrap().0; + + let (decoded_a, decoded_dp) = tc.decode(&addr, &bytes) + .unwrap(); + // it did have correct packet + let decoded_dp = decoded_dp.unwrap(); + + assert_eq!(addr, decoded_a); + assert_eq!(DhtBase::DhtPacket(dp), decoded_dp); + + // make it error + bytes[0] = kind; + let (r_addr, none) = tc.decode(&addr, &bytes).unwrap(); + assert_eq!(addr, r_addr); + assert!(none.is_none()); + + TestResult::passed() + } + quickcheck(with_dp as fn(DhtPacket) -> TestResult); + } + + #[test] + fn dht_codec_encode_test() { + fn with_dp(dp: DhtPacket) { + // TODO: random SocketAddr + let addr = SocketAddr::V4("5.6.7.8:9".parse().unwrap()); + let mut buf = Vec::new(); + let mut tc = DhtCodec; + + let socket = tc.encode((addr, DhtBase::DhtPacket(dp.clone())), &mut buf); + assert_eq!(addr, socket); + let mut enc_buf = [0; MAX_DHT_PACKET_SIZE]; + let (_, size) = dp.to_bytes((&mut enc_buf, 0)).unwrap(); + assert_eq!(buf, enc_buf[..size].to_vec()); + } + quickcheck(with_dp as fn(DhtPacket)); + } +} diff --git a/src/toxcore/dht_new/dht_impl.rs b/src/toxcore/dht_new/dht_impl.rs new file mode 100644 index 000000000..e3eebc7c5 --- /dev/null +++ b/src/toxcore/dht_new/dht_impl.rs @@ -0,0 +1,298 @@ +/* + Copyright © 2017 Zetok Zalbavar + Copyright © 2018 Namsoo CHO + + This file is part of Tox. + + Tox is libre software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + Tox is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with Tox. If not, see . +*/ + + +/*! +Implements Dht related structures. +*/ + +use std::collections::HashMap; +use toxcore::dht_new::packet::*; +use toxcore::dht_new::packed_node::*; +use toxcore::dht_new::kbucket::*; +use toxcore::dht_new::binary_io::*; +use toxcore::dht_new::codec::*; +use toxcore::crypto_core::*; +use toxcore::dht_new::packet_kind::*; +use std::io::{Error, ErrorKind}; +use nom::IResult; +use std::hash::{Hash, Hasher}; + + +#[derive(Clone, Eq, Debug, PartialEq)] +struct HashKeys(SecretKey, PublicKey); + +impl Hash for HashKeys { + fn hash(&self, state: &mut H) where H: Hasher { + let SecretKey(sk) = self.0; + let PublicKey(pk) = self.1; + for byte in sk.iter() { + state.write_u8(*byte); + } + for byte in pk.iter() { + state.write_u8(*byte); + } + state.finish(); + } +} + +/// Manage hash table for precomputed keys. +#[derive(Clone, Eq, Debug, PartialEq)] +pub struct PrecomputedKeys { + cache: HashMap, +} + +impl PrecomputedKeys { + /// manage hash table for precomputed keys + pub fn new () -> PrecomputedKeys { + PrecomputedKeys { + cache: HashMap::new(), + } + } + + /// Get precomputed keys + /// If the Key is not found in cache, create symmetric key and insert it into cache for later use. + pub fn get_symmetric_key (&mut self, key_pair: (&SecretKey, &PublicKey)) -> Result { + let key = HashKeys(key_pair.0.clone(), key_pair.1.clone()); + match self.cache.get(&key) { // if symmetric key exists in cache, returns with the value + Some(k) => { + return Ok(k.clone()); + }, + None => {}, + }; + + // Key don't exist in cache, So create one + // must separate logic into two blocks because self.cache is barrowed mutably + let shared_secret = encrypt_precompute(key_pair.1, key_pair.0); + self.cache.insert (key, shared_secret.clone()); + Ok(shared_secret) + } +} + +impl DhtPacket { + /// create new DhtPacket object + pub fn new(shared_secret: &PrecomputedKey, pk: &PublicKey, dp: DhtPacketPayload) -> DhtPacket { + let nonce = &gen_nonce(); + let mut buf = [0; MAX_DHT_PACKET_SIZE]; + let (_, size) = dp.to_bytes((&mut buf, 0)).unwrap(); + let payload = seal_precomputed(&buf[..size] , nonce, shared_secret); + + DhtPacket { + packet_kind: dp.kind(), + pk: *pk, + nonce: *nonce, + payload: payload, + } + + } + + /** + Decrypt payload and try to parse it as packet type. + + To get info about it's packet type use + [`.kind()`](./struct.DhtPacket.html#method.kind) method. + + Returns `None` in case of faliure: + + - fails to decrypt + - fails to parse as given packet type + */ + /* TODO: perhaps switch to using precomputed symmetric key? + - given that computing shared key is apparently the most + costly operation when it comes to crypto, using precomputed + key might (would significantly?) lower resource usage + + Alternatively, another method `get_payloadnm()` which would use + symmetric key. + */ + pub fn get_payload(&self, own_secret_key: &SecretKey) -> Result, Error> + { + debug!(target: "DhtPacket", "Getting packet data from DhtPacket."); + trace!(target: "DhtPacket", "With DhtPacket: {:?}", self); + let decrypted = open(&self.payload, &self.nonce, &self.pk, + own_secret_key) + .and_then(|d| Ok(d)) + .map_err(|e| { + debug!("Decrypting DhtPacket failed!"); + Error::new(ErrorKind::Other, + format!("DhtPacket decrypt error: {:?}", e)) + }); + + match self.packet_kind { + PacketKind::PingRequest => { + match PingRequest::from_bytes(&decrypted.unwrap_or(vec![0])) { + IResult::Incomplete(e) => { + error!(target: "DhtPacket", "PingRequest deserialize error: {:?}", e); + Err(Error::new(ErrorKind::Other, + format!("PingRequest deserialize error: {:?}", e))) + }, + IResult::Error(e) => { + error!(target: "DhtPacket", "PingRequest deserialize error: {:?}", e); + Err(Error::new(ErrorKind::Other, + format!("PingRequest deserialize error: {:?}", e))) + }, + IResult::Done(_, packet) => { + Ok(Some(DhtPacketPayload::PingRequest(packet))) + } + } + }, + PacketKind::PingResponse => { + match PingResponse::from_bytes(&decrypted.unwrap_or(vec![0])) { + IResult::Incomplete(e) => { + error!(target: "DhtPacket", "PingResponse deserialize error: {:?}", e); + Err(Error::new(ErrorKind::Other, + format!("PingResponse deserialize error: {:?}", e))) + }, + IResult::Error(e) => { + error!(target: "DhtPacket", "PingResponse deserialize error: {:?}", e); + Err(Error::new(ErrorKind::Other, + format!("PingResponse deserialize error: {:?}", e))) + }, + IResult::Done(_, packet) => { + Ok(Some(DhtPacketPayload::PingResponse(packet))) + } + } + }, + PacketKind::GetNodes => { + match GetNodes::from_bytes(&decrypted.unwrap_or(vec![0])) { + IResult::Incomplete(e) => { + error!(target: "DhtPacket", "GetNodes deserialize error: {:?}", e); + Err(Error::new(ErrorKind::Other, + format!("GetNodes deserialize error: {:?}", e))) + }, + IResult::Error(e) => { + error!(target: "DhtPacket", "GetNodes deserialize error: {:?}", e); + Err(Error::new(ErrorKind::Other, + format!("GetNodes deserialize error: {:?}", e))) + }, + IResult::Done(_, packet) => { + Ok(Some(DhtPacketPayload::GetNodes(packet))) + } + } + }, + PacketKind::SendNodes => { + match SendNodes::from_bytes(&decrypted.unwrap_or(vec![0])) { + IResult::Incomplete(e) => { + error!(target: "DhtPacket", "SendNodes deserialize error: {:?}", e); + Err(Error::new(ErrorKind::Other, + format!("SendNodes deserialize error: {:?}", e))) + }, + IResult::Error(e) => { + error!(target: "DhtPacket", "SendNodes deserialize error: {:?}", e); + Err(Error::new(ErrorKind::Other, + format!("SendNodes deserialize error: {:?}", e))) + }, + IResult::Done(_, packet) => { + Ok(Some(DhtPacketPayload::SendNodes(packet))) + } + } + }, + e => { + error!("Invalid PacketKind for DhtPacketPayload {:?}", e); + Err(Error::new(ErrorKind::Other, + format!("Invalid PacketKind for DhtPacketPayload {:?}", e))) + } + } + } +} + +impl DhtRequest { + /// create new DhtRequest object + pub fn new(shared_secret: &PrecomputedKey, rpk: &PublicKey, spk: &PublicKey, dp: DhtRequestPayload) -> DhtRequest { + let nonce = &gen_nonce(); + + let mut buf = [0; MAX_DHT_PACKET_SIZE]; + let (_, size) = dp.to_bytes((&mut buf, 0)).unwrap(); + let payload = seal_precomputed(&buf[..size], nonce, shared_secret); + + DhtRequest { + rpk: *rpk, + spk: *spk, + nonce: *nonce, + payload: payload, + } + } +} + +impl GetNodes { + /** + Create response to `self` request with nodes provided from the `Kbucket`. + + Fails (returns `None`) if `Kbucket` is empty. + */ + pub fn response(&self, kbucket: &Kbucket) -> Option { + let nodes = kbucket.get_closest(&self.pk); + if !nodes.is_empty() { + Some(DhtPacketPayload::SendNodes(SendNodes::with_nodes(self, nodes).unwrap())) + } + else { + None + } + } +} + +impl SendNodes { + /** + Create new `SendNodes`. Returns `None` if 0 or more than 4 nodes are + supplied. + + Created as a response to `GetNodes` request. + */ + pub fn with_nodes(request: &GetNodes, nodes: Vec) -> Option { + debug!(target: "SendNodes", "Creating SendNodes from GetNodes."); + trace!(target: "SendNodes", "With GetNodes: {:?}", request); + trace!("With nodes: {:?}", &nodes); + + if nodes.is_empty() || nodes.len() > 4 { + warn!(target: "SendNodes", "Wrong number of nodes supplied!"); + return None + } + + Some(SendNodes { nodes: nodes, id: request.id }) + } +} + +impl From for PingResponse { + fn from(p: PingRequest) -> Self { + PingResponse { id: p.id } + } +} + +impl DhtPacketPayload { + /// Packet kind for enum DhtPacketPayload + pub fn kind(&self) -> PacketKind { + match *self { + DhtPacketPayload::PingRequest(_) => PacketKind::PingRequest, + DhtPacketPayload::PingResponse(_) => PacketKind::PingResponse, + DhtPacketPayload::GetNodes(_) => PacketKind::GetNodes, + DhtPacketPayload::SendNodes(_) => PacketKind::SendNodes, + } + } +} + +impl DhtBase { + /// Packet kind for enum DhtPacketPayload + pub fn kind(&self) -> PacketKind { + match *self { + DhtBase::DhtPacket(ref p) => p.packet_kind, + DhtBase::DhtRequest(ref p) => PacketKind::from_bytes(&[p.payload[1]]).unwrap().1, + } + } +} diff --git a/src/toxcore/dht_new/dht_node.rs b/src/toxcore/dht_new/dht_node.rs new file mode 100644 index 000000000..2179c168a --- /dev/null +++ b/src/toxcore/dht_new/dht_node.rs @@ -0,0 +1,1231 @@ +/* + Copyright © 2017 Zetok Zalbavar + Copyright © 2018 Namsoo CHO + + This file is part of Tox. + + Tox is libre software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + Tox is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with Tox. If not, see . +*/ + + +/*! +Functionality needed to work as a DHT node. +This module works as a coordinator of other modules. +*/ + +use futures::*; +use futures::sink; +use futures::stream::*; +use futures::sync::mpsc; +use tokio_core::reactor::Core; +use tokio_proto::multiplex::RequestId; +use tokio_core::net::UdpFramed; + +use std::collections::VecDeque; +use std::io::{self, ErrorKind, Error}; +use std::net::SocketAddr; +use std::thread; + +use toxcore::crypto_core::*; +use toxcore::dht_new::packet::*; +use toxcore::dht_new::codec::*; +use toxcore::dht_new::packed_node::*; +use toxcore::dht_new::kbucket::*; +use toxcore::dht_new::dht_impl::*; +use toxcore::timeout::*; +use toxcore::dht_new::packet_kind::*; + +/// Type for sending `SplitSink` with `DhtCodec`. +// FIXME: docs +// TODO: rename +pub type DhtSplitSink = SplitSink>; + +/// Type for receiving `SplitStream` with `DhtCodec`. +// FIXME: docs +// TODO: rename +pub type DhtSplitStream = SplitStream>; + +/// Type representing future `Send` via `SplitSink`. +// FIXME: docs +// TODO: rename +pub type SendSink = sink::Send>>; + +// /// Type representing Tox UDP packets. +// TODO: change DhtPacket to and enum with all possible packets +// pub type DhtUdpPacket = (SocketAddr, DhtPacketBase); + +// /// Type representing received Tox UDP packets. +// TODO: change DhtPacket to and enum with all possible packets +//pub type ToxRecvUdpPacket = (SocketAddr, Option); + +/** +Spawn a thread that will start receiving packets from [`DhtSplitStream`]. + +[`DhtSplitStream`]: ./type.DhtSplitStream.html +*/ +// TODO: move to network.rs ? +pub fn receive_packets(stream: DhtSplitStream) + -> mpsc::Receiver +{ + let (tx, rx) = mpsc::channel(2048); + thread::spawn(move || { + // can this fail to unwrap? + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + let f = stream.for_each(|(src, p)| { + if let Some(packet) = p { + let tx = tx.clone(); + let send_one = tx.send((src, packet)).then(|_| Ok(())); + handle.spawn(send_one); + } + Ok(()) + }); + + core.run(f).unwrap(); + }); + + rx +} + +/** +Spawn a thread that will start sending packets via [`DhtSplitSink`]. + +Send all packets that need to be sent via returned `Sender`. + +[`DhtSplitSink`]: ./type.DhtSplitSink.html +*/ +// TODO: move to network.rs ? +pub fn send_packets(sink: DhtSplitSink) + -> mpsc::Sender +{ + let (tx, rx) = mpsc::channel(2048); + thread::spawn(move || { + // can this fail to unwrap? + let mut core = Core::new().unwrap(); + + let f = sink.send_all(rx.map_err(|_| { + // needed only to satisfy Sink::send_all() error constraints + io::Error::new(ErrorKind::Other, "") + })); + drop(core.run(f)); + }); + + tx +} + +/** +Own DHT node data. + +Contains: + +- DHT public key +- DHT secret key +- Close List ([`Kbucket`] with nodes close to own DHT public key) +- ping timeout lists ([`TimeoutQueue`]) + +# Adding node to Close List + +Before a [`PackedNode`] is added to the Close List, it needs to be +checked whether: + +- it can be added to [`Kbucket`] \(using [`Kbucket::can_add()`]) +- [`PackedNode`] is actually online + +Once the first check passes node is added to the temporary list, and +a [`GetNodes`] request is sent to it in order to check whether it's +online. If the node responds correctly within [`PING_TIMEOUT`], it's +removed from temporary list and added to the Close List. + +[`GetNodes`]: ../dht/struct.GetNodes.html +[`Kbucket`]: ../dht/struct.Kbucket.html +[`Kbucket::can_add()`]: ../dht/struct.Kbucket.html#method.can_add +[`PackedNode`]: ../dht/struct.PackedNode.html +[`PING_TIMEOUT`]: ../timeout/constant.PING_TIMEOUT.html +[`TimeoutQueue`]: ../timeout/struct.TimeoutQueue.html +*/ +#[derive(Clone, Eq, Debug, PartialEq)] +pub struct DhtNode { + /// secret key + pub sk: Box, + /// public key + pub pk: Box, + /// Close List (contains nodes close to own DHT PK) + pub kbucket: Box, + getn_timeout: TimeoutQueue, + /// timeouts for requests that check whether a node is online before + /// adding it to the Close List + // TODO: rename + to_close_tout: TimeoutQueue, + /// list of nodes that are checked for being online before adding + /// to the Close List + // TODO: rename + to_close_nodes: VecDeque, + // TODO: add a "verify" TimeoutQueue to check if nodes are online + // before adding them to the kbucket + + // TODO: track sent ping request IDs + // TODO: have a table with precomputed keys for all known nodes? + // (use lru-cache for storing last used 1024?) + /// symmetric keys cache + pub precomputed_cache: PrecomputedKeys, +} + + +impl DhtNode { + /** + Create new `DhtNode` instance. + + Note: a new instance generates new DHT public and secret keys. + + DHT `PublicKey` and `SecretKey` are supposed to be ephemeral. + */ + pub fn new() -> io::Result { + if !crypto_init() { + return Err(io::Error::new(ErrorKind::Other, + "Crypto initialization failed.")); + } + + let (pk, sk) = gen_keypair(); + let kbucket = Kbucket::new(KBUCKET_BUCKETS, &pk); + + debug!("Created new DhtNode instance"); + Ok(DhtNode { + sk: Box::new(sk), + pk: Box::new(pk), + kbucket: Box::new(kbucket), + getn_timeout: Default::default(), + to_close_tout: Default::default(), + to_close_nodes: Default::default(), + // Should it Boxed? + precomputed_cache: PrecomputedKeys::new(), + }) + } + + /** + Function to handle incoming packets. If there is a response packet, + `Some(DhtBase)` is returned. + */ + pub fn handle_packet(&mut self, packet: &DhtBase) + -> Option + { + match packet { + &DhtBase::DhtPacket(ref dp) => { + match dp.packet_kind { + PacketKind::PingRequest => { + debug!("Received ping request"); + self.create_ping_resp(packet) + }, + PacketKind::GetNodes => { + debug!("Received GetN request"); + self.create_sendn(packet) + }, + PacketKind::SendNodes => { + debug!("Received SendN packet"); + self.handle_packet_sendn(packet); + None + }, + // TODO: handle other kinds of packets + p => { + debug!("Received unhandled packet kind: {:?}", p); + None + }, + } + }, + // DhtRequest is not yet. + &DhtBase::DhtRequest(_) => { + None + }, + } + } + + /** + Handle [`DhtBase`] that claims to contain [`SendNodes`] packet. + + Packet is dropped if: + + - it doesn't contain [`SendNodes`] + - it's not a response to a [`GetNodes`] request (invalid ID) + + [`DhtBase`]: ../dht/enum.DhtBase.html + [`DhtPacket`]: ../dht/struct.DhtPacket.html + [`GetNodes`]: ../dht/struct.GetNodes.html + [`SendNodes`]: ../dht/struct.SendNodes.html + */ + fn handle_packet_sendn(&mut self, packet: &DhtBase) { + if let &DhtBase::DhtPacket(ref dp) = packet { + let rlt = dp.get_payload(&self.sk) + .and_then(|psn| { + if let Some(DhtPacketPayload::SendNodes(sn)) = psn { + if self.getn_timeout.remove(sn.id) { + debug!("Received SendN is a valid response"); + // received SendNodes packet is a response to our request + trace!("Adding nodes from SendNodes to DhtNode's Kbucket"); + for node in &sn.nodes { + self.kbucket.try_add(node); + } + } + Ok(()) + } else { Ok(())} + }) + .map_err(|_| { + error!("Wrong DhtPacket; should have contained SendNodes"); + }); + rlt.unwrap_or(()) + } + else { + ; + } + } + + /** + Remove nodes that have crossed `secs` timeout threshold. + */ + // TODO: test + // TODO: add fn for ping/getn req timeouts with hardcoded consts? + pub fn remove_timed_out(&mut self, secs: u64) { + for pk in self.getn_timeout.get_timed_out(secs) { + debug!("Removing timed out node"); + self.kbucket.remove(&pk); + } + } + + /** + Create a [`DhtPacket`] to peer with `peer_pk` `PublicKey` containing + a [`PingReq`] request. + + [`DhtPacket`]: ../dht/struct.DhtPacket.html + [`PingReq`]: ../dht/struct.PingReq.html + */ + pub fn create_ping_req(&mut self, peer_pk: &PublicKey) -> DhtPacket { + let ping = DhtPacketPayload::PingRequest(PingRequest { id: random_u64() }); + let shared_key = self.precomputed_cache.get_symmetric_key((&self.sk, peer_pk)).expect("symmetric key gens fail"); + DhtPacket::new(&shared_key, &self.pk, ping) + } + + /** + Create a [`DhtUdpPacket`] with request for ping response from a peer. + + [`DhtUdpPacket`] is to be passed to `Sender` created by + [`send_packets()`]. + + [`send_packets()`]: ./fn.send_packets.html + [`DhtUdpPacket`]: ./type.DhtUdpPacket.html + */ + // TODO: track requests + pub fn request_ping(&mut self, peer: &PackedNode) -> DhtUdpPacket { + let request = self.create_ping_req(&peer.pk); + (peer.socket_addr(), DhtBase::DhtPacket(request)) + } + + /** + Create DHT Packet with [`Ping`](./struct.Ping.html) response to `Ping` + request that packet contained. + + Nonce for the response is automatically generated. + */ + // Because UDP codec and tokio use DhtBase for send/receive packet, + // this function returns DhtBase type object + pub fn ping_response(&self, dp: &DhtPacket, + secret_key: &SecretKey, + symmetric_key: &PrecomputedKey, + own_public_key: &PublicKey) -> Option { + + debug!(target: "DhtPacket", "Creating Ping response from Ping request that DHT packet contained."); + trace!(target: "DhtPacket", "With args: DhtPacket: {:?}, own_pk: {:?}", dp, own_public_key); + + if dp.packet_kind != PacketKind::PingRequest { + return None + } + + if let Ok(Some(DhtPacketPayload::PingRequest(packet))) = dp.get_payload(secret_key) { + let resp = DhtPacketPayload::PingResponse(PingResponse::from(packet)); + Some(DhtBase::DhtPacket(DhtPacket::new(symmetric_key, own_public_key, resp))) + } + else { + None + } + } + + /** + Create a [`DhtPacket`] in response to [`DhtPacket`] containing + [`PingReq`] packet. + + Returns `None` if [`DhtPacket`] is not a [`PingReq`]. + + [`DhtPacket`]: ../dht/struct.DhtPacket.html + [`PingReq`]: ../dht/struct.PingReq.html + */ + pub fn create_ping_resp(&mut self, request: &DhtBase) + -> Option + { + if let &DhtBase::DhtPacket(ref dp) = request { + // TODO: precompute shared key to calculate it 1 time + let shared_key = self.precomputed_cache.get_symmetric_key((&self.sk, &dp.pk)).expect("Key HashMap error"); + self.ping_response(dp, &self.sk, &shared_key, &self.pk) + } + else { + None + } + } + + /** + Create a future sending [`DhtPacket`] that encapsulates + [ping response] to supplied ping request. + + [`DhtPacket`]: ../dht/struct.DhtPacket.html + [ping response]: ../dht/struct.PingResp.html + */ + // TODO: change to return Option + pub fn respond_ping(&mut self, + sink: DhtSplitSink, + peer_addr: SocketAddr, + request: &DhtBase) + -> Option + { + self.create_ping_resp(request) + .map(|p| sink.send((peer_addr, p))) + } + + /** + Create a [`DhtPacket`] to peer's `PublicKey` containing + a [`GetNodes`] request for nodes close to own DHT `PublicKey`. + + `RequestId` is to be used for tracking node timeouts. + + [`DhtPacket`]: ../dht/struct.DhtPacket.html + [`GetNodes`]: ../dht/struct.GetNodes.html + */ + pub fn create_getn(&mut self, peer_pk: &PublicKey) + -> (RequestId, DhtBase) { + // request for nodes that are close to our own DHT PK + let req = GetNodes{ pk: *&*self.pk, id: random_u64() }; + let shared_key = self.precomputed_cache.get_symmetric_key((&self.sk, peer_pk)).unwrap(); + (req.id, DhtBase::DhtPacket(DhtPacket::new(&shared_key, &self.pk, DhtPacketPayload::GetNodes(req)))) + } + + /** + Create a [`DhtUdpPacket`] with request for nodes from a peer. + + [`DhtUdpPacket`] is to be passed to `Sender` created by + [`send_packets()`]. + + `RequestId` is to be used for tracking node timeouts. + + [`send_packets()`]: ./fn.send_packets.html + [`DhtUdpPacket`]: ./type.DhtUdpPacket.html + */ + pub fn request_nodes(&mut self, peer: &PackedNode) + -> (RequestId, DhtUdpPacket) + { + let (id, request) = self.create_getn(&peer.pk); + (id, (peer.socket_addr(), request)) + } + + /** + Create [`DhtUdpPacket`]s with request for nodes from every peer in + the Close List. + + [`DhtUdpPacket`]s are to be passed to `Sender` created by + [`send_packets()`]. + + **Adds request to response timeout queue.** + + **Note**: returned `Vec` can be empty if there are no known nodes. + + [`send_packets()`]: ./fn.send_packets.html + [`DhtUdpPacket`]: ./type.DhtUdpPacket.html + */ + pub fn request_nodes_close(&mut self) -> Vec { + self.kbucket.iter() + // copy, collect & iter again to work around borrow checker + .cloned() + .collect::>() + .iter() + .map(|pn| { + let (id, packet) = self.request_nodes(pn); + // add to timeout queue + self.getn_timeout.add(&pn.pk, id); + packet + }) + .collect() + } + + + /** + Create a [`DhtPacket`] to peer with `peer_pk` `PublicKey` + containing [`SendNodes`] response. + + Returns `None` if own `Kbucket` is empty or supplied `DhtPacket` + doesn't contain [`GetNodes`] request. + + [`DhtPacket`]: ../dht/struct.DhtPacket.html + [`GetNodes`]: ../dht/struct.GetNodes.html + [`SendNodes`]: ../dht/struct.SendNodes.html + */ + pub fn create_sendn(&mut self, request: &DhtBase) + -> Option + { + if let &DhtBase::DhtPacket(ref dp) = request { + let rlt = dp.get_payload(&self.sk) + .and_then(|psn| { + if let Some(DhtPacketPayload::GetNodes(ref getn)) = psn { + if let Some(sendn) = getn.response(&*self.kbucket) { + let shared_key = self.precomputed_cache.get_symmetric_key((&self.sk, &dp.pk)).expect("Keys HashMap error"); + Ok(Some(DhtBase::DhtPacket(DhtPacket::new(&shared_key, &self.pk, sendn)))) + } else { Err(Error::new(ErrorKind::Other, "SendNodes creation error")) } + } else { Err(Error::new(ErrorKind::Other, "get_payload call error")) } + }) + .map_err(|_| ()); + rlt.unwrap_or(None) + } else { + None + } + } + + /** + Send nodes in response to [`GetNodes`] request contained in + [`DhtPacket`]. + + Can fail (return `None`) if Kbucket is empty or `DhtPacket` doesn't + contain `GetNodes` request. + + [`DhtPacket`]: ../dht/struct.DhtPacket.html + [`GetNodes`]: ../dht/struct.GetNodes.html + */ + pub fn send_nodes(&mut self, + sink: DhtSplitSink, + peer_addr: SocketAddr, + request: &DhtBase) + -> Option + { + self.create_sendn(request) + .map(|sn| sink.send((peer_addr, sn))) + } +} + +#[cfg(test)] +mod test { + use futures::*; + use futures::future::*; + use tokio_core::reactor::{Core, Timeout}; + use tokio_core::net::UdpCodec; + + use std::net::SocketAddr; + use std::time::Duration; + + use toxcore::dht_new::binary_io::*; + use toxcore::crypto_core::*; + use toxcore::network::*; + use toxcore::dht_new::dht_node::*; + use toxcore::dht_new::packet_kind::PacketKind; + + use quickcheck::{quickcheck, TestResult}; + + /// Bind to this IpAddr. + // NOTE: apparently using `0.0.0.0`/`::` is not allowed on CIs like + // appveyor / travis + const SOCKET_ADDR: &str = "127.0.0.1"; + + /// Provide: + /// - mut core ($c) + /// - handle ($h) + macro_rules! create_core { + ($c:ident, $h:ident) => ( + let $c = Core::new().unwrap(); + let $h = $c.handle(); + ); + + (mut $c:ident, $h:ident) => ( + let mut $c = Core::new().unwrap(); + let $h = $c.handle(); + ); + } + + /// Accept: + /// - handle ($h) + /// Provide: + /// - [mut] DhtNode $name + /// - socket $name_socket + macro_rules! node_socket { + ($h:ident, mut $name:ident, $name_socket:ident) => ( + let mut $name = DhtNode::new().unwrap(); + let $name_socket = bind_udp(SOCKET_ADDR.parse().unwrap(), + // make port range sufficiently big + 2048..65_000, + &$h) + .expect("failed to bind to socket"); + ); + ($($h:ident, $name:ident, $name_socket:ident),+) => ($( + let $name = DhtNode::new().unwrap(); + let $name_socket = bind_udp(SOCKET_ADDR.parse().unwrap(), + // make port range sufficiently big + 2048..65_000, + &$h) + .expect("failed to bind to socket"); + )+); + } + + /// Add timeout to the future, and panic upon timing out. + /// + /// If not specified, default timeout = 5s. + macro_rules! add_timeout { + ($f:expr, $handle:expr) => ( + add_timeout!($f, $handle, 5) + ); + + ($f:expr, $handle:expr, $seconds:expr) => ( + $f.map(Ok) + .select( + Timeout::new(Duration::from_secs($seconds), $handle) + .unwrap() + .map(Err)) + .then(|res| { + match res { + Ok((Err(()), _received)) => + panic!("timed out"), + Err((e, _other)) => panic!("{}", e), + Ok((f, _timeout)) => f, + } + }) + ); + } + + // DhtNode:: + + // DhtNode::new() + + #[test] + fn dht_node_new() { + let _ = DhtNode::new().unwrap(); + } + + // DhtNode::try_add() + + #[test] + fn dht_node_try_add_to_empty() { + fn with_nodes(pns: Vec) { + let mut dhtn = DhtNode::new().unwrap(); + let mut kbuc = Kbucket::new(KBUCKET_BUCKETS, &dhtn.pk); + + for pn in &pns { + assert_eq!(dhtn.kbucket.try_add(pn), kbuc.try_add(pn)); + assert_eq!(kbuc, *dhtn.kbucket); + } + } + quickcheck(with_nodes as fn(Vec)); + } + + // DhtNode::create_ping_req() + + #[test] + fn dht_node_create_ping_req_test() { + let mut alice = DhtNode::new().unwrap(); + let (bob_pk, bob_sk) = gen_keypair(); + let (_, eve_sk) = gen_keypair(); + let packet1 = alice.create_ping_req(&bob_pk); + assert_eq!(&*alice.pk, &packet1.pk); + assert_eq!(PacketKind::PingRequest, packet1.packet_kind); + + let packet2 = alice.create_ping_req(&bob_pk); + assert_ne!(packet1, packet2); + + // eve can't decrypt it + assert_eq!(None, packet1.get_payload(&eve_sk).unwrap_or(None)); + + if let DhtPacketPayload::PingRequest(payload1) = packet1.get_payload(&bob_sk) + .expect("failed to get payload1").unwrap() { + if let DhtPacketPayload::PingRequest(payload2) = packet2.get_payload(&bob_sk) + .expect("failed to get payload2").unwrap() { + assert_ne!(payload1.id, payload2.id); + } else { panic!("Can not occur"); } + } else { panic!("Can not occur"); } + } + + // DhtNode::request_ping() + + #[test] + fn dht_node_request_ping_test() { + // bob creates & sends PingReq to alice + // received PingReq has to be succesfully decrypted + create_core!(core, handle); + node_socket!(handle, alice, alice_socket); + let mut bob = DhtNode::new().unwrap(); + let alice_addr = alice_socket.local_addr().unwrap(); + let alice_pn = PackedNode::new(true, alice_addr, &alice.pk); + + if let (dest_addr, DhtBase::DhtPacket(bob_request)) = bob.request_ping(&alice_pn) { + assert_eq!(alice_addr, dest_addr); + + let payload = bob_request + .get_payload(&alice.sk) + .expect("Failed to decrypt payload").unwrap(); + + assert_eq!(PacketKind::PingRequest, payload.kind()); + } else { panic!("Can not occur"); } + + } + + // DhtNode::create_ping_resp() + + quickcheck! { + fn dht_node_create_ping_resp_test(req: PingRequest) -> () { + // alice creates DhtPacket containing PingReq request + // bob has to respond to it with PingResp + // alice has to be able to decrypt response + // eve can't decrypt response + + let mut alice = DhtNode::new().unwrap(); + let mut bob = DhtNode::new().unwrap(); + let (_, eve_sk) = gen_keypair(); + let precomp = encrypt_precompute(&bob.pk, &alice.sk); + let dreq = DhtPacketPayload::PingRequest(req); + let a_ping = DhtBase::DhtPacket(DhtPacket::new(&precomp, &alice.pk, dreq)); + + if let DhtBase::DhtPacket(resp1) = bob.create_ping_resp(&a_ping) + .expect("failed to create ping resp1") { + if let DhtBase::DhtPacket(resp2) = bob.create_ping_resp(&a_ping) + .expect("failed to create ping resp2") { + + assert_eq!(&resp1.pk, &*bob.pk); + assert_eq!(PacketKind::PingResponse, resp1.packet_kind); + // encrypted payload differs due to different nonce + assert_ne!(resp1, resp2); + + // eve can't decrypt + assert_eq!(None, resp1.get_payload(&eve_sk).ok().unwrap_or(None)); + + let resp1_payload = resp1 + .get_payload(&alice.sk).unwrap().unwrap(); + let resp2_payload = resp2 + .get_payload(&alice.sk).unwrap().unwrap(); + assert_eq!(resp1_payload, resp2_payload); + if let DhtPacketPayload::PingResponse(target_resp) = resp1_payload { + assert_eq!(req.id, target_resp.id); + assert_eq!(PacketKind::PingResponse, resp1_payload.kind()); + + // can't create response from DhtPacket containing PingResp + assert!(alice.create_ping_resp(&DhtBase::DhtPacket(resp1)).is_none()); + } else { panic!("can not occur")} + } else { panic!("can not occur")} + } else { panic!("can not occur")} + } + } + + // DhtNode::respond_ping() + + quickcheck! { + fn dht_node_respond_ping_test(req: PingRequest) -> () { + // bob creates a DhtPacket with PingReq, and alice + // sends a response to it + // response has to be successfully decrypted by alice + // response can't be decrypted by eve + create_core!(mut core, handle); + node_socket!(handle, mut alice, alice_socket); + node_socket!(handle, mut bob, bob_socket); + let (_, eve_sk) = gen_keypair(); + + let precomp = encrypt_precompute(&alice.pk, &bob.sk); + let dreq = DhtPacketPayload::PingRequest(req); + let bob_ping = DhtPacket::new(&precomp, &bob.pk, dreq); + + let (alice_sink, _) = alice_socket.framed(DhtCodec).split(); + let alice_send = alice.respond_ping( + alice_sink, + bob_socket.local_addr().unwrap(), + &DhtBase::DhtPacket(bob_ping.clone())); + + let mut recv_buf = [0; MAX_UDP_PACKET_SIZE]; + let future_recv = bob_socket.recv_dgram(&mut recv_buf[..]); + let future_recv = add_timeout!(future_recv, &handle); + handle.spawn(alice_send.then(|_| ok(()))); + + let received = core.run(future_recv).unwrap(); + let (_bob_socket, recv_buf, size, _saddr) = received; + assert!(size != 0); + let mut buf = [0; 512]; + assert_eq!(size, bob_ping.to_bytes((&mut buf, 0)).unwrap().1); + + let recv_packet = DhtPacket::from_bytes(&recv_buf[..size]).unwrap().1; + assert_eq!(PacketKind::PingResponse, recv_packet.packet_kind); + + // eve can't decrypt it + assert_eq!(None, recv_packet.get_payload(&eve_sk).unwrap_or(None)); + + if let DhtPacketPayload::PingResponse(_payload) = recv_packet + .get_payload(&bob.sk).unwrap().unwrap() { + ; + } else { panic!("can not occur")} + bob.pk = alice.pk; // to remove compile time warning + } + } + + // DhtNode::create_getn() + + #[test] + fn dht_node_create_getn_test() { + // alice sends GetNodes request to bob + // bob has to successfully decrypt the request + // eve can't decrypt the request + let mut alice = DhtNode::new().unwrap(); + let (bob_pk, bob_sk) = gen_keypair(); + let (_, eve_sk) = gen_keypair(); + if let (req_id1, DhtBase::DhtPacket(packet1)) = alice.create_getn(&bob_pk) { + assert_eq!(&*alice.pk, &packet1.pk); + assert_eq!(PacketKind::GetNodes, packet1.packet_kind); + + // eve can't decrypt + assert_eq!(None, packet1.get_payload(&eve_sk).unwrap_or(None)); + + if let DhtPacketPayload::GetNodes(payload1) = packet1.get_payload(&bob_sk) + .expect("failed to get payload1").unwrap() { + assert_eq!(&*alice.pk, &payload1.pk); + assert_eq!(req_id1, payload1.id); + + if let (_req_id2, DhtBase::DhtPacket(packet2)) = alice.create_getn(&bob_pk) { + assert_ne!(&packet1, &packet2); + + if let DhtPacketPayload::GetNodes(payload2) = packet2.get_payload(&bob_sk) + .expect("failed to get payload2").unwrap() { + assert_ne!(payload1.id, payload2.id); + } else { panic!("can not occur")} + } else { panic!("can not occur")} + } else { panic!("can not occur")} + } else { panic!("can not occur")} + } + + // DhtNode::request_nodes() + + #[test] + fn dht_node_request_nodes_test() { + // bob creates a ToxUdpPacket with GetNodes request to alice + // alice has to successfully decrypt & parse it + create_core!(core, handle); + node_socket!(handle, alice, alice_socket); + let mut bob = DhtNode::new().unwrap(); + let alice_addr = alice_socket.local_addr().unwrap(); + let alice_pn = PackedNode::new(true, alice_addr, &alice.pk); + + if let (id, (dest_addr, DhtBase::DhtPacket(bob_request))) = bob.request_nodes(&alice_pn) { + assert_eq!(alice_addr, dest_addr); + + if let DhtPacketPayload::GetNodes(payload) = bob_request + .get_payload(&alice.sk) + .expect("Failed to decrypt payload") + .unwrap() { + + assert_eq!(&payload.pk, &*bob.pk); + assert_eq!(payload.id, id); + } else { panic!("can not occur")} + } else { panic!("can not occur")} + } + + // DhtNode::request_nodes_close() + + quickcheck! { + fn dht_node_request_nodes_close_test(pns: Vec) + -> TestResult + { + if pns.is_empty() { return TestResult::discard() } + + let mut dnode = DhtNode::new().unwrap(); + for pn in &pns { + dnode.kbucket.try_add(pn); + } + + let requests = dnode.request_nodes_close(); + + for (n, node) in dnode.kbucket.iter().enumerate() { + // each request creates a response timeout + assert_eq!(dnode.getn_timeout.get(n).unwrap().pk(), + &node.pk); + let (req_addr, ref _req_packet) = requests[n]; + assert_eq!(node.socket_addr(), req_addr); + } + + TestResult::passed() + } + } + + + + // DhtNode::create_sendn() + + quickcheck! { + fn dht_node_create_sendn_test(pns: Vec) -> TestResult { + if pns.is_empty() { return TestResult::discard() } + + // alice creates DhtPacket containing GetNodes request + // bob has to respond to it with SendNodes + // alice has to be able to decrypt response + // alice has to be able to successfully add received nodes + // eve can't decrypt response + + let mut alice = DhtNode::new().unwrap(); + let mut bob = DhtNode::new().unwrap(); + let (_, eve_sk) = gen_keypair(); + let (_id, req) = alice.create_getn(&bob.pk); + + // errors with an empty kbucket + let error = bob.create_sendn(&req); + assert_eq!(None, error); + + for pn in &pns { + bob.kbucket.try_add(pn); + } + + let pk = bob.pk.clone(); + let nonce = gen_nonce(); + + if let DhtBase::DhtPacket(resp1) = bob.create_sendn(&req) + .unwrap_or(DhtBase::DhtPacket(DhtPacket{packet_kind: PacketKind::PingRequest, pk: *pk, nonce: nonce, payload: vec![0x00]})) { + if let DhtBase::DhtPacket(resp2) = bob.create_sendn(&req) + .unwrap_or(DhtBase::DhtPacket(DhtPacket{packet_kind: PacketKind::PingRequest, pk: *pk, nonce: nonce, payload: vec![0x00]})) { + + assert_eq!(&resp1.pk, &*bob.pk); + assert_eq!(PacketKind::SendNodes, resp1.packet_kind); + // encrypted payload differs due to different nonce + assert_ne!(resp1, resp2); + + // eve can't decrypt + assert_eq!(None, resp1.get_payload(&eve_sk).unwrap_or(None)); + + if let DhtPacketPayload::SendNodes(resp1_payload) = resp1 + .get_payload(&alice.sk) + .unwrap_or(Some(DhtPacketPayload::PingRequest(PingRequest{id: 0x00}))).unwrap_or(DhtPacketPayload::PingRequest(PingRequest{id: 0x00})) { + if let DhtPacketPayload::SendNodes(resp2_payload) = resp2 + .get_payload(&alice.sk) + .unwrap_or(Some(DhtPacketPayload::PingRequest(PingRequest{id: 0x00}))).unwrap_or(DhtPacketPayload::PingRequest(PingRequest{id: 0x00})) { + assert_eq!(resp1_payload, resp2_payload); + assert!(!resp1_payload.nodes.is_empty()); + + for node in &resp1_payload.nodes { + // has to succeed, since nodes in response have to differ + assert!(alice.kbucket.try_add(node)); + } + } else { panic!("can not occur")} + } else { panic!("can not occur")} + } else { panic!("can not occur")} + } else { panic!("can not occur")} + + TestResult::passed() + } + } + + // DhtNode::send_nodes() + + #[test] + quickcheck! { + fn dht_node_send_nodes(pns: Vec) -> TestResult { + if pns.is_empty() { return TestResult::discard() } + + // alice sends SendNodes response to random GetNodes request + // to bob + + create_core!(mut core, handle); + node_socket!(handle, mut alice, alice_socket); + node_socket!(handle, mut bob, bob_socket); + + for pn in &pns { + alice.kbucket.try_add(pn); + } + + let (_id, getn) = bob.create_getn(&alice.pk); + + let (alice_sink, _) = alice_socket.framed(DhtCodec).split(); + let alice_response = alice.send_nodes( + alice_sink, + bob_socket.local_addr().unwrap(), + &getn); + + let mut recv_buf = [0; MAX_UDP_PACKET_SIZE]; + let future_recv = bob_socket.recv_dgram(&mut recv_buf[..]); + let future_recv = add_timeout!(future_recv, &handle); + handle.spawn(alice_response.then(|_| ok(()))); + + let received = core.run(future_recv).unwrap(); + let (_bob_socket, recv_buf, size, _saddr) = received; + assert!(size != 0); + + let _recv_packet = DhtPacket::from_bytes(&recv_buf[..size]); + + TestResult::passed() + } + } + + // DhtNode::handle_packet_sendn() + + quickcheck! { + fn dht_node_handle_packet_sendn_test(sn: SendNodes, + gn: GetNodes, + pq: PingRequest, + pr: PingResponse) + -> () + { + // bob creates a DhtPacket to alice that contains SendNodes + // alice adds the nodes + + let dpq = DhtPacketPayload::PingRequest(pq); + let dpr = DhtPacketPayload::PingResponse(pr); + let dgn = DhtPacketPayload::GetNodes(gn); + let dsn = DhtPacketPayload::SendNodes(sn.clone()); + + let mut alice = DhtNode::new().unwrap(); + let (bob_pk, bob_sk) = gen_keypair(); + let precomp = precompute(&alice.pk, &bob_sk); + macro_rules! try_add_with { + ($($kind:expr)+) => ($( + alice.handle_packet_sendn(&DhtBase::DhtPacket(DhtPacket::new(&precomp, + &bob_pk, + $kind))); + )+) + } + // also try to add nodes from a DhtPacket that don't contain + // SendNodes + try_add_with!(dsn.clone() /* and invalid ones */ dgn dpq dpr); + + // since alice doesn't have stored ID for SendNodes response, + // packet is supposed to be ignored + assert!(alice.kbucket.is_empty()); + + // add needed packet ID to alice's timeout table + alice.getn_timeout.add(&bob_pk, sn.id); + // now nodes from SendNodes can be processed + try_add_with!(dsn); + + // verify that alice's kbucket's contents are the same as + // stand-alone kbucket + let mut kbuc = Kbucket::new(KBUCKET_BUCKETS, &alice.pk); + for pn in &sn.nodes { + kbuc.try_add(pn); + } + assert_eq!(kbuc, *alice.kbucket); + } + } + + // DhtNode::handle_packet() + + quickcheck! { + fn dht_node_handle_packet(pq: PingRequest, + pr: PingResponse, + gn: GetNodes, + sn: SendNodes) + -> () + { + let alice = DhtNode::new().unwrap(); + let mut bob = DhtNode::new().unwrap(); + let precom = precompute(&bob.pk, &alice.sk); + + let dpq = DhtPacketPayload::PingRequest(pq); + let dpr = DhtPacketPayload::PingResponse(pr); + let dgn = DhtPacketPayload::GetNodes(gn); + let dsn = DhtPacketPayload::SendNodes(sn.clone()); + + // test with + + { + // PingReq + let dp = DhtBase::DhtPacket(DhtPacket::new(&precom, &alice.pk, dpq)); + assert_eq!(bob.create_ping_resp(&dp).unwrap().kind(), + bob.handle_packet(&dp).unwrap().kind()); + } + + { + // PingResp + let dp = DhtBase::DhtPacket(DhtPacket::new(&precom, &alice.pk, dpr)); + assert_eq!(None, bob.handle_packet(&dp)); + } + + { + // GetNodes with an empty kbucket + let dp = DhtBase::DhtPacket(DhtPacket::new(&precom, &alice.pk, dgn.clone())); + assert_eq!(None, bob.handle_packet(&dp)); + } + + { + // SendNodes + let dp = DhtBase::DhtPacket(DhtPacket::new(&precom, &alice.pk, dsn)); + assert_eq!(None, bob.handle_packet(&dp)); + // bob doesn't have request ID, thus packet is dropped + assert!(bob.kbucket.is_empty()); + // add request ID, so that nods could be processed + bob.getn_timeout.add(&alice.pk, sn.id); + assert_eq!(None, bob.handle_packet(&dp)); + assert!(!bob.kbucket.is_empty()); + } + + { + // GetNodes with something in kbucket + let dp = DhtBase::DhtPacket(DhtPacket::new(&precom, &alice.pk, dgn)); + assert_eq!(bob.create_sendn(&dp).unwrap().kind(), + bob.handle_packet(&dp).unwrap().kind()); + } + } + } + + + // DhtCodec:: + + // DhtCodec::decode() + + #[test] + fn tox_codec_decode_test() { + fn with_dp(dp: DhtBase) -> TestResult { + // TODO: random SocketAddr + let addr = SocketAddr::V4("0.1.2.3:4".parse().unwrap()); + let mut tc = DhtCodec; + + let mut buf = [0; 512]; + let (_, size) = dp.to_bytes((&mut buf, 0)).unwrap(); + let bytes = &buf[..size]; + + let (decoded_a, decoded_dp) = tc.decode(&addr, &bytes) + .unwrap(); + // it did have correct packet + let decoded_dp = decoded_dp.unwrap(); + + assert_eq!(addr, decoded_a); + assert_eq!(dp, decoded_dp); + + // make it error + let mut buf_err = buf.clone(); + buf_err[0] = 0x40; + let bytes_err = &mut buf_err[..size]; + let (r_addr, none) = tc.decode(&addr, &bytes_err).unwrap_or((addr, None)); + assert_eq!(addr, r_addr); + assert!(none.is_none()); + + TestResult::passed() + } + quickcheck(with_dp as fn(DhtBase) -> TestResult); + } + + // DhtCodec::encode() + + #[test] + fn dht_codec_encode_test() { + fn with_dp(dp: DhtBase) { + // TODO: random SocketAddr + let addr = SocketAddr::V4("5.6.7.8:9".parse().unwrap()); + let mut buf = Vec::new(); + let mut tc = DhtCodec; + + let socket = tc.encode((addr, dp.clone()), &mut buf); + assert_eq!(addr, socket); + let mut to_buf = [0; 512]; + let (_, size) = dp.to_bytes((&mut to_buf, 0)).unwrap(); + assert_eq!(buf, to_buf[..size].to_vec()); + } + quickcheck(with_dp as fn(DhtBase)); + } + + + // receive_packets() + + quickcheck! { + fn dht_receive_packets_test(dps: Vec) -> TestResult { + if dps.is_empty() { return TestResult::discard() } + // Send & receive packet create threads. + // And processing each packet in Vec also create thread. + // Aribtrary Generator make much test data, so it result in shortage of OS resources. + // To prevent this problem, limited number of test data are used. + static mut COUNT: u16 = 0; + unsafe { + COUNT += 1; + if COUNT > 100 || dps.len() > 20 { + return TestResult::discard() + } + } + // alice sends packets to bob + create_core!(mut core, handle); + node_socket!(handle, _alice, a_socket); + node_socket!(handle, _bob, b_socket); + + let a_addr = a_socket.local_addr().expect("local sender socket create error"); + let b_addr = b_socket.local_addr().expect("local receiver socket create error"); + let (_sink, stream) = b_socket.framed(DhtCodec).split(); + + // start receiving packets + let to_receive = receive_packets(stream); + + let mut a_socket = a_socket; + for dp in &dps { + let mut buf = [0; MAX_DHT_PACKET_SIZE]; + let (_, size) = dp.to_bytes((&mut buf, 0)).expect("to_bytes fail on DhtBase {:?}, dp"); + let send = a_socket.send_dgram(&buf[..size], b_addr); + let (s, _) = core.run(send).expect("send error"); + a_socket = s; + } + + let f_recv = to_receive.take(dps.len() as u64).collect(); + let received = core.run(f_recv).expect("receive error"); + + for (n, &(ref addr, ref packet)) in received.iter().enumerate() { + assert_eq!(a_addr, *addr); + assert_eq!(dps[n], *packet); + } + + TestResult::passed() + } + } + + // send_packets() + + quickcheck! { + fn dht_send_packets_test(dps: Vec) -> TestResult { + if dps.is_empty() { return TestResult::discard() } + // Send & receive packet create threads. + // And processing each packet in Vec also create thread. + // Aribtrary Generator make much test data, so it result in shortage of OS resources. + // To prevent this problem, limited number of test data are used. + static mut COUNT: u16 = 0; + unsafe { + COUNT += 1; + if COUNT > 100 || dps.len() > 20 { + return TestResult::discard() + } + } + // alice sends packets to bob + create_core!(mut core, handle); + node_socket!(handle, _alice, a_socket); + node_socket!(handle, _bob, b_socket); + + let a_addr = a_socket.local_addr().expect("sender socket create error"); + let b_addr = b_socket.local_addr().expect("receiver socket create error"); + let (sink, _stream) = a_socket.framed(DhtCodec).split(); + let (_sink, stream) = b_socket.framed(DhtCodec).split(); + + // start receiving/sending packets + let receiver = receive_packets(stream); + let sender = send_packets(sink); + + let dps_send = dps.clone(); + for dp in dps_send { + let tx = sender.clone(); + let send = tx.send((b_addr, dp)).then(|_| Ok(())); + handle.spawn(send); + } + + let f_recv = receiver.take(dps.len() as u64).collect(); + let received = core.run(f_recv).expect("receive error"); + + for (n, &(ref addr, ref packet)) in received.iter().enumerate() { + assert_eq!(a_addr, *addr); + assert_eq!(dps[n], *packet); + } + + TestResult::passed() + } + } +} diff --git a/src/toxcore/dht_new/kbucket.rs b/src/toxcore/dht_new/kbucket.rs index bab4c8053..55c49e135 100644 --- a/src/toxcore/dht_new/kbucket.rs +++ b/src/toxcore/dht_new/kbucket.rs @@ -148,7 +148,7 @@ impl Bucket { #[cfg(test)] fn find(&self, pk: &PublicKey) -> Option { for (n, node) in self.nodes.iter().enumerate() { - if node.pk() == pk { + if &node.pk == pk { return Some(n) } } @@ -181,7 +181,7 @@ impl Bucket { trace!(target: "Bucket", "With bucket: {:?}; PK: {:?} and new node: {:?}", self, base_pk, new_node); - match self.nodes.binary_search_by(|n| base_pk.distance(n.pk(), new_node.pk())) { + match self.nodes.binary_search_by(|n| base_pk.distance(&n.pk, &new_node.pk)) { Ok(index) => { debug!(target: "Bucket", "Updated: the node was already in the bucket."); @@ -229,7 +229,7 @@ impl Bucket { */ pub fn remove(&mut self, base_pk: &PublicKey, node_pk: &PublicKey) { trace!(target: "Bucket", "Removing PackedNode with PK: {:?}", node_pk); - match self.nodes.binary_search_by(|n| base_pk.distance(n.pk(), node_pk) ) { + match self.nodes.binary_search_by(|n| base_pk.distance(&n.pk, node_pk) ) { Ok(index) => { self.nodes.remove(index); }, @@ -241,7 +241,7 @@ impl Bucket { /// Check if node with given PK is in the `Bucket`. pub fn contains(&self, pk: &PublicKey) -> bool { - self.nodes.iter().any(|n| n.pk() == pk) + self.nodes.iter().any(|n| &n.pk == pk) } /// Get the capacity of the Bucket. @@ -335,13 +335,7 @@ impl Kbucket { self.buckets.len() as u8 } - /// Get the PK of the Kbucket. Used in tests only #[cfg(test)] - pub fn pk(&self) -> PublicKey { - self.pk - } - - #[cfg(test)] fn find(&self, pk: &PublicKey) -> Option<(usize, usize)> { for (bucket_index, bucket) in self.buckets.iter().enumerate() { match bucket.find(pk) { @@ -382,7 +376,7 @@ impl Kbucket { debug!(target: "Kbucket", "Trying to add PackedNode."); trace!(target: "Kbucket", "With PN: {:?}; and self: {:?}", node, self); - match self.bucket_index(node.pk()) { + match self.bucket_index(&node.pk) { Some(index) => self.buckets[index].try_add(&self.pk, node), None => { trace!("Failed to add node: {:?}", node); @@ -623,7 +617,7 @@ mod test { fn dht_bucket_1_capacity_try_add_test() { fn with_nodes(n1: PackedNode, n2: PackedNode) -> TestResult { let pk = PublicKey([0; PUBLICKEYBYTES]); - if pk.distance(n2.pk(), n1.pk()) != Ordering::Greater { + if pk.distance(&n2.pk, &n1.pk) != Ordering::Greater { // n2 should be greater to check we can't add it return TestResult::discard() } @@ -651,7 +645,7 @@ mod test { let mut bucket = Bucket::new(Some(bucket_size)); let non_existent_node: PackedNode = Arbitrary::arbitrary(&mut rng); - bucket.remove(&base_pk, non_existent_node.pk()); // "removing" non-existent node + bucket.remove(&base_pk, &non_existent_node.pk); // "removing" non-existent node assert_eq!(true, bucket.is_empty()); let nodes = vec![Arbitrary::arbitrary(&mut rng); num as usize]; @@ -667,7 +661,7 @@ mod test { } for node in &nodes { - bucket.remove(&base_pk, node.pk()); + bucket.remove(&base_pk, &node.pk); } assert_eq!(true, bucket.is_empty()); } @@ -729,7 +723,7 @@ mod test { let pk = nums_to_pk(a, b, c, d); let kbucket = Kbucket::new(buckets, &pk); assert_eq!(buckets, kbucket.size()); - assert_eq!(pk, kbucket.pk()); + assert_eq!(pk, kbucket.pk); } quickcheck(with_pk as fn(u64, u64, u64, u64, u8)); } @@ -792,7 +786,7 @@ mod test { // Check for actual removing for node in &nodes { - kbucket.remove(node.pk()); + kbucket.remove(&node.pk); } assert!(kbucket.is_empty()); TestResult::passed() @@ -820,7 +814,7 @@ mod test { // check whether number of correct nodes that are returned is right let correctness = |should, kbc: &Kbucket| { - assert_eq!(kbc.get_closest(&pk), kbc.get_closest(&kbc.pk())); + assert_eq!(kbc.get_closest(&pk), kbc.get_closest(&kbc.pk)); let got_nodes = kbc.get_closest(&pk); let mut got_correct = 0; @@ -901,9 +895,9 @@ mod test { kbucket.try_add(n1); kbucket.try_add(n2); kbucket.try_add(n3); - assert_eq!(Some((46, 0)), kbucket.find(n1.pk())); - assert_eq!(Some((46, 1)), kbucket.find(n2.pk())); - assert_eq!(Some((46, 2)), kbucket.find(n3.pk())); + assert_eq!(Some((46, 0)), kbucket.find(&n1.pk)); + assert_eq!(Some((46, 1)), kbucket.find(&n2.pk)); + assert_eq!(Some((46, 2)), kbucket.find(&n3.pk)); }); with_data(|kbucket, n1, n2, n3| { // insert order: n3 n2 n1 maps to position @@ -911,9 +905,9 @@ mod test { kbucket.try_add(n3); kbucket.try_add(n2); kbucket.try_add(n1); - assert_eq!(Some((46, 0)), kbucket.find(n1.pk())); - assert_eq!(Some((46, 1)), kbucket.find(n2.pk())); - assert_eq!(Some((46, 2)), kbucket.find(n3.pk())); + assert_eq!(Some((46, 0)), kbucket.find(&n1.pk)); + assert_eq!(Some((46, 1)), kbucket.find(&n2.pk)); + assert_eq!(Some((46, 2)), kbucket.find(&n3.pk)); }); // Check that removing order does not affect // the order of nodes inside @@ -923,10 +917,10 @@ mod test { kbucket.try_add(n2); // => 1 kbucket.try_add(n3); // => 2 // test removing from the beginning (n1 => 0) - kbucket.remove(n1.pk()); - assert_eq!(None, kbucket.find(n1.pk())); - assert_eq!(Some((46, 0)), kbucket.find(n2.pk())); - assert_eq!(Some((46, 1)), kbucket.find(n3.pk())); + kbucket.remove(&n1.pk); + assert_eq!(None, kbucket.find(&n1.pk)); + assert_eq!(Some((46, 0)), kbucket.find(&n2.pk)); + assert_eq!(Some((46, 1)), kbucket.find(&n3.pk)); }); with_data(|kbucket, n1, n2, n3| { // prepare kbucket @@ -934,10 +928,10 @@ mod test { kbucket.try_add(n2); // => 1 kbucket.try_add(n3); // => 2 // test removing from the middle (n2 => 1) - kbucket.remove(n2.pk()); - assert_eq!(Some((46, 0)), kbucket.find(n1.pk())); - assert_eq!(None, kbucket.find(n2.pk())); - assert_eq!(Some((46, 1)), kbucket.find(n3.pk())); + kbucket.remove(&n2.pk); + assert_eq!(Some((46, 0)), kbucket.find(&n1.pk)); + assert_eq!(None, kbucket.find(&n2.pk)); + assert_eq!(Some((46, 1)), kbucket.find(&n3.pk)); }); with_data(|kbucket, n1, n2, n3| { // prepare kbucket @@ -945,10 +939,10 @@ mod test { kbucket.try_add(n2); // => 1 kbucket.try_add(n3); // => 2 // test removing from the end (n3 => 2) - kbucket.remove(n3.pk()); - assert_eq!(Some((46, 0)), kbucket.find(n1.pk())); - assert_eq!(Some((46, 1)), kbucket.find(n2.pk())); - assert_eq!(None, kbucket.find(n3.pk())); + kbucket.remove(&n3.pk); + assert_eq!(Some((46, 0)), kbucket.find(&n1.pk)); + assert_eq!(Some((46, 1)), kbucket.find(&n2.pk)); + assert_eq!(None, kbucket.find(&n3.pk)); }); } @@ -961,13 +955,13 @@ mod test { let (pk, _) = gen_keypair(); let mut kbucket = Kbucket::new(n, &pk); assert!(!kbucket.contains(&pk)); - assert!(pns.iter().all(|pn| !kbucket.contains(pn.pk()))); + assert!(pns.iter().all(|pn| !kbucket.contains(&pn.pk))); for pn in &pns { kbucket.try_add(pn); } - assert!(kbucket.iter().all(|pn| kbucket.contains(pn.pk()))); + assert!(kbucket.iter().all(|pn| kbucket.contains(&pn.pk))); TestResult::passed() } @@ -984,7 +978,7 @@ mod test { { let fitting_nodes = pns.iter().any(|p1| pns.iter() .filter(|p2| p1 != *p2) - .any(|p2| kbucket_index(&pk, p1.pk()) == kbucket_index(&pk, p2.pk()))); + .any(|p2| kbucket_index(&pk, &p1.pk) == kbucket_index(&pk, &p2.pk))); if !fitting_nodes { return TestResult::discard() } @@ -997,12 +991,12 @@ mod test { for node in &pns { if kbucket.try_add(node) { - let index = kbucket_index(&pk, node.pk()); + let index = kbucket_index(&pk, &node.pk); // none of nodes with the same index can be added // to the kbucket assert!(pns.iter() - .filter(|pn| kbucket_index(&pk, pn.pk()) == index) - .all(|pn| !kbucket.can_add(pn.pk()))); + .filter(|pn| kbucket_index(&pk, &pn.pk) == index) + .all(|pn| !kbucket.can_add(&pn.pk))); } } diff --git a/src/toxcore/dht_new/mod.rs b/src/toxcore/dht_new/mod.rs index d61203df7..5cb87a7f4 100644 --- a/src/toxcore/dht_new/mod.rs +++ b/src/toxcore/dht_new/mod.rs @@ -27,3 +27,7 @@ pub mod packet; pub mod binary_io; pub mod kbucket; pub mod packed_node; +pub mod codec; +pub mod dht_node; +pub mod dht_impl; +pub mod packet_kind; diff --git a/src/toxcore/dht_new/packed_node.rs b/src/toxcore/dht_new/packed_node.rs index e29a639d3..83d35e6f9 100644 --- a/src/toxcore/dht_new/packed_node.rs +++ b/src/toxcore/dht_new/packed_node.rs @@ -149,13 +149,6 @@ impl PackedNode { trace!("With address: {:?}", self); self.saddr } - - /// Get an PK from the `PackedNode`. - pub fn pk(&self) -> &PublicKey { - trace!(target: "PackedNode", "Getting PK from PackedNode."); - trace!("With PK: {:?}", self); - &self.pk - } } diff --git a/src/toxcore/dht_new/packet.rs b/src/toxcore/dht_new/packet.rs index cf9aa4f4d..02daaa27a 100644 --- a/src/toxcore/dht_new/packet.rs +++ b/src/toxcore/dht_new/packet.rs @@ -37,7 +37,7 @@ * takes care of the serializing and de-serializing DHT packets */ -use nom::{le_u8, le_u16, be_u64}; +use nom::{le_u8, le_u16, be_u64, rest}; use std::net::{ IpAddr, @@ -47,19 +47,67 @@ use std::net::{ use toxcore::dht_new::binary_io::*; use toxcore::crypto_core::*; +use toxcore::dht_new::packet_kind::*; use toxcore::dht_new::packed_node::PackedNode; /// Length in bytes of [`PingRequest`](./struct.PingRequest.html) and /// [`PingResponse`](./struct.PingResponse.html) when serialized into bytes. pub const PING_SIZE: usize = 9; -/** Standard DHT packet that encapsulates in the payload -[`DhtPacketT`](./trait.DhtPacketT.html). +/** DHT packet base enum that encapsulates +[`DhtPacket`](./struct.DhtPacket.html) or [`DhtRequest`](./struct.DhtRequest.html). + +https://zetok.github.io/tox-spec/#dht-packet +https://zetok.github.io/tox-spec/#dht-request-packets +*/ +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum DhtBase { + /// DhtBase are wrapper for DhtPacket and DhtRequest + DhtPacket(DhtPacket), + /// DhtBase are wrapper for DhtPacket and DhtRequest + DhtRequest(DhtRequest), +} + +/** DHT packet struct that encapsulates in the payload +[`DhtPacketPayload`](./enum.DhtPacketPayload.html). + +https://zetok.github.io/tox-spec/#dht-packet +*/ +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct DhtPacket { + /// first class packet kind + pub packet_kind: PacketKind, + /// Public Key of Request Packet + pub pk: PublicKey, + /// one time serial number + pub nonce : Nonce, + /// payload of DhtPacket + pub payload: Vec, +} + +/** DHT Request packet struct. + +https://zetok.github.io/tox-spec/#dht-request-packets +*/ +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct DhtRequest { + /// receiver publik key + pub rpk: PublicKey, + /// sender publick key + pub spk: PublicKey, + /// one time serial number + pub nonce: Nonce, + /// payload of DhtRequest packet + pub payload: Vec, +} + +/** Standard DHT packet that embedded in the payload of +[`DhtPacket`](./struct.DhtPacket.html). https://zetok.github.io/tox-spec/#dht-packet */ #[derive(Clone, Debug, Eq, PartialEq)] -pub enum DhtPacket { +pub enum DhtPacketPayload { /// [`PingRequest`](./struct.PingRequest.html) structure. PingRequest(PingRequest), /// [`PingResponse`](./struct.PingResponse.html) structure. @@ -70,23 +118,123 @@ pub enum DhtPacket { SendNodes(SendNodes), } -impl ToBytes for DhtPacket { +/** Standart DHT Request packet that embedded in the payload of +[`DhtRequest`](./struct.DhtRequest.html).. + +https://zetok.github.io/tox-spec/#dht-request-packets +*/ +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum DhtRequestPayload { + /// [`NatPingRequest`](./struct.NatPingRequest.html) structure. + NatPingRequest(NatPingRequest), + /// [`NatPingResponse`](./struct.NatPingResponse.html) structure. + NatPingResponse(NatPingResponse), +} + + +impl ToBytes for DhtBase { fn to_bytes<'a>(&self, buf: (&'a mut [u8], usize)) -> Result<(&'a mut [u8], usize), GenError> { match *self { - DhtPacket::PingRequest(ref p) => p.to_bytes(buf), - DhtPacket::PingResponse(ref p) => p.to_bytes(buf), - DhtPacket::GetNodes(ref p) => p.to_bytes(buf), - DhtPacket::SendNodes(ref p) => p.to_bytes(buf), + DhtBase::DhtPacket(ref p) => p.to_bytes(buf), + DhtBase::DhtRequest(ref p) => p.to_bytes(buf), } } } +impl FromBytes for DhtBase { + named!(from_bytes, alt!( + map!(DhtPacket::from_bytes, DhtBase::DhtPacket) | + map!(DhtRequest::from_bytes, DhtBase::DhtRequest) + )); +} + +impl ToBytes for DhtPacket { + fn to_bytes<'a>(&self, buf: (&'a mut [u8], usize)) -> Result<(&'a mut [u8], usize), GenError> { + do_gen!(buf, + gen_be_u8!(self.packet_kind as u8) >> + gen_slice!(self.pk.as_ref()) >> + gen_slice!(self.nonce.as_ref()) >> + gen_slice!(self.payload.as_slice()) + ) + } +} + impl FromBytes for DhtPacket { - named!(from_bytes, alt!( - map!(PingRequest::from_bytes, DhtPacket::PingRequest) | - map!(PingResponse::from_bytes, DhtPacket::PingResponse) | - map!(GetNodes::from_bytes, DhtPacket::GetNodes) | - map!(SendNodes::from_bytes, DhtPacket::SendNodes) + named!(from_bytes, do_parse!( + packet_kind: verify!(call!(PacketKind::from_bytes), |packet_type| match packet_type { + PacketKind::PingRequest | PacketKind::PingResponse | + PacketKind::GetNodes | PacketKind::SendNodes => true, + _ => false + }) >> + pk: call!(PublicKey::from_bytes) >> + nonce: call!(Nonce::from_bytes) >> + payload: map!(rest, |bytes| bytes.to_vec() ) >> + (DhtPacket { + packet_kind: packet_kind, + pk: pk, + nonce: nonce, + payload: payload + }) + )); +} + +impl ToBytes for DhtPacketPayload { + fn to_bytes<'a>(&self, buf: (&'a mut [u8], usize)) -> Result<(&'a mut [u8], usize), GenError> { + match *self { + DhtPacketPayload::PingRequest(ref p) => p.to_bytes(buf), + DhtPacketPayload::PingResponse(ref p) => p.to_bytes(buf), + DhtPacketPayload::GetNodes(ref p) => p.to_bytes(buf), + DhtPacketPayload::SendNodes(ref p) => p.to_bytes(buf), + } + } +} + +impl ToBytes for DhtRequestPayload { + fn to_bytes<'a>(&self, buf: (&'a mut [u8], usize)) -> Result<(&'a mut [u8], usize), GenError> { + match *self { + DhtRequestPayload::NatPingRequest(ref p) => p.to_bytes(buf), + DhtRequestPayload::NatPingResponse(ref p) => p.to_bytes(buf), + } + } +} + +impl FromBytes for DhtPacketPayload { + named!(from_bytes, alt!( + map!(PingRequest::from_bytes, DhtPacketPayload::PingRequest) | + map!(PingResponse::from_bytes, DhtPacketPayload::PingResponse) | + map!(GetNodes::from_bytes, DhtPacketPayload::GetNodes) | + map!(SendNodes::from_bytes, DhtPacketPayload::SendNodes) + )); +} + +impl ToBytes for DhtRequest { + fn to_bytes<'a>(&self, buf: (&'a mut [u8], usize)) -> Result<(&'a mut [u8], usize), GenError> { + do_gen!(buf, + gen_be_u8!(0x20) >> + gen_slice!(self.rpk.as_ref()) >> + gen_slice!(self.spk.as_ref()) >> + gen_slice!(self.nonce.as_ref()) >> + gen_slice!(self.payload.as_slice()) + ) + } +} + +impl FromBytes for DhtRequest { + named!(from_bytes, do_parse!( + packet_type: verify!(call!(PacketKind::from_bytes), |packet_type| match packet_type { + PacketKind::DhtRequest => true, + _ => false + }) >> + rpk: call!(PublicKey::from_bytes) >> + spk: call!(PublicKey::from_bytes) >> + nonce: call!(Nonce::from_bytes) >> + payload: map!(rest, |bytes| bytes.to_vec() ) >> + (DhtRequest { + rpk: rpk, + spk: spk, + nonce: nonce, + payload: payload + }) )); } @@ -340,18 +488,6 @@ impl FromBytes for SendNodes { )); } -/** DHT Request packet. - -https://zetok.github.io/tox-spec/#dht-request-packets -*/ -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum DhtRequest { - /// [`NatPingRequest`](./struct.NatPingRequest.html) structure. - NatPingRequest(NatPingRequest), - /// [`NatPingResponse`](./struct.NatPingResponse.html) structure. - NatPingResponse(NatPingResponse), -} - /** NatPing request of DHT Request packet. */ #[derive(Copy, Clone, Debug, Eq, PartialEq)] @@ -368,22 +504,6 @@ pub struct NatPingResponse { pub id: u64, } -impl ToBytes for DhtRequest { - fn to_bytes<'a>(&self, buf: (&'a mut [u8], usize)) -> Result<(&'a mut [u8], usize), GenError> { - match *self { - DhtRequest::NatPingRequest(ref p) => p.to_bytes(buf), - DhtRequest::NatPingResponse(ref p) => p.to_bytes(buf), - } - } -} - -impl FromBytes for DhtRequest { - named!(from_bytes, alt!( - map!(NatPingRequest::from_bytes, DhtRequest::NatPingRequest) | - map!(NatPingResponse::from_bytes, DhtRequest::NatPingResponse) - )); -} - impl FromBytes for NatPingRequest { named!(from_bytes, do_parse!( tag!(&[0xfe][..]) >> @@ -425,18 +545,59 @@ impl ToBytes for NatPingResponse { #[cfg(test)] mod test { use super::*; - use std::fmt::Debug; use byteorder::{ByteOrder, BigEndian, WriteBytesExt}; use quickcheck::{Arbitrary, Gen, quickcheck}; - #[derive(Clone, Copy, Debug, Eq, PartialEq)] - pub enum PacketKind { - PingRequest = 0, - PingResponse = 1, + const NAT_PING_REQUEST: PacketKind = PacketKind::PingRequest; + const NAT_PING_RESPONSE: PacketKind = PacketKind::PingResponse; + + impl Arbitrary for DhtBase { + fn arbitrary(g: &mut G) -> Self { + let choice = g.gen_range(0, 2); + if choice == 0 { + DhtBase::DhtPacket(DhtPacket::arbitrary(g)) + } else { + DhtBase::DhtRequest(DhtRequest::arbitrary(g)) + } + } + } + + impl Arbitrary for DhtPacket { + fn arbitrary(g: &mut G) -> Self { + let (pk, sk) = gen_keypair(); // "sender" keypair + let (r_pk, _) = gen_keypair(); // receiver PK + let precomputed = encrypt_precompute(&r_pk, &sk); + + let choice = g.gen_range(0, 4); + match choice { + 0 => + DhtPacket::new(&precomputed, &pk, DhtPacketPayload::PingRequest(PingRequest::arbitrary(g))), + 1 => + DhtPacket::new(&precomputed, &pk, DhtPacketPayload::PingResponse(PingResponse::arbitrary(g))), + 2 => + DhtPacket::new(&precomputed, &pk, DhtPacketPayload::GetNodes(GetNodes::arbitrary(g))), + 3 => + DhtPacket::new(&precomputed, &pk, DhtPacketPayload::SendNodes(SendNodes::arbitrary(g))), + _ => unreachable!("Arbitrary for DhtPacket - should not have happened!") + } + } + } + + impl Arbitrary for DhtRequest { + fn arbitrary(g: &mut G) -> Self { + let (pk, sk) = gen_keypair(); // "sender" keypair + let (r_pk, _) = gen_keypair(); // receiver PK + let precomputed = encrypt_precompute(&r_pk, &sk); + + let choice = g.gen_range(0, 2); + if choice == 0 { + DhtRequest::new(&precomputed, &r_pk, &pk,DhtRequestPayload::NatPingRequest(NatPingRequest::arbitrary(g))) + } else { + DhtRequest::new(&precomputed, &r_pk, &pk, DhtRequestPayload::NatPingResponse(NatPingResponse::arbitrary(g))) + } + } } - const NatPingRequest: PacketKind = PacketKind::PingRequest; - const NatPingResponse: PacketKind = PacketKind::PingResponse; // PingRequest:: impl Arbitrary for PingRequest { @@ -472,12 +633,6 @@ mod test { } } - impl From for PingResponse { - fn from(p: PingRequest) -> Self { - PingResponse { id: p.id } - } - } - // PingRequest:: impl Arbitrary for NatPingRequest { fn arbitrary(_g: &mut G) -> Self { @@ -518,16 +673,6 @@ mod test { } } - /// Trait for types of DHT packets that can be put in [`DhtPacket`] - /// (./struct.DhtPacket.html). - pub trait DhtPacketT: ToBytes + FromBytes + Eq + PartialEq + Debug { - /// Provide packet type number. - /// - /// To use for serialization: `.kind() as u8`. - fn kind(&self) -> PacketKind; - - } - macro_rules! tests_for_pings { ($($p:ident $b_t:ident $f_t:ident)+) => ($( @@ -582,11 +727,11 @@ mod test { } } - impl Arbitrary for DhtPacket { + impl Arbitrary for DhtPacketPayload { fn arbitrary(g: &mut G) -> Self { let mut a: [u8; PUBLICKEYBYTES] = [0; PUBLICKEYBYTES]; g.fill_bytes(&mut a); - DhtPacket::GetNodes(GetNodes { pk: PublicKey(a), id: g.gen() }) + DhtPacketPayload::GetNodes(GetNodes { pk: PublicKey(a), id: g.gen() }) } } @@ -624,30 +769,30 @@ mod test { quickcheck(with_bytes as fn(Vec)); } - // DhtPacket::GetNodes::to_bytes() + // DhtPacketPayload::GetNodes::to_bytes() #[test] fn dht_packet_get_nodes_to_bytes_test() { - fn with_gn(gn: DhtPacket) { + fn with_gn(gn: DhtPacketPayload) { let mut _buf = [0;1024]; let g_bytes = gn.to_bytes((&mut _buf, 0)).ok().unwrap().0; - if let DhtPacket::GetNodes(gp) = gn { + if let DhtPacketPayload::GetNodes(gp) = gn { let PublicKey(pk_bytes) = gp.pk; assert_eq!(&pk_bytes, &g_bytes[..PUBLICKEYBYTES]); assert_eq!(gp.id, BigEndian::read_u64(&g_bytes[PUBLICKEYBYTES..])); } } - quickcheck(with_gn as fn(DhtPacket)); + quickcheck(with_gn as fn(DhtPacketPayload)); } - // DhtPacket::GetNodes::from_bytes() + // DhtPacketPayload::GetNodes::from_bytes() #[test] fn dht_packet_get_nodes_from_bytes_test() { fn with_bytes(bytes: Vec) { if bytes.len() < GET_NODES_SIZE { assert!(!GetNodes::from_bytes(&bytes).is_done()); } else { - let gn = DhtPacket::from_bytes(&bytes).unwrap().1; - if let DhtPacket::GetNodes(gp) = gn { + let gn = DhtPacketPayload::from_bytes(&bytes).unwrap().1; + if let DhtPacketPayload::GetNodes(gp) = gn { // ping_id as bytes should match "original" bytes assert_eq!(BigEndian::read_u64(&bytes[PUBLICKEYBYTES..GET_NODES_SIZE]), gp.id); @@ -668,27 +813,6 @@ mod test { } } - impl SendNodes { - /** - Create new `SendNodes`. Returns `None` if 0 or more than 4 nodes are - supplied. - - Created as a response to `GetNodes` request. - */ - pub fn with_nodes(request: &GetNodes, nodes: Vec) -> Option { - debug!(target: "SendNodes", "Creating SendNodes from GetNodes."); - trace!(target: "SendNodes", "With GetNodes: {:?}", request); - trace!("With nodes: {:?}", &nodes); - - if nodes.is_empty() || nodes.len() > 4 { - warn!(target: "SendNodes", "Wrong number of nodes supplied!"); - return None - } - - Some(SendNodes { nodes: nodes, id: request.id }) - } - } - // SendNodes::to_bytes() #[test] fn packet_send_nodes_to_bytes_test() { @@ -773,7 +897,11 @@ mod test { let pb = p.to_bytes((&mut _buf, 0)).ok().unwrap(); assert_eq!(NAT_PING_SIZE, pb.1); assert_eq!(NAT_PING_TYPE as u8, pb.0[0]); - assert_eq!($np as u8, pb.0[1]); + if stringify!($np) == "NatPingRequest" { + assert_eq!(NAT_PING_REQUEST as u8, pb.0[1]); + } else { + assert_eq!(NAT_PING_RESPONSE as u8, pb.0[1]); + } } quickcheck(with_np as fn($np)); } @@ -794,7 +922,12 @@ mod test { quickcheck(with_bytes as fn(Vec)); // just in case - let mut ping = vec![NAT_PING_TYPE, $np as u8]; + let ping_kind = match stringify!($np) { + "NatPingRequest" => NAT_PING_REQUEST as u8, + "NatPingResponse" => NAT_PING_RESPONSE as u8, + e => panic!("can not occur {:?}", e) + }; + let mut ping = vec![NAT_PING_TYPE, ping_kind]; ping.write_u64::(random_u64()) .unwrap(); with_bytes(ping); diff --git a/src/toxcore/dht_new/packet_kind.rs b/src/toxcore/dht_new/packet_kind.rs new file mode 100644 index 000000000..979265780 --- /dev/null +++ b/src/toxcore/dht_new/packet_kind.rs @@ -0,0 +1,64 @@ +/* + Copyright © 2016 Zetok Zalbavar + Copyright © 2018 Namsoo CHO + + This file is part of Tox. + + Tox is libre software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + Tox is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with Tox. If not, see . +*/ + +/*! Data associated with the `PacketKind`. Used by most of other `dht` + modules. + + Used by: + + * [`dht`](../dht/index.html) +*/ + +use nom::le_u8; + +use toxcore::dht_new::binary_io::*; + + +/** Top-level packet kind names and their associated numbers. + + According to https://zetok.github.io/tox-spec.html#packet-kind. +*/ +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum PacketKind { + /// [`Ping`](./struct.Ping.html) request number. + PingRequest = 0, + /// [`Ping`](./struct.Ping.html) response number. + PingResponse = 1, + /// [`GetNodes`](./struct.GetNodes.html) packet number. + GetNodes = 2, + /// [`SendNodes`](./struct.SendNodes.html) packet number. + SendNodes = 4, + /// DHT Request. + DhtRequest = 32, +} + +/** Parse first byte from provided `bytes` as `PacketKind`. + + Returns `None` if no bytes provided, or first byte doesn't match. +*/ +impl FromBytes for PacketKind { + named!(from_bytes, switch!(le_u8, + 0 => value!(PacketKind::PingRequest) | + 1 => value!(PacketKind::PingResponse) | + 2 => value!(PacketKind::GetNodes) | + 4 => value!(PacketKind::SendNodes) | + 32 => value!(PacketKind::DhtRequest) + )); +}