Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#11158] Add Testcase #11163

Merged
merged 1 commit into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions agent-module/profiler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,16 @@
</exclusion>
</exclusions>
</dependency>-->

<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-inprocess</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-otlp</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,334 @@
package com.navercorp.pinpoint.profiler.sender.grpc;

import com.navercorp.pinpoint.grpc.client.ChannelFactory;
import com.navercorp.pinpoint.grpc.client.retry.HedgingServiceConfigBuilder;
import com.navercorp.pinpoint.grpc.trace.MetadataGrpc;
import com.navercorp.pinpoint.grpc.trace.PApiMetaData;
import com.navercorp.pinpoint.grpc.trace.PResult;
import com.navercorp.pinpoint.profiler.context.grpc.GrpcMetadataMessageConverter;
import com.navercorp.pinpoint.profiler.context.grpc.mapper.MetaDataMapper;
import com.navercorp.pinpoint.profiler.metadata.ApiMetaData;
import com.navercorp.pinpoint.profiler.metadata.MetaDataType;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mapstruct.factory.Mappers;

import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

class MetadataGrpcDataSenderTest {

private final Logger logger = LogManager.getLogger(this.getClass());

private static final long DEFAULT_TEST_HEDGING_DELAY_MILLIS = 500L;
private static final String DELAY_METADATA = "delay few seconds";
private static final String RUNTIME_EXCEPTION_METADATA = "runtime exception test";
private static final String UNAVAILABLE_METADATA = "status code UNAVAILABLE";
private static final String UNKNOWN_METADATA = "status code UNKNOWN";
private static final String FAIL_METADATA = "success=false";

private static final Metadata.Key<String> TEST_ID_KEY = Metadata.Key.of("test-id", Metadata.ASCII_STRING_MARSHALLER);
private static final Metadata.Key<String> GRPC_PREVIOUS_RPC_ATTEMPTS_KEY = Metadata.Key.of("grpc-previous-rpc-attempts", Metadata.ASCII_STRING_MARSHALLER);

private static Server server;
private static String serverName;
private static int testId;
private static int requestCounter;

@BeforeAll
public static void setUp() {
serverName = InProcessServerBuilder.generateName();

server = InProcessServerBuilder
.forName(serverName)
//.directExecutor()
.addService(ServerInterceptors.intercept(new MetadataGrpcService(), new TestServerInterceptor()))
.build();

CompletableFuture.supplyAsync(() -> {
try {
server.start();
} catch (Exception e) {
e.printStackTrace();
}
return null;
});

testId = 0;
}

@AfterAll
public static void tearDown() {
server.shutdown();
}

@BeforeEach
public void resetCounter() {
testId++;
requestCounter = 0;
}

public static class MetadataGrpcService extends MetadataGrpc.MetadataImplBase {
@Override
public void requestApiMetaData(PApiMetaData request, StreamObserver<PResult> responseObserver) {
System.out.println(request);
switch (request.getApiInfo()) {
case DELAY_METADATA:
try {
Thread.sleep(1000);
System.out.println("server delayed response time: " + new Timestamp(System.currentTimeMillis()));
responseObserver.onNext(PResult.newBuilder().setSuccess(true).setMessage("test 1s delay, status code: OK").build());
} catch (InterruptedException ignore) {
}
responseObserver.onCompleted();
break;
case UNAVAILABLE_METADATA:
responseObserver.onError(Status.UNAVAILABLE.withDescription("test status code: UNAVAILABLE").asException());
break;
case UNKNOWN_METADATA:
responseObserver.onError(Status.UNKNOWN.withDescription("test status code: UNKNOWN").asException());
break;
case RUNTIME_EXCEPTION_METADATA:
responseObserver.onError(new RuntimeException("test with runtime exception, status code: UNKNOWN "));
break;
case FAIL_METADATA:
responseObserver.onNext(PResult.newBuilder().setSuccess(false).setMessage("test success=false, status code: OK").build());
responseObserver.onCompleted();
break;
default:
responseObserver.onNext(PResult.newBuilder().setSuccess(true).setMessage("test success=true, status code: OK").build());
responseObserver.onCompleted();
break;
}
}
}

public static class TestServerInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
int totalAttempts = -1;
String callTestId = metadata.get(TEST_ID_KEY);
if (callTestId != null && callTestId.equals(Integer.toString(testId))) {
requestCounter++;

String previousAttempts = metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS_KEY);
if (previousAttempts == null) {
totalAttempts = 1;
} else {
totalAttempts = Integer.parseInt(previousAttempts) + 1;
}
//Assertions.assertThat(requestCounter).isEqualTo(totalAttempts);
}

