ratis stream read #16
Code Review
This pull request adds the ability to read RATIS blocks through the Ratis data stream read-only API. It introduces RatisDataStreamBlockInputStream along with the associated configuration, stream observers, and benchmark tests.
The review identified several critical issues in the implementation. readFully violates the standard InputStream.read contract by returning EOF (-1) instead of 0 when the target buffer is empty. acquireClient leaks the acquired client when it is not an instance of XceiverClientRatis, because the client is never released. Wrapping ExecutionException directly in a generic IOException hides the underlying cause. stream.write is not guaranteed to write all bytes in a single call and should be executed in a loop. Finally, allocating a new ByteBuffer on every loop iteration in KeyValueHandler when rawReadBlockStream is true can cause severe GC pressure.
public synchronized int readFully(ByteBuffer targetBuf, boolean preRead)
    throws IOException {
  checkOpen();
  int read = 0;
If targetBuf has no remaining bytes, readFully will skip the loop and return EOF (-1). This violates the standard InputStream.read contract, which states that reading 0 bytes should return 0. We should return 0 immediately if targetBuf is empty.
public synchronized int readFully(ByteBuffer targetBuf, boolean preRead)
    throws IOException {
  checkOpen();
  if (!targetBuf.hasRemaining()) {
    return 0;
  }
  int read = 0;

if (!(client instanceof XceiverClientRatis)) {
  throw new IOException("Unexpected client class: "
      + client.getClass().getName() + ", " + pipelineRef.get());
}
If the acquired client is not an instance of XceiverClientRatis, an IOException is thrown, but the client is never released back to xceiverClientFactory. This causes a resource leak. Release the client before throwing the exception.
if (!(client instanceof XceiverClientRatis)) {
  xceiverClientFactory.releaseClientForReadData(client, false);
  throw new IOException("Unexpected client class: "
      + client.getClass().getName() + ", " + pipelineRef.get());
}
} catch (ExecutionException | TimeoutException e) {
  releaseClient(true);
  throw new IOException("Failed Ratis read-only data stream request", e);
}
Catching ExecutionException and wrapping it directly in a generic IOException hides the actual cause of the failure. It is better to unwrap the ExecutionException and throw its cause directly if it is an IOException.
} catch (ExecutionException e) {
  releaseClient(true);
  final Throwable cause = e.getCause();
  if (cause instanceof IOException) {
    throw (IOException) cause;
  }
  throw new IOException("Failed Ratis read-only data stream request", cause != null ? cause : e);
} catch (TimeoutException e) {
  releaseClient(true);
  throw new IOException("Failed Ratis read-only data stream request due to timeout", e);
}

private static void writeReadBlockStreamResponse(
    StateMachine.DataChannel stream, ContainerCommandResponseProto response,
    ByteBuffer data) throws IOException {
  final ByteBuffer buffer = encodeReadBlockStreamResponse(response, data);
  final int before = buffer.position();
  final int written = stream.write(buffer);
  if (written != buffer.limit() - before) {
    throw new IOException("Failed to write full ReadBlock stream response: "
        + "written=" + written + ", expected=" + (buffer.limit() - before));
  }
}
stream.write(buffer) is not guaranteed to write all bytes in a single call, especially if the channel is non-blocking or has limited buffer space. If it writes partially, this method will throw an IOException and fail the stream read. We should write in a loop to ensure all bytes are written.
private static void writeReadBlockStreamResponse(
    StateMachine.DataChannel stream, ContainerCommandResponseProto response,
    ByteBuffer data) throws IOException {
  final ByteBuffer buffer = encodeReadBlockStreamResponse(response, data);
  while (buffer.hasRemaining()) {
    int written = stream.write(buffer);
    if (written < 0) {
      throw new IOException("EOF reached while writing ReadBlock stream response");
    }
    if (written == 0) {
      throw new IOException("Zero bytes written to ReadBlock stream response channel");
    }
  }
}

if (rawReadBlockStream) {
  buffer = ByteBuffer.allocate(responseDataSize);
} else {
  buffer.clear();
}
Allocating a new ByteBuffer of responseDataSize (which can be several megabytes) on every iteration of the loop when rawReadBlockStream is true can cause severe GC pressure and performance degradation during large block reads. Consider checking if the observer actually needs to retain the buffer, or if we can safely reuse or pool it.
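One way to reuse buffers is a small bounded pool, sketched below. This is only an illustration under assumptions: SimpleBufferPool and its methods are hypothetical names that exist neither in this PR nor in Ozone, and the approach is safe only if the observer no longer references a buffer once it has been released, which is exactly what still needs to be checked here. The caller must also release each buffer at most once.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

/** Hypothetical bounded pool of equally sized heap buffers (illustration only). */
final class SimpleBufferPool {
  private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
  private final int bufferSize;
  private final int maxPooled;

  SimpleBufferPool(int bufferSize, int maxPooled) {
    this.bufferSize = bufferSize;
    this.maxPooled = maxPooled;
  }

  /** Returns a cleared pooled buffer, or allocates a new one if the pool is empty. */
  synchronized ByteBuffer acquire() {
    final ByteBuffer pooled = free.pollFirst();
    if (pooled == null) {
      return ByteBuffer.allocate(bufferSize);
    }
    pooled.clear();
    return pooled;
  }

  /**
   * Returns a buffer to the pool. Buffers of the wrong size, or buffers that
   * would exceed the pool bound, are dropped and left to the garbage collector.
   */
  synchronized void release(ByteBuffer buffer) {
    if (buffer != null && buffer.capacity() == bufferSize
        && free.size() < maxPooled) {
      free.addFirst(buffer);
    }
  }
}
```

In the loop above, the allocation would then become an acquire() call, with a matching release() once the response for that iteration has been fully written and nothing else holds the buffer.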