From 35b39e9993343cf725ffc37c3a44d505e64126f8 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Tue, 18 Nov 2025 09:20:54 +0000 Subject: [PATCH] Added option to disable grpc deadline for long streams --- .../tech/ydb/coordination/impl/Stream.java | 4 +++- .../ydb/core/grpc/GrpcRequestSettings.java | 14 +++++++++++ .../tech/ydb/core/impl/BaseGrpcTransport.java | 9 ++++++++ .../java/tech/ydb/query/impl/SessionImpl.java | 2 +- .../tech/ydb/topic/impl/GrpcTopicRpc.java | 23 ++++++++++--------- 5 files changed, 39 insertions(+), 13 deletions(-) diff --git a/coordination/src/main/java/tech/ydb/coordination/impl/Stream.java b/coordination/src/main/java/tech/ydb/coordination/impl/Stream.java index a68944314..38018f0ef 100644 --- a/coordination/src/main/java/tech/ydb/coordination/impl/Stream.java +++ b/coordination/src/main/java/tech/ydb/coordination/impl/Stream.java @@ -40,7 +40,9 @@ class Stream implements GrpcReadWriteStream.Observer { Stream(Rpc rpc) { this.scheduler = rpc.getScheduler(); - this.stream = rpc.createSession(GrpcRequestSettings.newBuilder().build()); + this.stream = rpc.createSession(GrpcRequestSettings.newBuilder() + .disableDeadline() + .build()); } public CompletableFuture startStream() { diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java b/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java index 2cbdf0bc5..1c7f4f146 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java @@ -17,6 +17,7 @@ public class GrpcRequestSettings { private final long deadlineAfter; private final Integer preferredNodeID; private final boolean directMode; + private final boolean deadlineDisabled; private final String traceId; private final List clientCapabilities; private final Consumer trailersHandler; @@ -27,6 +28,7 @@ private GrpcRequestSettings(Builder builder) { this.deadlineAfter = builder.deadlineAfter; this.preferredNodeID = builder.preferredNodeID; this.directMode = builder.directMode; + this.deadlineDisabled = builder.deadlineDisabled; this.traceId = builder.traceId; this.clientCapabilities = builder.clientCapabilities; this.trailersHandler = builder.trailersHandler; @@ -42,6 +44,10 @@ public long getDeadlineAfter() { return deadlineAfter; } + public boolean isDeadlineDisabled() { + return deadlineDisabled; + } + public Integer getPreferredNodeID() { return preferredNodeID; } @@ -72,6 +78,7 @@ public GrpcFlowControl getFlowControl() { public static final class Builder { private long deadlineAfter = 0L; + private boolean deadlineDisabled = false; private Integer preferredNodeID = null; private boolean directMode = false; private String traceId = null; @@ -91,6 +98,7 @@ public static final class Builder { */ public Builder withDeadlineAfter(long deadlineAfter) { this.deadlineAfter = deadlineAfter; + this.deadlineDisabled = false; return this; } @@ -106,6 +114,7 @@ public Builder withDeadline(Duration duration) { } else { this.deadlineAfter = 0L; } + this.deadlineDisabled = false; return this; } @@ -152,6 +161,11 @@ public Builder withPessimizationHook(BooleanSupplier pessimizationHook) { return this; } + public Builder disableDeadline() { + this.deadlineDisabled = true; + return this; + } + public GrpcRequestSettings build() { return new GrpcRequestSettings(this); } diff --git a/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java b/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java index c1d3c047c..edd81cffc 100644 --- a/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java +++ b/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java @@ -83,6 +83,9 @@ public CompletableFuture> unaryCall( } options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS); } + if (settings.isDeadlineDisabled()) { + options = options.withDeadline(null); + } try { GrpcChannel channel = getChannel(settings); @@ -126,6 +129,9 @@ public GrpcReadStream readStreamCall( } options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS); } + if (settings.isDeadlineDisabled()) { + options = options.withDeadline(null); + } try { GrpcChannel channel = getChannel(settings); @@ -172,6 +178,9 @@ public GrpcReadWriteStream readWriteStreamCall( } options = options.withDeadlineAfter(settings.getDeadlineAfter() - now, TimeUnit.NANOSECONDS); } + if (settings.isDeadlineDisabled()) { + options = options.withDeadline(null); + } try { GrpcChannel channel = getChannel(settings); diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index 420d5b577..a5974e5e7 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -142,7 +142,7 @@ GrpcReadStream attach(AttachSessionSettings settings) { Context ctx = Context.ROOT.fork(); Context previous = ctx.attach(); try { - GrpcRequestSettings grpcSettings = makeOptions(settings).build(); + GrpcRequestSettings grpcSettings = makeOptions(settings).disableDeadline().build(); GrpcReadStream origin = rpc.attachSession(request, grpcSettings); return new GrpcReadStream() { @Override diff --git a/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java b/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java index 68ca6223e..d7be2799f 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java +++ b/topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java @@ -16,7 +16,6 @@ import tech.ydb.proto.topic.v1.TopicServiceGrpc; import tech.ydb.topic.TopicRpc; - /** * @author Nikolay Perfilov */ @@ -49,7 +48,7 @@ public CompletableFuture alterTopic(YdbTopic.AlterTopicRequest request, @Override public CompletableFuture> describeTopic(YdbTopic.DescribeTopicRequest request, - GrpcRequestSettings settings) { + GrpcRequestSettings settings) { return transport .unaryCall(TopicServiceGrpc.getDescribeTopicMethod(), settings, request) .thenApply(OperationBinder.bindSync( @@ -84,7 +83,7 @@ public CompletableFuture commitOffset(YdbTopic.CommitOffsetRequest reque @Override public CompletableFuture updateOffsetsInTransaction(YdbTopic.UpdateOffsetsInTransactionRequest request, - GrpcRequestSettings settings) { + GrpcRequestSettings settings) { return transport .unaryCall(TopicServiceGrpc.getUpdateOffsetsInTransactionMethod(), settings, request) .thenApply(OperationBinder.bindSync(YdbTopic.UpdateOffsetsInTransactionResponse::getOperation)); @@ -93,19 +92,21 @@ public CompletableFuture updateOffsetsInTransaction(YdbTopic.UpdateOffse @Override public GrpcReadWriteStream writeSession(String streamId) { - return transport.readWriteStreamCall( - TopicServiceGrpc.getStreamWriteMethod(), - GrpcRequestSettings.newBuilder().withTraceId(streamId).build() - ); + GrpcRequestSettings settings = GrpcRequestSettings.newBuilder() + .withTraceId(streamId) + .disableDeadline() + .build(); + return transport.readWriteStreamCall(TopicServiceGrpc.getStreamWriteMethod(), settings); } @Override public GrpcReadWriteStream readSession(String streamId) { - return transport.readWriteStreamCall( - TopicServiceGrpc.getStreamReadMethod(), - GrpcRequestSettings.newBuilder().withTraceId(streamId).build() - ); + GrpcRequestSettings settings = GrpcRequestSettings.newBuilder() + .withTraceId(streamId) + .disableDeadline() + .build(); + return transport.readWriteStreamCall(TopicServiceGrpc.getStreamReadMethod(), settings); } @Override