From 7d47f182b8e6ea7f8d9fd7e8a206a5e13f97c7e0 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 9 Jan 2024 17:58:08 +0100 Subject: [PATCH] Fetch single region in cache --- .../blobcache/common/ByteRange.java | 2 +- .../shared/SharedBlobCacheService.java | 168 ++++++++++++++---- 2 files changed, 134 insertions(+), 36 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/ByteRange.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/ByteRange.java index f58f61c987143..7395a3203b315 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/ByteRange.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/ByteRange.java @@ -23,7 +23,7 @@ private ByteRange(long start, long end) { this.start = start; this.end = end; assert start >= 0L : "Start must be >= 0 but saw [" + start + "]"; - assert end >= start : "End must be greater or equal to start but saw [" + start + "][" + start + "]"; + assert end >= start : "End must be greater or equal to start but saw [" + end + "][" + start + "]"; } public long start() { diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 5e8933f86ae7d..2ebddfc548133 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.blobcache.BlobCacheMetrics; @@ -61,6 +62,7 @@ import java.util.function.IntConsumer; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.IntStream; public class SharedBlobCacheService implements Releasable { @@ -428,6 +430,21 @@ private int getRegionSize(long fileLength, int region) { return effectiveRegionSize; } + /** + * Returns the list of regions, as integers, that are required to read a given byte range. + * + * @param range a range of bytes that can cover more than one region + * @return a list of region ids + */ + public List findRegions(ByteRange range) { + int startRegion = getRegion(range.start()); + int endRegion = getEndingRegion(range.end()); + if (startRegion != endRegion) { + return IntStream.rangeClosed(startRegion, endRegion).boxed().toList(); + } + return List.of(startRegion); + } + CacheFileRegion get(KeyType cacheKey, long fileLength, int region) { return cache.get(cacheKey, fileLength, region).chunk; } @@ -495,6 +512,49 @@ public boolean maybeFetchFullEntry(KeyType cacheKey, long length, RangeMissingHa return true; } + /** + * Fetch and write in cache a region of a blob if there are enough free pages in the cache to do so. + * + * This method returns as soon as the download tasks are instantiated, but the tasks themselves + * are run on the bulk executor. + * + * If an exception is thrown from the writer then the cache entry being downloaded is freed + * and unlinked + * + * @param cacheKey the key to fetch data for + * @param region the region of the blob to fetch + * @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region) + * @param writer a writer that handles writing of newly downloaded data to the shared cache + * @param listener listener that is called once all downloading has finished + */ + public void maybeFetchRegion( + final KeyType cacheKey, + final int region, + final long blobLength, + final RangeMissingHandler writer, + final ActionListener listener + ) { + if (freeRegionCount() < 1) { + listener.onResponse(null); + return; + } + long regionLength = regionSize; + try { + if (region == getEndingRegion(blobLength)) { + regionLength = blobLength - getRegionStart(region); + } + ByteRange regionRange = ByteRange.of(0, regionLength); + if (regionRange.isEmpty()) { + listener.onResponse(null); + return; + } + final CacheFileRegion entry = get(cacheKey, blobLength, region); + entry.populate(regionRange, writer, bulkIOExecutor, listener); + } catch (Exception e) { + listener.onFailure(e); + } + } + private static void throwAlreadyClosed(String message) { throw new AlreadyClosedException(message); } @@ -666,6 +726,45 @@ boolean tryRead(ByteBuffer buf, long offset) throws IOException { return true; } + /** + * Populates a range in cache + *

+ * If the range to write is already available (or pending to be available) in cache, then the listener is completed immediately. If + * the range is not fully available then the listener is completed once the missing gaps are written in cache. + *

+ */ + void populate( + final ByteRange rangeToWrite, + final RangeMissingHandler writer, + final Executor executor, + final ActionListener listener + ) { + Releasable resource = null; + try { + incRef(); + resource = Releasables.releaseOnce(this::decRef); + ensureOpen(); + final List gaps = tracker.waitForRange( + rangeToWrite, + rangeToWrite, + Assertions.ENABLED + ? ActionListener.releaseAfter(ActionListener.running(() -> { assert regionOwners.get(io) == this; }), resource) + : ActionListener.releasing(resource) + ); + try (RefCountingListener refs = new RefCountingListener(listener)) { + if (gaps.isEmpty() == false) { + final var cacheFileRegion = CacheFileRegion.this; + for (SparseFileTracker.Gap gap : gaps) { + var fillGap = fillGap(cacheFileRegion, writer, gap); + executor.execute(ActionRunnable.run(refs.acquire(), fillGap::run)); + } + } + } + } catch (Exception e) { + releaseAndFail(listener, resource, e); + } + } + void populateAndRead( final ByteRange rangeToWrite, final ByteRange rangeToRead, @@ -701,51 +800,50 @@ void populateAndRead( ); if (gaps.isEmpty() == false) { - fillGaps(executor, writer, gaps); + final var cacheFileRegion = CacheFileRegion.this; + for (SparseFileTracker.Gap gap : gaps) { + executor.execute(fillGap(cacheFileRegion, writer, gap)); + } } } catch (Exception e) { releaseAndFail(listener, resource, e); } } - private void fillGaps(Executor executor, RangeMissingHandler writer, List gaps) { - final var cacheFileRegion = CacheFileRegion.this; - for (SparseFileTracker.Gap gap : gaps) { - executor.execute(new AbstractRunnable() { - - @Override - protected void doRun() throws Exception { - ensureOpen(); - if (cacheFileRegion.tryIncRef() == false) { - throw new AlreadyClosedException("File chunk [" + cacheFileRegion.regionKey + "] has been released"); - } - try { - final int start = Math.toIntExact(gap.start()); - var ioRef = io; - assert regionOwners.get(ioRef) == cacheFileRegion; - writer.fillCacheRange( - ioRef, - start, - start, - Math.toIntExact(gap.end() - start), - progress -> gap.onProgress(start + progress) - ); - writeCount.increment(); - } finally { - cacheFileRegion.decRef(); - } - gap.onCompletion(); + private AbstractRunnable fillGap(CacheFileRegion cacheFileRegion, RangeMissingHandler writer, SparseFileTracker.Gap gap) { + return new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + ensureOpen(); + if (cacheFileRegion.tryIncRef() == false) { + throw new AlreadyClosedException("File chunk [" + cacheFileRegion.regionKey + "] has been released"); } - - @Override - public void onFailure(Exception e) { - gap.onFailure(e); + try { + final int start = Math.toIntExact(gap.start()); + var ioRef = io; + assert regionOwners.get(ioRef) == cacheFileRegion; + writer.fillCacheRange( + ioRef, + start, + start, + Math.toIntExact(gap.end() - start), + progress -> gap.onProgress(start + progress) + ); + writeCount.increment(); + } finally { + cacheFileRegion.decRef(); } - }); - } + gap.onCompletion(); + } + + @Override + public void onFailure(Exception e) { + gap.onFailure(e); + } + }; } - private static void releaseAndFail(ActionListener listener, Releasable decrementRef, Exception e) { + private static void releaseAndFail(ActionListener listener, Releasable decrementRef, Exception e) { try { Releasables.close(decrementRef); } catch (Exception ex) {