diff --git a/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java b/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java index 184ae810a..bdb908b5e 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java @@ -59,13 +59,13 @@ public CompletableFuture start(Observer observer) { synchronized (call) { try { call.start(this, headers); - call.request(1); if (logger.isTraceEnabled()) { logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request)); } call.sendMessage(request); // close stream by client side call.halfClose(); + call.request(1); } catch (Throwable t) { try { call.cancel(null, t); diff --git a/core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java b/core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java index 3a7af29ee..3ee01529a 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java @@ -50,12 +50,12 @@ public UnaryCall(String traceId, ClientCall call, GrpcStatusHandler public CompletableFuture> startCall(ReqT request, Metadata headers) { try { call.start(this, headers); - call.request(1); if (logger.isTraceEnabled()) { logger.trace("UnaryCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request)); } call.sendMessage(request); call.halfClose(); + call.request(1); } catch (Exception ex) { future.completeExceptionally(ex); try { diff --git a/core/src/main/java/tech/ydb/core/impl/pool/NettyChannelFactory.java b/core/src/main/java/tech/ydb/core/impl/pool/NettyChannelFactory.java index 2854dc776..48a2e3f2f 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/NettyChannelFactory.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/NettyChannelFactory.java @@ -76,6 +76,7 @@ public ManagedChannel newManagedChannel(String host, int port, String sslHostOve channelBuilder .maxInboundMessageSize(INBOUND_MESSAGE_SIZE) .withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT) + .withOption(ChannelOption.TCP_NODELAY, true) .intercept(metadataInterceptor()); if (!useDefaultGrpcResolver) { @@ -86,7 +87,8 @@ public ManagedChannel newManagedChannel(String host, int port, String sslHostOve } if (grpcKeepAliveTimeMillis != null) { - channelBuilder.keepAliveTime(grpcKeepAliveTimeMillis, TimeUnit.MILLISECONDS); + channelBuilder.keepAliveTime(grpcKeepAliveTimeMillis, TimeUnit.MILLISECONDS) + .keepAliveWithoutCalls(true); } if (retryEnabled) { diff --git a/core/src/main/java/tech/ydb/core/impl/pool/ShadedNettyChannelFactory.java b/core/src/main/java/tech/ydb/core/impl/pool/ShadedNettyChannelFactory.java index 6a5951543..6d777e36f 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/ShadedNettyChannelFactory.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/ShadedNettyChannelFactory.java @@ -76,6 +76,7 @@ public ManagedChannel newManagedChannel(String host, int port, String sslHostOve channelBuilder .maxInboundMessageSize(INBOUND_MESSAGE_SIZE) .withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT) + .withOption(ChannelOption.TCP_NODELAY, true) .intercept(metadataInterceptor()); if (!useDefaultGrpcResolver) { @@ -86,7 +87,8 @@ public ManagedChannel newManagedChannel(String host, int port, String sslHostOve } if (grpcKeepAliveTimeMillis != null) { - channelBuilder.keepAliveTime(grpcKeepAliveTimeMillis, TimeUnit.MILLISECONDS); + channelBuilder.keepAliveTime(grpcKeepAliveTimeMillis, TimeUnit.MILLISECONDS) + .keepAliveWithoutCalls(true); } if (retryEnabled) { diff --git a/core/src/test/java/tech/ydb/core/impl/pool/DefaultChannelFactoryTest.java b/core/src/test/java/tech/ydb/core/impl/pool/DefaultChannelFactoryTest.java index d5f9e4f83..1fc4d59c5 100644 --- a/core/src/test/java/tech/ydb/core/impl/pool/DefaultChannelFactoryTest.java +++ b/core/src/test/java/tech/ydb/core/impl/pool/DefaultChannelFactoryTest.java @@ -20,13 +20,9 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; +import org.mockito.ArgumentMatchers; import org.mockito.MockedStatic; import org.mockito.Mockito; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import org.mockito.MockitoAnnotations; import tech.ydb.core.grpc.GrpcTransport; @@ -54,13 +50,20 @@ public void setUp() { channelStaticMock = Mockito.mockStatic(NettyChannelBuilder.class); channelStaticMock.when(FOR_ADDRESS).thenReturn(channelBuilderMock); - when(channelBuilderMock.negotiationType(any())).thenReturn(channelBuilderMock); - when(channelBuilderMock.maxInboundMessageSize(anyInt())).thenReturn(channelBuilderMock); - when(channelBuilderMock.withOption(any(), any())).thenReturn(channelBuilderMock); - when(channelBuilderMock.intercept((ClientInterceptor)any())).thenReturn(channelBuilderMock); - when(channelBuilderMock.nameResolverFactory(any())).thenReturn(channelBuilderMock); - - when(channelBuilderMock.build()).thenReturn(channelMock); + Mockito.when(channelBuilderMock.negotiationType(ArgumentMatchers.any())) + .thenReturn(channelBuilderMock); + Mockito.when(channelBuilderMock.maxInboundMessageSize(ArgumentMatchers.anyInt())) + .thenReturn(channelBuilderMock); + Mockito.when(channelBuilderMock.withOption(ArgumentMatchers.any(), ArgumentMatchers.any())) + .thenReturn(channelBuilderMock); + Mockito.when(channelBuilderMock.intercept(ArgumentMatchers.any(ClientInterceptor.class))) + .thenReturn(channelBuilderMock); + Mockito.when(channelBuilderMock.nameResolverFactory(ArgumentMatchers.any())) + .thenReturn(channelBuilderMock); + Mockito.when(channelBuilderMock.keepAliveTime(ArgumentMatchers.anyLong(), ArgumentMatchers.any())) + .thenReturn(channelBuilderMock); + + Mockito.when(channelBuilderMock.build()).thenReturn(channelMock); } @After @@ -73,20 +76,23 @@ public void tearDown() throws Exception { public void defaultParams() { GrpcTransportBuilder builder = GrpcTransport.forHost(MOCKED_HOST, MOCKED_PORT, "/Root"); ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder); - channelStaticMock.verify(FOR_ADDRESS, times(0)); + channelStaticMock.verify(FOR_ADDRESS, Mockito.times(0)); Assert.assertEquals(30_000l, factory.getConnectTimeoutMs()); Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT, null)); - channelStaticMock.verify(FOR_ADDRESS, times(1)); - - verify(channelBuilderMock, times(0)).negotiationType(NegotiationType.TLS); - verify(channelBuilderMock, times(1)).negotiationType(NegotiationType.PLAINTEXT); - verify(channelBuilderMock, times(1)).maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE); - verify(channelBuilderMock, times(1)).defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY); - verify(channelBuilderMock, times(1)).withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT); - verify(channelBuilderMock, times(0)).enableRetry(); - verify(channelBuilderMock, times(1)).disableRetry(); + channelStaticMock.verify(FOR_ADDRESS, Mockito.times(1)); + + Mockito.verify(channelBuilderMock, Mockito.times(0)).negotiationType(NegotiationType.TLS); + Mockito.verify(channelBuilderMock, Mockito.times(1)).negotiationType(NegotiationType.PLAINTEXT); + Mockito.verify(channelBuilderMock, Mockito.times(1)) + .maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE); + Mockito.verify(channelBuilderMock, Mockito.times(1)) + .defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY); + Mockito.verify(channelBuilderMock, Mockito.times(1)) + .withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT); + Mockito.verify(channelBuilderMock, Mockito.times(0)).enableRetry(); + Mockito.verify(channelBuilderMock, Mockito.times(1)).disableRetry(); } @Test @@ -97,20 +103,23 @@ public void defaultSslFactory() { .withConnectTimeout(Duration.ofMinutes(1)); ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder); - channelStaticMock.verify(FOR_ADDRESS, times(0)); + channelStaticMock.verify(FOR_ADDRESS, Mockito.times(0)); Assert.assertEquals(60000l, factory.getConnectTimeoutMs()); Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT, null)); - channelStaticMock.verify(FOR_ADDRESS, times(1)); - - verify(channelBuilderMock, times(1)).negotiationType(NegotiationType.TLS); - verify(channelBuilderMock, times(0)).negotiationType(NegotiationType.PLAINTEXT); - verify(channelBuilderMock, times(1)).maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE); - verify(channelBuilderMock, times(1)).defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY); - verify(channelBuilderMock, times(1)).withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT); - verify(channelBuilderMock, times(1)).enableRetry(); - verify(channelBuilderMock, times(0)).disableRetry(); + channelStaticMock.verify(FOR_ADDRESS, Mockito.times(1)); + + Mockito.verify(channelBuilderMock, Mockito.times(1)).negotiationType(NegotiationType.TLS); + Mockito.verify(channelBuilderMock, Mockito.times(0)).negotiationType(NegotiationType.PLAINTEXT); + Mockito.verify(channelBuilderMock, Mockito.times(1)) + .maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE); + Mockito.verify(channelBuilderMock, Mockito.times(1)) + .defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY); + Mockito.verify(channelBuilderMock, Mockito.times(1)) + .withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT); + Mockito.verify(channelBuilderMock, Mockito.times(1)).enableRetry(); + Mockito.verify(channelBuilderMock, Mockito.times(0)).disableRetry(); } @Test @@ -119,20 +128,24 @@ public void customChannelInitializer() { .withUseDefaultGrpcResolver(true); ManagedChannelFactory factory = ShadedNettyChannelFactory - .withInterceptor(cb -> cb.withOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)) + .withInterceptor(cb -> cb.enableFullStreamDecompression()) .buildFactory(builder); - channelStaticMock.verify(FOR_ADDRESS, times(0)); + channelStaticMock.verify(FOR_ADDRESS, Mockito.times(0)); Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT, null)); - channelStaticMock.verify(FOR_ADDRESS, times(1)); - - verify(channelBuilderMock, times(1)).negotiationType(NegotiationType.PLAINTEXT); - verify(channelBuilderMock, times(1)).maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE); - verify(channelBuilderMock, times(0)).defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY); - verify(channelBuilderMock, times(1)).withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT); - verify(channelBuilderMock, times(1)).withOption(ChannelOption.TCP_NODELAY, Boolean.TRUE); + channelStaticMock.verify(FOR_ADDRESS, Mockito.times(1)); + + Mockito.verify(channelBuilderMock, Mockito.times(1)).negotiationType(NegotiationType.PLAINTEXT); + Mockito.verify(channelBuilderMock, Mockito.times(1)) + .maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE); + Mockito.verify(channelBuilderMock, Mockito.times(0)) + .defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY); + Mockito.verify(channelBuilderMock, Mockito.times(1)) + .withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT); + Mockito.verify(channelBuilderMock, Mockito.times(1)).withOption(ChannelOption.TCP_NODELAY, Boolean.TRUE); + Mockito.verify(channelBuilderMock, Mockito.times(1)).enableFullStreamDecompression(); } @Test @@ -156,15 +169,18 @@ public void customSslFactory() throws CertificateException, IOException { selfSignedCert.delete(); } - channelStaticMock.verify(FOR_ADDRESS, times(1)); - - verify(channelBuilderMock, times(1)).negotiationType(NegotiationType.TLS); - verify(channelBuilderMock, times(0)).negotiationType(NegotiationType.PLAINTEXT); - verify(channelBuilderMock, times(1)).maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE); - verify(channelBuilderMock, times(1)).defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY); - verify(channelBuilderMock, times(1)).withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT); - verify(channelBuilderMock, times(0)).enableRetry(); - verify(channelBuilderMock, times(1)).disableRetry(); + channelStaticMock.verify(FOR_ADDRESS, Mockito.times(1)); + + Mockito.verify(channelBuilderMock, Mockito.times(1)).negotiationType(NegotiationType.TLS); + Mockito.verify(channelBuilderMock, Mockito.times(0)).negotiationType(NegotiationType.PLAINTEXT); + Mockito.verify(channelBuilderMock, Mockito.times(1)) + .maxInboundMessageSize(ShadedNettyChannelFactory.INBOUND_MESSAGE_SIZE); + Mockito.verify(channelBuilderMock, Mockito.times(1)) + .defaultLoadBalancingPolicy(ShadedNettyChannelFactory.DEFAULT_BALANCER_POLICY); + Mockito.verify(channelBuilderMock, Mockito.times(1)) + .withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT); + Mockito.verify(channelBuilderMock, Mockito.times(0)).enableRetry(); + Mockito.verify(channelBuilderMock, Mockito.times(1)).disableRetry(); } @Test