Skip to content

Commit

Permalink
HotShardRCA adjustments and threshold tuning (#304)
Browse files Browse the repository at this point in the history
* Measure implementation.

Signed-off-by: Filip Drobnjakovic <drobnjakovicfilip@gmail.com>

* Initial threshold tuning.

Signed-off-by: Filip Drobnjakovic <drobnjakovicfilip@gmail.com>

* HotShardSummary change.

Signed-off-by: Filip Drobnjakovic <drobnjakovicfilip@gmail.com>

* Thresholds tuning.

Signed-off-by: Filip Drobnjakovic <drobnjakovicfilip@gmail.com>

* Documentation update.

Signed-off-by: Filip Drobnjakovic <drobnjakovicfilip@gmail.com>

* Images update.

Signed-off-by: Filip Drobnjakovic <drobnjakovicfilip@gmail.com>

* README update.

Signed-off-by: Filip Drobnjakovic <drobnjakovicfilip@gmail.com>

* HotShardSummary shrinking and rework.

Signed-off-by: Filip Drobnjakovic <drobnjakovicfilip@gmail.com>

---------

Signed-off-by: Filip Drobnjakovic <drobnjakovicfilip@gmail.com>
  • Loading branch information
Tjofil committed Mar 20, 2023
1 parent a7d296e commit 398095a
Show file tree
Hide file tree
Showing 36 changed files with 504 additions and 332 deletions.
4 changes: 2 additions & 2 deletions config/rca.conf
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
},
//hot shard rca
"hot-shard-rca": {
"cpu-utilization" : 0.01,
"heap-alloc-rate-in-bytes" : 250000.0,
"cpu-utilization" : 0.015,
"heap-alloc-rate-in-bytes" : 1400000.0,
"top-k-consumers" : 50
},
// field data cache rca
Expand Down
4 changes: 2 additions & 2 deletions config/rca_cluster_manager.conf
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
},
//hot shard rca
"hot-shard-rca": {
"cpu-utilization" : 0.01,
"heap-alloc-rate-in-bytes" : 250000.0,
"cpu-utilization" : 0.015,
"heap-alloc-rate-in-bytes" : 1400000.0,
"top-k-consumers" : 50
},
//hot shard cluster rca
Expand Down
4 changes: 2 additions & 2 deletions config/rca_idle_cluster_manager.conf
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
},
//hot shard rca
"hot-shard-rca": {
"cpu-utilization" : 0.01,
"heap-alloc-rate-in-bytes" : 250000.0,
"cpu-utilization" : 0.015,
"heap-alloc-rate-in-bytes" : 1400000.0,
"top-k-consumers" : 50
},
//hot shard cluster rca
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,8 @@ public String toString() {
}

