Skip to content

Commit

Permalink
Implement realtime span delivery service. #560
Browse files Browse the repository at this point in the history
code cleanup.
  • Loading branch information
koo-taejin committed Oct 14, 2015
1 parent bac83c7 commit f5e3969
Show file tree
Hide file tree
Showing 17 changed files with 172 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.navercorp.pinpoint.rpc.server.ServerMessageListener;
import com.navercorp.pinpoint.rpc.server.handler.ServerStateChangeEventHandler;
import com.navercorp.pinpoint.rpc.util.ClassUtils;
import com.navercorp.pinpoint.thrift.io.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -89,14 +88,6 @@ public void stop() {
logger.info("{} destroying completed.", ClassUtils.simpleClassName(this));
}

private SerializerFactory<HeaderTBaseSerializer> wrappedThreadLocalSerializerFactory(SerializerFactory<HeaderTBaseSerializer> serializerFactory) {
return new ThreadLocalHeaderTBaseSerializerFactory<HeaderTBaseSerializer>(serializerFactory);
}

private DeserializerFactory<HeaderTBaseDeserializer> wrappedThreadLocalDeserializerFactory(DeserializerFactory<HeaderTBaseDeserializer> deserializerFactory) {
return new ThreadLocalHeaderTBaseDeserializerFactory<HeaderTBaseDeserializer>(deserializerFactory);
}

class ClusterServerMessageListener implements ServerMessageListener {

private final String clusterId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,24 @@ public void close(ServerStreamChannelContext consumerContext) {
}
}

private TCommandTransferResponse createResponse(TRouteResult result) {
return createResponse(result, new byte[0]);
}

private TCommandTransferResponse createResponse(TRouteResult result, byte[] payload) {
TCommandTransferResponse response = new TCommandTransferResponse();
response.setRouteResult(result);
response.setPayload(payload);
return response;
}

private byte[] serialize(TBase<?,?> result) {
return SerializationUtils.serialize(result, commandSerializerFactory, null);
}


