diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java index 1e9610f4e6c32..8d57045c9049d 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java @@ -80,6 +80,15 @@ public boolean test( long ticket ) return processedTicket.get() == ticket - 1; } }; + private final LongPredicate catchUp = new LongPredicate() + { + @Override + public boolean test( long ticket ) + { + long queued = submittedTicket.get() - processedTicket.get(); + return queued <= executor.processors( 0 ); + } + }; private final Runnable healthCheck; private volatile boolean done; @@ -93,9 +102,20 @@ public TicketedProcessing( String name, int maxProcessors, BiFunction( maxProcessors ); } - public void submit( long ticket, FROM job ) + /** + * Submits a job for processing. Results from processed jobs will be available from {@link #next()} + * in the order in which they got submitted. This method will queue jobs for processing, but not + * more than the number of current processors and never more than number of maximum processors + * given in the constructor; the call will block until queue size goes under this threshold. + * Blocking will provide push-back of submitting new jobs as to reduce unnecessary memory requirements + * for jobs that will sit and wait to be processed. + * + * @param job to process. + */ + public void submit( FROM job ) { - submittedTicket.incrementAndGet(); + long ticket = submittedTicket.incrementAndGet(); + await( catchUp, ticket, healthCheck, park ); executor.submit( threadLocalState -> { // Process this job (we're now in one of the processing threads) @@ -116,7 +136,7 @@ public void submit( long ticket, FROM job ) } /** - * Essentially starting a thread {@link #submit(long, Object) submitting} a stream of inputs which will + * Essentially starting a thread {@link #submit(Object) submitting} a stream of inputs which will * each be processed and asynchronically made available in order of processing ticket by later calling * {@link #next()}. * @@ -130,9 +150,9 @@ public Future slurp( Iterator input, boolean shutdownAfterAllSubmitt { return future( () -> { - for ( long ticket = 0; input.hasNext(); ticket++ ) + while ( input.hasNext() ) { - submit( ticket, input.next() ); + submit( input.next() ); } if ( shutdownAfterAllSubmitted ) { diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java index ab331e09732f9..92959bd8d55e3 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java @@ -22,9 +22,11 @@ import org.junit.Rule; import org.junit.Test; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; import java.util.function.BiFunction; + import org.neo4j.test.OtherThreadExecutor.WorkerCommand; import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy; import org.neo4j.test.OtherThreadRule; @@ -32,13 +34,15 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; - +import static java.lang.Integer.parseInt; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.neo4j.test.DoubleLatch.awaitLatch; + public class TicketedProcessingTest { @Rule - public final OtherThreadRule asserter = new OtherThreadRule<>(); + public final OtherThreadRule t2 = new OtherThreadRule<>(); @Test public void shouldReturnTicketsInOrder() throws Exception @@ -60,7 +64,7 @@ public void shouldReturnTicketsInOrder() throws Exception processing.processors( processorCount - processing.processors( 0 ) ); // WHEN - Future assertions = asserter.execute( new WorkerCommand() + Future assertions = t2.execute( new WorkerCommand() { @Override public Void doWork( Void state ) throws Exception @@ -77,11 +81,62 @@ public Void doWork( Void state ) throws Exception } ); for ( int i = 0; i < items; i++ ) { - processing.submit( i, i ); + processing.submit( i ); } processing.shutdown( true ); // THEN assertions.get(); } + + @Test + public void shouldNotBeAbleToSubmitTooFarAhead() throws Exception + { + // GIVEN + TicketedProcessing processing = new TicketedProcessing<>( "Parser", 2, + (job,state) -> + { + awaitLatch( job.latch ); + return parseInt( job.string ); + }, () -> null ); + processing.processors( 1 ); + StringJob firstJob = new StringJob( "1" ); + processing.submit( firstJob ); + StringJob secondJob = new StringJob( "2" ); + processing.submit( secondJob ); + + // WHEN + StringJob thirdJob = new StringJob( "3" ); + thirdJob.latch.countDown(); + Future thirdSubmit = t2.execute( new WorkerCommand() + { + @Override + public Void doWork( Void state ) throws Exception + { + processing.submit( thirdJob ); + return null; + } + } ); + t2.get().waitUntilThreadState( Thread.State.TIMED_WAITING, Thread.State.WAITING ); + firstJob.latch.countDown(); + assertEquals( 1, processing.next().intValue() ); + thirdSubmit.get(); + secondJob.latch.countDown(); + assertEquals( 2, processing.next().intValue() ); + assertEquals( 3, processing.next().intValue() ); + + // THEN + processing.shutdown( true ); + } + + private static class StringJob + { + final String string; + final CountDownLatch latch = new CountDownLatch( 1 ); + + StringJob( String string ) + { + this.string = string; + } + } }