public static class Constants {
public static final String PENDING_TASKS_COUNT_VALUE = "ClusterManager_PendingQueueSize";
public static final String PENDING_TASKS_COUNT_VALUE =
"ClusterManager_PendingQueueSize";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ public class HotShardRcaConfig {

private final Integer maxConsumersToSend;

public static final double DEFAULT_CPU_UTILIZATION_THRESHOLD = 0.01;
public static final double DEFAULT_HEAP_ALLOC_RATE_THRESHOLD_IN_BYTE_PER_SEC = 250000.0;
public static final double DEFAULT_CPU_UTILIZATION_THRESHOLD = 0.015;
public static final double DEFAULT_HEAP_ALLOC_RATE_THRESHOLD_IN_BYTE_PER_SEC = 1400000.0;
public static final int DEFAULT_TOP_K_CONSUMERS = 50;

public HotShardRcaConfig(final RcaConf rcaConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public List<HotShardSummary> getHotShardSummaryList() {
return hotShardSummaryList;
}

/**
 * Replaces the list of hot shard summaries held by this object.
 *
 * <p>A {@code null} argument is ignored and the current list is kept. The supplied list is
 * stored as-is (no copy is made), so later changes made by the caller through the same
 * reference are visible here.
 *
 * @param summaryList the new list of hot shard summaries; ignored when {@code null}
 */
public void setHotShardSummaryList(List<HotShardSummary> summaryList) {
    if (summaryList == null) {
        // Nothing to replace with; keep whatever list is currently held.
        return;
    }
    hotShardSummaryList = summaryList;
}
public void appendNestedSummary(HotResourceSummary summary) {
if (summary != null) {
hotResourceSummaryList.add(summary);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,20 @@
import org.jooq.impl.DSL;
import org.opensearch.performanceanalyzer.grpc.FlowUnitMessage;
import org.opensearch.performanceanalyzer.grpc.HotShardSummaryMessage;
import org.opensearch.performanceanalyzer.grpc.HotShardSummaryMessage.CriteriaEnum;
import org.opensearch.performanceanalyzer.rca.framework.api.persist.JooqFieldValue;
import org.opensearch.performanceanalyzer.rca.framework.core.GenericSummary;

/**
* HotShardSummary contains information such as the index_name, shard_id, node_id, cpu_utilization,
* cpu_utilization_threshold io_throughput, io_throughput_threshold, io_sys_callrate,
* io_sys_callrate_threshold and time_period.
* heap_alloc_rate, criteria and time_period.
*
* <p>The hot shard summary is created by node level and cluster level RCAs running on data nodes
* and elected cluster_manager node resp. This object is persisted in SQLite table Table name :
* and elected cluster_manager node resp. This object is persisted in SQLite table. Table name :
* HotClusterSummary
*
* <p>schema : | ID(primary key) | index_name | shard_id | node_id | cpu_utilization |
* cpu_utilization_threshold | io_throughput | io_throughput_threshold | io_sys_callrate |
* io_sys_callrate_threshold| ID in FlowUnit(foreign key)
* heap_alloc_rate | criteria | time_period | ID in FlowUnit(foreign key)
*/
public class HotShardSummary extends GenericSummary {

Expand All @@ -42,11 +41,9 @@ public class HotShardSummary extends GenericSummary {
private final String indexName;
private final String shardId;
private final String nodeId;
private double cpu_utilization;
private double cpu_utilization_threshold;
private double heap_alloc_rate;
private double heap_alloc_rate_threshold;

private CriteriaEnum criteria;
private double cpuUtilization;
private double heapAllocRate;
private int timePeriodInSeconds;

public HotShardSummary(String indexName, String shardId, String nodeId, int timePeriod) {
Expand All @@ -57,20 +54,16 @@ public HotShardSummary(String indexName, String shardId, String nodeId, int time
this.timePeriodInSeconds = timePeriod;
}

public void setcpuUtilization(final double cpu_utilization) {
this.cpu_utilization = cpu_utilization;
}

public void setCpuUtilizationThreshold(final double cpu_utilization_threshold) {
this.cpu_utilization_threshold = cpu_utilization_threshold;
public void setCpuUtilization(final double cpuUtilization) {
this.cpuUtilization = cpuUtilization;
}

public void setHeapAllocRate(final double heap_alloc_rate) {
this.heap_alloc_rate = heap_alloc_rate;
public void setHeapAllocRate(final double heapAllocRate) {
this.heapAllocRate = heapAllocRate;
}

public void setHeapAllocRateThreshold(final double heap_alloc_rate_threshold) {
this.heap_alloc_rate_threshold = heap_alloc_rate_threshold;
public void setCriteria(final CriteriaEnum criteria) {
this.criteria = criteria;
}

public String getIndexName() {
Expand All @@ -86,11 +79,15 @@ public String getNodeId() {
}

public double getCpuUtilization() {
return this.cpu_utilization;
return this.cpuUtilization;
}

public double getHeapAllocRate() {
return this.heap_alloc_rate;
return this.heapAllocRate;
}

/**
 * Returns the criteria value currently stored on this summary.
 *
 * @return the value last passed to {@code setCriteria}
 */
public CriteriaEnum getCriteria() {
    // NOTE(review): presumably null if setCriteria was never called — confirm against the
    // constructor, which is not fully visible here.
    return criteria;
}

@Override
Expand All @@ -100,10 +97,9 @@ public HotShardSummaryMessage buildSummaryMessage() {
summaryMessageBuilder.setIndexName(this.indexName);
summaryMessageBuilder.setShardId(this.shardId);
summaryMessageBuilder.setNodeId(this.nodeId);
summaryMessageBuilder.setCpuUtilization(this.cpu_utilization);
summaryMessageBuilder.setCpuUtilizationThreshold(this.cpu_utilization_threshold);
summaryMessageBuilder.setHeapAllocRate(this.heap_alloc_rate);
summaryMessageBuilder.setHeapAllocRateThreshold(this.heap_alloc_rate_threshold);
summaryMessageBuilder.setCpuUtilization(this.cpuUtilization);
summaryMessageBuilder.setHeapAllocRate(this.heapAllocRate);
summaryMessageBuilder.setCriteria(this.criteria);
summaryMessageBuilder.setTimePeriod(this.timePeriodInSeconds);
return summaryMessageBuilder.build();
}
Expand All @@ -120,10 +116,9 @@ public static HotShardSummary buildHotShardSummaryFromMessage(HotShardSummaryMes
message.getShardId(),
message.getNodeId(),
message.getTimePeriod());
summary.setcpuUtilization(message.getCpuUtilization());
summary.setCpuUtilizationThreshold(message.getCpuUtilizationThreshold());
summary.setCpuUtilization(message.getCpuUtilization());
summary.setHeapAllocRate(message.getHeapAllocRate());
summary.setHeapAllocRateThreshold(message.getHeapAllocRateThreshold());
summary.setCriteria(message.getCriteria());
return summary;
}

Expand All @@ -135,10 +130,9 @@ public String toString() {
this.indexName,
this.shardId,
this.nodeId,
String.valueOf(this.cpu_utilization),
String.valueOf(this.cpu_utilization_threshold),
String.valueOf(this.heap_alloc_rate),
String.valueOf(this.heap_alloc_rate_threshold),
String.valueOf(this.cpuUtilization),
String.valueOf(this.heapAllocRate),
String.valueOf(this.criteria)
});
}

Expand All @@ -154,9 +148,8 @@ public List<Field<?>> getSqlSchema() {
schema.add(HotShardSummaryField.SHARD_ID_FIELD.getField());
schema.add(HotShardSummaryField.NODE_ID_FIELD.getField());
schema.add(HotShardSummaryField.CPU_UTILIZATION_FIELD.getField());
schema.add(HotShardSummaryField.CPU_UTILIZATION_THRESHOLD_FIELD.getField());
schema.add(HotShardSummaryField.HEAP_ALLOC_RATE_FIELD.getField());
schema.add(HotShardSummaryField.HEAP_ALLOC_RATE_THRESHOLD_FIELD.getField());
schema.add(HotShardSummaryField.CRITERIA_FIELD.getField());
schema.add(HotShardSummaryField.TIME_PERIOD_FIELD.getField());
return schema;
}
Expand All @@ -167,10 +160,9 @@ public List<Object> getSqlValue() {
value.add(this.indexName);
value.add(this.shardId);
value.add(this.nodeId);
value.add(this.cpu_utilization);
value.add(this.cpu_utilization_threshold);
value.add(this.heap_alloc_rate);
value.add(this.heap_alloc_rate_threshold);
value.add(this.cpuUtilization);
value.add(this.heapAllocRate);
value.add(this.criteria.getNumber());
value.add(Integer.valueOf(this.timePeriodInSeconds));
return value;
}
Expand All @@ -186,14 +178,9 @@ public JsonElement toJson() {
summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.INDEX_NAME_COL_NAME, this.indexName);
summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.SHARD_ID_COL_NAME, this.shardId);
summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.NODE_ID_COL_NAME, this.nodeId);
summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.CPU_UTILIZATION_COL_NAME, this.cpu_utilization);
summaryObj.addProperty(
SQL_SCHEMA_CONSTANTS.CPU_UTILIZATION_THRESHOLD_COL_NAME,
this.cpu_utilization_threshold);
summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.HEAP_ALLOC_RATE_COL_NAME, this.heap_alloc_rate);
summaryObj.addProperty(
SQL_SCHEMA_CONSTANTS.HEAP_ALLOC_RATE_THRESHOLD_COL_NAME,
this.heap_alloc_rate_threshold);
summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.CPU_UTILIZATION_COL_NAME, this.cpuUtilization);
summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.HEAP_ALLOC_RATE_COL_NAME, this.heapAllocRate);
summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.CRITERIA_COL_NAME, this.criteria.toString());
summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.TIME_PERIOD_COL_NAME, this.timePeriodInSeconds);
return summaryObj;
}
Expand All @@ -203,9 +190,8 @@ public static class SQL_SCHEMA_CONSTANTS {
public static final String SHARD_ID_COL_NAME = "shard_id";
public static final String NODE_ID_COL_NAME = "node_id";
public static final String CPU_UTILIZATION_COL_NAME = "cpu_utilization";
public static final String CPU_UTILIZATION_THRESHOLD_COL_NAME = "cpu_utilization_threshold";
public static final String HEAP_ALLOC_RATE_COL_NAME = "heap_alloc_rate";
public static final String HEAP_ALLOC_RATE_THRESHOLD_COL_NAME = "heap_alloc_rate_threshold";
public static final String CRITERIA_COL_NAME = "criteria";
public static final String TIME_PERIOD_COL_NAME = "time_period";
}

