Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Commit

Permalink
[BZ 1110462] handle aggregate metrics with NaN as a value
Browse files Browse the repository at this point in the history
Conflicts:
	modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
  • Loading branch information
John Sanda committed Aug 4, 2014
1 parent 21d8d41 commit 28e6e45
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ public void init() {
invalidMetricsManager = new InvalidMetricsManager(dateTimeService, dao);
}

/**
* A test hook
*/
InvalidMetricsManager getInvalidMetricsManager() {
return invalidMetricsManager;
}

/**
* In normal operating mode we compute aggregates from the last hour. If the server has
* been down, we need to determine the most recently stored raw data so we know the
Expand Down Expand Up @@ -364,76 +371,6 @@ private List<MeasurementDataNumericHighLowComposite> createComposites(Iterable<A

}

// private List<MeasurementDataNumericHighLowComposite> createComposites(Iterable<AggregateNumericMetric> metrics,
// long beginTime, long endTime, int numberOfBuckets, MetricsTable type) {
//
// Buckets buckets = new Buckets(beginTime, endTime, numberOfBuckets);
// for (AggregateNumericMetric metric : metrics) {
// // see https://bugzilla.redhat.com/show_bug.cgi?id=1015706 for details
// if (metric.getMax() < metric.getAvg() && Math.abs(metric.getMax() - metric.getAvg()) > THRESHOLD) {
// log.warn(metric + " is invalid. The max value for an aggregate metric should not be larger than " +
// "its average. The max will be set to the average.");
// metric.setMax(metric.getAvg());
// updateMaxWithNewTTL(metric, type);
// }
// buckets.insert(metric.getTimestamp(), metric.getAvg(), metric.getMin(), metric.getMax());
// }
//
// List<MeasurementDataNumericHighLowComposite> data = new ArrayList<MeasurementDataNumericHighLowComposite>();
// for (int i = 0; i < buckets.getNumDataPoints(); ++i) {
// Buckets.Bucket bucket = buckets.get(i);
// data.add(new MeasurementDataNumericHighLowComposite(bucket.getStartTime(), bucket.getAvg(),
// bucket.getMax(), bucket.getMin()));
// }
// return data;
// }

private void updateMaxWithNewTTL(AggregateNumericMetric metric, MetricsTable type) {
int newTTL;

switch (type) {
case ONE_HOUR:
newTTL = calculateNewTTL(MetricsTable.ONE_HOUR.getTTLinMilliseconds(), metric.getTimestamp());
updateMax(metric, MetricsTable.ONE_HOUR, newTTL);
break;
case SIX_HOUR:
newTTL = calculateNewTTL(MetricsTable.SIX_HOUR.getTTLinMilliseconds(), metric.getTimestamp());
updateMax(metric, MetricsTable.SIX_HOUR, newTTL);
break;
case TWENTY_FOUR_HOUR:
newTTL = calculateNewTTL(MetricsTable.TWENTY_FOUR_HOUR.getTTLinMilliseconds(), metric.getTimestamp());
updateMax(metric, MetricsTable.TWENTY_FOUR_HOUR, newTTL);
break;
default: // raw
throw new IllegalArgumentException("This method should only be called for aggregate metrics");
}
}

private int calculateNewTTL(long originalTTLMillis, long timestamp) {
return new Duration(originalTTLMillis - (System.currentTimeMillis() - timestamp)).toStandardSeconds()
.getSeconds();
}

private void updateMax(final AggregateNumericMetric metric, MetricsTable table, int ttl) {
StorageSession session = dao.getStorageSession();
StorageResultSetFuture future = session.executeAsync(
"INSERT INTO " + table + " (schedule_id, time, type, value) " +
"VALUES (" + metric.getScheduleId() + ", " + metric.getTimestamp() + ", " + AggregateType.MAX.ordinal() +
", " + metric.getMax() + ") " +
"USING TTL " + ttl);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
log.info("Successfully updated the max value for " + metric);
}

@Override
public void onFailure(Throwable t) {
log.warn("Failed to update the max value for " + metric, t);
}
});
}

public void addNumericData(final Set<MeasurementDataNumeric> dataSet,
final RawDataInsertedCallback callback) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,18 @@
import org.rhq.server.metrics.domain.RawNumericMetric;

/**
* <p>
* This class tries to deal with invalid aggregate metrics. An invalid metric is one where
* either min > avg or max < avg. It will recompute the metrics if possible; otherwise it
* will delete them.
* either min > avg or max < avg. There are a couple different bugs which made these
* situations possible See https://bugzilla.redhat.com/show_bug.cgi?id=1110462 and
* https://bugzilla.redhat.com/show_bug.cgi?id=1104885 for details.
* </p>
*
* <p>
* When an invalid metric is found, it is {@link #submit(MetricsTable, AggregateNumericMetric) submitted}
* to an internal queue for later processing. Metrics will be recomputed if possible;
* otherwise, they will be deleted.
* </p>
*
* @author John Sanda
*/
Expand Down Expand Up @@ -85,6 +94,10 @@ void setDelay(long delay) {
this.delay = delay;
}

/**
* Shuts down the executor, waiting for any in progress work to finish. Any invalid
* metrics that are in the queue will not be processed.
*/
public void shutdown() {
log.info("Shutting down...");
isShutdown = true;
Expand Down Expand Up @@ -133,7 +146,7 @@ public boolean submit(MetricsTable type, AggregateNumericMetric metric) {
*
* @return The queue of invalid metrics
*/
DelayQueue<InvalidMetric> getQueue() {
public DelayQueue<InvalidMetric> getQueue() {
return queue;
}

Expand Down Expand Up @@ -354,7 +367,8 @@ private List<AggregateNumericMetric> findInvalidMetrics(List<AggregateNumericMet

public boolean isInvalidMetric(AggregateNumericMetric metric) {
return (metric.getMax() < metric.getAvg() && Math.abs(metric.getMax() - metric.getAvg()) > THRESHOLD) ||
(metric.getMin() > metric.getAvg() && Math.abs(metric.getMin() - metric.getAvg()) > THRESHOLD);
(metric.getMin() > metric.getAvg() && Math.abs(metric.getMin() - metric.getAvg()) > THRESHOLD) ||
(Double.isNaN(metric.getAvg()) || Double.isNaN(metric.getMin()) || Double.isNaN(metric.getMin()));
}

/**
Expand Down

0 comments on commit 28e6e45

Please sign in to comment.