Skip to content

Commit

Permalink
[#4272] Refactor DispatchHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad authored and koo-taejin committed Jun 19, 2018
1 parent f530baa commit 16f9de3
Show file tree
Hide file tree
Showing 64 changed files with 1,050 additions and 876 deletions.
@@ -1,11 +1,11 @@
/*
* Copyright 2014 NAVER Corp.
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -20,6 +20,8 @@
import com.navercorp.pinpoint.collector.cluster.route.RequestEvent;
import com.navercorp.pinpoint.collector.cluster.route.StreamEvent;
import com.navercorp.pinpoint.collector.cluster.route.StreamRouteHandler;
import com.navercorp.pinpoint.io.request.EmptyMessage;
import com.navercorp.pinpoint.io.request.Message;
import com.navercorp.pinpoint.rpc.MessageListener;
import com.navercorp.pinpoint.rpc.PinpointSocket;
import com.navercorp.pinpoint.rpc.packet.RequestPacket;
Expand Down Expand Up @@ -171,7 +173,8 @@ private byte[] serialize(TBase<?,?> result) {
}

private TBase<?,?> deserialize(byte[] objectData) {
return SerializationUtils.deserialize(objectData, commandDeserializerFactory, null);
Message<TBase<?, ?>> deserialize = SerializationUtils.deserialize(objectData, commandDeserializerFactory, EmptyMessage.INSTANCE);
return deserialize.getData();
}

private StreamCode convertToStreamCode(TRouteResult routeResult) {
Expand Down
@@ -1,11 +1,11 @@
/*
* Copyright 2017 NAVER Corp.
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -22,10 +22,8 @@
import com.navercorp.pinpoint.common.server.bo.event.AgentEventBo;
import com.navercorp.pinpoint.common.util.CollectionUtils;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.io.request.UnSupportedServerRequestTypeException;
import com.navercorp.pinpoint.thrift.dto.TAgentStat;
import com.navercorp.pinpoint.thrift.dto.TAgentStatBatch;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -53,15 +51,15 @@ public class AgentEventHandler implements SimpleHandler {

@Override
public void handleSimple(ServerRequest serverRequest) {
if (serverRequest instanceof ThriftRequest) {
handleSimple(((ThriftRequest)serverRequest).getData());
final Object data = serverRequest.getData();
if (data instanceof TBase<?, ?>) {
handleSimple((TBase<?, ?>) data);
} else {
throw new UnSupportedServerRequestTypeException(serverRequest.getClass() + "is not support type : " + serverRequest);
throw new UnsupportedOperationException("data is not support type : " + data);
}
}

@Override
public void handleSimple(TBase<?, ?> tBase) {
private void handleSimple(TBase<?, ?> tBase) {
// FIXME (2014.08) Legacy - TAgentStat should not be sent over the wire.
if (tBase instanceof TAgentStat) {
TAgentStat tAgentStat = (TAgentStat)tBase;
Expand Down
@@ -1,11 +1,11 @@
/*
* Copyright 2014 NAVER Corp.
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -19,10 +19,9 @@
import com.navercorp.pinpoint.collector.dao.AgentInfoDao;
import com.navercorp.pinpoint.collector.dao.ApplicationIndexDao;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.io.request.UnSupportedServerRequestTypeException;
import com.navercorp.pinpoint.io.request.ServerResponse;
import com.navercorp.pinpoint.thrift.dto.TAgentInfo;
import com.navercorp.pinpoint.thrift.dto.TResult;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,30 +45,28 @@ public class AgentInfoHandler implements SimpleHandler, RequestResponseHandler {

@Override
public void handleSimple(ServerRequest serverRequest) {
if (serverRequest instanceof ThriftRequest) {
handleSimple(((ThriftRequest) serverRequest).getData());
final Object data = serverRequest.getData();
if (data instanceof TBase<?, ?>) {
handleRequest((TBase<?, ?>) data);
} else {
throw new UnSupportedServerRequestTypeException(serverRequest.getClass() + "is not support type : " + serverRequest);
throw new UnsupportedOperationException("data is not support type : " + data);
}
}

@Override
public void handleSimple(TBase<?, ?> tbase) {
handleRequest(tbase);
}

@Override
public TBase<?, ?> handleRequest(ServerRequest serverRequest) {
if (serverRequest instanceof ThriftRequest) {
return handleRequest(((ThriftRequest) serverRequest).getData());
}
public void handleRequest(ServerRequest serverRequest, ServerResponse serverResponse) {
final Object data = serverRequest.getData();
if (data instanceof TBase<?, ?>) {
TBase<?, ?> tBase = handleRequest((TBase<?, ?>) data);

logger.warn("invalid serverRequest:{}", serverRequest);
return null;
serverResponse.write(tBase);
} else {
logger.warn("invalid serverRequest:{}", serverRequest);
}
}

@Override
public TBase<?, ?> handleRequest(TBase<?, ?> tbase) {
private TBase<?, ?> handleRequest(TBase<?, ?> tbase) {
if (!(tbase instanceof TAgentInfo)) {
logger.warn("invalid tbase:{}", tbase);
// it happens to return null not only at this BO(Business Object) but also at other BOs.
Expand Down
@@ -1,11 +1,11 @@
/*
* Copyright 2014 NAVER Corp.
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -21,10 +21,8 @@
import com.navercorp.pinpoint.collector.service.AgentStatService;
import com.navercorp.pinpoint.common.server.bo.stat.AgentStatBo;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.io.request.UnSupportedServerRequestTypeException;
import com.navercorp.pinpoint.thrift.dto.TAgentStat;
import com.navercorp.pinpoint.thrift.dto.TAgentStatBatch;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -54,15 +52,15 @@ public class AgentStatHandlerV2 implements SimpleHandler {

@Override
public void handleSimple(ServerRequest serverRequest) {
if (serverRequest instanceof ThriftRequest) {
handleSimple(((ThriftRequest)serverRequest).getData());
final Object data = serverRequest.getData();
if (data instanceof TBase<?, ?>) {
handleSimple((TBase<?, ?>) data);
} else {
throw new UnSupportedServerRequestTypeException(serverRequest.getClass() + "is not support type : " + serverRequest);
throw new UnsupportedOperationException("data is not support type : " + data);
}
}

@Override
public void handleSimple(TBase<?, ?> tBase) {
void handleSimple(TBase<?, ?> tBase) {
// FIXME (2014.08) Legacy - TAgentStat should not be sent over the wire.
if (tBase instanceof TAgentStat) {
TAgentStat tAgentStat = (TAgentStat)tBase;
Expand Down
@@ -1,11 +1,11 @@
/*
* Copyright 2014 NAVER Corp.
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -18,10 +18,9 @@

import com.navercorp.pinpoint.collector.dao.ApiMetaDataDao;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.io.request.UnSupportedServerRequestTypeException;
import com.navercorp.pinpoint.io.request.ServerResponse;
import com.navercorp.pinpoint.thrift.dto.TApiMetaData;
import com.navercorp.pinpoint.thrift.dto.TResult;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,17 +39,17 @@ public class ApiMetaDataHandler implements RequestResponseHandler {
private ApiMetaDataDao sqlMetaDataDao;

@Override
public TBase<?, ?> handleRequest(ServerRequest serverRequest) {
if (serverRequest instanceof ThriftRequest) {
return handleRequest(((ThriftRequest) serverRequest).getData());
public void handleRequest(ServerRequest serverRequest, ServerResponse serverResponse) {
final Object data = serverRequest.getData();
if (data instanceof TBase) {
TBase<?, ?> tBase = handleRequest((TBase<?, ?>) data);
serverResponse.write(tBase);
} else {
logger.warn("invalid serverRequest:{}", serverRequest);
}

logger.warn("invalid serverRequest:{}", serverRequest);
return null;
}

@Override
public TBase<?, ?> handleRequest(TBase<?, ?> tbase) {
private TBase<?, ?> handleRequest(TBase<?, ?> tbase) {
if (!(tbase instanceof TApiMetaData)) {
logger.error("invalid tbase:{}", tbase);
return null;
Expand Down
@@ -1,11 +1,11 @@
/*
* Copyright 2014 NAVER Corp.
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -17,6 +17,7 @@
package com.navercorp.pinpoint.collector.handler;

import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.io.request.ServerResponse;
import org.apache.thrift.TBase;
import org.springframework.stereotype.Service;

Expand All @@ -27,8 +28,6 @@
@Service
public interface RequestResponseHandler {

TBase<?, ?> handleRequest(TBase<?, ?> tbase);

TBase<?, ?> handleRequest(ServerRequest serverRequest);
void handleRequest(ServerRequest serverRequest, ServerResponse serverResponse);

}
@@ -0,0 +1,46 @@
/*
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.collector.handler;

import com.navercorp.pinpoint.io.request.ServerRequest;


/**
* @author Woonduk Kang(emeroad)
*/
public class SimpleDualHandler implements SimpleHandler {
private final SimpleHandler one;
private final SimpleHandler two;

public SimpleDualHandler(SimpleHandler one, SimpleHandler two) {
if (one == null) {
throw new NullPointerException("one must not be null");
}
if (two == null) {
throw new NullPointerException("two must not be null");
}
this.one = one;
this.two = two;

}

@Override
public void handleSimple(ServerRequest serverRequest) {
one.handleSimple(serverRequest);
two.handleSimple(serverRequest);
}
}
@@ -1,11 +1,11 @@
/*
* Copyright 2014 NAVER Corp.
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -17,7 +17,6 @@
package com.navercorp.pinpoint.collector.handler;

import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
import org.apache.thrift.TBase;

/**
Expand All @@ -26,8 +25,6 @@
*/
public interface SimpleHandler {

void handleSimple(TBase<?, ?> tBase);

void handleSimple(ServerRequest thriftRequest);
void handleSimple(ServerRequest serverRequest);

}

0 comments on commit 16f9de3

Please sign in to comment.