Skip to content

Commit

Permalink
add zookeeper connection sharing feature
Browse files Browse the repository at this point in the history
RB=1260149
G=si-core-reviewers
R=ssheng,dhoa,cxu,fcapponi
A=fcapponi
  • Loading branch information
Alex Jing committed Apr 17, 2018
1 parent e8d3386 commit 1172064
Show file tree
Hide file tree
Showing 12 changed files with 1,117 additions and 53 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG
@@ -1,5 +1,7 @@
20.0.19 20.0.19
------- -------
(RB=1260149)
add zookeeper connection sharing feature


20.0.18 20.0.18
------- -------
Expand Down
11 changes: 10 additions & 1 deletion d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java
Expand Up @@ -35,6 +35,8 @@
import com.linkedin.d2.balancer.util.healthcheck.HealthCheckOperations; import com.linkedin.d2.balancer.util.healthcheck.HealthCheckOperations;
import com.linkedin.d2.balancer.util.partitions.PartitionAccessorRegistry; import com.linkedin.d2.balancer.util.partitions.PartitionAccessorRegistry;
import com.linkedin.d2.balancer.zkfs.ZKFSTogglingLoadBalancerFactoryImpl; import com.linkedin.d2.balancer.zkfs.ZKFSTogglingLoadBalancerFactoryImpl;
import com.linkedin.d2.discovery.stores.zk.ZKConnectionBuilder;
import com.linkedin.d2.discovery.stores.zk.ZkConnectionDealer;
import com.linkedin.d2.discovery.stores.zk.ZooKeeper; import com.linkedin.d2.discovery.stores.zk.ZooKeeper;
import com.linkedin.r2.transport.common.TransportClientFactory; import com.linkedin.r2.transport.common.TransportClientFactory;
import com.linkedin.r2.transport.http.client.HttpClientFactory; import com.linkedin.r2.transport.http.client.HttpClientFactory;
Expand Down Expand Up @@ -131,7 +133,8 @@ public D2Client build()
_config.zooKeeperDecorator, _config.zooKeeperDecorator,
_config.enableSaveUriDataOnDisk, _config.enableSaveUriDataOnDisk,
loadBalancerStrategyFactories, loadBalancerStrategyFactories,
_config.requestTimeoutHandlerEnabled); _config.requestTimeoutHandlerEnabled,
_config.connectionDealer);


final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ? final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ?
new ZKFSLoadBalancerWithFacilitiesFactory() : new ZKFSLoadBalancerWithFacilitiesFactory() :
Expand Down Expand Up @@ -415,6 +418,12 @@ public D2ClientBuilder setRequestTimeoutHandlerEnabled(boolean requestTimeoutHan
return this; return this;
} }


public D2ClientBuilder setZKConnectionDealer(ZkConnectionDealer connectionDealer)
{
_config.connectionDealer = connectionDealer;
return this;
}