System.out.println("---- server time: " + new Timestamp(System.currentTimeMillis()));
System.out.println("testId: " + callTestId);
System.out.println("total attempts: " + totalAttempts);
return Contexts.interceptCall(Context.current(), serverCall, metadata, serverCallHandler);
}
}

@Test
void sendTest() throws InterruptedException {
HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder();
InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder()
.defaultServiceConfig(serviceConfigBuilder.buildMetadataConfig());

MetadataGrpcHedgingDataSender<MetaDataType> metadataGrpcDataSender = getMetadataGrpcHedgingDataSender(channelBuilder);

ApiMetaData apiMetaData = new ApiMetaData(1, "call", 10, 2);
boolean send = metadataGrpcDataSender.request(apiMetaData);

Assertions.assertThat(send).isTrue();
Thread.sleep(DEFAULT_TEST_HEDGING_DELAY_MILLIS * 2);
Assertions.assertThat(requestCounter).isGreaterThan(0);
}

@Test
void sendFatalStatusCodeTest() throws InterruptedException {
HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder();
serviceConfigBuilder.setNonFatalStatusCodes(Collections.emptyList());
InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder()
.defaultServiceConfig(serviceConfigBuilder.buildMetadataConfig());
MetadataGrpcHedgingDataSender<MetaDataType> metadataGrpcDataSender = getMetadataGrpcHedgingDataSender(channelBuilder);

ApiMetaData apiMetaData = new ApiMetaData(1, UNAVAILABLE_METADATA, 10, 2);
boolean send = metadataGrpcDataSender.request(apiMetaData);

Assertions.assertThat(send).isTrue();
Thread.sleep(DEFAULT_TEST_HEDGING_DELAY_MILLIS * 4);
Assertions.assertThat(requestCounter).isEqualTo(1);
}

@Test
void sendFailRetryTest() throws InterruptedException {
HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder();
InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder()
.defaultServiceConfig(serviceConfigBuilder.buildMetadataConfig());
MetadataGrpcHedgingDataSender<MetaDataType> metadataGrpcDataSender = getMetadataGrpcHedgingDataSender(channelBuilder);

ApiMetaData apiMetaData = new ApiMetaData(2, UNAVAILABLE_METADATA, 10, 2);
boolean send = metadataGrpcDataSender.request(apiMetaData);

Assertions.assertThat(send).isTrue();
Thread.sleep(DEFAULT_TEST_HEDGING_DELAY_MILLIS * 4);
Assertions.assertThat(requestCounter).isEqualTo(3);
}

@Test
void sendDelayRetryTest() throws InterruptedException {
HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder();
serviceConfigBuilder.setHedgingDelayMillis(100);
InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder()
.defaultServiceConfig(serviceConfigBuilder.buildMetadataConfig());
MetadataGrpcHedgingDataSender<MetaDataType> metadataGrpcDataSender = getMetadataGrpcHedgingDataSender(channelBuilder);

ApiMetaData apiMetaData = new ApiMetaData(3, DELAY_METADATA, 10, 2);
boolean send = metadataGrpcDataSender.request(apiMetaData);

Assertions.assertThat(send).isTrue();
Thread.sleep(DEFAULT_TEST_HEDGING_DELAY_MILLIS * 4);
Assertions.assertThat(requestCounter).isGreaterThan(1);
}

@Test
void sendFailRetryRuntimeExceptionTest() throws InterruptedException {
HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder();
InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder()
.defaultServiceConfig(serviceConfigBuilder.buildMetadataConfig());
MetadataGrpcHedgingDataSender<MetaDataType> metadataGrpcDataSender = getMetadataGrpcHedgingDataSender(channelBuilder);

ApiMetaData apiMetaData = new ApiMetaData(3, RUNTIME_EXCEPTION_METADATA, 10, 2);
boolean send = metadataGrpcDataSender.request(apiMetaData);

Assertions.assertThat(send).isTrue();
Thread.sleep(DEFAULT_TEST_HEDGING_DELAY_MILLIS * 4);
Assertions.assertThat(requestCounter).isGreaterThan(1);
}

@Test
void sendMaxAttempts() throws InterruptedException {
HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder();
serviceConfigBuilder.setMaxAttempts(5);
InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder()
.defaultServiceConfig(serviceConfigBuilder.buildMetadataConfig());
MetadataGrpcHedgingDataSender<MetaDataType> metadataGrpcDataSender = getMetadataGrpcHedgingDataSender(channelBuilder);

ApiMetaData apiMetaData = new ApiMetaData(3, UNAVAILABLE_METADATA, 10, 2);
boolean send = metadataGrpcDataSender.request(apiMetaData);

Assertions.assertThat(send).isTrue();
Thread.sleep(DEFAULT_TEST_HEDGING_DELAY_MILLIS * 7);
Assertions.assertThat(requestCounter).isEqualTo(5);
}

