diff --git a/core/src/main/java/me/prettyprint/cassandra/connection/HThriftClient.java b/core/src/main/java/me/prettyprint/cassandra/connection/HThriftClient.java index 492519089..290ee6cca 100644 --- a/core/src/main/java/me/prettyprint/cassandra/connection/HThriftClient.java +++ b/core/src/main/java/me/prettyprint/cassandra/connection/HThriftClient.java @@ -1,5 +1,7 @@ package me.prettyprint.cassandra.connection; +import java.net.Socket; +import java.net.SocketException; import java.util.concurrent.atomic.AtomicLong; import me.prettyprint.cassandra.service.CassandraHost; @@ -103,11 +105,20 @@ HThriftClient open() { log.debug("Creating a new thrift connection to {}", cassandraHost); } + TSocket socket = new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), timeout); + if ( cassandraHost.getUseSocketKeepalive() ) { + try { + socket.getSocket().setKeepAlive(true); + } catch (SocketException se) { + throw new HectorTransportException("Could not set SO_KEEPALIVE on socket: ", se); + } + } if (cassandraHost.getUseThriftFramedTransport()) { - transport = new TFramedTransport(new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), timeout)); + transport = new TFramedTransport(socket); } else { - transport = new TSocket(cassandraHost.getHost(), cassandraHost.getPort(), timeout); + transport = socket; } + try { transport.open(); } catch (TTransportException e) { diff --git a/core/src/main/java/me/prettyprint/cassandra/service/CassandraHost.java b/core/src/main/java/me/prettyprint/cassandra/service/CassandraHost.java index 93bd0c22d..e03e7144f 100644 --- a/core/src/main/java/me/prettyprint/cassandra/service/CassandraHost.java +++ b/core/src/main/java/me/prettyprint/cassandra/service/CassandraHost.java @@ -59,6 +59,7 @@ public final class CassandraHost { private int cassandraThriftSocketTimeout; private ExhaustedPolicy exhaustedPolicy = ExhaustedPolicy.WHEN_EXHAUSTED_BLOCK; private boolean useThriftFramedTransport = DEFAULT_USE_FRAMED_THRIFT_TRANSPORT; + private boolean useSocketKeepalive; //TODO(ran): private FailoverPolicy failoverPolicy = DEFAULT_FAILOVER_POLICY; public CassandraHost(String url) { @@ -225,4 +226,13 @@ public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis; } + public boolean getUseSocketKeepalive() { + return useSocketKeepalive; + } + + public void setUseSocketKeepalive(boolean useSocketKeepalive) { + this.useSocketKeepalive = useSocketKeepalive; + } + + } diff --git a/core/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java b/core/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java index 0156c9dfd..d4627e8d0 100644 --- a/core/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java +++ b/core/src/main/java/me/prettyprint/cassandra/service/CassandraHostConfigurator.java @@ -40,6 +40,7 @@ public final class CassandraHostConfigurator implements Serializable { private int hostTimeoutUnsuspendCheckDelay = HostTimeoutTracker.DEF_NODE_UNSUSPEND_CHECK_DELAY_IN_SECONDS; private boolean useHostTimeoutTracker = false; private boolean runAutoDiscoveryAtStartup = false; + private boolean useSocketKeepalive = false; public CassandraHostConfigurator() { this.hosts = null; @@ -72,6 +73,7 @@ public void applyConfig(CassandraHost cassandraHost) { cassandraHost.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis); cassandraHost.setMaxWaitTimeWhenExhausted(maxWaitTimeWhenExhausted); cassandraHost.setUseThriftFramedTransport(useThriftFramedTransport); + cassandraHost.setUseSocketKeepalive(useSocketKeepalive); // this is special as it can be passed in as a system property if (cassandraThriftSocketTimeout > 0) { @@ -289,6 +291,18 @@ public void setRunAutoDiscoveryAtStartup(boolean runAutoDiscoveryAtStartup) { this.runAutoDiscoveryAtStartup = runAutoDiscoveryAtStartup; } + public boolean getUseSocketKeepalive() { + return useSocketKeepalive; + } + + /** + * Enable SO_KEEPALIVE on the underlying socket. OFF by default (per java.net.Socket) + * + */ + public void setUseSocketKeepalive(boolean useSocketKeepalive) { + this.useSocketKeepalive = useSocketKeepalive; + } + }