private Map<String, TransportClientFactory> createDefaultTransportClientFactories() private Map<String, TransportClientFactory> createDefaultTransportClientFactories()
{ {
final Map<String, TransportClientFactory> clientFactories = new HashMap<String, TransportClientFactory>(); final Map<String, TransportClientFactory> clientFactories = new HashMap<String, TransportClientFactory>();
Expand Down
11 changes: 8 additions & 3 deletions d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java
Expand Up @@ -17,13 +17,15 @@


import com.linkedin.d2.backuprequests.BackupRequestsStrategyStatsConsumer; import com.linkedin.d2.backuprequests.BackupRequestsStrategyStatsConsumer;
import com.linkedin.d2.balancer.event.EventEmitter; import com.linkedin.d2.balancer.event.EventEmitter;
import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy;
import com.linkedin.d2.balancer.strategies.LoadBalancerStrategyFactory; import com.linkedin.d2.balancer.strategies.LoadBalancerStrategyFactory;
import com.linkedin.d2.balancer.util.WarmUpLoadBalancer; import com.linkedin.d2.balancer.util.WarmUpLoadBalancer;
import com.linkedin.d2.balancer.util.downstreams.DownstreamServicesFetcher; import com.linkedin.d2.balancer.util.downstreams.DownstreamServicesFetcher;
import com.linkedin.d2.balancer.util.healthcheck.HealthCheckOperations; import com.linkedin.d2.balancer.util.healthcheck.HealthCheckOperations;
import com.linkedin.d2.balancer.util.partitions.PartitionAccessorRegistry; import com.linkedin.d2.balancer.util.partitions.PartitionAccessorRegistry;
import com.linkedin.d2.balancer.zkfs.ZKFSTogglingLoadBalancerFactoryImpl; import com.linkedin.d2.balancer.zkfs.ZKFSTogglingLoadBalancerFactoryImpl;
import com.linkedin.d2.balancer.zkfs.ZKFSTogglingLoadBalancerFactoryImpl.ComponentFactory; import com.linkedin.d2.balancer.zkfs.ZKFSTogglingLoadBalancerFactoryImpl.ComponentFactory;
import com.linkedin.d2.discovery.stores.zk.ZkConnectionDealer;
import com.linkedin.d2.discovery.stores.zk.ZooKeeper; import com.linkedin.d2.discovery.stores.zk.ZooKeeper;
import com.linkedin.r2.transport.common.TransportClientFactory; import com.linkedin.r2.transport.common.TransportClientFactory;
import java.util.Collections; import java.util.Collections;
Expand Down Expand Up @@ -75,8 +77,9 @@ public class D2ClientConfig
EventEmitter eventEmitter = null; EventEmitter eventEmitter = null;
PartitionAccessorRegistry partitionAccessorRegistry = null; PartitionAccessorRegistry partitionAccessorRegistry = null;
Function<ZooKeeper, ZooKeeper> zooKeeperDecorator = null; Function<ZooKeeper, ZooKeeper> zooKeeperDecorator = null;
Map<String, LoadBalancerStrategyFactory<?>> loadBalancerStrategyFactories = Collections.emptyMap(); Map<String, LoadBalancerStrategyFactory<? extends LoadBalancerStrategy>> loadBalancerStrategyFactories = Collections.emptyMap();
boolean requestTimeoutHandlerEnabled = false; boolean requestTimeoutHandlerEnabled = false;
ZkConnectionDealer connectionDealer = null;


private static final int DEAULT_RETRY_LIMIT = 3; private static final int DEAULT_RETRY_LIMIT = 3;


Expand Down Expand Up @@ -120,8 +123,9 @@ public D2ClientConfig()
PartitionAccessorRegistry partitionAccessorRegistry, PartitionAccessorRegistry partitionAccessorRegistry,
Function<ZooKeeper, ZooKeeper> zooKeeperDecorator, Function<ZooKeeper, ZooKeeper> zooKeeperDecorator,
boolean enableSaveUriDataOnDisk, boolean enableSaveUriDataOnDisk,
Map<String, LoadBalancerStrategyFactory<?>> loadBalancerStrategyFactories, Map<String, LoadBalancerStrategyFactory<? extends LoadBalancerStrategy>> loadBalancerStrategyFactories,
boolean requestTimeoutHandlerEnabled) boolean requestTimeoutHandlerEnabled,
ZkConnectionDealer connectionDealer)
{ {
this.zkHosts = zkHosts; this.zkHosts = zkHosts;
this.zkSessionTimeoutInMs = zkSessionTimeoutInMs; this.zkSessionTimeoutInMs = zkSessionTimeoutInMs;
Expand Down Expand Up @@ -161,5 +165,6 @@ public D2ClientConfig()
this.enableSaveUriDataOnDisk = enableSaveUriDataOnDisk; this.enableSaveUriDataOnDisk = enableSaveUriDataOnDisk;
this.loadBalancerStrategyFactories = loadBalancerStrategyFactories; this.loadBalancerStrategyFactories = loadBalancerStrategyFactories;
this.requestTimeoutHandlerEnabled = requestTimeoutHandlerEnabled; this.requestTimeoutHandlerEnabled = requestTimeoutHandlerEnabled;
this.connectionDealer = connectionDealer;
} }
} }
Expand Up @@ -58,7 +58,13 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)
zkConnectionBuilder.setShutdownAsynchronously(config.shutdownAsynchronously) zkConnectionBuilder.setShutdownAsynchronously(config.shutdownAsynchronously)
.setIsSymlinkAware(config.isSymlinkAware).setTimeout((int) config.zkSessionTimeoutInMs); .setIsSymlinkAware(config.isSymlinkAware).setTimeout((int) config.zkSessionTimeoutInMs);


