Skip to content

Commit

Permalink
[#4272] Add getRemoteAddress
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jun 29, 2018
1 parent b09ba8c commit 340c9fe
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 30 deletions.
Expand Up @@ -62,7 +62,7 @@ private SimpleHandler getSimpleHandler(Header header) {
@Override
public void dispatchSendMessage(ServerRequest serverRequest) {
SimpleHandler simpleHandler = getSimpleHandler(serverRequest.getHeader());
simpleHandler.handleSimple(serverRequest);;
simpleHandler.handleSimple(serverRequest);
}

@Override
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Objects;

Expand Down Expand Up @@ -65,10 +66,10 @@ public void handleSend(SendPacket packet, PinpointSocket pinpointSocket) {
Objects.requireNonNull(pinpointSocket, "pinpointSocket must not be null");

final byte[] payload = getPayload(packet);
SocketAddress remoteAddress = pinpointSocket.getRemoteAddress();
final InetSocketAddress remoteAddress = (InetSocketAddress) pinpointSocket.getRemoteAddress();
try {
Message<TBase<?, ?>> message = SerializationUtils.deserialize(payload, deserializerFactory);
ServerRequest<TBase<?, ?>> serverRequest = new DefaultServerRequest<TBase<?, ?>>(message);
ServerRequest<TBase<?, ?>> serverRequest = newServerRequest(message, remoteAddress);
dispatchHandler.dispatchSendMessage(serverRequest);
} catch (TException e) {
handleTException(payload, remoteAddress, e);
Expand All @@ -78,6 +79,12 @@ public void handleSend(SendPacket packet, PinpointSocket pinpointSocket) {
}
}

private ServerRequest<TBase<?, ?>> newServerRequest(Message<TBase<?, ?>> message, InetSocketAddress remoteSocketAddress) {
final String remoteAddress = remoteSocketAddress.getAddress().getHostAddress();
final int remotePort = remoteSocketAddress.getPort();
return new DefaultServerRequest<TBase<?, ?>>(message, remoteAddress, remotePort);
}

public byte[] getPayload(BasicPacket packet) {
final byte[] payload = packet.getPayload();
Objects.requireNonNull(payload, "payload must not be null");
Expand All @@ -90,17 +97,16 @@ public void handleRequest(RequestPacket packet, PinpointSocket pinpointSocket) {
Objects.requireNonNull(pinpointSocket, "pinpointSocket must not be null");

final byte[] payload = getPayload(packet);
final InetSocketAddress remoteAddress = (InetSocketAddress) pinpointSocket.getRemoteAddress();

try {
Message<TBase<?, ?>> message = SerializationUtils.deserialize(payload, deserializerFactory);
ServerRequest<TBase<?, ?>> request = new DefaultServerRequest<>(message);
ServerRequest<TBase<?, ?>> request = newServerRequest(message, remoteAddress);
ServerResponse<TBase<?, ?>> response = new TCPServerResponse(serializerFactory, pinpointSocket, packet.getRequestId());
dispatchHandler.dispatchRequestMessage(request, response);
} catch (TException e) {
SocketAddress remoteAddress = pinpointSocket.getRemoteAddress();
handleTException(payload, remoteAddress, e);
} catch (Exception e) {
SocketAddress remoteAddress = pinpointSocket.getRemoteAddress();
handleException(payload, remoteAddress, e);
}
}
Expand Down
Expand Up @@ -34,6 +34,7 @@
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Objects;

Expand Down Expand Up @@ -75,34 +76,36 @@ private DispatchPacket() {

@Override
public void receive(DatagramSocket localSocket, T packet) {
if (isIgnoreAddress(packet.getAddress())) {
final InetSocketAddress remoteSocketAddress = (InetSocketAddress) packet.getSocketAddress();
final InetAddress remoteAddress = remoteSocketAddress.getAddress();
if (isIgnoreAddress(remoteAddress)) {
return;
}

final HeaderTBaseDeserializer deserializer = deserializerFactory.createDeserializer();
SocketAddress socketAddress = packet.getSocketAddress();

Message<TBase<?, ?>> message = null;
try {
message = deserializer.deserialize(packet.getData());
TBase<?, ?> data = message.getData();

if (filter.filter(localSocket, data, socketAddress) == TBaseFilter.BREAK) {
if (filter.filter(localSocket, data, remoteSocketAddress) == TBaseFilter.BREAK) {
return;
}
ServerRequest<TBase<?, ?>> request = new DefaultServerRequest<>(message);
ServerRequest<TBase<?, ?>> request = newServerRequest(message, remoteSocketAddress);
// dispatch signifies business logic execution
dispatchHandler.dispatchSendMessage(request);
} catch (TException e) {
if (logger.isWarnEnabled()) {
logger.warn("packet serialize error. SendSocketAddress:{} Cause:{}", socketAddress, e.getMessage(), e);
logger.warn("packet serialize error. SendSocketAddress:{} Cause:{}", remoteSocketAddress, e.getMessage(), e);
}
if (logger.isDebugEnabled()) {
logger.debug("packet dump hex:{}", PacketUtils.dumpDatagramPacket(packet));
}
} catch (Exception e) {
// there are cases where invalid headers are received
if (logger.isWarnEnabled()) {
logger.warn("Unexpected error. SendSocketAddress:{} Cause:{} message:{}", socketAddress, e.getMessage(), message, e);
logger.warn("Unexpected error. SendSocketAddress:{} Cause:{} message:{}", remoteAddress, e.getMessage(), message, e);
}
if (logger.isDebugEnabled()) {
logger.debug("packet dump hex:{}", PacketUtils.dumpDatagramPacket(packet));
Expand All @@ -124,4 +127,11 @@ private boolean isIgnoreAddress(InetAddress remoteAddress) {
}
}

private ServerRequest<TBase<?, ?>> newServerRequest(Message<TBase<?, ?>> message, InetSocketAddress remoteSocketAddress) {
final String remoteAddress = remoteSocketAddress.getAddress().getHostAddress();
final int remotePort = remoteSocketAddress.getPort();

return new DefaultServerRequest<>(message, remoteAddress, remotePort);
}

}
Expand Up @@ -75,11 +75,12 @@ public void receive(DatagramSocket localSocket, T packet) {
return;
}

final InetSocketAddress remoteAddress = (InetSocketAddress) packet.getSocketAddress();
for (Message<TBase<?, ?>> message : list) {
if (filter.filter(localSocket, message.getData(), packet.getSocketAddress()) == TBaseFilter.BREAK) {
if (filter.filter(localSocket, message.getData(), remoteAddress) == TBaseFilter.BREAK) {
return;
}
ServerRequest<TBase<?, ?>> request = new DefaultServerRequest<>(message);
ServerRequest<TBase<?, ?>> request = newServerRequest(message, remoteAddress);;
// dispatch signifies business logic execution
dispatchHandler.dispatchSendMessage(request);
}
Expand All @@ -101,4 +102,13 @@ public void receive(DatagramSocket localSocket, T packet) {
}
}

private ServerRequest<TBase<?, ?>> newServerRequest(Message<TBase<?, ?>> message, InetSocketAddress remoteSocketAddress) {
final String remoteAddress = remoteSocketAddress.getAddress().getHostAddress();
final int remotePort = remoteSocketAddress.getPort();

ServerRequest<TBase<?, ?>> tBaseDefaultServerRequest = new DefaultServerRequest<>(message, remoteAddress, remotePort);
return tBaseDefaultServerRequest;
}


}
Expand Up @@ -34,6 +34,7 @@

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -100,7 +101,7 @@ public void receive(DatagramSocket localSocket, DatagramPacket packet) {

byte version = requestBuffer.get();
int chunkSize = 0xff & requestBuffer.get();
SocketAddress socketAddress = packet.getSocketAddress();
InetSocketAddress remoteSocketAddress = (InetSocketAddress) packet.getSocketAddress();

try {
for (int i = 0; i < chunkSize; i++) {
Expand All @@ -117,7 +118,7 @@ public void receive(DatagramSocket localSocket, DatagramPacket packet) {
}

if (requestList.size() == 1) {
if (filter.filter(localSocket, requestList.get(0).getData(), socketAddress) == TBaseFilter.BREAK) {
if (filter.filter(localSocket, requestList.get(0).getData(), remoteSocketAddress) == TBaseFilter.BREAK) {
continue;
}
}
Expand All @@ -132,7 +133,7 @@ public void receive(DatagramSocket localSocket, DatagramPacket packet) {
((TSpanChunk) tBase).setSpanEventList(spanEventList);
}
Message<TBase<?, ?>> message = new DefaultMessage<>(lastMessage.getHeader(), tBase);
ServerRequest mergedRequest = new DefaultServerRequest(message);
ServerRequest<TBase<?, ?>> mergedRequest = newServerRequest(message, remoteSocketAddress);

dispatchHandler.dispatchRequestMessage(mergedRequest, fake);
}
Expand All @@ -142,6 +143,13 @@ public void receive(DatagramSocket localSocket, DatagramPacket packet) {
}
}

private ServerRequest<TBase<?, ?>> newServerRequest(Message<TBase<?, ?>> message, InetSocketAddress remoteSocketAddress) {
final String remoteAddress = remoteSocketAddress.getAddress().getHostAddress();
final int remotePort = remoteSocketAddress.getPort();

return new DefaultServerRequest<>(message, remoteAddress, remotePort);
}

private byte[] getComponentData(ByteBuffer buffer, HeaderTBaseDeserializer deserializer) {
if (buffer.remaining() < 2) {
logger.warn("Can't available {} fixed buffer.", 2);
Expand Down
Expand Up @@ -62,7 +62,7 @@ public void handleSimple(ServerRequest serverRequest) {

TBase<?, ?> data = (TBase<?, ?>) serverRequest.getData();
Message<TBase<?, ?>> message = new DefaultMessage<>(copiedHeader, data);
return new DefaultServerRequest<>(message);
return new DefaultServerRequest<>(message, serverRequest.getRemoteAddress(), serverRequest.getRemotePort());
}

private Header copyHeader(Header header) {
Expand Down
Expand Up @@ -18,6 +18,7 @@

import com.navercorp.pinpoint.common.server.bo.stat.join.*;
import com.navercorp.pinpoint.flink.mapper.thrift.stat.JoinAgentStatBoMapper;
import com.navercorp.pinpoint.io.header.Header;
import com.navercorp.pinpoint.io.request.DefaultMessage;
import com.navercorp.pinpoint.io.request.DefaultServerRequest;
import com.navercorp.pinpoint.io.request.Message;
Expand Down Expand Up @@ -55,8 +56,7 @@ public void flatMapTest() throws Exception {
TFAgentStatBatch tfAgentStatBatch = createTFAgentStatBatch();
ArrayList<Tuple3<String, JoinStatBo, Long>> dataList = new ArrayList<>();
ListCollector<Tuple3<String, JoinStatBo, Long>> collector = new ListCollector<>(dataList);
Message message = new DefaultMessage(new HeaderV1((short) 1000), tfAgentStatBatch);
ServerRequest request = new DefaultServerRequest<TBase<?, ?>>(message);
ServerRequest<TBase<?, ?>> request = newServerRequest(tfAgentStatBatch);
mapper.flatMap(request, collector);

assertEquals(dataList.size(), 2);
Expand Down Expand Up @@ -150,8 +150,7 @@ public void flatMap2Test() throws Exception {
TFAgentStatBatch tfAgentStatBatch = createTFAgentStatBatch2();
ArrayList<Tuple3<String, JoinStatBo, Long>> dataList = new ArrayList<>();
ListCollector<Tuple3<String, JoinStatBo, Long>> collector = new ListCollector<>(dataList);
Message message = new DefaultMessage(new HeaderV1((short) 1000), tfAgentStatBatch);
ServerRequest request = new DefaultServerRequest<TBase<?, ?>>(message);
ServerRequest<TBase<?, ?>> request = newServerRequest(tfAgentStatBatch);
mapper.flatMap(request, collector);

assertEquals(dataList.size(), 2);
Expand Down Expand Up @@ -230,8 +229,7 @@ public void flatMap3Test() throws Exception {
TFAgentStatBatch tfAgentStatBatch = createTFAgentStatBatch3();
ArrayList<Tuple3<String, JoinStatBo, Long>> dataList = new ArrayList<>();
ListCollector<Tuple3<String, JoinStatBo, Long>> collector = new ListCollector<>(dataList);
Message message = new DefaultMessage(new HeaderV1((short) 1000), tfAgentStatBatch);
ServerRequest request = new DefaultServerRequest<TBase<?, ?>>(message);
ServerRequest<TBase<?, ?>> request = newServerRequest(tfAgentStatBatch);
mapper.flatMap(request, collector);

assertEquals(dataList.size(), 2);
Expand All @@ -255,6 +253,12 @@ public void flatMap3Test() throws Exception {
assertJoinTransactionBo(joinApplicationStatBo.getJoinTransactionBoList());
}

private ServerRequest<TBase<?, ?>> newServerRequest(TFAgentStatBatch tfAgentStatBatch) {
final Header header = new HeaderV1((short) 1000);
final Message<TBase<?, ?>> message = new DefaultMessage<>(header, tfAgentStatBatch);
return new DefaultServerRequest<TBase<?, ?>>(message, "127.0.0.1", 8080);
}

private void assertJoinTransactionBo(List<JoinTransactionBo> joinTransactionBoList) {
assertEquals(2, joinTransactionBoList.size());

Expand Down Expand Up @@ -326,8 +330,7 @@ public void flatMap4Test() throws Exception {
TFAgentStatBatch tfAgentStatBatch = createTFAgentStatBatch4();
ArrayList<Tuple3<String, JoinStatBo, Long>> dataList = new ArrayList<>();
ListCollector<Tuple3<String, JoinStatBo, Long>> collector = new ListCollector<>(dataList);
Message<TBase<?, ?>> message = new DefaultMessage<>(new HeaderV1((short) 1000), tfAgentStatBatch);
ServerRequest<TBase<?, ?>> request = new DefaultServerRequest<>(message);
ServerRequest<TBase<?, ?>> request = newServerRequest(tfAgentStatBatch);
mapper.flatMap(request, collector);

assertEquals(dataList.size(), 2);
Expand Down Expand Up @@ -407,8 +410,7 @@ public void flatMap5Test() throws Exception {
TFAgentStatBatch tfAgentStatBatch = createTFAgentStatBatch5();
ArrayList<Tuple3<String, JoinStatBo, Long>> dataList = new ArrayList<>();
ListCollector<Tuple3<String, JoinStatBo, Long>> collector = new ListCollector<>(dataList);
Message<TBase<?, ?>> message = new DefaultMessage<>(new HeaderV1((short) 1000), tfAgentStatBatch);
ServerRequest<TBase<?, ?>> request = new DefaultServerRequest<>(message);
ServerRequest<TBase<?, ?>> request = newServerRequest(tfAgentStatBatch);
mapper.flatMap(request, collector);

assertEquals(dataList.size(), 2);
Expand Down
Expand Up @@ -21,15 +21,22 @@
/**
* @author Woonduk Kang(emeroad)
*/
public class DefaultServerRequest<T> extends DefaultAttributeMap implements ServerRequest {
public class DefaultServerRequest<T> extends DefaultAttributeMap implements ServerRequest<T> {

private final Message<T> message;
private final String remoteAddress;
private final int remotePort;

public DefaultServerRequest(Message<T> message) {
public DefaultServerRequest(Message<T> message, String remoteAddress, int remotePort) {
if (message == null) {
throw new NullPointerException("message must not be null");
}
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress must not be null");
}
this.message = message;
this.remoteAddress = remoteAddress;
this.remotePort = remotePort;
}


Expand All @@ -43,4 +50,13 @@ public T getData() {
return message.getData();
}

@Override
public String getRemoteAddress() {
return remoteAddress;
}

@Override
public int getRemotePort() {
return remotePort;
}
}
Expand Up @@ -26,4 +26,7 @@ public interface ServerRequest<T> extends AttributeMap {

T getData();

String getRemoteAddress();

int getRemotePort();
}

0 comments on commit 340c9fe

Please sign in to comment.