Skip to content

Commit

Permalink
Fixes a race in shutdown of DynamicTaskExecutor
Browse files Browse the repository at this point in the history
where shutdown() might race with processors() and the processor
would forever miss the fact that it had been shut down.
  • Loading branch information
tinwelint committed Aug 13, 2016
1 parent a7f7a94 commit 4dce023
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 27 deletions.
Expand Up @@ -83,34 +83,42 @@ public int processors( int delta )
return processors.length;
}

int requestedNumber = processors.length + delta;
if ( delta > 0 )
synchronized ( this )
{
requestedNumber = min( requestedNumber, maxProcessorCount );
if ( requestedNumber > processors.length )
if ( shutDown )
{
Processor[] newProcessors = Arrays.copyOf( processors, requestedNumber );
for ( int i = processors.length; i < requestedNumber; i++ )
return processors.length;
}

int requestedNumber = processors.length + delta;
if ( delta > 0 )
{
requestedNumber = min( requestedNumber, maxProcessorCount );
if ( requestedNumber > processors.length )
{
newProcessors[i] = new Processor( processorThreadNamePrefix + "-" + i );
Processor[] newProcessors = Arrays.copyOf( processors, requestedNumber );
for ( int i = processors.length; i < requestedNumber; i++ )
{
newProcessors[i] = new Processor( processorThreadNamePrefix + "-" + i );
}
this.processors = newProcessors;
}
this.processors = newProcessors;
}
}
else
{
requestedNumber = max( 1, requestedNumber );
if ( requestedNumber < processors.length )
else
{
Processor[] newProcessors = Arrays.copyOf( processors, requestedNumber );
for ( int i = newProcessors.length; i < processors.length; i++ )
requestedNumber = max( 1, requestedNumber );
if ( requestedNumber < processors.length )
{
processors[i].shutDown = true;
Processor[] newProcessors = Arrays.copyOf( processors, requestedNumber );
for ( int i = newProcessors.length; i < processors.length; i++ )
{
processors[i].processorShutDown = true;
}
this.processors = newProcessors;
}
this.processors = newProcessors;
}
return processors.length;
}
return processors.length;
}

@Override
Expand Down Expand Up @@ -157,17 +165,13 @@ public synchronized void shutdown( int flags )
return;
}

