Skip to content

Commit

Permalink
[Segment Replication] Fix flaky tests testSegmentReplicationStatsResponse() and testSegmentReplicationStatsWithTimeout() (#6268)
Browse files Browse the repository at this point in the history

* Fix flaky tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Remove unnecessary tests and fix tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Update testSegmentReplicationStatsResponse().

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fix flaky tests and add concurrent map in SegmentReplicationState.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Spotless Apply

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* moving timing data under detailed flag.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* spotless apply

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Add assert busy and some code refactoring.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Increasing assertBusy Timeout

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

---------

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 authored Feb 21, 2023
1 parent 5560ba4 commit 4316f96
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryAction;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsAction;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -152,56 +147,6 @@ public void testRecoveriesWithTimeout() {
assertThat(recoveryResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
}

// NOTE(review): this is the flaky integration test REMOVED by this commit (see issue #6255).
// It verified that the _segment_replication stats API reports partial failures when one data
// node times out at the transport layer, instead of hanging or failing the whole request.
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6255")
public void testSegmentReplicationStatsWithTimeout() {
// Start a dedicated cluster-manager node plus two data nodes, all with the
// segment-replication feature flag enabled via the test settings.
internalCluster().startClusterManagerOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);
String dataNode = internalCluster().startDataOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);
String anotherDataNode = internalCluster().startDataOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);

// 4 primaries + 1 replica each = 8 shard copies total, replicated via SEGMENT replication.
int numShards = 4;
assertAcked(
prepareCreate(
"test-index",
0,
Settings.builder()
.put("number_of_shards", numShards)
.put("number_of_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
)
);
ensureGreen();
// Index a random number of docs so replicas have segment-replication work to report.
final long numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index("test-index", "doc", Integer.toString(i));
}
refresh("test-index");
ensureSearchable("test-index");

// Happy case
// All shard copies (primaries + replicas) should respond successfully.
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats()
.get();
assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards * 2));

// simulate timeout on bad node.
simulateTimeoutAtTransport(dataNode, anotherDataNode, SegmentReplicationStatsAction.NAME);

// verify response with bad node.
// Half the shard copies (those on the timed-out node) are expected to fail with
// ReceiveTimeoutTransportException; the API must still return the other half.
segmentReplicationStatsResponse = dataNodeClient().admin().indices().prepareSegmentReplicationStats().get();
assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards));
assertThat(segmentReplicationStatsResponse.getFailedShards(), equalTo(numShards));
assertThat(segmentReplicationStatsResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
}

public void testStatsWithTimeout() {
internalCluster().startClusterManagerOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,55 @@
import org.opensearch.transport.TransportService;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Arrays.asList;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationStatsIT extends SegmentReplicationBaseIT {

public void testSegmentReplicationStatsResponse() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

// index 10 docs
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
internalCluster().startClusterManagerOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
String anotherDataNode = internalCluster().startDataOnlyNode();

int numShards = 4;
assertAcked(
prepareCreate(
INDEX_NAME,
0,
Settings.builder()
.put("number_of_shards", numShards)
.put("number_of_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
)
);
ensureGreen();
final long numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index(INDEX_NAME, "doc", Integer.toString(i));
}
refresh(INDEX_NAME);
waitForSearchableDocs(10L, asList(primaryNode, replicaNode));
ensureSearchable(INDEX_NAME);

assertBusy(() -> {
final SegmentReplicationStatsResponse response = client().admin()
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.execute()
.actionGet();
// Verify API Response
assertEquals(response.shardSegmentReplicationStates().size(), SHARD_COUNT);
assertEquals(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), SegmentReplicationState.Stage.DONE);
assertTrue(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount() > 0);
});
assertEquals(segmentReplicationStatsResponse.shardSegmentReplicationStates().size(), 1);
assertEquals(segmentReplicationStatsResponse.getTotalShards(), numShards * 2);
assertEquals(segmentReplicationStatsResponse.getSuccessfulShards(), numShards * 2);
assertEquals(
segmentReplicationStatsResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(),
SegmentReplicationState.Stage.DONE
);
assertTrue(
segmentReplicationStatsResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount() > 0
);
}, 1, TimeUnit.MINUTES);
}

