Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Add cumulative bytes lag to NodesStats #9393

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1454,6 +1454,7 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats)
assertEquals(0, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(0, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(0, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
indexSingleDoc(secondIndex, true);

long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long max_bytes_lag = 0, max_time_lag = 0;
long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(randomDataNode).admin()
.cluster()
Expand All @@ -77,6 +77,7 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed;
total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);

Expand All @@ -85,9 +86,11 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
.prepareRemoteStoreStats(secondIndex, "0")
.setLocal(true)
.get();

cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed;
total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);

Expand All @@ -101,6 +104,7 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted());
assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed());
assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
}
Expand Down Expand Up @@ -173,15 +177,15 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats)
assertEquals(0, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(0, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(0, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag());
shourya035 marked this conversation as resolved.
Show resolved Hide resolved
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
}

private static void assertNodeStatsParityAcrossNodes(String firstIndex, String secondIndex) {
for (String dataNode : internalCluster().getDataNodeNames()) {
long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long cumulativeDownloadsSucceeded = 0, cumulativeDownloadsStarted = 0, cumulativeDownloadsFailed = 0;
long max_bytes_lag = 0, max_time_lag = 0;
long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(dataNode).admin()
.cluster()
Expand All @@ -197,6 +201,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed;
total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);

Expand All @@ -214,6 +219,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed;
total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);

Expand All @@ -230,6 +236,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
assertEquals(cumulativeDownloadsSucceeded, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(cumulativeDownloadsStarted, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(cumulativeDownloadsFailed, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public class RemoteSegmentStats implements Writeable, ToXContentFragment {
* Used to check for data freshness in the remote store
*/
private long maxRefreshBytesLag;
/**
* Total refresh lag (in bytes) between local and the remote store
* Used to check for data freshness in the remote store
*/
private long totalRefreshBytesLag;

public RemoteSegmentStats() {}

Expand All @@ -73,6 +78,7 @@ public RemoteSegmentStats(StreamInput in) throws IOException {
downloadBytesSucceeded = in.readLong();
maxRefreshTimeLag = in.readLong();
maxRefreshBytesLag = in.readLong();
totalRefreshBytesLag = in.readLong();
}

/**
Expand All @@ -91,7 +97,11 @@ public RemoteSegmentStats(RemoteSegmentTransferTracker.Stats trackerStats) {
this.downloadBytesStarted = trackerStats.directoryFileTransferTrackerStats.transferredBytesStarted;
this.downloadBytesFailed = trackerStats.directoryFileTransferTrackerStats.transferredBytesFailed;
this.maxRefreshTimeLag = trackerStats.refreshTimeLagMs;
// Initializing both total and max bytes lag to the same `bytesLag`
// value from the tracker object
// Aggregations would be performed on the add method
this.maxRefreshBytesLag = trackerStats.bytesLag;
this.totalRefreshBytesLag = trackerStats.bytesLag;
}

// Getter and setters. All are visible for testing
Expand Down Expand Up @@ -155,8 +165,16 @@ public long getMaxRefreshBytesLag() {
return maxRefreshBytesLag;
}

public void setMaxRefreshBytesLag(long maxRefreshBytesLag) {
this.maxRefreshBytesLag = maxRefreshBytesLag;
public void addMaxRefreshBytesLag(long maxRefreshBytesLag) {
this.maxRefreshBytesLag = Math.max(this.maxRefreshBytesLag, maxRefreshBytesLag);
}

public long getTotalRefreshBytesLag() {
return totalRefreshBytesLag;
}

public void addTotalRefreshBytesLag(long totalRefreshBytesLag) {
this.totalRefreshBytesLag += totalRefreshBytesLag;
}

/**
Expand All @@ -174,6 +192,7 @@ public void add(RemoteSegmentStats existingStats) {
this.downloadBytesSucceeded += existingStats.getDownloadBytesSucceeded();
this.maxRefreshTimeLag = Math.max(this.maxRefreshTimeLag, existingStats.getMaxRefreshTimeLag());
this.maxRefreshBytesLag = Math.max(this.maxRefreshBytesLag, existingStats.getMaxRefreshBytesLag());
this.totalRefreshBytesLag += existingStats.getTotalRefreshBytesLag();
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -187,33 +206,41 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(downloadBytesSucceeded);
out.writeLong(maxRefreshTimeLag);
out.writeLong(maxRefreshBytesLag);
out.writeLong(totalRefreshBytesLag);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.REMOTE_STORE);
builder.startObject(Fields.UPLOAD);
buildUploadStats(builder);
builder.endObject();
builder.startObject(Fields.DOWNLOAD);
buildDownloadStats(builder);
builder.endObject();
builder.endObject();
return builder;
}

private void buildUploadStats(XContentBuilder builder) throws IOException {
builder.startObject(Fields.TOTAL_UPLOADS);
builder.humanReadableField(Fields.STARTED_BYTES, Fields.STARTED, new ByteSizeValue(uploadBytesStarted));
builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(uploadBytesSucceeded));
builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(uploadBytesFailed));
builder.endObject();
builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag));
builder.humanReadableField(
Fields.MAX_REFRESH_SIZE_LAG_IN_MILLIS,
Fields.MAX_REFRESH_SIZE_LAG,
new ByteSizeValue(maxRefreshBytesLag)
);
builder.startObject(Fields.REFRESH_SIZE_LAG);
builder.humanReadableField(Fields.TOTAL_SIZE_IN_BYTES, Fields.TOTAL, new ByteSizeValue(totalRefreshBytesLag));
builder.humanReadableField(Fields.MAX_SIZE_IN_BYTES, Fields.MAX, new ByteSizeValue(maxRefreshBytesLag));
builder.endObject();
builder.startObject(Fields.DOWNLOAD);
builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag));
}

private void buildDownloadStats(XContentBuilder builder) throws IOException {
builder.startObject(Fields.TOTAL_DOWNLOADS);
builder.humanReadableField(Fields.STARTED_BYTES, Fields.STARTED, new ByteSizeValue(downloadBytesStarted));
builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(downloadBytesSucceeded));
builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(downloadBytesFailed));
builder.endObject();
builder.endObject();
builder.endObject();
return builder;
}

static final class Fields {
Expand All @@ -230,7 +257,10 @@ static final class Fields {
static final String SUCCEEDED_BYTES = "succeeded_bytes";
static final String MAX_REFRESH_TIME_LAG = "max_refresh_time_lag";
static final String MAX_REFRESH_TIME_LAG_IN_MILLIS = "max_refresh_time_lag_in_millis";
static final String MAX_REFRESH_SIZE_LAG = "max_refresh_size_lag";
static final String MAX_REFRESH_SIZE_LAG_IN_MILLIS = "max_refresh_size_lag_in_bytes";
static final String REFRESH_SIZE_LAG = "refresh_size_lag";
static final String TOTAL = "total";
static final String TOTAL_SIZE_IN_BYTES = "total_size_in_bytes";
static final String MAX = "max";
static final String MAX_SIZE_IN_BYTES = "max_size_in_bytes";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ public void testSerialization() throws IOException {
assertEquals(remoteSegmentStats.getUploadBytesFailed(), deserializedRemoteSegmentStats.getUploadBytesFailed());
assertEquals(remoteSegmentStats.getMaxRefreshTimeLag(), deserializedRemoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(remoteSegmentStats.getMaxRefreshBytesLag(), deserializedRemoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(remoteSegmentStats.getTotalRefreshBytesLag(), deserializedRemoteSegmentStats.getTotalRefreshBytesLag());
}
}
}
Expand Down Expand Up @@ -789,7 +790,8 @@ private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) {
remoteSegmentStats.addDownloadBytesStarted(10L);
remoteSegmentStats.addDownloadBytesSucceeded(10L);
remoteSegmentStats.addDownloadBytesFailed(1L);
remoteSegmentStats.setMaxRefreshBytesLag(5L);
remoteSegmentStats.addTotalRefreshBytesLag(5L);
remoteSegmentStats.addMaxRefreshBytesLag(2L);
remoteSegmentStats.setMaxRefreshTimeLag(2L);
}
return indicesStats;
Expand Down