Skip to content

Commit

Permalink
[#4115] Change constructor args of TcpReceiver
Browse files Browse the repository at this point in the history
Change the constructor args so that can change the acceptor's functionality.
  • Loading branch information
koo-taejin committed May 11, 2018
1 parent 2f68baf commit aaec23f
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 20 deletions.
@@ -0,0 +1,51 @@
/*
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.collector.receiver;

import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.rpc.PipelineFactory;
import com.navercorp.pinpoint.rpc.cluster.ClusterOption;
import com.navercorp.pinpoint.rpc.server.ChannelFilter;
import com.navercorp.pinpoint.rpc.server.PinpointServerAcceptor;
import com.navercorp.pinpoint.rpc.server.ServerCodecPipelineFactory;

/**
* @author Taejin Koo
*/
public class PinpointServerAcceptorProvider {

private ClusterOption clusterOption = ClusterOption.DISABLE_CLUSTER_OPTION;
private ChannelFilter channelFilter = ChannelFilter.BYPASS;
private PipelineFactory pipelineFactory = new ServerCodecPipelineFactory();

public PinpointServerAcceptor get() {
return new PinpointServerAcceptor(clusterOption, channelFilter, pipelineFactory);
}

public void setClusterOption(ClusterOption clusterOption) {
this.clusterOption = Assert.requireNonNull(clusterOption, "clusterOption must not be null");
}

public void setChannelFilter(ChannelFilter channelFilter) {
this.channelFilter = Assert.requireNonNull(channelFilter, "channelFilter must not be null");
}

public void setPipelineFactory(PipelineFactory pipelineFactory) {
this.pipelineFactory = Assert.requireNonNull(pipelineFactory, "pipelineFactory must not be null");
}

}
Expand Up @@ -20,7 +20,6 @@
import com.navercorp.pinpoint.collector.receiver.tcp.TCPPacketHandler;
import com.navercorp.pinpoint.collector.receiver.tcp.TCPPacketHandlerFactory;
import com.navercorp.pinpoint.collector.receiver.tcp.TCPReceiver;
import com.navercorp.pinpoint.common.server.util.AddressFilter;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
Expand All @@ -43,8 +42,9 @@ public class TCPReceiverBean implements InitializingBean, DisposableBean, BeanNa
private TCPReceiver tcpReceiver;
private Executor executor;

private PinpointServerAcceptorProvider acceptorProvider;

private DispatchHandler dispatchHandler;
private AddressFilter addressFilter;

private TCPPacketHandlerFactory tcpPacketHandlerFactory;

Expand All @@ -55,21 +55,21 @@ public void afterPropertiesSet() throws Exception {
}
Objects.requireNonNull(beanName, "beanName must not be null");
Objects.requireNonNull(bindIp, "bindIp must not be null");
Objects.requireNonNull(dispatchHandler, "dispatchHandler must not be null");
Objects.requireNonNull(addressFilter, "addressFilter must not be null");
Objects.requireNonNull(executor, "executor must not be null");
Objects.requireNonNull(dispatchHandler, "dispatchHandler must not be null");
Objects.requireNonNull(acceptorProvider, "acceptorProvider must not be null");

tcpReceiver = createTcpReceiver(beanName, this.bindIp, bindPort, executor, dispatchHandler, this.tcpPacketHandlerFactory, addressFilter);
tcpReceiver = createTcpReceiver(beanName, this.bindIp, bindPort, executor, dispatchHandler, this.tcpPacketHandlerFactory, acceptorProvider);
tcpReceiver.start();
}


private TCPReceiver createTcpReceiver(String beanName, String bindIp, int port, Executor executor,
DispatchHandler dispatchHandler, TCPPacketHandlerFactory tcpPacketHandlerFactory, AddressFilter addressFilter) {
DispatchHandler dispatchHandler, TCPPacketHandlerFactory tcpPacketHandlerFactory, PinpointServerAcceptorProvider acceptorProvider) {
InetSocketAddress bindAddress = new InetSocketAddress(bindIp, port);
TCPPacketHandler tcpPacketHandler = wrapDispatchHandler(dispatchHandler, tcpPacketHandlerFactory);

return new TCPReceiver(beanName, tcpPacketHandler, executor, bindAddress, addressFilter);
return new TCPReceiver(beanName, tcpPacketHandler, executor, bindAddress, acceptorProvider);
}