public void testSegmentReplicationStatsResponseForActiveAndCompletedOnly() throws Exception {
Expand Down Expand Up @@ -121,6 +140,7 @@ public void testSegmentReplicationStatsResponseForActiveAndCompletedOnly() throw
completedOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(),
SegmentReplicationState.Stage.DONE
);
assertTrue(completedOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount() > 0);
waitForAssertions.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected SegmentReplicationState shardOperation(SegmentReplicationStatsRequest
singleIndexWithSegmentReplicationDisabled = shardRouting.getIndexName();
return null;
}
if (indexShard.indexSettings().isSegRepEnabled() == false) {
if (indexShard.indexSettings().isSegRepEnabled() == false || shardRouting.primary()) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* ReplicationState implementation to track Segment Replication events.
Expand Down Expand Up @@ -121,23 +122,28 @@ public Map<String, Long> getTimingData() {
}

public TimeValue getReplicatingStageTime() {
return new TimeValue(timingData.get(Stage.REPLICATING.toString()));
long time = timingData.getOrDefault(Stage.REPLICATING.toString(), 0L);
return new TimeValue(time);
}

public TimeValue getGetCheckpointInfoStageTime() {
return new TimeValue(timingData.get(Stage.GET_CHECKPOINT_INFO.toString()));
long time = timingData.getOrDefault(Stage.GET_CHECKPOINT_INFO.toString(), 0L);
return new TimeValue(time);
}

public TimeValue getFileDiffStageTime() {
return new TimeValue(timingData.get(Stage.FILE_DIFF.toString()));
long time = timingData.getOrDefault(Stage.FILE_DIFF.toString(), 0L);
return new TimeValue(time);
}

public TimeValue getGetFileStageTime() {
return new TimeValue(timingData.get(Stage.GET_FILES.toString()));
long time = timingData.getOrDefault(Stage.GET_FILES.toString(), 0L);
return new TimeValue(time);
}

public TimeValue getFinalizeReplicationStageTime() {
return new TimeValue(timingData.get(Stage.FINALIZE_REPLICATION.toString()));
long time = timingData.getOrDefault(Stage.FINALIZE_REPLICATION.toString(), 0L);
return new TimeValue(time);
}

public SegmentReplicationState(
Expand All @@ -153,7 +159,7 @@ public SegmentReplicationState(
this.sourceDescription = sourceDescription;
this.targetNode = targetNode;
// Timing data will have as many entries as stages, plus one
timingData = new HashMap<>(Stage.values().length + 1);
timingData = new ConcurrentHashMap<>(Stage.values().length + 1);
overallTimer = new ReplicationTimer();
stageTimer = new ReplicationTimer();
setStage(Stage.INIT);
Expand All @@ -180,7 +186,13 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(replicationId);
overallTimer.writeTo(out);
stageTimer.writeTo(out);
out.writeMap(timingData, StreamOutput::writeString, StreamOutput::writeLong);

// Copy of timingData is created to avoid concurrent modification of timingData map.
Map<String, Long> timingDataCopy = new HashMap<>();
for (Map.Entry<String, Long> entry : timingData.entrySet()) {
timingDataCopy.put(entry.getKey(), entry.getValue());
}
out.writeMap(timingDataCopy, StreamOutput::writeString, StreamOutput::writeLong);
out.writeString(sourceDescription);
targetNode.writeTo(out);
}
Expand Down Expand Up @@ -257,22 +269,20 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
builder.timeField(Fields.STOP_TIME_IN_MILLIS, Fields.STOP_TIME, getTimer().stopTime());
}
builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, new TimeValue(getTimer().time()));
if (sourceDescription != null) {
builder.field(Fields.SOURCE, getSourceDescription());
}
builder.field(Fields.SOURCE, getSourceDescription());

builder.startObject(Fields.TARGET);
builder.field(Fields.ID, targetNode.getId());
builder.field(Fields.HOST, targetNode.getHostName());
builder.field(Fields.TRANSPORT_ADDRESS, targetNode.getAddress().toString());
builder.field(Fields.IP, targetNode.getHostAddress());
builder.field(Fields.NAME, targetNode.getName());
builder.endObject();

if (targetNode != null) {
builder.startObject(Fields.TARGET);
builder.field(Fields.ID, targetNode.getId());
builder.field(Fields.HOST, targetNode.getHostName());
builder.field(Fields.TRANSPORT_ADDRESS, targetNode.getAddress().toString());
builder.field(Fields.IP, targetNode.getHostAddress());
builder.field(Fields.NAME, targetNode.getName());
builder.endObject();
}
builder.startObject(SegmentReplicationState.Fields.INDEX);
index.toXContent(builder, params);
builder.endObject();