Expand All @@ -215,11 +201,8 @@ public enum HotShardSummaryField implements JooqFieldValue {
SHARD_ID_FIELD(SQL_SCHEMA_CONSTANTS.SHARD_ID_COL_NAME, String.class),
NODE_ID_FIELD(SQL_SCHEMA_CONSTANTS.NODE_ID_COL_NAME, String.class),
CPU_UTILIZATION_FIELD(SQL_SCHEMA_CONSTANTS.CPU_UTILIZATION_COL_NAME, Double.class),
CPU_UTILIZATION_THRESHOLD_FIELD(
SQL_SCHEMA_CONSTANTS.CPU_UTILIZATION_THRESHOLD_COL_NAME, Double.class),
HEAP_ALLOC_RATE_FIELD(SQL_SCHEMA_CONSTANTS.HEAP_ALLOC_RATE_COL_NAME, Double.class),
HEAP_ALLOC_RATE_THRESHOLD_FIELD(
SQL_SCHEMA_CONSTANTS.HEAP_ALLOC_RATE_THRESHOLD_COL_NAME, Double.class),
CRITERIA_FIELD(SQL_SCHEMA_CONSTANTS.CRITERIA_COL_NAME, Integer.class),
TIME_PERIOD_FIELD(SQL_SCHEMA_CONSTANTS.TIME_PERIOD_COL_NAME, Integer.class);

private String name;
Expand Down Expand Up @@ -259,41 +242,32 @@ public static HotShardSummary buildSummary(final Record record) {
String shardId =
record.get(HotShardSummaryField.SHARD_ID_FIELD.getField(), String.class);
String nodeId = record.get(HotShardSummaryField.NODE_ID_FIELD.getField(), String.class);
Double cpu_utilization =
Double cpuUtilization =
record.get(HotShardSummaryField.CPU_UTILIZATION_FIELD.getField(), Double.class);
Double cpu_utilization_threshold =
record.get(
HotShardSummaryField.CPU_UTILIZATION_THRESHOLD_FIELD.getField(),
Double.class);
Double heap_alloc_rate =
Double heapAllocRate =
record.get(HotShardSummaryField.HEAP_ALLOC_RATE_FIELD.getField(), Double.class);
Double heap_alloc_rate_threshold =
record.get(
HotShardSummaryField.HEAP_ALLOC_RATE_THRESHOLD_FIELD.getField(),
Double.class);
Integer criteria =
record.get(HotShardSummaryField.CRITERIA_FIELD.getField(), Integer.class);

Integer timePeriod =
record.get(HotShardSummaryField.TIME_PERIOD_FIELD.getField(), Integer.class);
if (timePeriod == null
|| cpu_utilization == null
|| cpu_utilization_threshold == null
|| heap_alloc_rate == null
|| heap_alloc_rate_threshold == null) {
|| cpuUtilization == null
|| heapAllocRate == null
|| criteria == null) {
LOG.warn(
"read null object from SQL, timePeriod: {}, cpu_utilization: {}, cpu_utilization_threshold: {},"
+ " heap_alloc_rate: {}, heap_alloc_rate_threshold: {}",
"read null object from SQL, timePeriod: {}, cpuUtilization: {}, heapAllocRate: {},"
+ " criteria: {}",
timePeriod,
cpu_utilization,
cpu_utilization_threshold,
heap_alloc_rate,
heap_alloc_rate_threshold);
cpuUtilization,
heapAllocRate,
criteria);
return null;
}
summary = new HotShardSummary(indexName, shardId, nodeId, timePeriod);
summary.setcpuUtilization(cpu_utilization);
summary.setCpuUtilizationThreshold(cpu_utilization_threshold);
summary.setHeapAllocRate(heap_alloc_rate);
summary.setHeapAllocRateThreshold(heap_alloc_rate_threshold);
summary.setCpuUtilization(cpuUtilization);
summary.setHeapAllocRate(heapAllocRate);
summary.setCriteria(CriteriaEnum.forNumber(criteria));
} catch (IllegalArgumentException ie) {
LOG.error("Some fields might not be found in record, cause : {}", ie.getMessage());
} catch (DataTypeException de) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public MetricFlowUnit gather(final Queryable queryable) {
context, tableName, aggDimension, groupByFieldsList, selectFieldsList);
} catch (Exception e) {
// TODO: Emit log/stats that gathering failed.
LOG.error("RCA: Caught an exception while getting the DB {}", e.getMessage());
LOG.error("RCA: Caught an exception while getting the DB {}",
e.getMessage());
return MetricFlowUnit.generic();
}
return new MetricFlowUnit(0, result);
Expand Down
Loading

0 comments on commit 398095a

Please sign in to comment.