Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Commit

Permalink
more refactoring to correctly compute the number of schedules processed
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed Apr 7, 2014
1 parent b9aeca6 commit d0bdbf8
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -79,13 +80,21 @@ public Set<AggregateNumericMetric> run() {
try {
PersistFunctions persistFunctions = new PersistFunctions(dao, dtService);

createPastDataAggregator(persistFunctions).execute();
numRaw = createRawAggregator(persistFunctions).execute();
Map<AggregationType, Integer> counts = createPastDataAggregator(persistFunctions).execute();
numRaw += counts.get(AggregationType.RAW);
num1Hour += counts.get(AggregationType.ONE_HOUR);
num6Hour += counts.get(AggregationType.SIX_HOUR);

counts = createRawAggregator(persistFunctions).execute();
numRaw += counts.get(AggregationType.RAW);

if (is6HourTimeSliceFinished()) {
num1Hour = create1HourAggregator(persistFunctions).execute();
counts = create1HourAggregator(persistFunctions).execute();
num1Hour += counts.get(AggregationType.ONE_HOUR);
}
if (is24HourTimeSliceFinished()) {
num6Hour = create6HourAggregator(persistFunctions).execute();
counts = create6HourAggregator(persistFunctions).execute();
num6Hour += counts.get(AggregationType.SIX_HOUR);
}

return oneHourData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -55,12 +55,30 @@ abstract class BaseAggregator {

protected TaskTracker taskTracker = new TaskTracker();

// Currently this is only used by CacheAggregator as a hook to return the 1 hour data
// that is needed for subsequent baseline calculations
protected CacheBlockFinishedListener cacheBlockFinishedListener;

private int cacheBatchSize;

protected class AggregationTaskFinishedCallback<T> implements FutureCallback<T> {
@Override
public void onSuccess(T args) {
try {
onFinish(args);
} finally {
permits.release();
taskTracker.finishedTask();
}
}

protected void onFinish(T args) {
}

@Override
public void onFailure(Throwable t) {
LOG.warn("There was an error aggregating data", t);
permits.release();
taskTracker.finishedTask();
}
}

void setDao(MetricsDAO dao) {
this.dao = dao;
}
Expand Down Expand Up @@ -93,7 +111,7 @@ void setCacheBatchSize(int cacheBatchSize) {
this.cacheBatchSize = cacheBatchSize;
}

public int execute() throws InterruptedException, AbortedException {
public Map<AggregationType, Integer> execute() throws InterruptedException, AbortedException {
Stopwatch stopwatch = new Stopwatch().start();
AtomicInteger numSchedules = new AtomicInteger();
try {
Expand All @@ -114,7 +132,7 @@ public void onFailure(Throwable t) {
});
taskTracker.waitForTasksToFinish();

return numSchedules.get();
return getAggregationCounts();
} finally {
stopwatch.stop();
if (LOG.isDebugEnabled()) {
Expand All @@ -124,9 +142,11 @@ public void onFailure(Throwable t) {
}
}

abstract ListenableFuture<List<CacheIndexEntry>> findIndexEntries();
protected abstract ListenableFuture<List<CacheIndexEntry>> findIndexEntries();

protected abstract Runnable createAggregationTask(CacheIndexEntry indexEntry);

abstract Runnable createAggregationTask(CacheIndexEntry indexEntry);
protected abstract Map<AggregationType, Integer> getAggregationCounts();

private void scheduleTasks(List<CacheIndexEntry> indexEntries) {
try {
Expand Down Expand Up @@ -229,39 +249,5 @@ public Iterator<List<T>> iterator() {
};
}

protected FutureCallback<ResultSet> cacheBlockFinished(final ListenableFuture<IndexAggregatesPair> pairFuture) {
return new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
if (cacheBlockFinishedListener != null) {
notifyListener(pairFuture);
}
permits.release();
taskTracker.finishedTask();
}

@Override
public void onFailure(Throwable t) {
LOG.warn("There was an error aggregating data", t);
permits.release();
taskTracker.finishedTask();
}
};
}

private void notifyListener(ListenableFuture<IndexAggregatesPair> pairFuture) {
try {
IndexAggregatesPair pair = pairFuture.get();
cacheBlockFinishedListener.onFinish(pair);
} catch (InterruptedException e) {
LOG.info("There was an interrupt while trying to notify the cache block finished listener", e);
} catch (ExecutionException e) {
LOG.error("There was an unexpected error obtaining the " + IndexAggregatesPair.class.getSimpleName() +
". This should not happen!", e);
}
}

static interface CacheBlockFinishedListener {
void onFinish(IndexAggregatesPair pair);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package org.rhq.server.metrics.aggregation;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import com.datastax.driver.core.ResultSet;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -27,8 +30,16 @@ class CacheAggregator extends BaseAggregator {

private static final int LATE_DATA_BATCH_SIZE = 5;

static interface CacheBlockFinishedListener {
void onFinish(IndexAggregatesPair pair);
}

private DateTime currentDay;

private AtomicInteger schedulesCount = new AtomicInteger();

private CacheBlockFinishedListener cacheBlockFinishedListener;

void setStartTime(DateTime startTime) {
this.startTime = startTime;
}
Expand All @@ -42,22 +53,28 @@ void setCacheBlockFinishedListener(CacheBlockFinishedListener cacheBlockFinished
}

@Override
ListenableFuture<List<CacheIndexEntry>> findIndexEntries() {
protected ListenableFuture<List<CacheIndexEntry>> findIndexEntries() {
return findCurrentCacheIndexEntries();
}

@Override
Runnable createAggregationTask(final CacheIndexEntry indexEntry) {
protected Runnable createAggregationTask(final CacheIndexEntry indexEntry) {
return new Runnable() {
@Override
public void run() {
StorageResultSetFuture cacheFuture = dao.findCacheEntriesAsync(aggregationType.getCacheTable(),
startTime.getMillis(), indexEntry.getStartScheduleId());

processCacheBlock(indexEntry, cacheFuture, persistMetrics);
}
};
}

@Override
protected Map<AggregationType, Integer> getAggregationCounts() {
return ImmutableMap.of(aggregationType, schedulesCount.get());
}

private ListenableFuture<List<CacheIndexEntry>> findCurrentCacheIndexEntries() {
StorageResultSetFuture indexFuture = dao.findCurrentCacheIndexEntries(aggregationType.getCacheTable(),
currentDay.getMillis(), AggregationManager.INDEX_PARTITION, startTime.getMillis());
Expand All @@ -73,8 +90,8 @@ public List<CacheIndexEntry> apply(ResultSet resultSet) {
}

@SuppressWarnings("unchecked")
protected void processCacheBlock(CacheIndexEntry indexEntry, StorageResultSetFuture cacheFuture,
AsyncFunction<IndexAggregatesPair, List<ResultSet>> persistMetricsFn) {
protected void processCacheBlock(CacheIndexEntry indexEntry,
StorageResultSetFuture cacheFuture, AsyncFunction<IndexAggregatesPair, List<ResultSet>> persistMetricsFn) {

ListenableFuture<Iterable<List<RawNumericMetric>>> iterableFuture = Futures.transform(cacheFuture,
toIterable(aggregationType.getCacheMapper()), aggregationTasks);
Expand All @@ -94,6 +111,25 @@ protected void processCacheBlock(CacheIndexEntry indexEntry, StorageResultSetFut
ListenableFuture<ResultSet> deleteCacheIndexFuture = Futures.transform(deleteCacheFuture,
deleteCacheIndexEntry(indexEntry), aggregationTasks);

Futures.addCallback(deleteCacheIndexFuture, cacheBlockFinished(pairFuture), aggregationTasks);
aggregationTaskFinished(deleteCacheIndexFuture, pairFuture);
}

@SuppressWarnings("unchecked")
private void aggregationTaskFinished(ListenableFuture<ResultSet> deleteCacheIndexFuture,
ListenableFuture<IndexAggregatesPair> pairFuture) {

final ListenableFuture<List<Object>> argsFuture = Futures.allAsList(deleteCacheIndexFuture, pairFuture);

Futures.addCallback(argsFuture, new AggregationTaskFinishedCallback<List<Object>>() {
@Override
protected void onFinish(List<Object> args) {
IndexAggregatesPair pair = (IndexAggregatesPair) args.get(1);

if (cacheBlockFinishedListener != null) {
cacheBlockFinishedListener.onFinish(pair);
}
schedulesCount.addAndGet(pair.metrics.size());
}
}, aggregationTasks);
}
}

0 comments on commit d0bdbf8

Please sign in to comment.