Skip to content

Commit

Permalink
[#4368] add to FlinkTcpDataSender for sending to flink server
Browse files Browse the repository at this point in the history
  • Loading branch information
minwoo-jung authored and emeroad committed Jul 25, 2018
1 parent 21a9a4d commit 21af285
Show file tree
Hide file tree
Showing 41 changed files with 503 additions and 330 deletions.
Expand Up @@ -16,14 +16,14 @@
package com.navercorp.pinpoint.collector.cluster.flink;

import com.navercorp.pinpoint.collector.cluster.connection.ClusterConnectionManager;
import com.navercorp.pinpoint.collector.sender.FlinkRequestFactory;
import com.navercorp.pinpoint.collector.sender.FlinkTcpDataSender;
import com.navercorp.pinpoint.collector.util.Address;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.profiler.sender.TcpDataSender;
import com.navercorp.pinpoint.rpc.client.DefaultPinpointClientFactory;
import com.navercorp.pinpoint.rpc.client.PinpointClientFactory;
import com.navercorp.pinpoint.thrift.io.FlinkHeaderTBaseSerializer;
import com.navercorp.pinpoint.thrift.io.FlinkHeaderTBaseSerializerFactory;
import com.navercorp.pinpoint.thrift.io.HeaderTBaseSerializer;
import com.navercorp.pinpoint.thrift.io.SerializerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,11 +36,13 @@ public class FlinkClusterConnectionManager implements ClusterConnectionManager {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final PinpointClientFactory pinpointClientFactory;
private final TcpDataSenderRepository tcpDataSenderRepository;
private final SerializerFactory<HeaderTBaseSerializer> flinkHeaderTBaseSerializerFactory;
private final FlinkHeaderTBaseSerializerFactory flinkHeaderTBaseSerializerFactory;
private final FlinkRequestFactory flinkRequestFactory;

public FlinkClusterConnectionManager(TcpDataSenderRepository tcpDataSenderRepository, FlinkHeaderTBaseSerializerFactory flinkHeaderTBaseSerializerFactory) {
public FlinkClusterConnectionManager(TcpDataSenderRepository tcpDataSenderRepository, FlinkHeaderTBaseSerializerFactory flinkHeaderTBaseSerializerFactory, FlinkRequestFactory flinkRequestFactory) {
this.tcpDataSenderRepository = Assert.requireNonNull(tcpDataSenderRepository, "tcpDataSenderRepository must not be null");
this.flinkHeaderTBaseSerializerFactory = Assert.requireNonNull(flinkHeaderTBaseSerializerFactory, "flinkHeaderTBaseSerializerFactory must not be null");
this.flinkRequestFactory = Assert.requireNonNull(flinkRequestFactory, "flinkRequestFactory must not be null");
this.pinpointClientFactory = newPointClientFactory();
}

Expand Down Expand Up @@ -80,7 +82,7 @@ public void connectPointIfAbsent(Address address) {

final SenderContext context = tcpDataSenderRepository.putIfAbsent(address, senderContext);
if (context != null) {
logger.info("TcpDataSender have already been for {}.", address);
logger.info("FlinkTcpDataSender have already been for {}.", address);
senderContext.close();
}

Expand Down Expand Up @@ -109,9 +111,8 @@ private SenderContext createTcpDataSender(Address address) {
try {
final String host = address.getHost();
final int port = address.getPort();
PinpointClientFactory pinpointClientFactory = this.pinpointClientFactory;
HeaderTBaseSerializer serializer = flinkHeaderTBaseSerializerFactory.createSerializer();
TcpDataSender tcpDataSender = new TcpDataSender("flink", host, port, pinpointClientFactory, serializer);
FlinkHeaderTBaseSerializer serializer = flinkHeaderTBaseSerializerFactory.createSerializer();
FlinkTcpDataSender tcpDataSender = new FlinkTcpDataSender("flink", host, port, pinpointClientFactory, serializer, flinkRequestFactory);
return new SenderContext(tcpDataSender);
} catch (Exception e) {
logger.error("not create tcpDataSender for {}.", address, e);
Expand Down
Expand Up @@ -15,25 +15,25 @@
*/
package com.navercorp.pinpoint.collector.cluster.flink;

import com.navercorp.pinpoint.collector.sender.FlinkTcpDataSender;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.profiler.sender.TcpDataSender;

/**
* @author minwoo.jung
*/
public class SenderContext {
private TcpDataSender tcpDataSender;
private FlinkTcpDataSender flinkTcpDataSender;

public SenderContext(TcpDataSender tcpDataSender) {
this.tcpDataSender = Assert.requireNonNull(tcpDataSender, "tcpDataSender must not be null");
public SenderContext(FlinkTcpDataSender tcpDataSender) {
this.flinkTcpDataSender = Assert.requireNonNull(tcpDataSender, "flinkTcpDataSender must not be null");
}

public TcpDataSender getTcpDataSender() {
return tcpDataSender;
public FlinkTcpDataSender getFlinkTcpDataSender() {
return flinkTcpDataSender;
}

public void close() {
tcpDataSender.stop();
flinkTcpDataSender.stop();
}

}
Expand Up @@ -15,9 +15,9 @@
*/
package com.navercorp.pinpoint.collector.cluster.flink;

import com.navercorp.pinpoint.collector.sender.FlinkTcpDataSender;
import com.navercorp.pinpoint.collector.service.SendAgentStatService;
import com.navercorp.pinpoint.collector.util.Address;
import com.navercorp.pinpoint.profiler.sender.TcpDataSender;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -38,25 +38,25 @@ public class TcpDataSenderRepository {

public SenderContext putIfAbsent(Address address, SenderContext senderContext) {
SenderContext context = clusterConnectionRepository.putIfAbsent(address, senderContext);
replaceDataInsendAgentStatService();
replaceDataInSendAgentStatService();
return context;
}

public SenderContext remove(Address address) {
SenderContext senderContext = clusterConnectionRepository.remove(address);
replaceDataInsendAgentStatService();
replaceDataInSendAgentStatService();
return senderContext;
}

private void replaceDataInsendAgentStatService() {
private void replaceDataInSendAgentStatService() {
Collection<SenderContext> values = clusterConnectionRepository.values();

List<TcpDataSender> tcpDataSenderList = new ArrayList<>(values.size());
List<FlinkTcpDataSender> tcpDataSenderList = new ArrayList<>(values.size());
for (SenderContext senderContext : values) {
tcpDataSenderList.add(senderContext.getTcpDataSender());
tcpDataSenderList.add(senderContext.getFlinkTcpDataSender());
}

sendAgentStatService.replaceFlinkServerList(tcpDataSenderList);
sendAgentStatService.replaceFlinkTcpDataSenderList(tcpDataSenderList);
}

public boolean containsKey(Address address) {
Expand Down
Expand Up @@ -132,7 +132,7 @@ public void receive(DatagramSocket localSocket, DatagramPacket packet) {
} else if (tBase instanceof TSpanChunk) {
((TSpanChunk) tBase).setSpanEventList(spanEventList);
}
Message<TBase<?, ?>> message = new DefaultMessage<>(lastMessage.getHeader(), tBase);
Message<TBase<?, ?>> message = new DefaultMessage<>(lastMessage.getHeader(), lastMessage.getHeaderEntity(), tBase);
ServerRequest<TBase<?, ?>> mergedRequest = newServerRequest(message, remoteSocketAddress);

dispatchHandler.dispatchRequestMessage(mergedRequest, fake);
Expand Down
Expand Up @@ -13,21 +13,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.cluster.flink;
package com.navercorp.pinpoint.collector.sender;

import com.navercorp.pinpoint.io.header.HeaderDataGenerator;
import com.navercorp.pinpoint.io.header.HeaderEntity;
import com.navercorp.pinpoint.io.request.FlinkRequest;

import org.apache.thrift.TBase;

import java.util.HashMap;
import java.util.Map;

/**
* @author minwoo.jung
*/
public class DefaultCustomHeadergenerator implements HeaderDataGenerator {
public class FlinkRequestFactory {

@Override
public Map<String, String> generate() {
return new HashMap<String, String>();
public FlinkRequest createFlinkRequest(TBase<?,?> data, Map<String, String> headerEntity) {
return new FlinkRequest(new HeaderEntity(headerEntity), data);
}

}
@@ -0,0 +1,86 @@
/*
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.collector.sender;

import com.navercorp.pinpoint.io.request.FlinkRequest;
import com.navercorp.pinpoint.profiler.sender.TcpDataSender;
import com.navercorp.pinpoint.rpc.client.PinpointClientFactory;
import com.navercorp.pinpoint.thrift.io.FlinkHeaderTBaseSerializer;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import java.util.HashMap;
import java.util.Objects;

/**
* @author minwoo.jung
*/
public class FlinkTcpDataSender extends TcpDataSender {

private final Logger logger = LoggerFactory.getLogger(this.getClass());

private final FlinkHeaderTBaseSerializer flinkHeaderTBaseSerializer;
private final FlinkRequestFactory flinkRequestFactory;

public FlinkTcpDataSender(String name, String host, int port, PinpointClientFactory clientFactory, FlinkHeaderTBaseSerializer serializer, FlinkRequestFactory flinkRequestFactory) {
super(name, host, port, clientFactory);

if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("name must not be empty.");
}
if (StringUtils.isEmpty(host)) {
throw new IllegalArgumentException("host must not be empty.");
}
if (Objects.isNull(clientFactory)) {
throw new IllegalArgumentException("clientFactory must not be null.");
}
if (Objects.isNull(serializer)) {
throw new IllegalArgumentException("serializer must not be null.");
}
if (Objects.isNull(flinkRequestFactory)) {
throw new IllegalArgumentException("flinkRequestFactory must not be null.");
}

this.flinkHeaderTBaseSerializer = serializer;
this.flinkRequestFactory = flinkRequestFactory;
}

@Override
public boolean send(TBase<?, ?> data) {
FlinkRequest flinkRequest = flinkRequestFactory.createFlinkRequest(data, new HashMap<String, String>(0));
return executor.execute(flinkRequest);
}

@Override
protected void sendPacket(Object flinkRequest) {
try {
if (flinkRequest instanceof FlinkRequest) {
byte[] copy = flinkHeaderTBaseSerializer.serialize((FlinkRequest) flinkRequest);
if (copy == null) {
return;
}
doSend(copy);
} else {
logger.error("sendPacket fail. invalid dto type:{}", flinkRequest.getClass());
return;
}
} catch (Exception e) {
logger.warn("tcp send fail. Caused:{}", e.getMessage(), e);
}
}
}
Expand Up @@ -17,8 +17,8 @@

import com.navercorp.pinpoint.collector.config.CollectorConfiguration;
import com.navercorp.pinpoint.collector.mapper.thrift.stat.TFAgentStatBatchMapper;
import com.navercorp.pinpoint.collector.sender.FlinkTcpDataSender;
import com.navercorp.pinpoint.common.server.bo.stat.AgentStatBo;
import com.navercorp.pinpoint.profiler.sender.TcpDataSender;
import com.navercorp.pinpoint.thrift.dto.flink.TFAgentStatBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,7 +38,7 @@ public class SendAgentStatService implements AgentStatService {
private final boolean flinkClusterEnable;
private final TFAgentStatBatchMapper tFAgentStatBatchMapper = new TFAgentStatBatchMapper();

private volatile List<TcpDataSender> flinkServerList = new CopyOnWriteArrayList<>();
private volatile List<FlinkTcpDataSender> flinkTcpDataSenderList = new CopyOnWriteArrayList<>();
private AtomicInteger callCount = new AtomicInteger(1);

public SendAgentStatService(CollectorConfiguration config) {
Expand All @@ -52,10 +52,10 @@ public void save(AgentStatBo agentStatBo) {
}

try {
TcpDataSender tcpDataSender = roundRobinTcpDataSender();
FlinkTcpDataSender tcpDataSender = roundRobinTcpDataSender();

if (tcpDataSender == null) {
logger.warn("not send flink server. Because TcpDataSender is null");
logger.warn("not send flink server. Because FlinkTcpDataSender is null");
return;
}

Expand All @@ -71,29 +71,29 @@ public void save(AgentStatBo agentStatBo) {
}
}

private TcpDataSender roundRobinTcpDataSender() {
if (flinkServerList.isEmpty()) {
private FlinkTcpDataSender roundRobinTcpDataSender() {
if (flinkTcpDataSenderList.isEmpty()) {
return null;
}

int count = callCount.getAndIncrement();
int tcpDataSenderIndex = count % flinkServerList.size();
int tcpDataSenderIndex = count % flinkTcpDataSenderList.size();

if (tcpDataSenderIndex < 0) {
tcpDataSenderIndex = tcpDataSenderIndex * -1;
callCount.set(0);
}

try {
return flinkServerList.get(tcpDataSenderIndex);
return flinkTcpDataSenderList.get(tcpDataSenderIndex);
} catch (Exception e) {
logger.warn("not get TcpDataSender", e);
logger.warn("not get FlinkTcpDataSender", e);
}

return null;
}

public void replaceFlinkServerList(List<TcpDataSender> flinkServerList) {
this.flinkServerList = new CopyOnWriteArrayList<>(flinkServerList);
public void replaceFlinkTcpDataSenderList(List<FlinkTcpDataSender> flinkTcpDataSenderList) {
this.flinkTcpDataSenderList = new CopyOnWriteArrayList<FlinkTcpDataSender>(flinkTcpDataSenderList);
}
}
4 changes: 2 additions & 2 deletions collector/src/main/resources/applicationContext-collector.xml
Expand Up @@ -456,19 +456,19 @@
<constructor-arg index="0" ref="sendAgentStatService"/>
</bean>

<bean id="flinkHeaderDataGeneratorDelegator" class="com.navercorp.pinpoint.collector.cluster.flink.FlinkHeaderDataGeneratorDelegator"/>
<bean id="flinkTBaseLocator" class="com.navercorp.pinpoint.thrift.io.FlinkTBaseLocator">
<constructor-arg index="0">
<util:constant static-field="com.navercorp.pinpoint.io.header.v2.HeaderV2.VERSION"/>
</constructor-arg>
<constructor-arg index="1" ref="flinkHeaderDataGeneratorDelegator"/>
</bean>
<bean id="flinkHeaderTBaseSerializerFactory" class="com.navercorp.pinpoint.thrift.io.FlinkHeaderTBaseSerializerFactory">
<constructor-arg index="0" value="#{flinkTBaseLocator.typeLocator}"/>
</bean>
<bean id="flinkRequestFactory" class="com.navercorp.pinpoint.collector.sender.FlinkRequestFactory"/>
<bean id="flinkClusterConnectionManager" class="com.navercorp.pinpoint.collector.cluster.flink.FlinkClusterConnectionManager">
<constructor-arg index="0" ref="tcpDataSenderRepository"/>
<constructor-arg index="1" ref="flinkHeaderTBaseSerializerFactory"/>
<constructor-arg index="2" ref="flinkRequestFactory"/>
</bean>

<bean id="flinkClusterService" class="com.navercorp.pinpoint.collector.cluster.flink.FlinkClusterService">
Expand Down
Expand Up @@ -23,6 +23,8 @@
import com.navercorp.pinpoint.common.server.util.AgentEventMessageSerializer;
import com.navercorp.pinpoint.common.server.util.AgentEventType;
import com.navercorp.pinpoint.common.util.BytesUtils;
import com.navercorp.pinpoint.io.header.Header;
import com.navercorp.pinpoint.io.header.HeaderEntity;
import com.navercorp.pinpoint.io.header.v1.HeaderV1;
import com.navercorp.pinpoint.io.request.DefaultMessage;
import com.navercorp.pinpoint.io.request.Message;
Expand Down Expand Up @@ -146,7 +148,7 @@ public void handler_should_handle_serialization_of_request_events() throws Excep
ArgumentCaptor<AgentEventBo> argCaptor = ArgumentCaptor.forClass(AgentEventBo.class);
HeaderTBaseDeserializer deserializer = mock(HeaderTBaseDeserializer.class);
when(this.deserializerFactory.createDeserializer()).thenReturn(deserializer);
Message<TBase<?, ?>> message = new DefaultMessage<>(new HeaderV1((short)1000), expectedThreadDumpResponse);
Message<TBase<?, ?>> message = new DefaultMessage<>(new HeaderV1((short)1000), HeaderEntity.EMPTY_HEADER_ENTITY, expectedThreadDumpResponse);
when(deserializer.deserialize(expectedThreadDumpResponseBody)).thenReturn(message);
// when
this.agentEventService.handleResponseEvent(responseEvent, TEST_EVENT_TIMESTAMP);
Expand Down Expand Up @@ -180,7 +182,7 @@ public void handler_should_ignore_request_events_with_unsupported_message_types(
ArgumentCaptor<AgentEventBo> argCaptor = ArgumentCaptor.forClass(AgentEventBo.class);
HeaderTBaseDeserializer deserializer = mock(HeaderTBaseDeserializer.class);
when(this.deserializerFactory.createDeserializer()).thenReturn(deserializer);
Message<TBase<?, ?>> message = new DefaultMessage<>(new HeaderV1((short)1000), mismatchingResponse);
Message<TBase<?, ?>> message = new DefaultMessage<>(new HeaderV1((short)1000), HeaderEntity.EMPTY_HEADER_ENTITY, mismatchingResponse);
when(deserializer.deserialize(mismatchingResponseBody)).thenReturn(message);
// when
this.agentEventService.handleResponseEvent(responseEvent, TEST_EVENT_TIMESTAMP);
Expand Down
Expand Up @@ -27,7 +27,7 @@
import com.navercorp.pinpoint.flink.receiver.AgentStatHandler;
import com.navercorp.pinpoint.flink.receiver.TcpDispatchHandler;
import com.navercorp.pinpoint.flink.receiver.TcpSourceFunction;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.flink.vo.RawData;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -156,7 +156,7 @@ public void setSourceFunctionParallel(DataStreamSource rawData) {
rawData.setParallelism(parallel);
}

public void setStatHandlerTcpDispatchHandler(SourceContext<ServerRequest> sourceContext) {
public void setStatHandlerTcpDispatchHandler(SourceContext<RawData> sourceContext) {
AgentStatHandler agentStatHandler = new AgentStatHandler(sourceContext);
tcpDispatchHandler.setAgentStatHandler(agentStatHandler);
}
Expand Down

0 comments on commit 21af285

Please sign in to comment.