ZKPersistentConnection zkPersistentConnection = new ZKPersistentConnection(zkConnectionBuilder); ZKPersistentConnection zkPersistentConnection;
if (config.connectionDealer != null)
{
zkPersistentConnection = config.connectionDealer.getZKPersistentConnection(zkConnectionBuilder);
} else {
zkPersistentConnection = new ZKPersistentConnection(zkConnectionBuilder);
}


// init all the stores // init all the stores
LastSeenZKStore<ClusterProperties> lsClusterStore = getClusterPropertiesLastSeenZKStore(config, zkPersistentConnection); LastSeenZKStore<ClusterProperties> lsClusterStore = getClusterPropertiesLastSeenZKStore(config, zkPersistentConnection);
Expand Down
Expand Up @@ -32,7 +32,10 @@
import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore; import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore;
import com.linkedin.d2.discovery.stores.zk.ZooKeeperStore; import com.linkedin.d2.discovery.stores.zk.ZooKeeperStore;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand All @@ -55,10 +58,37 @@ public class ZooKeeperConnectionManager
private final ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> _factory; private final ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> _factory;
private final ZooKeeperAnnouncer[] _servers; private final ZooKeeperAnnouncer[] _servers;
private final AtomicReference<Callback<None>> _startupCallback = new AtomicReference<Callback<None>>(); private final AtomicReference<Callback<None>> _startupCallback = new AtomicReference<Callback<None>>();

private final ZKPersistentConnection _zkConnection; private final ZKPersistentConnection _zkConnection;


/**
* a boolean flag to indicate whether _store is successfully started or not
*/
private volatile boolean _storeStarted = false;

/**
* Two preconditions have to be met before actual announcing
*/
private volatile boolean _managerStarted = false;
private volatile boolean _storeReady = false;

private volatile ZooKeeperEphemeralStore<UriProperties> _store; private volatile ZooKeeperEphemeralStore<UriProperties> _store;


public ZooKeeperConnectionManager(ZKPersistentConnection zkConnection,
String zkBasePath,
ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> factory,
ZooKeeperAnnouncer... servers)
{
_zkBasePath = zkBasePath;
_zkConnection = zkConnection;
_factory = factory;
_servers = servers;
_zkConnection.addListeners(Collections.singletonList(new Listener()));

_zkConnectString = zkConnection.getZKConnection().getConnectString();
_zkSessionTimeout = zkConnection.getZKConnection().getTimeout();
}

public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout, public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout,
String zkBasePath, String zkBasePath,
ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> factory, ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> factory,
Expand Down Expand Up @@ -104,13 +134,17 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout,


