Skip to content

Commit

Permalink
Allow executing multiple periodic flushes while they are being made durable (elastic#102571)
Browse files Browse the repository at this point in the history

Closes ES-7200
  • Loading branch information
fcofdez authored and timgrein committed Nov 30, 2023
1 parent 9b23acf commit 24916f6
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 54 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/102571.yaml
@@ -0,0 +1,5 @@
pr: 102571
summary: Allow executing multiple periodic flushes while they are being made durable
area: Store
type: enhancement
issues: []
Expand Up @@ -79,20 +79,18 @@ protected void shardOperationOnPrimary(
IndexShard primary,
ActionListener<PrimaryResult<ShardFlushRequest, ReplicationResponse>> listener
) {
ActionListener.completeWith(listener, () -> {
primary.flush(shardRequest.getRequest());
primary.flush(shardRequest.getRequest(), listener.map(flushed -> {
logger.trace("{} flush request executed on primary", primary.shardId());
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
});
}));
}

// Replica-side handling of a shard flush request: flushes the replica and answers with a ReplicaResult.
// NOTE(review): this span is a rendered diff without +/- markers, so two variants appear interleaved:
// - ActionListener.completeWith(listener, () -> { replica.flush(request.getRequest()); ... });
// - replica.flush(request.getRequest(), listener.map(flushed -> { ... }));
// Presumably the completeWith variant is the removed code and the listener.map variant is what remains
// after the commit - confirm against the repository.
@Override
protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
replica.flush(request.getRequest());
// Listener-based variant: the trace log and the ReplicaResult are produced inside the mapped listener.
replica.flush(request.getRequest(), listener.map(flushed -> {
logger.trace("{} flush request executed on replica", replica.shardId());
return new ReplicaResult();
});
}));
}

