Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
* A client interface for sending stream requests.
Expand All @@ -36,4 +37,11 @@ default CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest request
throw new UnsupportedOperationException(getClass() + " does not support "
+ JavaUtils.getCurrentStackTraceElement().getMethodName());
}

/**
 * Asynchronously send a request for which multiple replies may be received.
 * This default implementation does not support the operation and always throws.
 *
 * @param request the stream request to send.
 * @param replyConsumer the consumer of the replies received for the request.
 * @return never returns normally in this default implementation.
 * @throws UnsupportedOperationException always, naming the implementing class and this method.
 */
default CompletableFuture<DataStreamReply> streamAsync(
    DataStreamRequest request, Consumer<DataStreamReply> replyConsumer) {
  // Resolve the name of this method from the current stack frame for the error message.
  final String unsupportedMethod = JavaUtils.getCurrentStackTraceElement().getMethodName();
  throw new UnsupportedOperationException(getClass() + " does not support " + unsupportedMethod);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,15 @@ default DataStreamOutput stream() {

/** Create a stream by providing a customized header message and route table. */
DataStreamOutput stream(ByteBuffer headerMessage, RoutingTable routingTable);

/**
 * Create a stream to read data for a readonly request without a header message.
 * This corresponds to {@link AsyncApi#sendReadOnly(org.apache.ratis.protocol.Message)}.
 *
 * @return the input stream obtained from {@link #streamReadOnly(ByteBuffer)} with a null message.
 */
default DataStreamInput streamReadOnly() {
  // No customized header message: delegate with an explicitly typed null.
  return streamReadOnly((ByteBuffer) null);
}

/**
 * Create a stream by providing a customized header message for readonly requests.
 *
 * @param message the customized header message;
 *                the no-argument {@link #streamReadOnly()} overload passes null here.
 * @return the input stream for reading the data of the readonly request.
 */
DataStreamInput streamReadOnly(ByteBuffer message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.client.api;

import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientReply;

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;

/**
* An asynchronous input stream supporting zero buffer copying.
*
* Data is read chunk by chunk using {@link #readAsync()}.
* Instances are obtained from the {@code streamReadOnly} methods of the client stream API.
* Since this interface is {@link Closeable}, callers should close the stream when it is no longer needed.
*/
public interface DataStreamInput extends Closeable {
/**
* Read the next chunk in the stream asynchronously.
* The caller owns the returned {@link DataStreamReply} and should call
* {@link DataStreamReply#release()} after consuming it.
*
* @return a future of the reply.
*         The future may complete exceptionally, e.g. when no more chunks are available
*         or the stream has been closed; the exact exception types depend on the implementation.
*/
CompletableFuture<DataStreamReply> readAsync();

/**
* Return the future of the {@link RaftClientReply}
* which will be received once the read-only stream has received a reply.
* This method only returns the future; it does not read any chunk from the stream.
*
* @return the future of the {@link RaftClientReply}.
*/
CompletableFuture<RaftClientReply> getRaftClientReplyFuture();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ratis.client.impl;

import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuf;
import org.apache.ratis.proto.RaftProtos.AlreadyClosedExceptionProto;
import org.apache.ratis.proto.RaftProtos.ClientMessageEntryProto;
import org.apache.ratis.proto.RaftProtos.GroupAddRequestProto;
Expand Down Expand Up @@ -378,22 +379,24 @@ static GroupInfoReplyProto toGroupInfoReplyProto(GroupInfoReply reply) {
return b.build();
}

static RaftClientReply getRaftClientReply(DataStreamReply reply) {
if (!(reply instanceof DataStreamReplyByteBuffer)) {
throw new IllegalStateException("Unexpected " + reply.getClass() + ": reply is " + reply);
}
public static RaftClientReply getRaftClientReply(DataStreamReply reply) {
try {
return toRaftClientReply(((DataStreamReplyByteBuffer) reply).slice());
if (reply instanceof DataStreamReplyByteBuffer) {
return toRaftClientReply(((DataStreamReplyByteBuffer) reply).slice());
} else if (reply instanceof DataStreamReplyByteBuf) {
return toRaftClientReply(((DataStreamReplyByteBuf) reply).slice().nioBuffer());
}
throw new IllegalStateException("Unexpected " + reply.getClass() + ": reply is " + reply);
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Failed to getRaftClientReply from " + reply, e);
}
}

static RaftClientReply toRaftClientReply(ByteBuffer buffer) throws InvalidProtocolBufferException {
public static RaftClientReply toRaftClientReply(ByteBuffer buffer) throws InvalidProtocolBufferException {
return toRaftClientReply(RaftClientReplyProto.parseFrom(buffer));
}

static RaftClientReply toRaftClientReply(RaftClientReplyProto replyProto) {
public static RaftClientReply toRaftClientReply(RaftClientReplyProto replyProto) {
final RaftRpcReplyProto rp = replyProto.getRpcReply();
final RaftGroupMemberId serverMemberId = ProtoUtils.toRaftGroupMemberId(rp.getReplyId(), rp.getRaftGroupId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.DataStreamInput;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
Expand All @@ -34,30 +36,34 @@
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequestHeader;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.rpc.CallId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.protocol.*;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SlidingWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -237,6 +243,117 @@ private CompletableFuture<DataStreamReply> sendForward(DataStreamReply writeRepl
}
}

public final class DataStreamInputImpl implements DataStreamInput {
// The read-only request of this stream; it is serialized and sent as the stream header.
private final RaftClientRequest header;
// The future returned by the RPC streamAsync call; its completion (normal or exceptional) ends the reads.
private final CompletableFuture<DataStreamReply> replyFuture;
// Completed with the RaftClientReply converted from the reply of replyFuture, or with its exception.
private final CompletableFuture<RaftClientReply> raftClientReplyFuture = new CompletableFuture<>();
// Replies received but not yet handed out by readAsync(); accessed under synchronized (this).
private final Queue<DataStreamReply> replies = new ArrayDeque<>();
// Futures returned by readAsync() which are still waiting for a reply; accessed under synchronized (this).
private final Queue<CompletableFuture<DataStreamReply>> pendingReads = new ArrayDeque<>();
// The failure recorded by failReads(..); null if no failure has been recorded.
private Throwable readException;
// Set by markEndOfStream() once replyFuture has completed normally.
private boolean endOfStream;
// Set by close(); once set, new replies are released and new reads fail.
private boolean closed;

/**
 * Start a read-only stream by sending the given request as the stream header.
 * Replies delivered by the RPC are passed to {@link #receive(DataStreamReply)}.
 * When the RPC future completes, the {@link RaftClientReply} future is completed first
 * and then the pending reads are either failed or marked as end-of-stream.
 *
 * @param request the read-only request used as the header of this stream.
 */
private DataStreamInputImpl(RaftClientRequest request) {
  this.header = request;

  // Serialize the request and wrap it in a STREAM_HEADER packet flagged with FLUSH and CLOSE.
  final ByteBuffer requestBytes = ClientProtoUtils.toRaftClientRequestProtoByteBuffer(request);
  final DataStreamRequestHeader streamHeader = new DataStreamRequestHeader(
      request.getClientId(), Type.STREAM_HEADER, request.getCallId(), 0, requestBytes.remaining(),
      StandardWriteOption.FLUSH, StandardWriteOption.CLOSE);
  final DataStreamRequestByteBuffer streamRequest = new DataStreamRequestByteBuffer(streamHeader, requestBytes);
  this.replyFuture = dataStreamClientRpc.streamAsync(streamRequest, this::receive);

  // Registration order matters: publish the RaftClientReply before terminating the reads.
  replyFuture.thenApply(ClientProtoUtils::getRaftClientReply)
      .whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture));
  replyFuture.whenComplete((finalReply, failure) -> {
    if (failure == null) {
      markEndOfStream();
    } else {
      failReads(failure);
    }
  });
}

/**
 * Accept a reply delivered by the RPC for this stream.
 * <p>
 * If the stream is closed, the reply is released immediately.
 * Otherwise, the reply is handed to the oldest pending read which accepts it;
 * if no pending read accepts it, the reply is buffered for a later {@link #readAsync()}.
 * <p>
 * A pending read may have been completed already by the caller (e.g. cancelled or timed out).
 * In that case {@link CompletableFuture#complete(Object)} returns false and the reply
 * would otherwise be neither delivered nor released, so the next pending read is tried instead.
 *
 * @param reply the reply received; its ownership is transferred to this stream.
 */
private void receive(DataStreamReply reply) {
  for (;;) {
    final CompletableFuture<DataStreamReply> pending;
    synchronized (this) {
      if (closed) {
        // Nobody can read it anymore.
        reply.release();
        return;
      }
      pending = pendingReads.poll();
      if (pending == null) {
        // No reader is waiting: buffer the reply; close() releases whatever remains buffered.
        replies.add(reply);
        return;
      }
    }
    // Complete outside the lock since dependent actions of the future may run caller code.
    if (pending.complete(reply)) {
      return;
    }
    // The future was already completed by the caller: retry with the next pending read
    // so that the reply is neither leaked nor silently dropped.
  }
}
Comment on lines +273 to +287
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Potential memory leak of DataStreamReply (which holds Netty ByteBuf resources) in two scenarios:

  1. If the client cancels or times out the future returned by readAsync(), pending.complete(reply) will return false because the future is already completed. The reply is never released.
  2. If replies are received after the stream is closed, failed, or ended, they are added to the replies queue and might be leaked if the client does not call close() immediately.

We should release the reply immediately if the stream is closed/failed/ended, or if the pending future was already completed.

    private void receive(DataStreamReply reply) {
      final CompletableFuture<DataStreamReply> pending;
      synchronized (this) {
        if (closed || readException != null || endOfStream) {
          reply.release();
          return;
        }
        pending = pendingReads.poll();
        if (pending == null) {
          replies.add(reply);
          return;
        }
      }
      if (!pending.complete(reply)) {
        reply.release();
      }
    }


/**
 * Record the given failure and complete all the pending reads exceptionally with it.
 * The failure is recorded and the next pending read is taken under the lock,
 * while each future is completed outside the lock.
 *
 * @param t the failure to record and to propagate to the pending reads.
 */
private void failReads(Throwable t) {
  CompletableFuture<DataStreamReply> waiter;
  do {
    synchronized (this) {
      readException = t;
      waiter = pendingReads.poll();
    }
    if (waiter != null) {
      waiter.completeExceptionally(t);
    }
  } while (waiter != null);
}

/** @return a new {@link EOFException} describing the end of this stream, including the client id and the request. */
private EOFException newEndOfStreamException() {
  final String message = clientId + ": end of stream, request=" + header;
  return new EOFException(message);
}

/**
 * Mark this stream as ended and complete all the pending reads exceptionally,
 * each with its own end-of-stream exception.
 * The flag is set and the next pending read is taken under the lock,
 * while each future is completed outside the lock.
 */
private void markEndOfStream() {
  CompletableFuture<DataStreamReply> waiter;
  do {
    synchronized (this) {
      endOfStream = true;
      waiter = pendingReads.poll();
    }
    if (waiter != null) {
      waiter.completeExceptionally(newEndOfStreamException());
    }
  } while (waiter != null);
}

/**
 * Read the next reply of this stream.
 * The checks are made in the following order:
 * (1) a closed stream fails with {@link AlreadyClosedException};
 * (2) a buffered reply, if any, is returned immediately;
 * (3) a recorded failure or the end of stream fails the read;
 * (4) otherwise, the returned future is queued until a reply arrives or the stream terminates.
 *
 * @return a future of the next reply.
 */
@Override
public synchronized CompletableFuture<DataStreamReply> readAsync() {
  if (closed) {
    final AlreadyClosedException alreadyClosed = new AlreadyClosedException(
        clientId + ": stream already closed, request=" + header);
    return JavaUtils.completeExceptionally(alreadyClosed);
  }

  // Buffered replies are served even if the stream has already failed or ended.
  final DataStreamReply buffered = replies.poll();
  if (buffered != null) {
    return CompletableFuture.completedFuture(buffered);
  }

  // A recorded failure takes precedence over the end of stream.
  final Throwable terminal = readException != null ? readException
      : endOfStream ? newEndOfStreamException() : null;
  if (terminal != null) {
    return JavaUtils.completeExceptionally(terminal);
  }

  final CompletableFuture<DataStreamReply> waiter = new CompletableFuture<>();
  pendingReads.add(waiter);
  return waiter;
}

/** @return the future completed with the {@link RaftClientReply} converted from the reply of the stream RPC. */
@Override
public CompletableFuture<RaftClientReply> getRaftClientReplyFuture() {
  return this.raftClientReplyFuture;
}

@Override
public synchronized void close() {
closed = true;
for (DataStreamReply reply; (reply = replies.poll()) != null;) {
reply.release();
}
failReads(new AlreadyClosedException(clientId + ": stream already closed, request=" + header));
Comment on lines +349 to +353
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Cancel read-only RPC when DataStreamInput closes

DataStreamInputImpl.close() only marks the local stream closed and fails pending reads, but it never signals the underlying read-only RPC to terminate. The NettyClientStreamRpc entry stays in readOnlyStreamingReplies until a terminal reply or timeout (streamAsync(..., Consumer) cleanup path), so callers that close inputs early still keep per-stream state/timers and can continue receiving/discarding server data until timeout. This can accumulate avoidable memory/network pressure when many read-only streams are abandoned early.

Useful? React with 👍 / 👎.

}
}

@Override
public DataStreamClientRpc getClientRpc() {
return dataStreamClientRpc;
Expand Down Expand Up @@ -274,6 +391,21 @@ public DataStreamOutputRpc stream(ByteBuffer headerMessage, RoutingTable routing
return new DataStreamOutputImpl(request);
}

/**
 * Create a read-only stream to the data stream server.
 *
 * @param headerMessage the customized header message, or null for no message;
 *                      a non-null buffer is copied into the request message.
 * @return a new input stream for a read request built with the next call id.
 */
@Override
public DataStreamInput streamReadOnly(ByteBuffer headerMessage) {
  final Message message = headerMessage == null ? null
      : Message.valueOf(ByteString.copyFrom(headerMessage));
  final RaftClientRequest readRequest = RaftClientRequest.newBuilder()
      .setClientId(clientId)
      .setServerId(dataStreamServer.getId())
      .setGroupId(groupId)
      .setCallId(CallId.getAndIncrement())
      .setMessage(message)
      .setType(RaftClientRequest.readRequestType())
      .build();
  return new DataStreamInputImpl(readRequest);
}

@Override
public void close() throws IOException {
dataStreamClientRpc.close();
Expand Down
Loading
Loading