From a768b887dcb337c1eddb6b2e927254381a99dd7e Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 21 Aug 2025 15:25:23 +0100 Subject: [PATCH 1/2] Added support of topic messages batching --- .../java/tech/ydb/topic/read/impl/Batch.java | 8 ++++ .../topic/read/impl/PartitionSessionImpl.java | 48 +++++++++++++++---- .../tech/ydb/topic/read/impl/ReaderImpl.java | 1 + .../ydb/topic/settings/ReaderSettings.java | 12 +++++ 4 files changed, 60 insertions(+), 9 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/Batch.java b/topic/src/main/java/tech/ydb/topic/read/impl/Batch.java index f80ecbce3..a7a3ca56e 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/Batch.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/Batch.java @@ -47,4 +47,12 @@ public boolean isDecompressed() { public void setDecompressed(boolean decompressed) { this.decompressed = decompressed; } + + long getFirstCommitOffsetFrom() { + return messages.get(0).getCommitOffsetFrom(); + } + + long getLastOffset() { + return messages.get(messages.size() - 1).getOffset(); + } } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java index 69bcfc722..e74de841b 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java @@ -45,6 +45,7 @@ public class PartitionSessionImpl { private final PartitionSession sessionInfo; private final Executor decompressionExecutor; private final AtomicBoolean isWorking = new AtomicBoolean(true); + private final int maxBatchSize; private final Queue decodingBatches = new LinkedList<>(); private final ReentrantLock decodingBatchesLock = new ReentrantLock(); @@ -60,6 +61,7 @@ public class PartitionSessionImpl { private PartitionSessionImpl(Builder builder) { this.id = builder.id; + this.maxBatchSize = builder.maxBatchSize; this.fullId = builder.fullId; this.topicPath = builder.topicPath; this.consumerName = builder.consumerName; @@ -308,21 +310,43 @@ private void sendDataToReadersIfNeeded() { return; } if (isReadingNow.compareAndSet(false, true)) { - Batch batchToRead = readingQueue.poll(); - if (batchToRead == null) { + List batchesToRead = new ArrayList<>(); + + Batch next = readingQueue.poll(); + if (next == null) { isReadingNow.set(false); return; } + + batchesToRead.add(next); + List messagesToRead = new ArrayList<>(next.getMessages()); + long commitFrom = next.getFirstCommitOffsetFrom(); + long commitTo = next.getLastOffset() + 1; + + int batchSize = messagesToRead.size(); + while (maxBatchSize <= 0 || batchSize < maxBatchSize) { + next = readingQueue.peek(); + if (next == null) { + break; + } + if (maxBatchSize > 0 && next.getMessages().size() + batchSize > maxBatchSize) { + break; + } + + next = readingQueue.poll(); + + batchesToRead.add(next); + messagesToRead.addAll(next.getMessages()); + batchSize += next.getMessages().size(); + commitTo = next.getLastOffset() + 1; + } + // Should be called maximum in 1 thread at a time - List messageImplList = batchToRead.getMessages(); - List messagesToRead = new ArrayList<>(messageImplList); - OffsetsRange offsetsToCommit = new OffsetsRangeImpl(messageImplList.get(0).getCommitOffsetFrom(), - messageImplList.get(messageImplList.size() - 1).getOffset() + 1); + OffsetsRange offsetsToCommit = new OffsetsRangeImpl(commitFrom, commitTo); DataReceivedEvent event = new DataReceivedEventImpl(this, messagesToRead, offsetsToCommit); if (logger.isDebugEnabled()) { logger.debug("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) is about " + - "to be called...", fullId, messagesToRead.size(), messagesToRead.get(0).getOffset(), - messagesToRead.get(messagesToRead.size() - 1).getOffset()); + "to be called...", fullId, messagesToRead.size(), commitFrom, commitTo); } dataEventCallback.apply(event) .whenComplete((res, th) -> { @@ -338,7 +362,7 @@ private void sendDataToReadersIfNeeded() { messagesToRead.get(messagesToRead.size() - 1).getOffset()); } isReadingNow.set(false); - batchToRead.complete(); + batchesToRead.forEach(Batch::complete); sendDataToReadersIfNeeded(); }); } else { @@ -376,6 +400,7 @@ public void shutdown() { */ public static class Builder { private long id; + private int maxBatchSize; private String fullId; private String topicPath; private String consumerName; @@ -391,6 +416,11 @@ public Builder setId(long id) { return this; } + public Builder setMaxBatchSize(int maxBatchSize) { + this.maxBatchSize = maxBatchSize; + return this; + } + public Builder setFullId(String fullId) { this.fullId = fullId; return this; diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index 3a757c78f..380de9a29 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -402,6 +402,7 @@ private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPart PartitionSessionImpl partitionSession = PartitionSessionImpl.newBuilder() .setId(partitionSessionId) + .setMaxBatchSize(settings.getMaxBatchSize()) .setFullId(partitionSessionFullId) .setTopicPath(request.getPartitionSession().getPath()) .setConsumerName(consumerName) diff --git a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java index 354632e76..772d83702 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java @@ -22,6 +22,7 @@ public class ReaderSettings { private final String readerName; private final List topics; private final long maxMemoryUsageBytes; + private final int maxBatchSize; private final Executor decompressionExecutor; private final BiConsumer errorsHandler; @@ -31,6 +32,7 @@ private ReaderSettings(Builder builder) { this.readerName = builder.readerName; this.topics = ImmutableList.copyOf(builder.topics); this.maxMemoryUsageBytes = builder.maxMemoryUsageBytes; + this.maxBatchSize = builder.maxBatchSize; this.decompressionExecutor = builder.decompressionExecutor; this.errorsHandler = builder.errorsHandler; } @@ -60,6 +62,10 @@ public long getMaxMemoryUsageBytes() { return maxMemoryUsageBytes; } + public int getMaxBatchSize() { + return maxBatchSize; + } + public Executor getDecompressionExecutor() { return decompressionExecutor; } @@ -78,6 +84,7 @@ public static class Builder { private String readerName = null; private List topics = new ArrayList<>(); private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT; + private int maxBatchSize = 0; private Executor decompressionExecutor = null; private BiConsumer errorsHandler = null; @@ -133,6 +140,11 @@ public Builder setMaxMemoryUsageBytes(long maxMemoryUsageBytes) { return this; } + public Builder setMaxBatchSize(int maxBatchSize) { + this.maxBatchSize = maxBatchSize; + return this; + } + public Builder setErrorsHandler(BiConsumer handler) { this.errorsHandler = handler; return this; From 26edb976298e38c192917740e520c7f309bea0ca Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 21 Aug 2025 15:45:05 +0100 Subject: [PATCH 2/2] Revert correct log message --- .../java/tech/ydb/topic/read/impl/PartitionSessionImpl.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java index e74de841b..e76f3695f 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java @@ -346,7 +346,9 @@ private void sendDataToReadersIfNeeded() { DataReceivedEvent event = new DataReceivedEventImpl(this, messagesToRead, offsetsToCommit); if (logger.isDebugEnabled()) { logger.debug("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) is about " + - "to be called...", fullId, messagesToRead.size(), commitFrom, commitTo); + "to be called...", fullId, messagesToRead.size(), + messagesToRead.get(0).getOffset(), + messagesToRead.get(messagesToRead.size() - 1).getOffset()); } dataEventCallback.apply(event) .whenComplete((res, th) -> {