Decouple replication lag from replication timer logic used to fail stale replicas

Signed-off-by: Ankit Kala <ankikala@amazon.com>
ankitkala committed Aug 23, 2023
1 parent 5d3633c commit 0ecee5f
Showing 12 changed files with 218 additions and 40 deletions.
@@ -39,7 +39,8 @@
import static java.util.Arrays.asList;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_BACKPRESSURE_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
@@ -54,7 +55,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.put(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.put(MAX_INDEXING_CHECKPOINTS.getKey(), MAX_CHECKPOINTS_BEHIND)
.build();
}
@@ -226,7 +227,10 @@ public void testBelowReplicaLimit() throws Exception {

public void testFailStaleReplica() throws Exception {

Settings settings = Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build();
Settings settings = Settings.builder()
.put(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueMillis(500))
.put(MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING.getKey(), TimeValue.timeValueMillis(1000))
.build();
// Starts a primary and replica node.
final String primaryNode = internalCluster().startNode(settings);
createIndex(INDEX_NAME);
@@ -265,7 +269,9 @@ public void testWithDocumentReplicationEnabledIndex() throws Exception {
"Can't create DocRep index with remote store enabled. Skipping.",
Objects.equals(featureFlagSettings().get(FeatureFlags.REMOTE_STORE, "false"), "false")
);
Settings settings = Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build();
Settings settings = Settings.builder()
.put(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueMillis(500))
.build();
// Starts a primary and replica node.
final String primaryNode = internalCluster().startNode(settings);
createIndex(
@@ -642,7 +642,8 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchBackpressureSettings.SETTING_CANCELLATION_BURST, // deprecated
SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED,
SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS,
SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING,
SegmentReplicationPressureService.MAX_REPLICATION_TIME_BACKPRESSURE_SETTING,
SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING,
SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS,

// Settings related to Searchable Snapshots
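Both time limits are registered above as dynamic, node-scoped cluster settings, so the backpressure threshold and the fail-stale-replica threshold can be tuned independently at runtime. A minimal sketch of adjusting them through the Java client follows; the helper class, the client parameter, and exact package locations are illustrative assumptions and not part of this commit.

// Sketch only: tuning the two (now independent) segment replication time limits.
// Package locations may differ slightly between OpenSearch versions.
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

public final class SegRepTimeLimitsExample {
    public static void updateLimits(Client client) {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
        request.persistentSettings(
            Settings.builder()
                // backpressure threshold: lagging replicas start adding write pressure
                .put("segrep.pressure.time.limit", TimeValue.timeValueMinutes(5))
                // fail threshold: replicas still lagging after this may be failed
                .put("segrep.replication.time.limit", TimeValue.timeValueMinutes(15))
                .build()
        );
        client.admin().cluster().updateSettings(request).actionGet();
    }
}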
@@ -42,7 +42,8 @@ public class SegmentReplicationPressureService implements Closeable {
private volatile boolean isSegmentReplicationBackpressureEnabled;
private volatile int maxCheckpointsBehind;
private volatile double maxAllowedStaleReplicas;
private volatile TimeValue maxReplicationTime;
private volatile TimeValue replicationTimeLimitBackpressure;
private volatile TimeValue replicationTimeLimitFailReplica;

private static final Logger logger = LogManager.getLogger(SegmentReplicationPressureService.class);

@@ -65,13 +66,20 @@ public class SegmentReplicationPressureService implements Closeable {
Setting.Property.NodeScope
);

public static final Setting<TimeValue> MAX_REPLICATION_TIME_SETTING = Setting.positiveTimeSetting(
public static final Setting<TimeValue> MAX_REPLICATION_TIME_BACKPRESSURE_SETTING = Setting.positiveTimeSetting(
"segrep.pressure.time.limit",
TimeValue.timeValueMinutes(5),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<TimeValue> MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING = Setting.positiveTimeSetting(
"segrep.replication.time.limit",
TimeValue.timeValueMinutes(15),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<Double> MAX_ALLOWED_STALE_SHARDS = Setting.doubleSetting(
"segrep.pressure.replica.stale.limit",
.5,
@@ -112,8 +120,11 @@ public SegmentReplicationPressureService(
this.maxCheckpointsBehind = MAX_INDEXING_CHECKPOINTS.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_INDEXING_CHECKPOINTS, this::setMaxCheckpointsBehind);

this.maxReplicationTime = MAX_REPLICATION_TIME_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_REPLICATION_TIME_SETTING, this::setMaxReplicationTime);
this.replicationTimeLimitBackpressure = MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING, this::setReplicationTimeLimitBackpressure);

this.replicationTimeLimitFailReplica = MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING, this::setReplicationTimeLimitFailReplica);

this.maxAllowedStaleReplicas = MAX_ALLOWED_STALE_SHARDS.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_ALLOWED_STALE_SHARDS, this::setMaxAllowedStaleReplicas);
@@ -157,7 +168,7 @@ private void validateReplicationGroup(IndexShard shard) {
private Set<SegmentReplicationShardStats> getStaleReplicas(final Set<SegmentReplicationShardStats> replicas) {
return replicas.stream()
.filter(entry -> entry.getCheckpointsBehindCount() > maxCheckpointsBehind)
.filter(entry -> entry.getCurrentReplicationTimeMillis() > maxReplicationTime.millis())
.filter(entry -> entry.getCurrentReplicationTimeMillis() > replicationTimeLimitBackpressure.millis())
.collect(Collectors.toSet());
}

@@ -185,8 +196,12 @@ public void setMaxAllowedStaleReplicas(double maxAllowedStaleReplicas) {
this.maxAllowedStaleReplicas = maxAllowedStaleReplicas;
}

public void setMaxReplicationTime(TimeValue maxReplicationTime) {
this.maxReplicationTime = maxReplicationTime;
public void setReplicationTimeLimitFailReplica(TimeValue replicationTimeLimitFailReplica) {
this.replicationTimeLimitFailReplica = replicationTimeLimitFailReplica;
}

public void setReplicationTimeLimitBackpressure(TimeValue replicationTimeLimitBackpressure) {
this.replicationTimeLimitBackpressure = replicationTimeLimitBackpressure;
}

@Override
@@ -214,7 +229,8 @@ protected boolean mustReschedule() {

@Override
protected void runInternal() {
if (pressureService.isSegmentReplicationBackpressureEnabled) {
// Do not fail the replicas if time limit is set to 0 (i.e. disabled).
if (TimeValue.ZERO.equals(pressureService.replicationTimeLimitFailReplica) == false) {
final SegmentReplicationStats stats = pressureService.tracker.getStats();

// Find the shardId in node which is having stale replicas with highest current replication time.
@@ -240,7 +256,7 @@ protected void runInternal() {
}
final IndexShard primaryShard = indexService.getShard(shardId.getId());
for (SegmentReplicationShardStats staleReplica : staleReplicas) {
if (staleReplica.getCurrentReplicationTimeMillis() > 2 * pressureService.maxReplicationTime.millis()) {
if (staleReplica.getCurrentReplicationTimeMillis() > pressureService.replicationTimeLimitFailReplica.millis()) {
pressureService.shardStateAction.remoteShardFailed(
shardId,
staleReplica.getAllocationId(),
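The functional change in this service is that backpressure and replica failure now consult separate limits, and failing is skipped entirely when the fail limit is zero. A condensed, hypothetical restatement of that decision logic (not code from this commit) for readers skimming the diff:

// Hypothetical helper restating the decision logic in isolation; the real
// checks live in getStaleReplicas() and the background task's runInternal() above.
import org.opensearch.common.unit.TimeValue;

final class StaleReplicaPolicy {
    private final TimeValue backpressureLimit; // segrep.pressure.time.limit (default 5m)
    private final TimeValue failLimit;         // segrep.replication.time.limit (default 15m)

    StaleReplicaPolicy(TimeValue backpressureLimit, TimeValue failLimit) {
        this.backpressureLimit = backpressureLimit;
        this.failLimit = failLimit;
    }

    // A replica counts as stale for backpressure once it lags past the backpressure limit.
    boolean shouldApplyBackpressure(long currentReplicationTimeMillis) {
        return currentReplicationTimeMillis > backpressureLimit.millis();
    }

    // A replica is failed only against its own, longer limit; zero disables failing.
    boolean shouldFailReplica(long currentReplicationTimeMillis) {
        return TimeValue.ZERO.equals(failLimit) == false
            && currentReplicationTimeMillis > failLimit.millis();
    }
}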
@@ -30,6 +30,7 @@ public class SegmentReplicationShardStats implements Writeable, ToXContentFragment
private final long checkpointsBehindCount;
private final long bytesBehindCount;
private final long currentReplicationTimeMillis;
private final long currentReplicationLagMillis;
private final long lastCompletedReplicationTimeMillis;

@Nullable
@@ -40,12 +41,14 @@ public SegmentReplicationShardStats(
long checkpointsBehindCount,
long bytesBehindCount,
long currentReplicationTimeMillis,
long currentReplicationLagMillis,
long lastCompletedReplicationTime
) {
this.allocationId = allocationId;
this.checkpointsBehindCount = checkpointsBehindCount;
this.bytesBehindCount = bytesBehindCount;
this.currentReplicationTimeMillis = currentReplicationTimeMillis;
this.currentReplicationLagMillis = currentReplicationLagMillis;
this.lastCompletedReplicationTimeMillis = lastCompletedReplicationTime;
}

@@ -55,6 +58,7 @@ public SegmentReplicationShardStats(StreamInput in) throws IOException {
this.bytesBehindCount = in.readVLong();
this.currentReplicationTimeMillis = in.readVLong();
this.lastCompletedReplicationTimeMillis = in.readVLong();
this.currentReplicationLagMillis = in.readVLong();
}

public String getAllocationId() {
@@ -73,6 +77,11 @@ public long getCurrentReplicationTimeMillis() {
return currentReplicationTimeMillis;
}

// TODO: Add java docs to compare both.
public long getCurrentReplicationLagMillis() {
return currentReplicationLagMillis;
}

public long getLastCompletedReplicationTimeMillis() {
return lastCompletedReplicationTimeMillis;
}
@@ -93,6 +102,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("checkpoints_behind", checkpointsBehindCount);
builder.field("bytes_behind", new ByteSizeValue(bytesBehindCount).toString());
builder.field("current_replication_time", new TimeValue(currentReplicationTimeMillis));
builder.field("current_replication_lag", new TimeValue(currentReplicationLagMillis));
builder.field("last_completed_replication_time", new TimeValue(lastCompletedReplicationTimeMillis));
if (currentReplicationState != null) {
builder.startObject();
@@ -110,6 +120,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(bytesBehindCount);
out.writeVLong(currentReplicationTimeMillis);
out.writeVLong(lastCompletedReplicationTimeMillis);
out.writeVLong(currentReplicationLagMillis);
}

@Override
@@ -121,6 +132,8 @@ public String toString() {
+ checkpointsBehindCount
+ ", bytesBehindCount="
+ bytesBehindCount
+ ", currentReplicationLagMillis="
+ currentReplicationLagMillis
+ ", currentReplicationTimeMillis="
+ currentReplicationTimeMillis
+ ", lastCompletedReplicationTimeMillis="
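Note the wire format: the new currentReplicationLagMillis field is appended after the existing fields in both writeTo() and the StreamInput constructor, so read and write order stay aligned. A rough serialization round-trip sketch follows; the helper class is illustrative and package locations may vary by version.

// Illustrative round-trip: whatever writeTo() emits must be readable back
// in the same field order by the StreamInput constructor.
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.SegmentReplicationShardStats;

public final class ShardStatsWireExample {
    public static SegmentReplicationShardStats roundTrip(SegmentReplicationShardStats stats) throws Exception {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            stats.writeTo(out);
            try (StreamInput in = out.bytes().streamInput()) {
                return new SegmentReplicationShardStats(in);
            }
        }
    }
}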
@@ -58,7 +58,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ReplicationGroup;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.indices.replication.common.SegmentReplicationLagTimer;

import java.io.IOException;
import java.nio.file.Path;
@@ -714,7 +714,7 @@ public static class CheckpointState implements Writeable {
* Map of ReplicationCheckpoints to ReplicationTimers. Timers are added as new checkpoints are published, and removed when
* the replica is caught up.
*/
Map<ReplicationCheckpoint, ReplicationTimer> checkpointTimers;
Map<ReplicationCheckpoint, SegmentReplicationLagTimer> checkpointTimers;

/**
* The time it took to complete the most recent replication event.
@@ -1179,16 +1179,23 @@ public synchronized void updateVisibleCheckpointForShard(final String allocation
logger.warn("Ignoring the checkpoint update for allocation ID {} as its not being tracked by primary", allocationId);
return;
}
logger.info("ankitkala: Updating visible checkpoint");
if (cps.checkpointTimers.isEmpty() == false) {
// stop any timers for checkpoints up to the received cp and remove from cps.checkpointTimers.
// Compute the max lag from the set of completed timers.
final AtomicLong lastFinished = new AtomicLong(0L);
cps.checkpointTimers.entrySet().removeIf((entry) -> {
boolean result = entry.getKey().isAheadOf(visibleCheckpoint) == false;
if (result) {
final ReplicationTimer timer = entry.getValue();
final SegmentReplicationLagTimer timer = entry.getValue();
timer.stop();
lastFinished.set(Math.max(lastFinished.get(), timer.time()));
lastFinished.set(Math.max(lastFinished.get(), timer.totalElapsedTime()));
logger.info(
"ankitkala: timer created: {}, started: {}, totaltimemillis: {}",
timer.getCreationTime(),
timer.getStartNanoTime(),
timer.totalElapsedTime()
);
}
return result;
});
@@ -1208,24 +1215,25 @@ public synchronized void updateVisibleCheckpointForShard(final String allocation
}

/**
* After a new checkpoint is published, start a timer for each replica to the checkpoint.
* After a new checkpoint is published, create a timer for each replica to the checkpoint.
* @param checkpoint {@link ReplicationCheckpoint}
*/
public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint checkpoint) {
public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint checkpoint, boolean shouldStartTimers) {
assert indexSettings.isSegRepEnabled();
if (checkpoint.equals(latestReplicationCheckpoint) == false) {
this.latestReplicationCheckpoint = checkpoint;
}
if (primaryMode) {
startReplicationLagTimers();
createReplicationLagTimers(shouldStartTimers);
}
}

public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return this.latestReplicationCheckpoint;
}

private void startReplicationLagTimers() {
private void createReplicationLagTimers(boolean shouldStartTimers) {
logger.info("ankitkala: Creating the lag timers");
for (Map.Entry<String, CheckpointState> entry : checkpoints.entrySet()) {
final String allocationId = entry.getKey();
if (allocationId.equals(this.shardAllocationId) == false) {
@@ -1236,8 +1244,10 @@ private void startReplicationLagTimers() {
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)) {
cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> {
final ReplicationTimer replicationTimer = new ReplicationTimer();
replicationTimer.start();
final SegmentReplicationLagTimer replicationTimer = new SegmentReplicationLagTimer();
if (shouldStartTimers) {
replicationTimer.start();
}
return replicationTimer;
});
logger.trace(
@@ -1254,6 +1264,30 @@ private void startReplicationLagTimers() {
}
}

/**
* After a new checkpoint is published, start a timer for each replica to the checkpoint.
* @param checkpoint {@link ReplicationCheckpoint}
*/
public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpoint) {
logger.info("ankitkala: Starting the lag timers");
assert indexSettings.isSegRepEnabled();
if (checkpoint.equals(latestReplicationCheckpoint) == false) {
this.latestReplicationCheckpoint = checkpoint;
}
if (primaryMode) {
checkpoints.entrySet().stream().filter(e -> !e.getKey().equals(this.shardAllocationId)).forEach(e -> {
String allocationId = e.getKey();
final CheckpointState cps = e.getValue();
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
&& cps.checkpointTimers.containsKey(latestReplicationCheckpoint)) {
cps.checkpointTimers.get(latestReplicationCheckpoint).start();
}
});
}
}

/**
* Fetch stats on segment replication.
* @return {@link Tuple} V1 - TimeValue in ms - mean replication lag for this primary to its entire group,
@@ -1282,14 +1316,15 @@ private SegmentReplicationShardStats buildShardStats(
final String allocationId,
final CheckpointState checkpointState
) {
final Map<ReplicationCheckpoint, ReplicationTimer> checkpointTimers = checkpointState.checkpointTimers;
final Map<ReplicationCheckpoint, SegmentReplicationLagTimer> checkpointTimers = checkpointState.checkpointTimers;
return new SegmentReplicationShardStats(
allocationId,
checkpointTimers.size(),
checkpointState.visibleReplicationCheckpoint == null
? latestCheckpointLength
: Math.max(latestCheckpointLength - checkpointState.visibleReplicationCheckpoint.getLength(), 0),
checkpointTimers.values().stream().mapToLong(ReplicationTimer::time).max().orElse(0),
checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0),
checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0),
checkpointState.lastCompletedReplicationLag
);
}
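The decoupling rests on the new SegmentReplicationLagTimer, whose implementation is not part of this excerpt. Judging from its usage above (a timer is created when a checkpoint is published, started only when appropriate, and read through both time() and totalElapsedTime()), a plausible sketch of the contract it needs to satisfy is below; this is an assumption for illustration, not the committed class.

// Plausible sketch (not the committed implementation): time() covers only the
// started interval, while totalElapsedTime() also counts the period between
// object creation (checkpoint publication) and start(), i.e. the full lag.
public class SegmentReplicationLagTimerSketch {
    private final long creationTimeNanos = System.nanoTime();
    private long startTimeNanos = -1L;
    private long stopTimeNanos = -1L;

    public void start() {
        startTimeNanos = System.nanoTime();
    }

    public void stop() {
        stopTimeNanos = System.nanoTime();
    }

    // Milliseconds between start() and stop(): the replication "time".
    public long time() {
        if (startTimeNanos < 0 || stopTimeNanos < 0) {
            return 0L;
        }
        return (stopTimeNanos - startTimeNanos) / 1_000_000L;
    }

    // Milliseconds between creation and stop(): the replication "lag".
    public long totalElapsedTime() {
        if (stopTimeNanos < 0) {
            return 0L;
        }
        return (stopTimeNanos - creationTimeNanos) / 1_000_000L;
    }
}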
