Skip to content

Commit

Permalink
More Efficient Ordering of Shard Upload Execution (elastic#42791)
Browse files Browse the repository at this point in the history
* Change the upload order of of snapshots to work file by file in parallel on the snapshot pool instead of merely shard-by-shard
* Inspired by elastic#39657
  • Loading branch information
original-brownbear committed Sep 11, 2019
1 parent 8006465 commit f9a39ed
Show file tree
Hide file tree
Showing 13 changed files with 293 additions and 228 deletions.
32 changes: 32 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.CheckedSupplier;

import java.util.ArrayList;
Expand Down Expand Up @@ -226,6 +227,37 @@ public void onFailure(Exception e) {
};
}

/**
* Wraps a given listener and returns a new listener which executes the provided {@code runBefore}
* callback before the listener is notified via either {@code #onResponse} or {@code #onFailure}.
* If the callback throws an exception then it will be passed to the listener's {@code #onFailure} and its {@code #onResponse} will
* not be executed.
*/
static <Response> ActionListener<Response> runBefore(ActionListener<Response> delegate, CheckedRunnable<?> runBefore) {
return new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
runBefore.run();
} catch (Exception ex) {
delegate.onFailure(ex);
return;
}
delegate.onResponse(response);
}

@Override
public void onFailure(Exception e) {
try {
runBefore.run();
} catch (Exception ex) {
e.addSuppressed(ex);
}
delegate.onFailure(e);
}
};
}

/**
* Wraps a given listener and returns a new listener which makes sure {@link #onResponse(Object)}
* and {@link #onFailure(Exception)} of the provided listener will be called at most once.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,11 @@ public boolean isReadOnly() {
return in.isReadOnly();
}


@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener);
}

@Override
public void restoreShard(Store store, SnapshotId snapshotId,
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
* <ul>
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
* with list of indices that will be included into the snapshot</li>
* <li>Data nodes call {@link Repository#snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
* <li>Data nodes call {@link Repository#snapshotShard}
* for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
* </ul>
Expand Down Expand Up @@ -191,27 +191,6 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
*/
boolean isReadOnly();

/**
* Creates a snapshot of the shard based on the index commit point.
* <p>
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method.
* Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
* {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
* @param indexShard the shard to be snapshotted
* @param snapshotId snapshot id
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
* @deprecated use {@link #snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)} instead
*/
@Deprecated
default void snapshotShard(IndexShard indexShard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
snapshotShard(indexShard.store(), indexShard.mapperService(), snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
}

/**
* Creates a snapshot of the shard based on the index commit point.
* <p>
Expand All @@ -226,9 +205,10 @@ default void snapshotShard(IndexShard indexShard, SnapshotId snapshotId, IndexId
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
* @param listener listener invoked on completion
*/
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus);
IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener);

/**
* Restores snapshot of the shard.
Expand Down
Loading

0 comments on commit f9a39ed

Please sign in to comment.