Permalink
Browse files

Hop new ports, but keep the old [for a minute, and up to 10 at a time]

(One is silver and the other gold...)
  • Loading branch information...
1 parent d17fb78 commit c0092a6e7ee481fc897661ff107e3fa940547574 @keithw keithw committed Oct 5, 2012
View
@@ -74,18 +74,22 @@ int main( int argc, char *argv[] )
if ( server ) {
Select &sel = Select::get_instance();
- sel.add_fd( n->fd() );
uint64_t last_num = n->get_remote_state_num();
while ( true ) {
try {
+ sel.clear_fds();
+ std::vector< int > fd_list( n->fds() );
+ assert( fd_list.size() == 1 ); /* servers don't hop */
+ int network_fd = fd_list.back();
+ sel.add_fd( network_fd );
if ( sel.select( n->wait_time() ) < 0 ) {
perror( "select" );
exit( 1 );
}
n->tick();
- if ( sel.read( n->fd() ) ) {
+ if ( sel.read( network_fd ) ) {
n->recv();
if ( n->get_remote_state_num() != last_num ) {
@@ -116,10 +120,18 @@ int main( int argc, char *argv[] )
}
Select &sel = Select::get_instance();
- sel.add_fd( STDIN_FILENO );
- sel.add_fd( n->fd() );
while( true ) {
+ sel.clear_fds();
+ sel.add_fd( STDIN_FILENO );
+
+ std::vector< int > fd_list( n->fds() );
+ for ( std::vector< int >::const_iterator it = fd_list.begin();
+ it != fd_list.end();
+ it++ ) {
+ sel.add_fd( *it );
+ }
+
try {
if ( sel.select( n->wait_time() ) < 0 ) {
perror( "select" );
@@ -133,7 +145,22 @@ int main( int argc, char *argv[] )
n->get_current_state().push_back( Parser::UserByte( x ) );
}
- if ( sel.read( n->fd() ) ) {
+ bool network_ready_to_read = false;
+ for ( std::vector< int >::const_iterator it = fd_list.begin();
+ it != fd_list.end();
+ it++ ) {
+ if ( sel.read( *it ) ) {
+ /* packet received from the network */
+ /* we only read one socket each run */
+ network_ready_to_read = true;
+ }
+
+ if ( sel.error( *it ) ) {
+ break;
+ }
+ }
+
+ if ( network_ready_to_read ) {
n->recv();
}
} catch ( NetworkException e ) {
@@ -534,7 +534,10 @@ void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &network
/* poll for events */
sel.clear_fds();
- sel.add_fd( network.fd() );
+ std::vector< int > fd_list( network.fds() );
+ assert( fd_list.size() == 1 ); /* servers don't hop */
+ int network_fd = fd_list.back();
+ sel.add_fd( network_fd );
sel.add_fd( host_fd );
int active_fds = sel.select( timeout );
@@ -546,7 +549,7 @@ void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &network
now = Network::timestamp();
uint64_t time_since_remote_state = now - network.get_latest_remote_state().timestamp;
- if ( sel.read( network.fd() ) ) {
+ if ( sel.read( network_fd ) ) {
/* packet received from the network */
network.recv();
@@ -652,7 +655,7 @@ void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &network
}
}
- if ( sel.error( network.fd() ) ) {
+ if ( sel.error( network_fd ) ) {
/* network problem */
break;
}
View
@@ -324,7 +324,12 @@ void STMClient::main( void )
/* poll for events */
/* network->fd() can in theory change over time */
sel.clear_fds();
- sel.add_fd( network->fd() );
+ std::vector< int > fd_list( network->fds() );
+ for ( std::vector< int >::const_iterator it = fd_list.begin();
+ it != fd_list.end();
+ it++ ) {
+ sel.add_fd( *it );
+ }
sel.add_fd( STDIN_FILENO );
int active_fds = sel.select( wait_time );
@@ -333,8 +338,24 @@ void STMClient::main( void )
break;
}
- if ( sel.read( network->fd() ) ) {
- /* packet received from the network */
+ bool network_ready_to_read = false;
+
+ for ( std::vector< int >::const_iterator it = fd_list.begin();
+ it != fd_list.end();
+ it++ ) {
+ if ( sel.read( *it ) ) {
+ /* packet received from the network */
+ /* we only read one socket each run */
+ network_ready_to_read = true;
+ }
+
+ if ( sel.error( *it ) ) {
+ /* network problem */
+ break;
+ }
+ }
+
+ if ( network_ready_to_read ) {
if ( !process_network_input() ) { return; }
}
@@ -370,11 +391,6 @@ void STMClient::main( void )
}
}
- if ( sel.error( network->fd() ) ) {
- /* network problem */
- break;
- }
-
if ( sel.error( STDIN_FILENO ) ) {
/* user problem */
if ( !network->has_remote_addr() ) {
View
@@ -111,50 +111,90 @@ void Connection::hop_port( void )
{
assert( !server );
- if ( close( sock ) < 0 ) {
- throw NetworkException( "close", errno );
+ setup();
+
+ prune_sockets();
+}
+
+void Connection::prune_sockets( void )
+{
+ /* don't keep old sockets if the new socket has been working for long enough */
+ if ( socks.size() > 1 ) {
+ if ( timestamp() - last_port_choice > MAX_OLD_SOCKET_AGE ) {
+ int num_to_kill = socks.size() - 1;
+ for ( int i = 0; i < num_to_kill; i++ ) {
+ socks.pop_front();
+ }
+ }
+ } else {
+ return;
}
- setup();
+ /* make sure we don't have too many receive sockets open */
+ if ( socks.size() > MAX_PORTS_OPEN ) {
+ int num_to_kill = socks.size() - MAX_PORTS_OPEN;
+ for ( int i = 0; i < num_to_kill; i++ ) {
+ socks.pop_front();
+ }
+ }
}
-void Connection::setup( void )
+Connection::Socket::Socket()
+ : _fd( socket( AF_INET, SOCK_DGRAM, 0 ) ),
+ _moved( false )
{
- /* create socket */
- sock = socket( AF_INET, SOCK_DGRAM, 0 );
- if ( sock < 0 ) {
+ if ( _fd < 0 ) {
throw NetworkException( "socket", errno );
}
- last_port_choice = timestamp();
-
/* Disable path MTU discovery */
#ifdef HAVE_IP_MTU_DISCOVER
char flag = IP_PMTUDISC_DONT;
socklen_t optlen = sizeof( flag );
- if ( setsockopt( sock, IPPROTO_IP, IP_MTU_DISCOVER, &flag, optlen ) < 0 ) {
+ if ( setsockopt( _fd, IPPROTO_IP, IP_MTU_DISCOVER, &flag, optlen ) < 0 ) {
throw NetworkException( "setsockopt", errno );
}
#endif
/* set diffserv values to AF42 + ECT */
uint8_t dscp = 0x92;
- if ( setsockopt( sock, IPPROTO_IP, IP_TOS, &dscp, 1) < 0 ) {
+ if ( setsockopt( _fd, IPPROTO_IP, IP_TOS, &dscp, 1) < 0 ) {
// perror( "setsockopt( IP_TOS )" );
}
/* request explicit congestion notification on received datagrams */
#ifdef HAVE_IP_RECVTOS
char tosflag = true;
socklen_t tosoptlen = sizeof( tosflag );
- if ( setsockopt( sock, IPPROTO_IP, IP_RECVTOS, &tosflag, tosoptlen ) < 0 ) {
+ if ( setsockopt( _fd, IPPROTO_IP, IP_RECVTOS, &tosflag, tosoptlen ) < 0 ) {
perror( "setsockopt( IP_RECVTOS )" );
}
#endif
}
+void Connection::setup( void )
+{
+ /* create socket */
+ socks.push_back( Socket() );
+
+ last_port_choice = timestamp();
+}
+
+const std::vector< int > Connection::fds( void ) const
+{
+ std::vector< int > ret;
+
+ for ( std::deque< Socket >::const_iterator it = socks.begin();
+ it != socks.end();
+ it++ ) {
+ ret.push_back( it->fd() );
+ }
+
+ return ret;
+}
+
Connection::Connection( const char *desired_ip, const char *desired_port ) /* server */
- : sock( -1 ),
+ : socks(),
has_remote_addr( false ),
remote_addr(),
server( true ),
@@ -213,7 +253,7 @@ Connection::Connection( const char *desired_ip, const char *desired_port ) /* se
/* try to bind to desired IP first */
if ( desired_ip_addr != INADDR_ANY ) {
try {
- if ( try_bind( sock, desired_ip_addr, desired_port_no ) ) { return; }
+ if ( try_bind( sock(), desired_ip_addr, desired_port_no ) ) { return; }
} catch ( const NetworkException& e ) {
struct in_addr sin_addr;
sin_addr.s_addr = desired_ip_addr;
@@ -225,7 +265,7 @@ Connection::Connection( const char *desired_ip, const char *desired_port ) /* se
/* now try any local interface */
try {
- if ( try_bind( sock, INADDR_ANY, desired_port_no ) ) { return; }
+ if ( try_bind( sock(), INADDR_ANY, desired_port_no ) ) { return; }
} catch ( const NetworkException& e ) {
fprintf( stderr, "Error binding to any interface: %s: %s\n",
e.function.c_str(), strerror( e.the_errno ) );
@@ -266,7 +306,7 @@ bool Connection::try_bind( int socket, uint32_t addr, int port )
}
Connection::Connection( const char *key_str, const char *ip, int port ) /* client */
- : sock( -1 ),
+ : socks(),
has_remote_addr( false ),
remote_addr(),
server( false ),
@@ -312,7 +352,7 @@ void Connection::send( string s )
string p = px.tostring( &session );
- ssize_t bytes_sent = sendto( sock, p.data(), p.size(), 0,
+ ssize_t bytes_sent = sendto( sock(), p.data(), p.size(), 0,
(sockaddr *)&remote_addr, sizeof( remote_addr ) );
if ( bytes_sent == static_cast<ssize_t>( p.size() ) ) {
@@ -340,6 +380,34 @@ void Connection::send( string s )
}
string Connection::recv( void )
+{
+ assert( !socks.empty() );
+ for ( std::deque< Socket >::const_iterator it = socks.begin();
+ it != socks.end();
+ it++ ) {
+ bool islast = (it + 1) == socks.end();
+ string payload;
+ try {
+ payload = recv_one( it->fd(), !islast );
+ } catch ( NetworkException & e ) {
+ if ( (e.the_errno == EAGAIN)
+ || (e.the_errno == EWOULDBLOCK) ) {
+ assert( !islast );
+ continue;
+ } else {
+ throw e;
+ }
+ }
+
+ /* succeeded */
+ prune_sockets();
+ return payload;
+ }
+ assert( false );
+ return "";
+}
+
+string Connection::recv_one( int sock_to_recv, bool nonblocking )
{
/* receive source address, ECN, and payload in msghdr structure */
struct sockaddr_in packet_remote_addr;
@@ -366,10 +434,10 @@ string Connection::recv( void )
/* receive flags */
header.msg_flags = 0;
- ssize_t received_len = recvmsg( sock, &header, 0 );
+ ssize_t received_len = recvmsg( sock_to_recv, &header, nonblocking ? MSG_DONTWAIT : 0 );
if ( received_len < 0 ) {
- throw NetworkException( "recvfrom", errno );
+ throw NetworkException( "recvmsg", errno );
}
if ( header.msg_flags & MSG_TRUNC ) {
@@ -456,7 +524,7 @@ int Connection::port( void ) const
struct sockaddr_in local_addr;
socklen_t addrlen = sizeof( local_addr );
- if ( getsockname( sock, (sockaddr *)&local_addr, &addrlen ) < 0 ) {
+ if ( getsockname( sock(), (sockaddr *)&local_addr, &addrlen ) < 0 ) {
throw NetworkException( "getsockname", errno );
}
@@ -501,9 +569,27 @@ uint64_t Connection::timeout( void ) const
return RTO;
}
-Connection::~Connection()
+Connection::Socket::~Socket()
{
- if ( close( sock ) < 0 ) {
- throw NetworkException( "close", errno );
+ if ( !_moved ) {
+ if ( close( _fd ) < 0 ) {
+ throw NetworkException( "close", errno );
+ }
}
}
+
+Connection::Socket::Socket( const Socket & other )
+ : _fd( other._fd ),
+ _moved( false )
+{
+ other.move();
+}
+
+const Connection::Socket & Connection::Socket::operator=( const Socket & other )
+{
+ _fd = other._fd;
+
+ other.move();
+
+ return *this;
+}
Oops, something went wrong.

0 comments on commit c0092a6

Please sign in to comment.