diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/LonelyProcessingStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/LonelyProcessingStep.java index ec36b2d782b8e..5624cc7ccc2a7 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/LonelyProcessingStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/LonelyProcessingStep.java @@ -43,7 +43,7 @@ public LonelyProcessingStep( StageControl control, String name, Configuration co @Override public long receive( long ticket, Void nothing ) { - startProcessing( () -> { + new Thread( () -> { assertHealthy(); try { @@ -75,15 +75,10 @@ public long receive( long ticket, Void nothing ) throw e; } } - } ); + } ).start(); return 0; } - protected void startProcessing( Runnable runnable ) - { - new Thread( runnable ).start(); - } - /** * Called once and signals the start of this step. Responsible for calling {@link #progress(long)} * at least now and then. diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/LonelyProcessingStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/LonelyProcessingStepTest.java index 912c41bb1482c..13152839756e0 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/LonelyProcessingStepTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/LonelyProcessingStepTest.java @@ -19,17 +19,12 @@ */ package org.neo4j.unsafe.impl.batchimport.staging; -import org.junit.After; import org.junit.ClassRule; import org.junit.Test; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import org.neo4j.concurrent.BinaryLatch; import org.neo4j.test.rule.SuppressOutput; import org.neo4j.unsafe.impl.batchimport.Configuration; @@ -41,27 +36,19 @@ public class LonelyProcessingStepTest @ClassRule public static SuppressOutput mute = SuppressOutput.suppressAll(); - private final ExecutorService executors = Executors.newCachedThreadPool(); - - @After - public void tearDown() throws Exception - { - executors.shutdown(); - executors.awaitTermination( 1, TimeUnit.MINUTES ); - } - - @Test - public void issuePanicBeforeCompletionOnError() + @Test( timeout = 10_000 ) + public void issuePanicBeforeCompletionOnError() throws InterruptedException { List> stepsPipeline = new ArrayList<>(); - BinaryLatch endOfUpstreamLatch = new BinaryLatch(); - FaultyLonelyProcessingStepTest faultyStep = - new FaultyLonelyProcessingStepTest( stepsPipeline, endOfUpstreamLatch ); + FaultyLonelyProcessingStepTest faultyStep = new FaultyLonelyProcessingStepTest( stepsPipeline ); stepsPipeline.add( faultyStep ); faultyStep.receive( 1, null ); - endOfUpstreamLatch.await(); + while ( !faultyStep.isCompleted() ) + { + Thread.sleep( 10 ); + } assertTrue( "On upstream end step should be already on panic in case of exception", faultyStep.isPanicOnEndUpstream() ); @@ -72,14 +59,12 @@ public void issuePanicBeforeCompletionOnError() private class FaultyLonelyProcessingStepTest extends LonelyProcessingStep { - private final BinaryLatch endOfUpstreamLatch; private volatile boolean panicOnEndUpstream; - FaultyLonelyProcessingStepTest( List> pipeLine, BinaryLatch endOfUpstreamLatch ) + FaultyLonelyProcessingStepTest( List> pipeLine ) { super( new StageExecution( "Faulty", null, Configuration.DEFAULT, pipeLine, 0 ), "Faulty", Configuration.DEFAULT ); - this.endOfUpstreamLatch = endOfUpstreamLatch; } @Override @@ -88,21 +73,14 @@ protected void process() throw new RuntimeException( "Process exception" ); } - @Override - protected void startProcessing( Runnable runnable ) - { - executors.submit( runnable ); - } - @Override public void endOfUpstream() { panicOnEndUpstream = isPanic(); super.endOfUpstream(); - endOfUpstreamLatch.release(); } - public boolean isPanicOnEndUpstream() + private boolean isPanicOnEndUpstream() { return panicOnEndUpstream; }