Skip to content

Commit

Permalink
Improve state synchronization between server and client. #136
Browse files Browse the repository at this point in the history
Divide PinpointServerSocket into PinpointServerAcceptor and
PinpointServer.
  • Loading branch information
koo-taejin committed Feb 12, 2015
1 parent d245458 commit df87df5
Show file tree
Hide file tree
Showing 50 changed files with 1,535 additions and 1,475 deletions.
Expand Up @@ -22,33 +22,28 @@

import com.navercorp.pinpoint.collector.receiver.tcp.AgentHandshakePropertyType;
import com.navercorp.pinpoint.rpc.Future;
import com.navercorp.pinpoint.rpc.server.ChannelContext;
import com.navercorp.pinpoint.rpc.server.SocketChannel;
import com.navercorp.pinpoint.rpc.server.PinpointServer;
import com.navercorp.pinpoint.rpc.util.AssertUtils;
import com.navercorp.pinpoint.rpc.util.MapUtils;

/**
* @author koo.taejin
*/
public class ChannelContextClusterPoint implements TargetClusterPoint {
public class PinpointServerClusterPoint implements TargetClusterPoint {

private final ChannelContext channelContext;
private final SocketChannel socketChannel;
private final PinpointServer pinpointServer;

private final String applicationName;
private final String agentId;
private final long startTimeStamp;

private final String version;

public ChannelContextClusterPoint(ChannelContext channelContext) {
AssertUtils.assertNotNull(channelContext, "ChannelContext may not be null.");
this.channelContext = channelContext;
public PinpointServerClusterPoint(PinpointServer pinpointServer) {
AssertUtils.assertNotNull(pinpointServer, "pinpointServer may not be null.");
this.pinpointServer = pinpointServer;

this.socketChannel = channelContext.getSocketChannel();
AssertUtils.assertNotNull(socketChannel, "SocketChannel may not be null.");

Map<Object, Object> properties = channelContext.getChannelProperties();
Map<Object, Object> properties = pinpointServer.getChannelProperties();
this.version = MapUtils.getString(properties, AgentHandshakePropertyType.VERSION.getName());
AssertUtils.assertTrue(!StringUtils.isBlank(version), "Version may not be null or empty.");

Expand All @@ -63,13 +58,13 @@ public ChannelContextClusterPoint(ChannelContext channelContext) {
}

@Override
public void send(byte[] data) {
socketChannel.sendMessage(data);
public void send(byte[] payload) {
pinpointServer.send(payload);
}

@Override
public Future request(byte[] data) {
return socketChannel.sendRequestMessage(data);
public Future request(byte[] payload) {
return pinpointServer.request(payload);
}

@Override
Expand All @@ -91,13 +86,13 @@ public String gerVersion() {
return version;
}

public ChannelContext getChannelContext() {
return channelContext;
public PinpointServer getPinpointServer() {
return pinpointServer;
}

@Override
public String toString() {
return socketChannel.toString();
return pinpointServer.toString();
}

@Override
Expand All @@ -118,11 +113,11 @@ public boolean equals(Object obj) {
return true;
}

if (!(obj instanceof ChannelContextClusterPoint)) {
if (!(obj instanceof PinpointServerClusterPoint)) {
return false;
}

if (this.getChannelContext() == ((ChannelContextClusterPoint) obj).getChannelContext()) {
if (this.getPinpointServer() == ((PinpointServerClusterPoint) obj).getPinpointServer()) {
return true;
}

Expand Down
Expand Up @@ -20,13 +20,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.navercorp.pinpoint.collector.cluster.ChannelContextClusterPoint;
import com.navercorp.pinpoint.collector.cluster.ClusterPointLocator;
import com.navercorp.pinpoint.collector.cluster.PinpointServerClusterPoint;
import com.navercorp.pinpoint.collector.cluster.TargetClusterPoint;
import com.navercorp.pinpoint.rpc.ResponseMessage;
import com.navercorp.pinpoint.rpc.packet.stream.StreamClosePacket;
import com.navercorp.pinpoint.rpc.packet.stream.StreamResponsePacket;
import com.navercorp.pinpoint.rpc.server.ChannelContext;
import com.navercorp.pinpoint.rpc.server.PinpointServer;
import com.navercorp.pinpoint.rpc.stream.ClientStreamChannel;
import com.navercorp.pinpoint.rpc.stream.ClientStreamChannelContext;
import com.navercorp.pinpoint.rpc.stream.ClientStreamChannelMessageListener;
Expand Down Expand Up @@ -95,13 +95,13 @@ private RouteResult onRoute0(StreamEvent event) {
}

try {
if (clusterPoint instanceof ChannelContextClusterPoint) {
if (clusterPoint instanceof PinpointServerClusterPoint) {
StreamRouteManager routeManager = new StreamRouteManager(event);

ServerStreamChannelContext consumerContext = event.getStreamChannelContext();
consumerContext.setAttributeIfAbsent(ATTACHMENT_KEY, routeManager);

ClientStreamChannelContext producerContext = createStreamChannel((ChannelContextClusterPoint) clusterPoint, event.getDeliveryCommand().getPayload(), routeManager);
ClientStreamChannelContext producerContext = createStreamChannel((PinpointServerClusterPoint) clusterPoint, event.getDeliveryCommand().getPayload(), routeManager);
routeManager.setProducer(producerContext.getStreamChannel());

return new RouteResult(RouteStatus.OK);
Expand All @@ -117,9 +117,9 @@ private RouteResult onRoute0(StreamEvent event) {
return new RouteResult(RouteStatus.NOT_ACCEPTABLE_UNKNOWN);
}

private ClientStreamChannelContext createStreamChannel(ChannelContextClusterPoint clusterPoint, byte[] payload, ClientStreamChannelMessageListener messageListener) {
ChannelContext channelContext = clusterPoint.getChannelContext();
return channelContext.createStreamChannel(payload, messageListener);
private ClientStreamChannelContext createStreamChannel(PinpointServerClusterPoint clusterPoint, byte[] payload, ClientStreamChannelMessageListener messageListener) {
PinpointServer pinpointServer = clusterPoint.getPinpointServer();
return pinpointServer.createStream(payload, messageListener);
}

public void close(ServerStreamChannelContext consumerContext) {
Expand Down
Expand Up @@ -38,7 +38,7 @@
import com.navercorp.pinpoint.collector.cluster.WorkerStateContext;
import com.navercorp.pinpoint.collector.config.CollectorConfiguration;
import com.navercorp.pinpoint.collector.util.CollectorUtils;
import com.navercorp.pinpoint.rpc.server.ChannelContext;
import com.navercorp.pinpoint.rpc.server.PinpointServer;
import com.navercorp.pinpoint.rpc.server.handler.ChannelStateChangeEventHandler;

/**
Expand Down Expand Up @@ -203,9 +203,9 @@ public void process(WatchedEvent event) {

// duplicate event possible - but the logic does not change
if (ZookeeperUtils.isConnectedEvent(state, eventType)) {
List<ChannelContext> currentChannelContextList = profilerClusterManager.getRegisteredChannelContextList();
for (ChannelContext channelContext : currentChannelContextList) {
profilerClusterManager.eventPerformed(channelContext, channelContext.getCurrentStateCode());
List<PinpointServer> pinpointServerList = profilerClusterManager.getRegisteredPinpointServerList();
for (PinpointServer pinpointServer : pinpointServerList) {
profilerClusterManager.eventPerformed(pinpointServer, pinpointServer.getCurrentStateCode());
}

webClusterManager.handleAndRegisterWatcher(PINPOINT_WEB_CLUSTER_PATH);
Expand Down
Expand Up @@ -40,8 +40,8 @@
import com.navercorp.pinpoint.collector.cluster.zookeeper.job.UpdateJob;
import com.navercorp.pinpoint.collector.receiver.tcp.AgentHandshakePropertyType;
import com.navercorp.pinpoint.common.util.PinpointThreadFactory;
import com.navercorp.pinpoint.rpc.server.ChannelContext;
import com.navercorp.pinpoint.rpc.server.PinpointServerSocketStateCode;
import com.navercorp.pinpoint.rpc.server.PinpointServer;
import com.navercorp.pinpoint.rpc.server.PinpointServerStateCode;
import com.navercorp.pinpoint.rpc.util.MapUtils;

/**
Expand Down Expand Up @@ -70,10 +70,10 @@ public class ZookeeperLatestJobWorker implements Runnable {

private final ZookeeperClient zookeeperClient;

private final ConcurrentHashMap<ChannelContext, Job> latestJobRepository = new ConcurrentHashMap<ChannelContext, Job>();
private final ConcurrentHashMap<PinpointServer, Job> latestJobRepository = new ConcurrentHashMap<PinpointServer, Job>();

// Storage for managing ChannelContexts received by Worker
private final CopyOnWriteArrayList<ChannelContext> channelContextRepository = new CopyOnWriteArrayList<ChannelContext>();
// Storage for managing PinpointServers received by Worker
private final CopyOnWriteArrayList<PinpointServer> pinpointServerRepository = new CopyOnWriteArrayList<PinpointServer>();

private final BlockingQueue<Job> leakJobQueue = new LinkedBlockingQueue<Job>();

Expand Down Expand Up @@ -143,22 +143,22 @@ public void run() {

// Things to consider
// spinlock possible when events are not deleted
// may lead to ChannelContext leak when events are left unresolved
// may lead to PinpointServer leak when events are left unresolved
while (workerState.isStarted()) {
boolean eventCreated = await(60000, 200);
if (!workerState.isStarted()) {
break;
}

// handle events
// check and handle ChannelContext leak if events are not triggered
// check and handle PinpointServer leak if events are not triggered
if (eventCreated) {
// to avoid ConcurrentModificationException
Iterator<ChannelContext> keyIterator = getLatestJobRepositoryKeyIterator();
Iterator<PinpointServer> keyIterator = getLatestJobRepositoryKeyIterator();

while (keyIterator.hasNext()) {
ChannelContext channelContext = keyIterator.next();
Job job = getJob(channelContext);
PinpointServer pinpointServer = keyIterator.next();
Job job = getJob(pinpointServer);
if (job == null) {
continue;
}
Expand All @@ -182,14 +182,14 @@ public void run() {
}

if (job instanceof UpdateJob) {
putRetryJob(new UpdateJob(job.getChannelContext(), 1, ((UpdateJob) job).getContents()));
putRetryJob(new UpdateJob(job.getPinpointServer(), 1, ((UpdateJob) job).getContents()));
}
}

for (ChannelContext channelContext : channelContextRepository) {
if (PinpointServerSocketStateCode.isFinished(channelContext.getCurrentStateCode())) {
logger.info("LeakDetector Find Leak ChannelContext={}.", channelContext);
putJob(new DeleteJob(channelContext));
for (PinpointServer pinpointServer : pinpointServerRepository) {
if (PinpointServerStateCode.isFinished(pinpointServer.getCurrentStateCode())) {
logger.info("LeakDetector Find Leak PinpointServer={}.", pinpointServer);
putJob(new DeleteJob(pinpointServer));
}
}

Expand All @@ -200,16 +200,16 @@ public void run() {
}

public boolean handleUpdate(UpdateJob job) {
ChannelContext channelContext = job.getChannelContext();
PinpointServer pinpointServer = job.getPinpointServer();

PinpointServerSocketStateCode code = channelContext.getCurrentStateCode();
if (PinpointServerSocketStateCode.isFinished(code)) {
putJob(new DeleteJob(channelContext));
PinpointServerStateCode code = pinpointServer.getCurrentStateCode();
if (PinpointServerStateCode.isFinished(code)) {
putJob(new DeleteJob(pinpointServer));
return false;
}

try {
String addContents = createProfilerContents(channelContext);
String addContents = createProfilerContents(pinpointServer);

if (zookeeperClient.exists(collectorUniqPath)) {
byte[] contents = zookeeperClient.getData(collectorUniqPath);
Expand All @@ -234,18 +234,18 @@ public boolean handleUpdate(UpdateJob job) {
}

public boolean handleDelete(Job job) {
ChannelContext channelContext = job.getChannelContext();
PinpointServer pinpointServer = job.getPinpointServer();

try {
if (zookeeperClient.exists(collectorUniqPath)) {
byte[] contents = zookeeperClient.getData(collectorUniqPath);

String removeContents = createProfilerContents(channelContext);
String removeContents = createProfilerContents(pinpointServer);
String data = removeIfExistContents(new String(contents, charset), removeContents);

zookeeperClient.setData(collectorUniqPath, data.getBytes(charset));
}
channelContextRepository.remove(channelContext);
pinpointServerRepository.remove(pinpointServer);
return true;
} catch (Exception e) {
logger.warn(e.getMessage(), e);
Expand All @@ -267,8 +267,8 @@ public byte[] getClusterData() {
return null;
}

public List<ChannelContext> getRegisteredChannelContextList() {
return new ArrayList<ChannelContext>(channelContextRepository);
public List<PinpointServer> getRegisteredPinpointServerList() {
return new ArrayList<PinpointServer>(pinpointServerRepository);
}

/**
Expand Down Expand Up @@ -312,29 +312,29 @@ private boolean isOverWaitTime(long waitTimeMillis, long startTimeMillis) {
return waitTimeMillis < (System.currentTimeMillis() - startTimeMillis);
}

private Iterator<ChannelContext> getLatestJobRepositoryKeyIterator() {
private Iterator<PinpointServer> getLatestJobRepositoryKeyIterator() {
synchronized (lock) {
return latestJobRepository.keySet().iterator();
}
}

// must be invoked within a Runnable only
private Job getJob(ChannelContext channelContext) {
private Job getJob(PinpointServer pinpointServer) {
synchronized (lock) {
Job job = latestJobRepository.remove(channelContext);
Job job = latestJobRepository.remove(pinpointServer);
return job;
}
}

public void putJob(Job job) {
ChannelContext channelContext = job.getChannelContext();
if (!checkRequiredProperties(channelContext)) {
PinpointServer pinpointServer = job.getPinpointServer();
if (!checkRequiredProperties(pinpointServer)) {
return;
}

synchronized (lock) {
channelContextRepository.addIfAbsent(channelContext);
latestJobRepository.put(channelContext, job);
pinpointServerRepository.addIfAbsent(pinpointServer);
latestJobRepository.put(pinpointServer, job);
lock.notifyAll();
}
}
Expand All @@ -350,10 +350,10 @@ private void putRetryJob(Job job) {
return;
}

ChannelContext channelContext = job.getChannelContext();
PinpointServer pinpointServer = job.getPinpointServer();

synchronized (lock) {
latestJobRepository.putIfAbsent(channelContext, job);
latestJobRepository.putIfAbsent(pinpointServer, job);
lock.notifyAll();
}
}
Expand All @@ -370,8 +370,8 @@ private String bindingPathAndZnode(String path, String znodeName) {
return fullPath.toString();
}

private boolean checkRequiredProperties(ChannelContext channelContext) {
Map<Object, Object> agentProperties = channelContext.getChannelProperties();
private boolean checkRequiredProperties(PinpointServer pinpointServer) {
Map<Object, Object> agentProperties = pinpointServer.getChannelProperties();
final String applicationName = MapUtils.getString(agentProperties, AgentHandshakePropertyType.APPLICATION_NAME.getName());
final String agentId = MapUtils.getString(agentProperties, AgentHandshakePropertyType.AGENT_ID.getName());
final Long startTimeStampe = MapUtils.getLong(agentProperties, AgentHandshakePropertyType.START_TIMESTAMP.getName());
Expand All @@ -384,10 +384,10 @@ private boolean checkRequiredProperties(ChannelContext channelContext) {
return true;
}

private String createProfilerContents(ChannelContext channelContext) {
private String createProfilerContents(PinpointServer pinpointServer) {
StringBuilder profilerContents = new StringBuilder();

Map<Object, Object> agentProperties = channelContext.getChannelProperties();
Map<Object, Object> agentProperties = pinpointServer.getChannelProperties();
final String applicationName = MapUtils.getString(agentProperties, AgentHandshakePropertyType.APPLICATION_NAME.getName());
final String agentId = MapUtils.getString(agentProperties, AgentHandshakePropertyType.AGENT_ID.getName());
final Long startTimeStampe = MapUtils.getLong(agentProperties, AgentHandshakePropertyType.START_TIMESTAMP.getName());
Expand Down

0 comments on commit df87df5

Please sign in to comment.