Skip to content

Commit

Permalink
[#8619] Extract zookeeper cluster module
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Feb 3, 2022
1 parent 3efa962 commit ea03b6f
Show file tree
Hide file tree
Showing 53 changed files with 292 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import com.navercorp.pinpoint.web.cluster.connection.ClusterConnectionManager;
import com.navercorp.pinpoint.web.cluster.connection.ClusterConnector;
import com.navercorp.pinpoint.web.config.WebClusterConfig;
import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.util.List;

/**
Expand All @@ -38,7 +36,7 @@ public EmptyClusterConnectionManager(WebClusterConfig config) {
}

@Override
public void start() throws InterruptedException, IOException, KeeperException {
public void start() {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@

import com.navercorp.pinpoint.web.cluster.ClusterDataManager;
import com.navercorp.pinpoint.web.vo.AgentInfo;
import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.util.List;

/**
* @author minwoo.jung
*/
public class EmptyClusterDataManager implements ClusterDataManager {
@Override
public void start() throws InterruptedException, IOException, KeeperException, Exception {
public void start() {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import com.navercorp.pinpoint.web.cluster.ClusterManager;
import com.navercorp.pinpoint.web.config.WebClusterConfig;
import com.navercorp.pinpoint.web.vo.AgentInfo;
import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.util.List;

/**
Expand All @@ -35,7 +33,7 @@ public EmptyClusterManager() {
};

@Override
public void start() throws InterruptedException, IOException, KeeperException {
public void start() {
}

@Override
Expand Down
4 changes: 4 additions & 0 deletions collector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-commons-server</artifactId>
</dependency>
<dependency>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-commons-server-cluster</artifactId>
</dependency>
<dependency>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-commons-hbase</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,16 @@
import com.navercorp.pinpoint.common.server.cluster.zookeeper.CuratorZookeeperClient;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperClient;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperEventWatcher;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonState;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonStateContext;
import org.apache.zookeeper.KeeperException;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.exception.PinpointZookeeperException;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonState;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonStateContext;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.springframework.util.Assert;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Objects;

/**
Expand All @@ -40,6 +38,7 @@
public class FlinkClusterService {

private final Logger logger = LogManager.getLogger(this.getClass());

private final CommonStateContext serviceState;
private final FlinkConfiguration config;
private final FlinkClusterConnectionManager clusterConnectionManager;
Expand All @@ -56,7 +55,7 @@ public FlinkClusterService(FlinkConfiguration config, FlinkClusterConnectionMana
}

@PostConstruct
public void setUp() throws KeeperException, IOException, InterruptedException {
public void setUp() {
if (!config.isFlinkClusterEnable()) {
logger.info("flink cluster disable.");
return;
Expand All @@ -69,7 +68,11 @@ public void setUp() throws KeeperException, IOException, InterruptedException {

ClusterManagerWatcher watcher = new ClusterManagerWatcher(pinpointFlinkClusterPath);
this.client = new CuratorZookeeperClient(config.getFlinkClusterZookeeperAddress(), config.getFlinkClusterSessionTimeout(), watcher);
this.client.connect();
try {
this.client.connect();
} catch (PinpointZookeeperException e) {
throw new RuntimeException("ZookeeperClient connect failed", e);
}

this.zookeeperClusterManager = new ZookeeperClusterManager(client, pinpointFlinkClusterPath, clusterConnectionManager);
this.zookeeperClusterManager.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import com.navercorp.pinpoint.collector.util.AddressParser;
import com.navercorp.pinpoint.collector.util.MultipleAddress;
import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory;

import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperClient;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperConstants;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.exception.ConnectionException;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.exception.PinpointZookeeperException;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonState;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonStateContext;

import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonState;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonStateContext;
import com.navercorp.pinpoint.common.util.StringUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -123,7 +123,7 @@ public void stop() {
if (!(this.workerState.changeStateDestroying())) {
CommonState state = this.workerState.getCurrentState();

logger.info("{} already {}.", this.getClass().getSimpleName(), state.toString());
logger.info("{} already {}.", this.getClass().getSimpleName(), state);
return;
}

Expand Down Expand Up @@ -164,7 +164,7 @@ public void handleAndRegisterWatcher(String path) {
}
} else {
CommonState state = this.workerState.getCurrentState();
logger.info("{} invalid state {}.", this.getClass().getSimpleName(), state.toString());
logger.info("{} invalid state {}.", this.getClass().getSimpleName(), state);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@
import com.navercorp.pinpoint.common.server.cluster.zookeeper.CuratorZookeeperClient;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperClient;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperEventWatcher;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonState;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonStateContext;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.exception.PinpointZookeeperException;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonState;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonStateContext;
import com.navercorp.pinpoint.common.util.Assert;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -102,7 +103,7 @@ private CollectorClusterAcceptor newCollectorClusterAcceptor(CollectorClusterCon


@Override
public void setUp() throws IOException {
public void setUp() {
logger.info("pinpoint-collector cluster setUp");

switch (this.serviceState.getCurrentState()) {
Expand All @@ -112,7 +113,11 @@ public void setUp() throws IOException {

ClusterManagerWatcher watcher = new ClusterManagerWatcher();
this.client = new CuratorZookeeperClient(config.getClusterAddress(), config.getClusterSessionTimeout(), watcher);
this.client.connect();
try {
this.client.connect();
} catch (PinpointZookeeperException e) {
throw new RuntimeException("ZookeeperClient connect failed", e);
}

final String connectedAgentZNodePath = ZKPaths.makePath(config.getCollectorZNodePath(), serverIdentifier);

Expand Down Expand Up @@ -153,7 +158,7 @@ public void tearDown() {
if (!(this.serviceState.changeStateDestroying())) {
CommonState state = this.serviceState.getCurrentState();

logger.info("{} already {}.", this.getClass().getSimpleName(), state.toString());
logger.info("{} already {}.", this.getClass().getSimpleName(), state);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.navercorp.pinpoint.common.server.cluster.zookeeper.CreateNodeMessage;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperClient;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.exception.PinpointZookeeperException;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonStateContext;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonStateContext;
import com.navercorp.pinpoint.common.util.BytesUtils;
import com.navercorp.pinpoint.rpc.util.ClassUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.navercorp.pinpoint.collector.cluster.ClusterPointRepository;
import com.navercorp.pinpoint.collector.cluster.ProfilerClusterManager;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperClient;
import com.navercorp.pinpoint.common.server.util.concurrent.CommonStateContext;

import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonStateContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
Expand Down
4 changes: 2 additions & 2 deletions collector/src/main/resources/applicationContext-collector.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
com.navercorp.pinpoint.collector.service,
com.navercorp.pinpoint.common.server.bo.codec,
com.navercorp.pinpoint.common.server.util,
com.navercorp.pinpoint.common.server.bo,
com.navercorp.pinpoint.common.server.config" />
com.navercorp.pinpoint.common.server.bo" />

<import resource="classpath:applicationContext-collector-profile.xml"/>
<import resource="classpath:applicationContext-collector-grpc.xml"/>
<import resource="classpath:applicationContext-collector-thrift.xml"/>
<import resource="classpath:applicationContext-collector-hbase.xml"/>
<import resource="classpath:applicationContext-collector-namespace.xml"/>

<bean class="com.navercorp.pinpoint.common.server.cluster.zookeeper.config.ClusterConfigurationFactory"/>

<bean id="metricRegistry" class="com.codahale.metrics.MetricRegistry">
</bean>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,26 @@ public InMemoryZookeeperClient(boolean throwException) {
}

@Override
public void connect() throws IOException {
public void connect() throws PinpointZookeeperException {
connected = true;
}

@Override
public synchronized void createPath(String value) throws PinpointZookeeperException, InterruptedException {
public synchronized void createPath(String value) throws PinpointZookeeperException {
ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(value);
contents.put(pathAndNode.getPath(), EMPTY_BYTE);
}

@Override
public synchronized void createNode(CreateNodeMessage createNodeMessage) throws PinpointZookeeperException, InterruptedException {
public synchronized void createNode(CreateNodeMessage createNodeMessage) throws PinpointZookeeperException {
byte[] bytes = contents.putIfAbsent(createNodeMessage.getNodePath(), createNodeMessage.getData());
if (bytes != null) {
throw new BadOperationException("node already exist");
}
}

@Override
public synchronized void createOrSetNode(CreateNodeMessage createNodeMessage) throws PinpointZookeeperException, KeeperException, InterruptedException {
public synchronized void createOrSetNode(CreateNodeMessage createNodeMessage) throws PinpointZookeeperException {
if (intAdder.incrementAndGet() % 2 == 1 && throwException) {
throw new PinpointZookeeperException("exception");
}
Expand All @@ -80,18 +80,18 @@ public synchronized void createOrSetNode(CreateNodeMessage createNodeMessage) th
}

@Override
public synchronized byte[] getData(String path) throws PinpointZookeeperException, InterruptedException {
public synchronized byte[] getData(String path) throws PinpointZookeeperException {
byte[] bytes = contents.get(path);
return bytes;
}

@Override
public byte[] getData(String path, boolean watch) throws PinpointZookeeperException, InterruptedException {
public byte[] getData(String path, boolean watch) throws PinpointZookeeperException {
return contents.get(path);
}

@Override
public synchronized void delete(String path) throws PinpointZookeeperException, InterruptedException {
public synchronized void delete(String path) throws PinpointZookeeperException {
contents.remove(path);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.navercorp.pinpoint.collector.receiver.grpc.RecordedStreamObserver;
import com.navercorp.pinpoint.collector.receiver.grpc.service.command.GrpcCommandService;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperConstants;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.exception.PinpointZookeeperException;
import com.navercorp.pinpoint.common.trace.ServiceType;
import com.navercorp.pinpoint.grpc.Header;
import com.navercorp.pinpoint.grpc.server.DefaultTransportMetadata;
Expand Down Expand Up @@ -67,7 +68,7 @@ private ConditionFactory awaitility() {
}

@Test
public void oldVersionHandshakeTest() throws IOException {
public void oldVersionHandshakeTest() throws IOException, PinpointZookeeperException {
ZookeeperProfilerClusterManager manager = creteMemoryClusterManager();

ZookeeperClusterService mockClusterService = Mockito.mock(ZookeeperClusterService.class);
Expand All @@ -91,7 +92,7 @@ public void oldVersionHandshakeTest() throws IOException {
}

@Test
public void oldVersionHandshakeFailTest() throws IOException {
public void oldVersionHandshakeFailTest() throws IOException, PinpointZookeeperException {
ZookeeperProfilerClusterManager manager = creteMemoryClusterManager();

ZookeeperClusterService mockClusterService = Mockito.mock(ZookeeperClusterService.class);
Expand All @@ -117,7 +118,7 @@ public void oldVersionHandshakeFailTest() throws IOException {
}

@Test
public void newVersionHandshakeTest() throws IOException {
public void newVersionHandshakeTest() throws IOException, PinpointZookeeperException {
ZookeeperProfilerClusterManager manager = creteMemoryClusterManager();

ZookeeperClusterService mockClusterService = Mockito.mock(ZookeeperClusterService.class);
Expand All @@ -136,7 +137,7 @@ public void newVersionHandshakeTest() throws IOException {
}
}

private ZookeeperProfilerClusterManager creteMemoryClusterManager() throws IOException {
private ZookeeperProfilerClusterManager creteMemoryClusterManager() throws PinpointZookeeperException {
InMemoryZookeeperClient zookeeperClient = new InMemoryZookeeperClient();
zookeeperClient.connect();

Expand Down
Loading

0 comments on commit ea03b6f

Please sign in to comment.