private TCPPacketHandler wrapDispatchHandler(DispatchHandler dispatchHandler, TCPPacketHandlerFactory tcpPacketHandlerFactory) {
Expand Down Expand Up @@ -121,7 +121,8 @@ public void setEnable(boolean enable) {
this.enable = enable;
}

public void setAddressFilter(AddressFilter addressFilter) {
this.addressFilter = addressFilter;
public void setAcceptorProvider(PinpointServerAcceptorProvider acceptorProvider) {
this.acceptorProvider = acceptorProvider;
}

}
Expand Up @@ -16,14 +16,12 @@

package com.navercorp.pinpoint.collector.receiver.tcp;

import com.navercorp.pinpoint.collector.receiver.AddressFilterAdaptor;
import com.navercorp.pinpoint.common.server.util.AddressFilter;
import com.navercorp.pinpoint.collector.receiver.PinpointServerAcceptorProvider;
import com.navercorp.pinpoint.rpc.PinpointSocket;
import com.navercorp.pinpoint.rpc.packet.HandshakeResponseCode;
import com.navercorp.pinpoint.rpc.packet.PingPayloadPacket;
import com.navercorp.pinpoint.rpc.packet.RequestPacket;
import com.navercorp.pinpoint.rpc.packet.SendPacket;
import com.navercorp.pinpoint.rpc.server.ChannelFilter;
import com.navercorp.pinpoint.rpc.server.PinpointServer;
import com.navercorp.pinpoint.rpc.server.PinpointServerAcceptor;
import com.navercorp.pinpoint.rpc.server.ServerMessageListener;
Expand All @@ -45,7 +43,7 @@ public class TCPReceiver {
private final String name;

private final InetSocketAddress bindAddress;
private final AddressFilter addressFilter;
private final PinpointServerAcceptorProvider acceptorProvider;

private PinpointServerAcceptor serverAcceptor;

Expand All @@ -54,13 +52,13 @@ public class TCPReceiver {
private final TCPPacketHandler tcpPacketHandler;


public TCPReceiver(String name, TCPPacketHandler tcpPacketHandler, Executor executor, InetSocketAddress bindAddress, AddressFilter addressFilter) {
public TCPReceiver(String name, TCPPacketHandler tcpPacketHandler, Executor executor, InetSocketAddress bindAddress, PinpointServerAcceptorProvider acceptorProvider) {
this.name = Objects.requireNonNull(name, "name must not be null");
this.logger = LoggerFactory.getLogger(name);

this.bindAddress = Objects.requireNonNull(bindAddress, "bindAddress must not be null");

this.addressFilter = Objects.requireNonNull(addressFilter, "addressFilter must not be null");
this.acceptorProvider = Objects.requireNonNull(acceptorProvider, "acceptorProvider must not be null");
this.executor = Objects.requireNonNull(executor, "executor must not be null");

this.tcpPacketHandler = Objects.requireNonNull(tcpPacketHandler, "tcpPacketHandler must not be null");
Expand All @@ -80,8 +78,7 @@ public void start() {
}

private PinpointServerAcceptor newAcceptor() {
ChannelFilter connectedFilter = new AddressFilterAdaptor(addressFilter);
PinpointServerAcceptor acceptor = new PinpointServerAcceptor(connectedFilter);
PinpointServerAcceptor acceptor = acceptorProvider.get();

// take care when attaching message handlers as events are generated from the IO thread.
// pass them to a separate queue and handle them in a different thread.
Expand Down
10 changes: 8 additions & 2 deletions collector/src/main/resources/applicationContext-collector.xml
Expand Up @@ -309,10 +309,13 @@
<property name="datagramPoolSize" value="#{ statReceiverConfig.workerQueueSize + statReceiverConfig.workerThreadSize }"/>
<property name="enable" value="#{spanReceiverConfig.isUdpEnable()}"/>
</bean>
<bean id="spanAcceptorProvider" class="com.navercorp.pinpoint.collector.receiver.PinpointServerAcceptorProvider">
<property name="channelFilter" ref="channelFilter"/>
</bean>
<bean id="spanTcpReceiver" class="com.navercorp.pinpoint.collector.receiver.TCPReceiverBean">
<property name="bindIp" value="#{spanReceiverConfig.tcpBindIp}"/>
<property name="bindPort" value="#{spanReceiverConfig.tcpBindPort}"/>
<property name="addressFilter" ref="addressFilter"/>
<property name="acceptorProvider" ref="spanAcceptorProvider"/>
<property name="dispatchHandler" ref="spanDispatchHandlerWrapper"/>
<!-- TCP & UDP share threadpool for span -->
<property name="executor" ref="spanReceiverExecutor"/>
Expand All @@ -338,10 +341,13 @@
<property name="datagramPoolSize" value="#{ statReceiverConfig.workerQueueSize + statReceiverConfig.workerThreadSize }"/>
<property name="enable" value="#{statReceiverConfig.isUdpEnable()}"/>
</bean>
<bean id="statAcceptorProvider" class="com.navercorp.pinpoint.collector.receiver.PinpointServerAcceptorProvider">
<property name="channelFilter" ref="channelFilter"/>
</bean>
<bean id="tcpStatReceiver" class="com.navercorp.pinpoint.collector.receiver.TCPReceiverBean">
<property name="bindIp" value="#{statReceiverConfig.tcpBindIp}"/>
<property name="bindPort" value="#{statReceiverConfig.tcpBindPort}"/>
<property name="addressFilter" ref="addressFilter"/>
<property name="acceptorProvider" ref="statAcceptorProvider"/>
<property name="dispatchHandler" ref="statDispatchHandlerWrapper"/>
<!-- TCP & UDP share threadpool for stat -->
<property name="executor" ref="statReceiverExecutor"/>
Expand Down
Expand Up @@ -93,12 +93,16 @@ public UdpDataSender newUdpDataSender(DataReceiverGroupConfiguration mockConfig)
return new UdpDataSender("127.0.0.1", mockConfig.getUdpBindPort(), threadName, 10, 1000, 1024 * 64 * 100);
}

private PinpointServerAcceptorProvider createPinpointAcceptorProvider() {
return new PinpointServerAcceptorProvider();
}

private TCPReceiverBean createTcpReceiverBean(DataReceiverGroupConfiguration mockConfig, DispatchHandler dispatchHandler) {
TCPReceiverBean tcpReceiverBean = new TCPReceiverBean();
tcpReceiverBean.setBeanName("tcpReceiver");
tcpReceiverBean.setBindIp(mockConfig.getTcpBindIp());
tcpReceiverBean.setBindPort(mockConfig.getTcpBindPort());
tcpReceiverBean.setAddressFilter(AddressFilter.ALL);
tcpReceiverBean.setAcceptorProvider(createPinpointAcceptorProvider());
tcpReceiverBean.setDispatchHandler(dispatchHandler);
tcpReceiverBean.setExecutor(MoreExecutors.directExecutor());
tcpReceiverBean.setEnable(true);
Expand Down

0 comments on commit aaec23f

Please sign in to comment.