Skip to content

Commit

Permalink
Fix the race between close and destroy in the Driver
Browse files Browse the repository at this point in the history
There is a race condition where a driver thread may not destroy the
operators even though it's closed. This is not desirable as there are
parts of the code that rely on the operators to be destroyed by the
driver, e.g., memory tracking related code.

The race occurs when a driver thread T1 is in the tryWithLock method and
holds the exclusiveLock, and it has already called the
destroyIfNecessary() method. At this point T1 hasn't destroyed the operators
yet as the driver hasn't been closed. Now if the task owning those splits
gets aborted (say due to a LIMIT query), another thread T2 will call
driver.close(), and in close() it will try to acquire the lock to destroy the
operators, but T1 still holds that lock. Then, T1 releases the lock and checks
the condition `while (pendingTaskSourceUpdates.get() != null
&& state.get() == State.ALIVE && exclusiveLock.tryLock())`, and this condition
is false as the state is NEED_DESTRUCTION (as the driver is closed by T2). At this
point T1 just exits without destroying the operators.
  • Loading branch information
nezihyigitbasi committed Aug 10, 2018
1 parent 575a9a6 commit c25ab36
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,10 @@ private <T> Optional<T> tryWithLock(long timeout, TimeUnit unit, Supplier<T> tas
// If there are more source updates available, attempt to reacquire the lock and process them.
// This can happen if new sources are added while we're holding the lock here doing work.
// NOTE: this is separate duplicate code to make debugging lock reacquisition easier
while (pendingTaskSourceUpdates.get() != null && state.get() == State.ALIVE && exclusiveLock.tryLock()) {
// The first condition is for processing the pending updates if this driver is still ALIVE
// The second condition is to destroy the driver if the state is NEED_DESTRUCTION
while (((pendingTaskSourceUpdates.get() != null && state.get() == State.ALIVE) || state.get() == State.NEED_DESTRUCTION)
&& exclusiveLock.tryLock()) {
try {
try {
processNewSources();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.units.Duration;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -116,6 +117,26 @@ public void testNormalFinish()
assertTrue(source.isFinished());
}

// The race can be reproduced somewhat reliably when the invocationCount is 10K, but we use 1K iterations to cap the test runtime.
@Test(invocationCount = 1_000, timeOut = 1_000)
public void testConcurrentClose()
{
List<Type> types = ImmutableList.of(VARCHAR, BIGINT, BIGINT);
OperatorContext operatorContext = driverContext.addOperatorContext(0, new PlanNodeId("test"), "values");
ValuesOperator source = new ValuesOperator(operatorContext, rowPagesBuilder(types)
.addSequencePage(10, 20, 30, 40)
.build());

Operator sink = createSinkOperator(types);
Driver driver = Driver.createDriver(driverContext, source, sink);
// let these threads race
scheduledExecutor.submit(() -> driver.processFor(new Duration(1, TimeUnit.NANOSECONDS))); // don't want to call isFinishedInternal in processFor
scheduledExecutor.submit(() -> driver.close());
while (!driverContext.isDone()) {
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
}
}

@Test
public void testAbruptFinish()
{
Expand Down

0 comments on commit c25ab36

Please sign in to comment.