Cleanup BlobStoreRepository Abort and Failure Handling (elastic#46208)
Aborts and failures were handled poorly in elastic#42791:
Because the upload tasks for all files are created before any upload starts, every task still runs after a snapshot is aborted, and each one adds another failure to the original abort exception.
Failures were handled no better. If one blob failed to upload, the snapshot logic would still upload all the remaining files and only then fail (previously it would simply fail all following files).
This change fixes both issues by short-circuiting all remaining tasks for a shard as soon as any one upload throws an exception.
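
The short-circuit idea can be illustrated outside Elasticsearch with a minimal, self-contained sketch. Everything here is hypothetical and for illustration only: `ShortCircuitSketch`, `Upload`, and `runAll` are made-up names, not part of `BlobStoreRepository`, and a plain `ExecutorService` with a `CountDownLatch` stands in for the snapshot thread pool and `GroupedActionListener`. As in the commit, all tasks are submitted up front, so a shared `AtomicBoolean` is what lets tasks that have not started yet skip their work.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class ShortCircuitSketch {

    /** Hypothetical stand-in for a single blob upload; not an Elasticsearch API. */
    interface Upload {
        void run() throws Exception;
    }

    /**
     * Submits every upload to the executor and returns how many were actually attempted.
     * Once one upload throws, tasks that have not started yet skip their work.
     */
    static int runAll(List<Upload> uploads, ExecutorService executor) throws InterruptedException {
        // Shared flag: set by the first failing task, checked by every later task.
        final AtomicBoolean alreadyFailed = new AtomicBoolean();
        final AtomicInteger attempted = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(uploads.size());
        for (Upload upload : uploads) {
            executor.execute(() -> {
                try {
                    if (alreadyFailed.get() == false) {
                        attempted.incrementAndGet();
                        upload.run();
                    }
                } catch (Exception e) {
                    alreadyFailed.set(true);
                } finally {
                    // Every task reports completion, whether it ran, failed, or was skipped.
                    done.countDown();
                }
            });
        }
        done.await();
        return attempted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Upload> uploads = List.of(
            () -> {},
            () -> { throw new IOException("simulated upload failure"); },
            () -> {},
            () -> {});
        // With a single-threaded executor the tasks run in order, so only the
        // first two are attempted and the last two are skipped.
        System.out.println(runAll(uploads, executor));
        executor.shutdown();
    }
}
```

With more than one worker thread, uploads that are already in flight when the flag is set still run to completion; the flag only prevents uploads that have not started, which matches the intent stated in the commit message.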
original-brownbear committed Sep 11, 2019
1 parent f9a39ed commit dcd1146
Showing 1 changed file with 12 additions and 1 deletion.
@@ -110,6 +110,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;

 import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
@@ -1065,17 +1066,27 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
             final GroupedActionListener<Void> filesListener =
                 new GroupedActionListener<>(allFilesUploadedListener, indexIncrementalFileCount);
             final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
+            // Flag to signal that the snapshot has been aborted/failed so we can stop any further blob uploads from starting
+            final AtomicBoolean alreadyFailed = new AtomicBoolean();
             for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) {
                 executor.execute(new ActionRunnable<Void>(filesListener) {
                     @Override
                     protected void doRun() {
                         try {
-                            snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
+                            if (alreadyFailed.get() == false) {
+                                snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
+                            }
                             filesListener.onResponse(null);
                         } catch (IOException e) {
                             throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e);
                         }
                     }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        alreadyFailed.set(true);
+                        super.onFailure(e);
+                    }
                 });
             }
         } catch (Exception e) {