Skip to content

Commit

Permalink
[pulsar-broker] support configurable zk-cache expiry time (apache#6668)
Browse files Browse the repository at this point in the history
### Motivation
Right now zk-cache expiry is hardcoded and it needs to be configurable to refresh value based on various requirement: eg: refresh quickly in case of zk-watch miss, avoid frequent cache refresh to avoid zk-read or avoid issue due to zk read timeout, etc..

### Modification 
User can configure zk-cache expiry using `zooKeeperCacheExpirySeconds` configuration.
  • Loading branch information
rdhabalia authored and Addison Higham committed May 5, 2020
1 parent ad78009 commit ef34330
Show file tree
Hide file tree
Showing 18 changed files with 71 additions and 11 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ zooKeeperSessionTimeoutMillis=30000
# ZooKeeper operation timeout in seconds
zooKeeperOperationTimeoutSeconds=30

# ZooKeeper cache expiry time in seconds
zooKeeperCacheExpirySeconds=300

# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000

Expand Down
3 changes: 3 additions & 0 deletions conf/discovery.conf
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ configurationStoreServers=
# ZooKeeper session timeout
zookeeperSessionTimeoutMs=30000

# ZooKeeper cache expiry time in seconds
zooKeeperCacheExpirySeconds=300

# Port to use to server binary-proto request
servicePort=6650

Expand Down
3 changes: 3 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ functionWorkerWebServiceURLTLS=
# ZooKeeper session timeout (in milliseconds)
zookeeperSessionTimeoutMs=30000

# ZooKeeper cache expiry time in seconds
zooKeeperCacheExpirySeconds=300

### --- Server --- ###

# The port to use for server binary Protobuf requests
Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ zooKeeperSessionTimeoutMillis=30000
# ZooKeeper operation timeout in seconds
zooKeeperOperationTimeoutSeconds=30

# ZooKeeper cache expiry time in seconds
zooKeeperCacheExpirySeconds=300

# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000

Expand Down
2 changes: 2 additions & 0 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ configurationStoreServers=

# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000
# ZooKeeper cache expiry time in seconds
zooKeeperCacheExpirySeconds=300

# Pulsar cluster url to connect to broker (optional if configurationStoreServers present)
serviceUrl=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "ZooKeeper operation timeout in seconds"
)
private int zooKeeperOperationTimeoutSeconds = 30;
@FieldContext(
category = CATEGORY_SERVER,
doc = "ZooKeeper cache expiry time in seconds"
)
private int zooKeeperCacheExpirySeconds = 300;
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ protected void startZkCacheService() throws PulsarServerException {
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(),
config.getZooKeeperOperationTimeoutSeconds(), config.getConfigurationStoreServers(),
getOrderedExecutor(), this.cacheExecutor);
getOrderedExecutor(), this.cacheExecutor, config.getZooKeeperCacheExpirySeconds());
try {
this.globalZkCache.start();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public BrokerDiscoveryProvider(ServiceConfig config, ZooKeeperClientFactory zkCl
config.getZookeeperSessionTimeoutMs());
globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(),
(int) TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()),
config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler);
config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler,
config.getZooKeeperCacheExpirySeconds());
globalZkCache.start();
} catch (Exception e) {
LOG.error("Failed to start Zookkeeper {}", e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public class ServiceConfig implements PulsarConfiguration {
// ZooKeeper session timeout
private int zookeeperSessionTimeoutMs = 30_000;

// ZooKeeper cache expiry time in seconds
private int zooKeeperCacheExpirySeconds=300;

// Port to use to server binary-proto request
private Optional<Integer> servicePort = Optional.ofNullable(5000);
// Port to use to server binary-proto-tls request
Expand Down Expand Up @@ -134,6 +137,14 @@ public void setZookeeperSessionTimeoutMs(int zookeeperSessionTimeoutMs) {
this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs;
}

public int getZooKeeperCacheExpirySeconds() {
return zooKeeperCacheExpirySeconds;
}

public void setZooKeeperCacheExpirySeconds(int zooKeeperCacheExpirySeconds) {
this.zooKeeperCacheExpirySeconds = zooKeeperCacheExpirySeconds;
}

public Optional<Integer> getServicePort() {
return servicePort;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "ZooKeeper operation timeout in seconds"
)
private int zooKeeperOperationTimeoutSeconds = 30;
@FieldContext(
category = CATEGORY_WORKER,
doc = "ZooKeeper cache expiry time in seconds"
)
private int zooKeeperCacheExpirySeconds = 300;
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "The path to the location to locate builtin connectors"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ private AuthorizationService getAuthorizationService() throws PulsarServerExcept
(int) workerConfig.getZooKeeperSessionTimeoutMillis(),
workerConfig.getZooKeeperOperationTimeoutSeconds(),
workerConfig.getConfigurationStoreServers(),
orderedExecutor, cacheExecutor);
orderedExecutor, cacheExecutor,
workerConfig.getZooKeeperOperationTimeoutSeconds());
try {
this.globalZkCache.start();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public BrokerDiscoveryProvider(ProxyConfiguration config, ZooKeeperClientFactory
config.getZookeeperSessionTimeoutMs());
globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(),
(int) TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()),
config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler);
config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler,
config.getZookeeperSessionTimeoutMs());
globalZkCache.start();
} catch (Exception e) {
LOG.error("Failed to start Zookkeeper {}", e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public class ProxyConfiguration implements PulsarConfiguration {
doc = "ZooKeeper session timeout (in milliseconds)"
)
private int zookeeperSessionTimeoutMs = 30_000;
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
doc = "ZooKeeper cache expiry time in seconds"
)
private int zooKeeperCacheExpirySeconds = 300;

@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public void start() throws PulsarServerException, PulsarClientException, Malform
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(),
(int) TimeUnit.MILLISECONDS.toSeconds(config.getZooKeeperSessionTimeoutMillis()),
config.getConfigurationStoreServers(), this.orderedExecutor, this.executor);
config.getConfigurationStoreServers(), this.orderedExecutor, this.executor,
config.getZooKeeperCacheExpirySeconds());
try {
this.globalZkCache.start();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
private String configurationStoreServers;
// Zookeeper session timeout in milliseconds
private long zooKeeperSessionTimeoutMillis = 30000;
// ZooKeeper cache expiry time in seconds
private int zooKeeperCacheExpirySeconds = 300;

// Port to use to server HTTP request
private Optional<Integer> webServicePort = Optional.of(8080);
Expand Down Expand Up @@ -198,6 +200,14 @@ public void setZooKeeperSessionTimeoutMillis(long zooKeeperSessionTimeoutMillis)
this.zooKeeperSessionTimeoutMillis = zooKeeperSessionTimeoutMillis;
}

