Skip to content

Commit

Permalink
[#5639] Add agent ping time history
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jun 4, 2019
1 parent eb2a287 commit a95eeb7
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 128 deletions.
Expand Up @@ -24,22 +24,23 @@
import com.navercorp.pinpoint.common.server.util.AgentEventType;
import com.navercorp.pinpoint.common.server.util.AgentLifeCycleState;
import com.navercorp.pinpoint.grpc.AgentHeaderFactory;
import com.navercorp.pinpoint.grpc.server.LastAccessTime;
import com.navercorp.pinpoint.grpc.server.TransportMetadata;
import com.navercorp.pinpoint.grpc.server.lifecycle.Lifecycle;
import com.navercorp.pinpoint.grpc.server.lifecycle.LifecycleRegistry;
import com.navercorp.pinpoint.grpc.trace.KeepAliveGrpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.PreDestroy;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

public class KeepAliveService extends KeepAliveGrpc.KeepAliveImplBase {
public class KeepAliveService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());

private static final int CLOSE_MASK = 1;

private final AgentEventAsyncTaskService agentEventAsyncTask;
private final AgentLifeCycleAsyncTaskService agentLifeCycleAsyncTask;
private final LifecycleRegistry lifecycleRegistry;
Expand All @@ -55,26 +56,12 @@ public KeepAliveService(AgentEventAsyncTaskService agentEventAsyncTask,


public void updateState() {
Collection<Lifecycle> lifecycles = lifecycleRegistry.values();
final Collection<Lifecycle> lifecycles = lifecycleRegistry.values();
for (Lifecycle lifecycle : lifecycles) {
final AgentHeaderFactory.Header header = lifecycle.getRef();
if (header != null) {
logger.debug("updateState:{}", lifecycle);
final TransportMetadata transportMetadata = lifecycle.getTransportMetadata();

final long connectTime = transportMetadata.getConnectTime();
final long pingTimestamp = System.currentTimeMillis();
final long eventCount = transportMetadata.nextEventCount();
final AgentProperty channelProperties = newChannelProperties(header);
try {
if (!(eventCount < 0)) {
agentLifeCycleAsyncTask.handleLifeCycleEvent(channelProperties, pingTimestamp, AgentLifeCycleState.RUNNING, connectTime);
}
agentEventAsyncTask.handleEvent(channelProperties, pingTimestamp, AgentEventType.AGENT_PING);
} catch (Exception e) {
logger.warn("Error handling client ping event", e);
}
}
boolean closeState = false;
AgentLifeCycleState agentLifeCycleState = AgentLifeCycleState.RUNNING;
AgentEventType agentEventType = AgentEventType.AGENT_PING;
updateState(lifecycle, closeState, agentLifeCycleState, agentEventType);
}
}

Expand All @@ -86,25 +73,58 @@ private AgentProperty newChannelProperties(AgentHeaderFactory.Header header) {

public void updateState(Lifecycle lifecycle, ManagedAgentLifeCycle managedAgentLifeCycle) {

boolean closeState = managedAgentLifeCycle.isClosedEvent();
AgentLifeCycleState agentLifeCycleState = managedAgentLifeCycle.getMappedState();
AgentEventType agentEventType = managedAgentLifeCycle.getMappedEvent();
updateState(lifecycle, closeState, agentLifeCycleState, agentEventType);
}

public void updateState(Lifecycle lifecycle, boolean closeState, AgentLifeCycleState agentLifeCycleState, AgentEventType agentEventType) {

final AgentHeaderFactory.Header header = lifecycle.getRef();
if (header == null) {
logger.warn("Not found request header");
return;
}

logger.debug("updateState:{}", lifecycle);
final TransportMetadata transportMetadata = lifecycle.getTransportMetadata();

final long pingTimestamp = System.currentTimeMillis();
final AgentProperty agentProperty = newChannelProperties(header);
final LastAccessTime lastAccessTime = transportMetadata.getLastAccessTime();

AgentLifeCycleState agentLifeCycleState = managedAgentLifeCycle.getMappedState();
AgentEventType agentEventType = managedAgentLifeCycle.getMappedEvent();
final long eventIdentifier = getEventIdentifier(managedAgentLifeCycle, transportMetadata);

final long eventIdentifier = getEventIdentifier(lastAccessTime, pingTimestamp, closeState);
if (eventIdentifier == -1) {
// skip
return;
}
final AgentProperty agentProperty = newChannelProperties(header);

try {
this.agentLifeCycleAsyncTask.handleLifeCycleEvent(agentProperty , pingTimestamp, agentLifeCycleState, eventIdentifier);
this.agentEventAsyncTask.handleEvent(agentProperty, pingTimestamp, agentEventType);
} catch (Exception e) {
logger.warn("Failed to update state. header={}, lifeCycle={}", header, managedAgentLifeCycle);
logger.warn("Failed to update state. closeState:{} lifeCycle={} {}/{}", lifecycle, closeState, agentLifeCycleState, agentEventType);
}
}

private long getEventIdentifier(LastAccessTime lastAccessTime, long eventTime, boolean closeState) {
synchronized (lastAccessTime) {
if (closeState) {
if (lastAccessTime.expire(eventTime)) {
return eventTime + CLOSE_MASK;
} else {
// event runs after expire.
return -1;
}
} else {
if (lastAccessTime.isExpire()) {
return -1;
}
lastAccessTime.update(eventTime);
return eventTime;
}
}
}

Expand All @@ -116,11 +136,4 @@ public void destroy() {
}
}

private static final int CLOSE_MASK = 1;
private long getEventIdentifier(ManagedAgentLifeCycle managedAgentLifeCycle, TransportMetadata transportMetadata) {
if (ManagedAgentLifeCycle.isClosedEvent(managedAgentLifeCycle)) {
return transportMetadata.getConnectTime() + CLOSE_MASK;
}
return transportMetadata.getConnectTime();
}
}
Expand Up @@ -18,7 +18,6 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
Expand All @@ -28,71 +27,66 @@
import com.navercorp.pinpoint.rpc.common.SocketStateCode;

public enum ManagedAgentLifeCycle {
RUNNING(0, SocketStateCode.RUN_SIMPLEX, SocketStateCode.RUN_DUPLEX),
RUNNING(0, AgentLifeCycleState.RUNNING, AgentEventType.AGENT_CONNECTED, SocketStateCode.RUN_SIMPLEX,
SocketStateCode.RUN_DUPLEX),

CLOSED_BY_CLIENT(Integer.MAX_VALUE, SocketStateCode.CLOSED_BY_CLIENT),
UNEXPECTED_CLOSE_BY_CLIENT(Integer.MAX_VALUE, SocketStateCode.UNEXPECTED_CLOSE_BY_CLIENT),
CLOSED_BY_SERVER(Integer.MAX_VALUE, SocketStateCode.CLOSED_BY_SERVER),
UNEXPECTED_CLOSE_BY_SERVER(Integer.MAX_VALUE, SocketStateCode.UNEXPECTED_CLOSE_BY_SERVER, SocketStateCode.ERROR_UNKNOWN,
SocketStateCode.ERROR_ILLEGAL_STATE_CHANGE, SocketStateCode.ERROR_SYNC_STATE_SESSION);
CLOSED_BY_CLIENT(Integer.MAX_VALUE, AgentLifeCycleState.SHUTDOWN, AgentEventType.AGENT_SHUTDOWN,
SocketStateCode.CLOSED_BY_CLIENT),

UNEXPECTED_CLOSE_BY_CLIENT(Integer.MAX_VALUE, AgentLifeCycleState.UNEXPECTED_SHUTDOWN, AgentEventType.AGENT_UNEXPECTED_SHUTDOWN,
SocketStateCode.UNEXPECTED_CLOSE_BY_CLIENT),

private static final EnumMap<ManagedAgentLifeCycle, AgentLifeCycleState> MAPPED_STATE = new EnumMap<>(
ManagedAgentLifeCycle.class);
CLOSED_BY_SERVER(Integer.MAX_VALUE, AgentLifeCycleState.DISCONNECTED, AgentEventType.AGENT_CLOSED_BY_SERVER,
SocketStateCode.CLOSED_BY_SERVER),

UNEXPECTED_CLOSE_BY_SERVER(Integer.MAX_VALUE, AgentLifeCycleState.DISCONNECTED, AgentEventType.AGENT_UNEXPECTED_CLOSE_BY_SERVER,
SocketStateCode.UNEXPECTED_CLOSE_BY_SERVER, SocketStateCode.ERROR_UNKNOWN,
SocketStateCode.ERROR_ILLEGAL_STATE_CHANGE, SocketStateCode.ERROR_SYNC_STATE_SESSION);

private static final EnumMap<ManagedAgentLifeCycle, AgentEventType> MAPPED_EVENT = new EnumMap<>(
ManagedAgentLifeCycle.class);

private static final EnumSet<ManagedAgentLifeCycle> CLOSED_EVENT
= EnumSet.of(CLOSED_BY_CLIENT, UNEXPECTED_CLOSE_BY_CLIENT, CLOSED_BY_SERVER, UNEXPECTED_CLOSE_BY_SERVER);

static {
MAPPED_STATE.put(RUNNING, AgentLifeCycleState.RUNNING);
MAPPED_STATE.put(CLOSED_BY_CLIENT, AgentLifeCycleState.SHUTDOWN);
MAPPED_STATE.put(UNEXPECTED_CLOSE_BY_CLIENT, AgentLifeCycleState.UNEXPECTED_SHUTDOWN);
MAPPED_STATE.put(CLOSED_BY_SERVER, AgentLifeCycleState.DISCONNECTED);
MAPPED_STATE.put(UNEXPECTED_CLOSE_BY_SERVER, AgentLifeCycleState.DISCONNECTED);

MAPPED_EVENT.put(RUNNING, AgentEventType.AGENT_CONNECTED);
MAPPED_EVENT.put(CLOSED_BY_CLIENT, AgentEventType.AGENT_SHUTDOWN);
MAPPED_EVENT.put(UNEXPECTED_CLOSE_BY_CLIENT, AgentEventType.AGENT_UNEXPECTED_SHUTDOWN);
MAPPED_EVENT.put(CLOSED_BY_SERVER, AgentEventType.AGENT_CLOSED_BY_SERVER);
MAPPED_EVENT.put(UNEXPECTED_CLOSE_BY_SERVER, AgentEventType.AGENT_UNEXPECTED_CLOSE_BY_SERVER);
}
private static final EnumSet<ManagedAgentLifeCycle> ALL = EnumSet.allOf(ManagedAgentLifeCycle.class);

private final int eventCounter;
private final Set<SocketStateCode> managedStateCodeSet;
private final AgentLifeCycleState agentLifeCycleState;
private final AgentEventType agentEventType;

ManagedAgentLifeCycle(int eventCounter, SocketStateCode... managedStateCodes) {
ManagedAgentLifeCycle(int eventCounter, AgentLifeCycleState agentLifeCycleState, AgentEventType agentEventType, SocketStateCode... managedStateCodes) {
this.eventCounter = eventCounter;
this.managedStateCodeSet = new HashSet<>(Arrays.asList(managedStateCodes));
this.agentLifeCycleState = agentLifeCycleState;
this.agentEventType = agentEventType;
this.managedStateCodeSet = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(managedStateCodes)));
}

public int getEventCounter() {
return this.eventCounter;
}

public Set<SocketStateCode> getManagedStateCodes() {
return Collections.unmodifiableSet(this.managedStateCodeSet);
return this.managedStateCodeSet;
}

public AgentLifeCycleState getMappedState() {
return MAPPED_STATE.get(this);
return this.agentLifeCycleState;
}

public AgentEventType getMappedEvent() {
return MAPPED_EVENT.get(this);
return agentEventType;
}

public static ManagedAgentLifeCycle getManagedAgentLifeCycleByStateCode(SocketStateCode stateCode) {
for (ManagedAgentLifeCycle agentLifeCycle : ManagedAgentLifeCycle.values()) {
for (ManagedAgentLifeCycle agentLifeCycle : ALL) {
if (agentLifeCycle.managedStateCodeSet.contains(stateCode)) {
return agentLifeCycle;
}
}
return null;
}

public static boolean isClosedEvent(ManagedAgentLifeCycle managedAgentLifeCycle) {
return CLOSED_EVENT.contains(managedAgentLifeCycle);
public boolean isClosedEvent() {
return CLOSED_EVENT.contains(this);
}
}
Expand Up @@ -462,7 +462,7 @@
<property name="awaitTerminationSeconds" value="10"/>
</bean>
<task:scheduled-tasks scheduler="grpcLifecycleScheduler">
<task:scheduled ref="keepAliveService" method="updateState" fixed-rate="5000"/>
<task:scheduled ref="keepAliveService" method="updateState" fixed-rate="#{ 60 * 1000 * 5 }"/>
</task:scheduled-tasks>
<bean id="lifecycleRegistry" class="com.navercorp.pinpoint.grpc.server.lifecycle.DefaultLifecycleRegistry"/>
<bean id="lifecycleListener" class="com.navercorp.pinpoint.collector.receiver.grpc.service.AgentLifecycleListener">
Expand Down
Expand Up @@ -19,10 +19,8 @@
import com.navercorp.pinpoint.grpc.AgentHeaderFactory;
import com.navercorp.pinpoint.grpc.HeaderFactory;
import com.navercorp.pinpoint.grpc.trace.AgentGrpc;
import com.navercorp.pinpoint.grpc.trace.KeepAliveGrpc;
import com.navercorp.pinpoint.grpc.trace.PAgentInfo;
import com.navercorp.pinpoint.grpc.trace.PApiMetaData;
import com.navercorp.pinpoint.grpc.trace.PPing;
import com.navercorp.pinpoint.grpc.trace.PResult;
import com.navercorp.pinpoint.grpc.trace.PSqlMetaData;
import com.navercorp.pinpoint.grpc.trace.PStringMetaData;
Expand All @@ -34,12 +32,10 @@
import io.grpc.LoadBalancer;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.PickFirstBalancerFactory;
import io.grpc.Status;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.grpc.util.RoundRobinLoadBalancerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -59,7 +55,6 @@ public class AgentClientMock {

private final ManagedChannel channel;
private final AgentGrpc.AgentBlockingStub agentStub;
private final KeepAliveGrpc.KeepAliveStub keepAliveStub;


public AgentClientMock(final String host, final int port, final boolean agentHeader) throws Exception {
Expand All @@ -76,7 +71,6 @@ public AgentClientMock(final String host, final int port, final boolean agentHea

channel = builder.build();
this.agentStub = AgentGrpc.newBlockingStub(channel);
this.keepAliveStub = KeepAliveGrpc.newStub(channel);
}

public void stop() throws InterruptedException {
Expand Down Expand Up @@ -136,38 +130,6 @@ public void stringMetaData(final int count) throws InterruptedException {
}
}

StreamObserver<PPing> requestObserver;

public void pingPoing() {
StreamObserver<PPing> responseObserver = new StreamObserver<PPing>() {
@Override
public void onNext(PPing ping) {
logger.info("Response {}", ping);
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
}
pingPong("ping");
}

@Override
public void onError(Throwable throwable) {
logger.info("Error ", throwable);
}

@Override
public void onCompleted() {
logger.info("Completed");
}
};
requestObserver = keepAliveStub.clientKeepAlive(responseObserver);
requestObserver.onNext(PPing.newBuilder().build());
}

private void pingPong(final String message) {
requestObserver.onNext(PPing.newBuilder().build());
}


private StreamObserver<PResult> getResponseObserver() {
StreamObserver<PResult> responseObserver = new StreamObserver<PResult>() {
Expand Down
Expand Up @@ -19,7 +19,6 @@
import com.navercorp.pinpoint.common.util.Assert;

import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicLong;

/**
* @author Woonduk Kang(emeroad)
Expand All @@ -29,13 +28,14 @@ public class DefaultTransportMetadata implements TransportMetadata {
private final InetSocketAddress remoteAddress;
private final Long transportId;
private final long connectTime;
private final AtomicLong eventCounter = new AtomicLong();
private final Object connectionLock = new Object();

private final LastAccessTime lastAccessTime;

public DefaultTransportMetadata(InetSocketAddress remoteAddress, long transportId, long connectTime) {
this.remoteAddress = Assert.requireNonNull(remoteAddress, "remoteAddress must not be null");
this.transportId = transportId;
this.connectTime = connectTime;
this.lastAccessTime = new LastAccessTime(connectTime);
}

@Override
Expand All @@ -53,23 +53,20 @@ public long getConnectTime() {
return connectTime;
}

@Override
public long nextEventCount() {
return eventCounter.incrementAndGet();
}

@Override
public Object getConnectionLock() {
// return this;
return connectionLock;
public LastAccessTime getLastAccessTime() {
return lastAccessTime;
}


@Override
public String toString() {
return "DefaultTransportMetadata{" +
"remoteAddress=" + remoteAddress +
", transportId=" + transportId +
", connectTime=" + connectTime +
", lastAccessTime=" + lastAccessTime +
'}';
}
}

0 comments on commit a95eeb7

Please sign in to comment.