Skip to content

Commit

Permalink
Fix DataBufferUtils::write AsynchronousFileChannel race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
poutsma committed Feb 15, 2023
1 parent dcadddd commit afd67a0
Showing 1 changed file with 8 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1159,23 +1159,22 @@ protected void hookOnComplete() {

@Override
public void completed(Integer written, Attachment attachment) {
this.writing.set(false);
attachment.iterator().close();
DataBuffer.ByteBufferIterator iterator = attachment.iterator();
iterator.close();

long pos = this.position.addAndGet(written);
ByteBuffer byteBuffer = attachment.byteBuffer();
DataBuffer.ByteBufferIterator iterator = attachment.iterator();

if (byteBuffer.hasRemaining()) {
this.writing.set(true);
this.channel.write(byteBuffer, pos, attachment, this);
}
else if (iterator.hasNext()) {
ByteBuffer next = iterator.next();
this.writing.set(true);
this.channel.write(next, pos, attachment, this);
}
else {
sinkDataBuffer(attachment.dataBuffer());
this.sink.next(attachment.dataBuffer());
this.writing.set(false);

Throwable throwable = this.error.get();
if (throwable != null) {
Expand All @@ -1192,15 +1191,12 @@ else if (this.completed.get()) {

@Override
public void failed(Throwable exc, Attachment attachment) {
this.writing.set(false);
attachment.iterator().close();

sinkDataBuffer(attachment.dataBuffer());
this.sink.error(exc);
}
this.sink.next(attachment.dataBuffer());
this.writing.set(false);

private void sinkDataBuffer(DataBuffer dataBuffer) {
this.sink.next(dataBuffer);
this.sink.error(exc);
}

@Override
Expand Down

0 comments on commit afd67a0

Please sign in to comment.