Skip to content

Commit

Permalink
[#4661] Isolate domain model and transport model
Browse files Browse the repository at this point in the history
 - Reduce thrift dependency in CommandService
  • Loading branch information
emeroad committed Oct 16, 2018
1 parent 08636af commit 8b0f6e5
Show file tree
Hide file tree
Showing 26 changed files with 236 additions and 217 deletions.
Expand Up @@ -20,7 +20,6 @@
import com.navercorp.pinpoint.collector.cluster.route.RequestEvent;
import com.navercorp.pinpoint.collector.cluster.route.StreamEvent;
import com.navercorp.pinpoint.collector.cluster.route.StreamRouteHandler;
import com.navercorp.pinpoint.io.request.EmptyMessage;
import com.navercorp.pinpoint.io.request.Message;
import com.navercorp.pinpoint.rpc.MessageListener;
import com.navercorp.pinpoint.rpc.PinpointSocket;
Expand Down Expand Up @@ -173,7 +172,10 @@ private byte[] serialize(TBase<?,?> result) {
}

private TBase<?,?> deserialize(byte[] objectData) {
Message<TBase<?, ?>> deserialize = SerializationUtils.deserialize(objectData, commandDeserializerFactory, EmptyMessage.emptyMessage());
final Message<TBase<?, ?>> deserialize = SerializationUtils.deserialize(objectData, commandDeserializerFactory, null);
if (deserialize == null) {
return null;
}
return deserialize.getData();
}

Expand Down
@@ -1,11 +1,11 @@
/*
* Copyright 2014 NAVER Corp.
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -36,4 +36,15 @@ public static <V> IntHashMap<V> copy(Map<Integer, V> target) {
}
return copyMap;
}

public static <V> IntHashMap<V> copyShortMap(Map<Short, V> target) {
if (target == null) {
throw new NullPointerException("target must not be null");
}
final IntHashMap<V> copyMap = new IntHashMap<V>();
for (Map.Entry<Short, V> entry : target.entrySet()) {
copyMap.put(entry.getKey(), entry.getValue());
}
return copyMap;
}
}
Expand Up @@ -198,13 +198,13 @@ private boolean sendAgentInfo() {
}

private boolean getResult(ResponseMessage responseMessage) {
byte[] message = responseMessage.getMessage();
Message<TBase<?, ?>> deserialize = SerializationUtils.deserialize(message, HeaderTBaseDeserializerFactory.DEFAULT_FACTORY, EmptyMessage.INSTANCE);
TBase<?, ?> tbase = deserialize.getData();
if (tbase == null) {
logger.warn("tbase is null");
byte[] byteMessage = responseMessage.getMessage();
Message<TBase<?, ?>> message = SerializationUtils.deserialize(byteMessage, HeaderTBaseDeserializerFactory.DEFAULT_FACTORY, null);
if (message == null) {
logger.warn("message is null");
return false;
}
final TBase<?, ?> tbase = message.getData();
if (!(tbase instanceof TResult)) {
logger.warn("Invalid response : {}", tbase.getClass());
return false;
Expand Down
Expand Up @@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -18,8 +18,6 @@

import com.google.inject.Inject;
import com.google.inject.Provider;
import com.navercorp.pinpoint.rpc.client.PinpointClientFactory;
import com.navercorp.pinpoint.thrift.io.AgentEventTBaseLocator;
import com.navercorp.pinpoint.thrift.io.HeaderTBaseSerializer;
import com.navercorp.pinpoint.thrift.io.HeaderTBaseSerializerFactory;

Expand All @@ -28,15 +26,15 @@
*/
public class HeaderTBaseSerializerProvider implements Provider<HeaderTBaseSerializer> {

private static final HeaderTBaseSerializerFactory DEFAULT_FACTORY = new HeaderTBaseSerializerFactory();
private final HeaderTBaseSerializerFactory serializerFactory = new HeaderTBaseSerializerFactory();

@Inject
public HeaderTBaseSerializerProvider() {
}

@Override
public HeaderTBaseSerializer get() {
return DEFAULT_FACTORY.createSerializer();
return serializerFactory.createSerializer();
}

}
Expand Up @@ -17,7 +17,7 @@
package com.navercorp.pinpoint.profiler.receiver;

import com.google.inject.Inject;
import com.navercorp.pinpoint.io.request.EmptyMessage;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.io.request.Message;
import com.navercorp.pinpoint.rpc.MessageListener;
import com.navercorp.pinpoint.rpc.PinpointSocket;
Expand All @@ -43,15 +43,11 @@ public class CommandDispatcher implements MessageListener, ServerStreamChannelMe

private final Logger logger = LoggerFactory.getLogger(this.getClass());

private final ProfilerCommandServiceLocator commandServiceLocator;
private final ProfilerCommandServiceLocator<TBase<?, ?>, TBase<?, ?>> commandServiceLocator;

@Inject
public CommandDispatcher(ProfilerCommandServiceLocator commandServiceLocator) {
if (commandServiceLocator == null) {
throw new NullPointerException("commandServiceLocator must not be null");
}

this.commandServiceLocator = commandServiceLocator;
public CommandDispatcher(ProfilerCommandServiceLocator<TBase<?, ?>, TBase<?, ?>> commandServiceLocator) {
this.commandServiceLocator = Assert.requireNonNull(commandServiceLocator, "commandServiceLocator must not be null");
}

@Override
Expand All @@ -63,49 +59,58 @@ public void handleSend(SendPacket sendPacket, PinpointSocket pinpointSocket) {
public void handleRequest(RequestPacket requestPacket, PinpointSocket pinpointSocket) {
logger.info("handleRequest packet:{}, remote:{}", requestPacket, pinpointSocket.getRemoteAddress());

final Message<TBase<?, ?>> deserialize = SerializationUtils.deserialize(requestPacket.getPayload(), CommandSerializer.DESERIALIZER_FACTORY, EmptyMessage.INSTANCE);
final TBase<?, ?> request = deserialize.getData();
logger.debug("handleRequest request:{}, remote:{}", request, pinpointSocket.getRemoteAddress());
final Message<TBase<?, ?>> message = SerializationUtils.deserialize(requestPacket.getPayload(), CommandSerializer.DESERIALIZER_FACTORY, null);
if (logger.isDebugEnabled()) {
logger.debug("handleRequest request:{}, remote:{}", message, pinpointSocket.getRemoteAddress());
}

final TBase response = processRequest(message);

final byte[] payload = SerializationUtils.serialize(response, CommandSerializer.SERIALIZER_FACTORY, null);
if (payload != null) {
pinpointSocket.response(requestPacket.getRequestId(), payload);
}
}

TBase response;
if (request == null) {
private TBase<?, ?> processRequest(Message<TBase<?, ?>> message) {
if (message == null) {
final TResult tResult = new TResult(false);
tResult.setMessage("Unsupported ServiceTypeInfo.");

response = tResult;
} else {
final ProfilerRequestCommandService service = commandServiceLocator.getRequestService(request);
if (service == null) {
TResult tResult = new TResult(false);
tResult.setMessage("Can't find suitable service(" + request + ").");

response = tResult;
} else {
response = service.requestCommandService(request);
}
return tResult;
}

final byte[] payload = SerializationUtils.serialize(response, CommandSerializer.SERIALIZER_FACTORY, null);
if (payload != null) {
pinpointSocket.response(requestPacket.getRequestId(), payload);
final short type = message.getHeader().getType();
final ProfilerRequestCommandService<TBase<?, ?>, TBase<?, ?>> service = commandServiceLocator.getRequestService(type);
if (service == null) {
TResult tResult = new TResult(false);
tResult.setMessage("Can't find suitable service(" + message + ").");

return tResult;
}

final TBase<?, ?> request = message.getData();
final TBase<?, ?> tResponse = service.requestCommandService(request);
return tResponse;
}


@Override
public StreamCode handleStreamCreate(ServerStreamChannelContext streamChannelContext, StreamCreatePacket packet) {
logger.info("MessageReceived handleStreamCreate {} {}", packet, streamChannelContext);

final Message<TBase<?, ?>> deserialize = SerializationUtils.deserialize(packet.getPayload(), CommandSerializer.DESERIALIZER_FACTORY, EmptyMessage.INSTANCE);
final TBase<?, ?> request = deserialize.getData();
if (request == null) {
final Message<TBase<?, ?>> message = SerializationUtils.deserialize(packet.getPayload(), CommandSerializer.DESERIALIZER_FACTORY, null);
if (message == null) {
return StreamCode.TYPE_UNKNOWN;
}

final ProfilerStreamCommandService service = commandServiceLocator.getStreamService(request);

final short type = message.getHeader().getType();
final ProfilerStreamCommandService<TBase<?, ?>> service = commandServiceLocator.getStreamService(type);
if (service == null) {
return StreamCode.TYPE_UNSUPPORT;
}


final TBase<?, ?> request = message.getData();
return service.streamCommandService(request, streamChannelContext);
}

Expand Down
@@ -1,11 +1,11 @@
/*
* Copyright 2017 NAVER Corp.
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -18,13 +18,17 @@

import com.navercorp.pinpoint.thrift.io.CommandHeaderTBaseDeserializerFactory;
import com.navercorp.pinpoint.thrift.io.CommandHeaderTBaseSerializerFactory;
import com.navercorp.pinpoint.thrift.io.DeserializerFactory;
import com.navercorp.pinpoint.thrift.io.HeaderTBaseDeserializer;
import com.navercorp.pinpoint.thrift.io.HeaderTBaseSerializer;
import com.navercorp.pinpoint.thrift.io.SerializerFactory;

/**
* @author Taejin Koo
*/
public class CommandSerializer {

public static final CommandHeaderTBaseSerializerFactory SERIALIZER_FACTORY = new CommandHeaderTBaseSerializerFactory();
public static final CommandHeaderTBaseDeserializerFactory DESERIALIZER_FACTORY = new CommandHeaderTBaseDeserializerFactory();
public static final SerializerFactory<HeaderTBaseSerializer> SERIALIZER_FACTORY = new CommandHeaderTBaseSerializerFactory();
public static final DeserializerFactory<HeaderTBaseDeserializer> DESERIALIZER_FACTORY = new CommandHeaderTBaseDeserializerFactory();

}
@@ -1,11 +1,11 @@
/*
* Copyright 2017 NAVER Corp.
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -16,10 +16,9 @@

package com.navercorp.pinpoint.profiler.receiver;

import com.navercorp.pinpoint.thrift.io.TCommandType;
import org.apache.thrift.TBase;
import com.navercorp.pinpoint.common.util.apache.IntHashMap;
import com.navercorp.pinpoint.common.util.apache.IntHashMapUtils;

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand All @@ -29,27 +28,24 @@
*/
public class DefaultProfilerCommandServiceLocator implements ProfilerCommandServiceLocator {

private final Map<Class<? extends TBase>, ProfilerCommandService> profilerCommandServiceRepository;
private final IntHashMap<ProfilerCommandService> profilerCommandServiceRepository;
private final Set<Short> codeSet;

DefaultProfilerCommandServiceLocator(ProfilerCommandLocatorBuilder builder) {
this.profilerCommandServiceRepository = Collections.unmodifiableMap(builder.getProfilerCommandServiceRepository());
Map<Short, ProfilerCommandService> commandServiceRepository = builder.getProfilerCommandServiceRepository();
this.profilerCommandServiceRepository = IntHashMapUtils.copyShortMap(commandServiceRepository);
this.codeSet = buildCodeSet(commandServiceRepository);
}

@Override
public ProfilerCommandService getService(TBase tBase) {
if (tBase == null) {
return null;
}
return profilerCommandServiceRepository.get(tBase.getClass());
public ProfilerCommandService getService(short commandCode) {
return profilerCommandServiceRepository.get(commandCode);
}

@Override
public ProfilerSimpleCommandService getSimpleService(TBase tBase) {
if (tBase == null) {
return null;
}
public ProfilerSimpleCommandService getSimpleService(short commandCode) {

final ProfilerCommandService service = profilerCommandServiceRepository.get(tBase.getClass());
final ProfilerCommandService service = profilerCommandServiceRepository.get(commandCode);
if (service instanceof ProfilerSimpleCommandService) {
return (ProfilerSimpleCommandService) service;
}
Expand All @@ -58,12 +54,9 @@ public ProfilerSimpleCommandService getSimpleService(TBase tBase) {
}

@Override
public ProfilerRequestCommandService getRequestService(TBase tBase) {
if (tBase == null) {
return null;
}
public ProfilerRequestCommandService getRequestService(short commandCode) {

final ProfilerCommandService service = profilerCommandServiceRepository.get(tBase.getClass());
final ProfilerCommandService service = profilerCommandServiceRepository.get(commandCode);
if (service instanceof ProfilerRequestCommandService) {
return (ProfilerRequestCommandService) service;
}
Expand All @@ -72,36 +65,28 @@ public ProfilerRequestCommandService getRequestService(TBase tBase) {
}

@Override
public ProfilerStreamCommandService getStreamService(TBase tBase) {
if (tBase == null) {
return null;
}
public ProfilerStreamCommandService getStreamService(short commandCode) {

final ProfilerCommandService service = profilerCommandServiceRepository.get(tBase.getClass());
final ProfilerCommandService service = profilerCommandServiceRepository.get(commandCode);
if (service instanceof ProfilerStreamCommandService) {
return (ProfilerStreamCommandService) service;
}

return null;
}

@Override
public Set<Class<? extends TBase>> getCommandServiceClasses() {
return profilerCommandServiceRepository.keySet();
int getCommandServiceSize() {
return profilerCommandServiceRepository.size();
}

@Override
public Set<Short> getCommandServiceCodes() {
Set<Short> commandServiceCodes = new HashSet<Short>(profilerCommandServiceRepository.size());

Set<Class<? extends TBase>> clazzSet = profilerCommandServiceRepository.keySet();
for (Class<? extends TBase> clazz : clazzSet) {
TCommandType commandType = TCommandType.getType(clazz);
if (commandType != null) {
commandServiceCodes.add(commandType.getCode());
}
}
return commandServiceCodes;
return this.codeSet;
}

private Set<Short> buildCodeSet(Map<Short, ProfilerCommandService> codes) {
return new HashSet<Short>(codes.keySet());

}

}

0 comments on commit 8b0f6e5

Please sign in to comment.