Skip to content
This repository has been archived by the owner on Jan 6, 2020. It is now read-only.

Commit

Permalink
Merge pull request #67 from canndrew/deadline
Browse files Browse the repository at this point in the history
Add deadlines to several methods
  • Loading branch information
vinipsmaker committed Apr 15, 2016
2 parents 47033af + 2b282a5 commit 49e6a63
Show file tree
Hide file tree
Showing 13 changed files with 116 additions and 151 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ rand = "~0.3.14"
rustc-serialize = "~0.3.18"
socket_addr = "~0.1.0"
sodiumoxide = "~0.0.9"
time = "~0.1.34"
void = "1.0.1"
w_result = "~0.1.1"
byteorder = "~0.5.0"
Expand Down
5 changes: 4 additions & 1 deletion examples/simple-tcp-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
extern crate nat_traversal;
extern crate w_result;

use std::time::{Instant, Duration};

use nat_traversal::{MappingContext, SimpleTcpHolePunchServer};
use w_result::{WOk, WErr};

Expand All @@ -60,7 +62,8 @@ fn main() {
};

// Now we create the server.
let simple_server = match SimpleTcpHolePunchServer::new(Box::new(mapping_context)) {
let deadline = Instant::now() + Duration::from_secs(3);
let simple_server = match SimpleTcpHolePunchServer::new(Box::new(mapping_context), deadline) {
WOk(simple_server, warnings) => {
for warning in warnings {
println!("Warning when creating simple server: {}", warning);
Expand Down
5 changes: 4 additions & 1 deletion examples/simple-udp-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
extern crate nat_traversal;
extern crate w_result;

use std::time::{Instant, Duration};

use nat_traversal::{MappingContext, SimpleUdpHolePunchServer};
use w_result::{WOk, WErr};

Expand All @@ -60,7 +62,8 @@ fn main() {
};

// Now we create the server.
let simple_server = match SimpleUdpHolePunchServer::new(Box::new(mapping_context)) {
let deadline = Instant::now() + Duration::from_secs(3);
let simple_server = match SimpleUdpHolePunchServer::new(Box::new(mapping_context), deadline) {
WOk(simple_server, warnings) => {
for warning in warnings {
println!("Warning when creating simple server: {}", warning);
Expand Down
9 changes: 7 additions & 2 deletions examples/tcp-hole-punch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ extern crate socket_addr;

use std::net::ToSocketAddrs;
use std::io::{Read, Write};
use std::time::{Instant, Duration};

use socket_addr::SocketAddr;
use nat_traversal::{MappingContext, gen_rendezvous_info, MappedTcpSocket, tcp_punch_hole};
Expand Down Expand Up @@ -109,7 +110,8 @@ fn main() {
}

// Now we use our context to create a mapped tcp socket.
let mapped_socket = match MappedTcpSocket::new(&mapping_context) {
let deadline = Instant::now() + Duration::from_secs(5);
let mapped_socket = match MappedTcpSocket::new(&mapping_context, deadline) {
WOk(mapped_socket, warnings) => {
for warning in warnings {
println!("Warning when mapping socket: {}", warning);
Expand Down Expand Up @@ -171,7 +173,10 @@ fn main() {

// Now we use the socket, our private rendezvous info and their public rendezvous info to
// complete the connection.
let mut stream = match tcp_punch_hole(socket, our_priv_info, their_pub_info) {
let mut stream = match tcp_punch_hole(
socket, our_priv_info, their_pub_info,
Instant::now() + Duration::from_secs(5)
) {
WOk(punched_socket, warnings) => {
for warning in warnings {
println!("Warning when punching hole: {}", warning);
Expand Down
11 changes: 9 additions & 2 deletions examples/udp-hole-punch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ extern crate rustc_serialize;
extern crate socket_addr;

use std::net::ToSocketAddrs;
use std::time::{Instant, Duration};

use socket_addr::SocketAddr;
use nat_traversal::{MappingContext, gen_rendezvous_info, MappedUdpSocket, PunchedUdpSocket};
Expand Down Expand Up @@ -108,7 +109,10 @@ fn main() {
}

// Now we use our context to create a mapped udp socket.
let mapped_socket = match MappedUdpSocket::new(&mapping_context) {
let mapped_socket = match MappedUdpSocket::new(
&mapping_context,
Instant::now() + Duration::from_secs(5)
) {
WOk(mapped_socket, warnings) => {
for warning in warnings {
println!("Warning when mapping socket: {}", warning);
Expand Down Expand Up @@ -170,7 +174,10 @@ fn main() {

// Now we use the socket, our private rendezvous info and their public rendezvous info to
// complete the connection.
let punched_socket = match PunchedUdpSocket::punch_hole(socket, our_priv_info, their_pub_info) {
let deadline = Instant::now() + Duration::from_secs(5);
let punched_socket = match PunchedUdpSocket::punch_hole(socket, our_priv_info,
their_pub_info, deadline)
{
WOk(punched_socket, warnings) => {
for warning in warnings {
println!("Warning when punching hole: {}", warning);
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ extern crate byteorder;
extern crate net2;
extern crate rand;
extern crate rustc_serialize;
extern crate time;
extern crate void;
#[macro_use]
extern crate maidsafe_utilities;
Expand Down
125 changes: 49 additions & 76 deletions src/mapped_tcp_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
//! NAT traversal utilities.

use std::net;
use std::net::{IpAddr, TcpStream};
use std::net::{IpAddr, Ipv4Addr, TcpStream};
use std::io;
use std::io::{Read, Write};
use std::time::Duration;
use std::time::{Instant, Duration};
use std::thread;
use std::str;
use std::sync::mpsc;
Expand All @@ -35,8 +35,6 @@ use net2;
use socket_addr::SocketAddr;
use w_result::{WResult, WErr, WOk};
use maidsafe_utilities::serialisation::{deserialise, SerialisationError};
use time::SteadyTime;
use time;
use rand::random;
use byteorder::{ReadBytesExt, WriteBytesExt, BigEndian};

Expand All @@ -47,7 +45,6 @@ use rendezvous_info;
use socket_utils;
use mapping_context;
use listener_message;
use utils;
use utils::DisplaySlice;

/// A tcp socket for which we know our external endpoints.
Expand Down Expand Up @@ -163,28 +160,10 @@ quick_error! {
/// Errors returned by MappedTcpSocket::new
#[derive(Debug)]
pub enum MappedTcpSocketNewError {
/// Error creating TCP socket.
CreateSocket { err: io::Error } {
description("Error creating TCP socket")
display("Error creating TCP socket: {}", err)
cause(err)
}
/// Error enabling SO_REUSEADDR on new socket.
EnableReuseAddr { err: io::Error } {
description("Error enabling SO_REUSEADDR on new socket")
display("Error enabling SO_REUSEADDR on new socket: {}", err)
cause(err)
}
/// Error enabling SO_REUSEPORT (or equivalent) on new socket.
EnableReusePort { err: io::Error } {
description("Error enabling SO_REUSEPORT (or equivalent) on new socket")
display("Error enabling SO_REUSEPORT (or equivalent) on new socket: {}", err)
cause(err)
}
/// Error binding new socket.
Bind { err: io::Error } {
description("Error binding new socket")
display("Error binding new socket: {}", err)
/// Error creating a reusably bound tcp socket.
NewReusablyBoundTcpSocket { err: NewReusablyBoundTcpSocketError } {
description("Error creating a reusably bound tcp socket.")
display("Error creating a reusably bound tcp socket: {}", err)
cause(err)
}
/// Error mapping new socket.
Expand All @@ -200,10 +179,10 @@ impl From<MappedTcpSocketNewError> for io::Error {
fn from(e: MappedTcpSocketNewError) -> io::Error {
let err_str = format!("{}", e);
let kind = match e {
MappedTcpSocketNewError::CreateSocket { err } => err.kind(),
MappedTcpSocketNewError::EnableReuseAddr { err } => err.kind(),
MappedTcpSocketNewError::EnableReusePort { err } => err.kind(),
MappedTcpSocketNewError::Bind { err } => err.kind(),
MappedTcpSocketNewError::NewReusablyBoundTcpSocket { err } => {
let err: io::Error = From::from(err);
err.kind()
},
MappedTcpSocketNewError::Map { err } => {
let err: io::Error = From::from(err);
err.kind()
Expand Down Expand Up @@ -291,7 +270,7 @@ pub fn new_reusably_bound_tcp_socket(local_addr: &net::SocketAddr) -> Result<net
impl MappedTcpSocket {
/// Map an existing tcp socket. The socket must bound but not connected. It must have been
/// bound with SO_REUSEADDR and SO_REUSEPORT options (or equivalent) set.
pub fn map(socket: net2::TcpBuilder, mc: &MappingContext)
pub fn map(socket: net2::TcpBuilder, mc: &MappingContext, deadline: Instant)
-> WResult<MappedTcpSocket, MappedTcpSocketMapWarning, MappedTcpSocketMapError>
{
let mut endpoints = Vec::new();
Expand Down Expand Up @@ -465,58 +444,53 @@ impl MappedTcpSocket {
};
Ok(external_addr)
};
let _ = results_tx.send(map());
let _ = results_tx.send(Some(map()));
}));
}
drop(results_tx);
let timeout_thread = thread!("MappedTcpSocket::map timeout", move || {
let now = Instant::now();
if deadline > now {
let timeout = deadline - now;
thread::park_timeout(timeout);
}
let _ = results_tx.send(None);
});

let mut num_results = 0;
for result in results_rx {
match result {
Ok(external_addr) => {
Some(Ok(external_addr)) => {
endpoints.push(MappedSocketAddr {
addr: external_addr,
nat_restricted: true,
});
num_results += 1;
if num_results >= 2 {
break;
}
},
Err(e) => {
Some(Err(e)) => {
warnings.push(e);
},
None => {
break;
},
}
}

timeout_thread.thread().unpark();
WOk(MappedTcpSocket {
socket: socket,
endpoints: endpoints,
}, warnings)
}

/// Create a new `MappedTcpSocket`
pub fn new(mc: &MappingContext) -> WResult<MappedTcpSocket, MappedTcpSocketMapWarning, MappedTcpSocketNewError> {
let socket = match net2::TcpBuilder::new_v4() {
pub fn new(mc: &MappingContext, deadline: Instant)
-> WResult<MappedTcpSocket, MappedTcpSocketMapWarning, MappedTcpSocketNewError>
{
let unspec_addr = net::SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let socket = match new_reusably_bound_tcp_socket(&unspec_addr) {
Ok(socket) => socket,
Err(e) => return WErr(MappedTcpSocketNewError::CreateSocket { err: e }),
};
match socket.reuse_address(true) {
Ok(_) => (),
Err(e) => return WErr(MappedTcpSocketNewError::EnableReuseAddr { err: e }),
};
match socket_utils::enable_so_reuseport(&socket) {
Ok(()) => (),
Err(e) => return WErr(MappedTcpSocketNewError::EnableReusePort { err: e }),
};
// need to connect to a bunch of guys in parallel and get our addresses.
// need a bunch of sockets that are bound to the same local port.
match socket.bind("0.0.0.0:0") {
Ok(_) => (),
Err(e) => return WErr(MappedTcpSocketNewError::Bind { err: e }),
Err(e) => return WErr(MappedTcpSocketNewError::NewReusablyBoundTcpSocket { err: e }),
};

MappedTcpSocket::map(socket, mc).map_err(|e| MappedTcpSocketNewError::Map { err: e })
MappedTcpSocket::map(socket, mc, deadline).map_err(|e| MappedTcpSocketNewError::Map { err: e })
}
}

Expand Down Expand Up @@ -624,7 +598,8 @@ impl From<TcpPunchHoleError> for io::Error {
/// `MappedTcpSocket`.
pub fn tcp_punch_hole(socket: net2::TcpBuilder,
our_priv_rendezvous_info: PrivRendezvousInfo,
their_pub_rendezvous_info: PubRendezvousInfo)
their_pub_rendezvous_info: PubRendezvousInfo,
deadline: Instant)
-> WResult<TcpStream, TcpPunchHoleWarning, TcpPunchHoleError> {
// In order to do tcp hole punching we connect to all of their endpoints in parallel while
// simultaneously listening. All the sockets we use must be bound to the same local address. As
Expand All @@ -636,10 +611,6 @@ pub fn tcp_punch_hole(socket: net2::TcpBuilder,
// we're stuck with spawning (and then detaching) loads of threads. Setting the read/write
// timeouts should prevent the detached threads from leaking indefinitely.

// The total timeout for the entire function.
let start_time = SteadyTime::now();
let deadline = start_time + time::Duration::seconds(10);

let mut warnings = Vec::new();

let shutdown = Arc::new(AtomicBool::new(false));
Expand Down Expand Up @@ -715,13 +686,12 @@ pub fn tcp_punch_hole(socket: net2::TcpBuilder,
Ok(stream)
};
loop {
let now = SteadyTime::now();
let now = Instant::now();
if now >= deadline || shutdown_clone.load(Ordering::SeqCst) {
break;
}
else {
let timeout = deadline - now;
let timeout = utils::time_duration_to_std_duration(timeout);
match f(timeout) {
Ok(stream) => {
let _ = results_tx_clone.send(Some(Ok((stream, addr))));
Expand Down Expand Up @@ -769,13 +739,12 @@ pub fn tcp_punch_hole(socket: net2::TcpBuilder,
// Spawn a new thread here to prevent someone from connecting then not sending any data
// and preventing us from accepting any more connections.
let results_tx_clone = results_tx_clone.clone();
let now = SteadyTime::now();
let now = Instant::now();
if now >= deadline {
break;
};
let _ = thread!("tcp_punch_hole listen handshake", move || {
let timeout = deadline - now;
let timeout = utils::time_duration_to_std_duration(timeout);
match stream.set_write_timeout(Some(timeout)) {
Ok(()) => (),
Err(e) => {
Expand Down Expand Up @@ -841,10 +810,11 @@ pub fn tcp_punch_hole(socket: net2::TcpBuilder,
// TODO(canndrew): We won't need to do this one this is fixed: https://github.com/rust-lang/rfcs/issues/962
let results_tx_clone = results_tx.clone();
let timeout_thread = thread!("tcp_punch_hole timeout", move || {
let now = SteadyTime::now();
let timeout = deadline - now;
let timeout = utils::time_duration_to_std_duration(timeout);
thread::park_timeout(timeout);
let now = Instant::now();
if deadline > now {
let timeout = deadline - now;
thread::park_timeout(timeout);
}
let _ = results_tx_clone.send(None);
});
let timeout_thread_handle = timeout_thread.thread();
Expand Down Expand Up @@ -982,26 +952,29 @@ mod test {
use super::*;

use std::io::{Read, Write};
use std::time::{Instant, Duration};

use mapping_context::MappingContext;
use rendezvous_info::gen_rendezvous_info;

#[test]
fn two_peers_tcp_hole_punch_over_loopback() {
let deadline = Instant::now() + Duration::from_secs(5);
let mapping_context = unwrap_result!(MappingContext::new().result_log());

let mapped_socket_0 = unwrap_result!(MappedTcpSocket::new(&mapping_context).result_log());
let mapped_socket_0 = unwrap_result!(MappedTcpSocket::new(&mapping_context, deadline).result_log());
let socket_0 = mapped_socket_0.socket;
let endpoints_0 = mapped_socket_0.endpoints;
let (priv_info_0, pub_info_0) = gen_rendezvous_info(endpoints_0);

let mapped_socket_1 = unwrap_result!(MappedTcpSocket::new(&mapping_context).result_log());
let mapped_socket_1 = unwrap_result!(MappedTcpSocket::new(&mapping_context, deadline).result_log());
let socket_1 = mapped_socket_1.socket;
let endpoints_1 = mapped_socket_1.endpoints;
let (priv_info_1, pub_info_1) = gen_rendezvous_info(endpoints_1);

let deadline = Instant::now() + Duration::from_secs(5);
let thread_0 = thread!("two_peers_tcp_hole_punch_over_loopback_0", move || {
let mut stream = unwrap_result!(tcp_punch_hole(socket_0, priv_info_0, pub_info_1).result_log());
let mut stream = unwrap_result!(tcp_punch_hole(socket_0, priv_info_0, pub_info_1, deadline).result_log());
let mut data = [0u8; 4];
let n = unwrap_result!(stream.write(&data));
assert_eq!(n, 4);
Expand All @@ -1011,7 +984,7 @@ mod test {
});

let thread_1 = thread!("two_peers_tcp_hole_punch_over_loopback_1", move || {
let mut stream = unwrap_result!(tcp_punch_hole(socket_1, priv_info_1, pub_info_0).result_log());
let mut stream = unwrap_result!(tcp_punch_hole(socket_1, priv_info_1, pub_info_0, deadline).result_log());
let mut data = [1u8; 4];
let n = unwrap_result!(stream.write(&data));
assert_eq!(n, 4);
Expand Down

0 comments on commit 49e6a63

Please sign in to comment.