From eabb8fbbd91b79964ebae2bf8b02aa7947ee0a49 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 10 Feb 2025 13:14:12 +0000 Subject: [PATCH] Added simple handler for topic's internal retryable streams --- .../tech/ydb/table/values/DecimalValueTest.java | 2 -- .../tech/ydb/topic/impl/GrpcStreamRetrier.java | 12 ++++++++++-- .../tech/ydb/topic/read/impl/ReaderImpl.java | 2 +- .../tech/ydb/topic/settings/ReaderSettings.java | 15 +++++++++++++++ .../tech/ydb/topic/settings/WriterSettings.java | 16 +++++++++++++++- .../tech/ydb/topic/write/impl/WriterImpl.java | 2 +- 6 files changed, 42 insertions(+), 7 deletions(-) diff --git a/table/src/test/java/tech/ydb/table/values/DecimalValueTest.java b/table/src/test/java/tech/ydb/table/values/DecimalValueTest.java index a8debfce4..b3719282c 100644 --- a/table/src/test/java/tech/ydb/table/values/DecimalValueTest.java +++ b/table/src/test/java/tech/ydb/table/values/DecimalValueTest.java @@ -192,8 +192,6 @@ public void allTypeInfiniteAndNan() { BigDecimal scaledInf = new BigDecimal(inf, scale); BigDecimal scaledNan = new BigDecimal(nan, scale); - System.out.println("Nan for " + scaled + " -> " + scaledNan); - assertIsInf(scaled.newValue(scaledInf)); assertIsNegInf(scaled.newValue(scaledInf.negate())); diff --git a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java index 3913f7e3f..d57352869 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java +++ b/topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java @@ -8,6 +8,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import org.slf4j.Logger; @@ -29,12 +30,15 @@ public abstract class GrpcStreamRetrier { protected final String id; protected final AtomicBoolean isReconnecting = new AtomicBoolean(false); protected final AtomicBoolean isStopped = new AtomicBoolean(false); - private final ScheduledExecutorService scheduler; protected final AtomicInteger reconnectCounter = new AtomicInteger(0); - protected GrpcStreamRetrier(ScheduledExecutorService scheduler) { + private final ScheduledExecutorService scheduler; + private final BiConsumer errorsHandler; + + protected GrpcStreamRetrier(ScheduledExecutorService scheduler, BiConsumer errorsHandler) { this.scheduler = scheduler; this.id = generateRandomId(ID_LENGTH); + this.errorsHandler = errorsHandler; } protected abstract Logger getLogger(); @@ -127,6 +131,10 @@ protected void onSessionClosed(Status status, Throwable th) { } } + if (errorsHandler != null) { + errorsHandler.accept(status, th); + } + if (!isStopped.get()) { tryScheduleReconnect(); } else { diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index a57d8b6c9..984134e06 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -55,7 +55,7 @@ public abstract class ReaderImpl extends GrpcStreamRetrier { private final String consumerName; public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) { - super(topicRpc.getScheduler()); + super(topicRpc.getScheduler(), settings.getErrorsHandler()); this.topicRpc = topicRpc; this.settings = settings; this.session = new ReadSessionImpl(); diff --git a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java index e523ef920..f27b719ed 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java @@ -3,11 +3,14 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executor; +import java.util.function.BiConsumer; import javax.annotation.Nullable; import com.google.common.collect.ImmutableList; +import tech.ydb.core.Status; + /** * @author Nikolay Perfilov */ @@ -19,6 +22,7 @@ public class ReaderSettings { private final List topics; private final long maxMemoryUsageBytes; private final Executor decompressionExecutor; + private final BiConsumer errorsHandler; private ReaderSettings(Builder builder) { this.consumerName = builder.consumerName; @@ -26,6 +30,7 @@ private ReaderSettings(Builder builder) { this.topics = ImmutableList.copyOf(builder.topics); this.maxMemoryUsageBytes = builder.maxMemoryUsageBytes; this.decompressionExecutor = builder.decompressionExecutor; + this.errorsHandler = builder.errorsHandler; } public String getConsumerName() { @@ -41,6 +46,10 @@ public List getTopics() { return topics; } + public BiConsumer getErrorsHandler() { + return errorsHandler; + } + public long getMaxMemoryUsageBytes() { return maxMemoryUsageBytes; } @@ -63,6 +72,7 @@ public static class Builder { private List topics = new ArrayList<>(); private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT; private Executor decompressionExecutor = null; + private BiConsumer errorsHandler = null; public Builder setConsumerName(String consumerName) { this.consumerName = consumerName; @@ -103,6 +113,11 @@ public Builder setMaxMemoryUsageBytes(long maxMemoryUsageBytes) { return this; } + public Builder setErrorsHandler(BiConsumer handler) { + this.errorsHandler = handler; + return this; + } + /** * Set executor for decompression tasks. * If not set, default executor will be used. diff --git a/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java b/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java index 217a6bc23..7f160c4d6 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java @@ -1,5 +1,8 @@ package tech.ydb.topic.settings; +import java.util.function.BiConsumer; + +import tech.ydb.core.Status; import tech.ydb.topic.description.Codec; /** @@ -16,6 +19,7 @@ public class WriterSettings { private final Codec codec; private final long maxSendBufferMemorySize; private final int maxSendBufferMessagesCount; + private final BiConsumer errorsHandler; private WriterSettings(Builder builder) { this.topicPath = builder.topicPath; @@ -25,6 +29,7 @@ private WriterSettings(Builder builder) { this.codec = builder.codec; this.maxSendBufferMemorySize = builder.maxSendBufferMemorySize; this.maxSendBufferMessagesCount = builder.maxSendBufferMessagesCount; + this.errorsHandler = builder.errorsHandler; } public static Builder newBuilder() { @@ -43,6 +48,10 @@ public String getMessageGroupId() { return messageGroupId; } + public BiConsumer getErrorsHandler() { + return errorsHandler; + } + public Long getPartitionId() { return partitionId; } @@ -70,6 +79,7 @@ public static class Builder { private Codec codec = Codec.GZIP; private long maxSendBufferMemorySize = MAX_MEMORY_USAGE_BYTES_DEFAULT; private int maxSendBufferMessagesCount = MAX_IN_FLIGHT_COUNT_DEFAULT; + private BiConsumer errorsHandler = null; /** * Set path to a topic to write to @@ -148,9 +158,13 @@ public Builder setMaxSendBufferMessagesCount(int maxMessagesCount) { return this; } + public Builder setErrorsHandler(BiConsumer handler) { + this.errorsHandler = handler; + return this; + } + public WriterSettings build() { return new WriterSettings(this); } - } } diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 9a52e9a91..1db6d2023 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -66,7 +66,7 @@ public abstract class WriterImpl extends GrpcStreamRetrier { private CompletableFuture lastAcceptedMessageFuture; public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) { - super(topicRpc.getScheduler()); + super(topicRpc.getScheduler(), settings.getErrorsHandler()); this.topicRpc = topicRpc; this.settings = settings; this.session = new WriteSessionImpl();