Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Commit

Permalink
introduce AggregationTask to provide error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed Apr 7, 2014
1 parent d0bdbf8 commit 239d0af
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,31 @@ abstract class BaseAggregator {

private int cacheBatchSize;

/**
* AggregationTask is a Runnable that computes aggregates for a set of schedules in a {@link CacheIndexEntry}.
* If there are any unexpected errors, e.g., a NullPointerException, aggregation will be aborted.
*/
protected abstract class AggregationTask implements Runnable {

private CacheIndexEntry indexEntry;

public AggregationTask(CacheIndexEntry indexEntry) {
this.indexEntry = indexEntry;
}

@Override
public void run() {
try {
run(indexEntry);
} catch (Exception e) {
LOG.error("Aggregation will be aborted due to an unexpected error", e);
taskTracker.abort("Aborting aggregation due to an unexpected error: " + e.getMessage());
}
}

abstract void run(CacheIndexEntry indexEntry);
}

protected class AggregationTaskFinishedCallback<T> implements FutureCallback<T> {
@Override
public void onSuccess(T args) {
Expand Down Expand Up @@ -144,7 +169,7 @@ public void onFailure(Throwable t) {

protected abstract ListenableFuture<List<CacheIndexEntry>> findIndexEntries();

protected abstract Runnable createAggregationTask(CacheIndexEntry indexEntry);
protected abstract AggregationTask createAggregationTask(CacheIndexEntry indexEntry);

protected abstract Map<AggregationType, Integer> getAggregationCounts();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ protected ListenableFuture<List<CacheIndexEntry>> findIndexEntries() {
}

@Override
protected Runnable createAggregationTask(final CacheIndexEntry indexEntry) {
return new Runnable() {
protected AggregationTask createAggregationTask(CacheIndexEntry indexEntry) {
return new AggregationTask(indexEntry) {
@Override
public void run() {
void run(CacheIndexEntry indexEntry) {
StorageResultSetFuture cacheFuture = dao.findCacheEntriesAsync(aggregationType.getCacheTable(),
startTime.getMillis(), indexEntry.getStartScheduleId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ public List<CacheIndexEntry> apply(List<ResultSet> resultSets) {
}

@Override
protected Runnable createAggregationTask(final CacheIndexEntry indexEntry) {
return new Runnable() {
protected AggregationTask createAggregationTask(CacheIndexEntry indexEntry) {
return new AggregationTask(indexEntry) {
@Override
public void run() {
public void run(CacheIndexEntry indexEntry) {
if (indexEntry.getScheduleIds().isEmpty()) {
StorageResultSetFuture cacheFuture = dao.findCacheEntriesAsync(aggregationType.getCacheTable(),
indexEntry.getCollectionTimeSlice(), indexEntry.getStartScheduleId());
Expand Down

0 comments on commit 239d0af

Please sign in to comment.