Skip to content

Commit

Permalink
[Draft] Update sparse file tracker complete pointer on progress
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed May 31, 2024
1 parent 6532af8 commit fb220f6
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.core.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

/**
Expand All @@ -26,12 +28,17 @@ class ProgressListenableActionFuture extends PlainActionFuture<Long> {

private record PositionAndListener(long position, ActionListener<Long> listener) {}

protected final long start;
protected final long end;
final long start;
final long end;

// modified under 'this' mutex
private volatile List<PositionAndListener> listeners;
protected volatile long progress;
/**
* A consumer that accepts progress made by this {@link ProgressListenableActionFuture}
*/
@Nullable
private final LongConsumer progressConsumer;

private List<PositionAndListener> listeners;
private long progress;
private volatile boolean completed;

/**
Expand All @@ -41,12 +48,13 @@ private record PositionAndListener(long position, ActionListener<Long> listener)
* @param start the start (inclusive)
* @param end the end (exclusive)
*/
ProgressListenableActionFuture(long start, long end) {
ProgressListenableActionFuture(long start, long end, @Nullable LongConsumer progressConsumer) {
super();
this.start = start;
this.end = end;
this.progress = start;
this.completed = false;
this.progressConsumer = progressConsumer;
assert invariant();
}

Expand Down Expand Up @@ -108,6 +116,9 @@ public void onProgress(final long progressValue) {
}
}
if (listenersToExecute != null) {
if (progressConsumer != null) {
safeAcceptProgress(progressConsumer, progressValue);
}
listenersToExecute.forEach(listener -> executeListener(listener, () -> progressValue));
}
assert invariant();
Expand Down Expand Up @@ -189,6 +200,16 @@ private static void executeListener(final ActionListener<Long> listener, final S
}
}

private static void safeAcceptProgress(LongConsumer consumer, long progress) {
assert consumer != null;
try {
consumer.accept(progress);
} catch (Exception e) {
assert false : e;
throw e;
}
}

@Override
public String toString() {
return "ProgressListenableActionFuture[start="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.LongConsumer;

/**
* Keeps track of the contents of a file that may not be completely present.
Expand Down Expand Up @@ -199,7 +200,7 @@ private List<Gap> doWaitForRange(ByteRange range, ByteRange subRange, ActionList
final Range newPendingRange = new Range(
targetRange.start,
range.end(),
new ProgressListenableActionFuture(targetRange.start, range.end())
new ProgressListenableActionFuture(targetRange.start, range.end(), progressConsumer(targetRange.start))
);
ranges.add(newPendingRange);
pendingRanges.add(newPendingRange);
Expand All @@ -218,7 +219,7 @@ private List<Gap> doWaitForRange(ByteRange range, ByteRange subRange, ActionList
final Range newPendingRange = new Range(
targetRange.start,
newPendingRangeEnd,
new ProgressListenableActionFuture(targetRange.start, newPendingRangeEnd)
new ProgressListenableActionFuture(targetRange.start, newPendingRangeEnd, progressConsumer(targetRange.start))
);
ranges.add(newPendingRange);
pendingRanges.add(newPendingRange);
Expand Down Expand Up @@ -260,6 +261,15 @@ private void determineStartingRange(ByteRange range, List<Range> pendingRanges,
}
}

private LongConsumer progressConsumer(long rangeStart) {
assert Thread.holdsLock(ranges);
if (rangeStart == complete) {
return this::updateCompletePointer;
} else {
return null;
}
}

public boolean checkAvailable(long upTo) {
assert upTo <= length : "tried to check availability up to [" + upTo + "] but length is only [" + length + "]";
return complete >= upTo;
Expand Down Expand Up @@ -464,8 +474,20 @@ private void onGapSuccess(final Range gapRange) {
private void maybeUpdateCompletePointer(Range gapRange) {
assert Thread.holdsLock(ranges);
if (gapRange.start == 0) {
assert complete <= gapRange.end;
complete = gapRange.end;
updateCompletePointerHoldingLock(gapRange.end);
}
}

private void updateCompletePointerHoldingLock(long value) {
assert Thread.holdsLock(ranges);
assert complete <= value : complete + ">" + value;
complete = value;
}

private void updateCompletePointer(long value) {
synchronized (ranges) {
updateCompletePointerHoldingLock(value);
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>");
}
}

Expand Down Expand Up @@ -535,9 +557,9 @@ public class Gap {
/**
* Range in the file corresponding to the current gap
*/
public final Range range;
private final Range range;

Gap(Range range) {
private Gap(Range range) {
assert range.start < range.end : range.start + "-" + range.end;
this.range = range;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,6 @@ public void testListenerCalledImmediatelyWhenProgressReached() {
private static ProgressListenableActionFuture randomFuture() {
final long delta = randomLongBetween(1L, ByteSizeUnit.TB.toBytes(1L));
final long start = randomLongBetween(Long.MIN_VALUE, Long.MAX_VALUE - delta);
return new ProgressListenableActionFuture(start, start + delta);
return new ProgressListenableActionFuture(start, start + delta, null);
}
}

0 comments on commit fb220f6

Please sign in to comment.