Skip to content

Commit

Permalink
[#11158] Change gRPC hedging to default retransmission policy
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jun 18, 2024
1 parent 168f53a commit 7273fd5
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 49 deletions.
10 changes: 10 additions & 0 deletions agent-module/agent/src/main/resources/pinpoint-root.config
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ profiler.system.property.io.netty.tryReflectionSetAccessible=true
# disable netty directbuffer
profiler.system.property.io.netty.noPreferDirect=false

# Enable Grpc Hedging Policy
# https://github.com/grpc/proposal/blob/master/A6-client-retries.md#hedging-policy
profiler.transport.grpc.metadata.sender.retry.enable=false
profiler.transport.grpc.metadata.sender.max.attempts=3
profiler.transport.grpc.metadata.sender.hedging.delay.millis=1000
# 16777216 = 64m
profiler.transport.grpc.metadata.sender.retry.buffer.size=16777216
# 1048576 = 1m
profiler.transport.grpc.metadata.sender.retry.per.rpc.buffer.limit=1048576

###########################################################
# Profiler Global Configuration #
###########################################################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.navercorp.pinpoint.profiler.context.module.MetadataDataSender;
import com.navercorp.pinpoint.profiler.metadata.MetaDataType;
import com.navercorp.pinpoint.profiler.sender.grpc.MetadataGrpcDataSender;
import com.navercorp.pinpoint.profiler.sender.grpc.MetadataGrpcHedgingDataSender;
import io.grpc.ClientInterceptor;
import io.grpc.NameResolverProvider;
import io.netty.handler.ssl.SslContext;
Expand Down Expand Up @@ -81,25 +82,32 @@ public EnhancedDataSender<MetaDataType, ResponseMessage> get() {
final String collectorIp = grpcTransportConfig.getMetadataCollectorIp();
final int collectorPort = grpcTransportConfig.getMetadataCollectorPort();
final boolean sslEnable = grpcTransportConfig.isMetadataSslEnable();
final int senderExecutorQueueSize = grpcTransportConfig.getMetadataSenderExecutorQueueSize();

final boolean clientRetryEnable = grpcTransportConfig.isMetadataRetryEnable();

final ChannelFactoryBuilder channelFactoryBuilder = newChannelFactoryBuilder(sslEnable, clientRetryEnable);

final ChannelFactory channelFactory = channelFactoryBuilder.build();
final int senderExecutorQueueSize = grpcTransportConfig.getMetadataSenderExecutorQueueSize();

if (clientRetryEnable) {
return new MetadataGrpcHedgingDataSender<>(collectorIp, collectorPort, senderExecutorQueueSize, messageConverter, channelFactory);
}

final int retryMaxCount = grpcTransportConfig.getMetadataRetryMaxCount();
final int retryDelayMillis = grpcTransportConfig.getMetadataRetryDelayMillis();

return new MetadataGrpcDataSender<>(collectorIp, collectorPort, senderExecutorQueueSize, messageConverter, channelFactory, retryMaxCount, retryDelayMillis, clientRetryEnable);
return new MetadataGrpcDataSender<>(collectorIp, collectorPort, senderExecutorQueueSize, messageConverter, channelFactory, retryMaxCount, retryDelayMillis);
}

protected ChannelFactoryBuilder newChannelFactoryBuilder(boolean sslEnable, boolean clientRetryEnable) {
final int channelExecutorQueueSize = grpcTransportConfig.getMetadataChannelExecutorQueueSize();
final UnaryCallDeadlineInterceptor unaryCallDeadlineInterceptor = new UnaryCallDeadlineInterceptor(grpcTransportConfig.getMetadataRequestTimeout());
final ClientOption clientOption = grpcTransportConfig.getMetadataClientOption();

ChannelFactoryBuilder channelFactoryBuilder = new DefaultChannelFactoryBuilder("MetadataGrpcDataSender");
final String factoryName = getChannelFactoryName(clientRetryEnable);

ChannelFactoryBuilder channelFactoryBuilder = new DefaultChannelFactoryBuilder(factoryName);
channelFactoryBuilder.setHeaderFactory(headerFactory);
channelFactoryBuilder.setNameResolverProvider(nameResolverProvider);
channelFactoryBuilder.addClientInterceptor(unaryCallDeadlineInterceptor);
Expand Down Expand Up @@ -131,4 +139,11 @@ protected ChannelFactoryBuilder newChannelFactoryBuilder(boolean sslEnable, bool

return channelFactoryBuilder;
}

private String getChannelFactoryName(boolean clientRetryEnable) {
if (clientRetryEnable) {
return MetadataGrpcHedgingDataSender.class.getSimpleName();
}
return MetadataGrpcDataSender.class.getSimpleName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,44 @@

public class LogResponseStreamObserver<ResT> implements StreamObserver<ResT> {
private final Logger logger;
private final String name;
private final long requestCount;

public LogResponseStreamObserver(Logger logger) {
public LogResponseStreamObserver(Logger logger, String name, long requestCount) {
this.logger = Objects.requireNonNull(logger, "logger");
this.name = Objects.requireNonNull(name, "name");
this.requestCount = requestCount;
}

@Override
public void onNext(ResT response) {
if (logger.isDebugEnabled()) {
logger.debug("Request success. result={}", logString(response));
logger.debug("{} Request success. result={}", logString(response));
}
}


@Override
public void onError(Throwable throwable) {
final StatusError statusError = StatusErrors.throwable(throwable);
if (statusError.isSimpleError()) {
logger.info("Error. cause={}", statusError.getMessage());
} else {
logger.info("Error. cause={}", statusError.getMessage(), statusError.getThrowable());
if (logger.isInfoEnabled()) {
final StatusError statusError = StatusErrors.throwable(throwable);
if (statusError.isSimpleError()) {
logger.info("{} Error. requestCount={}, cause={}", name, requestCount, statusError.getMessage());
} else {
if (logger.isDebugEnabled()) {
logger.debug("{} Error. requestCount={}, cause={}", name, requestCount, statusError.getMessage(), statusError.getThrowable());
} else {
logger.info("{} Error. requestCount={}, cause={}", name, requestCount, statusError.getMessage());
}

}
}
}

@Override
public void onCompleted() {
if (logger.isDebugEnabled()) {
logger.debug("onCompleted");
logger.debug("{} onCompleted. requestCount={}", requestCount, name);
}
}

Expand All @@ -73,6 +84,7 @@ private String logString(Object message) {
public String toString() {
return "LogResponseStreamObserver{" +
"logger=" + logger +
", name='" + name + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public class MetadataGrpcDataSender<T> extends GrpcDataSender<T> implements Enha
private final MetadataGrpc.MetadataStub metadataStub;
private final int maxAttempts;
private final int retryDelayMillis;
private final boolean clientRetryEnable;

private final Timer retryTimer;
private static final long MAX_PENDING_TIMEOUTS = 1024 * 4;
Expand All @@ -58,7 +57,7 @@ public class MetadataGrpcDataSender<T> extends GrpcDataSender<T> implements Enha

public MetadataGrpcDataSender(String host, int port, int executorQueueSize,
MessageConverter<T, GeneratedMessageV3> messageConverter,
ChannelFactory channelFactory, int retryMaxCount, int retryDelayMillis, boolean clientRetryEnable) {
ChannelFactory channelFactory, int retryMaxCount, int retryDelayMillis) {
super(host, port, executorQueueSize, messageConverter, channelFactory);

this.maxAttempts = getMaxAttempts(retryMaxCount);
Expand All @@ -78,7 +77,6 @@ public void scheduleNextRetry(GeneratedMessageV3 request, int remainingRetryCoun
MetadataGrpcDataSender.this.scheduleNextRetry(request, remainingRetryCount);
}
};
this.clientRetryEnable = clientRetryEnable;
}

private int getMaxAttempts(int retryMaxCount) {
Expand All @@ -104,47 +102,13 @@ public boolean request(T data, BiConsumer<ResponseMessage, Throwable> listener)
throw new UnsupportedOperationException("unsupported operation request(data, listener)");
}

//send with retry
@Override
public boolean send(T data) {
try {
final GeneratedMessageV3 message = messageConverter.toMessage(data);

if (message instanceof PSqlMetaData) {
final PSqlMetaData sqlMetaData = (PSqlMetaData) message;
this.metadataStub.requestSqlMetaData(sqlMetaData, newLogStreamObserver());
} else if (message instanceof PSqlUidMetaData) {
final PSqlUidMetaData sqlUidMetaData = (PSqlUidMetaData) message;
this.metadataStub.requestSqlUidMetaData(sqlUidMetaData, newLogStreamObserver());
} else if (message instanceof PApiMetaData) {
final PApiMetaData apiMetaData = (PApiMetaData) message;
this.metadataStub.requestApiMetaData(apiMetaData, newLogStreamObserver());
} else if (message instanceof PStringMetaData) {
final PStringMetaData stringMetaData = (PStringMetaData) message;
this.metadataStub.requestStringMetaData(stringMetaData, newLogStreamObserver());
} else if (message instanceof PExceptionMetaData) {
final PExceptionMetaData exceptionMetaData = (PExceptionMetaData) message;
this.metadataStub.requestExceptionMetaData(exceptionMetaData, newLogStreamObserver());
} else {
logger.warn("Unsupported message {}", MessageFormatUtils.debugLog(message));
}
} catch (Exception e) {
logger.info("Failed to send metadata={}", data, e);
return false;
}
return true;
}

private StreamObserver<PResult> newLogStreamObserver() {
return new LogResponseStreamObserver<>(logger);
throw new UnsupportedOperationException("unsupported operation send(data)");
}

@Override
public boolean request(final T data) {
if (clientRetryEnable) {
return this.send(data);
}

final Runnable convertAndRun = new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.profiler.sender.grpc;

import com.google.protobuf.GeneratedMessageV3;
import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender;
import com.navercorp.pinpoint.common.profiler.message.MessageConverter;
import com.navercorp.pinpoint.grpc.MessageFormatUtils;
import com.navercorp.pinpoint.grpc.client.ChannelFactory;
import com.navercorp.pinpoint.grpc.trace.MetadataGrpc;
import com.navercorp.pinpoint.grpc.trace.PApiMetaData;
import com.navercorp.pinpoint.grpc.trace.PExceptionMetaData;
import com.navercorp.pinpoint.grpc.trace.PResult;
import com.navercorp.pinpoint.grpc.trace.PSqlMetaData;
import com.navercorp.pinpoint.grpc.trace.PSqlUidMetaData;
import com.navercorp.pinpoint.grpc.trace.PStringMetaData;
import com.navercorp.pinpoint.io.ResponseMessage;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

/**
*/
public class MetadataGrpcHedgingDataSender<T> extends GrpcDataSender<T> implements EnhancedDataSender<T, ResponseMessage> {
//
private final MetadataGrpc.MetadataStub metadataStub;

private final AtomicLong requestCount = new AtomicLong(0);

public MetadataGrpcHedgingDataSender(String host, int port, int executorQueueSize,
MessageConverter<T, GeneratedMessageV3> messageConverter,
ChannelFactory channelFactory) {
super(host, port, executorQueueSize, messageConverter, channelFactory);

this.metadataStub = MetadataGrpc.newStub(managedChannel);
}

// Unsupported Operation
@Override
public boolean request(T data, int retry) {
throw new UnsupportedOperationException("unsupported operation request(data, retry)");
}

@Override
public boolean request(T data, BiConsumer<ResponseMessage, Throwable> listener) {
throw new UnsupportedOperationException("unsupported operation request(data, listener)");
}

@Override
public boolean send(T data) {
throw new UnsupportedOperationException("unsupported operation send(data)");
}

@Override
public boolean request(final T data) {
try {
final GeneratedMessageV3 message = messageConverter.toMessage(data);

if (message instanceof PSqlMetaData) {
final PSqlMetaData sqlMetaData = (PSqlMetaData) message;
this.metadataStub.requestSqlMetaData(sqlMetaData, newLogStreamObserver(sqlMetaData));
} else if (message instanceof PSqlUidMetaData) {
final PSqlUidMetaData sqlUidMetaData = (PSqlUidMetaData) message;
this.metadataStub.requestSqlUidMetaData(sqlUidMetaData, newLogStreamObserver(sqlUidMetaData));
} else if (message instanceof PApiMetaData) {
final PApiMetaData apiMetaData = (PApiMetaData) message;
this.metadataStub.requestApiMetaData(apiMetaData, newLogStreamObserver(apiMetaData));
} else if (message instanceof PStringMetaData) {
final PStringMetaData stringMetaData = (PStringMetaData) message;
this.metadataStub.requestStringMetaData(stringMetaData, newLogStreamObserver(stringMetaData));
} else if (message instanceof PExceptionMetaData) {
final PExceptionMetaData exceptionMetaData = (PExceptionMetaData) message;
this.metadataStub.requestExceptionMetaData(exceptionMetaData, newLogStreamObserver(exceptionMetaData));
} else {
if (logger.isWarnEnabled()) {
logger.warn("Unsupported message {}", MessageFormatUtils.debugLog(message));
}
}
} catch (Throwable e) {
logger.info("Failed to send metadata={}", data, e);
return false;
}
return true;
}

private StreamObserver<PResult> newLogStreamObserver(GeneratedMessageV3 message) {
String type = message.getClass().getSimpleName();
long requestCount = this.requestCount.incrementAndGet();
return new LogResponseStreamObserver<>(logger, type, requestCount);
}

@Override
public void stop() {
if (shutdown) {
return;
}
this.shutdown = true;

super.release();
}
}

0 comments on commit 7273fd5

Please sign in to comment.