diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/ProgressListenableActionFuture.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/ProgressListenableActionFuture.java index 6381f1c1e211d..e66a0825fbaf2 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/ProgressListenableActionFuture.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/ProgressListenableActionFuture.java @@ -10,9 +10,11 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.core.Nullable; import java.util.ArrayList; import java.util.List; +import java.util.function.LongConsumer; import java.util.function.Supplier; /** @@ -26,12 +28,17 @@ class ProgressListenableActionFuture extends PlainActionFuture { private record PositionAndListener(long position, ActionListener listener) {} - protected final long start; - protected final long end; + final long start; + final long end; - // modified under 'this' mutex - private volatile List listeners; - protected volatile long progress; + /** + * A consumer that accepts progress made by this {@link ProgressListenableActionFuture} + */ + @Nullable + private final LongConsumer progressConsumer; + + private List listeners; + private long progress; private volatile boolean completed; /** @@ -41,12 +48,13 @@ private record PositionAndListener(long position, ActionListener listener) * @param start the start (inclusive) * @param end the end (exclusive) */ - ProgressListenableActionFuture(long start, long end) { + ProgressListenableActionFuture(long start, long end, @Nullable LongConsumer progressConsumer) { super(); this.start = start; this.end = end; this.progress = start; this.completed = false; + this.progressConsumer = progressConsumer; assert invariant(); } @@ -108,6 +116,9 @@ public void onProgress(final long progressValue) { } } if (listenersToExecute != null) { + if (progressConsumer != null) { + safeAcceptProgress(progressConsumer, progressValue); + } listenersToExecute.forEach(listener -> executeListener(listener, () -> progressValue)); } assert invariant(); @@ -189,6 +200,16 @@ private static void executeListener(final ActionListener listener, final S } } + private static void safeAcceptProgress(LongConsumer consumer, long progress) { + assert consumer != null; + try { + consumer.accept(progress); + } catch (Exception e) { + assert false : e; + throw e; + } + } + @Override public String toString() { return "ProgressListenableActionFuture[start=" diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java index 6e6a11fbddc93..78112c83b9841 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.SortedSet; import java.util.TreeSet; +import java.util.function.LongConsumer; /** * Keeps track of the contents of a file that may not be completely present. @@ -199,7 +200,7 @@ private List doWaitForRange(ByteRange range, ByteRange subRange, ActionList final Range newPendingRange = new Range( targetRange.start, range.end(), - new ProgressListenableActionFuture(targetRange.start, range.end()) + new ProgressListenableActionFuture(targetRange.start, range.end(), progressConsumer(targetRange.start)) ); ranges.add(newPendingRange); pendingRanges.add(newPendingRange); @@ -218,7 +219,7 @@ private List doWaitForRange(ByteRange range, ByteRange subRange, ActionList final Range newPendingRange = new Range( targetRange.start, newPendingRangeEnd, - new ProgressListenableActionFuture(targetRange.start, newPendingRangeEnd) + new ProgressListenableActionFuture(targetRange.start, newPendingRangeEnd, progressConsumer(targetRange.start)) ); ranges.add(newPendingRange); pendingRanges.add(newPendingRange); @@ -260,6 +261,15 @@ private void determineStartingRange(ByteRange range, List pendingRanges, } } + private LongConsumer progressConsumer(long rangeStart) { + assert Thread.holdsLock(ranges); + if (rangeStart == complete) { + return this::updateCompletePointer; + } else { + return null; + } + } + public boolean checkAvailable(long upTo) { assert upTo <= length : "tried to check availability up to [" + upTo + "] but length is only [" + length + "]"; return complete >= upTo; @@ -464,8 +474,20 @@ private void onGapSuccess(final Range gapRange) { private void maybeUpdateCompletePointer(Range gapRange) { assert Thread.holdsLock(ranges); if (gapRange.start == 0) { - assert complete <= gapRange.end; - complete = gapRange.end; + updateCompletePointerHoldingLock(gapRange.end); + } + } + + private void updateCompletePointerHoldingLock(long value) { + assert Thread.holdsLock(ranges); + assert complete <= value : complete + ">" + value; + complete = value; + } + + private void updateCompletePointer(long value) { + synchronized (ranges) { + updateCompletePointerHoldingLock(value); + System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>"); } } @@ -535,9 +557,9 @@ public class Gap { /** * Range in the file corresponding to the current gap */ - public final Range range; + private final Range range; - Gap(Range range) { + private Gap(Range range) { assert range.start < range.end : range.start + "-" + range.end; this.range = range; } diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/common/ProgressListenableActionFutureTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/common/ProgressListenableActionFutureTests.java index a94a3214fdd9a..475074586da96 100644 --- a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/common/ProgressListenableActionFutureTests.java +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/common/ProgressListenableActionFutureTests.java @@ -236,6 +236,6 @@ public void testListenerCalledImmediatelyWhenProgressReached() { private static ProgressListenableActionFuture randomFuture() { final long delta = randomLongBetween(1L, ByteSizeUnit.TB.toBytes(1L)); final long start = randomLongBetween(Long.MIN_VALUE, Long.MAX_VALUE - delta); - return new ProgressListenableActionFuture(start, start + delta); + return new ProgressListenableActionFuture(start, start + delta, null); } }