Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/impl/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,12 @@ public boolean isDecompressed() {
public void setDecompressed(boolean decompressed) {
this.decompressed = decompressed;
}

long getFirstCommitOffsetFrom() {
return messages.get(0).getCommitOffsetFrom();
}

long getLastOffset() {
return messages.get(messages.size() - 1).getOffset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class PartitionSessionImpl {
private final PartitionSession sessionInfo;
private final Executor decompressionExecutor;
private final AtomicBoolean isWorking = new AtomicBoolean(true);
private final int maxBatchSize;

private final Queue<Batch> decodingBatches = new LinkedList<>();
private final ReentrantLock decodingBatchesLock = new ReentrantLock();
Expand All @@ -60,6 +61,7 @@ public class PartitionSessionImpl {

private PartitionSessionImpl(Builder builder) {
this.id = builder.id;
this.maxBatchSize = builder.maxBatchSize;
this.fullId = builder.fullId;
this.topicPath = builder.topicPath;
this.consumerName = builder.consumerName;
Expand Down Expand Up @@ -308,21 +310,45 @@ private void sendDataToReadersIfNeeded() {
return;
}
if (isReadingNow.compareAndSet(false, true)) {
Batch batchToRead = readingQueue.poll();
if (batchToRead == null) {
List<Batch> batchesToRead = new ArrayList<>();

Batch next = readingQueue.poll();
if (next == null) {
isReadingNow.set(false);
return;
}

batchesToRead.add(next);
List<Message> messagesToRead = new ArrayList<>(next.getMessages());
long commitFrom = next.getFirstCommitOffsetFrom();
long commitTo = next.getLastOffset() + 1;

int batchSize = messagesToRead.size();
while (maxBatchSize <= 0 || batchSize < maxBatchSize) {
next = readingQueue.peek();
if (next == null) {
break;
}
if (maxBatchSize > 0 && next.getMessages().size() + batchSize > maxBatchSize) {
break;
}

next = readingQueue.poll();

batchesToRead.add(next);
messagesToRead.addAll(next.getMessages());
batchSize += next.getMessages().size();
commitTo = next.getLastOffset() + 1;
}

// Should be called maximum in 1 thread at a time
List<MessageImpl> messageImplList = batchToRead.getMessages();
List<Message> messagesToRead = new ArrayList<>(messageImplList);
OffsetsRange offsetsToCommit = new OffsetsRangeImpl(messageImplList.get(0).getCommitOffsetFrom(),
messageImplList.get(messageImplList.size() - 1).getOffset() + 1);
OffsetsRange offsetsToCommit = new OffsetsRangeImpl(commitFrom, commitTo);
DataReceivedEvent event = new DataReceivedEventImpl(this, messagesToRead, offsetsToCommit);
if (logger.isDebugEnabled()) {
logger.debug("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) is about " +
"to be called...", fullId, messagesToRead.size(), messagesToRead.get(0).getOffset(),
messagesToRead.get(messagesToRead.size() - 1).getOffset());
"to be called...", fullId, messagesToRead.size(),
messagesToRead.get(0).getOffset(),
messagesToRead.get(messagesToRead.size() - 1).getOffset());
}
dataEventCallback.apply(event)
.whenComplete((res, th) -> {
Expand All @@ -338,7 +364,7 @@ private void sendDataToReadersIfNeeded() {
messagesToRead.get(messagesToRead.size() - 1).getOffset());
}
isReadingNow.set(false);
batchToRead.complete();
batchesToRead.forEach(Batch::complete);
sendDataToReadersIfNeeded();
});
} else {
Expand Down Expand Up @@ -376,6 +402,7 @@ public void shutdown() {
*/
public static class Builder {
private long id;
private int maxBatchSize;
private String fullId;
private String topicPath;
private String consumerName;
Expand All @@ -391,6 +418,11 @@ public Builder setId(long id) {
return this;
}

public Builder setMaxBatchSize(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
return this;
}

public Builder setFullId(String fullId) {
this.fullId = fullId;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPart

PartitionSessionImpl partitionSession = PartitionSessionImpl.newBuilder()
.setId(partitionSessionId)
.setMaxBatchSize(settings.getMaxBatchSize())
.setFullId(partitionSessionFullId)
.setTopicPath(request.getPartitionSession().getPath())
.setConsumerName(consumerName)
Expand Down
12 changes: 12 additions & 0 deletions topic/src/main/java/tech/ydb/topic/settings/ReaderSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class ReaderSettings {
private final String readerName;
private final List<TopicReadSettings> topics;
private final long maxMemoryUsageBytes;
private final int maxBatchSize;
private final Executor decompressionExecutor;
private final BiConsumer<Status, Throwable> errorsHandler;

Expand All @@ -31,6 +32,7 @@ private ReaderSettings(Builder builder) {
this.readerName = builder.readerName;
this.topics = ImmutableList.copyOf(builder.topics);
this.maxMemoryUsageBytes = builder.maxMemoryUsageBytes;
this.maxBatchSize = builder.maxBatchSize;
this.decompressionExecutor = builder.decompressionExecutor;
this.errorsHandler = builder.errorsHandler;
}
Expand Down Expand Up @@ -60,6 +62,10 @@ public long getMaxMemoryUsageBytes() {
return maxMemoryUsageBytes;
}

public int getMaxBatchSize() {
return maxBatchSize;
}

public Executor getDecompressionExecutor() {
return decompressionExecutor;
}
Expand All @@ -78,6 +84,7 @@ public static class Builder {
private String readerName = null;
private List<TopicReadSettings> topics = new ArrayList<>();
private long maxMemoryUsageBytes = MAX_MEMORY_USAGE_BYTES_DEFAULT;
private int maxBatchSize = 0;
private Executor decompressionExecutor = null;
private BiConsumer<Status, Throwable> errorsHandler = null;

Expand Down Expand Up @@ -133,6 +140,11 @@ public Builder setMaxMemoryUsageBytes(long maxMemoryUsageBytes) {
return this;
}

public Builder setMaxBatchSize(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
return this;
}

public Builder setErrorsHandler(BiConsumer<Status, Throwable> handler) {
this.errorsHandler = handler;
return this;
Expand Down