// TODO: Remove this transition in 9.0
Expand Down
109 changes: 61 additions & 48 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Expand Up @@ -1393,22 +1393,38 @@ public BulkStats bulkStats() {
* If <code>false</code> is returned, no flush happened.
*/
public boolean flush(FlushRequest request) {
    // Blocking variant: hand the request to the listener-based overload and wait on the future for its outcome.
    final PlainActionFuture<Boolean> flushOutcome = new PlainActionFuture<>();
    flush(request, flushOutcome);
    final Boolean flushed = flushOutcome.actionGet();
    return flushed;
}

/**
* Executes the given flush request against the engine and reports the outcome through the
* given listener rather than through a return value.
*
* @param request the flush request; its <code>waitIfOngoing</code> and <code>force</code> flags are handed to the engine
* @param listener to notify after full durability has been achieved. It receives
* <code>false</code> if <code>waitIfOngoing==false</code>
* and an ongoing request is detected, else <code>true</code>.
* If <code>false</code> is received, no flush happened.
*/
public void flush(FlushRequest request, ActionListener<Boolean> listener) {
final boolean waitIfOngoing = request.waitIfOngoing();
final boolean force = request.force();
logger.trace("flush with {}", request);
// NOTE(review): this span is a rendered diff without +/- markers. The statements from here down to
// "return flushResult.flushPerformed();" look like the previous synchronous body (they block on a
// PlainActionFuture and return a value, which a void method cannot do); the ActionListener.run(...) call
// that follows is presumably the replacement - confirm against the repository.
/*
* We allow flushes while recovery since we allow operations to happen while recovering and we want to keep the translog under
* control (up to deletes, which we do not GC). Yet, we do not use flush internally to clear deletes and flush the index writer
* since we use Engine#writeIndexingBuffer for this now.
*/
verifyNotClosed();
final long time = System.nanoTime();
// TODO: Transition this method to async to support async flush
PlainActionFuture<Engine.FlushResult> future = new PlainActionFuture<>();
getEngine().flush(force, waitIfOngoing, future);
Engine.FlushResult flushResult = future.actionGet();
flushMetric.inc(System.nanoTime() - time);
return flushResult.flushPerformed();
// Listener-based body. NOTE(review): ActionListener.run presumably routes exceptions thrown inside the
// lambda (e.g. by verifyNotClosed) to the listener instead of the caller - confirm against ActionListener.
ActionListener.run(listener, l -> {
/*
* We allow flushes while recovery since we allow operations to happen while recovering and we want to keep the translog under
* control (up to deletes, which we do not GC). Yet, we do not use flush internally to clear deletes and flush the index writer
* since we use Engine#writeIndexingBuffer for this now.
*/
verifyNotClosed();
final long startTime = System.nanoTime();
getEngine().flush(
force,
waitIfOngoing,
// The engine's FlushResult is mapped to its flushPerformed flag for the caller. The elapsed time since
// startTime is added to flushMetric; presumably runBefore does this ahead of notifying the listener - confirm.
ActionListener.runBefore(l.map(Engine.FlushResult::flushPerformed), () -> flushMetric.inc(System.nanoTime() - startTime))
);
});
}

/**
Expand Down Expand Up @@ -2291,25 +2307,26 @@ public void flushOnIdle(long inactiveTimeNS) {
boolean wasActive = active.getAndSet(false);
if (wasActive) {
logger.debug("flushing shard on inactive");
threadPool.executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (state != IndexShardState.CLOSED) {
active.set(true);
logger.warn("failed to flush shard on inactive", e);
threadPool.executor(ThreadPool.Names.FLUSH)
.execute(() -> flush(new FlushRequest().waitIfOngoing(false).force(false), new ActionListener<>() {
@Override
public void onResponse(Boolean flushed) {
if (flushed == false) {
// In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request
// will retry (#87888)
active.set(true);
}
periodicFlushMetric.inc();
}
}

@Override
protected void doRun() {
if (flush(new FlushRequest().waitIfOngoing(false).force(false)) == false) {
// In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request
// will retry (#87888)
active.set(true);
@Override
public void onFailure(Exception e) {
if (state != IndexShardState.CLOSED) {
active.set(true);
logger.warn("failed to flush shard on inactive", e);
}
}
periodicFlushMetric.inc();
}
});
}));
}
}
}
Expand Down Expand Up @@ -3756,27 +3773,23 @@ public void afterWriteOperation() {
*/
if (shouldPeriodicallyFlush()) {
logger.debug("submitting async flush request");
final AbstractRunnable flush = new AbstractRunnable() {
@Override
public void onFailure(final Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to flush index", e);
threadPool.executor(ThreadPool.Names.FLUSH).execute(() -> {
flush(new FlushRequest(), new ActionListener<>() {
@Override
public void onResponse(Boolean flushed) {
periodicFlushMetric.inc();
}
}

@Override
protected void doRun() {
flush(new FlushRequest());
periodicFlushMetric.inc();
}

@Override
public void onAfter() {
flushOrRollRunning.compareAndSet(true, false);
afterWriteOperation();
}
};
threadPool.executor(ThreadPool.Names.FLUSH).execute(flush);
@Override
public void onFailure(Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to flush index", e);
}
}
});
flushOrRollRunning.compareAndSet(true, false);
afterWriteOperation();
});
} else if (shouldRollTranslogGeneration()) {
logger.debug("submitting async roll translog generation request");
final AbstractRunnable roll = new AbstractRunnable() {
Expand Down
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
Expand All @@ -77,6 +78,7 @@
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
Expand Down Expand Up @@ -173,6 +175,7 @@
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.elasticsearch.index.IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
Expand Down Expand Up @@ -4152,6 +4155,44 @@ public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
closeShards(shard);
}

public void testMultiplePeriodicFlushesCanBeTriggeredBeforeTheyAreDurable() throws Exception {
    // Listeners passed to the engine are parked here instead of being completed, so each flush stays pending.
    final List<ActionListener<Engine.FlushResult>> parkedListeners = Collections.synchronizedList(new ArrayList<>());

    // A threshold one byte above the translog header makes every single document trigger a flush after its write.
    final ByteSizeValue flushThreshold = ByteSizeValue.ofBytes(Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1);
    final Settings indexSettings = Settings.builder().put(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold).build();

    final AtomicBoolean interceptFlushes = new AtomicBoolean();
    // Two parties: the thread running the flush and the test thread waiting for it.
    final CyclicBarrier flushReached = new CyclicBarrier(2);

    var shard = newStartedShard(true, indexSettings, config -> new InternalEngine(config) {
        @Override
        public void flush(boolean force, boolean waitIfOngoing, ActionListener<FlushResult> listener) throws EngineException {
            if (interceptFlushes.get() == false) {
                // Flushes issued before interception is switched on go through untouched.
                super.flush(force, waitIfOngoing, listener);
                return;
            }
            // Run the real flush, but hold back the caller's listener until the test completes it explicitly.
            super.flush(force, waitIfOngoing, ActionListener.noop());
            parkedListeners.add(listener);
            safeAwait(flushReached);
        }
    });
    interceptFlushes.set(true);

    final int flushCount = randomIntBetween(5, 10);
    for (int docId = 0; docId < flushCount; docId++) {
        indexDoc(shard, "_doc", String.valueOf(docId));
        shard.afterWriteOperation();
        // Wait until the flush triggered by this write has reached the engine before indexing the next document.
        safeAwait(flushReached);
    }

    // Every write started its own flush, yet none has been reported as a completed periodic flush so far.
    assertThat(parkedListeners.size(), is(flushCount));
    assertThat(shard.flushStats().getPeriodic(), is(equalTo(0L)));

    // Completing the parked listeners is what finally counts the periodic flushes.
    parkedListeners.forEach(parked -> parked.onResponse(new Engine.FlushResult(true, 1)));
    final long expectedPeriodicFlushes = flushCount;
    assertThat(shard.flushStats().getPeriodic(), is(equalTo(expectedPeriodicFlushes)));

    closeShards(shard);
}

public void testOnCloseStats() throws IOException {
final IndexShard indexShard = newStartedShard(true);

Expand Down

0 comments on commit 24916f6

Please sign in to comment.