This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

[BZ 1114200] fix handling of late data
My previous commits that involved setting the column timestamp on inserts had
to be reverted. When aggregating late data, it is possible to end up with a
clock tie (see https://wiki.apache.org/cassandra/FAQ#clocktie for details). In
those situations, the columns might not get overwritten.
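
As an illustration of that failure mode, here is a minimal sketch using the DataStax Java driver (the keyspace, table, and column names are hypothetical, not RHQ's actual schema): when the column timestamp is derived from the time slice, re-aggregating the same slice issues a second write with an identical timestamp, and Cassandra resolves the tie by comparing cell values rather than by keeping the later write.

    Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
    Session session = cluster.connect("rhq");
    // First aggregation pass writes the 1 hour average for the 10:00 time slice.
    session.execute("INSERT INTO one_hour_metrics (schedule_id, time, avg) " +
        "VALUES (100, '2014-09-26 10:00:00', 2.5) USING TIMESTAMP 1411725600000000");
    // Late data arrives and the slice is re-aggregated using the same explicit timestamp.
    // That is a clock tie, so the corrected value 3.1 is not guaranteed to replace 2.5.
    session.execute("INSERT INTO one_hour_metrics (schedule_id, time, avg) " +
        "VALUES (100, '2014-09-26 10:00:00', 3.1) USING TIMESTAMP 1411725600000000");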

The key thing is to ensure that we do not try to recompute aggregate metrics
when some or all of the source data has already expired, since doing so would
produce new, incorrect aggregates. I have put changes in place that should
prevent this from happening.
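
To make the risk concrete, a made-up numeric example (plain Java, using java.util.Arrays); the point is that an aggregate recomputed from partially expired raw data looks valid but is wrong:

    // Values are invented for illustration only.
    double[] fullSlice = {48.0, 52.0, 50.0, 51.0, 49.0};    // raw values present at first aggregation
    double[] afterExpiry = {12.0};                          // the only raw value still alive later
    double firstAvg = Arrays.stream(fullSlice).average().getAsDouble();      // 50.0, correct
    double recomputed = Arrays.stream(afterExpiry).average().getAsDouble();  // 12.0, partial and wrong
    // Re-running aggregation over this time slice would overwrite 50.0 with 12.0, which
    // is why time slices older than the raw data age limit are no longer recomputed.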

This commit also includes a change to IndexIterator. It will now continue
loading pages if there is a read timeout. Previously, the exception would go
uncaught, causing data aggregation to abort early.
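
A rough usage sketch (variable names are hypothetical, the constructor signature is taken from the diff below, and the fourth argument is assumed to be the MetricsDAO): a ReadTimeoutException on any single page is now caught and logged inside the iterator, so a loop like this keeps running instead of aborting the aggregation pass.

    IndexIterator entries = new IndexIterator(startTime, endTime, IndexBucket.RAW, dao, configuration);
    while (entries.hasNext()) {
        IndexEntry entry = entries.next();   // pages that timed out are simply skipped
        // schedule aggregation work for this index entry
    }
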
John Sanda committed Sep 26, 2014
1 parent 4b8f550 commit 26c0014
Showing 5 changed files with 116 additions and 47 deletions.
@@ -64,6 +64,8 @@ public class MetricsServer {

private final Log log = LogFactory.getLog(MetricsServer.class);

private static final int RAW_DATA_AGE_LIMIT_MAX = 5;

private DateTimeService dateTimeService = new DateTimeService();

private MetricsDAO dao;
@@ -77,7 +79,8 @@ public class MetricsServer {

private AggregationManager aggregationManager;

- private Days rawDataAgeLimit = Days.days(Integer.parseInt(System.getProperty("rhq.metrics.data.age-limit", "3")));
+ private Days rawDataAgeLimit = Days.days(Math.min(3, Integer.parseInt(
+     System.getProperty("rhq.metrics.data.age-limit", "3"))));

public void setDAO(MetricsDAO dao) {
this.dao = dao;
@@ -96,6 +99,10 @@ public int getRawDataAgeLimit() {
}

public void setRawDataAgeLimit(int rawDataAgeLimit) {
if (rawDataAgeLimit > RAW_DATA_AGE_LIMIT_MAX) {
throw new IllegalArgumentException("The requested limit, " + rawDataAgeLimit + ", exceeds the max age " +
"limit of " + RAW_DATA_AGE_LIMIT_MAX);
}
this.rawDataAgeLimit = Days.days(rawDataAgeLimit);
}
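
A hedged usage sketch of the new bound (MetricsServer, setRawDataAgeLimit, and RAW_DATA_AGE_LIMIT_MAX come from this hunk; the surrounding wiring is hypothetical):

    MetricsServer metricsServer = new MetricsServer();
    metricsServer.setRawDataAgeLimit(5);   // accepted, equal to RAW_DATA_AGE_LIMIT_MAX
    metricsServer.setRawDataAgeLimit(6);   // throws IllegalArgumentException per the check above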

@@ -335,47 +342,45 @@ public void addNumericData(final Set<MeasurementDataNumeric> dataSet, final RawD
    }
    final Stopwatch stopwatch = new Stopwatch().start();
    final AtomicInteger remainingInserts = new AtomicInteger(dataSet.size());
-   // TODO add support for splitting cache index partition
-   final int partition = 0;

    for (final MeasurementDataNumeric data : dataSet) {
        DateTime collectionTimeSlice = dateTimeService.getTimeSlice(new DateTime(data.getTimestamp()),
            configuration.getRawTimeSliceDuration());
        Days days = Days.daysBetween(collectionTimeSlice, dateTimeService.now());

        if (days.isGreaterThan(rawDataAgeLimit)) {
-           callback.onSuccess(data);
-           continue;
-       }
-
-       StorageResultSetFuture rawFuture = dao.insertRawData(data);
-       StorageResultSetFuture indexFuture = dao.updateIndex(IndexBucket.RAW, collectionTimeSlice.getMillis(),
-           data.getScheduleId());
-       ListenableFuture<List<ResultSet>> insertsFuture = Futures.successfulAsList(rawFuture, indexFuture);
-       Futures.addCallback(insertsFuture, new FutureCallback<List<ResultSet>>() {
-           @Override
-           public void onSuccess(List<ResultSet> result) {
-               callback.onSuccess(data);
-               if (remainingInserts.decrementAndGet() == 0) {
-                   stopwatch.stop();
-                   if (log.isDebugEnabled()) {
-                       log.debug("Finished inserting " + dataSet.size() + " raw metrics in " +
-                           stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
-                   }
-                   callback.onFinish();
-               }
-           }
-
-           @Override
-           public void onFailure(Throwable t) {
-               if (log.isDebugEnabled()) {
-                   log.debug("An error occurred while inserting raw data", ThrowableUtil.getRootCause(t));
-               } else {
-                   log.warn("An error occurred while inserting raw data: " + ThrowableUtil.getRootMessage(t));
-               }
-               callback.onFailure(t);
-           }
-       }, tasks);
+           log.info(data + " is older than the raw data age limit of " + rawDataAgeLimit.getDays() +
+               " days. It will not be stored.");
+       } else {
+           StorageResultSetFuture rawFuture = dao.insertRawData(data);
+           StorageResultSetFuture indexFuture = dao.updateIndex(IndexBucket.RAW, collectionTimeSlice.getMillis(),
+               data.getScheduleId());
+           ListenableFuture<List<ResultSet>> insertsFuture = Futures.successfulAsList(rawFuture, indexFuture);
+           Futures.addCallback(insertsFuture, new FutureCallback<List<ResultSet>>() {
+               @Override
+               public void onSuccess(List<ResultSet> result) {
+                   callback.onSuccess(data);
+                   if (remainingInserts.decrementAndGet() == 0) {
+                       stopwatch.stop();
+                       if (log.isDebugEnabled()) {
+                           log.debug("Finished inserting " + dataSet.size() + " raw metrics in " +
+                               stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+                       }
+                       callback.onFinish();
+                   }
+               }
+
+               @Override
+               public void onFailure(Throwable t) {
+                   if (log.isDebugEnabled()) {
+                       log.debug("An error occurred while inserting raw data", ThrowableUtil.getRootCause(t));
+                   } else {
+                       log.warn("An error occurred while inserting raw data: " + ThrowableUtil.getRootMessage(t));
+                   }
+                   callback.onFailure(t);
+               }
+           }, tasks);
+       }
    }
}
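
A hypothetical caller of addNumericData, assuming RawDataInsertedCallback declares exactly the three methods invoked in the hunk above; with this change, values older than the age limit are logged and dropped rather than acknowledged as stored:

    Set<MeasurementDataNumeric> dataSet = collectRawValues();   // assumed helper, not part of RHQ
    metricsServer.addNumericData(dataSet, new RawDataInsertedCallback() {
        @Override
        public void onSuccess(MeasurementDataNumeric data) {
            // one raw value was persisted
        }

        @Override
        public void onFailure(Throwable t) {
            log.error("Failed to insert raw data", t);
        }

        @Override
        public void onFinish() {
            // all inserts for this batch have completed
        }
    });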

@@ -135,10 +135,13 @@ public Set<AggregateNumericMetric> run() {
PersistFunctions persistFunctions = new PersistFunctions(dao, dtService);
final Set<AggregateNumericMetric> oneHourData = new ConcurrentSkipListSet<AggregateNumericMetric>(
AGGREGATE_COMPARATOR);

DateTime endTime = dtService.currentHour();
DateTime end = endTime;
- DateTime start = end.minusDays(2);
+ // We set the start time to the retention period minus 1 hour, or 6 days and 23
+ // hours ago instead of 7 days ago, because if we set the start time to the full
+ // 7 days, then we could end up in a situation where data has expired and
+ // aggregate metrics get overwritten with partial data.
+ DateTime start = end.minus(configuration.getRawRetention().toPeriod().minusHours(1));
DataAggregator rawAggregator = createRawAggregator(persistFunctions);
rawAggregator.setBatchFinishedListener(new DataAggregator.BatchFinishedListener() {
@Override
@@ -149,11 +152,11 @@ public void onFinish(List<AggregateNumericMetric> metrics) {
num1Hour = rawAggregator.execute(start, end);

end = dtService.get6HourTimeSlice(endTime);
- start = dtService.get6HourTimeSlice(endTime).minusDays(7);
+ start = dtService.get6HourTimeSlice(endTime).minus(configuration.getRawRetention());
num6Hour = create1HourAggregator(persistFunctions).execute(start, end);

end = dtService.get24HourTimeSlice(endTime);
- start = dtService.get24HourTimeSlice(endTime).minusDays(14);
+ start = dtService.get24HourTimeSlice(endTime).minus(configuration.getRawRetention());
num24Hour = create6HourAggregator(persistFunctions).execute(start, end);

return oneHourData;
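
To make the comment in this hunk concrete, a small sketch of the start time arithmetic with Joda-Time, assuming a raw retention of 7 days (the real value comes from configuration.getRawRetention()):

    Duration rawRetention = Duration.standardDays(7);                   // 168 hours
    DateTime end = dtService.currentHour();                             // e.g. 2014-09-26T10:00
    DateTime start = end.minus(rawRetention.toPeriod().minusHours(1));  // 2014-09-19T11:00, 167 hours earlier
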
@@ -144,7 +144,7 @@ public int execute(DateTime start, DateTime end) throws InterruptedException,
AbortedException {

log.info("Starting " + bucket + " data aggregation");
- Stopwatch stopwatch = new Stopwatch().start();
+ Stopwatch stopwatch = Stopwatch.createStarted();
try {
IndexIterator iterator = new IndexIterator(start, end, bucket, dao, configuration);
Batch batch = new Batch();
@@ -182,15 +182,15 @@ public int execute(DateTime start, DateTime end) throws InterruptedException,
taskTracker.abort("There was an unexpected error scheduling aggregation tasks: " + e.getMessage());
} finally {
stopwatch.stop();
log.info("Finished " + bucket + " data aggregation for " + schedulesCount + " measurement schedules in " +
log.info("Finished " + schedulesCount + " " + bucket + " data aggregations in " +
stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
}
return schedulesCount.get();
}

protected void submitAggregationTask(Batch batch) throws InterruptedException {
if (log.isDebugEnabled()) {
log.debug("Scheduling aggregation task for " + batch);
log.debug("Scheduling " + bucket + " aggregation task for " + batch);
}
permits.acquire();
aggregationTasks.submit(new AggregationTask(batch) {
@@ -370,8 +370,8 @@ public void onSuccess(R args) {
permits.release();
taskTracker.finishedTask();
if (log.isDebugEnabled()) {
log.debug("There are " + taskTracker.getRemainingTasks() + " remaining tasks and " +
permits.availablePermits() + " available permits");
log.debug("There are " + taskTracker.getRemainingTasks() + " remaining " + bucket +
" aggregation tasks and " + permits.availablePermits() + " available permits");
}
}
}
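
For context, a simplified sketch of the throttling pattern visible in this file (not RHQ's exact code; the permit count is assumed and error handling is elided): a semaphore bounds how many aggregation tasks are in flight, and each finished task releases its permit and notifies the task tracker.

    final Semaphore permits = new Semaphore(100);    // assumed concurrency limit
    permits.acquire();                               // wait for a free slot before submitting
    aggregationTasks.submit(new Runnable() {
        @Override
        public void run() {
            try {
                // aggregate one batch of index entries
            } finally {
                permits.release();
                taskTracker.finishedTask();
            }
        }
    });
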
@@ -1,12 +1,18 @@
package org.rhq.server.metrics.aggregation;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ExecutionInfo;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.exceptions.ReadTimeoutException;
import com.google.common.collect.Iterators;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;
import org.joda.time.Duration;

@@ -17,12 +23,47 @@

/**
* An iterator for the index with paging support. The iterator will scan through all partitions for a particular date
- * range, one page at a time.
+ * range, one page at a time. If a query results in a read timeout, the iterator simply logs the exception and queries
+ * the next page.
*
* @author John Sanda
*/
public class IndexIterator implements Iterator<IndexEntry> {

private static final Log log = LogFactory.getLog(IndexIterator.class);

private static final ResultSet EMPTY_RESULT_SET = new ResultSet() {
@Override
public ColumnDefinitions getColumnDefinitions() {
return null;
}

@Override
public boolean isExhausted() {
return true;
}

@Override
public Row one() {
return null;
}

@Override
public List<Row> all() {
return Collections.emptyList();
}

@Override
public Iterator<Row> iterator() {
return null;
}

@Override
public ExecutionInfo getExecutionInfo() {
return null;
}
};

private DateTime time;

private Duration duration;
@@ -57,7 +98,6 @@ public IndexIterator(DateTime startTime, DateTime endTime, IndexBucket bucket, M
MetricsConfiguration configuration) {
time = startTime;
this.endTime = endTime;
- this.duration = duration;
this.bucket = bucket;
this.dao = dao;
this.numPartitions = configuration.getIndexPartitions();
@@ -99,15 +139,16 @@ public void remove() {

private void loadPage() {
if (rowIterator == null) {
- nextPage(dao.findIndexEntries(bucket, partition, time.getMillis()).get());
+ nextPage(findIndexEntries());
} else {
if (rowCount < pageSize) {
// When we get here, it means that we have gone through all the pages in
// the current partition; consequently, we query the next partition.
- nextPage(dao.findIndexEntries(bucket, ++partition, time.getMillis()).get());
+ ++partition;
+ nextPage(findIndexEntries());
} else{
// We query the current partition again because there could be more pages.
- nextPage(dao.findIndexEntries(bucket, partition, time.getMillis(), lastScheduleId).get());
+ nextPage(findIndexEntriesAfterScheduleId());
}
}
}
@@ -126,7 +167,7 @@ private void nextPage(ResultSet resultSet) {
partition = 0;
time = time.plus(duration);
}
- nextResultSet = dao.findIndexEntries(bucket, partition, time.getMillis()).get();
+ nextResultSet = findIndexEntries();
}
if (time.isBefore(endTime)) {
List<Row> rows = nextResultSet.all();
@@ -138,4 +179,24 @@ private void nextPage(ResultSet resultSet) {
}
}

private ResultSet findIndexEntries() {
try {
return dao.findIndexEntries(bucket, partition, time.getMillis()).get();
} catch (ReadTimeoutException e) {
log.warn("There was a read timeout while querying the index with {bucket: " + bucket + ", partition: " +
partition + ", time: " + time + "}", e);
return EMPTY_RESULT_SET;
}
}

private ResultSet findIndexEntriesAfterScheduleId() {
try {
return dao.findIndexEntries(bucket, partition, time.getMillis(), lastScheduleId).get();
} catch (ReadTimeoutException e) {
log.warn("There was a read timeout while querying the index with {bucket: " + bucket + ", partition: " +
partition + ", time: " + time + ", lastScheduleId: " + lastScheduleId + "}", e);
return EMPTY_RESULT_SET;
}
}

}
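
A short sketch of why the two new methods return EMPTY_RESULT_SET rather than null after a timeout (the calling code here is illustrative): the paging logic can treat a timed-out page exactly like an exhausted one and simply move on, without null checks.

    ResultSet page = findIndexEntries();   // EMPTY_RESULT_SET if the read timed out
    List<Row> rows = page.all();           // an empty list, never a NullPointerException
    if (rows.isEmpty()) {
        // advance to the next partition or the next time slice
    }
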
@@ -357,7 +357,7 @@
has a timestamp that is more than limit days old, then it is not stored. This prevents data
that is for example a year old from being stored.">
<c:constraint>
- <c:integer-constraint minimum="1"/>
+ <c:integer-constraint minimum="1" maximum="5"/>
</c:constraint>
</c:simple-property>
<c:group name="Aggregation">
