Skip to content

Commit

Permalink
[#4852] Change Zookeeper Client management to use Curator
Browse files Browse the repository at this point in the history
  • Loading branch information
koo-taejin committed Nov 7, 2018
1 parent 34c0e52 commit 34285e1
Show file tree
Hide file tree
Showing 28 changed files with 1,312 additions and 1,168 deletions.
Expand Up @@ -15,27 +15,23 @@
*/
package com.navercorp.pinpoint.collector.cluster.flink;

import com.navercorp.pinpoint.collector.cluster.zookeeper.DefaultZookeeperClient;
import com.navercorp.pinpoint.collector.cluster.zookeeper.ZookeeperClient;
import com.navercorp.pinpoint.collector.cluster.zookeeper.ZookeeperClusterManager;
import com.navercorp.pinpoint.collector.cluster.zookeeper.ZookeeperUtils;
import com.navercorp.pinpoint.collector.config.CollectorConfiguration;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.CuratorZookeeperClient;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperClient;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperEventWatcher;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonState;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonStateContext;
import com.navercorp.pinpoint.common.util.Assert;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.proto.WatcherEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* @author minwoo.jung
Expand Down Expand Up @@ -71,7 +67,7 @@ public void setUp() throws KeeperException, IOException, InterruptedException {
logger.info("{} initialization started.", this.getClass().getSimpleName());

ClusterManagerWatcher watcher = new ClusterManagerWatcher();
this.client = new DefaultZookeeperClient(config.getFlinkClusterZookeeperAddress(), config.getFlinkClusterSessionTimeout(), watcher);
this.client = new CuratorZookeeperClient(config.getFlinkClusterZookeeperAddress(), config.getFlinkClusterSessionTimeout(), watcher);
this.client.connect();

this.zookeeperClusterManager = new ZookeeperClusterManager(client, PINPOINT_FLINK_CLUSTER_PATH, clusterConnectionManager);
Expand All @@ -81,9 +77,7 @@ public void setUp() throws KeeperException, IOException, InterruptedException {
logger.info("{} initialization completed.", this.getClass().getSimpleName());

if (client.isConnected()) {
WatcherEvent watcherEvent = new WatcherEvent(EventType.None.getIntValue(), KeeperState.SyncConnected.getIntValue(), "");
WatchedEvent event = new WatchedEvent(watcherEvent);
watcher.process(event);
watcher.handleConnected();
}
}
break;
Expand Down Expand Up @@ -133,54 +127,41 @@ public ZookeeperClusterManager getZookeeperClusterManager() {

class ClusterManagerWatcher implements ZookeeperEventWatcher {

private final AtomicBoolean connected = new AtomicBoolean(false);

@Override
public void process(WatchedEvent event) {
logger.debug("Process Zookeeper Event({})", event);

KeeperState state = event.getState();
EventType eventType = event.getType();

// ephemeral node is removed on disconnect event (leave node management exclusively to zookeeper)
if (ZookeeperUtils.isDisconnectedEvent(state, eventType)) {
connected.compareAndSet(true, false);
if (state == KeeperState.Expired) {
if (client != null) {
client.reconnectWhenSessionExpired();
}
}
return;
}

// on connect/reconnect event
if (ZookeeperUtils.isConnectedEvent(state, eventType)) {
// could already be connected (failure to compareAndSet doesn't really matter)
boolean changed = connected.compareAndSet(false, true);
}

if (serviceState.isStarted() && connected.get()) {
if (serviceState.isStarted() && client.isConnected()) {
// duplicate event possible - but the logic does not change
if (ZookeeperUtils.isConnectedEvent(state, eventType)) {
// profilerClusterManager.initZookeeperClusterData();
zookeeperClusterManager.handleAndRegisterWatcher(PINPOINT_FLINK_CLUSTER_PATH);
} else if (eventType == EventType.NodeChildrenChanged) {
String path = event.getPath();

if (PINPOINT_FLINK_CLUSTER_PATH.equals(path)) {
zookeeperClusterManager.handleAndRegisterWatcher(path);
if (eventType == EventType.NodeChildrenChanged) {
String eventPath = event.getPath();

if (PINPOINT_FLINK_CLUSTER_PATH.equals(eventPath)) {
zookeeperClusterManager.handleAndRegisterWatcher(eventPath);
} else {
logger.warn("Unknown Path ChildrenChanged {}.", path);
logger.warn("Unknown Path ChildrenChanged {}.", eventPath);
}

}
}
}

@Override
public boolean isConnected() {
return connected.get();
public boolean handleConnected() {
if (serviceState.isStarted()) {
zookeeperClusterManager.handleAndRegisterWatcher(PINPOINT_FLINK_CLUSTER_PATH);
return true;
} else {
return false;
}
}

@Override
public boolean handleDisconnected() {
return true;
}

}

}

0 comments on commit 34285e1

Please sign in to comment.