Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions examples/dht_node_new.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
Copyright (C) 2013 Tox project All Rights Reserved.
Copyright © 2017 Zetok Zalbavar <zexavexxe@gmail.com>
Copyright © 2018 Namsoo CHO <nscho66@gmail.com>

This file is part of Tox.

Tox is libre software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

Tox is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with Tox. If not, see <http://www.gnu.org/licenses/>.
*/

// an example of DHT node with current code
//
extern crate tox;
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_timer;

#[macro_use]
extern crate log;
extern crate env_logger;

use futures::*;
use futures::sync::mpsc;
use tokio_core::reactor::Core;
use tokio_core::net::UdpSocket;
//use tokio_timer;

use std::cell::RefCell;
use std::net::SocketAddr;
use std::rc::Rc;
use std::io::{ErrorKind, Error};
use std::time;

use tox::toxcore::dht_new::codec::*;
use tox::toxcore::dht_new::server::*;
use tox::toxcore::crypto_core::*;

fn main() {
env_logger::init();

if !crypto_init() {
error!("Crypto initialization failed.");
return;
}

let (pk, sk) = gen_keypair();

let local: SocketAddr = "127.0.0.1:12345".parse().unwrap();

let mut core = Core::new().unwrap();
let handle = core.handle();

// Bind a UDP listener to the socket address.
let listener = UdpSocket::bind(&local, &handle).unwrap();

// Create a channel for this socket
let (tx, rx) = mpsc::unbounded::<DhtUdpPacket>();
let server = Rc::new(RefCell::new(Server::new(tx, pk, sk)));

let (sink, stream) = listener.framed(DhtCodec).split();
// The server task asynchronously iterates over and processes each
// incoming packet.
let handler = stream.for_each(move |(addr, packet)| {
let _ = server.borrow_mut().handle_packet((addr, packet.unwrap()));
Ok(())
})
.map_err(|err| {
// All tasks must have an `Error` type of `()`. This forces error
// handling and helps avoid silencing failures.
//
error!("packet receive error = {:?}", err);
Error::new(ErrorKind::Other, "udp receive error")
});

let writer_timer = tokio_timer::wheel()
.tick_duration(time::Duration::from_secs(1))
.build()
;

let writer = rx
.map_err(|_| Error::new(ErrorKind::Other, "rx error"))
.fold(sink, move |sink, packet| {
debug!("Send {:?} => {:?}", packet.0, packet.1);
let sending_future = sink.send(packet);
let duration = time::Duration::from_secs(30);
let timeout = writer_timer.timeout(sending_future, duration);
timeout
})
// drop sink when rx stream is exhausted
.map(|_sink| ())
;

let server = writer.select(handler)
.map(|_| ())
.map_err(move |(err, _select_next)| {
error!("Processing ended with error: {:?}", err);
err
});

info!("server running on localhost:12345");
core.run(server).unwrap();
}
257 changes: 257 additions & 0 deletions src/toxcore/dht_new/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
/*
Copyright © 2017 Zetok Zalbavar <zexavexxe@gmail.com>
Copyright © 2018 Namsoo CHO <nscho66@gmail.com>

This file is part of Tox.

Tox is libre software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

Tox is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with Tox. If not, see <http://www.gnu.org/licenses/>.
*/


/*!
The object of this struct is an object per a peer.
*/

use futures::{Sink, Future};
use futures::sync::mpsc;
use tokio_io::IoFuture;

//use std::collections::VecDeque;
use std::io::{ErrorKind, Error};
use std::net::SocketAddr;

use toxcore::crypto_core::*;
use toxcore::dht_new::packet::*;
use toxcore::dht_new::codec::*;

/// Shorthand for the transmit half of the message channel.
type Tx = mpsc::UnboundedSender<DhtUdpPacket>;

/// peer info.
#[derive(Clone, Debug)]
pub struct Client {
/// Public key of peer
pub pk: PublicKey,
/// socket address of peer
pub addr: SocketAddr,
/// last sent ping_id to check PingResponse is correct
pub ping_id: u64,
/// precomputed key for this peer
pub precomputed_key: PrecomputedKey,
/// shaed mpsc tx part
pub tx: Tx,
}

