Skip to content

Commit

Permalink
[Remote Store] Drain ongoing refreshes, translog uploads during prima…
Browse files Browse the repository at this point in the history
…ry relocation
  • Loading branch information
ashking94 committed Nov 24, 2023
1 parent 5bb6cae commit a26708b
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.ReferenceManager;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.Semaphore;
Expand All @@ -26,7 +27,7 @@
* is closed, all the permits are acquired and there are no available permits to afterRefresh. This abstract class provides
* necessary abstract methods to schedule retry.
*/
public abstract class CloseableRetryableRefreshListener implements ReferenceManager.RefreshListener, Closeable {
public abstract class CloseableRetryableRefreshListener implements ReferenceManager.RefreshListener {

/**
* Total permits = 1 ensures that there is only single instance of runAfterRefreshWithPermit that is running at a time.
Expand Down Expand Up @@ -184,18 +185,24 @@ private void scheduleRetry(boolean afterRefreshSuccessful, boolean didRefresh) {
*/
protected abstract boolean performAfterRefreshWithPermit(boolean didRefresh);

@Override
public final void close() throws IOException {
public final Releasable drainRefreshes() {
try {
if (semaphore.tryAcquire(TOTAL_PERMITS, 10, TimeUnit.MINUTES)) {
boolean result = closed.compareAndSet(false, true);
assert result && semaphore.availablePermits() == 0;
getLogger().info("All permits are acquired and refresh listener is closed");
return Releasables.releaseOnce(() -> {
semaphore.release(TOTAL_PERMITS);
boolean wasClosed = closed.getAndSet(false);
assert semaphore.availablePermits() == TOTAL_PERMITS : "Available permits is " + semaphore.availablePermits();
assert wasClosed : "RefreshListener is not closed before reopening it";
getLogger().info("All permits are released and refresh listener is open");
});
} else {
throw new TimeoutException("timeout while closing gated refresh listener");
throw new TimeoutException("Timeout while acquiring all permits");
}
} catch (InterruptedException | TimeoutException e) {
throw new RuntimeException("Failed to close the closeable retryable listener", e);
throw new RuntimeException("Failed to acquire all permits", e);
}
}

Expand Down
26 changes: 21 additions & 5 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,9 @@ public void relocated(
final Runnable performSegRep
) throws IllegalIndexShardStateException, IllegalStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
// The below list of releasable ensures that if the relocation does not happen, we undo the activity of close and
// acquire all permits. This will ensure that the remote store uploads can still be done by the existing primary shard.
List<Releasable> releasablesOnNoHandoff = new ArrayList<>();
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
forceRefreshes.close();
Expand All @@ -857,11 +860,15 @@ public void relocated(
maybeSync();
}

// Ensures all in-flight remote store operations drain, before we perform the handoff.
internalRefreshListener.stream()
.filter(refreshListener -> refreshListener instanceof Closeable)
.map(refreshListener -> (Closeable) refreshListener)
.close();
// Ensures all in-flight remote store refreshes drain, before we perform the performSegRep.
for (ReferenceManager.RefreshListener refreshListener : internalRefreshListener) {
if (refreshListener instanceof CloseableRetryableRefreshListener) {
releasablesOnNoHandoff.add(((CloseableRetryableRefreshListener) refreshListener).drainRefreshes());
}
}

// Ensure all in-flight remote store translog upload drains, before we perform the performSegRep.
releasablesOnNoHandoff.add(getEngineOrNull().translogManager().drainSyncToStore());

// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
Expand Down Expand Up @@ -896,6 +903,15 @@ public void relocated(
// Fail primary relocation source and target shards.
failShard("timed out waiting for relocation hand-off to complete", null);
throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete");
} finally {
// If the primary mode is still true after the end of handoff attempt, it basically means that the relocation
// failed. The existing primary will continue to be the primary, so we need to allow the segments and translog
// upload to resume.
if (replicationTracker.isPrimaryMode()) {
for (Releasable releasable : releasablesOnNoHandoff) {
releasable.close();
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.io.IOUtils;
Expand Down Expand Up @@ -301,10 +302,16 @@ public void setMinSeqNoToKeep(long seqNo) {
translog.setMinSeqNoToKeep(seqNo);
}

@Override
public void onDelete() {
translog.onDelete();
}

@Override
public Releasable drainSyncToStore() {
return translog.drainSyncToStore();
}

@Override
public Translog.TranslogGeneration getTranslogGeneration() {
return translog.getGeneration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.translog;

import org.opensearch.common.lease.Releasable;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.core.index.shard.ShardId;

Expand Down Expand Up @@ -121,8 +122,14 @@ public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolea
throw new UnsupportedOperationException("Translog snapshot unsupported with no-op translogs");
}

@Override
public void onDelete() {}

@Override
public Releasable drainSyncToStore() {
return () -> {};
}

@Override
public Translog.TranslogGeneration getTranslogGeneration() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.common.SetOnce;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.io.IOUtils;
Expand Down Expand Up @@ -38,6 +39,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
Expand All @@ -53,7 +56,6 @@
public class RemoteFsTranslog extends Translog {

private final Logger logger;
private final BlobStoreRepository blobStoreRepository;
private final TranslogTransferManager translogTransferManager;
private final FileTransferTracker fileTransferTracker;
private final BooleanSupplier primaryModeSupplier;
Expand All @@ -75,6 +77,10 @@ public class RemoteFsTranslog extends Translog {
// Semaphore used to allow only single remote generation to happen at a time
private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS);

// These permits exist to allow any inflight background triggered upload.
private static final int UPLOAD_PERMITS = 1;
private final Semaphore uploadPermits = new Semaphore(UPLOAD_PERMITS);

public RemoteFsTranslog(
TranslogConfig config,
String translogUUID,
Expand All @@ -89,7 +95,6 @@ public RemoteFsTranslog(
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
logger = Loggers.getLogger(getClass(), shardId);
this.blobStoreRepository = blobStoreRepository;
this.primaryModeSupplier = primaryModeSupplier;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker);
Expand Down Expand Up @@ -321,8 +326,13 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws
// below ensures that the real primary only is uploading. Before the primary mode is set as true for the new
// primary, the engine is reset to InternalEngine which also initialises the RemoteFsTranslog which in turns
// downloads all the translogs from remote store and does a flush before the relocation finishes.
if (primaryModeSupplier.getAsBoolean() == false) {
logger.debug("skipped uploading translog for {} {}", primaryTerm, generation);
if (primaryModeSupplier.getAsBoolean() == false || uploadPermits.tryAcquire(1) == false) {
logger.debug(
"skipped uploading translog for {} {} uploadPermits={}",
primaryTerm,
generation,
uploadPermits.availablePermits()
);
// NO-OP
return true;
}
Expand All @@ -341,6 +351,8 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws
transferSnapshotProvider,
new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo)
);
} finally {
uploadPermits.release(1);
}

}
Expand Down Expand Up @@ -423,6 +435,24 @@ protected void setMinSeqNoToKeep(long seqNo) {
this.minSeqNoToKeep = seqNo;
}

@Override
protected Releasable drainSyncToStore() {
try {
if (uploadPermits.tryAcquire(UPLOAD_PERMITS, 1, TimeUnit.MINUTES)) {
logger.info("All permits acquired");
return Releasables.releaseOnce(() -> {
uploadPermits.release(UPLOAD_PERMITS);
assert uploadPermits.availablePermits() == UPLOAD_PERMITS : "Available permits is " + uploadPermits.availablePermits();
logger.info("All permits released");
});
} else {
throw new TimeoutException("Timeout while acquiring all permits");
}
} catch (TimeoutException | InterruptedException e) {
throw new RuntimeException("Failed to acquire all permits", e);
}
}

@Override
public void trimUnreferencedReaders() throws IOException {
// clean up local translog files and updates readers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1817,6 +1817,13 @@ protected void setMinSeqNoToKeep(long seqNo) {}

protected void onDelete() {}

/**
* Drains ongoing syncs to the underlying store. It returns a releasable which can be closed to resume the syncs back.
*/
protected Releasable drainSyncToStore() {
return () -> {};
}

/**
* deletes all files associated with a reader. package-private to be able to simulate node failures at this point
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.translog;

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.lease.Releasable;

import java.io.IOException;
import java.util.stream.Stream;
Expand Down Expand Up @@ -135,5 +136,10 @@ public interface TranslogManager {
*/
void onDelete();

/**
* Drains ongoing syncs to the underlying store. It returns a releasable which can be closed to resume the syncs back.
*/
Releasable drainSyncToStore();

Translog.TranslogGeneration getTranslogGeneration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected Logger getLogger() {
// Second invocation of afterRefresh method
testRefreshListener.afterRefresh(true);
assertEquals(0, countDownLatch.getCount());
testRefreshListener.close();
testRefreshListener.drainRefreshes();
}

/**
Expand Down Expand Up @@ -98,7 +98,7 @@ protected Logger getLogger() {
assertEquals(initialCount - refreshCount, countDownLatch.getCount());

// Closing the refresh listener so that no further afterRefreshes are executed going forward
testRefreshListener.close();
testRefreshListener.drainRefreshes();

for (int i = 0; i < initialCount - refreshCount; i++) {
testRefreshListener.afterRefresh(true);
Expand Down Expand Up @@ -129,7 +129,7 @@ protected Logger getLogger() {
};
testRefreshListener.afterRefresh(true);
assertEquals(initialCount - 1, countDownLatch.getCount());
testRefreshListener.close();
testRefreshListener.drainRefreshes();

testRefreshListener = new CloseableRetryableRefreshListener(threadPool) {
@Override
Expand All @@ -148,7 +148,7 @@ protected Logger getLogger() {
};
testRefreshListener.afterRefresh(true);
assertEquals(initialCount - 2, countDownLatch.getCount());
testRefreshListener.close();
testRefreshListener.drainRefreshes();

testRefreshListener = new CloseableRetryableRefreshListener(threadPool) {
@Override
Expand All @@ -172,7 +172,7 @@ protected Logger getLogger() {
};
testRefreshListener.afterRefresh(true);
assertEquals(initialCount - 3, countDownLatch.getCount());
testRefreshListener.close();
testRefreshListener.drainRefreshes();

testRefreshListener = new CloseableRetryableRefreshListener(threadPool) {
@Override
Expand All @@ -196,7 +196,7 @@ protected Logger getLogger() {
};
testRefreshListener.afterRefresh(true);
assertEquals(initialCount - 4, countDownLatch.getCount());
testRefreshListener.close();
testRefreshListener.drainRefreshes();
}

/**
Expand Down Expand Up @@ -237,7 +237,7 @@ protected boolean isRetryEnabled() {
};
testRefreshListener.afterRefresh(true);
assertBusy(() -> assertEquals(0, countDownLatch.getCount()));
testRefreshListener.close();
testRefreshListener.drainRefreshes();
}

/**
Expand Down Expand Up @@ -272,7 +272,7 @@ protected Logger getLogger() {
}
};
testRefreshListener.afterRefresh(randomBoolean());
testRefreshListener.close();
testRefreshListener.drainRefreshes();
assertNotEquals(0, countDownLatch.getCount());
}

Expand Down Expand Up @@ -307,7 +307,7 @@ protected Logger getLogger() {
});
thread.start();
assertBusy(() -> assertEquals(0, countDownLatch.getCount()));
testRefreshListener.close();
testRefreshListener.drainRefreshes();
}

public void testScheduleRetryAfterClose() throws Exception {
Expand Down Expand Up @@ -358,8 +358,8 @@ protected TimeValue getNextRetryInterval() {
Thread thread2 = new Thread(() -> {
try {
Thread.sleep(500);
testRefreshListener.close();
} catch (IOException | InterruptedException e) {
testRefreshListener.drainRefreshes();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
});
Expand Down Expand Up @@ -408,7 +408,7 @@ protected boolean isRetryEnabled() {
testRefreshListener.afterRefresh(true);
testRefreshListener.afterRefresh(true);
assertBusy(() -> assertEquals(3, runCount.get()));
testRefreshListener.close();
testRefreshListener.drainRefreshes();
}

public void testExceptionDuringThreadPoolSchedule() throws Exception {
Expand Down Expand Up @@ -450,7 +450,7 @@ protected boolean isRetryEnabled() {
assertThrows(RuntimeException.class, () -> testRefreshListener.afterRefresh(true));
assertBusy(() -> assertFalse(testRefreshListener.getRetryScheduledStatus()));
assertEquals(1, runCount.get());
testRefreshListener.close();
testRefreshListener.drainRefreshes();
}

@After
Expand Down

0 comments on commit a26708b

Please sign in to comment.