From dcd114613b26e11e38fbeb48e98d82afea713c61 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 2 Sep 2019 11:26:43 +0200 Subject: [PATCH] Cleanup BlobStoreRepository Abort and Failure Handling (#46208) Aborts and failures were handled in a somewhat unfortunate way in #42791: Since the tasks for all files are generated before uploading, they are all executed when a snapshot is aborted, leading to a massive number of failures being added to the original abort exception. The handling of failures was not very reasonable either: if one blob failed to upload, the snapshot logic would still upload all the remaining files and only then fail (whereas previously it would just fail all following files). I fixed both of the above issues by short-circuiting all remaining tasks for a shard in case of an exception in any one upload. --- .../repositories/blobstore/BlobStoreRepository.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 036e1ceea4331..e312acaf83c1d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -110,6 +110,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; @@ -1065,17 +1066,27 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s final GroupedActionListener filesListener = new GroupedActionListener<>(allFilesUploadedListener, indexIncrementalFileCount); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + // 
Flag to signal that the snapshot has been aborted/failed so we can stop any further blob uploads from starting + final AtomicBoolean alreadyFailed = new AtomicBoolean(); for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) { executor.execute(new ActionRunnable(filesListener) { @Override protected void doRun() { try { - snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); + if (alreadyFailed.get() == false) { + snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); + } filesListener.onResponse(null); } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e); } } + + @Override + public void onFailure(Exception e) { + alreadyFailed.set(true); + super.onFailure(e); + } }); } } catch (Exception e) {