Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fixes for connection leak and ZenStoreClient config

- Applied fix for socketChannel leak in ClientRequestExecutorFactory.create()
- Added comments to document other code paths at risk of leaking socketDestinations
- changed ClientConfig default from ZenStoreClient to DefaultStoreClient
- updated release notes
  • Loading branch information...
commit 6246d4e894855ff853e33020ed29ddbe810058c5 1 parent c227e34
Jay Wylie jayjwylie authored
2  META-INF/MANIFEST.MF
View
@@ -2,6 +2,6 @@ Manifest-Version: 1.0
Ant-Version: Apache Ant 1.7.1
Created-By: 20.2-b06 (Sun Microsystems Inc.)
Implementation-Title: Voldemort
-Implementation-Version: 1.0.0
+Implementation-Version: 1.1.0
Implementation-Vendor: LinkedIn
11 release_notes.txt
View
@@ -1,6 +1,13 @@
-Release 1.1.0.on 10/19/2012
+Release 1.1.1 on 10/30/2012
-Changes made since release 1.1.0
+Changed made since release 1.1.0
+* Fixed connection leak in ClientRequestExecutorFactory
+* Changed client to default to DefaultStoreClient
+
+
+Release 1.1.0 on 10/19/2012
+
+Changes made since release 1.0.0
IMPORTANT NOTE : This release has significant changes to the BDB storage layer.
Users are required to read the bin/PREUPGRADE_FOR_1_1_X_README file
6 src/java/voldemort/client/ClientConfig.java
View
@@ -68,8 +68,10 @@
private volatile boolean enablePipelineRoutedStore = true;
private volatile int clientZoneId = Zone.DEFAULT_ZONE_ID;
- // Flag to control which store client to use. Default = Enhanced
- private volatile boolean useDefaultClient = false;
+ // Flag to control which store client to use:
+ // true = DefaultStoreClient
+ // false = ZenStoreClient
+ private volatile boolean useDefaultClient = true;
private volatile String failureDetectorImplementation = FailureDetectorConfig.DEFAULT_IMPLEMENTATION_CLASS_NAME;
private volatile long failureDetectorBannagePeriod = FailureDetectorConfig.DEFAULT_BANNAGE_PERIOD;
180 src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorFactory.java
View
@@ -120,104 +120,106 @@ public ClientRequestExecutor create(SocketDestination dest) throws Exception {
+ dest.getPort() + " using protocol "
+ dest.getRequestFormatType().getCode());
- SocketChannel socketChannel = SocketChannel.open();
- socketChannel.socket().setReceiveBufferSize(this.socketBufferSize);
- socketChannel.socket().setSendBufferSize(this.socketBufferSize);
- socketChannel.socket().setTcpNoDelay(true);
- socketChannel.socket().setSoTimeout(soTimeoutMs);
- socketChannel.socket().setKeepAlive(this.socketKeepAlive);
- socketChannel.configureBlocking(false);
- socketChannel.connect(new InetSocketAddress(dest.getHost(), dest.getPort()));
-
- long startTime = System.currentTimeMillis();
- long duration = 0;
- long currWaitTime = 1;
- long prevWaitTime = 1;
-
- // Since we're non-blocking and it takes a non-zero amount of time
- // to connect, invoke finishConnect and loop.
- while(!socketChannel.finishConnect()) {
- duration = System.currentTimeMillis() - startTime;
- long remaining = this.connectTimeoutMs - duration;
-
- if(remaining < 0) {
- // Don't forget to close the socket before we throw our
- // exception or they'll leak :(
+ SocketChannel socketChannel = null;
+ ClientRequestExecutor clientRequestExecutor = null;
+
+ try {
+ socketChannel = SocketChannel.open();
+ socketChannel.socket().setReceiveBufferSize(this.socketBufferSize);
+ socketChannel.socket().setSendBufferSize(this.socketBufferSize);
+ socketChannel.socket().setTcpNoDelay(true);
+ socketChannel.socket().setSoTimeout(soTimeoutMs);
+ socketChannel.socket().setKeepAlive(this.socketKeepAlive);
+ socketChannel.configureBlocking(false);
+ socketChannel.connect(new InetSocketAddress(dest.getHost(), dest.getPort()));
+
+ long startTime = System.currentTimeMillis();
+ long duration = 0;
+ long currWaitTime = 1;
+ long prevWaitTime = 1;
+
+ // Since we're non-blocking and it takes a non-zero amount of time
+ // to connect, invoke finishConnect and loop.
+ while(!socketChannel.finishConnect()) {
+ duration = System.currentTimeMillis() - startTime;
+ long remaining = this.connectTimeoutMs - duration;
+
+ if(remaining < 0) {
+ throw new ConnectException("Cannot connect socket " + numCreated + " for "
+ + dest.getHost() + ":" + dest.getPort() + " after "
+ + duration + " ms");
+ }
+
+ if(logger.isTraceEnabled())
+ logger.trace("Still creating socket " + numCreated + " for " + dest.getHost()
+ + ":" + dest.getPort() + ", " + remaining
+ + " ms. remaining to connect");
+
try {
- socketChannel.close();
- } catch(Exception e) {
+ // Break up the connection timeout into smaller units,
+ // employing a Fibonacci-style back-off (1, 2, 3, 5, 8, ...)
+ Thread.sleep(Math.min(remaining, currWaitTime));
+ currWaitTime = Math.min(currWaitTime + prevWaitTime, 50);
+ prevWaitTime = currWaitTime - prevWaitTime;
+ } catch(InterruptedException e) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(e, e);
}
-
- throw new ConnectException("Cannot connect socket " + numCreated + " for "
- + dest.getHost() + ":" + dest.getPort() + " after "
- + duration + " ms");
}
- if(logger.isTraceEnabled())
- logger.trace("Still creating socket " + numCreated + " for " + dest.getHost() + ":"
- + dest.getPort() + ", " + remaining + " ms. remaining to connect");
-
- try {
- // Break up the connection timeout into smaller units,
- // employing a Fibonacci-style back-off (1, 2, 3, 5, 8, ...)
- Thread.sleep(Math.min(remaining, currWaitTime));
- currWaitTime = Math.min(currWaitTime + prevWaitTime, 50);
- prevWaitTime = currWaitTime - prevWaitTime;
- } catch(InterruptedException e) {
- if(logger.isEnabledFor(Level.WARN))
- logger.warn(e, e);
- }
- }
-
- if(logger.isDebugEnabled())
- logger.debug("Created socket " + numCreated + " for " + dest.getHost() + ":"
- + dest.getPort() + " using protocol "
- + dest.getRequestFormatType().getCode() + " after " + duration + " ms.");
-
- // check buffer sizes--you often don't get out what you put in!
- if(socketChannel.socket().getReceiveBufferSize() != this.socketBufferSize)
- logger.debug("Requested socket receive buffer size was " + this.socketBufferSize
- + " bytes but actual size is "
- + socketChannel.socket().getReceiveBufferSize() + " bytes.");
-
- if(socketChannel.socket().getSendBufferSize() != this.socketBufferSize)
- logger.debug("Requested socket send buffer size was " + this.socketBufferSize
- + " bytes but actual size is "
- + socketChannel.socket().getSendBufferSize() + " bytes.");
-
- ClientRequestSelectorManager selectorManager = selectorManagers[counter.getAndIncrement()
- % selectorManagers.length];
-
- Selector selector = selectorManager.getSelector();
- ClientRequestExecutor clientRequestExecutor = new ClientRequestExecutor(selector,
- socketChannel,
- socketBufferSize);
- BlockingClientRequest<String> clientRequest = new BlockingClientRequest<String>(new ProtocolNegotiatorClientRequest(dest.getRequestFormatType()),
- this.getTimeout());
- clientRequestExecutor.addClientRequest(clientRequest);
-
- selectorManager.registrationQueue.add(clientRequestExecutor);
- selector.wakeup();
-
- // Block while we wait for the protocol negotiation to complete.
- clientRequest.await();
-
- try {
- // This will throw an error if the result of the protocol
- // negotiation failed, otherwise it returns an uninteresting token
- // we can safely ignore.
+ if(logger.isDebugEnabled())
+ logger.debug("Created socket " + numCreated + " for " + dest.getHost() + ":"
+ + dest.getPort() + " using protocol "
+ + dest.getRequestFormatType().getCode() + " after " + duration
+ + " ms.");
+
+ // check buffer sizes--you often don't get out what you put in!
+ if(socketChannel.socket().getReceiveBufferSize() != this.socketBufferSize)
+ logger.debug("Requested socket receive buffer size was " + this.socketBufferSize
+ + " bytes but actual size is "
+ + socketChannel.socket().getReceiveBufferSize() + " bytes.");
+
+ if(socketChannel.socket().getSendBufferSize() != this.socketBufferSize)
+ logger.debug("Requested socket send buffer size was " + this.socketBufferSize
+ + " bytes but actual size is "
+ + socketChannel.socket().getSendBufferSize() + " bytes.");
+
+ ClientRequestSelectorManager selectorManager = selectorManagers[counter.getAndIncrement()
+ % selectorManagers.length];
+
+ Selector selector = selectorManager.getSelector();
+ clientRequestExecutor = new ClientRequestExecutor(selector,
+ socketChannel,
+ socketBufferSize);
+ BlockingClientRequest<String> clientRequest = new BlockingClientRequest<String>(new ProtocolNegotiatorClientRequest(dest.getRequestFormatType()),
+ this.getTimeout());
+ clientRequestExecutor.addClientRequest(clientRequest);
+
+ selectorManager.registrationQueue.add(clientRequestExecutor);
+ selector.wakeup();
+
+ // Block while we wait for protocol negotiation to complete. May
+ // throw interrupted exception
+ clientRequest.await();
+
+ // Either returns uninteresting token, or throws exception if
+ // protocol negotiation failed.
clientRequest.getResult();
} catch(Exception e) {
- // Don't forget to close the socket before we throw our exception or
- // they'll leak :(
- try {
- socketChannel.close();
- } catch(Exception ex) {
- if(logger.isEnabledFor(Level.WARN))
- logger.warn(ex, ex);
+ // Make sure not to leak socketChannels
+ if(socketChannel != null) {
+ try {
+ socketChannel.close();
+ } catch(Exception ex) {
+ if(logger.isEnabledFor(Level.WARN))
+ logger.warn(ex, ex);
+ }
}
+ // If clientRequestExector is not null, some additional clean up may
+ // be warranted. However, clientRequestExecutor.close(), the
+ // "obvious" clean up, is not safe to call here. This is because
+ // .close() checks in a resource to the KeyedResourcePool that was
+ // never checked out.
throw e;
}
7 src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorPool.java
View
@@ -162,6 +162,13 @@ public ClientRequestExecutor checkout(SocketDestination destination) {
try {
clientRequestExecutor = queuedPool.checkout(destination);
} catch(Exception e) {
+ // If this exception caught here is from the nonBlockingPut call
+ // within KeyedResourcePool.attemptGrow(), then there is the chance
+ // a ClientRequestExector resource will be leaked. Cannot safely
+ // deal with this here since clientRequestExecutor is not assigned
+ // in this catch. Even if it was, clientRequestExecutore.close()
+ // checks in the SocketDestination resource and so is not safe to
+ // call.
throw new UnreachableStoreException("Failure while checking out socket for "
+ destination + ": ", e);
} finally {
2  src/java/voldemort/utils/pool/KeyedResourcePool.java
View
@@ -446,6 +446,8 @@ public Pool(ResourcePoolConfig resourcePoolConfig) {
}
}
} catch(Exception e) {
+ // If nonBlockingPut throws an exception, then we could leak
+ // the resource created by objectFactory.create().
this.size.decrementAndGet();
throw e;
}
17 test/unit/voldemort/client/ClientRegistryTest.java
View
@@ -25,8 +25,6 @@
import junit.framework.TestCase;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import voldemort.ServerTestUtils;
@@ -73,7 +71,7 @@
.getValueSerializer());
private long startTime;
- @Before
+ @Override
public void setUp() throws Exception {
if(null == servers) {
@@ -98,7 +96,7 @@ public void setUp() throws Exception {
startTime = System.currentTimeMillis();
}
- @After
+ @Override
public void tearDown() throws Exception {
this.clearRegistryContent();
}
@@ -116,6 +114,7 @@ public void testHappyPath() {
.setBootstrapUrls(SERVER_LOCAL_URL
+ serverPorts[0])
.setClientContextName(CLIENT_CONTEXT_NAME)
+ .enableDefaultClient(false)
.setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL)
.setEnableLazy(false);
SocketStoreClientFactory socketFactory = new SocketStoreClientFactory(clientConfig);
@@ -182,6 +181,7 @@ public void testTwoClients() {
.setBootstrapUrls(SERVER_LOCAL_URL
+ serverPorts[0])
.setClientContextName(CLIENT_CONTEXT_NAME)
+ .enableDefaultClient(false)
.setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL)
.setEnableLazy(false);
SocketStoreClientFactory socketFactory = new SocketStoreClientFactory(clientConfig);
@@ -270,6 +270,7 @@ public void testTwoStores() {
.setBootstrapUrls(SERVER_LOCAL_URL
+ serverPorts[0])
.setClientContextName(CLIENT_CONTEXT_NAME)
+ .enableDefaultClient(false)
.setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL)
.setEnableLazy(false);
SocketStoreClientFactory socketFactory = new SocketStoreClientFactory(clientConfig);
@@ -375,6 +376,7 @@ public void testTwoFactories() {
.setBootstrapUrls(SERVER_LOCAL_URL
+ serverPorts[0])
.setClientContextName(CLIENT_CONTEXT_NAME)
+ .enableDefaultClient(false)
.setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL)
.setEnableLazy(false);
SocketStoreClientFactory socketFactory1 = new SocketStoreClientFactory(clientConfig);
@@ -385,6 +387,7 @@ public void testTwoFactories() {
.setBootstrapUrls(SERVER_LOCAL_URL
+ serverPorts[0])
.setClientContextName(CLIENT_CONTEXT_NAME2)
+ .enableDefaultClient(false)
.setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL)
.setEnableLazy(false);
SocketStoreClientFactory socketFactory2 = new SocketStoreClientFactory(clientConfig2);
@@ -514,6 +517,7 @@ public void testOneServerFailure() {
.setBootstrapUrls(SERVER_LOCAL_URL
+ serverPorts[1])
.setClientContextName(CLIENT_CONTEXT_NAME)
+ .enableDefaultClient(false)
.setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL)
.setEnableLazy(false);
SocketStoreClientFactory socketFactory1 = new SocketStoreClientFactory(clientConfig);
@@ -524,6 +528,7 @@ public void testOneServerFailure() {
.setBootstrapUrls(SERVER_LOCAL_URL
+ serverPorts[1])
.setClientContextName(CLIENT_CONTEXT_NAME2)
+ .enableDefaultClient(false)
.setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL)
.setEnableLazy(false);
SocketStoreClientFactory socketFactory2 = new SocketStoreClientFactory(clientConfig2);
@@ -609,6 +614,7 @@ public void testRepeatRegistrationSameFactory() {
.setBootstrapUrls(SERVER_LOCAL_URL
+ serverPorts[1])
.setClientContextName(CLIENT_CONTEXT_NAME)
+ .enableDefaultClient(false)
.setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL)
.setEnableLazy(false);
SocketStoreClientFactory socketFactory1 = new SocketStoreClientFactory(clientConfig);
@@ -619,6 +625,7 @@ public void testRepeatRegistrationSameFactory() {
.setBootstrapUrls(SERVER_LOCAL_URL
+ serverPorts[1])
.setClientContextName(CLIENT_CONTEXT_NAME2)
+ .enableDefaultClient(false)
.setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL)
.setEnableLazy(false);
SocketStoreClientFactory socketFactory2 = new SocketStoreClientFactory(clientConfig2);
@@ -662,6 +669,7 @@ public void testRepeatRegistrationDifferentFactories() {
.setBootstrapUrls(SERVER_LOCAL_URL
+ serverPorts[1])
.setClientContextName(CLIENT_CONTEXT_NAME)
+ .enableDefaultClient(false)
.setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL)
.setEnableLazy(false);
SocketStoreClientFactory socketFactory1 = new SocketStoreClientFactory(clientConfig);
@@ -672,6 +680,7 @@ public void testRepeatRegistrationDifferentFactories() {
.setBootstrapUrls(SERVER_LOCAL_URL
+ serverPorts[1])
.setClientContextName(CLIENT_CONTEXT_NAME2)
+ .enableDefaultClient(false)
.setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL)
.setEnableLazy(false);
SocketStoreClientFactory socketFactory2 = new SocketStoreClientFactory(clientConfig2);
8 test/unit/voldemort/client/SocketStoreClientFactoryMbeanTest.java
View
@@ -106,7 +106,7 @@ public void testMultipleDistinctClientsOnSingleFactory() {
bootStrap(clients, 10);
checkMbeanIdCount(CLIENT_DOMAIN, "ClientThreadPool*", 1, true);
- checkMbeanIdCount(CLIENT_DOMAIN, "ZenStoreClient*", 2, true);
+ checkMbeanIdCount(CLIENT_DOMAIN, "*StoreClient*", 2, true);
checkMbeanIdCount(CLUSTER_FAILUREDETECTOR_DOMAIN, "ThresholdFailureDetector*", 1, true);
checkMbeanIdCount(PIPELINE_ROUTED_STATS_DOMAIN, "*", 2, true);
checkMbeanIdCount(CLIENT_REQUEST_DOMAIN, "aggregated*", 1, true);
@@ -135,7 +135,7 @@ public void testMultipleIndistinctClientsOnSingleFactory() {
bootStrap(clients, 10);
checkMbeanIdCount(CLIENT_DOMAIN, "ClientThreadPool*", 1, true);
- checkMbeanIdCount(CLIENT_DOMAIN, "ZenStoreClient*", 2, true);
+ checkMbeanIdCount(CLIENT_DOMAIN, "*StoreClient*", 2, true);
checkMbeanIdCount(CLUSTER_FAILUREDETECTOR_DOMAIN, "ThresholdFailureDetector*", 1, true);
checkMbeanIdCount(PIPELINE_ROUTED_STATS_DOMAIN, "*", 2, true);
checkMbeanIdCount(CLIENT_REQUEST_DOMAIN, "aggregated*", 1, true);
@@ -163,7 +163,7 @@ public void testMultipleDistinctClientsOnMultipleFactories() {
bootStrap(clients, 10);
checkMbeanIdCount(CLIENT_DOMAIN, "ClientThreadPool*", 2, true);
- checkMbeanIdCount(CLIENT_DOMAIN, "ZenStoreClient*", 2, true);
+ checkMbeanIdCount(CLIENT_DOMAIN, "*StoreClient*", 2, true);
checkMbeanIdCount(CLUSTER_FAILUREDETECTOR_DOMAIN, "ThresholdFailureDetector*", 2, true);
checkMbeanIdCount(PIPELINE_ROUTED_STATS_DOMAIN, "*", 2, true);
checkMbeanIdCount(CLIENT_REQUEST_DOMAIN, "aggregated*", 2, true);
@@ -198,7 +198,7 @@ public void testMultipleInDistinctClientsOnMultipleFactories() {
bootStrap(clients, 10);
checkMbeanIdCount(CLIENT_DOMAIN, "ClientThreadPool*", 2, true);
- checkMbeanIdCount(CLIENT_DOMAIN, "ZenStoreClient*", 2, true);
+ checkMbeanIdCount(CLIENT_DOMAIN, "*StoreClient*", 2, true);
checkMbeanIdCount(CLUSTER_FAILUREDETECTOR_DOMAIN, "ThresholdFailureDetector*", 2, true);
checkMbeanIdCount(PIPELINE_ROUTED_STATS_DOMAIN, "*", 4, true);
checkMbeanIdCount(CLIENT_REQUEST_DOMAIN, "aggregated*", 2, true);
Please sign in to comment.
Something went wrong with that request. Please try again.