From c25ab366bdb08d7a0813c7cff9c521e83633ee48 Mon Sep 17 00:00:00 2001 From: Nezih Yigitbasi Date: Thu, 9 Aug 2018 15:33:33 -0700 Subject: [PATCH] Fix the race between close and destroy in the Driver There is a race condition where a driver thread may not destroy the operators even though it's closed. This is not desirable as there are parts of the code that rely on the operators to be destroyed by the driver, e.g., memory tracking related code. The race occurs when a driver thread T1 is in the tryWithLock method and holds the exclusiveLock, and it has already called the destroyIfNecessary() method. At this point T1 hasn't destroyed the operators yet as the driver hasn't been closed. Now if the task owning those splits gets aborted (say due to a LIMIT query), another thread T2 will call driver.close(), and in close() it will try to acquire the lock to destroy the operators, but T1 still holds that lock. Then, T1 releases the lock and checks the condition `while (pendingTaskSourceUpdates.get() != null && state.get() == State.ALIVE && exclusiveLock.tryLock())`, and this condition is false as the state is NEED_DESTRUCTION (as the driver is closed by T2). At this point T1 just exits without destroying the operators. --- .../com/facebook/presto/operator/Driver.java | 5 ++++- .../facebook/presto/operator/TestDriver.java | 21 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/presto-main/src/main/java/com/facebook/presto/operator/Driver.java b/presto-main/src/main/java/com/facebook/presto/operator/Driver.java index 888c95bf60e60..7397cd4c49205 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/Driver.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/Driver.java @@ -688,7 +688,10 @@ private Optional tryWithLock(long timeout, TimeUnit unit, Supplier tas // If there are more source updates available, attempt to reacquire the lock and process them. // This can happen if new sources are added while we're holding the lock here doing work. // NOTE: this is separate duplicate code to make debugging lock reacquisition easier - while (pendingTaskSourceUpdates.get() != null && state.get() == State.ALIVE && exclusiveLock.tryLock()) { + // The first condition is for processing the pending updates if this driver is still ALIVE + // The second condition is to destroy the driver if the state is NEED_DESTRUCTION + while (((pendingTaskSourceUpdates.get() != null && state.get() == State.ALIVE) || state.get() == State.NEED_DESTRUCTION) + && exclusiveLock.tryLock()) { try { try { processNewSources(); diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestDriver.java b/presto-main/src/test/java/com/facebook/presto/operator/TestDriver.java index 5138d50abb9e6..232b57512b99d 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestDriver.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestDriver.java @@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.Uninterruptibles; import io.airlift.units.Duration; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -116,6 +117,26 @@ public void testNormalFinish() assertTrue(source.isFinished()); } + // The race can be reproduced somewhat reliably when the invocationCount is 10K, but we use 1K iterations to cap the test runtime. + @Test(invocationCount = 1_000, timeOut = 1_000) + public void testConcurrentClose() + { + List types = ImmutableList.of(VARCHAR, BIGINT, BIGINT); + OperatorContext operatorContext = driverContext.addOperatorContext(0, new PlanNodeId("test"), "values"); + ValuesOperator source = new ValuesOperator(operatorContext, rowPagesBuilder(types) + .addSequencePage(10, 20, 30, 40) + .build()); + + Operator sink = createSinkOperator(types); + Driver driver = Driver.createDriver(driverContext, source, sink); + // let these threads race + scheduledExecutor.submit(() -> driver.processFor(new Duration(1, TimeUnit.NANOSECONDS))); // don't want to call isFinishedInternal in processFor + scheduledExecutor.submit(() -> driver.close()); + while (!driverContext.isDone()) { + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); + } + } + @Test public void testAbruptFinish() {