@Test
void sendMaxAttemptsLimit() throws InterruptedException {
HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder();
serviceConfigBuilder.setMaxAttempts(5);
InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder()
.maxHedgedAttempts(2)
.defaultServiceConfig(serviceConfigBuilder.buildMetadataConfig());
MetadataGrpcHedgingDataSender<MetaDataType> metadataGrpcDataSender = getMetadataGrpcHedgingDataSender(channelBuilder);

ApiMetaData apiMetaData = new ApiMetaData(3, UNAVAILABLE_METADATA, 10, 2);
boolean send = metadataGrpcDataSender.request(apiMetaData);

Assertions.assertThat(send).isTrue();
Thread.sleep(DEFAULT_TEST_HEDGING_DELAY_MILLIS * 6);
Assertions.assertThat(requestCounter).isEqualTo(2);
}


private InProcessChannelBuilder getInProcessChannelBuilder() {
return InProcessChannelBuilder.forName(serverName)
.directExecutor()
.intercept(new TestClientInterceptor())
.enableRetry()
//.retryBufferSize()
//.perRpcBufferLimit()
;
}

public class TestClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
final ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, callOptions);
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
logger.info("request, testId: {}, client time: {}", testId, new Timestamp(System.currentTimeMillis()).toString());

headers.put(TEST_ID_KEY, Integer.toString(testId));
super.start(responseListener, headers);
}
};
}
}

private HedgingServiceConfigBuilder getTestServiceConfigBuilder() {
HedgingServiceConfigBuilder serviceConfigBuilder = new HedgingServiceConfigBuilder();
serviceConfigBuilder.setHedgingDelayMillis(DEFAULT_TEST_HEDGING_DELAY_MILLIS);
serviceConfigBuilder.setNonFatalStatusCodes(Arrays.asList(
Status.Code.UNKNOWN.name(),
Status.Code.INTERNAL.name(),
Status.Code.UNAVAILABLE.name()
));
return serviceConfigBuilder;
}


private MetadataGrpcHedgingDataSender<MetaDataType> getMetadataGrpcHedgingDataSender(InProcessChannelBuilder channelBuilder) {
MetaDataMapper mapper = Mappers.getMapper(MetaDataMapper.class);
GrpcMetadataMessageConverter converter = new GrpcMetadataMessageConverter(mapper);

ChannelFactory factory = new ChannelFactory() {
@Override
public String getFactoryName() {
return "inprocess-builder";
}

@Override
public ManagedChannel build(String channelName, String host, int port) {
return channelBuilder.build();
}

@Override
public ManagedChannel build(String host, int port) {
return channelBuilder.build();
}

@Override
public void close() {
}
};

return new MetadataGrpcHedgingDataSender<>("localhost", 1234, 1,
converter, factory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.navercorp.pinpoint.grpc.client.retry;

import io.grpc.Status;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -25,10 +28,15 @@ public class HedgingServiceConfigBuilder implements ServiceConfigBuilder {

public static final int DEFAULT_MAX_ATTEMPTS = 3;
public static final long DEFAULT_HEDGING_DELAY_MILLIS = 1000L;
public static final List<String> DEFAULT_STATUS_CODES = Arrays.asList(
Status.Code.UNKNOWN.name(),
Status.Code.INTERNAL.name(),
Status.Code.UNAVAILABLE.name()
);

private double maxAttempts = DEFAULT_MAX_ATTEMPTS; //Required. Must be two or greater
private String hedgingDelay = millisToString(DEFAULT_HEDGING_DELAY_MILLIS); //Required. Long decimal with "s" appended
private List<String> nonFatalStatusCodes; //Optional (eg. [14], ["UNAVAILABLE"] or ["unavailable"])
private List<String> nonFatalStatusCodes = DEFAULT_STATUS_CODES; //Optional (eg. [14], ["UNAVAILABLE"] or ["unavailable"])

@Override
public Map<String, ?> buildMetadataConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

public interface ServiceConfigBuilder {

String METADATA_SERVICE = "v1.metadata";
String METADATA_SERVICE = com.navercorp.pinpoint.grpc.trace.MetadataGrpc.SERVICE_NAME;

Map<String, ?> buildMetadataConfig();

Expand Down