// fix me : StreamRouteManager will change worker thread pattern.
private class StreamRouteManager implements ClientStreamChannelMessageListener,StreamChannelStateChangeEventHandler<ClientStreamChannel> {
private class StreamRouteManager implements ClientStreamChannelMessageListener, StreamChannelStateChangeEventHandler<ClientStreamChannel> {

private final StreamEvent streamEvent;
private final ServerStreamChannel consumer;
Expand Down Expand Up @@ -179,16 +195,6 @@ public void handleStreamClose(ClientStreamChannelContext producerContext, Stream
consumer.close();
}

public void close() {
if (this.consumer != null) {
consumer.close();
}

if (this.producer != null) {
producer.close();
}
}

@Override
public void eventPerformed(ClientStreamChannel streamChannel, StreamChannelStateCode updatedStateCode) throws Exception {
logger.info("eventPerformed streamChannel:{}, stateCode:{}", streamChannel, updatedStateCode);
Expand All @@ -201,14 +207,23 @@ public void eventPerformed(ClientStreamChannel streamChannel, StreamChannelState
}
break;
}

}

@Override
public void exceptionCaught(ClientStreamChannel streamChannel, StreamChannelStateCode updatedStateCode, Throwable e) {
logger.warn("exceptionCaught message:{}, streamChannel:{}, stateCode:{}", e.getMessage(), streamChannel, updatedStateCode, e);
}

public void close() {
if (consumer != null) {
consumer.close();
}

if (producer != null) {
producer.close();
}
}

public ClientStreamChannel getProducer() {
return producer;
}
Expand All @@ -219,20 +234,4 @@ public void setProducer(ClientStreamChannel sourceStreamChannel) {

}

private TCommandTransferResponse createResponse(TRouteResult result) {
return createResponse(result, new byte[0]);
}

private TCommandTransferResponse createResponse(TRouteResult result, byte[] payload) {
TCommandTransferResponse response = new TCommandTransferResponse();
response.setRouteResult(result);
response.setPayload(payload);
return response;
}

private byte[] serialize(TBase<?,?> result) {
return SerializationUtils.serialize(result, commandSerializerFactory, null);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@

package com.navercorp.pinpoint.profiler;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.thrift.TBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.navercorp.pinpoint.rpc.Future;
import com.navercorp.pinpoint.rpc.FutureListener;
import com.navercorp.pinpoint.rpc.ResponseMessage;
import com.navercorp.pinpoint.thrift.dto.TResult;
import com.navercorp.pinpoint.thrift.io.HeaderTBaseDeserializerFactory;
import com.navercorp.pinpoint.thrift.util.SerializationUtils;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicBoolean;

public class AgentInfoSenderListener implements FutureListener<ResponseMessage> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.navercorp.pinpoint.rpc.stream.ServerStreamChannelContext;
import com.navercorp.pinpoint.rpc.stream.ServerStreamChannelMessageListener;
import com.navercorp.pinpoint.thrift.dto.TResult;
import com.navercorp.pinpoint.thrift.io.*;
import com.navercorp.pinpoint.thrift.util.SerializationUtils;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
Expand Down Expand Up @@ -115,12 +114,4 @@ public void registerCommandService(ProfilerCommandServiceGroup commandServiceGro
this.commandServiceRegistry.addService(commandServiceGroup);
}

private SerializerFactory<HeaderTBaseSerializer> wrappedThreadLocalSerializerFactory(SerializerFactory<HeaderTBaseSerializer> serializerFactory) {
return new ThreadLocalHeaderTBaseSerializerFactory<HeaderTBaseSerializer>(serializerFactory);
}

private DeserializerFactory<HeaderTBaseDeserializer> wrappedThreadLocalDeserializerFactory(DeserializerFactory<HeaderTBaseDeserializer> deserializerFactory) {
return new ThreadLocalHeaderTBaseDeserializerFactory<HeaderTBaseDeserializer>(deserializerFactory);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,12 @@
*/
public class CommandSerializer {

public static final SerializerFactory<HeaderTBaseSerializer> SERIALIZER_FACTORY;
public static final DeserializerFactory<HeaderTBaseDeserializer> DESERIALIZER_FACTORY;
public static final CommandHeaderTBaseSerializerFactory SERIALIZER_FACTORY;
public static final CommandHeaderTBaseDeserializerFactory DESERIALIZER_FACTORY;

static {
TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
TCommandRegistry commandTbaseRegistry = new TCommandRegistry(TCommandTypeVersion.getVersion(Version.VERSION));

SerializerFactory<HeaderTBaseSerializer> serializerFactory = new HeaderTBaseSerializerFactory(true, HeaderTBaseSerializerFactory.DEFAULT_UDP_STREAM_MAX_SIZE, protocolFactory, commandTbaseRegistry);
SERIALIZER_FACTORY = wrappedThreadLocalSerializerFactory(serializerFactory);

DeserializerFactory<HeaderTBaseDeserializer> deserializerFactory = new HeaderTBaseDeserializerFactory(protocolFactory, commandTbaseRegistry);
DESERIALIZER_FACTORY = wrappedThreadLocalDeserializerFactory(deserializerFactory);
SERIALIZER_FACTORY = new CommandHeaderTBaseSerializerFactory(Version.VERSION);
DESERIALIZER_FACTORY = new CommandHeaderTBaseDeserializerFactory(Version.VERSION);
}

private static SerializerFactory<HeaderTBaseSerializer> wrappedThreadLocalSerializerFactory(SerializerFactory<HeaderTBaseSerializer> serializerFactory) {
Expand All @@ -51,4 +45,4 @@ private static DeserializerFactory<HeaderTBaseDeserializer> wrappedThreadLocalDe
return new ThreadLocalHeaderTBaseDeserializerFactory<HeaderTBaseDeserializer>(deserializerFactory);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@
*/
public class ActiveThreadCountService implements ProfilerRequestCommandService, ProfilerStreamCommandService {

private static final long DEFAULT_FLUSH_DELAY = 1000;

private final Logger logger = LoggerFactory.getLogger(this.getClass());

private final Object lock = new Object();

// it will be changed.
private final StreamChannelStateChangeEventHandler stateChangeEventHandler = new ActiveThreadCountStreamChannelStateChangeEventHandler();
private final HashedWheelTimer timer = TimerFactory.createHashedWheelTimer("ActiveThreadCountService-Timer", 100, TimeUnit.MILLISECONDS, 512);
private final long time = 1000;
private final long flushDelay;
private final AtomicBoolean onTimerTask = new AtomicBoolean(false);

private final List<ServerStreamChannel> streamChannelRepository = new CopyOnWriteArrayList<ServerStreamChannel>();
Expand All @@ -80,13 +81,18 @@ public class ActiveThreadCountService implements ProfilerRequestCommandService,
private final HistogramSchema histogramSchema = HistogramSchema.NORMAL_SCHEMA;

public ActiveThreadCountService(ActiveTraceLocator activeTraceLocator) {
this(activeTraceLocator, DEFAULT_FLUSH_DELAY);
}

public ActiveThreadCountService(ActiveTraceLocator activeTraceLocator, long flushDelay) {
if (activeTraceLocator == null) {
throw new NullPointerException("activeTraceLocator");
}
this.activeTraceLocator = activeTraceLocator;
this.activeThreadSlotsCount = ACTIVE_THREAD_SLOTS_ORDER.size();
}

this.flushDelay = flushDelay;
}

@Override
public Class<? extends TBase> getCommandClazz() {
Expand Down Expand Up @@ -165,7 +171,7 @@ public void eventPerformed(ServerStreamChannel streamChannel, StreamChannelState
streamChannelRepository.add(streamChannel);
boolean turnOn = onTimerTask.compareAndSet(false, true);
if (turnOn) {
timer.newTimeout(new ActiveThreadCountTimerTask(), time, TimeUnit.MILLISECONDS);
timer.newTimeout(new ActiveThreadCountTimerTask(), flushDelay, TimeUnit.MILLISECONDS);
}
break;
case CLOSED:
Expand Down Expand Up @@ -203,7 +209,7 @@ public void run(Timeout timeout) throws Exception {
}
} finally {
if (timer != null && onTimerTask.get()) {
timer.newTimeout(this, time, TimeUnit.MILLISECONDS);
timer.newTimeout(this, flushDelay, TimeUnit.MILLISECONDS);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ private List<ThreadInfo> getThreadInfo(String threadName) {
return Arrays.asList(getAllThreadInfo());
}

ThreadInfo[] threadInfos = getAllThreadInfo();
for (ThreadInfo threadIno : getAllThreadInfo()) {
if (threadName.equals(threadIno.getThreadName())) {
result.add(threadIno);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,6 @@ private void assertParsed() {
}
}

private TBase deserialize(DeserializerFactory<HeaderTBaseDeserializer> commandDeserializerFactory, byte[] objectData) throws TException {
return SerializationUtils.deserialize(objectData, commandDeserializerFactory);
}


private TBase deserialize(DeserializerFactory<HeaderTBaseDeserializer> commandDeserializerFactory, byte[] objectData, TBase defaultValue) {
return SerializationUtils.deserialize(objectData, commandDeserializerFactory, defaultValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@


import com.navercorp.pinpoint.web.websocket.PinpointWebSocketHandler;
import com.navercorp.pinpoint.web.websocket.WebSocketHandlerManager;
import com.navercorp.pinpoint.web.websocket.PinpointWebSocketHandlerManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
Expand All @@ -41,7 +41,7 @@ public class WebSocketConfig implements WebSocketConfigurer {
private static final String WEBSOCKET_SUFFIX = ".pinpointws";

@Autowired
private WebSocketHandlerManager handlerRepository;
private PinpointWebSocketHandlerManager handlerRepository;

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,13 @@ public List<AgentInfo> getAgentInfoList(String applicationName) {

@Override
public PinpointRouteResponse invoke(AgentInfo agentInfo, TBase<?, ?> tBase) throws TException {
byte[] payload = serialize(tBase);
byte[] payload = serializeRequest(tBase);
return invoke(agentInfo, payload);
}

@Override
public PinpointRouteResponse invoke(AgentInfo agentInfo, TBase<?, ?> tBase, long timeout) throws TException {
byte[] payload = serialize(tBase);
byte[] payload = serializeRequest(tBase);
return invoke(agentInfo, payload, timeout);
}

Expand All @@ -147,7 +147,7 @@ public PinpointRouteResponse invoke(AgentInfo agentInfo, byte[] payload, long ti

Future<ResponseMessage> future = null;
if (socket != null) {
future = socket.request(serialize(transferObject));
future = socket.request(serializeRequest(transferObject));
}

PinpointRouteResponse response = getResponse(future, timeout);
Expand All @@ -157,14 +157,14 @@ public PinpointRouteResponse invoke(AgentInfo agentInfo, byte[] payload, long ti
@Override
public Map<AgentInfo, PinpointRouteResponse> invoke(List<AgentInfo> agentInfoList, TBase<?, ?> tBase)
throws TException {
byte[] payload = serialize(tBase);
byte[] payload = serializeRequest(tBase);
return invoke(agentInfoList, payload);
}

@Override
public Map<AgentInfo, PinpointRouteResponse> invoke(List<AgentInfo> agentInfoList, TBase<?, ?> tBase, long timeout)
throws TException {
byte[] payload = serialize(tBase);
byte[] payload = serializeRequest(tBase);
return invoke(agentInfoList, payload, timeout);
}

Expand All @@ -182,7 +182,7 @@ public Map<AgentInfo, PinpointRouteResponse> invoke(List<AgentInfo> agentInfoLis
TCommandTransfer transferObject = createCommandTransferObject(agentInfo, payload);
PinpointSocket socket = clusterConnectionManager.getSocket(agentInfo);
if (socket != null) {
Future<ResponseMessage> future = socket.request(serialize(transferObject));
Future<ResponseMessage> future = socket.request(serializeRequest(transferObject));
futureMap.put(agentInfo, future);
} else {
futureMap.put(agentInfo, null);
Expand All @@ -204,7 +204,7 @@ public Map<AgentInfo, PinpointRouteResponse> invoke(List<AgentInfo> agentInfoLis

@Override
public ClientStreamChannelContext openStream(AgentInfo agentInfo, TBase<?, ?> tBase, ClientStreamChannelMessageListener clientStreamChannelMessageListener) throws TException {
byte[] payload = serialize(tBase);
byte[] payload = serializeRequest(tBase);
return openStream(agentInfo, payload, clientStreamChannelMessageListener);
}

Expand All @@ -214,15 +214,15 @@ public ClientStreamChannelContext openStream(AgentInfo agentInfo, byte[] payload
PinpointSocket socket = clusterConnectionManager.getSocket(agentInfo);

if (socket != null) {
return socket.openStream(serialize(transferObject), clientStreamChannelMessageListener);
return socket.openStream(serializeRequest(transferObject), clientStreamChannelMessageListener);
}

return null;
}

@Override
public AgentActiveThreadCountList getActiveThreadCount(List<AgentInfo> agentInfoList) throws TException {
byte[] activeThread = serialize(new TCmdActiveThreadCount());
byte[] activeThread = serializeRequest(new TCmdActiveThreadCount());
return getActiveThreadCount(agentInfoList, activeThread);
}

Expand All @@ -249,18 +249,6 @@ public AgentActiveThreadCountList getActiveThreadCount(List<AgentInfo> agentInfo
return agentActiveThreadStatusList;
}

private byte[] serialize(TBase<?, ?> tBase) throws TException {
return SerializationUtils.serialize(tBase, commandSerializerFactory);
}

private TBase<?, ?> deserialize(byte[] objectData) throws TException {
return SerializationUtils.deserialize(objectData, commandDeserializerFactory);
}

private TBase<?, ?> deserialize(byte[] objectData, TBase<?, ?> defaultValue) throws TException {
return SerializationUtils.deserialize(objectData, commandDeserializerFactory, defaultValue);
}

private TCommandTransfer createCommandTransferObject(AgentInfo agentInfo, byte[] payload) {
TCommandTransfer transferObject = new TCommandTransfer();
transferObject.setApplicationName(agentInfo.getApplicationName());
Expand Down
Loading

0 comments on commit f5e3969

Please sign in to comment.