diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF index ff3f61348a..d4a9f249b9 100644 --- a/META-INF/MANIFEST.MF +++ b/META-INF/MANIFEST.MF @@ -2,6 +2,6 @@ Manifest-Version: 1.0 Ant-Version: Apache Ant 1.7.1 Created-By: 20.2-b06 (Sun Microsystems Inc.) Implementation-Title: Voldemort -Implementation-Version: 1.0.0 +Implementation-Version: 1.1.0 Implementation-Vendor: LinkedIn diff --git a/release_notes.txt b/release_notes.txt index 8aa089d733..eccc11451d 100644 --- a/release_notes.txt +++ b/release_notes.txt @@ -1,6 +1,13 @@ -Release 1.1.0.on 10/19/2012 +Release 1.1.1 on 10/30/2012 -Changes made since release 1.1.0 +Changed made since release 1.1.0 +* Fixed connection leak in ClientRequestExecutorFactory +* Changed client to default to DefaultStoreClient + + +Release 1.1.0 on 10/19/2012 + +Changes made since release 1.0.0 IMPORTANT NOTE : This release has significant changes to the BDB storage layer. Users are required to read the bin/PREUPGRADE_FOR_1_1_X_README file diff --git a/src/java/voldemort/client/ClientConfig.java b/src/java/voldemort/client/ClientConfig.java index 66a3ec6d4d..408fceb8c7 100644 --- a/src/java/voldemort/client/ClientConfig.java +++ b/src/java/voldemort/client/ClientConfig.java @@ -68,8 +68,10 @@ public class ClientConfig { private volatile boolean enablePipelineRoutedStore = true; private volatile int clientZoneId = Zone.DEFAULT_ZONE_ID; - // Flag to control which store client to use. Default = Enhanced - private volatile boolean useDefaultClient = false; + // Flag to control which store client to use: + // true = DefaultStoreClient + // false = ZenStoreClient + private volatile boolean useDefaultClient = true; private volatile String failureDetectorImplementation = FailureDetectorConfig.DEFAULT_IMPLEMENTATION_CLASS_NAME; private volatile long failureDetectorBannagePeriod = FailureDetectorConfig.DEFAULT_BANNAGE_PERIOD; diff --git a/src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorFactory.java b/src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorFactory.java index ce95df84c2..e3ada1adc9 100644 --- a/src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorFactory.java +++ b/src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorFactory.java @@ -120,104 +120,106 @@ public ClientRequestExecutor create(SocketDestination dest) throws Exception { + dest.getPort() + " using protocol " + dest.getRequestFormatType().getCode()); - SocketChannel socketChannel = SocketChannel.open(); - socketChannel.socket().setReceiveBufferSize(this.socketBufferSize); - socketChannel.socket().setSendBufferSize(this.socketBufferSize); - socketChannel.socket().setTcpNoDelay(true); - socketChannel.socket().setSoTimeout(soTimeoutMs); - socketChannel.socket().setKeepAlive(this.socketKeepAlive); - socketChannel.configureBlocking(false); - socketChannel.connect(new InetSocketAddress(dest.getHost(), dest.getPort())); - - long startTime = System.currentTimeMillis(); - long duration = 0; - long currWaitTime = 1; - long prevWaitTime = 1; - - // Since we're non-blocking and it takes a non-zero amount of time - // to connect, invoke finishConnect and loop. - while(!socketChannel.finishConnect()) { - duration = System.currentTimeMillis() - startTime; - long remaining = this.connectTimeoutMs - duration; - - if(remaining < 0) { - // Don't forget to close the socket before we throw our - // exception or they'll leak :( + SocketChannel socketChannel = null; + ClientRequestExecutor clientRequestExecutor = null; + + try { + socketChannel = SocketChannel.open(); + socketChannel.socket().setReceiveBufferSize(this.socketBufferSize); + socketChannel.socket().setSendBufferSize(this.socketBufferSize); + socketChannel.socket().setTcpNoDelay(true); + socketChannel.socket().setSoTimeout(soTimeoutMs); + socketChannel.socket().setKeepAlive(this.socketKeepAlive); + socketChannel.configureBlocking(false); + socketChannel.connect(new InetSocketAddress(dest.getHost(), dest.getPort())); + + long startTime = System.currentTimeMillis(); + long duration = 0; + long currWaitTime = 1; + long prevWaitTime = 1; + + // Since we're non-blocking and it takes a non-zero amount of time + // to connect, invoke finishConnect and loop. + while(!socketChannel.finishConnect()) { + duration = System.currentTimeMillis() - startTime; + long remaining = this.connectTimeoutMs - duration; + + if(remaining < 0) { + throw new ConnectException("Cannot connect socket " + numCreated + " for " + + dest.getHost() + ":" + dest.getPort() + " after " + + duration + " ms"); + } + + if(logger.isTraceEnabled()) + logger.trace("Still creating socket " + numCreated + " for " + dest.getHost() + + ":" + dest.getPort() + ", " + remaining + + " ms. remaining to connect"); + try { - socketChannel.close(); - } catch(Exception e) { + // Break up the connection timeout into smaller units, + // employing a Fibonacci-style back-off (1, 2, 3, 5, 8, ...) + Thread.sleep(Math.min(remaining, currWaitTime)); + currWaitTime = Math.min(currWaitTime + prevWaitTime, 50); + prevWaitTime = currWaitTime - prevWaitTime; + } catch(InterruptedException e) { if(logger.isEnabledFor(Level.WARN)) logger.warn(e, e); } - - throw new ConnectException("Cannot connect socket " + numCreated + " for " - + dest.getHost() + ":" + dest.getPort() + " after " - + duration + " ms"); } - if(logger.isTraceEnabled()) - logger.trace("Still creating socket " + numCreated + " for " + dest.getHost() + ":" - + dest.getPort() + ", " + remaining + " ms. remaining to connect"); - - try { - // Break up the connection timeout into smaller units, - // employing a Fibonacci-style back-off (1, 2, 3, 5, 8, ...) - Thread.sleep(Math.min(remaining, currWaitTime)); - currWaitTime = Math.min(currWaitTime + prevWaitTime, 50); - prevWaitTime = currWaitTime - prevWaitTime; - } catch(InterruptedException e) { - if(logger.isEnabledFor(Level.WARN)) - logger.warn(e, e); - } - } - - if(logger.isDebugEnabled()) - logger.debug("Created socket " + numCreated + " for " + dest.getHost() + ":" - + dest.getPort() + " using protocol " - + dest.getRequestFormatType().getCode() + " after " + duration + " ms."); - - // check buffer sizes--you often don't get out what you put in! - if(socketChannel.socket().getReceiveBufferSize() != this.socketBufferSize) - logger.debug("Requested socket receive buffer size was " + this.socketBufferSize - + " bytes but actual size is " - + socketChannel.socket().getReceiveBufferSize() + " bytes."); - - if(socketChannel.socket().getSendBufferSize() != this.socketBufferSize) - logger.debug("Requested socket send buffer size was " + this.socketBufferSize - + " bytes but actual size is " - + socketChannel.socket().getSendBufferSize() + " bytes."); - - ClientRequestSelectorManager selectorManager = selectorManagers[counter.getAndIncrement() - % selectorManagers.length]; - - Selector selector = selectorManager.getSelector(); - ClientRequestExecutor clientRequestExecutor = new ClientRequestExecutor(selector, - socketChannel, - socketBufferSize); - BlockingClientRequest clientRequest = new BlockingClientRequest(new ProtocolNegotiatorClientRequest(dest.getRequestFormatType()), - this.getTimeout()); - clientRequestExecutor.addClientRequest(clientRequest); - - selectorManager.registrationQueue.add(clientRequestExecutor); - selector.wakeup(); - - // Block while we wait for the protocol negotiation to complete. - clientRequest.await(); - - try { - // This will throw an error if the result of the protocol - // negotiation failed, otherwise it returns an uninteresting token - // we can safely ignore. + if(logger.isDebugEnabled()) + logger.debug("Created socket " + numCreated + " for " + dest.getHost() + ":" + + dest.getPort() + " using protocol " + + dest.getRequestFormatType().getCode() + " after " + duration + + " ms."); + + // check buffer sizes--you often don't get out what you put in! + if(socketChannel.socket().getReceiveBufferSize() != this.socketBufferSize) + logger.debug("Requested socket receive buffer size was " + this.socketBufferSize + + " bytes but actual size is " + + socketChannel.socket().getReceiveBufferSize() + " bytes."); + + if(socketChannel.socket().getSendBufferSize() != this.socketBufferSize) + logger.debug("Requested socket send buffer size was " + this.socketBufferSize + + " bytes but actual size is " + + socketChannel.socket().getSendBufferSize() + " bytes."); + + ClientRequestSelectorManager selectorManager = selectorManagers[counter.getAndIncrement() + % selectorManagers.length]; + + Selector selector = selectorManager.getSelector(); + clientRequestExecutor = new ClientRequestExecutor(selector, + socketChannel, + socketBufferSize); + BlockingClientRequest clientRequest = new BlockingClientRequest(new ProtocolNegotiatorClientRequest(dest.getRequestFormatType()), + this.getTimeout()); + clientRequestExecutor.addClientRequest(clientRequest); + + selectorManager.registrationQueue.add(clientRequestExecutor); + selector.wakeup(); + + // Block while we wait for protocol negotiation to complete. May + // throw interrupted exception + clientRequest.await(); + + // Either returns uninteresting token, or throws exception if + // protocol negotiation failed. clientRequest.getResult(); } catch(Exception e) { - // Don't forget to close the socket before we throw our exception or - // they'll leak :( - try { - socketChannel.close(); - } catch(Exception ex) { - if(logger.isEnabledFor(Level.WARN)) - logger.warn(ex, ex); + // Make sure not to leak socketChannels + if(socketChannel != null) { + try { + socketChannel.close(); + } catch(Exception ex) { + if(logger.isEnabledFor(Level.WARN)) + logger.warn(ex, ex); + } } + // If clientRequestExector is not null, some additional clean up may + // be warranted. However, clientRequestExecutor.close(), the + // "obvious" clean up, is not safe to call here. This is because + // .close() checks in a resource to the KeyedResourcePool that was + // never checked out. throw e; } diff --git a/src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorPool.java b/src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorPool.java index dd78813346..85a996e595 100644 --- a/src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorPool.java +++ b/src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorPool.java @@ -162,6 +162,13 @@ public ClientRequestExecutor checkout(SocketDestination destination) { try { clientRequestExecutor = queuedPool.checkout(destination); } catch(Exception e) { + // If this exception caught here is from the nonBlockingPut call + // within KeyedResourcePool.attemptGrow(), then there is the chance + // a ClientRequestExector resource will be leaked. Cannot safely + // deal with this here since clientRequestExecutor is not assigned + // in this catch. Even if it was, clientRequestExecutore.close() + // checks in the SocketDestination resource and so is not safe to + // call. throw new UnreachableStoreException("Failure while checking out socket for " + destination + ": ", e); } finally { diff --git a/src/java/voldemort/utils/pool/KeyedResourcePool.java b/src/java/voldemort/utils/pool/KeyedResourcePool.java index e71c582b74..6a68dca3ed 100644 --- a/src/java/voldemort/utils/pool/KeyedResourcePool.java +++ b/src/java/voldemort/utils/pool/KeyedResourcePool.java @@ -446,6 +446,8 @@ public boolean attemptGrow(K key, ResourceFactory objectFactory) throw } } } catch(Exception e) { + // If nonBlockingPut throws an exception, then we could leak + // the resource created by objectFactory.create(). this.size.decrementAndGet(); throw e; } diff --git a/test/unit/voldemort/client/ClientRegistryTest.java b/test/unit/voldemort/client/ClientRegistryTest.java index 87cb8b8512..24ecca1051 100644 --- a/test/unit/voldemort/client/ClientRegistryTest.java +++ b/test/unit/voldemort/client/ClientRegistryTest.java @@ -25,8 +25,6 @@ import junit.framework.TestCase; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import voldemort.ServerTestUtils; @@ -73,7 +71,7 @@ public class ClientRegistryTest extends TestCase { .getValueSerializer()); private long startTime; - @Before + @Override public void setUp() throws Exception { if(null == servers) { @@ -98,7 +96,7 @@ public void setUp() throws Exception { startTime = System.currentTimeMillis(); } - @After + @Override public void tearDown() throws Exception { this.clearRegistryContent(); } @@ -116,6 +114,7 @@ public void testHappyPath() { .setBootstrapUrls(SERVER_LOCAL_URL + serverPorts[0]) .setClientContextName(CLIENT_CONTEXT_NAME) + .enableDefaultClient(false) .setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL) .setEnableLazy(false); SocketStoreClientFactory socketFactory = new SocketStoreClientFactory(clientConfig); @@ -182,6 +181,7 @@ public void testTwoClients() { .setBootstrapUrls(SERVER_LOCAL_URL + serverPorts[0]) .setClientContextName(CLIENT_CONTEXT_NAME) + .enableDefaultClient(false) .setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL) .setEnableLazy(false); SocketStoreClientFactory socketFactory = new SocketStoreClientFactory(clientConfig); @@ -270,6 +270,7 @@ public void testTwoStores() { .setBootstrapUrls(SERVER_LOCAL_URL + serverPorts[0]) .setClientContextName(CLIENT_CONTEXT_NAME) + .enableDefaultClient(false) .setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL) .setEnableLazy(false); SocketStoreClientFactory socketFactory = new SocketStoreClientFactory(clientConfig); @@ -375,6 +376,7 @@ public void testTwoFactories() { .setBootstrapUrls(SERVER_LOCAL_URL + serverPorts[0]) .setClientContextName(CLIENT_CONTEXT_NAME) + .enableDefaultClient(false) .setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL) .setEnableLazy(false); SocketStoreClientFactory socketFactory1 = new SocketStoreClientFactory(clientConfig); @@ -385,6 +387,7 @@ public void testTwoFactories() { .setBootstrapUrls(SERVER_LOCAL_URL + serverPorts[0]) .setClientContextName(CLIENT_CONTEXT_NAME2) + .enableDefaultClient(false) .setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL) .setEnableLazy(false); SocketStoreClientFactory socketFactory2 = new SocketStoreClientFactory(clientConfig2); @@ -514,6 +517,7 @@ public void testOneServerFailure() { .setBootstrapUrls(SERVER_LOCAL_URL + serverPorts[1]) .setClientContextName(CLIENT_CONTEXT_NAME) + .enableDefaultClient(false) .setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL) .setEnableLazy(false); SocketStoreClientFactory socketFactory1 = new SocketStoreClientFactory(clientConfig); @@ -524,6 +528,7 @@ public void testOneServerFailure() { .setBootstrapUrls(SERVER_LOCAL_URL + serverPorts[1]) .setClientContextName(CLIENT_CONTEXT_NAME2) + .enableDefaultClient(false) .setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL) .setEnableLazy(false); SocketStoreClientFactory socketFactory2 = new SocketStoreClientFactory(clientConfig2); @@ -609,6 +614,7 @@ public void testRepeatRegistrationSameFactory() { .setBootstrapUrls(SERVER_LOCAL_URL + serverPorts[1]) .setClientContextName(CLIENT_CONTEXT_NAME) + .enableDefaultClient(false) .setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL) .setEnableLazy(false); SocketStoreClientFactory socketFactory1 = new SocketStoreClientFactory(clientConfig); @@ -619,6 +625,7 @@ public void testRepeatRegistrationSameFactory() { .setBootstrapUrls(SERVER_LOCAL_URL + serverPorts[1]) .setClientContextName(CLIENT_CONTEXT_NAME2) + .enableDefaultClient(false) .setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL) .setEnableLazy(false); SocketStoreClientFactory socketFactory2 = new SocketStoreClientFactory(clientConfig2); @@ -662,6 +669,7 @@ public void testRepeatRegistrationDifferentFactories() { .setBootstrapUrls(SERVER_LOCAL_URL + serverPorts[1]) .setClientContextName(CLIENT_CONTEXT_NAME) + .enableDefaultClient(false) .setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL) .setEnableLazy(false); SocketStoreClientFactory socketFactory1 = new SocketStoreClientFactory(clientConfig); @@ -672,6 +680,7 @@ public void testRepeatRegistrationDifferentFactories() { .setBootstrapUrls(SERVER_LOCAL_URL + serverPorts[1]) .setClientContextName(CLIENT_CONTEXT_NAME2) + .enableDefaultClient(false) .setClientRegistryUpdateIntervalInSecs(CLIENT_REGISTRY_REFRESH_INTERVAL) .setEnableLazy(false); SocketStoreClientFactory socketFactory2 = new SocketStoreClientFactory(clientConfig2); diff --git a/test/unit/voldemort/client/SocketStoreClientFactoryMbeanTest.java b/test/unit/voldemort/client/SocketStoreClientFactoryMbeanTest.java index e785ee5641..7888aeed54 100644 --- a/test/unit/voldemort/client/SocketStoreClientFactoryMbeanTest.java +++ b/test/unit/voldemort/client/SocketStoreClientFactoryMbeanTest.java @@ -106,7 +106,7 @@ public void testMultipleDistinctClientsOnSingleFactory() { bootStrap(clients, 10); checkMbeanIdCount(CLIENT_DOMAIN, "ClientThreadPool*", 1, true); - checkMbeanIdCount(CLIENT_DOMAIN, "ZenStoreClient*", 2, true); + checkMbeanIdCount(CLIENT_DOMAIN, "*StoreClient*", 2, true); checkMbeanIdCount(CLUSTER_FAILUREDETECTOR_DOMAIN, "ThresholdFailureDetector*", 1, true); checkMbeanIdCount(PIPELINE_ROUTED_STATS_DOMAIN, "*", 2, true); checkMbeanIdCount(CLIENT_REQUEST_DOMAIN, "aggregated*", 1, true); @@ -135,7 +135,7 @@ public void testMultipleIndistinctClientsOnSingleFactory() { bootStrap(clients, 10); checkMbeanIdCount(CLIENT_DOMAIN, "ClientThreadPool*", 1, true); - checkMbeanIdCount(CLIENT_DOMAIN, "ZenStoreClient*", 2, true); + checkMbeanIdCount(CLIENT_DOMAIN, "*StoreClient*", 2, true); checkMbeanIdCount(CLUSTER_FAILUREDETECTOR_DOMAIN, "ThresholdFailureDetector*", 1, true); checkMbeanIdCount(PIPELINE_ROUTED_STATS_DOMAIN, "*", 2, true); checkMbeanIdCount(CLIENT_REQUEST_DOMAIN, "aggregated*", 1, true); @@ -163,7 +163,7 @@ public void testMultipleDistinctClientsOnMultipleFactories() { bootStrap(clients, 10); checkMbeanIdCount(CLIENT_DOMAIN, "ClientThreadPool*", 2, true); - checkMbeanIdCount(CLIENT_DOMAIN, "ZenStoreClient*", 2, true); + checkMbeanIdCount(CLIENT_DOMAIN, "*StoreClient*", 2, true); checkMbeanIdCount(CLUSTER_FAILUREDETECTOR_DOMAIN, "ThresholdFailureDetector*", 2, true); checkMbeanIdCount(PIPELINE_ROUTED_STATS_DOMAIN, "*", 2, true); checkMbeanIdCount(CLIENT_REQUEST_DOMAIN, "aggregated*", 2, true); @@ -198,7 +198,7 @@ public void testMultipleInDistinctClientsOnMultipleFactories() { bootStrap(clients, 10); checkMbeanIdCount(CLIENT_DOMAIN, "ClientThreadPool*", 2, true); - checkMbeanIdCount(CLIENT_DOMAIN, "ZenStoreClient*", 2, true); + checkMbeanIdCount(CLIENT_DOMAIN, "*StoreClient*", 2, true); checkMbeanIdCount(CLUSTER_FAILUREDETECTOR_DOMAIN, "ThresholdFailureDetector*", 2, true); checkMbeanIdCount(PIPELINE_ROUTED_STATS_DOMAIN, "*", 4, true); checkMbeanIdCount(CLIENT_REQUEST_DOMAIN, "aggregated*", 2, true);