Skip to content

Commit

Permalink
Do not request gaps to fill when sub range is completed
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed May 30, 2024
1 parent d26be67 commit 531f049
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public List<Gap> waitForRange(final ByteRange range, final ByteRange subRange, f
);
}

if (complete >= range.end()) {
if (subRange.end() <= complete) {
listener.onResponse(null);
return List.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.blobcache.BlobCacheUtils;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.test.ESTestCase;

Expand All @@ -22,7 +24,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.LongStream;

import static org.elasticsearch.blobcache.BlobCacheTestUtils.mergeContiguousRanges;
import static org.elasticsearch.blobcache.BlobCacheTestUtils.randomRanges;
Expand All @@ -35,6 +39,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class SparseFileTrackerTests extends ESTestCase {
Expand Down Expand Up @@ -119,6 +124,49 @@ public void testInvalidRange() {
}
}

public void testListenerCompletedImmediatelyWhenSubRangeIsAvailable() {
final byte[] bytes = new byte[randomIntBetween(8, 1024)];
final var tracker = new SparseFileTracker(getTestName(), bytes.length);

// wraps a future to assert that the sub range bytes are available
BiFunction<ByteRange, PlainActionFuture<Void>, ActionListener<Void>> wrapper = (range, future) -> ActionListener.runBefore(
future,
() -> LongStream.range(range.start(), range.end())
.forEach(pos -> assertThat(bytes[BlobCacheUtils.toIntBytes(pos)], equalTo(AVAILABLE)))
);

var completeUpTo = randomIntBetween(2, bytes.length);
{
long subRangeStart = randomLongBetween(0, completeUpTo - 2);
long subRangeEnd = randomLongBetween(subRangeStart + 1, completeUpTo - 1);
var subRange = ByteRange.of(subRangeStart, subRangeEnd);
var range = ByteRange.of(0, completeUpTo);
var future = new PlainActionFuture<Void>();

var gaps = tracker.waitForRange(range, subRange, wrapper.apply(subRange, future));
assertThat(future.isDone(), equalTo(false));
assertThat(gaps, notNullValue());
assertThat(gaps, hasSize(1));

fillGap(bytes, gaps.get(0));

assertThat(future.isDone(), equalTo(true));
}
{
long subRangeStart = randomLongBetween(0L, Math.max(0L, completeUpTo - 1));
long subRangeEnd = randomLongBetween(subRangeStart, completeUpTo);
var subRange = ByteRange.of(subRangeStart, subRangeEnd);

var range = ByteRange.of(randomLongBetween(0L, subRangeStart), randomLongBetween(subRangeEnd, bytes.length));
var future = new PlainActionFuture<Void>();

var gaps = tracker.waitForRange(range, subRange, wrapper.apply(subRange, future));
assertThat(future.isDone(), equalTo(true));
assertThat(gaps, notNullValue());
assertThat(gaps, hasSize(0));
}
}

public void testCallsListenerWhenWholeRangeIsAvailable() {
final byte[] fileContents = new byte[between(0, 1000)];
final SparseFileTracker sparseFileTracker = new SparseFileTracker("test", fileContents.length);
Expand Down Expand Up @@ -546,4 +594,16 @@ private static boolean processGap(byte[] fileContents, SparseFileTracker.Gap gap
return true;
}
}


private static void fillGap(byte[] fileContents, SparseFileTracker.Gap gap) {
for (long i = gap.start(); i < gap.end(); i++) {
assertThat(fileContents[toIntBytes(i)], equalTo(UNAVAILABLE));
}
for (long i = gap.start(); i < gap.end(); i++) {
fileContents[toIntBytes(i)] = AVAILABLE;
gap.onProgress(i + 1L);
}
gap.onCompletion();
}
}

0 comments on commit 531f049

Please sign in to comment.