Skip to content

Commit

Permalink
Avoid performing duplicate tasks when they are rebased (Consensys#2690)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton committed Aug 30, 2020
1 parent 074de9c commit cd23274
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 6 deletions.
Expand Up @@ -123,6 +123,9 @@ public synchronized SafeFuture<Optional<V>> perform(final CacheableTask<K, V> ta
return currentPendingTask;
}

final SafeFuture<Optional<V>> generationResult = new SafeFuture<>();
pendingTasks.put(task.getKey(), generationResult);

// Check if there's a better starting point (in cache or in progress)
final Optional<SafeFuture<Optional<V>>> newBase =
task.streamIntermediateSteps()
Expand All @@ -135,24 +138,26 @@ public synchronized SafeFuture<Optional<V>> perform(final CacheableTask<K, V> ta
.findFirst();
if (newBase.isPresent()) {
rebasedTaskCounter.inc();
return newBase.get().thenCompose(ancestorResult -> queueTask(task.rebase(ancestorResult)));
newBase
.get()
.thenAccept(ancestorResult -> queueTask(task.rebase(ancestorResult)))
.finish(error -> completePendingTask(task, Optional.empty(), error));
return generationResult;
}

// Schedule the task for execution
newTaskCounter.inc();
return queueTask(task);
queueTask(task);
return generationResult;
}

public Optional<V> getIfAvailable(final K key) {
return Optional.ofNullable(cache.get(key));
}

private SafeFuture<Optional<V>> queueTask(final CacheableTask<K, V> task) {
final SafeFuture<Optional<V>> generationResult = new SafeFuture<>();
pendingTasks.put(task.getKey(), generationResult);
private void queueTask(final CacheableTask<K, V> task) {
queuedTasks.add(task);
tryProcessNext();
return generationResult;
}

private synchronized void tryProcessNext() {
Expand Down
Expand Up @@ -144,6 +144,38 @@ void shouldRebaseOntoIntermediateStepsWhenPossible() {
assertRebasedTaskCount(1);
}

@Test
void shouldNotPerformDuplicateTasksWhenTasksAreRebased() {
final StubTask taskA = new StubTask(1);
final StubTask taskB = new StubTask(5, 1);
final StubTask taskC = new StubTask(5, 1);

final SafeFuture<Optional<String>> resultA = taskQueue.perform(taskA);
final SafeFuture<Optional<String>> resultB = taskQueue.perform(taskB);
final SafeFuture<Optional<String>> resultC = taskQueue.perform(taskC);

assertPendingTaskCount(2); // B & C were de-duplicated

taskA.assertPerformedWithoutRebase();
// Task B will be scheduled for rebase when A completes
// Task C should just use the pending future from B and never execute
taskC.assertNotRebased();
taskC.assertNotPerformed();

taskA.completeTask();
taskB.assertPerformedFrom(taskA.getExpectedValue().orElseThrow());
taskC.assertNotRebased();
taskC.assertNotPerformed();

taskB.completeTask();

assertThat(resultA).isCompletedWithValue(taskA.getExpectedValue());
assertThat(resultB).isCompletedWithValue(taskB.getExpectedValue());
assertThat(resultC).isCompletedWithValue(taskC.getExpectedValue());
taskC.assertNotRebased();
taskC.assertNotPerformed();
}

@Test
void getIfAvailable_shouldReturnValueWhenPresent() {
final StubTask task = new StubTask(1);
Expand Down Expand Up @@ -243,6 +275,14 @@ private void assertRebasedTaskCount(final int expectedCount) {
assertThat(value).isEqualTo(expectedCount);
}

private void assertPendingTaskCount(final int expectedCount) {
final double value =
metricsSystem
.getGauge(TekuMetricCategory.STORAGE, METRICS_PREFIX + "_tasks_requested")
.getValue();
assertThat(value).isEqualTo(expectedCount);
}

public static class StubTask implements CacheableTask<Integer, String> {
private final SafeFuture<Optional<String>> result = new SafeFuture<>();
private final Integer key;
Expand Down

0 comments on commit cd23274

Please sign in to comment.