this.shutDown = true;
boolean awaitAllCompleted = (flags & TaskExecutor.SF_AWAIT_ALL_COMPLETED) != 0;
while ( awaitAllCompleted && !queue.isEmpty() && panic == null /*all bets are off in the event of panic*/ )
{
parkAWhile();
}
this.shutDown = true;
this.abortQueued = (flags & TaskExecutor.SF_ABORT_QUEUED) != 0;
for ( Processor processor : processors )
{
processor.shutDown = true;
}
while ( awaitAllCompleted && anyAlive() && panic == null /*all bets are off in the event of panic*/ )
{
parkAWhile();
Expand Down Expand Up @@ -201,7 +205,9 @@ public void uncaughtException( Thread t, Throwable e )

private class Processor extends Thread
{
private volatile boolean shutDown;
// In addition to the global shutDown flag in the executor, each processor has a local flag
// so that an individual processor can be shut down, for example when reducing the number of processors.
private volatile boolean processorShutDown;

Processor( String name )
{
Expand All @@ -215,7 +221,7 @@ public void run()
{
// Initialized here since it's the thread itself that needs to call it
final LOCAL threadLocalState = initialLocalState.get();
while ( !abortQueued && !shutDown )
while ( !shutDown && !processorShutDown )
{
Task<LOCAL> task = queue.poll();
if ( task != null )
Expand All @@ -233,7 +239,7 @@ public void run()
}
else
{
if ( shutDown )
if ( processorShutDown )
{
break;
}
Expand Down
40 changes: 39 additions & 1 deletion community/kernel/src/test/java/org/neo4j/test/Race.java
Expand Up @@ -23,6 +23,11 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
* Simple race scenario, a utility for executing multiple threads coordinated to start at the same time.
Expand Down Expand Up @@ -52,7 +57,25 @@ public void addContestant( Runnable contestant )
contestants.add( new Contestant( contestant, contestants.size() ) );
}

/**
 * Runs the race without any upper bound on the wait. Delegates to {@link #go(long, TimeUnit)}
 * with a {@code maxWaitTime} of 0, which that method treats as an indefinite wait.
 *
 * @throws Throwable on any exception thrown from any contestant.
 */
public void go() throws Throwable
{
    // 0 is the "wait indefinitely" value; the unit is irrelevant for it but must be supplied.
    final long indefiniteWait = 0;
    go( indefiniteWait, MILLISECONDS );
}

/**
* Starts the race and waits {@code maxWaitTime} for all contestants to either fail or succeed.
*
* @param maxWaitTime max time to wait for all contestants, 0 means indefinite wait.
* @param unit {@link TimeUnit} that {@code maxWaitTime} is given in.
* @throws TimeoutException if all contestants haven't either succeeded or failed within the given time.
* @throws Throwable on any exception thrown from any contestant.
*/
public void go( long maxWaitTime, TimeUnit unit ) throws Throwable
{
readySet = new CountDownLatch( contestants.size() );
for ( Contestant contestant : contestants )
Expand All @@ -63,9 +86,24 @@ public void go() throws Throwable
go.countDown();

int errorCount = 0;
long maxWaitTimeMillis = MILLISECONDS.convert( maxWaitTime, unit );
long waitedSoFar = 0;
for ( Contestant contestant : contestants )
{
contestant.join();
if ( maxWaitTime == 0 )
{
contestant.join();
}
else
{
if ( waitedSoFar >= maxWaitTimeMillis )
{
throw new TimeoutException( "Didn't complete after " + maxWaitTime + " " + unit );
}
long time = currentTimeMillis();
contestant.join( maxWaitTimeMillis - waitedSoFar );
waitedSoFar += (currentTimeMillis() - time);
}
if ( contestant.error != null )
{
errorCount++;
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.unsafe.impl.batchimport.executor;

import org.junit.Rule;
import org.junit.Test;

import java.io.IOException;
Expand All @@ -28,6 +29,9 @@
import org.neo4j.test.DoubleLatch;
import org.neo4j.test.OtherThreadExecutor;
import org.neo4j.test.OtherThreadExecutor.WorkerCommand;
import org.neo4j.test.Race;
import org.neo4j.test.RepeatRule;
import org.neo4j.test.RepeatRule.Repeat;
import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy.Park;

import static org.junit.Assert.assertEquals;
Expand All @@ -36,6 +40,7 @@
import static org.junit.Assert.fail;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_ABORT_QUEUED;
import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_AWAIT_ALL_COMPLETED;
Expand All @@ -44,6 +49,9 @@ public class DynamicTaskExecutorTest
{
private static final Park PARK = new ParkStrategy.Park( 1, MILLISECONDS );

@Rule
public final RepeatRule repeater = new RepeatRule();

@Test
public void shouldExecuteTasksInParallel() throws Exception
{
Expand Down Expand Up @@ -296,6 +304,23 @@ public void shouldRespectMaxProcessors() throws Exception
executor.shutdown( SF_AWAIT_ALL_COMPLETED );
}

@Repeat( times = 100 )
@Test
public void shouldCopeWithConcurrentIncrementOfProcessorsAndShutdown() throws Throwable
{
    // GIVEN an executor and two racing actions: one shuts it down, the other requests one more processor
    final TaskExecutor<Void> taskExecutor = new DynamicTaskExecutor<>( 1, 2, 2, PARK, "test" );
    final Runnable shutDownExecutor = () -> taskExecutor.shutdown( SF_AWAIT_ALL_COMPLETED );
    final Runnable addOneProcessor = () -> taskExecutor.processors( 1 );
    final Race contest = new Race( true );
    contest.addContestant( shutDownExecutor );
    contest.addContestant( addOneProcessor );

    // WHEN both are let loose at the same time
    contest.go( 10, SECONDS );

    // THEN returning at all is the assertion. Before a recent fix shutdown() would hang in this
    // scenario, which is why the wait is capped at 10 seconds instead of being indefinite.
}

private void assertExceptionOnSubmit( TaskExecutor<Void> executor, IOException exception )
{
Exception submitException = null;
Expand Down

0 comments on commit 4dce023

Please sign in to comment.