builder.field(Fields.REPLICATING_STAGE, getReplicatingStageTime());
builder.field(Fields.GET_CHECKPOINT_INFO_STAGE, getGetCheckpointInfoStageTime());
builder.field(Fields.FILE_DIFF_STAGE, getFileDiffStageTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,6 @@ protected Table getTableWithHeader(RestRequest request) {
t.startHeaders()
.addCell("index", "alias:i,idx;desc:index name")
.addCell("shardId", "alias:s;desc: shard Id")
.addCell("start_time", "default:false;alias:start;desc:segment replication start time")
.addCell("start_time_millis", "default:false;alias:start_millis;desc:segment replication start time in epoch milliseconds")
.addCell("stop_time", "default:false;alias:stop;desc:segment replication stop time")
.addCell("stop_time_millis", "default:false;alias:stop_millis;desc:segment replication stop time in epoch milliseconds")
.addCell("time", "alias:t,ti;desc:segment replication time")
.addCell("stage", "alias:st;desc:segment replication stage")
.addCell("source_description", "alias:sdesc;desc:source description")
Expand All @@ -106,7 +102,11 @@ protected Table getTableWithHeader(RestRequest request) {
.addCell("bytes_fetched", "alias:bf;desc:bytes fetched")
.addCell("bytes_percent", "alias:bp;desc:percent of bytes fetched");
if (detailed) {
t.addCell("files", "alias:f;desc:number of files to fetch")
t.addCell("start_time", "alias:start;desc:segment replication start time")
.addCell("start_time_millis", "alias:start_millis;desc:segment replication start time in epoch milliseconds")
.addCell("stop_time", "alias:stop;desc:segment replication stop time")
.addCell("stop_time_millis", "alias:stop_millis;desc:segment replication stop time in epoch milliseconds")
.addCell("files", "alias:f;desc:number of files to fetch")
.addCell("files_total", "alias:tf;desc:total number of files")
.addCell("bytes", "alias:b;desc:number of bytes to fetch")
.addCell("bytes_total", "alias:tb;desc:total number of bytes")
Expand Down Expand Up @@ -162,10 +162,6 @@ public int compare(SegmentReplicationState o1, SegmentReplicationState o2) {
t.startRow();
t.addCell(index);
t.addCell(state.getShardRouting().shardId().id());
t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().startTime()));
t.addCell(state.getTimer().startTime());
t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().stopTime()));
t.addCell(state.getTimer().stopTime());
t.addCell(new TimeValue(state.getTimer().time()));
t.addCell(state.getStage().toString().toLowerCase(Locale.ROOT));
t.addCell(state.getSourceDescription());
Expand All @@ -176,6 +172,10 @@ public int compare(SegmentReplicationState o1, SegmentReplicationState o2) {
t.addCell(state.getIndex().recoveredBytes());
t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredBytesPercent()));
if (detailed) {
t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().startTime()));
t.addCell(state.getTimer().startTime());
t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().stopTime()));
t.addCell(state.getTimer().stopTime());
t.addCell(state.getIndex().totalRecoverFiles());
t.addCell(state.getIndex().totalFileCount());
t.addCell(state.getIndex().totalRecoverBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.opensearch.common.Randomness;
import org.opensearch.common.Table;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentOpenSearchExtension;
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.SegmentReplicationState;
Expand Down Expand Up @@ -92,10 +91,6 @@ public void testSegmentReplicationAction() {
final List<String> expectedHeaders = Arrays.asList(
"index",
"shardId",
"start_time",
"start_time_millis",
"stop_time",
"stop_time_millis",
"time",
"stage",
"source_description",
Expand All @@ -118,10 +113,6 @@ public void testSegmentReplicationAction() {
final List<Object> expectedValues = Arrays.asList(
"index",
i,
XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().startTime()),
state.getTimer().startTime(),
XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().stopTime()),
state.getTimer().stopTime(),
new TimeValue(state.getTimer().time()),
state.getStage().name().toLowerCase(Locale.ROOT),
state.getSourceDescription(),
Expand Down

0 comments on commit 4316f96

Please sign in to comment.