Skip to content

Commit

Permalink
GH-3435: Upgrade to curator-recipes 5.3.0
Browse files Browse the repository at this point in the history
Fixes #3435

* Migrate `ZookeeperMetadataStore` from deprecated `PathChildrenCache` to the `CuratorCache`
and respective `CuratorCacheListener`
  • Loading branch information
artembilan committed Sep 14, 2022
1 parent 0f7b3e6 commit abd94bd
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 92 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ ext {
commonsDbcp2Version = '2.9.0'
commonsIoVersion = '2.11.0'
commonsNetVersion = '3.8.0'
curatorVersion = '4.3.0'
curatorVersion = '5.3.0'
derbyVersion = '10.14.2.0'
findbugsVersion = '3.0.1'
ftpServerVersion = '1.2.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,9 +24,8 @@

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
Expand All @@ -51,8 +50,6 @@ public class ZookeeperMetadataStore implements ListenableMetadataStore, SmartLif

private static final String KEY_MUST_NOT_BE_NULL = "'key' must not be null.";

private final Object lifecycleMonitor = new Object();

private final CuratorFramework client;

private final List<MetadataStoreListener> listeners = new CopyOnWriteArrayList<>();
Expand All @@ -67,13 +64,13 @@ public class ZookeeperMetadataStore implements ListenableMetadataStore, SmartLif

private String encoding = StandardCharsets.UTF_8.name();

private PathChildrenCache cache;
private CuratorCache cache;

private boolean autoStartup = true;

private int phase = Integer.MAX_VALUE;

private volatile boolean running = false;
private boolean running;

public ZookeeperMetadataStore(CuratorFramework client) {
Assert.notNull(client, "Client cannot be null");
Expand Down Expand Up @@ -197,27 +194,27 @@ public String get(String key) {
Assert.notNull(key, KEY_MUST_NOT_BE_NULL);
Assert.state(isRunning(), "ZookeeperMetadataStore has to be started before using.");
synchronized (this.updateMap) {
ChildData currentData = this.cache.getCurrentData(getPath(key));
if (currentData == null) {
if (this.updateMap.containsKey(key)) {
// we have saved the value, but the cache hasn't updated yet
// if the value had changed via replication, we would have been notified by the listener
return this.updateMap.get(key).getValue();
}
else {
// the value just doesn't exist
return null;
}
}
else {
// our version is more recent than the cache
if (this.updateMap.containsKey(key) &&
this.updateMap.get(key).getVersion() >= currentData.getStat().getVersion()) {
return this.cache.get(getPath(key))
.map(currentData -> {
// our version is more recent than the cache
if (this.updateMap.containsKey(key) &&
this.updateMap.get(key).getVersion() >= currentData.getStat().getVersion()) {

return this.updateMap.get(key).getValue();
}
return IntegrationUtils.bytesToString(currentData.getData(), this.encoding);
}
return this.updateMap.get(key).getValue();
}
return IntegrationUtils.bytesToString(currentData.getData(), this.encoding);
})
.orElseGet(() -> {
if (this.updateMap.containsKey(key)) {
// we have saved the value, but the cache hasn't updated yet
// if the value had changed via replication, we would have been notified by the listener
return this.updateMap.get(key).getValue();
}
else {
// the value just doesn't exist
return null;
}
});
}
}

Expand Down Expand Up @@ -264,46 +261,38 @@ public boolean isAutoStartup() {
}

@Override
public void start() {
public synchronized void start() {
if (!this.running) {
synchronized (this.lifecycleMonitor) {
if (!this.running) {
try {
this.client.checkExists()
.creatingParentContainersIfNeeded()
.forPath(this.root);

this.cache = new PathChildrenCache(this.client, this.root, true);
this.cache.getListenable()
.addListener(new MetadataStoreListenerInvokingPathChildrenCacheListener());
this.cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
this.running = true;
}
catch (Exception e) {
throw new ZookeeperMetadataStoreException("Exception while starting bean", e);
}
}
try {
this.client.checkExists()
.creatingParentContainersIfNeeded()
.forPath(this.root);

this.cache = CuratorCache.builder(this.client, this.root).build();
this.cache.listenable().addListener(new MetadataStoreCacheListener());
this.client.createContainers(this.root);
this.cache.start();
this.running = true;
}
catch (Exception e) {
throw new ZookeeperMetadataStoreException("Exception while starting bean", e);
}
}
}

@Override
public void stop() {
public synchronized void stop() {
if (this.running) {
synchronized (this.lifecycleMonitor) {
if (this.running) {
if (this.cache != null) {
CloseableUtils.closeQuietly(this.cache);
}
this.cache = null;
this.running = false;
}
if (this.cache != null) {
CloseableUtils.closeQuietly(this.cache);
}
this.cache = null;
this.running = false;
}
}

@Override
public boolean isRunning() {
public synchronized boolean isRunning() {
return this.running;
}

Expand Down Expand Up @@ -338,45 +327,41 @@ private int getVersion() {

}

private class MetadataStoreListenerInvokingPathChildrenCacheListener implements PathChildrenCacheListener {
private class MetadataStoreCacheListener implements CuratorCacheListener {

MetadataStoreListenerInvokingPathChildrenCacheListener() {
MetadataStoreCacheListener() {
}

@Override
public void childEvent(CuratorFramework framework, PathChildrenCacheEvent event) {
synchronized (ZookeeperMetadataStore.this.updateMap) {
String eventPath = event.getData().getPath();
String eventKey = getKey(eventPath);
String value =
IntegrationUtils.bytesToString(event.getData().getData(), ZookeeperMetadataStore.this.encoding);
switch (event.getType()) {
case CHILD_ADDED:
if (ZookeeperMetadataStore.this.updateMap.containsKey(eventKey) &&
event.getData().getStat().getVersion() >=
ZookeeperMetadataStore.this.updateMap.get(eventKey).getVersion()) {

ZookeeperMetadataStore.this.updateMap.remove(eventPath);
}
ZookeeperMetadataStore.this.listeners.forEach((listener) -> listener.onAdd(eventKey, value));
break;
case CHILD_UPDATED:
if (ZookeeperMetadataStore.this.updateMap.containsKey(eventKey) &&
event.getData().getStat().getVersion() >=
ZookeeperMetadataStore.this.updateMap.get(eventKey).getVersion()) {

ZookeeperMetadataStore.this.updateMap.remove(eventPath);
}
ZookeeperMetadataStore.this.listeners.forEach((listener) -> listener.onUpdate(eventKey, value));
break;
case CHILD_REMOVED:
ZookeeperMetadataStore.this.updateMap.remove(eventKey);
ZookeeperMetadataStore.this.listeners.forEach((listener) -> listener.onRemove(eventKey, value));
break;
default:
// ignore all other events
break;
}
public void event(Type type, ChildData oldData, ChildData newData) {
ChildData data = Type.NODE_DELETED.equals(type) ? oldData : newData;
String eventPath = data.getPath();
String eventKey = getKey(eventPath);
String value = IntegrationUtils.bytesToString(data.getData(), ZookeeperMetadataStore.this.encoding);

switch (type) {
case NODE_CREATED:
if (ZookeeperMetadataStore.this.updateMap.containsKey(eventKey) &&
data.getStat().getVersion() >=
ZookeeperMetadataStore.this.updateMap.get(eventKey).getVersion()) {

ZookeeperMetadataStore.this.updateMap.remove(eventPath);
}
ZookeeperMetadataStore.this.listeners.forEach((listener) -> listener.onAdd(eventKey, value));
break;
case NODE_CHANGED:
if (ZookeeperMetadataStore.this.updateMap.containsKey(eventKey) &&
data.getStat().getVersion() >=
ZookeeperMetadataStore.this.updateMap.get(eventKey).getVersion()) {

ZookeeperMetadataStore.this.updateMap.remove(eventPath);
}
ZookeeperMetadataStore.this.listeners.forEach((listener) -> listener.onUpdate(eventKey, value));
break;
case NODE_DELETED:
ZookeeperMetadataStore.this.updateMap.remove(eventKey);
ZookeeperMetadataStore.this.listeners.forEach((listener) -> listener.onRemove(eventKey, value));
break;
}
}

Expand Down

0 comments on commit abd94bd

Please sign in to comment.