public void start(Callback<None> callback) public void start(Callback<None> callback)
{ {
_managerStarted = true;
if (!_startupCallback.compareAndSet(null, callback)) if (!_startupCallback.compareAndSet(null, callback))
{ {
throw new IllegalStateException("Already starting"); throw new IllegalStateException("Already starting");
} }
try try
{ {
_zkConnection.start(); _zkConnection.start();
//Trying to start store here. If the connection is not ready, will return immediately.
//The connection event will trigger the actual store startup
tryStartStore();
LOG.info("Started ZooKeeper connection to {}", _zkConnectString); LOG.info("Started ZooKeeper connection to {}", _zkConnectString);
} }
catch (Exception e) catch (Exception e)
Expand All @@ -122,6 +156,7 @@ public void start(Callback<None> callback)


public void shutdown(final Callback<None> callback) public void shutdown(final Callback<None> callback)
{ {
_managerStarted = false;
Callback<None> zkCloseCallback = new CallbackAdapter<None,None>(callback) Callback<None> zkCloseCallback = new CallbackAdapter<None,None>(callback)
{ {
@Override @Override
Expand Down Expand Up @@ -205,11 +240,8 @@ public void onSuccess(None result)


private class Listener implements ZKPersistentConnection.EventListener private class Listener implements ZKPersistentConnection.EventListener
{ {
/**
* a boolean flag to indicate whether _store is successfully started or not
*/
private volatile boolean _storeStarted = false;


private volatile boolean _sessionEstablished = false;
@Override @Override
public void notifyEvent(ZKPersistentConnection.Event event) public void notifyEvent(ZKPersistentConnection.Event event)
{ {
Expand All @@ -219,7 +251,9 @@ public void notifyEvent(ZKPersistentConnection.Event event)
case SESSION_ESTABLISHED: case SESSION_ESTABLISHED:
{ {
_store = _factory.createStore(_zkConnection.getZKConnection(), ZKFSUtil.uriPath(_zkBasePath)); _store = _factory.createStore(_zkConnection.getZKConnection(), ZKFSUtil.uriPath(_zkBasePath));
startStore(); _storeReady = true;
//Trying to start the store. If the manager itself is not started yet, the start will be deferred until start is called.
tryStartStore();
break; break;
} }
case SESSION_EXPIRED: case SESSION_EXPIRED:
Expand All @@ -232,7 +266,7 @@ public void notifyEvent(ZKPersistentConnection.Event event)
{ {
if (!_storeStarted) if (!_storeStarted)
{ {
startStore(); tryStartStore();
} }
else else
{ {
Expand All @@ -248,54 +282,66 @@ public void notifyEvent(ZKPersistentConnection.Event event)
break; break;
} }
} }
}


private void startStore() /**
* Store should only be started if two conditions are satisfied
* 1. store is ready. store is ready when connection is established
* 2. ZookeeperConnectionManager is started.
*/
private void tryStartStore()
{
if (_managerStarted && _storeReady) {
startStore();
}
}

private void startStore()
{
final Callback<None> callback = _startupCallback.getAndSet(null);
final Callback<None> multiCallback = callback != null ?
Callbacks.countDown(callback, _servers.length) :
Callbacks.<None>empty();
_store.start(new Callback<None>()
{ {
final Callback<None> callback = _startupCallback.getAndSet(null); @Override
final Callback<None> multiCallback = callback != null ? public void onError(Throwable e)
Callbacks.countDown(callback, _servers.length) :
Callbacks.<None>empty();
_store.start(new Callback<None>()
{ {
@Override LOG.error("Failed to start ZooKeeperEphemeralStore", e);
public void onError(Throwable e) if (callback != null)
{ {
LOG.error("Failed to start ZooKeeperEphemeralStore", e); callback.onError(e);
if (callback != null)
{
callback.onError(e);
}
} }
}


@Override @Override
public void onSuccess(None result) public void onSuccess(None result)
{
/* mark store as started */
_storeStarted = true;
for (ZooKeeperAnnouncer server : _servers)
{ {
/* mark store as started */ server.setStore(_store);
_storeStarted = true; server.start(new Callback<None>()
for (ZooKeeperAnnouncer server : _servers)
{ {
server.setStore(_store); @Override
server.start(new Callback<None>() public void onError(Throwable e)
{ {
@Override LOG.error("Failed to start server", e);
public void onError(Throwable e) multiCallback.onError(e);
{ }
LOG.error("Failed to start server", e);
multiCallback.onError(e); @Override
} public void onSuccess(None result)

{
@Override LOG.info("Started an announcer");
public void onSuccess(None result) multiCallback.onSuccess(result);
{ }
LOG.info("Started an announcer"); });
multiCallback.onSuccess(result);
}
});
}
LOG.info("Starting {} announcers", (_servers.length));
} }
}); LOG.info("Starting {} announcers", (_servers.length));
} }
});
} }


public interface ZKStoreFactory<P, Z extends ZooKeeperStore<P>> public interface ZKStoreFactory<P, Z extends ZooKeeperStore<P>>
Expand Down
Expand Up @@ -293,6 +293,16 @@ public ZooKeeper getZooKeeper()
return zk(); return zk();
} }


public String getConnectString()
{
return _connectString;
}

public int getTimeout()
{
return _timeout;
}

public void waitForState(KeeperState state, long timeout, TimeUnit timeUnit) public void waitForState(KeeperState state, long timeout, TimeUnit timeUnit)
throws InterruptedException, TimeoutException throws InterruptedException, TimeoutException
{ {
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.linkedin.d2.discovery.stores.zk; package com.linkedin.d2.discovery.stores.zk;


import com.linkedin.util.ArgumentUtil; import com.linkedin.util.ArgumentUtil;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function; import java.util.function.Function;


Expand Down Expand Up @@ -54,6 +55,19 @@ public ZKConnectionBuilder(String connectString)
_connectString = connectString; _connectString = connectString;
} }


public ZKConnectionBuilder(ZKConnectionBuilder builder)
{
_connectString = builder._connectString;
_sessionTimeout = builder._sessionTimeout;
_shutdownAsynchronously = builder._shutdownAsynchronously;
_retryLimit = builder._retryLimit;
_isSymlinkAware = builder._isSymlinkAware;
_exponentialBackoff = builder._exponentialBackoff;
_retryScheduler = builder._retryScheduler;
_initInterval = builder._initInterval;
_zkDecorator = builder._zkDecorator;
}

/** /**
* @param sessionTimeout session timeout in milliseconds * @param sessionTimeout session timeout in milliseconds
*/ */
Expand All @@ -80,7 +94,6 @@ public ZKConnectionBuilder setRetryLimit(int retryLimit)
_retryLimit = retryLimit; _retryLimit = retryLimit;
return this; return this;
} }

/** /**
* @param isSymlinkAware Resolves znodes whose name is prefixed with a * @param isSymlinkAware Resolves znodes whose name is prefixed with a
* dollar sign '$' (eg. /$symlink1, /foo/bar/$symlink2) * dollar sign '$' (eg. /$symlink1, /foo/bar/$symlink2)
Expand Down Expand Up @@ -133,4 +146,27 @@ public ZKConnection build()
return new ZKConnection(_connectString, _sessionTimeout, _retryLimit, _exponentialBackoff, return new ZKConnection(_connectString, _sessionTimeout, _retryLimit, _exponentialBackoff,
_retryScheduler, _initInterval, _shutdownAsynchronously, _isSymlinkAware, _zkDecorator); _retryScheduler, _initInterval, _shutdownAsynchronously, _isSymlinkAware, _zkDecorator);
} }

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ZKConnectionBuilder that = (ZKConnectionBuilder) o;
return _sessionTimeout == that._sessionTimeout && _shutdownAsynchronously == that._shutdownAsynchronously
&& _retryLimit == that._retryLimit && _isSymlinkAware == that._isSymlinkAware
&& _exponentialBackoff == that._exponentialBackoff && _initInterval == that._initInterval && Objects.equals(
_connectString, that._connectString) && Objects.equals(_retryScheduler, that._retryScheduler) && Objects.equals(
_zkDecorator, that._zkDecorator);
}

@Override
public int hashCode() {

return Objects.hash(_connectString, _sessionTimeout, _shutdownAsynchronously, _retryLimit, _isSymlinkAware,
_exponentialBackoff, _retryScheduler, _initInterval, _zkDecorator);
}
} }

0 comments on commit 1172064

Please sign in to comment.