Skip to content

Commit

Permalink
Enable keepalive and the ability to set X10_SOCKET_TIMEOUT in native …
Browse files Browse the repository at this point in the history
…sockets

This turns on keepalive on the native X10 place->place socket communication links,
and if the user sets the environment variable "X10_SOCKET_TIMEOUT" to a value in
milliseconds, this will override the default socket timeout, which may help detect
a dead place under resilient X10 more quickly on some systems.
  • Loading branch information
bherta committed Nov 6, 2015
1 parent 9d21e7a commit 9ff1a05
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 23 deletions.
2 changes: 2 additions & 0 deletions x10.runtime/x10rt/sockets/Launcher.h
Expand Up @@ -52,6 +52,8 @@
#define X10_NOWRITEBUFFER "X10_NOWRITEBUFFER" // turns off non-blocking sockets
#define X10_LIBRARY_MODE "X10_LIBRARY_MODE" // Don't use our own launcher, but instead rely on some external system.
#define X10_LAUNCHER_TTY "X10_LAUNCHER_TTY" // set to false to disable Pseudo-TTY over SSH, which is used by default
#define X10_SOCKET_TIMEOUT "X10_SOCKET_TIMEOUT" // milliseconds, how long to wait before marking a place as dead

// don't miss X10_DEBUGGER_ID and X10_DEBUGGER_NAME over in DebugHelper.h

// how many seconds to wait after the first runtime exits, before we force any remaining runtimes to die
Expand Down
80 changes: 57 additions & 23 deletions x10.runtime/x10rt/sockets/x10rt_sockets.cc
Expand Up @@ -89,6 +89,7 @@ struct x10SocketState
// special case for index=myPlaceId on the above three. The socket link is the local listen socket,
// the read lock is used for listen socket handling and write lock for launcher communication
bool useNonblockingLinks; // flag to enable/disable buffered writes. True by default
int sotimeout;
struct x10SocketDataToWrite* pendingWrites;
pthread_mutex_t pendingWriteLock;
STATUS state;
Expand Down Expand Up @@ -181,6 +182,51 @@ int getPortEnv(unsigned int whichPlace)
return lp;
}

// set place-place socket link options.
int setSocketOptions(int socketFD) {
int returnval = 0;
// set SO_LINGER
struct linger linger;
linger.l_onoff = 1;
linger.l_linger = 1;
if (setsockopt(socketFD, SOL_SOCKET, SO_LINGER, &linger, sizeof(linger)) < 0) {
context.errorCode = X10RT_ERR_OTHER;
fatal_error("Error setting SO_LINGER on incoming socket");
returnval = -1;
}

if (context.useNonblockingLinks)
{
int flags = fcntl(socketFD, F_GETFL, 0);
if (fcntl(socketFD, F_SETFL, flags | O_NONBLOCK) == -1)
returnval = -1;
}

// enable keepalive
int keepalive = 1;
if(setsockopt(socketFD, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive)) < 0) {
fprintf(stderr, "Socket keepalive option set failed");
returnval = -1;
}

// adjust timeout for detecting a dead place
if (context.sotimeout > -1) {
struct timeval timeout;
// context.sotimeout is in milliseconds. convert to seconds + microseconds
timeout.tv_sec = context.sotimeout/1000;
timeout.tv_usec = (context.sotimeout - (timeout.tv_sec*1000)) * 1000;
if (setsockopt(socketFD, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) < 0) {
fprintf(stderr, "Socket SO_RCVTIMEO option set failed");
returnval = -1;
}
if (setsockopt(socketFD, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)) < 0) {
fprintf(stderr, "Socket SO_SNDTIMEO option set failed");
returnval = -1;
}
}
return returnval;
}

/*
* returns true if there is data remaining to flush
*/
Expand Down Expand Up @@ -413,20 +459,9 @@ int handleConnectionRequest()
pthread_mutex_init(&context.writeLocks[from], NULL);
context.socketLinks[from].fd = newFD;
context.socketLinks[from].events = POLLIN | POLLPRI;
// set SO_LINGER
struct linger linger;
linger.l_onoff = 1;
linger.l_linger = 1;
if (setsockopt(newFD, SOL_SOCKET, SO_LINGER, &linger, sizeof(linger)) < 0) {
context.errorCode = X10RT_ERR_OTHER;
fatal_error("Error setting SO_LINGER on incoming socket");
return -1;
}
if (context.useNonblockingLinks)
{
int flags = fcntl(newFD, F_GETFL, 0);
fcntl(newFD, F_SETFL, flags | O_NONBLOCK);
}

setSocketOptions(newFD);

return 0;
}
}
Expand Down Expand Up @@ -564,15 +599,8 @@ int initLink(uint32_t remotePlace, char* connectionInfo)
context.socketLinks[remotePlace].fd = newFD;
context.socketLinks[remotePlace].events = POLLIN | POLLPRI;

// set SO_LINGER
struct linger linger;
linger.l_onoff = 1;
linger.l_linger = 1;
if (setsockopt(newFD, SOL_SOCKET, SO_LINGER, &linger, sizeof(linger)) < 0) {
context.errorCode = X10RT_ERR_OTHER;
fatal_error("Error setting SO_LINGER on outgoing socket");
return -1;
}
setSocketOptions(newFD);

if (context.useNonblockingLinks)
{
int flags = fcntl(newFD, F_GETFL, 0);
Expand Down Expand Up @@ -650,6 +678,12 @@ x10rt_error x10rt_net_init (int * argc, char ***argv, x10rt_msg_type *counter)
if (context.useNonblockingLinks)
pthread_mutex_init(&context.pendingWriteLock, NULL);

char* socketTimeout = getenv(X10_SOCKET_TIMEOUT);
if (socketTimeout != NULL)
context.sotimeout = atoi(socketTimeout);
else
context.sotimeout = -1;

if (context.state == PREINITIALIZED) {
// phase 2 of the library mode. Basically, initialize everything other than what was done above. The arguments
// list is expected to contain the connection information needed to link up to the other runtimes, as well as the
Expand Down

0 comments on commit 9ff1a05

Please sign in to comment.