Skip to content

Commit

Permalink
rework
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Jun 23, 2023
1 parent 33420aa commit 83ffe5c
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), TestRepositoryPlugin.class);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/95994")
public void testRecoveryStateRecoveredBytesMatchPhysicalCacheState() throws Exception {
final String fsRepoName = randomAlphaOfLength(10);
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,16 +506,32 @@ private void prewarmCache(ActionListener<Void> listener, Supplier<Boolean> cance

final AtomicLong prefetchedBytes = new AtomicLong(0L);
try (var fileListener = new RefCountingListener(ActionListener.runBefore(completionListener.acquire().map(v -> {
// we can't report a file as being partially recovered from disk and partially from the blob store, but this is
// something that can happen for fully mounted searchable snapshots. So we make a choice here: if less than 50%
// of the file is prewarmed from the blob store then it is marked as reused from cache.
if ((prefetchedBytes.get() / (double) file.length()) < 0.5d) {
// we don't support files to be reported as partially recovered from disk and partially from the blob store, but
// this is something that can happen for fully mounted searchable snapshots. It is possible that prewarming
// prefetched nothing if a concurrent search was executing (and cached the data) or if the data were fetched from
// the blob cache system index.
if (prefetchedBytes.get() == 0L) {
recoveryState.markIndexFileAsReused(file.physicalName());
} else {
recoveryState.getIndex().addRecoveredFromSnapshotBytesToFile(file.physicalName(), file.length());
}
return v;
}), () -> IOUtils.closeWhileHandlingException(input)))) {

if (input instanceof CachedBlobContainerIndexInput cachedIndexInput) {
if (cachedIndexInput.getPersistentCacheInitialLength() == file.length()) {
logger.trace(
() -> format(
"%s file [%s] is already available in cache (%d bytes)",
shardId,
file.physicalName(),
file.length()
)
);
return;
}
}

for (int p = 0; p < file.numberOfParts(); p++) {
final int part = p;
prewarmTaskRunner.enqueueTask(fileListener.acquire().map(releasable -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception {
assert bytesRead == length : bytesRead + " vs " + length;
}

/**
* @return Returns the number of bytes already cached for the file in the cold persistent cache
*/
public long getPersistentCacheInitialLength() throws Exception {
return cacheFileReference.get().getInitialLength();
}

/**
* Prefetches a complete part and writes it in cache. This method is used to prewarm the cache.
*
Expand Down

0 comments on commit 83ffe5c

Please sign in to comment.