public int getZooKeeperCacheExpirySeconds() {
return zooKeeperCacheExpirySeconds;
}

public void setZooKeeperCacheExpirySeconds(int zooKeeperCacheExpirySeconds) {
this.zooKeeperCacheExpirySeconds = zooKeeperCacheExpirySeconds;
}

public Optional<Integer> getWebServicePort() {
return webServicePort;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public class GlobalZooKeeperCache extends ZooKeeperCache implements Closeable {

public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int zkSessionTimeoutMillis,
int zkOperationTimeoutSeconds, String globalZkConnect, OrderedExecutor orderedExecutor,
ScheduledExecutorService scheduledExecutor) {
super("global-zk", null, zkOperationTimeoutSeconds, orderedExecutor);
ScheduledExecutorService scheduledExecutor, int cacheExpirySeconds) {
super("global-zk", null, zkOperationTimeoutSeconds, orderedExecutor, cacheExpirySeconds);
this.zlClientFactory = zkClientFactory;
this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
this.globalZkConnect = globalZkConnect;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,16 @@ public static interface CacheUpdater<T> {
private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
private boolean shouldShutdownExecutor;
private final int zkOperationTimeoutSeconds;
private static final int CACHE_EXPIRY_SECONDS = 300; //5 minutes

protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);

public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds, OrderedExecutor executor) {
this(cacheName, zkSession, zkOperationTimeoutSeconds, executor, CACHE_EXPIRY_SECONDS);
}

public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds,
OrderedExecutor executor, int cacheExpirySeconds) {
checkNotNull(executor);
this.zkOperationTimeoutSeconds = zkOperationTimeoutSeconds;
this.executor = executor;
Expand All @@ -100,17 +106,17 @@ public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTime

this.dataCache = Caffeine.newBuilder()
.recordStats()
.expireAfterWrite(5, TimeUnit.MINUTES)
.expireAfterWrite(zkOperationTimeoutSeconds, TimeUnit.SECONDS)
.buildAsync((key, executor1) -> null);

this.childrenCache = Caffeine.newBuilder()
.recordStats()
.expireAfterWrite(5, TimeUnit.MINUTES)
.expireAfterWrite(zkOperationTimeoutSeconds, TimeUnit.SECONDS)
.buildAsync((key, executor1) -> null);

this.existsCache = Caffeine.newBuilder()
.recordStats()
.expireAfterWrite(5, TimeUnit.MINUTES)
.expireAfterWrite(zkOperationTimeoutSeconds, TimeUnit.SECONDS)
.buildAsync((key, executor1) -> null);

CacheMetricsCollector.CAFFEINE.addCache(cacheName + "-data", dataCache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessio
};

GlobalZooKeeperCache zkCacheService = new GlobalZooKeeperCache(zkClientfactory, -1, 30, "", executor,
scheduledExecutor);
scheduledExecutor, 300);
zkCacheService.start();
zkClient = (MockZooKeeper) zkCacheService.getZooKeeper();
ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
Expand Down

0 comments on commit ef34330

Please sign in to comment.