From cd23274b21c8e5b38762a1dc3792927f87688547 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Mon, 31 Aug 2020 08:20:18 +1000 Subject: [PATCH] Avoid performing duplicate tasks when they are rebased (#2690) --- .../core/stategenerator/CachingTaskQueue.java | 17 +++++--- .../stategenerator/CachingTaskQueueTest.java | 40 +++++++++++++++++++ 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/ethereum/core/src/main/java/tech/pegasys/teku/core/stategenerator/CachingTaskQueue.java b/ethereum/core/src/main/java/tech/pegasys/teku/core/stategenerator/CachingTaskQueue.java index faa3966d250..0e290cf7934 100644 --- a/ethereum/core/src/main/java/tech/pegasys/teku/core/stategenerator/CachingTaskQueue.java +++ b/ethereum/core/src/main/java/tech/pegasys/teku/core/stategenerator/CachingTaskQueue.java @@ -123,6 +123,9 @@ public synchronized SafeFuture> perform(final CacheableTask ta return currentPendingTask; } + final SafeFuture> generationResult = new SafeFuture<>(); + pendingTasks.put(task.getKey(), generationResult); + // Check if there's a better starting point (in cache or in progress) final Optional>> newBase = task.streamIntermediateSteps() @@ -135,24 +138,26 @@ public synchronized SafeFuture> perform(final CacheableTask ta .findFirst(); if (newBase.isPresent()) { rebasedTaskCounter.inc(); - return newBase.get().thenCompose(ancestorResult -> queueTask(task.rebase(ancestorResult))); + newBase + .get() + .thenAccept(ancestorResult -> queueTask(task.rebase(ancestorResult))) + .finish(error -> completePendingTask(task, Optional.empty(), error)); + return generationResult; } // Schedule the task for execution newTaskCounter.inc(); - return queueTask(task); + queueTask(task); + return generationResult; } public Optional getIfAvailable(final K key) { return Optional.ofNullable(cache.get(key)); } - private SafeFuture> queueTask(final CacheableTask task) { - final SafeFuture> generationResult = new SafeFuture<>(); - pendingTasks.put(task.getKey(), generationResult); + private void queueTask(final CacheableTask task) { queuedTasks.add(task); tryProcessNext(); - return generationResult; } private synchronized void tryProcessNext() { diff --git a/ethereum/core/src/test/java/tech/pegasys/teku/core/stategenerator/CachingTaskQueueTest.java b/ethereum/core/src/test/java/tech/pegasys/teku/core/stategenerator/CachingTaskQueueTest.java index 819572e17b4..897c1931570 100644 --- a/ethereum/core/src/test/java/tech/pegasys/teku/core/stategenerator/CachingTaskQueueTest.java +++ b/ethereum/core/src/test/java/tech/pegasys/teku/core/stategenerator/CachingTaskQueueTest.java @@ -144,6 +144,38 @@ void shouldRebaseOntoIntermediateStepsWhenPossible() { assertRebasedTaskCount(1); } + @Test + void shouldNotPerformDuplicateTasksWhenTasksAreRebased() { + final StubTask taskA = new StubTask(1); + final StubTask taskB = new StubTask(5, 1); + final StubTask taskC = new StubTask(5, 1); + + final SafeFuture> resultA = taskQueue.perform(taskA); + final SafeFuture> resultB = taskQueue.perform(taskB); + final SafeFuture> resultC = taskQueue.perform(taskC); + + assertPendingTaskCount(2); // B & C were de-duplicated + + taskA.assertPerformedWithoutRebase(); + // Task B will be scheduled for rebase when A completes + // Task C should just use the pending future from B and never execute + taskC.assertNotRebased(); + taskC.assertNotPerformed(); + + taskA.completeTask(); + taskB.assertPerformedFrom(taskA.getExpectedValue().orElseThrow()); + taskC.assertNotRebased(); + taskC.assertNotPerformed(); + + taskB.completeTask(); + + assertThat(resultA).isCompletedWithValue(taskA.getExpectedValue()); + assertThat(resultB).isCompletedWithValue(taskB.getExpectedValue()); + assertThat(resultC).isCompletedWithValue(taskC.getExpectedValue()); + taskC.assertNotRebased(); + taskC.assertNotPerformed(); + } + @Test void getIfAvailable_shouldReturnValueWhenPresent() { final StubTask task = new StubTask(1); @@ -243,6 +275,14 @@ private void assertRebasedTaskCount(final int expectedCount) { assertThat(value).isEqualTo(expectedCount); } + private void assertPendingTaskCount(final int expectedCount) { + final double value = + metricsSystem + .getGauge(TekuMetricCategory.STORAGE, METRICS_PREFIX + "_tasks_requested") + .getValue(); + assertThat(value).isEqualTo(expectedCount); + } + public static class StubTask implements CacheableTask { private final SafeFuture> result = new SafeFuture<>(); private final Integer key;