Skip to content

Commit

Permalink
Address comment - Move upload check in prepareAndUpload
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Nov 27, 2023
1 parent 112bff6 commit a27e030
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,6 @@ public void relocated(
failShard("timed out waiting for relocation hand-off to complete", null);
throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete");
} catch (Exception ex) {
logger.warn("exception occurred during relocation hand-off to complete errorMsg={}", ex.getMessage());
assert replicationTracker.isPrimaryMode();
// If the primary mode is still true after the end of handoff attempt, it basically means that the relocation
// failed. The existing primary will continue to be the primary, so we need to allow the segments and translog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,16 @@ public void rollGeneration() throws IOException {
}

private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOException {
// During primary relocation, both the old and new primary have engine created with RemoteFsTranslog and having
// ReplicationTracker.primaryMode() as true. However, before we perform the `internal:index/shard/replication/segments_sync`
// action which re-downloads the segments and translog on the new primary. We are ensuring 2 things here -
// 1. Using startedPrimarySupplier, we prevent the new primary to do pre-emptive syncs
// 2. Using syncPermits, we prevent syncs at the desired time during primary relocation.
if (startedPrimarySupplier.getAsBoolean() == false || syncPermit.tryAcquire(SYNC_PERMIT) == false) {
logger.debug("skipped uploading translog for {} {} syncPermits={}", primaryTerm, generation, syncPermit.availablePermits());
// NO-OP
return false;
}
long maxSeqNo = -1;
try (Releasable ignored = writeLock.acquire()) {
if (generation == null || generation == current.getGeneration()) {
Expand Down Expand Up @@ -324,16 +334,6 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
}

private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws IOException {
// During primary relocation, both the old and new primary have engine created with RemoteFsTranslog and having
// ReplicationTracker.primaryMode() as true. However, before we perform the `internal:index/shard/replication/segments_sync`
// action which re-downloads the segments and translog on the new primary. We are ensuring 2 things here -
// 1. Using startedPrimarySupplier, we prevent the new primary to do pre-emptive syncs
// 2. Using syncPermits, we prevent syncs at the desired time during primary relocation.
if (startedPrimarySupplier.getAsBoolean() == false || syncPermit.tryAcquire(SYNC_PERMIT) == false) {
logger.debug("skipped uploading translog for {} {} syncPermits={}", primaryTerm, generation, syncPermit.availablePermits());
// NO-OP
return true;
}
logger.trace("uploading translog for {} {}", primaryTerm, generation);
try (
TranslogCheckpointTransferSnapshot transferSnapshotProvider = new TranslogCheckpointTransferSnapshot.Builder(
Expand Down Expand Up @@ -462,7 +462,7 @@ public void trimUnreferencedReaders() throws IOException {

// This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote
// store.
if (pauseSync.get()) {
if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,21 +879,21 @@ public void testDrainSync() throws Exception {
ops,
new Translog.Index(String.valueOf(2), 2, primaryTerm.get(), new byte[] { 1 })
);
assertEquals(2, translog.readers.size());
assertEquals(1, translog.readers.size());
assertEquals(6, translog.allUploaded().size());
assertEquals(mdFiles, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)));

// Refill the permits back
Releasables.close(releasable);
addToTranslogAndListAndUpload(translog, ops, new Translog.Index(String.valueOf(3), 3, primaryTerm.get(), new byte[] { 1 }));
assertEquals(3, translog.readers.size());
assertEquals(10, translog.allUploaded().size());
assertEquals(2, translog.readers.size());
assertEquals(8, translog.allUploaded().size());
assertEquals(3, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());

translog.setMinSeqNoToKeep(3);
translog.trimUnreferencedReaders();
assertEquals(1, translog.readers.size());
assertBusy(() -> assertEquals(6, translog.allUploaded().size()));
assertBusy(() -> assertEquals(4, translog.allUploaded().size()));
assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));
}

Expand Down

0 comments on commit a27e030

Please sign in to comment.