Skip to content

Commit

Permalink
Fix failing test and incorporate PR review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Nov 25, 2023
1 parent f7375fb commit 25bc81b
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ public void relocated(
}

// Ensure all in-flight remote store translog upload drains, before we perform the performSegRep.
releasablesOnNoHandoff.add(getEngineOrNull().translogManager().drainSyncToStore());
releasablesOnNoHandoff.add(getEngineOrNull().translogManager().drainSync());

// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ public void onDelete() {
}

@Override
public Releasable drainSyncToStore() {
return translog.drainSyncToStore();
public Releasable drainSync() {
return translog.drainSync();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolea
public void onDelete() {}

@Override
public Releasable drainSyncToStore() {
public Releasable drainSync() {
return () -> {};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.translog;

import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SetOnce;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
Expand Down Expand Up @@ -78,8 +79,8 @@ public class RemoteFsTranslog extends Translog {
private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS);

// These permits exist to allow any inflight background triggered upload.
private static final int UPLOAD_PERMITS = 1;
private final Semaphore uploadPermits = new Semaphore(UPLOAD_PERMITS);
private static final int SYNC_PERMITS = 1;
private final Semaphore syncPermits = new Semaphore(SYNC_PERMITS);

public RemoteFsTranslog(
TranslogConfig config,
Expand Down Expand Up @@ -321,18 +322,13 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
}

private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws IOException {
// During primary relocation (primary-primary peer recovery), both the old and the new primary have engine
// created with the RemoteFsTranslog. Both primaries are equipped to upload the translogs. The primary mode check
// below ensures that the real primary only is uploading. Before the primary mode is set as true for the new
// primary, the engine is reset to InternalEngine which also initialises the RemoteFsTranslog which in turns
// downloads all the translogs from remote store and does a flush before the relocation finishes.
if (primaryModeSupplier.getAsBoolean() == false || uploadPermits.tryAcquire(1) == false) {
logger.debug(
"skipped uploading translog for {} {} uploadPermits={}",
primaryTerm,
generation,
uploadPermits.availablePermits()
);
// During primary relocation, both the old and new primary have engine created with RemoteFsTranslog and having
// ReplicationTracker.primaryMode() as true. However, before we perform the `internal:index/shard/replication/segments_sync`
// action which re-downloads the segments and translog on the new primary. We are ensuring 2 things here -
// 1. Using primaryModeSupplier, we prevent the new primary to do pre-emptive syncs
// 2. Using syncPermits, we prevent syncs at the desired time during primary relocation.
if (primaryModeSupplier.getAsBoolean() == false || syncPermits.tryAcquire(1) == false) {
logger.debug("skipped uploading translog for {} {} uploadPermits={}", primaryTerm, generation, syncPermits.availablePermits());
// NO-OP
return true;
}
Expand All @@ -352,7 +348,7 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws
new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo)
);
} finally {
uploadPermits.release(1);
syncPermits.release(1);
}

}
Expand Down Expand Up @@ -436,13 +432,13 @@ protected void setMinSeqNoToKeep(long seqNo) {
}

@Override
protected Releasable drainSyncToStore() {
protected Releasable drainSync() {
try {
if (uploadPermits.tryAcquire(UPLOAD_PERMITS, 1, TimeUnit.MINUTES)) {
if (syncPermits.tryAcquire(SYNC_PERMITS, 1, TimeUnit.MINUTES)) {
logger.info("All permits acquired");
return Releasables.releaseOnce(() -> {
uploadPermits.release(UPLOAD_PERMITS);
assert uploadPermits.availablePermits() == UPLOAD_PERMITS : "Available permits is " + uploadPermits.availablePermits();
syncPermits.release(SYNC_PERMITS);
assert syncPermits.availablePermits() == SYNC_PERMITS : "Available permits is " + syncPermits.availablePermits();
logger.info("All permits released");
});
} else {
Expand All @@ -458,6 +454,12 @@ public void trimUnreferencedReaders() throws IOException {
// clean up local translog files and updates readers
super.trimUnreferencedReaders();

// This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote
// store.
if (syncPermits.availablePermits() == 0) {
return;
}

// Since remote generation deletion is async, this ensures that only one generation deletion happens at a time.
// Remote generations involves 2 async operations - 1) Delete translog generation files 2) Delete metadata files
// We try to acquire 2 permits and if we can not, we return from here itself.
Expand Down Expand Up @@ -535,11 +537,7 @@ public static void cleanup(Repository repository, ShardId shardId, ThreadPool th
}

protected void onDelete() {
if (primaryModeSupplier.getAsBoolean() == false) {
logger.trace("skipped delete translog");
// NO-OP
return;
}
ClusterService.assertClusterOrClusterManagerStateThread();
// clean up all remote translog files
translogTransferManager.delete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1820,8 +1820,8 @@ protected void onDelete() {}
/**
* Drains ongoing syncs to the underlying store. It returns a releasable which can be closed to resume the syncs back.
*/
protected Releasable drainSyncToStore() {
return () -> {};
protected Releasable drainSync() {
throw new UnsupportedOperationException();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public interface TranslogManager {
/**
* Drains ongoing syncs to the underlying store. It returns a releasable which can be closed to resume the syncs back.
*/
Releasable drainSyncToStore();
Releasable drainSync();

Translog.TranslogGeneration getTranslogGeneration();
}

0 comments on commit 25bc81b

Please sign in to comment.