Skip to content

Commit

Permalink
[#3515] Refactor for simpler agent info send task management
Browse files Browse the repository at this point in the history
  • Loading branch information
Xylus authored and emeroad committed Nov 9, 2017
1 parent 47a0b3a commit 50ee197
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@

package com.navercorp.pinpoint.profiler;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

import com.navercorp.pinpoint.profiler.util.AgentInfoFactory;
import com.navercorp.pinpoint.rpc.DefaultFuture;
import com.navercorp.pinpoint.rpc.ResponseMessage;
import com.navercorp.pinpoint.thrift.dto.TResult;
import com.navercorp.pinpoint.thrift.io.HeaderTBaseDeserializerFactory;
import com.navercorp.pinpoint.thrift.util.SerializationUtils;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.navercorp.pinpoint.common.util.PinpointThreadFactory;
import com.navercorp.pinpoint.profiler.sender.EnhancedDataSender;
import com.navercorp.pinpoint.thrift.dto.TAgentInfo;

Expand All @@ -46,105 +47,163 @@ public class AgentInfoSender {
private static final int DEFAULT_MAX_TRY_COUNT_PER_ATTEMPT = 3;

private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final ThreadFactory threadFactory = new PinpointThreadFactory("Pinpoint-agentInfo-sender", true);
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(threadFactory);

private final EnhancedDataSender dataSender;
private final AgentInfoFactory agentInfoFactory;
private final long refreshIntervalMs;
private final long sendIntervalMs;
private final int maxTryPerAttempt;
private final Scheduler scheduler;

private AgentInfoSender(Builder builder) {
this.dataSender = builder.dataSender;
this.agentInfoFactory = builder.agentInfoFactory;
this.refreshIntervalMs = builder.refreshIntervalMs;
this.sendIntervalMs = builder.sendIntervalMs;
this.maxTryPerAttempt = builder.maxTryPerAttempt;
this.scheduler = new Scheduler();
}

public void start() {
sendAgentInfo(Integer.MAX_VALUE);
this.executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
refresh();
}
}, this.refreshIntervalMs, this.refreshIntervalMs, TimeUnit.MILLISECONDS);
scheduler.start();
}

