diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/connection/ClusterConnectionManager.java b/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/connection/ClusterConnectionManager.java new file mode 100644 index 000000000000..feeaf1a436d7 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/connection/ClusterConnectionManager.java @@ -0,0 +1,35 @@ +/* + * Copyright 2017 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.cluster.connection; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; + +/** + * @author minwoo.jung + */ +public interface ClusterConnectionManager { + void start(); + + void stop(); + + void connectPointIfAbsent(InetSocketAddress address); + + public void disconnectPoint(SocketAddress address); + + public List getConnectedAddressList(); +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/connection/CollectorClusterConnectionManager.java b/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/connection/CollectorClusterConnectionManager.java index 4742b603a015..97efa3e6753d 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/connection/CollectorClusterConnectionManager.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/connection/CollectorClusterConnectionManager.java @@ -30,7 +30,7 @@ /** * @Author Taejin Koo */ -public class CollectorClusterConnectionManager { +public class CollectorClusterConnectionManager implements ClusterConnectionManager { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -53,6 +53,7 @@ public CollectorClusterConnectionManager(String clusterId, CollectorClusterConne this.clusterAcceptor = acceptor; } + @Override public void start() { logger.info("{} initialization started.", ClassUtils.simpleClassName(this)); @@ -67,6 +68,7 @@ public void start() { logger.info("{} initialization completed.", ClassUtils.simpleClassName(this)); } + @Override public void stop() { logger.info("{} destroying started.", ClassUtils.simpleClassName(this)); @@ -87,6 +89,7 @@ public void stop() { logger.info("{} destroying completed.", ClassUtils.simpleClassName(this)); } + @Override public void connectPointIfAbsent(InetSocketAddress address) { logger.info("localhost -> {} connect started.", address); @@ -100,6 +103,7 @@ public void connectPointIfAbsent(InetSocketAddress address) { logger.info("localhost -> {} connect completed.", address); } + @Override public void disconnectPoint(SocketAddress address) { logger.info("localhost -> {} disconnect started.", address); @@ -112,6 +116,7 @@ public void disconnectPoint(SocketAddress address) { } } + @Override public List getConnectedAddressList() { return socketRepository.getAddressList(); } diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/zookeeper/ZookeeperWebClusterManager.java b/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/zookeeper/ZookeeperClusterManager.java similarity index 94% rename from collector/src/main/java/com/navercorp/pinpoint/collector/cluster/zookeeper/ZookeeperWebClusterManager.java rename to collector/src/main/java/com/navercorp/pinpoint/collector/cluster/zookeeper/ZookeeperClusterManager.java index 6769c3a2a153..8fb2f2b3a4e9 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/zookeeper/ZookeeperWebClusterManager.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/zookeeper/ZookeeperClusterManager.java @@ -1,252 +1,254 @@ -/* - * Copyright 2014 NAVER Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.navercorp.pinpoint.collector.cluster.zookeeper; - -import com.navercorp.pinpoint.collector.cluster.connection.CollectorClusterConnectionManager; -import com.navercorp.pinpoint.collector.cluster.zookeeper.exception.ConnectionException; -import com.navercorp.pinpoint.common.util.NetUtils; -import com.navercorp.pinpoint.common.util.PinpointThreadFactory; -import com.navercorp.pinpoint.common.server.util.concurrent.CommonState; -import com.navercorp.pinpoint.common.server.util.concurrent.CommonStateContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * @author koo.taejin - */ -public class ZookeeperWebClusterManager { - - // it is okay for the collector to retry indefinitely, as long as RETRY_INTERVAL is set reasonably - private static final int DEFAULT_RETRY_INTERVAL = 60000; - - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - private final GetAndRegisterTask getAndRegisterTask = new GetAndRegisterTask(); - private final StopTask stopTask = new StopTask(); - - private final ZookeeperClient client; - private final CollectorClusterConnectionManager clusterConnectionManager; - private final String zNodePath; - - private final AtomicBoolean retryMode = new AtomicBoolean(false); - - private final BlockingQueue queue = new LinkedBlockingQueue<>(1); - - private final CommonStateContext workerState; - private final Thread workerThread; - - // private final Timer timer; - - // Register Worker + Job - // synchronize current status with Zookeeper when an event(job) is triggered. - // (the number of events does not matter as long as a single event is triggered - subsequent events may be ignored) - public ZookeeperWebClusterManager(ZookeeperClient client, String zookeeperClusterPath, String serverIdentifier, CollectorClusterConnectionManager clusterConnectionManager) { - this.client = client; - - this.clusterConnectionManager = clusterConnectionManager; - this.zNodePath = zookeeperClusterPath; - - this.workerState = new CommonStateContext(); - - final ThreadFactory threadFactory = new PinpointThreadFactory(this.getClass().getSimpleName(), true); - this.workerThread = threadFactory.newThread(new Worker()); - } - - public void start() { - switch (this.workerState.getCurrentState()) { - case NEW: - if (this.workerState.changeStateInitializing()) { - logger.info("{} initialization started.", this.getClass().getSimpleName()); - this.workerThread.start(); - - workerState.changeStateStarted(); - logger.info("{} initialization completed.", this.getClass().getSimpleName()); - - if (clusterConnectionManager != null) { - clusterConnectionManager.start(); - } - - break; - } - case INITIALIZING: - logger.info("{} already initializing.", this.getClass().getSimpleName()); - break; - case STARTED: - logger.info("{} already started.", this.getClass().getSimpleName()); - break; - case DESTROYING: - throw new IllegalStateException("Already destroying."); - case STOPPED: - throw new IllegalStateException("Already stopped."); - case ILLEGAL_STATE: - throw new IllegalStateException("Invalid State."); - } - } - - public void stop() { - if (!(this.workerState.changeStateDestroying())) { - CommonState state = this.workerState.getCurrentState(); - - logger.info("{} already {}.", this.getClass().getSimpleName(), state.toString()); - return; - } - - logger.info("{} destroying started.", this.getClass().getSimpleName()); - - if (clusterConnectionManager != null) { - clusterConnectionManager.stop(); - } - - final boolean stopOffer = queue.offer(stopTask); - if (!stopOffer) { - logger.warn("Insert stopTask failed."); - } - - boolean interrupted = false; - while (this.workerThread.isAlive()) { - this.workerThread.interrupt(); - try { - this.workerThread.join(100L); - } catch (InterruptedException e) { - interrupted = true; - } - } - - this.workerState.changeStateStopped(); - logger.info("{} destroying completed.", this.getClass().getSimpleName()); - } - - public void handleAndRegisterWatcher(String path) { - if (workerState.isStarted()) { - if (zNodePath.equals(path)) { - final boolean offerSuccess = queue.offer(getAndRegisterTask); - if (!offerSuccess) { - logger.info("Message Queue is Full."); - } - } else { - logger.info("Invalid Path {}.", path); - } - } else { - CommonState state = this.workerState.getCurrentState(); - logger.info("{} invalid state {}.", this.getClass().getSimpleName(), state.toString()); - } - } - - private class Worker implements Runnable { - - @Override - public void run() { - // if the node does not exist, create a node and retry. - // retry on timeout as well. - while (workerState.isStarted()) { - Task task = null; - - try { - task = queue.poll(DEFAULT_RETRY_INTERVAL, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - logger.debug(e.getMessage(), e); - } - - if (!workerState.isStarted()) { - break; - } - - if (task == null) { - if (retryMode.get()) { - boolean success = getAndRegisterTask.handleAndRegisterWatcher0(); - if (success) { - retryMode.compareAndSet(true, false); - } - } - } else if (task instanceof GetAndRegisterTask) { - boolean success = ((GetAndRegisterTask) task).handleAndRegisterWatcher0(); - if (!success) { - retryMode.compareAndSet(false, true); - } - } else if (task instanceof StopTask) { - break; - } - } - - logger.info("{} stopped", this.getClass().getSimpleName()); - } - - } - - - interface Task { - - } - - @SuppressWarnings("SuspiciousMethodCalls") - class GetAndRegisterTask implements Task { - - @SuppressWarnings("SuspiciousMethodCalls") - private boolean handleAndRegisterWatcher0() { - boolean needNotRetry = false; - try { - - if (!client.exists(zNodePath)) { - client.createPath(zNodePath, true); - } - - List childNodeList = client.getChildrenNode(zNodePath, true); - List clusterAddressList = NetUtils.toInetSocketAddressLIst(childNodeList); - - List addressList = clusterConnectionManager.getConnectedAddressList(); - - logger.info("Handle register and remove Task. Current Address List = {}, Cluster Address List = {}", addressList, clusterAddressList); - - for (InetSocketAddress clusterAddress : clusterAddressList) { - if (!addressList.contains(clusterAddress)) { - clusterConnectionManager.connectPointIfAbsent(clusterAddress); - } - } - - for (SocketAddress address : addressList) { - //noinspection SuspiciousMethodCalls,SuspiciousMethodCalls - if (!clusterAddressList.contains(address)) { - clusterConnectionManager.disconnectPoint(address); - } - } - - needNotRetry = true; - return needNotRetry; - } catch (Exception e) { - if (!(e instanceof ConnectionException)) { - needNotRetry = true; - } - } - - return needNotRetry; - } - } - - static class StopTask implements Task { - - } - -} +/* + * Copyright 2014 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.collector.cluster.zookeeper; + +import com.navercorp.pinpoint.collector.cluster.connection.ClusterConnectionManager; +import com.navercorp.pinpoint.collector.cluster.connection.CollectorClusterConnectionManager; +import com.navercorp.pinpoint.collector.cluster.zookeeper.exception.ConnectionException; +import com.navercorp.pinpoint.common.util.NetUtils; +import com.navercorp.pinpoint.common.util.PinpointThreadFactory; +import com.navercorp.pinpoint.common.server.util.concurrent.CommonState; +import com.navercorp.pinpoint.common.server.util.concurrent.CommonStateContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author koo.taejin + * @author minwoo.jung + */ +public class ZookeeperClusterManager { + + // it is okay for the collector to retry indefinitely, as long as RETRY_INTERVAL is set reasonably + private static final int DEFAULT_RETRY_INTERVAL = 60000; + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final GetAndRegisterTask getAndRegisterTask = new GetAndRegisterTask(); + private final StopTask stopTask = new StopTask(); + + private final ZookeeperClient client; + private final ClusterConnectionManager clusterConnectionManager; + private final String zNodePath; + + private final AtomicBoolean retryMode = new AtomicBoolean(false); + + private final BlockingQueue queue = new LinkedBlockingQueue<>(1); + + private final CommonStateContext workerState; + private final Thread workerThread; + + // private final Timer timer; + + // Register Worker + Job + // synchronize current status with Zookeeper when an event(job) is triggered. + // (the number of events does not matter as long as a single event is triggered - subsequent events may be ignored) + public ZookeeperClusterManager(ZookeeperClient client, String zookeeperClusterPath, ClusterConnectionManager clusterConnectionManager) { + this.client = client; + + this.clusterConnectionManager = clusterConnectionManager; + this.zNodePath = zookeeperClusterPath; + + this.workerState = new CommonStateContext(); + + final ThreadFactory threadFactory = new PinpointThreadFactory(this.getClass().getSimpleName(), true); + this.workerThread = threadFactory.newThread(new Worker()); + } + + public void start() { + switch (this.workerState.getCurrentState()) { + case NEW: + if (this.workerState.changeStateInitializing()) { + logger.info("{} initialization started.", this.getClass().getSimpleName()); + this.workerThread.start(); + + workerState.changeStateStarted(); + logger.info("{} initialization completed.", this.getClass().getSimpleName()); + + if (clusterConnectionManager != null) { + clusterConnectionManager.start(); + } + + break; + } + case INITIALIZING: + logger.info("{} already initializing.", this.getClass().getSimpleName()); + break; + case STARTED: + logger.info("{} already started.", this.getClass().getSimpleName()); + break; + case DESTROYING: + throw new IllegalStateException("Already destroying."); + case STOPPED: + throw new IllegalStateException("Already stopped."); + case ILLEGAL_STATE: + throw new IllegalStateException("Invalid State."); + } + } + + public void stop() { + if (!(this.workerState.changeStateDestroying())) { + CommonState state = this.workerState.getCurrentState(); + + logger.info("{} already {}.", this.getClass().getSimpleName(), state.toString()); + return; + } + + logger.info("{} destroying started.", this.getClass().getSimpleName()); + + if (clusterConnectionManager != null) { + clusterConnectionManager.stop(); + } + + final boolean stopOffer = queue.offer(stopTask); + if (!stopOffer) { + logger.warn("Insert stopTask failed."); + } + + boolean interrupted = false; + while (this.workerThread.isAlive()) { + this.workerThread.interrupt(); + try { + this.workerThread.join(100L); + } catch (InterruptedException e) { + interrupted = true; + } + } + + this.workerState.changeStateStopped(); + logger.info("{} destroying completed.", this.getClass().getSimpleName()); + } + + public void handleAndRegisterWatcher(String path) { + if (workerState.isStarted()) { + if (zNodePath.equals(path)) { + final boolean offerSuccess = queue.offer(getAndRegisterTask); + if (!offerSuccess) { + logger.info("Message Queue is Full."); + } + } else { + logger.info("Invalid Path {}.", path); + } + } else { + CommonState state = this.workerState.getCurrentState(); + logger.info("{} invalid state {}.", this.getClass().getSimpleName(), state.toString()); + } + } + + private class Worker implements Runnable { + + @Override + public void run() { + // if the node does not exist, create a node and retry. + // retry on timeout as well. + while (workerState.isStarted()) { + Task task = null; + + try { + task = queue.poll(DEFAULT_RETRY_INTERVAL, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + logger.debug(e.getMessage(), e); + } + + if (!workerState.isStarted()) { + break; + } + + if (task == null) { + if (retryMode.get()) { + boolean success = getAndRegisterTask.handleAndRegisterWatcher0(); + if (success) { + retryMode.compareAndSet(true, false); + } + } + } else if (task instanceof GetAndRegisterTask) { + boolean success = ((GetAndRegisterTask) task).handleAndRegisterWatcher0(); + if (!success) { + retryMode.compareAndSet(false, true); + } + } else if (task instanceof StopTask) { + break; + } + } + + logger.info("{} stopped", this.getClass().getSimpleName()); + } + + } + + + interface Task { + + } + + @SuppressWarnings("SuspiciousMethodCalls") + class GetAndRegisterTask implements Task { + + @SuppressWarnings("SuspiciousMethodCalls") + private boolean handleAndRegisterWatcher0() { + boolean needNotRetry = false; + try { + + if (!client.exists(zNodePath)) { + client.createPath(zNodePath, true); + } + + List childNodeList = client.getChildrenNode(zNodePath, true); + List clusterAddressList = NetUtils.toInetSocketAddressLIst(childNodeList); + + List addressList = clusterConnectionManager.getConnectedAddressList(); + + logger.info("Handle register and remove Task. Current Address List = {}, Cluster Address List = {}", addressList, clusterAddressList); + + for (InetSocketAddress clusterAddress : clusterAddressList) { + if (!addressList.contains(clusterAddress)) { + clusterConnectionManager.connectPointIfAbsent(clusterAddress); + } + } + + for (SocketAddress address : addressList) { + //noinspection SuspiciousMethodCalls,SuspiciousMethodCalls + if (!clusterAddressList.contains(address)) { + clusterConnectionManager.disconnectPoint(address); + } + } + + needNotRetry = true; + return needNotRetry; + } catch (Exception e) { + if (!(e instanceof ConnectionException)) { + needNotRetry = true; + } + } + + return needNotRetry; + } + } + + static class StopTask implements Task { + + } + +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/zookeeper/ZookeeperClusterService.java b/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/zookeeper/ZookeeperClusterService.java index d4aa34cc9231..0c3d666802f7 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/zookeeper/ZookeeperClusterService.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/cluster/zookeeper/ZookeeperClusterService.java @@ -63,7 +63,7 @@ public class ZookeeperClusterService extends AbstractClusterService { private ZookeeperClient client; // WebClusterManager checks Zookeeper for the Web data, and manages collector -> web connections. - private ZookeeperWebClusterManager webClusterManager; + private ZookeeperClusterManager webClusterManager; // ProfilerClusterManager detects/manages profiler -> collector connections, and saves their information in Zookeeper. private ZookeeperProfilerClusterManager profilerClusterManager; @@ -108,7 +108,7 @@ public void setUp() throws KeeperException, IOException, InterruptedException { this.profilerClusterManager = new ZookeeperProfilerClusterManager(client, serverIdentifier, clusterPointRouter.getTargetClusterPointRepository()); this.profilerClusterManager.start(); - this.webClusterManager = new ZookeeperWebClusterManager(client, PINPOINT_WEB_CLUSTER_PATH, serverIdentifier, clusterConnectionManager); + this.webClusterManager = new ZookeeperClusterManager(client, PINPOINT_WEB_CLUSTER_PATH, clusterConnectionManager); this.webClusterManager.start(); this.serviceState.changeStateStarted(); @@ -187,7 +187,7 @@ public ZookeeperProfilerClusterManager getProfilerClusterManager() { return profilerClusterManager; } - public ZookeeperWebClusterManager getWebClusterManager() { + public ZookeeperClusterManager getWebClusterManager() { return webClusterManager; } diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/config/CollectorConfiguration.java b/collector/src/main/java/com/navercorp/pinpoint/collector/config/CollectorConfiguration.java index 26777c05d072..dca4efa652bc 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/config/CollectorConfiguration.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/config/CollectorConfiguration.java @@ -312,7 +312,7 @@ public void afterPropertiesSet() throws Exception { readPropertyValues(this.properties); } - private void readPropertyValues(Properties properties) { + protected void readPropertyValues(Properties properties) { logger.info("pinpoint-collector.properties read."); this.tcpListenIp = readString(properties, "collector.tcpListenIp", DEFAULT_LISTEN_IP); this.tcpListenPort = readInt(properties, "collector.tcpListenPort", 9994); @@ -360,7 +360,7 @@ private void readPropertyValues(Properties properties) { this.clusterListenPort = readInt(properties, "cluster.listen.port", -1); } - private String readString(Properties properties, String propertyName, String defaultValue) { + protected String readString(Properties properties, String propertyName, String defaultValue) { final String result = properties.getProperty(propertyName, defaultValue); if (logger.isInfoEnabled()) { logger.info("{}={}", propertyName, result); @@ -368,7 +368,7 @@ private String readString(Properties properties, String propertyName, String def return result ; } - private int readInt(Properties properties, String propertyName, int defaultValue) { + protected int readInt(Properties properties, String propertyName, int defaultValue) { final String value = properties.getProperty(propertyName); final int result = NumberUtils.toInt(value, defaultValue); if (logger.isInfoEnabled()) { @@ -377,7 +377,7 @@ private int readInt(Properties properties, String propertyName, int defaultValue return result; } - private long readLong(Properties properties, String propertyName, long defaultValue) { + protected long readLong(Properties properties, String propertyName, long defaultValue) { final String value = properties.getProperty(propertyName); final long result = NumberUtils.toLong(value, defaultValue); if (logger.isInfoEnabled()) { @@ -386,7 +386,7 @@ private long readLong(Properties properties, String propertyName, long defaultVa return result; } - private boolean readBoolean(Properties properties, String propertyName) { + protected boolean readBoolean(Properties properties, String propertyName) { final String value = properties.getProperty(propertyName); // if a default value will be needed afterwards, may match string value instead of Utils. diff --git a/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/PushZnodeJob.java b/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/PushZnodeJob.java new file mode 100644 index 000000000000..1903e862d07f --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/PushZnodeJob.java @@ -0,0 +1,29 @@ +/* + * Copyright 2017 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.web.cluster.zookeeper; + +import org.jboss.netty.util.TimerTask; + +/** + * @author minwoo.jung + */ +public interface PushZnodeJob extends TimerTask { + String getZNodePath(); + + byte[] getContents(); + + int getRetryInterval(); +} diff --git a/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClient.java b/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClient.java index 1f8a9a3df3dd..60f20a76b55e 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClient.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClient.java @@ -53,6 +53,7 @@ public class ZookeeperClient { private final Logger logger = LoggerFactory.getLogger(this.getClass()); + public static final long DEFAULT_RECONNECT_DELAY_WHEN_SESSION_EXPIRED = 30000; private final CommonStateContext stateContext; @@ -60,21 +61,21 @@ public class ZookeeperClient { private final String hostPort; private final int sessionTimeout; - private final ZookeeperClusterDataManager zookeeperDataManager; + private final ZookeeperEventWatcher zookeeperEventWatcher; private final long reconnectDelayWhenSessionExpired; // ZK client is thread-safe private volatile ZooKeeper zookeeper; // hmm this structure should contain all necessary information - public ZookeeperClient(String hostPort, int sessionTimeout, ZookeeperClusterDataManager manager) { - this(hostPort, sessionTimeout, manager, ZookeeperClusterDataManager.DEFAULT_RECONNECT_DELAY_WHEN_SESSION_EXPIRED); + public ZookeeperClient(String hostPort, int sessionTimeout, ZookeeperEventWatcher manager) { + this(hostPort, sessionTimeout, manager, DEFAULT_RECONNECT_DELAY_WHEN_SESSION_EXPIRED); } - public ZookeeperClient(String hostPort, int sessionTimeout, ZookeeperClusterDataManager zookeeperDataManager, long reconnectDelayWhenSessionExpired) { + public ZookeeperClient(String hostPort, int sessionTimeout, ZookeeperEventWatcher zookeeperEventWatcher, long reconnectDelayWhenSessionExpired) { this.hostPort = hostPort; this.sessionTimeout = sessionTimeout; - this.zookeeperDataManager = zookeeperDataManager; + this.zookeeperEventWatcher = zookeeperEventWatcher; this.reconnectDelayWhenSessionExpired = reconnectDelayWhenSessionExpired; @@ -85,7 +86,7 @@ public ZookeeperClient(String hostPort, int sessionTimeout, ZookeeperClusterData public void connect() throws IOException { if (stateContext.changeStateInitializing()) { - this.zookeeper = new ZooKeeper(hostPort, sessionTimeout, zookeeperDataManager); // server + this.zookeeper = new ZooKeeper(hostPort, sessionTimeout, zookeeperEventWatcher); // server stateContext.changeStateStarted(); } else { logger.warn("connect() failed. error : Illegal State. State may be {}.", stateContext.getCurrentState()); @@ -134,7 +135,7 @@ public void run(Timeout timeout) throws Exception { private ZooKeeper createNewZookeeper() { try { - return new ZooKeeper(hostPort, sessionTimeout, zookeeperDataManager); + return new ZooKeeper(hostPort, sessionTimeout, zookeeperEventWatcher); } catch (IOException ignore) { // ignore } @@ -258,7 +259,7 @@ public boolean exists(String path) throws PinpointZookeeperException, Interrupte } private void checkState() throws PinpointZookeeperException { - if (!zookeeperDataManager.isConnected() || !stateContext.isStarted()) { + if (!zookeeperEventWatcher.isConnected() || !stateContext.isStarted()) { throw new ConnectionException("Instance must be connected."); } } diff --git a/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClusterDataManager.java b/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClusterDataManager.java index b73cf5f08d4f..c8825b1d0560 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClusterDataManager.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClusterDataManager.java @@ -24,7 +24,6 @@ import com.navercorp.pinpoint.web.config.WebConfig; import com.navercorp.pinpoint.web.vo.AgentInfo; import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.jboss.netty.util.HashedWheelTimer; @@ -44,14 +43,11 @@ /** * @author koo.taejin */ -public class ZookeeperClusterDataManager implements ClusterDataManager, Watcher { - - static final long DEFAULT_RECONNECT_DELAY_WHEN_SESSION_EXPIRED = 30000; +public class ZookeeperClusterDataManager implements ClusterDataManager, ZookeeperEventWatcher { private static final String PINPOINT_CLUSTER_PATH = "/pinpoint-cluster"; private static final String PINPOINT_WEB_CLUSTER_PATh = PINPOINT_CLUSTER_PATH + "/web"; private static final String PINPOINT_COLLECTOR_CLUSTER_PATH = PINPOINT_CLUSTER_PATH + "/collector"; - private static final long SYNC_INTERVAL_TIME_MILLIS = 15 * 1000; private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -83,7 +79,7 @@ public ZookeeperClusterDataManager(String connectAddress, int sessionTimeout, in @Override public void start() throws Exception { this.timer = createTimer(); - this.client = new ZookeeperClient(connectAddress, sessionTimeout, this, DEFAULT_RECONNECT_DELAY_WHEN_SESSION_EXPIRED); + this.client = new ZookeeperClient(connectAddress, sessionTimeout, this, ZookeeperClient.DEFAULT_RECONNECT_DELAY_WHEN_SESSION_EXPIRED); this.client.connect(); } @@ -118,7 +114,7 @@ public boolean registerWebCluster(String zNodeName, byte[] contents) { return true; } - if (!clusterDataManagerHelper.pushWebClusterResource(client, job)) { + if (!clusterDataManagerHelper.pushZnode(client, job)) { timer.newTimeout(job, job.getRetryInterval(), TimeUnit.MILLISECONDS); } @@ -175,7 +171,7 @@ private boolean handleConnected() { if (changed) { PushWebClusterJob job = this.job.get(); if (job != null) { - if (!clusterDataManagerHelper.pushWebClusterResource(client, job)) { + if (!clusterDataManagerHelper.pushZnode(client, job)) { timer.newTimeout(job, job.getRetryInterval(), TimeUnit.MILLISECONDS); result = false; } @@ -244,6 +240,7 @@ private Timer createTimer() { return timer; } + @Override public boolean isConnected() { return connected.get(); } @@ -287,7 +284,7 @@ private boolean pushCollectorClusterData(String id) { } } - class PushWebClusterJob implements TimerTask { + class PushWebClusterJob implements PushZnodeJob { private final String zNodeName; private final byte[] contents; private final int retryInterval; @@ -306,19 +303,22 @@ public void run(Timeout timeout) throws Exception { return; } - if (!clusterDataManagerHelper.pushWebClusterResource(client, this)) { + if (!clusterDataManagerHelper.pushZnode(client, this)) { timer.newTimeout(this, getRetryInterval(), TimeUnit.MILLISECONDS); } } + @Override public String getZNodePath() { return zNodeName; } + @Override public byte[] getContents() { return contents; } + @Override public int getRetryInterval() { return retryInterval; } diff --git a/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClusterDataManagerHelper.java b/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClusterDataManagerHelper.java index ed3210222f1c..681b9219b4fb 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClusterDataManagerHelper.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperClusterDataManagerHelper.java @@ -16,7 +16,6 @@ package com.navercorp.pinpoint.web.cluster.zookeeper; -import com.navercorp.pinpoint.web.cluster.zookeeper.ZookeeperClusterDataManager.PushWebClusterJob; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +61,7 @@ Map getCollectorData(ZookeeperClient client, String path) { return Collections.emptyMap(); } - String bindingPathAndZNode(String path, String zNodeName) { + public String bindingPathAndZNode(String path, String zNodeName) { StringBuilder fullPath = new StringBuilder(); fullPath.append(path); @@ -87,7 +86,7 @@ String extractCollectorClusterId(String path, String collectorClusterPath) { return null; } - boolean pushWebClusterResource(ZookeeperClient client, PushWebClusterJob job) { + public boolean pushZnode(ZookeeperClient client, PushZnodeJob job) { if (job == null) { return false; } @@ -102,7 +101,7 @@ boolean pushWebClusterResource(ZookeeperClient client, PushWebClusterJob job) { // ip:port zNode naming scheme String nodeName = client.createNode(zNodePath, contents, CreateMode.EPHEMERAL); - logger.info("Register Web Cluster Zookeeper UniqPath = {}.", zNodePath); + logger.info("Register Zookeeper node UniqPath = {}.", zNodePath); return true; } catch (Exception e) { logger.warn(e.getMessage(), e); diff --git a/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperEventWatcher.java b/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperEventWatcher.java new file mode 100644 index 000000000000..3bfc18660c20 --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/cluster/zookeeper/ZookeeperEventWatcher.java @@ -0,0 +1,25 @@ +/* + * Copyright 2017 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.web.cluster.zookeeper; + +import org.apache.zookeeper.Watcher; + +/** + * @author minwoo.jung + */ +public interface ZookeeperEventWatcher extends Watcher { + boolean isConnected(); +}