impl Client {
/// create Client object
pub fn new(precomputed_key: PrecomputedKey, pk: PublicKey, addr: SocketAddr, tx: Tx) -> Client {
Client {
pk: pk,
addr: addr,
precomputed_key: precomputed_key,
ping_id: 0,
tx: tx,
}
}
/// actual send method
pub fn send_to(&self, addr: SocketAddr, packet: DhtPacket) -> IoFuture<()> {
Box::new(self.tx.clone() // clone tx sender for 1 send only
.send((addr, packet))
.map(|_tx| ()) // ignore tx because it was cloned
.map_err(|e| {
// This may only happen if rx is gone
// So cast SendError<T> to a corresponding std::io::Error
error!("send to peer error {:?}", e);
Error::from(ErrorKind::UnexpectedEof)
})
)
}
fn send(&self, packet: DhtPacket) -> IoFuture<()> {
self.send_to(self.addr.clone(), packet)
}
/// respond with PingResponse to peer
pub fn send_ping_response(&self, resp_payload: PingResponsePayload) -> IoFuture<()> {
let ping_resp = DhtPacket::PingResponse(PingResponse::new(&self.precomputed_key.clone(), &self.pk, resp_payload));
debug!("PingResp made {:?}", ping_resp);
self.send(ping_resp)
}
/// respond with NodesResponse to peer
pub fn send_nodes_response(&self, resp_payload: NodesResponsePayload) -> IoFuture<()> {
let nodes_resp = DhtPacket::NodesResponse(NodesResponse::new(&self.precomputed_key.clone(), &self.pk, resp_payload));
debug!("NodesResp made {:?}", nodes_resp);
self.send(nodes_resp)
}
/// respond with NatPingResponse to peer
pub fn send_nat_ping_response(&self, rpk: &PublicKey, resp_payload: NatPingResponse) -> IoFuture<()> {
let payload = DhtRequestPayload::NatPingResponse(resp_payload);
let nat_ping_resp = DhtPacket::DhtRequest(DhtRequest::new(&self.precomputed_key.clone(), rpk, &self.pk, payload));
self.send(nat_ping_resp)
}
/// send NatPingRequest/NatPingResponse to target peer
pub fn send_nat_ping_packet(&self, addr: &SocketAddr, request: DhtRequest) -> IoFuture<()> {
let packet = DhtPacket::DhtRequest(request);
self.send_to(*addr, packet)
}
/// create and send PingRequest packet
pub fn send_ping_request(&mut self) -> IoFuture<()> {
let payload = PingRequestPayload{ id: random_u64(), };
self.ping_id = payload.id;
let ping_req = DhtPacket::PingRequest(PingRequest::new(&self.precomputed_key.clone(), &self.pk, payload));
self.send(ping_req)
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::*;

use std::net::SocketAddr;
use toxcore::dht_new::packed_node::*;

fn create_client() -> (Client, SecretKey, mpsc::UnboundedReceiver<DhtUdpPacket>) {
let addr: SocketAddr = "127.0.0.1:12345".parse().unwrap();
let (tx, rx) = mpsc::unbounded();
let (alice_pk, alice_sk) = gen_keypair();
let (bob_pk, bob_sk) = gen_keypair();
let precomp = precompute(&alice_pk, &bob_sk);
let client = Client::new(precomp, bob_pk, addr, tx);
(client, alice_sk, rx)
}
// send()
#[test]
fn client_send_test() {
let (client, _sk, rx) = create_client();
let payload = PingRequestPayload { id: random_u64() };
let packet = DhtPacket::PingRequest(PingRequest::new(&client.precomputed_key.clone(), &client.pk, payload));
client.send(packet.clone()).wait().unwrap();
let rx =
if let (Some((_addr, received_packet)), rx1) = rx.into_future().wait().unwrap() {
assert_eq!(packet, received_packet);
rx1
} else {
unreachable!("can not occur");
};
drop(rx);
assert!(!client.send(packet).wait().is_ok());
}
// send_to()
#[test]
fn client_send_to_test() {
let (client, _sk, rx) = create_client();
let payload = PingRequestPayload { id: random_u64() };
let packet = DhtPacket::PingRequest(PingRequest::new(&client.precomputed_key.clone(), &client.pk, payload));
client.send_to(client.addr, packet.clone()).wait().unwrap();
if let (Some((_addr, received_packet)), _rx) = rx.into_future().wait().unwrap() {
assert_eq!(packet, received_packet);
} else {
unreachable!("can not occur");
}
}
// send_ping_response()
#[test]
fn client_send_ping_response_test() {
let (client, sk, rx) = create_client();
let payload = PingResponsePayload { id: random_u64() };
client.send_ping_response(payload).wait().unwrap();
if let (Some((_addr, received_packet)), _rx) = rx.into_future().wait().unwrap() {
if let DhtPacket::PingResponse(packet) = received_packet {
let ping_resp_payload = packet.get_payload(&sk).unwrap();
assert_eq!(ping_resp_payload.id, payload.id);
} else {
unreachable!("can not occur");
}
} else {
unreachable!("can not occur");
}
}
// send_nodes_response()
#[test]
fn client_send_nodes_response_test() {
let (client, sk, rx) = create_client();
let payload = NodesResponsePayload { nodes: vec![
PackedNode::new(false, SocketAddr::V4("127.0.0.1:12345".parse().unwrap()), &gen_keypair().0)
], id: 38 };
client.send_nodes_response(payload.clone()).wait().unwrap();
if let (Some((_addr, received_packet)), _rx) = rx.into_future().wait().unwrap() {
if let DhtPacket::NodesResponse(packet) = received_packet {
let nodes_resp_payload = packet.get_payload(&sk).unwrap();
assert_eq!(nodes_resp_payload.id, payload.id);
assert_eq!(nodes_resp_payload.nodes, payload.nodes);
} else {
unreachable!("can not occur");
}
} else {
unreachable!("can not occur");
}
}
// send_nat_ping_response()
#[test]
fn client_send_nat_ping_response_test() {
let (client, sk, rx) = create_client();
let payload = NatPingResponse { id: random_u64() };
client.send_nat_ping_response(&client.pk, payload).wait().unwrap();
if let (Some((_addr, received_packet)), _rx) = rx.into_future().wait().unwrap() {
if let DhtPacket::DhtRequest(packet) = received_packet {
if let DhtRequestPayload::NatPingResponse(nat_ping_resp_payload) = packet.get_payload(&sk).unwrap() {
assert_eq!(nat_ping_resp_payload.id, payload.id);
} else {
unreachable!("can not occur");
}
} else {
unreachable!("can not occur");
}
} else {
unreachable!("can not occur");
}
}
// send_nat_ping_packet()
#[test]
fn client_send_nat_ping_packet_test() {
let (client, sk, rx) = create_client();
let nat_res = NatPingResponse { id: random_u64() };
let nat_payload = DhtRequestPayload::NatPingResponse(nat_res);
let dht_req = DhtRequest::new(&client.precomputed_key, &client.pk, &client.pk, nat_payload.clone());
client.send_nat_ping_packet(&client.addr, dht_req).wait().unwrap();
if let (Some((_addr, received_packet)), _rx) = rx.into_future().wait().unwrap() {
if let DhtPacket::DhtRequest(packet) = received_packet {
if let DhtRequestPayload::NatPingResponse(nat_ping_resp_payload) = packet.get_payload(&sk).unwrap() {
assert_eq!(nat_ping_resp_payload.id, nat_res.id);
} else {
unreachable!("can not occur");
}
} else {
unreachable!("can not occur");
}
} else {
unreachable!("can not occur");
}
}
// send_ping_request()
#[test]
fn client_send_ping_request_test() {
let (mut client, sk, rx) = create_client();
client.send_ping_request().wait().unwrap();
if let (Some((_addr, received_packet)), _rx) = rx.into_future().wait().unwrap() {
if let DhtPacket::PingRequest(packet) = received_packet {
let ping_req_payload = packet.get_payload(&sk).unwrap();
assert_eq!(ping_req_payload.id, client.ping_id);
} else {
unreachable!("can not occur");
}
} else {
unreachable!("can not occur");
}
}
}
1 change: 1 addition & 0 deletions src/toxcore/dht_new/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ Because size of SendNodes is largest in DHT related packets
pub const MAX_DHT_PACKET_SIZE: usize = 512;

/// Struct to use for {de-,}serializing DHT UDP packets.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DhtCodec;

impl UdpCodec for DhtCodec {
Expand Down
Loading