From 1172064599ac49b3b31bc15f46d946acd4f6fa48 Mon Sep 17 00:00:00 2001 From: Alex Jing Date: Tue, 17 Apr 2018 15:22:37 -0700 Subject: [PATCH] add zookeeper connection sharing feature RB=1260149 G=si-core-reviewers R=ssheng,dhoa,cxu,fcapponi A=fcapponi --- CHANGELOG | 2 + .../linkedin/d2/balancer/D2ClientBuilder.java | 11 +- .../linkedin/d2/balancer/D2ClientConfig.java | 11 +- ...LastSeenBalancerWithFacilitiesFactory.java | 8 +- .../servers/ZooKeeperConnectionManager.java | 134 ++-- .../d2/discovery/stores/zk/ZKConnection.java | 10 + .../stores/zk/ZKConnectionBuilder.java | 38 +- .../stores/zk/ZKPersistentConnection.java | 86 ++- .../stores/zk/ZkConnectionDealer.java | 92 +++ .../stores/zk/TestZKPersistentConnection.java | 93 +++ .../stores/zk/ZkConnectionBuilderTest.java | 68 ++ .../stores/zk/ZkConnectionDealerTest.java | 617 ++++++++++++++++++ 12 files changed, 1117 insertions(+), 53 deletions(-) create mode 100644 d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZkConnectionDealer.java create mode 100644 d2/src/test/java/com/linkedin/d2/discovery/stores/zk/ZkConnectionBuilderTest.java create mode 100644 d2/src/test/java/com/linkedin/d2/discovery/stores/zk/ZkConnectionDealerTest.java diff --git a/CHANGELOG b/CHANGELOG index 9f7f40b67e..dffe886f89 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,7 @@ 20.0.19 ------- +(RB=1260149) +add zookeeper connection sharing feature 20.0.18 ------- diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java index 2cfc0ec5b2..fa8df07b89 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java @@ -35,6 +35,8 @@ import com.linkedin.d2.balancer.util.healthcheck.HealthCheckOperations; import com.linkedin.d2.balancer.util.partitions.PartitionAccessorRegistry; import com.linkedin.d2.balancer.zkfs.ZKFSTogglingLoadBalancerFactoryImpl; +import com.linkedin.d2.discovery.stores.zk.ZKConnectionBuilder; +import com.linkedin.d2.discovery.stores.zk.ZkConnectionDealer; import com.linkedin.d2.discovery.stores.zk.ZooKeeper; import com.linkedin.r2.transport.common.TransportClientFactory; import com.linkedin.r2.transport.http.client.HttpClientFactory; @@ -131,7 +133,8 @@ public D2Client build() _config.zooKeeperDecorator, _config.enableSaveUriDataOnDisk, loadBalancerStrategyFactories, - _config.requestTimeoutHandlerEnabled); + _config.requestTimeoutHandlerEnabled, + _config.connectionDealer); final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ? new ZKFSLoadBalancerWithFacilitiesFactory() : @@ -415,6 +418,12 @@ public D2ClientBuilder setRequestTimeoutHandlerEnabled(boolean requestTimeoutHan return this; } + public D2ClientBuilder setZKConnectionDealer(ZkConnectionDealer connectionDealer) + { + _config.connectionDealer = connectionDealer; + return this; + } + private Map createDefaultTransportClientFactories() { final Map clientFactories = new HashMap(); diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java index cf9b0c366b..38f6d90449 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java @@ -17,6 +17,7 @@ import com.linkedin.d2.backuprequests.BackupRequestsStrategyStatsConsumer; import com.linkedin.d2.balancer.event.EventEmitter; +import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy; import com.linkedin.d2.balancer.strategies.LoadBalancerStrategyFactory; import com.linkedin.d2.balancer.util.WarmUpLoadBalancer; import com.linkedin.d2.balancer.util.downstreams.DownstreamServicesFetcher; @@ -24,6 +25,7 @@ import com.linkedin.d2.balancer.util.partitions.PartitionAccessorRegistry; import com.linkedin.d2.balancer.zkfs.ZKFSTogglingLoadBalancerFactoryImpl; import com.linkedin.d2.balancer.zkfs.ZKFSTogglingLoadBalancerFactoryImpl.ComponentFactory; +import com.linkedin.d2.discovery.stores.zk.ZkConnectionDealer; import com.linkedin.d2.discovery.stores.zk.ZooKeeper; import com.linkedin.r2.transport.common.TransportClientFactory; import java.util.Collections; @@ -75,8 +77,9 @@ public class D2ClientConfig EventEmitter eventEmitter = null; PartitionAccessorRegistry partitionAccessorRegistry = null; Function zooKeeperDecorator = null; - Map> loadBalancerStrategyFactories = Collections.emptyMap(); + Map> loadBalancerStrategyFactories = Collections.emptyMap(); boolean requestTimeoutHandlerEnabled = false; + ZkConnectionDealer connectionDealer = null; private static final int DEAULT_RETRY_LIMIT = 3; @@ -120,8 +123,9 @@ public D2ClientConfig() PartitionAccessorRegistry partitionAccessorRegistry, Function zooKeeperDecorator, boolean enableSaveUriDataOnDisk, - Map> loadBalancerStrategyFactories, - boolean requestTimeoutHandlerEnabled) + Map> loadBalancerStrategyFactories, + boolean requestTimeoutHandlerEnabled, + ZkConnectionDealer connectionDealer) { this.zkHosts = zkHosts; this.zkSessionTimeoutInMs = zkSessionTimeoutInMs; @@ -161,5 +165,6 @@ public D2ClientConfig() this.enableSaveUriDataOnDisk = enableSaveUriDataOnDisk; this.loadBalancerStrategyFactories = loadBalancerStrategyFactories; this.requestTimeoutHandlerEnabled = requestTimeoutHandlerEnabled; + this.connectionDealer = connectionDealer; } } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java index ac6315452c..38dca80c69 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java @@ -58,7 +58,13 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) zkConnectionBuilder.setShutdownAsynchronously(config.shutdownAsynchronously) .setIsSymlinkAware(config.isSymlinkAware).setTimeout((int) config.zkSessionTimeoutInMs); - ZKPersistentConnection zkPersistentConnection = new ZKPersistentConnection(zkConnectionBuilder); + ZKPersistentConnection zkPersistentConnection; + if (config.connectionDealer != null) + { + zkPersistentConnection = config.connectionDealer.getZKPersistentConnection(zkConnectionBuilder); + } else { + zkPersistentConnection = new ZKPersistentConnection(zkConnectionBuilder); + } // init all the stores LastSeenZKStore lsClusterStore = getClusterPropertiesLastSeenZKStore(config, zkPersistentConnection); diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java index d6bfb726f7..27b93d9545 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java @@ -32,7 +32,10 @@ import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore; import com.linkedin.d2.discovery.stores.zk.ZooKeeperStore; import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,10 +58,37 @@ public class ZooKeeperConnectionManager private final ZKStoreFactory> _factory; private final ZooKeeperAnnouncer[] _servers; private final AtomicReference> _startupCallback = new AtomicReference>(); + private final ZKPersistentConnection _zkConnection; + /** + * a boolean flag to indicate whether _store is successfully started or not + */ + private volatile boolean _storeStarted = false; + + /** + * Two preconditions have to be met before actual announcing + */ + private volatile boolean _managerStarted = false; + private volatile boolean _storeReady = false; + private volatile ZooKeeperEphemeralStore _store; + public ZooKeeperConnectionManager(ZKPersistentConnection zkConnection, + String zkBasePath, + ZKStoreFactory> factory, + ZooKeeperAnnouncer... servers) + { + _zkBasePath = zkBasePath; + _zkConnection = zkConnection; + _factory = factory; + _servers = servers; + _zkConnection.addListeners(Collections.singletonList(new Listener())); + + _zkConnectString = zkConnection.getZKConnection().getConnectString(); + _zkSessionTimeout = zkConnection.getZKConnection().getTimeout(); + } + public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout, String zkBasePath, ZKStoreFactory> factory, @@ -104,6 +134,7 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout, public void start(Callback callback) { + _managerStarted = true; if (!_startupCallback.compareAndSet(null, callback)) { throw new IllegalStateException("Already starting"); @@ -111,6 +142,9 @@ public void start(Callback callback) try { _zkConnection.start(); + //Trying to start store here. If the connection is not ready, will return immediately. + //The connection event will trigger the actual store startup + tryStartStore(); LOG.info("Started ZooKeeper connection to {}", _zkConnectString); } catch (Exception e) @@ -122,6 +156,7 @@ public void start(Callback callback) public void shutdown(final Callback callback) { + _managerStarted = false; Callback zkCloseCallback = new CallbackAdapter(callback) { @Override @@ -205,11 +240,8 @@ public void onSuccess(None result) private class Listener implements ZKPersistentConnection.EventListener { - /** - * a boolean flag to indicate whether _store is successfully started or not - */ - private volatile boolean _storeStarted = false; + private volatile boolean _sessionEstablished = false; @Override public void notifyEvent(ZKPersistentConnection.Event event) { @@ -219,7 +251,9 @@ public void notifyEvent(ZKPersistentConnection.Event event) case SESSION_ESTABLISHED: { _store = _factory.createStore(_zkConnection.getZKConnection(), ZKFSUtil.uriPath(_zkBasePath)); - startStore(); + _storeReady = true; + //Trying to start the store. If the manager itself is not started yet, the start will be deferred until start is called. + tryStartStore(); break; } case SESSION_EXPIRED: @@ -232,7 +266,7 @@ public void notifyEvent(ZKPersistentConnection.Event event) { if (!_storeStarted) { - startStore(); + tryStartStore(); } else { @@ -248,54 +282,66 @@ public void notifyEvent(ZKPersistentConnection.Event event) break; } } + } - private void startStore() + /** + * Store should only be started if two conditions are satisfied + * 1. store is ready. store is ready when connection is established + * 2. ZookeeperConnectionManager is started. + */ + private void tryStartStore() + { + if (_managerStarted && _storeReady) { + startStore(); + } + } + + private void startStore() + { + final Callback callback = _startupCallback.getAndSet(null); + final Callback multiCallback = callback != null ? + Callbacks.countDown(callback, _servers.length) : + Callbacks.empty(); + _store.start(new Callback() { - final Callback callback = _startupCallback.getAndSet(null); - final Callback multiCallback = callback != null ? - Callbacks.countDown(callback, _servers.length) : - Callbacks.empty(); - _store.start(new Callback() + @Override + public void onError(Throwable e) { - @Override - public void onError(Throwable e) + LOG.error("Failed to start ZooKeeperEphemeralStore", e); + if (callback != null) { - LOG.error("Failed to start ZooKeeperEphemeralStore", e); - if (callback != null) - { - callback.onError(e); - } + callback.onError(e); } + } - @Override - public void onSuccess(None result) + @Override + public void onSuccess(None result) + { + /* mark store as started */ + _storeStarted = true; + for (ZooKeeperAnnouncer server : _servers) { - /* mark store as started */ - _storeStarted = true; - for (ZooKeeperAnnouncer server : _servers) + server.setStore(_store); + server.start(new Callback() { - server.setStore(_store); - server.start(new Callback() + @Override + public void onError(Throwable e) { - @Override - public void onError(Throwable e) - { - LOG.error("Failed to start server", e); - multiCallback.onError(e); - } - - @Override - public void onSuccess(None result) - { - LOG.info("Started an announcer"); - multiCallback.onSuccess(result); - } - }); - } - LOG.info("Starting {} announcers", (_servers.length)); + LOG.error("Failed to start server", e); + multiCallback.onError(e); + } + + @Override + public void onSuccess(None result) + { + LOG.info("Started an announcer"); + multiCallback.onSuccess(result); + } + }); } - }); - } + LOG.info("Starting {} announcers", (_servers.length)); + } + }); } public interface ZKStoreFactory> diff --git a/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZKConnection.java b/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZKConnection.java index 137a2ed81d..e9c0126764 100644 --- a/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZKConnection.java +++ b/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZKConnection.java @@ -293,6 +293,16 @@ public ZooKeeper getZooKeeper() return zk(); } + public String getConnectString() + { + return _connectString; + } + + public int getTimeout() + { + return _timeout; + } + public void waitForState(KeeperState state, long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException { diff --git a/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZKConnectionBuilder.java b/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZKConnectionBuilder.java index 856f92a386..0460987486 100644 --- a/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZKConnectionBuilder.java +++ b/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZKConnectionBuilder.java @@ -17,6 +17,7 @@ package com.linkedin.d2.discovery.stores.zk; import com.linkedin.util.ArgumentUtil; +import java.util.Objects; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; @@ -54,6 +55,19 @@ public ZKConnectionBuilder(String connectString) _connectString = connectString; } + public ZKConnectionBuilder(ZKConnectionBuilder builder) + { + _connectString = builder._connectString; + _sessionTimeout = builder._sessionTimeout; + _shutdownAsynchronously = builder._shutdownAsynchronously; + _retryLimit = builder._retryLimit; + _isSymlinkAware = builder._isSymlinkAware; + _exponentialBackoff = builder._exponentialBackoff; + _retryScheduler = builder._retryScheduler; + _initInterval = builder._initInterval; + _zkDecorator = builder._zkDecorator; + } + /** * @param sessionTimeout session timeout in milliseconds */ @@ -80,7 +94,6 @@ public ZKConnectionBuilder setRetryLimit(int retryLimit) _retryLimit = retryLimit; return this; } - /** * @param isSymlinkAware Resolves znodes whose name is prefixed with a * dollar sign '$' (eg. /$symlink1, /foo/bar/$symlink2) @@ -133,4 +146,27 @@ public ZKConnection build() return new ZKConnection(_connectString, _sessionTimeout, _retryLimit, _exponentialBackoff, _retryScheduler, _initInterval, _shutdownAsynchronously, _isSymlinkAware, _zkDecorator); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ZKConnectionBuilder that = (ZKConnectionBuilder) o; + return _sessionTimeout == that._sessionTimeout && _shutdownAsynchronously == that._shutdownAsynchronously + && _retryLimit == that._retryLimit && _isSymlinkAware == that._isSymlinkAware + && _exponentialBackoff == that._exponentialBackoff && _initInterval == that._initInterval && Objects.equals( + _connectString, that._connectString) && Objects.equals(_retryScheduler, that._retryScheduler) && Objects.equals( + _zkDecorator, that._zkDecorator); + } + + @Override + public int hashCode() { + + return Objects.hash(_connectString, _sessionTimeout, _shutdownAsynchronously, _retryLimit, _isSymlinkAware, + _exponentialBackoff, _retryScheduler, _initInterval, _zkDecorator); + } } \ No newline at end of file diff --git a/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZKPersistentConnection.java b/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZKPersistentConnection.java index b101bee443..d9948a25da 100644 --- a/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZKPersistentConnection.java +++ b/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZKPersistentConnection.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +50,13 @@ public class ZKPersistentConnection private Set _listeners; private State _state = State.INIT; + //the number of users currently having the connection running + private AtomicInteger _activeUserCount; + //the number of users who obtained the connection from the ZKConnectionDealer during construction. + private AtomicInteger _registeredUserCount; + //the flag to indicate that the connection has been forcefully shutdown by framework + private volatile boolean _hasForcefullyShutdown; + private enum State { INIT, STARTED, @@ -163,8 +171,9 @@ public ZKPersistentConnection(ZKConnectionBuilder zkConnectionBuilder) _zkConnection = _zkConnectionBuilder.build(); _zkConnection.addStateListener(new Listener()); _listeners = new HashSet<>(); - - + _activeUserCount = new AtomicInteger(0); + _registeredUserCount = new AtomicInteger(0); + _hasForcefullyShutdown = false; } /** @@ -191,13 +200,23 @@ public void addListeners(Collection listeners) } } + /** + * Called when an additional user requested the connection + */ + public void incrementShareCount() + { + _registeredUserCount.incrementAndGet(); + } + public void start() throws IOException { synchronized (_mutex) { + _activeUserCount.getAndIncrement(); if (_state != State.INIT) { - throw new IllegalStateException("Can not start ZKConnection when " + _state); + // if it is not the first time we started it, we just increment the active user count and return + return; } _state = State.STARTED; _listeners = Collections.unmodifiableSet(_listeners); @@ -209,15 +228,60 @@ public void shutdown() throws InterruptedException { synchronized (_mutex) { + if (_hasForcefullyShutdown) + { + LOG.warn("The connection has already been forcefully shutdown"); + return; + } if (_state != State.STARTED) { throw new IllegalStateException("Can not shutdown ZKConnection when " + _state); } + int remainingActiveUserCount = _activeUserCount.decrementAndGet(); + int remainingRegisteredUserCount = _registeredUserCount.decrementAndGet(); + if (remainingActiveUserCount > 0 || remainingRegisteredUserCount > 0) + { + //connection can only be shut down if + // 1. no one is using it + // 2. everyone who has shared it has finished using it. + return; + } _state = State.STOPPED; _zkConnection.shutdown(); } } + /** + * This method is intended to be called at the end of framework lifecycle to ensure graceful shutdown, normal shutdown operation should + * be carried out with the method above. + */ + public void forceShutdown() throws InterruptedException + { + synchronized (_mutex) + { + if (_state != State.STARTED) + { + LOG.warn("Unnecessary to forcefully shutdown a zkPersistentConnection that is either not started or already stopped"); + return; + } + _hasForcefullyShutdown = true; + int remainingActiveUserCount = _activeUserCount.get(); + if (remainingActiveUserCount != 0) + { + LOG.warn("Forcefully shutting down ZkPersistentConnection when there still are" + remainingActiveUserCount + + " active users"); + } + _state = State.STOPPED; + try + { + _zkConnection.shutdown(); + } catch (IllegalStateException e) + { + LOG.warn("trying to forcefully shutdown zk connection but encountered:" + e.getMessage()); + } + } + } + public ZooKeeper getZooKeeper() { synchronized (_mutex) @@ -234,6 +298,22 @@ public ZKConnection getZKConnection() } } + public boolean isConnectionStarted() + { + synchronized (_mutex) + { + return _state == State.STARTED; + } + } + + public boolean isConnectionStopped() + { + synchronized (_mutex) + { + return _state == State.STOPPED; + } + } + private class Listener implements ZKConnection.StateListener { private long _sessionId; diff --git a/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZkConnectionDealer.java b/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZkConnectionDealer.java new file mode 100644 index 0000000000..c26f1e1932 --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZkConnectionDealer.java @@ -0,0 +1,92 @@ +/* + Copyright (c) 2018 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.d2.discovery.stores.zk; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Entity; + + +/** + * This class is used to dispatch ZkPersistentConnection based on the config provided in ZKConnectionBuilder. + * This allows us to have a centralized place to generate connections to Zookeeper as well as share this connection + * whenever possible. + * + * NOTE: this class is intended to be used during object initialization phase before starting/running the application. + * This is because connection event listeners can only be added before connection start. + */ +public class ZkConnectionDealer { + private static final Logger LOG = LoggerFactory.getLogger(ZkConnectionDealer.class); + + private Map _sharedConnections; + + + public ZkConnectionDealer() { + _sharedConnections = new HashMap<>(); + } + + /** + * Returns either a new connection to zookeeper if no connection is shareable or an old connection if the config is identical to one we had before + * @param zkConnectionBuilder ZKConnectionBuilder with desired Zookeeper config values. + * @return a ZKPersistentConnection + */ + public ZKPersistentConnection getZKPersistentConnection(ZKConnectionBuilder zkConnectionBuilder) { + final ZKConnectionBuilder builder= new ZKConnectionBuilder(zkConnectionBuilder); + ZKPersistentConnection connection; + + synchronized (_sharedConnections){ + if (_sharedConnections.containsKey(builder)) { + connection = _sharedConnections.get(builder); + if (connection.isConnectionStarted()) + { + LOG.warn("There is a connection with the same parameters that are already started. Opening a new connection now. Please consider constructing connections before startup."); + return new ZKPersistentConnection(builder); + } + } else { + connection = new ZKPersistentConnection(builder); + _sharedConnections.put(builder, connection); + } + } + connection.incrementShareCount(); + return connection; + } + + /** + * Since connections are shared, if registered users did not use the connection, the connection can't be closed unless we manually close it. + * @throws InterruptedException + */ + public void ensureConnectionClosed() throws InterruptedException + { + Collection connectionList = _sharedConnections.values(); + for (ZKPersistentConnection connection : connectionList) { + if (!connection.isConnectionStopped()) + { + connection.forceShutdown(); + } + } + } + + public int getZkConnectionCount() { + return _sharedConnections.size(); + } +} diff --git a/d2/src/test/java/com/linkedin/d2/discovery/stores/zk/TestZKPersistentConnection.java b/d2/src/test/java/com/linkedin/d2/discovery/stores/zk/TestZKPersistentConnection.java index eb687a2315..aa6d6d8a13 100644 --- a/d2/src/test/java/com/linkedin/d2/discovery/stores/zk/TestZKPersistentConnection.java +++ b/d2/src/test/java/com/linkedin/d2/discovery/stores/zk/TestZKPersistentConnection.java @@ -20,11 +20,23 @@ package com.linkedin.d2.discovery.stores.zk; +import com.linkedin.common.callback.Callback; +import com.linkedin.common.callback.Callbacks; +import com.linkedin.common.callback.FutureCallback; +import com.linkedin.common.util.None; import java.io.IOException; +import java.util.ArrayList; +import java.util.Calendar; import java.util.Collections; import java.util.Date; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -32,6 +44,7 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.testng.Assert; +import org.testng.annotations.Listeners; import org.testng.annotations.Test; import static org.testng.Assert.fail; @@ -192,6 +205,86 @@ public void testFailureAddListenersAfterStart() } } + @Test + public void testMultipleUsersOnSingleConnection() throws Exception { + int port = 2120; + int numUsers = 10; + Random random = new Random(); + ZKServer server = new ZKServer(port); + server.startup(); + ZKPersistentConnection c = + new ZKPersistentConnection(new ZKConnectionBuilder("localhost:" + port).setTimeout(15000)); + ExecutorService executor = Executors.newFixedThreadPool(numUsers); + AtomicInteger notificationCount = new AtomicInteger(0); + + for (int i = 0; i < numUsers; i++) { + ZKPersistentConnection.EventListener listener = new ZKPersistentConnection.EventListener() { + @Override + public void notifyEvent(ZKPersistentConnection.Event event) { + notificationCount.getAndIncrement(); + } + }; + c.addListeners(Collections.singletonList(listener)); + c.incrementShareCount(); + } + + FutureCallback callback = new FutureCallback(); + Callback multiCallback = Callbacks.countDown(callback, numUsers); + for (int i = 0; i < numUsers; i++) { + final int userIndex = i; + executor.submit(new Runnable() { + @Override + public void run() { + try { + // start after indeterminate delay to simulate interleaved startup and shutdown + Thread.sleep(Math.abs(random.nextInt()) % 100); + c.start(); + + //test live connection + c.getZooKeeper().exists("/test", false); + + c.shutdown(); + multiCallback.onSuccess(None.none()); + } catch (Exception e) { + multiCallback.onError(e); + } + } + }); + } + + callback.get(5000, TimeUnit.MILLISECONDS); + Assert.assertTrue(notificationCount.get() == 10); + Assert.assertTrue(c.isConnectionStopped()); + server.shutdown(); + executor.shutdown(); + } + + @Test + public void testNormalUsercaseWithoutSharing() throws IOException, InterruptedException, KeeperException + { + int port = 2120; + int numUsers = 10; + Random random = new Random(); + ZKServer server = new ZKServer(port); + server.startup(); + + ZKConnectionBuilder builder = new ZKConnectionBuilder("localhost:" + port); + builder.setTimeout(15000); + ZKPersistentConnection connection = new ZKPersistentConnection(builder); + + connection.start(); + Assert.assertTrue(connection.isConnectionStarted()); + + connection.getZooKeeper().exists("/test", false); + + connection.shutdown(); + Assert.assertTrue(connection.isConnectionStopped()); + server.shutdown(); + } + + + + private static class TestListener implements ZKPersistentConnection.EventListener { private final Lock _lock = new ReentrantLock(); diff --git a/d2/src/test/java/com/linkedin/d2/discovery/stores/zk/ZkConnectionBuilderTest.java b/d2/src/test/java/com/linkedin/d2/discovery/stores/zk/ZkConnectionBuilderTest.java new file mode 100644 index 0000000000..03c62395ea --- /dev/null +++ b/d2/src/test/java/com/linkedin/d2/discovery/stores/zk/ZkConnectionBuilderTest.java @@ -0,0 +1,68 @@ +/* + Copyright (c) 2018 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.d2.discovery.stores.zk; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class ZkConnectionBuilderTest { + @Test + public void testBuilderEquality() { + ZKConnectionBuilder builder1 = new ZKConnectionBuilder("localhost:2121"); + ZKConnectionBuilder builder2 = new ZKConnectionBuilder("localhost:2121"); + ZKConnectionBuilder builder3 = new ZKConnectionBuilder("localhost:2121"); + ZKConnectionBuilder builder4 = new ZKConnectionBuilder("localhost:2121"); + + builder1.setInitInterval(20); + builder1.setRetryLimit(10); + builder1.setTimeout(100); + builder1.setExponentialBackoff(true); + builder1.setIsSymlinkAware(true); + builder1.setShutdownAsynchronously(true); + + builder2.setInitInterval(20); + builder2.setRetryLimit(10); + builder2.setTimeout(100); + builder2.setExponentialBackoff(true); + builder2.setIsSymlinkAware(true); + builder2.setShutdownAsynchronously(true); + + builder3.setInitInterval(20); + builder3.setRetryLimit(10); + builder3.setTimeout(100); + builder3.setExponentialBackoff(true); + builder3.setIsSymlinkAware(false); + builder3.setShutdownAsynchronously(true); + + builder4.setInitInterval(20); + builder4.setRetryLimit(10); + builder4.setTimeout(100); + builder4.setExponentialBackoff(false); + builder4.setIsSymlinkAware(true); + builder4.setShutdownAsynchronously(true); + + Set set = new HashSet<>(); + set.add(builder1); + Assert.assertTrue(set.contains(builder2)); + Assert.assertTrue(!set.contains(builder3)); + Assert.assertTrue(!set.contains(builder4)); + } +} diff --git a/d2/src/test/java/com/linkedin/d2/discovery/stores/zk/ZkConnectionDealerTest.java b/d2/src/test/java/com/linkedin/d2/discovery/stores/zk/ZkConnectionDealerTest.java new file mode 100644 index 0000000000..5ef7a6f6d8 --- /dev/null +++ b/d2/src/test/java/com/linkedin/d2/discovery/stores/zk/ZkConnectionDealerTest.java @@ -0,0 +1,617 @@ +/* + Copyright (c) 2018 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.d2.discovery.stores.zk; + +import com.linkedin.common.callback.Callback; +import com.linkedin.common.callback.Callbacks; +import com.linkedin.common.callback.FutureCallback; +import com.linkedin.common.callback.MultiCallback; +import com.linkedin.common.util.None; +import com.linkedin.d2.balancer.D2Client; +import com.linkedin.d2.balancer.D2ClientBuilder; +import com.linkedin.d2.balancer.Directory; +import com.linkedin.d2.balancer.LastSeenBalancerWithFacilitiesFactory; +import com.linkedin.d2.balancer.properties.ClusterProperties; +import com.linkedin.d2.balancer.properties.ClusterPropertiesJsonSerializer; +import com.linkedin.d2.balancer.properties.PartitionData; +import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.ServicePropertiesJsonSerializer; +import com.linkedin.d2.balancer.properties.UriProperties; +import com.linkedin.d2.balancer.properties.UriPropertiesJsonSerializer; +import com.linkedin.d2.balancer.properties.UriPropertiesMerger; +import com.linkedin.d2.balancer.servers.ZKUriStoreFactory; +import com.linkedin.d2.balancer.servers.ZooKeeperAnnouncer; +import com.linkedin.d2.balancer.servers.ZooKeeperConnectionManager; +import com.linkedin.d2.balancer.servers.ZooKeeperServer; +import com.linkedin.d2.balancer.util.HostSet; +import com.linkedin.d2.balancer.util.partitions.DefaultPartitionAccessor; +import com.linkedin.d2.balancer.zkfs.ZKFSUtil; +import com.linkedin.r2.message.RequestContext; +import com.linkedin.r2.message.rest.RestRequest; +import com.linkedin.r2.message.rest.RestRequestBuilder; +import com.linkedin.r2.message.rest.RestResponse; +import com.linkedin.r2.transport.common.TransportClientFactory; +import com.linkedin.r2.transport.common.bridge.client.TransportClient; +import com.linkedin.r2.transport.common.bridge.common.TransportCallback; +import com.linkedin.r2.transport.common.bridge.common.TransportResponse; +import java.io.IOException; +import java.net.URI; +import java.sql.Blob; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.zookeeper.KeeperException; +import org.testng.Assert; +import org.testng.annotations.Test; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.AfterMethod; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.fail; + + + + +public class ZkConnectionDealerTest { + private ZkConnectionDealer _dealer; + private ZKServer _zkServer; + + private int NUM_DEFAULT_SHAREABLE_BUILDERS = 5; + + private static final String CLUSTER_NAME = "testCluster"; + private static final String SERVICE_NAME = "testService"; + private static final String ZKBASE_PATH = "/d2"; + private static final int ZK_PORT = 2120; + private static final int ZK_TIMEOUT = 5000; + private static final int BLOCKING_CALL_TIMEOUT = 5000; + + private ZooKeeperPermanentStore _serviceRegistry; + private ZooKeeperPermanentStore _clusterRegistry; + private ZooKeeperEphemeralStore _verificationStore; + private ExecutorService _threadPoolExecutor; + + @BeforeMethod + public void setUp() throws Exception { + _dealer = new ZkConnectionDealer(); + try { + _zkServer = new ZKServer(ZK_PORT); + _zkServer.startup(); + } catch (IOException e) { + fail("unable to instantiate real zk server on port " + ZK_PORT); + } + + ZKConnection serviceZkConn = new ZKConnectionBuilder("localhost:" + ZK_PORT).setTimeout(5000).build(); + ZKConnection clusterZkConn = new ZKConnectionBuilder("localhost:" + ZK_PORT).setTimeout(5000).build(); + + _serviceRegistry = + new ZooKeeperPermanentStore(serviceZkConn, new ServicePropertiesJsonSerializer(), + ZKFSUtil.servicePath(ZKBASE_PATH)); + _clusterRegistry = + new ZooKeeperPermanentStore(clusterZkConn, new ClusterPropertiesJsonSerializer(), + ZKFSUtil.clusterPath(ZKBASE_PATH)); + + FutureCallback storesStartupCallBack = new FutureCallback(); + Callback multiStartupCallback = Callbacks.countDown(storesStartupCallBack, 2); + serviceZkConn.start(); + clusterZkConn.start(); + _serviceRegistry.start(multiStartupCallback); + _clusterRegistry.start(multiStartupCallback); + storesStartupCallBack.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + + FutureCallback propertiesSetupCallback = new FutureCallback(); + Callback multiPropertiesCallback = Callbacks.countDown(propertiesSetupCallback, 2); + + ServiceProperties serviceProps = + new ServiceProperties(SERVICE_NAME, CLUSTER_NAME, "/testService", Arrays.asList("degrader"), + Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Arrays.asList("http"), + Collections.emptySet()); + _serviceRegistry.put(SERVICE_NAME, serviceProps, multiPropertiesCallback); + ClusterProperties clusterProps = new ClusterProperties(CLUSTER_NAME); + _clusterRegistry.put(CLUSTER_NAME, clusterProps, multiPropertiesCallback); + propertiesSetupCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + + _verificationStore = createAndStartVerificationStore(); + _threadPoolExecutor = Executors.newFixedThreadPool(10); + } + + @AfterMethod + public void tearDown() throws IOException, InterruptedException, ExecutionException, TimeoutException{ + FutureCallback shutdownCallback = new FutureCallback<>(); + Callback multiCallback = Callbacks.countDown(shutdownCallback,3); + _serviceRegistry.shutdown(multiCallback); + _clusterRegistry.shutdown(multiCallback); + _verificationStore.shutdown(multiCallback); + _threadPoolExecutor.shutdown(); + shutdownCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + _zkServer.shutdown(); + } + + /** + * create and start a uri store to verify announcement + */ + private ZooKeeperEphemeralStore createAndStartVerificationStore() + throws IOException, ExecutionException, InterruptedException, TimeoutException{ + ZKConnection zkClient = new ZKConnection("localhost:" + ZK_PORT, 5000); + zkClient.start(); + + ZooKeeperEphemeralStore store = + new ZooKeeperEphemeralStore(zkClient, new UriPropertiesJsonSerializer(), + new UriPropertiesMerger(), ZKFSUtil.uriPath(ZKBASE_PATH)); + FutureCallback callback = new FutureCallback(); + store.start(callback); + callback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + return store; + } + + /** + * Generate some fake host names for testing. + */ + private List prepareHostNames(int count, String name) throws Exception { + List hostNames = new ArrayList<>(); + for (int i = 0; i < count; i++) { + hostNames.add(new URI("http://" + name + "_" + i + ".test.com")); + } + return hostNames; + } + + /** + * For each given uri, generate a zookeeperConnectionManager for announcement + */ + + private List prepareConnectionManagers(List hostNames) throws Exception { + List connectionManagers = new ArrayList<>(); + for (URI uri : hostNames) { + ZooKeeperServer server = new ZooKeeperServer(); + ZooKeeperAnnouncer announcer = new ZooKeeperAnnouncer(server, true); + announcer.setCluster(CLUSTER_NAME); + announcer.setUri(uri.toString()); + Map partitionWeight = new HashMap(); + partitionWeight.put(DefaultPartitionAccessor.DEFAULT_PARTITION_ID, new PartitionData(0.5d)); + announcer.setPartitionData(partitionWeight); + + ZooKeeperConnectionManager.ZKStoreFactory> factory = new ZKUriStoreFactory(); + ZKConnectionBuilder connectionBuilder = new ZKConnectionBuilder("localhost:" + ZK_PORT); + connectionBuilder.setTimeout(ZK_TIMEOUT); + ZKPersistentConnection connection = _dealer.getZKPersistentConnection(connectionBuilder); + ZooKeeperConnectionManager connectionManager = + new ZooKeeperConnectionManager(connection, ZKBASE_PATH, factory, announcer); + connectionManagers.add(connectionManager); + } + return connectionManagers; + } + + private void shutdownConnectionManagers(List managers) throws Exception { + FutureCallback shutdownCallback = new FutureCallback(); + Callback shutdownMulitCallback = Callbacks.countDown(shutdownCallback, managers.size()); + for (ZooKeeperConnectionManager manager : managers) { + _threadPoolExecutor.submit(() -> manager.shutdown(shutdownMulitCallback)); + } + shutdownCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + } + + private void startConnectionManagers(List managers) throws Exception { + FutureCallback markupCallback = new FutureCallback<>(); + Callback markupMultiCallback = Callbacks.countDown(markupCallback, managers.size()); + for (ZooKeeperConnectionManager manager : managers) { + _threadPoolExecutor.submit(() -> { + try { + //Using random sleep to introduce delay to simulate uncertainty during real environment. + Thread.sleep(Math.abs(new Random().nextInt()) % 100); + manager.start(markupMultiCallback); + } catch (Exception e) { + markupMultiCallback.onError(new RuntimeException("Announcing failed for host: " + manager.getAnnouncers()[0].getUri() + " due to: " + e.getMessage(), e)); + } + }); + } + markupCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + } + + private List identicalBuildersSetUp() { + List builders = new ArrayList<>(); + for (int i = 0; i < NUM_DEFAULT_SHAREABLE_BUILDERS; i++) { + ZKConnectionBuilder builder = new ZKConnectionBuilder("localhost:2120"); + builder.setInitInterval(20); + builder.setRetryLimit(10); + builder.setTimeout(100); + builder.setExponentialBackoff(true); + builder.setIsSymlinkAware(true); + builder.setShutdownAsynchronously(true); + builders.add(builder); + } + return builders; + } + + private Callback decorateNoneCallback(Callback callback) { + return Callbacks.handle(result -> callback.onSuccess(None.none()), callback); + } + + /** + * Obtain the d2client with the same setup. + */ + private D2Client getD2Client(Map transportClientFactoryMap) { + D2ClientBuilder d2ClientBuilder = new D2ClientBuilder(); + d2ClientBuilder.setZkHosts("localhost:" + ZK_PORT) + .setZkSessionTimeout(ZK_TIMEOUT, TimeUnit.MILLISECONDS) + .setZKConnectionDealer(_dealer) + .setLoadBalancerWithFacilitiesFactory(new LastSeenBalancerWithFacilitiesFactory()) + .setClientFactories(transportClientFactoryMap); + return d2ClientBuilder.build(); + } + + private void fireTestRequests(D2Client client, int numRequest, FutureCallback finishCallback) throws Exception { + Callback reqMultiCallback = Callbacks.countDown(finishCallback, numRequest); + for (int i = 0; i < numRequest; i++) { + RestRequestBuilder builder = new RestRequestBuilder(new URI("d2://testService")); + client.restRequest(builder.build(), decorateNoneCallback(reqMultiCallback)); + } + } + /** + * Tests begin + */ + + @Test + public void TestZkConnectionDealerBasic() { + List builders = identicalBuildersSetUp(); + List connections = new ArrayList<>(); + for (int i = 0; i < NUM_DEFAULT_SHAREABLE_BUILDERS; i++) { + connections.add(_dealer.getZKPersistentConnection(builders.get(i))); + } + ZKPersistentConnection firstConn = connections.get(0); + for (ZKPersistentConnection conn : connections) { + Assert.assertTrue(conn == firstConn); + } + + ZKConnectionBuilder differentBuilder = new ZKConnectionBuilder("localhost:2122"); + ZKPersistentConnection differentConnection = _dealer.getZKPersistentConnection(differentBuilder); + Assert.assertTrue(differentConnection != firstConn); + + Assert.assertTrue(_dealer.getZkConnectionCount() == 2); + } + + /** + * Test both markUp and markDown when using only one connection. + */ + @Test(groups = "needZk") + public void testMarkUpAndMarkDownSharingConnection() throws Exception { + List hostNames = prepareHostNames(5, "testMarkUpAndMarkDownSharingConnection"); + List connectionManagers = prepareConnectionManagers(hostNames); + + //announce all five hosts + startConnectionManagers(connectionManagers); + + UriProperties properties = _verificationStore.get(CLUSTER_NAME); + assertNotNull(properties); + assertEquals(properties.Uris().size(), 5); + + + FutureCallback markdownCallback = new FutureCallback<>(); + Callback markdownMultiCallback = Callbacks.countDown(markdownCallback, 2); + //markdown three hosts + for (ZooKeeperConnectionManager manager : connectionManagers.subList(0, 2)) { + _threadPoolExecutor.submit(() -> { + try { + //Using random sleep to introduce delay to simulate uncertainty during real environment. + Thread.sleep(Math.abs(new Random().nextInt()) % 100); + manager.getAnnouncers()[0].markDown(markdownMultiCallback); + } catch (Exception e) { + markdownMultiCallback.onError(new RuntimeException("MarkDown failed for host: " + manager.getAnnouncers()[0].getUri() + "due to: " + e.getMessage(), e)); + } + }); + } + + markdownCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + + UriProperties newProperties = _verificationStore.get(CLUSTER_NAME); + assertNotNull(newProperties); + assertEquals(newProperties.Uris().size(), 3); + + shutdownConnectionManagers(connectionManagers); + } + + /** + * Test announcing many hosts using one connection concurrently + */ + @Test(groups = "needZk") + public void testManyHostsAnnouncementSharingConnections() throws Exception { + List hostNames = prepareHostNames(100, "testManyHostsAnnouncementSharingConnections"); + List connectionManagers = prepareConnectionManagers(hostNames); + + startConnectionManagers(connectionManagers); + + UriProperties newProperties = _verificationStore.get(CLUSTER_NAME); + assertNotNull(newProperties); + assertEquals(newProperties.Uris().size(), 100); + + shutdownConnectionManagers(connectionManagers); + } + + /** + * Testing sharing connections between announcers and d2client + * @throws Exception + */ + @Test(groups = "needZk") + public void testAnnouncerAndClientSharing() throws Exception { + //connection shared to announcers + List hostNames = prepareHostNames(20, "testAnnouncerAndClientSharing"); + List connectionManagers = prepareConnectionManagers(hostNames); + int l = 1; + //set up a mock transport client + Map transportClientMap = new HashMap<>(); + TestTransportClientFactory testClientFactory = new TestTransportClientFactory(); + transportClientMap.put("http", testClientFactory); + + //connection shared to d2client + D2Client client = getD2Client(transportClientMap); + + //there should only be one connection + Assert.assertTrue(_dealer.getZkConnectionCount() == 1); + + + //start both announcers and client + FutureCallback startUpCallback = new FutureCallback(); + Callback startUpMultiCallback = Callbacks.countDown(startUpCallback, connectionManagers.size() + 1); + + _threadPoolExecutor.submit(() -> client.start(startUpMultiCallback)); + for (ZooKeeperConnectionManager manager : connectionManagers) { + _threadPoolExecutor.submit(() -> manager.start(startUpMultiCallback)); + } + startUpCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + + //verify zookeeper is updated + UriProperties properties = _verificationStore.get(CLUSTER_NAME); + assertNotNull(properties); + assertEquals(properties.Uris().size(), 20); + + //fire some requests to make sure announcement is successful and hosts properties can be retrieved successfully. + int requestRepeat = 1000; + FutureCallback reqCallback = new FutureCallback(); + fireTestRequests(client, requestRepeat, reqCallback); + reqCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + + //verify d2client received the changes + HostSet hosts = client.getFacilities().getKeyMapper().getAllPartitionsMultipleHosts(new URI("d2://testService"), 20); + Assert.assertEquals(hosts.getAllHosts().size(), 20); + Assert.assertEquals(testClientFactory.requestCount.get(), 1000); + + + //Markdown half of the hosts and test the results + FutureCallback hostsMarkdownCallback = new FutureCallback(); + Callback hostsMarkdownMultiCallback = Callbacks.countDown(hostsMarkdownCallback,10); + for (ZooKeeperConnectionManager manager : connectionManagers.subList(0,10)) { + _threadPoolExecutor.submit(() -> manager.getAnnouncers()[0].markDown(hostsMarkdownMultiCallback)); + } + hostsMarkdownCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + + //verify zookeeper is updated + properties = _verificationStore.get(CLUSTER_NAME); + assertNotNull(properties); + assertEquals(properties.Uris().size(), 10); + + //fire some requests to make sure announcement is successful and hosts properties can be retrieved successfully. + FutureCallback secondReqCallback = new FutureCallback(); + fireTestRequests(client, requestRepeat, secondReqCallback); + secondReqCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + + //verify d2client can read the zookeeper updates. + hosts = client.getFacilities().getKeyMapper().getAllPartitionsMultipleHosts(new URI("d2://testService"), 20); + Assert.assertEquals(hosts.getAllHosts().size(), 10); + Assert.assertEquals(testClientFactory.requestCount.get(), 2000); + + //Mix announcements with request firing to test connection robustness. + FutureCallback thirdReqCallback = new FutureCallback(); + Callback thirdReqMultiCallback = Callbacks.countDown(thirdReqCallback, requestRepeat + 10); + for (int i = 0; i < requestRepeat; i++) { + _threadPoolExecutor.submit(() -> { + try{ + RestRequestBuilder builder = new RestRequestBuilder(new URI("d2://testService")); + client.restRequest(builder.build(), decorateNoneCallback(thirdReqMultiCallback)); + }catch (Exception e){ + throw new RuntimeException(e); + } + }); + if (i % 100 == 0) { + //markup one host every 100 requests + ZooKeeperConnectionManager manager = connectionManagers.get(i / 100); + _threadPoolExecutor.submit(() -> { + try{ + manager.getAnnouncers()[0].markUp(thirdReqMultiCallback); + }catch (Exception e){ + throw new RuntimeException(e); + } + }); + } + } + + thirdReqCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertEquals(testClientFactory.requestCount.get(), 3000); + + + //announcers can be shutdown after announcing, without affecting client. This should not happen though. + FutureCallback announcerShutdownCallback = new FutureCallback(); + Callback announcersShutdownCallback = Callbacks.countDown(announcerShutdownCallback, connectionManagers.size()); + for (ZooKeeperConnectionManager manager : connectionManagers) { + manager.shutdown(announcersShutdownCallback); + } + announcerShutdownCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + + + //fire some requests to make sure d2client is still usable. + FutureCallback fourthReqCallback = new FutureCallback(); + fireTestRequests(client, requestRepeat, fourthReqCallback); + thirdReqCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + + hosts = client.getFacilities().getKeyMapper().getAllPartitionsMultipleHosts(new URI("d2://testService"), 20); + Assert.assertEquals(hosts.getAllHosts().size(), 20); + Assert.assertEquals(testClientFactory.requestCount.get(), 4000); + + + //test done! + FutureCallback clientShutdownCallback = new FutureCallback(); + client.shutdown(clientShutdownCallback); + clientShutdownCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + + //make sure the connection is properly stopped. + ZKPersistentConnection connection = _dealer.getZKPersistentConnection(new ZKConnectionBuilder("localhost:" + ZK_PORT).setTimeout(ZK_TIMEOUT)); + Assert.assertNotNull(connection); + Assert.assertTrue(connection.isConnectionStopped()); + } + + /** + * Test that when there is an zookeeper property update, d2client can receive the update correctly + */ + @Test(groups = "needZK") + public void testZKPropertyUpdate() throws Exception { + List hosts = prepareHostNames(5, "testZKPropertyUpdate"); + List connectionManagers = prepareConnectionManagers(hosts); + + Map transportClientMap = new HashMap(); + transportClientMap.put("http", new TestTransportClientFactory()); + + // connection shared to d2client + D2Client client = getD2Client(transportClientMap); + + FutureCallback startupCallback = new FutureCallback(); + client.start(startupCallback); + startupCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + + startConnectionManagers(connectionManagers); + + Directory d2Directory = client.getFacilities().getDirectory(); + + List serviceList = new ArrayList<>(); + ServiceProperties serviceProps = + new ServiceProperties("newTestService", CLUSTER_NAME, "/newTestService", Arrays.asList("degrader"), + Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Arrays.asList("http"), + Collections.emptySet()); + + FutureCallback propertyCallback = new FutureCallback(); + _serviceRegistry.put("newTestService", serviceProps, propertyCallback); + propertyCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + + + FutureCallback finishCallback = new FutureCallback(); + d2Directory.getServiceNames(new Callback>() { + @Override + public void onError(Throwable e) { + finishCallback.onError(e); + } + + @Override + public void onSuccess(List result) { + serviceList.addAll(result); + finishCallback.onSuccess(None.none()); + } + }); + + finishCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertEquals(serviceList.size(), 2); + Assert.assertTrue(serviceList.contains("newTestService")); + Assert.assertTrue(serviceList.contains("testService")); + + shutdownConnectionManagers(connectionManagers); + FutureCallback clientShutdownCallback = new FutureCallback(); + client.shutdown(clientShutdownCallback); + clientShutdownCallback.get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); + } + + /** + * Hosts should only be announced when ZookeeperConnectionManager is started. + */ + @Test(groups = "needZk") + public void testAnnouncerNoStartup() throws Exception { + List hosts = prepareHostNames(5, "testAnnouncerNoStartup"); + List connectionManagers = prepareConnectionManagers(hosts); + List managersToStart = connectionManagers.subList(0,3); + Assert.assertTrue(_dealer.getZkConnectionCount() == 1); + + startConnectionManagers(connectionManagers.subList(0,3)); + + //verify that only three managers are started. + UriProperties properties = _verificationStore.get(CLUSTER_NAME); + assertNotNull(properties); + assertEquals(properties.Uris().size(), 3); + + shutdownConnectionManagers(connectionManagers); + } + + private class TestTransportClientFactory implements TransportClientFactory { + + public Map _properties; + public int getClientCount; + public AtomicInteger requestCount = new AtomicInteger(0); + + @Override + public TransportClient getClient(Map properties) { + getClientCount++; + _properties = properties; + return new TransportClient() { + @Override + public void restRequest(RestRequest request, RequestContext requestContext, Map wireAttrs, + TransportCallback callback) { + requestCount.getAndIncrement(); + callback.onResponse(new TransportResponse() { + @Override + public RestResponse getResponse() { + return null; + } + + @Override + public boolean hasError() { + return false; + } + + @Override + public Throwable getError() { + return null; + } + + @Override + public Map getWireAttributes() { + return null; + } + }); + } + + @Override + public void shutdown(Callback callback) { + callback.onSuccess(None.none()); + } + }; + } + + @Override + public void shutdown(Callback callback) { + callback.onSuccess(None.none()); + } + } +} + +