diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 036e1ceea4331..e312acaf83c1d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -110,6 +110,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; @@ -1065,17 +1066,27 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s final GroupedActionListener filesListener = new GroupedActionListener<>(allFilesUploadedListener, indexIncrementalFileCount); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + // Flag to signal that the snapshot has been aborted/failed so we can stop any further blob uploads from starting + final AtomicBoolean alreadyFailed = new AtomicBoolean(); for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) { executor.execute(new ActionRunnable(filesListener) { @Override protected void doRun() { try { - snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); + if (alreadyFailed.get() == false) { + snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); + } filesListener.onResponse(null); } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e); } } + + @Override + public void onFailure(Exception e) { + alreadyFailed.set(true); + super.onFailure(e); + } }); } } catch (Exception e) {