[WIP] RATIS-2546. Add input stream to DataStreamApi for read operations in Client #1

Open
peterxcli wants to merge 1 commit into RATIS-1240-Add-input-stream-to-DataStreamApi-for-read-operations from RATIS-2546-stream-read-client

@peterxcli

This pull request introduces support for read-only data streaming in the Ratis client and adds a new DataStreamReplyByteBuf implementation for handling replies using Netty's ByteBuf. It also refactors and extends several interfaces and utility classes to support asynchronous read operations and resource management for streaming replies. The most important changes are summarized below.

Read-only DataStream support

  • Added the DataStreamInput interface, which provides asynchronous, zero-copy reading of stream replies and exposes a future for the final RaftClientReply (ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamInput.java).
  • Added new methods to DataStreamApi and its implementation to create read-only streams, including streamReadOnly() and streamReadOnly(ByteBuffer message) (ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java, ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java).
  • Implemented DataStreamInputImpl to manage the lifecycle, asynchronous reading, and resource cleanup of replies for read-only streams (ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java).

ByteBuf-based DataStreamReply implementation

  • Added DataStreamReplyByteBuf, a new implementation of DataStreamReply using Netty's ByteBuf, with builder pattern, resource management, and commit info support (ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuf.java).
  • Updated DataStreamReply interface to include a release() method for resource management, with a default no-op implementation (ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java).
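The default no-op release() described above can be pictured with a minimal sketch. The types here (Reply, CountedReply) are hypothetical stand-ins, not the actual Ratis or Netty classes; the counter only imitates a reference-counted buffer so the effect of release() is observable.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ReleaseSketch {
  /** Hypothetical stand-in for a reply type; not the actual Ratis interface. */
  interface Reply {
    /** Default no-op: replies that hold no pooled buffer have nothing to free. */
    default void release() {
    }
  }

  /** Hypothetical reply that imitates a reference-counted buffer (assumption). */
  static final class CountedReply implements Reply {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    @Override
    public void release() {
      // Drop the count to zero once; further calls are ignored in this sketch.
      refCnt.updateAndGet(c -> c > 0 ? c - 1 : 0);
    }

    int refCnt() {
      return refCnt.get();
    }
  }

  public static void main(String[] args) {
    final Reply plain = new Reply() { };
    plain.release(); // inherits the no-op default

    final CountedReply counted = new CountedReply();
    counted.release(); // overrides the default and frees the "buffer"
    System.out.println("refCnt after release: " + counted.refCnt());
  }
}
```

With a default method, existing implementations compile unchanged, while buffer-backed implementations override release() to return their memory.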

Protocol and utility enhancements

  • Enhanced ClientProtoUtils.getRaftClientReply() to support extracting RaftClientReply from both DataStreamReplyByteBuffer and the new DataStreamReplyByteBuf (ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java).
  • Updated Netty utilities to support encoding and decoding of DataStreamReplyByteBuf, including conversion to/from DataStreamReplyByteBuffer and header processing (ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java).

API changes

  • Extended DataStreamClientRpc with a new overloaded streamAsync method that accepts a Consumer<DataStreamReply>, enabling async receipt of multiple replies for a request (ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java).
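The multi-reply shape of the Consumer-based overload can be sketched roughly as follows. The description does not show the real signature or return type, so everything here is an assumption: Reply, its last flag, and this streamAsync are hypothetical, and the "server" is just an in-memory list delivered synchronously.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class MultiReplySketch {
  /** Hypothetical reply: a payload plus a flag marking the final reply. */
  static final class Reply {
    final String payload;
    final boolean last;

    Reply(String payload, boolean last) {
      this.payload = payload;
      this.last = last;
    }
  }

  /**
   * Hypothetical stand-in for an RPC that yields several replies per request.
   * Every reply goes to the consumer; the future completes on the final one.
   */
  static CompletableFuture<Reply> streamAsync(List<Reply> fromServer, Consumer<Reply> onReply) {
    final CompletableFuture<Reply> done = new CompletableFuture<>();
    for (Reply reply : fromServer) {
      onReply.accept(reply);
      if (reply.last) {
        done.complete(reply);
        break;
      }
    }
    return done;
  }

  public static void main(String[] args) {
    final List<String> seen = new ArrayList<>();
    final CompletableFuture<Reply> done = streamAsync(
        Arrays.asList(new Reply("chunk-0", false), new Reply("chunk-1", true)),
        reply -> seen.add(reply.payload));
    System.out.println(seen + " done=" + done.isDone());
  }
}
```

A single future can only carry one value, which is why a per-reply callback is a natural fit when one request produces a sequence of replies.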

These changes collectively enhance the Ratis client with efficient, asynchronous, and resource-safe read-only streaming capabilities and better integration with Netty's buffer management.

What is the link to the Apache JIRA?

https://issues.apache.org/jira/browse/RATIS-2546

How was this patch tested?

(Please explain how this patch was tested. Ex: unit tests, manual tests)
(If this patch involves UI changes, please attach a screen-shot; otherwise, remove this)

Signed-off-by: peterxcli <peterxcli@gmail.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a read-only streaming API (streamReadOnly) to the Ratis client, allowing clients to read data asynchronously via a stream. This includes adding the DataStreamInput interface and its implementation DataStreamInputImpl, supporting multiple replies for a single request, and introducing DataStreamReplyByteBuf to handle Netty ByteBuf replies. The review feedback highlights two critical memory leak issues related to unreleased Netty ByteBuf resources: one in DataStreamClientImpl.receive when futures are cancelled or replies are received after the stream is closed/ended, and another in NettyClientStreamRpc.channelRead when handling exceptions in the read-only streaming path.

Comment on lines +273 to +287
    private void receive(DataStreamReply reply) {
      final CompletableFuture<DataStreamReply> pending;
      synchronized (this) {
        if (closed) {
          reply.release();
          return;
        }
        pending = pendingReads.poll();
        if (pending == null) {
          replies.add(reply);
          return;
        }
      }
      pending.complete(reply);
    }

high

Potential memory leak of DataStreamReply (which holds Netty ByteBuf resources) in two scenarios:

  1. If the client cancels or times out the future returned by readAsync(), pending.complete(reply) will return false because the future is already completed. The reply is never released.
  2. If replies are received after the stream has failed or ended (the code above checks only closed), they are added to the replies queue and stay unreleased until the client calls close(), leaking if it never does.

We should release the reply immediately if the stream is closed/failed/ended, or if the pending future was already completed.

    private void receive(DataStreamReply reply) {
      final CompletableFuture<DataStreamReply> pending;
      synchronized (this) {
        if (closed || readException != null || endOfStream) {
          reply.release();
          return;
        }
        pending = pendingReads.poll();
        if (pending == null) {
          replies.add(reply);
          return;
        }
      }
      if (!pending.complete(reply)) {
        reply.release();
      }
    }
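The first scenario relies on standard CompletableFuture behavior: complete() returns false when the future has already been completed, and a cancelled future counts as completed. A small self-contained sketch, using a hypothetical Reply class with a release counter in place of the real DataStreamReply:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class CancelledFutureSketch {
  /** Hypothetical reply that counts release() calls (assumption, not a Ratis type). */
  static final class Reply {
    final AtomicInteger released = new AtomicInteger();

    void release() {
      released.incrementAndGet();
    }
  }

  /** Hands the reply to the pending read; releases it if nobody will receive it. */
  static boolean deliver(CompletableFuture<Reply> pending, Reply reply) {
    if (pending.complete(reply)) {
      return true; // ownership passed to whoever consumes the future
    }
    reply.release(); // future was already cancelled or completed
    return false;
  }

  public static void main(String[] args) {
    final CompletableFuture<Reply> cancelled = new CompletableFuture<>();
    cancelled.cancel(true); // e.g. the caller gave up on the read
    final Reply dropped = new Reply();
    System.out.println("delivered=" + deliver(cancelled, dropped)
        + " released=" + dropped.released.get());

    final CompletableFuture<Reply> live = new CompletableFuture<>();
    final Reply kept = new Reply();
    System.out.println("delivered=" + deliver(live, kept)
        + " released=" + kept.released.get());
  }
}
```

Checking the return value of complete() is what lets the producer know whether ownership of the buffer was actually handed off.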

Comment on lines +431 to +435
    } catch (Throwable cause) {
      LOG.warn("{} : channelRead error:", name, cause);
      readOnlyStreamingReplies.remove(clientInvocationId, readOnlyStreamingReply);
      readOnlyStreamingReply.completeExceptionally(cause);
    }

high

In the read-only path, if readOnlyStreamingReply.receiveReply(reply) throws an exception, the reply is not released in the catch block, leading to a Netty ByteBuf leak. This is inconsistent with the non-read-only path below (line 453) where reply.release() is correctly called in the catch block.

Suggested change (adds reply.release() to the catch block):

    } catch (Throwable cause) {
      LOG.warn("{} : channelRead error:", name, cause);
      readOnlyStreamingReplies.remove(clientInvocationId, readOnlyStreamingReply);
      readOnlyStreamingReply.completeExceptionally(cause);
      reply.release();
    }

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 68c738c4cb

Comment on lines +349 to +353
    closed = true;
    for (DataStreamReply reply; (reply = replies.poll()) != null;) {
      reply.release();
    }
    failReads(new AlreadyClosedException(clientId + ": stream already closed, request=" + header));

P2: Cancel read-only RPC when DataStreamInput closes

DataStreamInputImpl.close() only marks the local stream closed and fails pending reads, but it never signals the underlying read-only RPC to terminate. The NettyClientStreamRpc entry stays in readOnlyStreamingReplies until a terminal reply or timeout (streamAsync(..., Consumer) cleanup path), so callers that close inputs early still keep per-stream state/timers and can continue receiving/discarding server data until timeout. This can accumulate avoidable memory/network pressure when many read-only streams are abandoned early.
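One possible shape for the remedy is to give the input a cancellation hook that removes its RPC-side registration when close() runs. The sketch below is hypothetical throughout: the registry map, open(), and Input are illustrative names rather than the PR's classes, and telling the server to stop sending is out of scope.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class CloseCancelsRpcSketch {
  /** Hypothetical per-stream state kept by the RPC layer, keyed by invocation id. */
  static final Map<String, Object> REGISTRY = new ConcurrentHashMap<>();

  /** Hypothetical input whose close() also tears down the RPC-side entry. */
  static final class Input implements AutoCloseable {
    private final Runnable cancelRpc;
    private boolean closed;

    Input(Runnable cancelRpc) {
      this.cancelRpc = cancelRpc;
    }

    @Override
    public synchronized void close() {
      if (closed) {
        return; // idempotent: cancel only once
      }
      closed = true;
      cancelRpc.run(); // drop RPC-side state now instead of waiting for a timeout
    }
  }

  /** Registers RPC-side state and returns an input wired to unregister it. */
  static Input open(String invocationId) {
    REGISTRY.put(invocationId, new Object());
    return new Input(() -> REGISTRY.remove(invocationId));
  }

  public static void main(String[] args) {
    final Input input = open("stream-1");
    System.out.println("registered=" + REGISTRY.containsKey("stream-1"));
    input.close();
    System.out.println("registered after close=" + REGISTRY.containsKey("stream-1"));
  }
}
```

Passing the hook as a Runnable keeps the input unaware of the RPC layer's internals while still letting an early close free per-stream state.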
