Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Commit

Permalink
[BZ 1015706,1092756] check for invalid max and update it with the avg
Browse files Browse the repository at this point in the history
We also log a warning to indicate that we have come across an invalid metric.
We then update and persist the max.

Conflicts:
	modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
  • Loading branch information
John Sanda committed May 30, 2014
1 parent d3c6251 commit b1b4eee
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 25 deletions.
Expand Up @@ -187,6 +187,10 @@ public void initPreparedStatements() {
log.info("Finished initializing prepared statements in " + (endTime - startTime) + " ms");
}

public StorageSession getStorageSession() {
return storageSession;
}

public StorageResultSetFuture insertRawData(MeasurementDataNumeric data) {
BoundStatement statement = insertRawData.bind(data.getScheduleId(), new Date(data.getTimestamp()),
data.getValue());
Expand Down
Expand Up @@ -47,6 +47,7 @@
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
import org.joda.time.Duration;

import org.rhq.core.domain.measurement.MeasurementDataNumeric;
import org.rhq.core.domain.measurement.composite.MeasurementDataNumericHighLowComposite;
Expand All @@ -66,6 +67,8 @@ public class MetricsServer {

private final Log log = LogFactory.getLog(MetricsServer.class);

private static final double THRESHOLD = 0.00001d;

private DateTimeService dateTimeService = new DateTimeService();

private MetricsDAO dao;
Expand Down Expand Up @@ -234,53 +237,51 @@ public Iterable<MeasurementDataNumericHighLowComposite> findDataForResource(int
Iterable<AggregateNumericMetric> metrics = null;
if (dateTimeService.isIn1HourDataRange(begin)) {
metrics = dao.findOneHourMetrics(scheduleId, beginTime, endTime);
return createComposites(metrics, beginTime, endTime, numberOfBuckets);
} else if (dateTimeService.isIn6HourDataRange(begin)) {
metrics = dao.findSixHourMetrics(scheduleId, beginTime, endTime);
return createComposites(metrics, beginTime, endTime, numberOfBuckets, MetricsTable.SIX_HOUR);
} else if (dateTimeService.isIn24HourDataRange(begin)) {
metrics = dao.findTwentyFourHourMetrics(scheduleId, beginTime, endTime);
return createComposites(metrics, beginTime, endTime, numberOfBuckets, MetricsTable.TWENTY_FOUR_HOUR);
} else {
throw new IllegalArgumentException("beginTime[" + beginTime + "] is outside the accepted range.");
}

return createComposites(metrics, beginTime, endTime, numberOfBuckets);
} finally {
stopwatch.stop();
if (log.isDebugEnabled()) {
log.debug("Retrieved resource data for [scheduleId: " + scheduleId + ", beginTime: " + beginTime +
", endTime: " + endTime + "] in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
log.debug("Finished calculating resource summary aggregate in " +
stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
}
}
}

public List<MeasurementDataNumericHighLowComposite> findDataForGroup(List<Integer> scheduleIds, long beginTime,
long endTime, int numberOfBuckets) {
Stopwatch stopwatch = new Stopwatch().start();
try {
DateTime begin = new DateTime(beginTime);
if (log.isDebugEnabled()) {
log.debug("Querying for metric data using parameters [scheduleIds: " + scheduleIds + ", beingTime: " +
beginTime + ", endTime: " + endTime + ", numberOfBuckets: " + numberOfBuckets + "]");
}

DateTime begin = new DateTime(beginTime);

if (dateTimeService.isInRawDataRange(begin)) {
Iterable<RawNumericMetric> metrics = dao.findRawMetrics(scheduleIds, beginTime, endTime);
return createRawComposites(metrics, beginTime, endTime, numberOfBuckets);
}

Iterable<AggregateNumericMetric> metrics = null;
if (dateTimeService.isIn1HourDataRange(begin)) {
metrics = dao.findOneHourMetrics(scheduleIds, beginTime, endTime);
} else if (dateTimeService.isIn6HourDataRange(begin)) {
metrics = dao.findSixHourMetrics(scheduleIds, beginTime, endTime);
} else if (dateTimeService.isIn24HourDataRange(begin)) {
metrics = dao.findTwentyFourHourMetrics(scheduleIds, beginTime, endTime);
} else {
throw new IllegalArgumentException("beginTime[" + beginTime + "] is outside the accepted range.");
}

Iterable<AggregateNumericMetric> metrics = null;
if (dateTimeService.isIn1HourDataRange(begin)) {
metrics = dao.findOneHourMetrics(scheduleIds, beginTime, endTime);
return createComposites(metrics, beginTime, endTime, numberOfBuckets);
} finally {
stopwatch.stop();
if (log.isDebugEnabled()) {
log.debug("Retrieved resource group data for [scheduleIds: " + scheduleIds + ", beginTime: " +
beginTime + ", endTime: " + endTime + "] in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
}
} else if (dateTimeService.isIn6HourDataRange(begin)) {
metrics = dao.findSixHourMetrics(scheduleIds, beginTime, endTime);
return createComposites(metrics, beginTime, endTime, numberOfBuckets, MetricsTable.SIX_HOUR);
} else if (dateTimeService.isIn24HourDataRange(begin)) {
metrics = dao.findTwentyFourHourMetrics(scheduleIds, beginTime, endTime);
return createComposites(metrics, beginTime, endTime, numberOfBuckets, MetricsTable.TWENTY_FOUR_HOUR);
} else {
throw new IllegalArgumentException("beginTime[" + beginTime + "] is outside the accepted range.");
}
}

Expand Down Expand Up @@ -417,6 +418,76 @@ private List<MeasurementDataNumericHighLowComposite> createComposites(Iterable<A

}

private List<MeasurementDataNumericHighLowComposite> createComposites(Iterable<AggregateNumericMetric> metrics,
long beginTime, long endTime, int numberOfBuckets, MetricsTable type) {

Buckets buckets = new Buckets(beginTime, endTime, numberOfBuckets);
for (AggregateNumericMetric metric : metrics) {
// see https://bugzilla.redhat.com/show_bug.cgi?id=1015706 for details
if (metric.getMax() < metric.getAvg() && Math.abs(metric.getMax() - metric.getAvg()) > THRESHOLD) {
log.warn(metric + " is invalid. The max value for an aggregate metric should not be larger than " +
"its average. The max will be set to the average.");
metric.setMax(metric.getAvg());
updateMaxWithNewTTL(metric, type);
}
buckets.insert(metric.getTimestamp(), metric.getAvg(), metric.getMin(), metric.getMax());
}

List<MeasurementDataNumericHighLowComposite> data = new ArrayList<MeasurementDataNumericHighLowComposite>();
for (int i = 0; i < buckets.getNumDataPoints(); ++i) {
Buckets.Bucket bucket = buckets.get(i);
data.add(new MeasurementDataNumericHighLowComposite(bucket.getStartTime(), bucket.getAvg(),
bucket.getMax(), bucket.getMin()));
}
return data;
}

private void updateMaxWithNewTTL(AggregateNumericMetric metric, MetricsTable type) {
int newTTL;

switch (type) {
case ONE_HOUR:
newTTL = calculateNewTTL(MetricsTable.ONE_HOUR.getTTLinMilliseconds(), metric.getTimestamp());
updateMax(metric, MetricsTable.ONE_HOUR, newTTL);
break;
case SIX_HOUR:
newTTL = calculateNewTTL(MetricsTable.SIX_HOUR.getTTLinMilliseconds(), metric.getTimestamp());
updateMax(metric, MetricsTable.SIX_HOUR, newTTL);
break;
case TWENTY_FOUR_HOUR:
newTTL = calculateNewTTL(MetricsTable.TWENTY_FOUR_HOUR.getTTLinMilliseconds(), metric.getTimestamp());
updateMax(metric, MetricsTable.TWENTY_FOUR_HOUR, newTTL);
break;
default: // raw
throw new IllegalArgumentException("This method should only be called for aggregate metrics");
}
}

private int calculateNewTTL(long originalTTLMillis, long timestamp) {
return new Duration(originalTTLMillis - (System.currentTimeMillis() - timestamp)).toStandardSeconds()
.getSeconds();
}

private void updateMax(final AggregateNumericMetric metric, MetricsTable table, int ttl) {
StorageSession session = dao.getStorageSession();
StorageResultSetFuture future = session.executeAsync(
"INSERT INTO " + table + " (schedule_id, time, type, value) " +
"VALUES (" + metric.getScheduleId() + ", " + metric.getTimestamp() + ", " + AggregateType.MAX.ordinal() +
", " + metric.getMax() + ") " +
"USING TTL " + ttl);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
log.info("Successfully updated the max value for " + metric);
}

@Override
public void onFailure(Throwable t) {
log.warn("Failed to update the max value for " + metric, t);
}
});
}

public void addNumericData(final Set<MeasurementDataNumeric> dataSet, final RawDataInsertedCallback callback) {
if (log.isDebugEnabled()) {
log.debug("Inserting " + dataSet.size() + " raw metrics");
Expand Down
Expand Up @@ -781,6 +781,45 @@ public void find1HourDataComposites() {
actualData.get(29), TEST_PRECISION);
}

@Test
public void find6HourDataCompositesHavingInvalidMax() throws Exception {
DateTime beginTime = now().minusDays(30);
DateTime endTime = now();

Buckets buckets = new Buckets(beginTime, endTime);
DateTime bucket0Time = new DateTime(buckets.get(0).getStartTime());
DateTime bucket59Time = new DateTime(buckets.get(59).getStartTime());

int scheduleId = 123;
List<AggregateNumericMetric> metrics = asList(
new AggregateNumericMetric(scheduleId, 3.0, 1.0, 2.9, bucket0Time.getMillis()),
new AggregateNumericMetric(scheduleId, 5.0, 4.0, 6.0, bucket0Time.plusHours(1).getMillis())
);
for (AggregateNumericMetric metric : metrics) {
dao.insertSixHourData(metric.getScheduleId(), metric.getTimestamp(), AggregateType.MIN, metric.getMin());
dao.insertSixHourData(metric.getScheduleId(), metric.getTimestamp(), AggregateType.MAX, metric.getMax());
dao.insertSixHourData(metric.getScheduleId(), metric.getTimestamp(), AggregateType.AVG, metric.getAvg());
}

List<MeasurementDataNumericHighLowComposite> actualData = Lists.newArrayList(metricsServer.findDataForResource(
scheduleId, beginTime.getMillis(), endTime.getMillis(), 60));

assertEquals(actualData.size(), buckets.getNumDataPoints(), "Expected to get back 60 data points.");

MeasurementDataNumericHighLowComposite expectedBucket0 = new MeasurementDataNumericHighLowComposite(
buckets.get(0).getStartTime(), divide(3.0 + 5.0, 2), 6.0, 1.0);

assertPropertiesMatch("The data for bucket 0 does not match expected values", expectedBucket0,
actualData.get(0), TEST_PRECISION);

// make sure the max for the invalid metric was updated
List<AggregateNumericMetric> updatedMetrics = Lists.newArrayList(dao.findSixHourMetrics(scheduleId,
bucket0Time.getMillis(), bucket0Time.plusSeconds(10).getMillis()));

assertEquals(updatedMetrics.size(), 1, "Expected to get back 1 updated metric");
assertEquals(updatedMetrics.get(0).getMax(), 3.0, "Failed to update the max for invalid metric");
}

@Test
public void find1HourDatCompositesForGroup() {
DateTime beginTime = now().minusDays(11);
Expand Down Expand Up @@ -896,4 +935,4 @@ private Iterable<RawNumericMetric> findRawMetricsWithMetadata(int scheduleId, lo
return new SimplePagedResult<RawNumericMetric>(cql, new RawNumericMetricMapper(true), storageSession);
}

}
}

0 comments on commit b1b4eee

Please sign in to comment.