public void stop() {
this.executor.shutdown();
try {
this.executor.awaitTermination(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
scheduler.stop();
logger.info("AgentInfoSender stopped");
}

public void refresh() {
sendAgentInfo(this.maxTryPerAttempt);
scheduler.refresh();
}

private void sendAgentInfo(int maxTries) {
TAgentInfo agentInfo = agentInfoFactory.createAgentInfo();
AgentInfoSendRunnable agentInfoSendRunnable = new AgentInfoSendRunnable(agentInfo);
AgentInfoSendRunnableWrapper wrapper = new AgentInfoSendRunnableWrapper(agentInfoSendRunnable, maxTries);
wrapper.repeatWithFixedDelay(this.executor, 0, this.sendIntervalMs, TimeUnit.MILLISECONDS);
private interface SuccessListener {
void onSuccess();

SuccessListener NO_OP = new SuccessListener() {
@Override
public void onSuccess() {
// noop
}
};
}

private static class AgentInfoSendRunnableWrapper implements Runnable {
private final AgentInfoSendRunnable delegate;
private final int maxTryCount;
private final AtomicInteger tryCount = new AtomicInteger();
private volatile ScheduledFuture<?> self;
private class Scheduler {

private AgentInfoSendRunnableWrapper(AgentInfoSendRunnable agentInfoSendRunnable, int maxTryCount) {
if (agentInfoSendRunnable == null) {
throw new NullPointerException("agentInfoSendRunnable must not be null");
}
if (maxTryCount < 0) {
throw new IllegalArgumentException("maxTryCount must not be less than 0");
}
this.delegate = agentInfoSendRunnable;
this.maxTryCount = maxTryCount;
private static final long IMMEDIATE = 0L;
private final Timer timer = new Timer("Pinpoint-AgentInfoSender-Timer", true);
private final Object lock = new Object();
// protected by lock's monitor
private boolean isRunning = true;

private Scheduler() {
// preload
AgentInfoSendTask task = new AgentInfoSendTask(SuccessListener.NO_OP);
task.run();
}

@Override
public void run() {
// Cancel self when delegated runnable is completed successfully, or when max try count has been reached
if (this.delegate.isSuccessful() || this.tryCount.getAndIncrement() == this.maxTryCount) {
this.self.cancel(false);
} else {
this.delegate.run();
public void start() {
final SuccessListener successListener = new SuccessListener() {
@Override
public void onSuccess() {
schedule(this, maxTryPerAttempt, refreshIntervalMs, sendIntervalMs);
}
};
schedule(successListener, Integer.MAX_VALUE, IMMEDIATE, sendIntervalMs);
}

public void refresh() {
schedule(SuccessListener.NO_OP, maxTryPerAttempt, IMMEDIATE, sendIntervalMs);
}

private void schedule(SuccessListener successListener, int retryCount, long delay, long period) {
synchronized (lock) {
if (isRunning) {
AgentInfoSendTask task = new AgentInfoSendTask(successListener, retryCount);
timer.scheduleAtFixedRate(task, delay, period);
}
}
}

private void repeatWithFixedDelay(ScheduledExecutorService scheduledExecutorService, long initialDelay, long delay, TimeUnit unit) {
this.self = scheduledExecutorService.scheduleWithFixedDelay(this, initialDelay, delay, unit);
public void stop() {
synchronized (lock) {
isRunning = false;
timer.cancel();
}
}
}

private class AgentInfoSendRunnable implements Runnable {
private final AtomicBoolean isSuccessful = new AtomicBoolean(false);
private final AgentInfoSenderListener agentInfoSenderListener = new AgentInfoSenderListener(this.isSuccessful);
private final TAgentInfo agentInfo;
private class AgentInfoSendTask extends TimerTask {

private AgentInfoSendRunnable(TAgentInfo agentInfo) {
this.agentInfo = agentInfo;
private final SuccessListener taskHandler;
private final int retryCount;
private AtomicInteger counter;

private AgentInfoSendTask(SuccessListener taskHandler) {
this(taskHandler, 0);
}

private AgentInfoSendTask(SuccessListener taskHandler, int retryCount) {
if (taskHandler == null) {
throw new NullPointerException("taskHandler must not be null");
}
this.taskHandler = taskHandler;
this.retryCount = retryCount;
this.counter = new AtomicInteger(0);
}

@Override
public void run() {
if (!isSuccessful.get()) {
int runCount = counter.incrementAndGet();
if (runCount > retryCount) {
this.cancel();
return;
}
boolean isSuccessful = sendAgentInfo();
if (isSuccessful) {
logger.info("AgentInfo sent.");
this.cancel();
taskHandler.onSuccess();
}
}

private boolean sendAgentInfo() {
try {
TAgentInfo agentInfo = agentInfoFactory.createAgentInfo();
final DefaultFuture<ResponseMessage> future = new DefaultFuture<ResponseMessage>();

logger.info("Sending AgentInfo {}", agentInfo);
dataSender.request(this.agentInfo, this.agentInfoSenderListener);
dataSender.request(agentInfo, new AgentInfoSenderListener(future));
if (!future.await()) {
logger.warn("request timed out while waiting for response.");
return false;
}
if (!future.isSuccess()) {
Throwable t = future.getCause();
logger.warn("request failed.", t);
return false;
}
ResponseMessage responseMessage = future.getResult();
return getResult(responseMessage);
} catch (Exception e) {
logger.warn("failed to send agent info.", e);
}
return false;
}

public boolean isSuccessful() {
return this.isSuccessful.get();
private boolean getResult(ResponseMessage responseMessage) {
byte[] message = responseMessage.getMessage();
TBase<?, ?> tbase = SerializationUtils.deserialize(message, HeaderTBaseDeserializerFactory.DEFAULT_FACTORY, null);
if (!(tbase instanceof TResult)) {
logger.warn("Invalid response : {}", tbase.getClass());
return false;
}
TResult result = (TResult) tbase;
if (!result.isSuccess()) {
logger.warn("request unsuccessful. Cause : {}", result.getMessage());
return false;
}
return true;
}
}

Expand Down Expand Up @@ -194,5 +253,4 @@ public AgentInfoSender build() {
return new AgentInfoSender(this);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,58 +16,36 @@

package com.navercorp.pinpoint.profiler;

import com.navercorp.pinpoint.rpc.DefaultFuture;
import com.navercorp.pinpoint.rpc.Future;
import com.navercorp.pinpoint.rpc.FutureListener;
import com.navercorp.pinpoint.rpc.ResponseMessage;
import com.navercorp.pinpoint.thrift.dto.TResult;
import com.navercorp.pinpoint.thrift.io.HeaderTBaseDeserializerFactory;
import com.navercorp.pinpoint.thrift.util.SerializationUtils;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicBoolean;

public class AgentInfoSenderListener implements FutureListener<ResponseMessage> {

private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final AtomicBoolean isSuccessful;
private final DefaultFuture<ResponseMessage> future;

public AgentInfoSenderListener(AtomicBoolean isSuccessful) {
this.isSuccessful = isSuccessful;
public AgentInfoSenderListener(DefaultFuture<ResponseMessage> future) {
this.future = future;
}

@Override
public void onComplete(Future<ResponseMessage> future) {
try {
if (future != null && future.isSuccess()) {
TBase<?, ?> tbase = deserialize(future);
if (tbase instanceof TResult) {
TResult result = (TResult) tbase;
if (result.isSuccess()) {
logger.debug("result success");
this.isSuccessful.set(true);
return;
} else {
logger.warn("request fail. Caused:{}", result.getMessage());
}
} else {
logger.warn("Invalid Class. {}", tbase);
}
}
} catch(Exception e) {
logger.warn("request fail. caused:{}", e.getMessage());
if (future == null) {
this.future.setFailure(new IllegalStateException("ResponseMessage future is null"));
return;
}
if (!future.isReady()) {
this.future.setFailure(new IllegalStateException("ResponseMessage future is not complete"));
return;
}
}

private TBase<?, ?> deserialize(Future<ResponseMessage> future) {
final ResponseMessage responseMessage = future.getResult();

// TODO Should we change this to thread local cache? This object's life cycle is different because it could be created many times.
// Should we cache this?
byte[] message = responseMessage.getMessage();
return SerializationUtils.deserialize(message, HeaderTBaseDeserializerFactory.DEFAULT_FACTORY, null);

if (future.isSuccess()) {
ResponseMessage responseMessage = future.getResult();
this.future.setResult(responseMessage);
} else {
Throwable cause = future.getCause();
this.future.setFailure(cause);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ public void reconnectionStressTest() throws InterruptedException {
final int randomMaxTime = 3000;
final long agentInfoSendRetryIntervalMs = 1000L;
final int maxTryPerAttempt = Integer.MAX_VALUE;
final int expectedTriesUntilSuccess = (int) stressTestTime / (randomMaxTime * 2);
final int expectedTriesUntilSuccess = (int) stressTestTime / (randomMaxTime * 2) - 1;

ResponseServerMessageListener serverListener = new ResponseServerMessageListener(requestCount, successCount, expectedTriesUntilSuccess);

Expand Down Expand Up @@ -486,14 +486,14 @@ private void createAndDeleteServer(ServerMessageListener listener, long waitTime
}

private void closeAll(PinpointServerAcceptor serverAcceptor, AgentInfoSender agentInfoSender, PinpointClientFactory factory) {
if (serverAcceptor != null) {
serverAcceptor.close();
}

if (agentInfoSender != null) {
agentInfoSender.stop();
}

if (serverAcceptor != null) {
serverAcceptor.close();
}

if (factory != null) {
factory.release();
}
Expand Down Expand Up @@ -580,13 +580,4 @@ public boolean checkCompleted() {

Assert.assertTrue(pass);
}

@Test
public void test() {
Integer a = new Integer(1);
Integer b = 2;

Integer integer = Integer.valueOf(1);
System.out.println("a=" + System.identityHashCode(a) + " valueOf:" + System.identityHashCode(integer));
}
}

0 comments on commit 50ee197

Please sign in to comment.