[Backport 2.x] Decouple replication lag from logic to fail stale replicas (#9507) #9705

Merged 1 commit on Sep 2, 2023.
CHANGELOG.md (1 addition, 0 deletions)
@@ -101,6 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Separate request-based and settings-based concurrent segment search controls and introduce AggregatorFactory method to determine concurrent search support ([#9469](https://github.com/opensearch-project/OpenSearch/pull/9469))
- Use non-concurrent path for sort request on timeseries index and field ([#9562](https://github.com/opensearch-project/OpenSearch/pull/9562))
- Added sampler based on `Blanket Probabilistic Sampling rate` and `Override for on demand` ([#9621](https://github.com/opensearch-project/OpenSearch/issues/9621))
- Decouple replication lag from logic to fail stale replicas ([#9507](https://github.com/opensearch-project/OpenSearch/pull/9507))

### Deprecated

@@ -39,7 +39,8 @@
import static java.util.Arrays.asList;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_BACKPRESSURE_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
@@ -54,7 +55,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.put(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.put(MAX_INDEXING_CHECKPOINTS.getKey(), MAX_CHECKPOINTS_BEHIND)
.build();
}
@@ -225,7 +226,10 @@ public void testBelowReplicaLimit() throws Exception {

public void testFailStaleReplica() throws Exception {

Settings settings = Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build();
Settings settings = Settings.builder()
.put(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueMillis(500))
.put(MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING.getKey(), TimeValue.timeValueMillis(1000))
.build();
// Starts a primary and replica node.
final String primaryNode = internalCluster().startNode(settings);
createIndex(INDEX_NAME);
@@ -264,7 +268,9 @@ public void testWithDocumentReplicationEnabledIndex() throws Exception {
"Can't create DocRep index with remote store enabled. Skipping.",
Objects.equals(featureFlagSettings().get(FeatureFlags.REMOTE_STORE, "false"), "false")
);
Settings settings = Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build();
Settings settings = Settings.builder()
.put(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueMillis(500))
.build();
// Starts a primary and replica node.
final String primaryNode = internalCluster().startNode(settings);
createIndex(
@@ -649,7 +649,8 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchBackpressureSettings.SETTING_CANCELLATION_BURST, // deprecated
SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED,
SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS,
SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING,
SegmentReplicationPressureService.MAX_REPLICATION_TIME_BACKPRESSURE_SETTING,
SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING,
SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS,

// Settings related to Searchable Snapshots
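Both limits are registered with ClusterSettings above and declared Dynamic in SegmentReplicationPressureService below, so they can be changed on a live cluster without a restart. A minimal sketch of a runtime update, assuming a transport `client` is available (the client variable and the chosen values are illustrative, not part of this PR):

```java
// Hedged sketch: adjust both limits at runtime. Setting keys are taken
// from the diff; the client variable and values are illustrative.
ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
request.persistentSettings(
    Settings.builder()
        .put("segrep.pressure.time.limit", TimeValue.timeValueMinutes(5))     // backpressure threshold
        .put("segrep.replication.time.limit", TimeValue.timeValueMinutes(10)) // fail-stale-replica threshold
);
client.admin().cluster().updateSettings(request).actionGet();
```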
server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java

@@ -42,7 +42,8 @@
private volatile boolean isSegmentReplicationBackpressureEnabled;
private volatile int maxCheckpointsBehind;
private volatile double maxAllowedStaleReplicas;
private volatile TimeValue maxReplicationTime;
private volatile TimeValue replicationTimeLimitBackpressure;
private volatile TimeValue replicationTimeLimitFailReplica;

private static final Logger logger = LogManager.getLogger(SegmentReplicationPressureService.class);

@@ -65,13 +66,23 @@
Setting.Property.NodeScope
);

public static final Setting<TimeValue> MAX_REPLICATION_TIME_SETTING = Setting.positiveTimeSetting(
// Time limit on max allowed replica staleness, after which backpressure kicks in on the primary.
public static final Setting<TimeValue> MAX_REPLICATION_TIME_BACKPRESSURE_SETTING = Setting.positiveTimeSetting(
"segrep.pressure.time.limit",
TimeValue.timeValueMinutes(5),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

// Time limit on max allowed replica staleness after which we start failing the replica shard.
// Defaults to 0 (disabled).
public static final Setting<TimeValue> MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING = Setting.positiveTimeSetting(
"segrep.replication.time.limit",
TimeValue.timeValueMinutes(0),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
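For illustration only (not part of the diff): with the defaults above, backpressure engages after five minutes of staleness while replica failing stays off, since the fail limit defaults to zero. A sketch of node settings that would enable both behaviors, using the constants defined in this file:

```java
// Hedged sketch: backpressure after 5 minutes of staleness, replica
// failing after 10 minutes. Values are illustrative.
Settings nodeSettings = Settings.builder()
    .put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true)
    .put(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueMinutes(5))
    .put(MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING.getKey(), TimeValue.timeValueMinutes(10))
    .build();
```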

public static final Setting<Double> MAX_ALLOWED_STALE_SHARDS = Setting.doubleSetting(
"segrep.pressure.replica.stale.limit",
.5,
@@ -114,8 +125,11 @@
this.maxCheckpointsBehind = MAX_INDEXING_CHECKPOINTS.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_INDEXING_CHECKPOINTS, this::setMaxCheckpointsBehind);

this.maxReplicationTime = MAX_REPLICATION_TIME_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_REPLICATION_TIME_SETTING, this::setMaxReplicationTime);
this.replicationTimeLimitBackpressure = MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING, this::setReplicationTimeLimitBackpressure);

this.replicationTimeLimitFailReplica = MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING, this::setReplicationTimeLimitFailReplica);

this.maxAllowedStaleReplicas = MAX_ALLOWED_STALE_SHARDS.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_ALLOWED_STALE_SHARDS, this::setMaxAllowedStaleReplicas);
@@ -159,7 +173,7 @@
private Set<SegmentReplicationShardStats> getStaleReplicas(final Set<SegmentReplicationShardStats> replicas) {
return replicas.stream()
.filter(entry -> entry.getCheckpointsBehindCount() > maxCheckpointsBehind)
.filter(entry -> entry.getCurrentReplicationTimeMillis() > maxReplicationTime.millis())
.filter(entry -> entry.getCurrentReplicationTimeMillis() > replicationTimeLimitBackpressure.millis())
.collect(Collectors.toSet());
}

@@ -187,8 +201,12 @@
this.maxAllowedStaleReplicas = maxAllowedStaleReplicas;
}

public void setMaxReplicationTime(TimeValue maxReplicationTime) {
this.maxReplicationTime = maxReplicationTime;
public void setReplicationTimeLimitFailReplica(TimeValue replicationTimeLimitFailReplica) {
this.replicationTimeLimitFailReplica = replicationTimeLimitFailReplica;
}

public void setReplicationTimeLimitBackpressure(TimeValue replicationTimeLimitBackpressure) {
this.replicationTimeLimitBackpressure = replicationTimeLimitBackpressure;
}

[Codecov: added lines #L205-L206 and #L209 (the new setters) were not covered by tests]

@Override
@@ -216,7 +234,8 @@

@Override
protected void runInternal() {
if (pressureService.isSegmentReplicationBackpressureEnabled) {
// Do not fail the replicas if time limit is set to 0 (i.e. disabled).
if (TimeValue.ZERO.equals(pressureService.replicationTimeLimitFailReplica) == false) {
final SegmentReplicationStats stats = pressureService.tracker.getStats();

// Find the shardId on this node that has stale replicas with the highest current replication time.
@@ -242,7 +261,7 @@
}
final IndexShard primaryShard = indexService.getShard(shardId.getId());
for (SegmentReplicationShardStats staleReplica : staleReplicas) {
if (staleReplica.getCurrentReplicationTimeMillis() > 2 * pressureService.maxReplicationTime.millis()) {
if (staleReplica.getCurrentReplicationTimeMillis() > pressureService.replicationTimeLimitFailReplica.millis()) {
pressureService.shardStateAction.remoteShardFailed(
shardId,
staleReplica.getAllocationId(),
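The hunk above carries the core behavioral change: the old code failed a replica once its replication time exceeded twice the backpressure limit, while the new code compares against the dedicated fail limit and, per the zero guard at the top of runInternal, does nothing when that limit is disabled. A condensed restatement of the two now-independent checks (a sketch using names from the diff, not code from the PR):

```java
// Illustrative only: both limits act on the same per-replica measurement,
// but are configured and evaluated independently of each other.
boolean applyBackpressure = stats.getCurrentReplicationTimeMillis() > replicationTimeLimitBackpressure.millis();

boolean failReplica = TimeValue.ZERO.equals(replicationTimeLimitFailReplica) == false
    && stats.getCurrentReplicationTimeMillis() > replicationTimeLimitFailReplica.millis();
```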
server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java

@@ -29,6 +29,10 @@
private final String allocationId;
private final long checkpointsBehindCount;
private final long bytesBehindCount;
// Total replication lag observed.
private final long currentReplicationLagMillis;
// Total time taken for replicas to catch up. Similar to replication lag, except this
// doesn't include the time taken by the primary to upload data to the remote store.
private final long currentReplicationTimeMillis;
private final long lastCompletedReplicationTimeMillis;

@@ -40,12 +44,14 @@
long checkpointsBehindCount,
long bytesBehindCount,
long currentReplicationTimeMillis,
long currentReplicationLagMillis,
long lastCompletedReplicationTime
) {
this.allocationId = allocationId;
this.checkpointsBehindCount = checkpointsBehindCount;
this.bytesBehindCount = bytesBehindCount;
this.currentReplicationTimeMillis = currentReplicationTimeMillis;
this.currentReplicationLagMillis = currentReplicationLagMillis;
this.lastCompletedReplicationTimeMillis = lastCompletedReplicationTime;
}

@@ -55,6 +61,7 @@
this.bytesBehindCount = in.readVLong();
this.currentReplicationTimeMillis = in.readVLong();
this.lastCompletedReplicationTimeMillis = in.readVLong();
this.currentReplicationLagMillis = in.readVLong();

[Codecov: added line #L64 was not covered by tests]
}

public String getAllocationId() {
@@ -73,6 +80,19 @@
return currentReplicationTimeMillis;
}

/**
* Total replication lag observed.
* @return currentReplicationLagMillis
*/
public long getCurrentReplicationLagMillis() {
return currentReplicationLagMillis;
}

/**
* Time taken to complete the most recent replication event.
* @return lastCompletedReplicationTimeMillis
*/
public long getLastCompletedReplicationTimeMillis() {
return lastCompletedReplicationTimeMillis;
}
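A hypothetical timeline may make the distinction between the two getters concrete (all numbers invented for illustration):

```java
// t=0s    primary publishes a checkpoint        -> lag timer created
// t=0-4s  primary uploads segments to the remote store
// t=4s    replica starts copying segments       -> lag timer started
// t=10s   replica catches up                    -> timer stopped
//
// getCurrentReplicationLagMillis()  ~ 10_000 ms (publish through catch-up)
// getCurrentReplicationTimeMillis() ~  6_000 ms (copy through catch-up only)
```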
@@ -93,6 +113,7 @@
builder.field("checkpoints_behind", checkpointsBehindCount);
builder.field("bytes_behind", new ByteSizeValue(bytesBehindCount).toString());
builder.field("current_replication_time", new TimeValue(currentReplicationTimeMillis));
builder.field("current_replication_lag", new TimeValue(currentReplicationLagMillis));

[Codecov: added line #L116 was not covered by tests]
builder.field("last_completed_replication_time", new TimeValue(lastCompletedReplicationTimeMillis));
if (currentReplicationState != null) {
builder.startObject();
Expand All @@ -110,6 +131,7 @@
out.writeVLong(bytesBehindCount);
out.writeVLong(currentReplicationTimeMillis);
out.writeVLong(lastCompletedReplicationTimeMillis);
out.writeVLong(currentReplicationLagMillis);

[Codecov: added line #L134 was not covered by tests]
}
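Note that currentReplicationLagMillis is appended after all existing fields in both the StreamInput constructor and writeTo, keeping read and write order aligned for wire compatibility. A hypothetical round-trip check (the BytesStreamOutput usage is a sketch, not a test from this PR):

```java
// Illustrative only: the new field should survive a serialization round trip.
BytesStreamOutput out = new BytesStreamOutput();
stats.writeTo(out);
SegmentReplicationShardStats copy = new SegmentReplicationShardStats(out.bytes().streamInput());
assert copy.getCurrentReplicationLagMillis() == stats.getCurrentReplicationLagMillis();
```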

@Override
@@ -121,6 +143,8 @@
+ checkpointsBehindCount
+ ", bytesBehindCount="
+ bytesBehindCount
+ ", currentReplicationLagMillis="
+ currentReplicationLagMillis
+ ", currentReplicationTimeMillis="
+ currentReplicationTimeMillis
+ ", lastCompletedReplicationTimeMillis="
server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java

@@ -58,7 +58,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ReplicationGroup;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.indices.replication.common.SegmentReplicationLagTimer;

import java.io.IOException;
import java.nio.file.Path;
@@ -714,7 +714,7 @@
* Map of ReplicationCheckpoints to SegmentReplicationLagTimers. Timers are added as new checkpoints are published, and removed when
* the replica is caught up.
*/
Map<ReplicationCheckpoint, ReplicationTimer> checkpointTimers;
Map<ReplicationCheckpoint, SegmentReplicationLagTimer> checkpointTimers;

/**
* The time it took to complete the most recent replication event.
@@ -1188,9 +1188,9 @@
cps.checkpointTimers.entrySet().removeIf((entry) -> {
boolean result = entry.getKey().isAheadOf(visibleCheckpoint) == false;
if (result) {
final ReplicationTimer timer = entry.getValue();
final SegmentReplicationLagTimer timer = entry.getValue();
timer.stop();
lastFinished.set(Math.max(lastFinished.get(), timer.time()));
lastFinished.set(Math.max(lastFinished.get(), timer.totalElapsedTime()));
}
return result;
});
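SegmentReplicationLagTimer itself is not shown in this diff. A minimal standalone sketch of the behavior the calls above imply, assuming the real class extends ReplicationTimer and additionally records its creation time:

```java
import java.util.concurrent.TimeUnit;

// Standalone sketch, not the actual OpenSearch class: models only the
// create -> start -> stop lifecycle used by the tracker above.
class LagTimerSketch {
    private final long createdNanos = System.nanoTime(); // checkpoint published
    private long startNanos = -1;                        // replicas begin to sync
    private long stopNanos = -1;                         // replica caught up

    void start() { startNanos = System.nanoTime(); }
    void stop()  { stopNanos = System.nanoTime(); }

    private long endNanos() {
        return stopNanos >= 0 ? stopNanos : System.nanoTime();
    }

    // Catch-up time only: start() to stop(); zero if never started.
    long time() {
        return startNanos < 0 ? 0 : TimeUnit.NANOSECONDS.toMillis(endNanos() - startNanos);
    }

    // Total observed lag: creation (checkpoint publish) through stop().
    long totalElapsedTime() {
        return TimeUnit.NANOSECONDS.toMillis(endNanos() - createdNanos);
    }
}
```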
@@ -1210,7 +1210,7 @@
}

/**
* After a new checkpoint is published, create a lag timer for each replica for that checkpoint.
* After a new checkpoint is published, create a timer for each replica to the checkpoint.
* @param checkpoint {@link ReplicationCheckpoint}
*/
public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint checkpoint) {
@@ -1219,15 +1219,15 @@
this.latestReplicationCheckpoint = checkpoint;
}
if (primaryMode) {
startReplicationLagTimers();
createReplicationLagTimers();
}
}

public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return this.latestReplicationCheckpoint;
}

private void startReplicationLagTimers() {
private void createReplicationLagTimers() {
for (Map.Entry<String, CheckpointState> entry : checkpoints.entrySet()) {
final String allocationId = entry.getKey();
if (allocationId.equals(this.shardAllocationId) == false) {
@@ -1237,11 +1237,7 @@
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)) {
cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> {
final ReplicationTimer replicationTimer = new ReplicationTimer();
replicationTimer.start();
return replicationTimer;
});
cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> new SegmentReplicationLagTimer());
logger.trace(
() -> new ParameterizedMessage(
"updated last published checkpoint for {} at visible cp {} to {} - timers [{}]",
@@ -1256,6 +1252,29 @@
}
}

/**
* After a new checkpoint is published, start the lag timer for each replica for that checkpoint.
* @param checkpoint {@link ReplicationCheckpoint}
*/
public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpoint) {
assert indexSettings.isSegRepEnabled();
if (checkpoint.equals(latestReplicationCheckpoint) == false) {
this.latestReplicationCheckpoint = checkpoint;

[Codecov: added line #L1262 was not covered by tests]
}
if (primaryMode) {
checkpoints.entrySet().stream().filter(e -> !e.getKey().equals(this.shardAllocationId)).forEach(e -> {
String allocationId = e.getKey();
final CheckpointState cps = e.getValue();
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
&& cps.checkpointTimers.containsKey(latestReplicationCheckpoint)) {
cps.checkpointTimers.get(latestReplicationCheckpoint).start();
}
});
}
}
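Taken together with createReplicationLagTimers() above, the lifecycle on the primary appears to be two-phase: timers are created when a checkpoint is published and started only once replicas can actually begin syncing, so totalElapsedTime() captures total lag while time() captures only catch-up. A hedged usage sketch (the tracker variable and call ordering are inferred, not shown in this diff):

```java
// Phase 1: checkpoint published -> lag timers created, not yet running.
tracker.setLatestReplicationCheckpoint(checkpoint);
// ... primary finishes making segments available, e.g. the remote store upload ...
// Phase 2: replicas can begin copying -> lag timers started.
tracker.startReplicationLagTimers(checkpoint);
```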

/**
* Fetch stats on segment replication.
* @return {@link Tuple} V1 - TimeValue in ms - mean replication lag for this primary to its entire group,
@@ -1284,14 +1303,15 @@
final String allocationId,
final CheckpointState checkpointState
) {
final Map<ReplicationCheckpoint, ReplicationTimer> checkpointTimers = checkpointState.checkpointTimers;
final Map<ReplicationCheckpoint, SegmentReplicationLagTimer> checkpointTimers = checkpointState.checkpointTimers;
return new SegmentReplicationShardStats(
allocationId,
checkpointTimers.size(),
checkpointState.visibleReplicationCheckpoint == null
? latestCheckpointLength
: Math.max(latestCheckpointLength - checkpointState.visibleReplicationCheckpoint.getLength(), 0),
checkpointTimers.values().stream().mapToLong(ReplicationTimer::time).max().orElse(0),
checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0),
checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0),
checkpointState.lastCompletedReplicationLag
);
}
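buildShardStats maps the per-checkpoint timer maxima into the stats object: the maximum of time() becomes currentReplicationTimeMillis and the maximum of totalElapsedTime() becomes currentReplicationLagMillis. For orientation, a sketch of how one per-replica entry might render through the toXContent method shown earlier (all values invented):

```java
// Hypothetical rendered entry; field names come from toXContent above.
//   "checkpoints_behind": 2,
//   "bytes_behind": "1.1mb",
//   "current_replication_time": "6s",
//   "current_replication_lag": "10s",
//   "last_completed_replication_time": "4s"
```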