Skip to content

Commit

Permalink
Update test step completion waiting
Browse files Browse the repository at this point in the history
  • Loading branch information
MishaDemianenko committed May 14, 2018
1 parent c7aef06 commit e9397ac
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 38 deletions.
Expand Up @@ -43,7 +43,7 @@ public LonelyProcessingStep( StageControl control, String name, Configuration co
@Override
public long receive( long ticket, Void nothing )
{
startProcessing( () -> {
new Thread( () -> {
assertHealthy();
try
{
Expand Down Expand Up @@ -75,15 +75,10 @@ public long receive( long ticket, Void nothing )
throw e;
}
}
} );
} ).start();
return 0;
}

protected void startProcessing( Runnable runnable )
{
new Thread( runnable ).start();
}

/**
* Called once and signals the start of this step. Responsible for calling {@link #progress(long)}
* at least now and then.
Expand Down
Expand Up @@ -19,17 +19,12 @@
*/
package org.neo4j.unsafe.impl.batchimport.staging;

import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.neo4j.concurrent.BinaryLatch;
import org.neo4j.test.rule.SuppressOutput;
import org.neo4j.unsafe.impl.batchimport.Configuration;

Expand All @@ -41,27 +36,19 @@ public class LonelyProcessingStepTest
@ClassRule
public static SuppressOutput mute = SuppressOutput.suppressAll();

private final ExecutorService executors = Executors.newCachedThreadPool();

@After
public void tearDown() throws Exception
{
executors.shutdown();
executors.awaitTermination( 1, TimeUnit.MINUTES );
}

@Test
public void issuePanicBeforeCompletionOnError()
@Test( timeout = 10_000 )
public void issuePanicBeforeCompletionOnError() throws InterruptedException
{
List<Step<?>> stepsPipeline = new ArrayList<>();
BinaryLatch endOfUpstreamLatch = new BinaryLatch();
FaultyLonelyProcessingStepTest faultyStep =
new FaultyLonelyProcessingStepTest( stepsPipeline, endOfUpstreamLatch );
FaultyLonelyProcessingStepTest faultyStep = new FaultyLonelyProcessingStepTest( stepsPipeline );
stepsPipeline.add( faultyStep );

faultyStep.receive( 1, null );

endOfUpstreamLatch.await();
while ( !faultyStep.isCompleted() )
{
Thread.sleep( 10 );
}

assertTrue( "On upstream end step should be already on panic in case of exception",
faultyStep.isPanicOnEndUpstream() );
Expand All @@ -72,14 +59,12 @@ public void issuePanicBeforeCompletionOnError()

private class FaultyLonelyProcessingStepTest extends LonelyProcessingStep
{
private final BinaryLatch endOfUpstreamLatch;
private volatile boolean panicOnEndUpstream;

FaultyLonelyProcessingStepTest( List<Step<?>> pipeLine, BinaryLatch endOfUpstreamLatch )
FaultyLonelyProcessingStepTest( List<Step<?>> pipeLine )
{
super( new StageExecution( "Faulty", null, Configuration.DEFAULT, pipeLine, 0 ),
"Faulty", Configuration.DEFAULT );
this.endOfUpstreamLatch = endOfUpstreamLatch;
}

@Override
Expand All @@ -88,21 +73,14 @@ protected void process()
throw new RuntimeException( "Process exception" );
}

@Override
protected void startProcessing( Runnable runnable )
{
executors.submit( runnable );
}

@Override
public void endOfUpstream()
{
panicOnEndUpstream = isPanic();
super.endOfUpstream();
endOfUpstreamLatch.release();
}

public boolean isPanicOnEndUpstream()
private boolean isPanicOnEndUpstream()
{
return panicOnEndUpstream;
}
Expand Down

0 comments on commit e9397ac

Please sign in to comment.