update metrics_cache and metrics_cache_index for 1 hour and 6 hour data
This commit also introduces a big refactoring of AggregationTests. Tracking
and updating expected values was tedious and error-prone; consequently, I
introduced InMemoryMetricsDB.
John Sanda committed Mar 22, 2014
1 parent 52fea87 commit 63f166c
Showing 10 changed files with 460 additions and 234 deletions.
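
The commit message references InMemoryMetricsDB, but the class itself is not among the hunks shown below. As a rough sketch of the idea only, where every field and method is an assumption rather than the committed code, a test double along these lines lets AggregationTests insert raw data once and compute expected aggregates on demand instead of hand-maintaining them:

// Rough sketch only: the real InMemoryMetricsDB is not shown in this diff,
// so everything below except the class name is assumed.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InMemoryMetricsDB {

    // raw values per schedule id, bucketed by collection time slice (millis)
    private Map<Integer, Map<Long, List<Double>>> rawData =
        new HashMap<Integer, Map<Long, List<Double>>>();

    void addRawData(int scheduleId, long timeSlice, double value) {
        Map<Long, List<Double>> buckets = rawData.get(scheduleId);
        if (buckets == null) {
            buckets = new HashMap<Long, List<Double>>();
            rawData.put(scheduleId, buckets);
        }
        List<Double> values = buckets.get(timeSlice);
        if (values == null) {
            values = new ArrayList<Double>();
            buckets.put(timeSlice, values);
        }
        values.add(value);
    }

    // Expected {min, max, avg} for one schedule and time slice, computed on
    // demand so a test never tracks expected values by hand. Assumes data
    // was inserted for the given schedule and time slice.
    double[] expectedAggregate(int scheduleId, long timeSlice) {
        List<Double> values = rawData.get(scheduleId).get(timeSlice);
        double min = values.get(0);
        double max = values.get(0);
        double sum = 0;
        for (double v : values) {
            min = Math.min(min, v);
            max = Math.max(max, v);
            sum += v;
        }
        return new double[] { min, max, sum / values.size() };
    }
}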
@@ -124,6 +124,10 @@ public void setCacheBatchSize(int size) {
         cacheBatchSize = size;
     }
 
+    ListeningExecutorService getAggregationWorkers() {
+        return aggregationWorkers;
+    }
+
     public void init() {
         init(-1, -1, false);
     }
@@ -41,6 +41,7 @@
  */
 public class AggregationManager {
 
+    public static final int INDEX_PARTITION = 0;
     private static final Comparator<AggregateNumericMetric> AGGREGATE_COMPARATOR = new Comparator<AggregateNumericMetric>() {
         @Override
         public int compare(AggregateNumericMetric left, AggregateNumericMetric right) {
@@ -123,12 +124,12 @@ public Set<AggregateNumericMetric> run() {
         int num6Hour = 0;
         try {
             final int startScheduleId = calculateStartScheduleId(minScheduleId);
-            numRaw = createRawAggregator(startScheduleId).execute();
+            numRaw = createRawAggregator().execute();
             if (is6HourTimeSliceFinished()) {
-                num1Hour = create1HourAggregator(startScheduleId).execute();
+                num1Hour = create1HourAggregator().execute();
             }
             if (is24HourTimeSliceFinished()) {
-                num6Hour = create6HourAggregator(startScheduleId).execute();
+                num6Hour = create6HourAggregator().execute();
             }
 
             return oneHourData;
@@ -146,7 +147,7 @@ public Set<AggregateNumericMetric> run() {
         }
     }
 
-    private Aggregator createRawAggregator(int startScheduleId) {
+    private Aggregator createRawAggregator() {
         ComputeMetric compute1HourMetric = new ComputeMetric() {
             @Override
             public List<StorageResultSetFuture> execute(int startScheduleId, int scheduleId, Double min, Double max,
@@ -158,7 +159,9 @@ public List<StorageResultSetFuture> execute(int startScheduleId, int scheduleId,
                 dao.insertOneHourDataAsync(scheduleId, startTime.getMillis(), MAX, max),
                 dao.insertOneHourDataAsync(scheduleId, startTime.getMillis(), MIN, min),
                 dao.updateMetricsCache(ONE_HOUR, get6HourTimeSlice().getMillis(), startScheduleId,
-                    scheduleId, startTime.getMillis(), map(min, max, mean.getArithmeticMean()))
+                    scheduleId, startTime.getMillis(), map(min, max, mean.getArithmeticMean())),
+                dao.updateCacheIndex(ONE_HOUR, get6HourTimeSlice().getMillis(), INDEX_PARTITION, startScheduleId,
+                    get6HourTimeSlice().getMillis())
             );
         }
     };
@@ -169,15 +172,13 @@ public List<StorageResultSetFuture> execute(int startScheduleId, int scheduleId,
         aggregator.setCacheBatchSize(cacheBatchSize);
         aggregator.setComputeMetric(compute1HourMetric);
         aggregator.setDao(dao);
-        aggregator.setMaxScheduleId(maxScheduleId);
         aggregator.setPermits(permits);
-        aggregator.setStartScheduleId(startScheduleId);
         aggregator.setStartTime(startTime);
 
         return aggregator;
     }
 
-    private Aggregator create1HourAggregator(int startScheduleId) {
+    private Aggregator create1HourAggregator() {
         ComputeMetric compute6HourMetric = new ComputeMetric() {
             @Override
             public List<StorageResultSetFuture> execute(int startScheduleId, int scheduleId, Double min,
@@ -189,7 +190,9 @@ public List<StorageResultSetFuture> execute(int startScheduleId, int scheduleId,
                 dao.insertSixHourDataAsync(scheduleId, get6HourTimeSlice().getMillis(), MIN, min),
                 dao.updateMetricsCache(SIX_HOUR, get24HourTimeSlice().getMillis(),
                     startScheduleId, scheduleId, get6HourTimeSlice().getMillis(), map(min, max,
-                        mean.getArithmeticMean()))
+                        mean.getArithmeticMean())),
+                dao.updateCacheIndex(SIX_HOUR, get24HourTimeSlice().getMillis(), INDEX_PARTITION, startScheduleId,
+                    get24HourTimeSlice().getMillis())
             );
         }
     };
@@ -200,15 +203,13 @@ startScheduleId, scheduleId, get6HourTimeSlice().getMillis(), map(min, max,
         aggregator.setCacheBatchSize(cacheBatchSize);
         aggregator.setComputeMetric(compute6HourMetric);
         aggregator.setDao(dao);
-        aggregator.setMaxScheduleId(maxScheduleId);
         aggregator.setPermits(permits);
-        aggregator.setStartScheduleId(startScheduleId);
         aggregator.setStartTime(get6HourTimeSlice());
 
         return aggregator;
     }
 
-    private Aggregator create6HourAggregator(int startScheduleId) {
+    private Aggregator create6HourAggregator() {
         ComputeMetric compute24HourMetric = new ComputeMetric() {
             @Override
             public List<StorageResultSetFuture> execute(int startScheduleId, int scheduleId, Double min,
@@ -230,9 +231,7 @@ public List<StorageResultSetFuture> execute(int startScheduleId, int scheduleId,
         aggregator.setCacheBatchSize(cacheBatchSize);
         aggregator.setComputeMetric(compute24HourMetric);
         aggregator.setDao(dao);
-        aggregator.setMaxScheduleId(maxScheduleId);
         aggregator.setPermits(permits);
-        aggregator.setStartScheduleId(startScheduleId);
         aggregator.setStartTime(get24HourTimeSlice());
 
         return aggregator;
@@ -5,8 +5,8 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
 import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -19,6 +19,8 @@
 import org.rhq.server.metrics.AbortedException;
 import org.rhq.server.metrics.MetricsDAO;
 import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.CacheIndexEntry;
+import org.rhq.server.metrics.domain.CacheIndexEntryMapper;
 
 /**
  * @author John Sanda
@@ -29,10 +31,6 @@ class Aggregator {
 
     private ComputeMetric computeMetric;
 
-    private int startScheduleId;
-
-    private int maxScheduleId;
-
     private int cacheBatchSize;
 
     private Semaphore permits;
@@ -47,25 +45,10 @@ class Aggregator {
 
     private TaskTracker taskTracker = new TaskTracker();
 
-    private AsyncFunction<BatchResult, ResultSet> deleteCachePartition = new AsyncFunction<BatchResult, ResultSet>() {
-        @Override
-        public ListenableFuture<ResultSet> apply(BatchResult batchResult) throws Exception {
-            return dao.deleteCacheEntries(aggregationType.getCacheTable(), startTime.getMillis(), startScheduleId);
-        }
-    };
-
     void setComputeMetric(ComputeMetric computeMetric) {
         this.computeMetric = computeMetric;
     }
 
-    void setStartScheduleId(int startScheduleId) {
-        this.startScheduleId = startScheduleId;
-    }
-
-    void setMaxScheduleId(int maxScheduleId) {
-        this.maxScheduleId = maxScheduleId;
-    }
-
     void setCacheBatchSize(int cacheBatchSize) {
         this.cacheBatchSize = cacheBatchSize;
     }
@@ -94,16 +77,22 @@ public int execute() throws InterruptedException, AbortedException {
         Stopwatch stopwatch = new Stopwatch().start();
         AtomicInteger numSchedules = new AtomicInteger();
         try {
-            for (int i = startScheduleId; i <= maxScheduleId; i += cacheBatchSize) {
+            StorageResultSetFuture indexFuture = dao.findCacheIndexEntries(aggregationType.getCacheTable(),
+                startTime.getMillis(), AggregationManager.INDEX_PARTITION);
+            ResultSet resultSet = indexFuture.get();
+            CacheIndexEntryMapper indexEntryMapper = new CacheIndexEntryMapper();
+
+            for (Row row : resultSet) {
+                CacheIndexEntry indexEntry = indexEntryMapper.map(row);
                 Stopwatch batchStopwatch = new Stopwatch().start();
                 permits.acquire();
-                StorageResultSetFuture queryFuture = dao.findCacheEntriesAsync(aggregationType.getCacheTable(),
-                    startTime.getMillis(), i);
+                StorageResultSetFuture cacheFuture = dao.findCacheEntriesAsync(aggregationType.getCacheTable(),
+                    startTime.getMillis(), indexEntry.getStartScheduleId());
                 taskTracker.addTask();
-                ListenableFuture<BatchResult> batchResultFuture = Futures.transform(queryFuture,
-                    new ProcessBatch(dao, computeMetric, i, startTime, aggregationType,
-                        cacheBatchSize), aggregationTasks);
-                Futures.addCallback(batchResultFuture, batchFinished(numSchedules, batchStopwatch), aggregationTasks);
+                ListenableFuture<BatchResult> batchResultFuture = Futures.transform(cacheFuture,
+                    new ProcessBatch(dao, computeMetric, indexEntry, aggregationType, cacheBatchSize), aggregationTasks);
+                Futures.addCallback(batchResultFuture, batchFinished(indexEntry, numSchedules, batchStopwatch),
+                    aggregationTasks);
             }
             taskTracker.finishedSchedulingTasks();
             taskTracker.waitForTasksToFinish();
@@ -118,10 +107,40 @@ public int execute() throws InterruptedException, AbortedException {
         }
     }
 
-    private FutureCallback<BatchResult> batchFinished(final AtomicInteger numSchedules, final Stopwatch stopwatch) {
+//    public int execute() throws InterruptedException, AbortedException {
+//        Stopwatch stopwatch = new Stopwatch().start();
+//        AtomicInteger numSchedules = new AtomicInteger();
+//        try {
+//            for (int i = startScheduleId; i <= maxScheduleId; i += cacheBatchSize) {
+//                Stopwatch batchStopwatch = new Stopwatch().start();
+//                permits.acquire();
+//                StorageResultSetFuture queryFuture = dao.findCacheEntriesAsync(aggregationType.getCacheTable(),
+//                    startTime.getMillis(), i);
+//                taskTracker.addTask();
+//                ListenableFuture<BatchResult> batchResultFuture = Futures.transform(queryFuture,
+//                    new ProcessBatch(dao, computeMetric, i, startTime, aggregationType,
+//                        cacheBatchSize), aggregationTasks);
+//                Futures.addCallback(batchResultFuture, batchFinished(numSchedules, batchStopwatch), aggregationTasks);
+//            }
+//            taskTracker.finishedSchedulingTasks();
+//            taskTracker.waitForTasksToFinish();
+//
+//            return numSchedules.get();
+//        } finally {
+//            stopwatch.stop();
+//            if (LOG.isDebugEnabled()) {
+//                LOG.debug("Finished " + aggregationType + " aggregation of " + numSchedules + " schedules in " +
+//                    stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+//            }
+//        }
+//    }
+
+    private FutureCallback<BatchResult> batchFinished(final CacheIndexEntry indexEntry,
+        final AtomicInteger numSchedules, final Stopwatch stopwatch) {
         return new FutureCallback<BatchResult>() {
             @Override
             public void onSuccess(BatchResult result) {
+                deleteCacheIndexEntry(indexEntry);
                 updateRemainingBatches();
                 int delta;
                 if (aggregationType == AggregationType.SIX_HOUR) {
@@ -133,7 +152,7 @@ public void onSuccess(BatchResult result) {
                 stopwatch.stop();
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Finished batch of " + aggregationType + " for " + delta + " schedules with starting " +
-                        "schedule id " + result.getStartScheduleId() + " in " +
+                        "schedule id " + indexEntry.getStartScheduleId() + " in " +
                         stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
                 }
             }
@@ -143,22 +162,40 @@ public void onFailure(Throwable t) {
                 if (t instanceof BatchException) {
                     BatchException exception = (BatchException) t;
                     LOG.warn("There were errors while processing a batch of " + aggregationType + " with starting " +
-                        "schedule id " + startScheduleId + ": " + exception.getErrorMessages());
+                        "schedule id " + indexEntry.getStartScheduleId() + ": " + exception.getErrorMessages());
                     if (LOG.isDebugEnabled()) {
                         for (Throwable error : exception.getRootCauses()) {
                             LOG.debug("Root cause for batch error", error);
                         }
                     }
                 } else {
                     LOG.warn("There was an unexpected error while processing a batch of " + aggregationType +
-                        " with starting schedule id " + startScheduleId, t);
+                        " with starting schedule id " + indexEntry.getStartScheduleId(), t);
                 }
+                deleteCacheIndexEntry(indexEntry);
                 // TODO add some configurable strategy to determine whether or not to abort
                 updateRemainingBatches();
             }
         };
     }
 
+    private void deleteCacheIndexEntry(CacheIndexEntry indexEntry) {
+        StorageResultSetFuture deleteFuture = dao.deleteCacheIndexEntries(aggregationType.getCacheTable(),
+            indexEntry.getInsertTimeSlice(), AggregationManager.INDEX_PARTITION, indexEntry.getStartScheduleId(),
+            indexEntry.getCollectionTimeSlice());
+        Futures.addCallback(deleteFuture, new FutureCallback<ResultSet>() {
+            @Override
+            public void onSuccess(ResultSet result) {
+                LOG.debug("deleted cache index entry");
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                LOG.warn("Failed to delete cache index entry", t);
+            }
+        }, aggregationTasks);
+    }
+
     private void updateRemainingBatches() {
         permits.release();
         taskTracker.finishedTask();
@@ -5,52 +5,41 @@
 
 import com.datastax.driver.core.ResultSet;
 
-import org.joda.time.DateTime;
+import org.rhq.server.metrics.domain.CacheIndexEntry;
 
 /**
  * @author John Sanda
 */
 class BatchResult {
 
-    private List<ResultSet> insertResultSets;
-
-    private DateTime timeSlice;
+    private CacheIndexEntry cacheIndexEntry;
 
-    private int startScheduleId;
+    private List<ResultSet> insertResultSets;
 
     private ResultSet purgeCacheResultSet;
 
-    private boolean empty;
-
-    public BatchResult(List<ResultSet> insertResultSets, DateTime timeSlice, int startScheduleId,
+    public BatchResult(List<ResultSet> insertResultSets, CacheIndexEntry cacheIndexEntry,
         ResultSet purgeCacheResultSet) {
         this.insertResultSets = insertResultSets;
-        this.timeSlice = timeSlice;
-        this.startScheduleId = startScheduleId;
+        this.cacheIndexEntry = cacheIndexEntry;
        this.purgeCacheResultSet = purgeCacheResultSet;
     }
 
-    public BatchResult(DateTime timeSlice, int startScheduleId) {
-        this.timeSlice = timeSlice;
-        this.startScheduleId = startScheduleId;
+    public BatchResult(CacheIndexEntry cacheIndexEntry) {
+        this.cacheIndexEntry = cacheIndexEntry;
         insertResultSets = Collections.emptyList();
-        empty = true;
     }
 
     boolean isEmpty() {
-        return empty;
+        return insertResultSets.isEmpty();
     }
 
     public List<ResultSet> getInsertResultSets() {
         return insertResultSets;
     }
 
-    public DateTime getTimeSlice() {
-        return timeSlice;
-    }
-
-    public int getStartScheduleId() {
-        return startScheduleId;
+    CacheIndexEntry getCacheIndexEntry() {
+        return cacheIndexEntry;
     }
 
     public ResultSet getPurgeCacheResultSet() {
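Taken together, the hunks above replace the Aggregator's scan over schedule id partitions (startScheduleId through maxScheduleId in cacheBatchSize steps) with an index-driven flow: aggregation writes a metrics_cache_index entry whenever a cache batch receives data, execute() drives its work off those entries, and each entry is deleted once its batch completes, successfully or not. Below is a condensed, synchronous sketch of that lifecycle; the real code goes through MetricsDAO and Guava futures, so the types and method bodies here are simplified stand-ins, not the actual API.

// Condensed sketch of the cache-index lifecycle this commit introduces.
// The flow mirrors the diff; the synchronous types and methods here are
// simplified stand-ins, not the real MetricsDAO API.
import java.util.ArrayList;
import java.util.List;

class CacheIndexFlowSketch {

    static class IndexEntry {
        final int startScheduleId;
        IndexEntry(int startScheduleId) {
            this.startScheduleId = startScheduleId;
        }
    }

    private final List<IndexEntry> index = new ArrayList<IndexEntry>();

    // Step 1: while computing 1 hour (or 6 hour) data, writers record which
    // cache batches received data (the diff's dao.updateCacheIndex calls).
    void updateCacheIndex(int startScheduleId) {
        index.add(new IndexEntry(startScheduleId));
    }

    // Steps 2-4: instead of scanning startScheduleId..maxScheduleId in
    // cacheBatchSize steps, the aggregator walks the index, processes the
    // batch each entry points at, and deletes the entry when the batch
    // finishes (the diff's findCacheIndexEntries, findCacheEntriesAsync,
    // and deleteCacheIndexEntries calls, minus the async plumbing).
    void aggregate() {
        for (IndexEntry entry : new ArrayList<IndexEntry>(index)) {
            processBatch(entry.startScheduleId);
            index.remove(entry);
        }
    }

    private void processBatch(int startScheduleId) {
        System.out.println("aggregating batch starting at " + startScheduleId);
    }
}

One consequence visible in the diff is that a batch with no data is never queried at all, since no index entry is written for it; the old loop issued a cache query for every partition in the schedule id range regardless.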
