Skip to content

Commit

Permalink
Processing queue limited to number of active processors
Browse files Browse the repository at this point in the history
there's no need to queue more items than that, doing so will most likely
only cause higher memory usage to no gain.
  • Loading branch information
tinwelint committed Aug 13, 2016
1 parent a683fb8 commit 07dd135
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 9 deletions.
Expand Up @@ -80,6 +80,15 @@ public boolean test( long ticket )
return processedTicket.get() == ticket - 1;
}
};
private final LongPredicate catchUp = new LongPredicate()
{
@Override
public boolean test( long ticket )
{
long queued = submittedTicket.get() - processedTicket.get();
return queued <= executor.processors( 0 );
}
};
private final Runnable healthCheck;
private volatile boolean done;

Expand All @@ -93,9 +102,20 @@ public TicketedProcessing( String name, int maxProcessors, BiFunction<FROM,STATE
this.processed = new ArrayBlockingQueue<>( maxProcessors );
}

public void submit( long ticket, FROM job )
/**
* Submits a job for processing. Results from processed jobs will be available from {@link #next()}
* in the order in which they got submitted. This method will queue jobs for processing, but not
* more than the number of current processors and never more than number of maximum processors
* given in the constructor; the call will block until queue size goes under this threshold.
* Blocking will provide push-back of submitting new jobs as to reduce unnecessary memory requirements
* for jobs that will sit and wait to be processed.
*
* @param job to process.
*/
public void submit( FROM job )
{
submittedTicket.incrementAndGet();
long ticket = submittedTicket.incrementAndGet();
await( catchUp, ticket, healthCheck, park );
executor.submit( threadLocalState ->
{
// Process this job (we're now in one of the processing threads)
Expand All @@ -116,7 +136,7 @@ public void submit( long ticket, FROM job )
}

/**
* Essentially starting a thread {@link #submit(long, Object) submitting} a stream of inputs which will
* Essentially starting a thread {@link #submit(Object) submitting} a stream of inputs which will
* each be processed and asynchronically made available in order of processing ticket by later calling
* {@link #next()}.
*
Expand All @@ -130,9 +150,9 @@ public Future<Void> slurp( Iterator<FROM> input, boolean shutdownAfterAllSubmitt
{
return future( () ->
{
for ( long ticket = 0; input.hasNext(); ticket++ )
while ( input.hasNext() )
{
submit( ticket, input.next() );
submit( input.next() );
}
if ( shutdownAfterAllSubmitted )
{
Expand Down
Expand Up @@ -22,23 +22,27 @@
import org.junit.Rule;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;

import org.neo4j.test.OtherThreadExecutor.WorkerCommand;
import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy;
import org.neo4j.test.OtherThreadRule;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import static java.lang.Integer.parseInt;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static org.neo4j.test.DoubleLatch.awaitLatch;

public class TicketedProcessingTest
{
@Rule
public final OtherThreadRule<Void> asserter = new OtherThreadRule<>();
public final OtherThreadRule<Void> t2 = new OtherThreadRule<>();

@Test
public void shouldReturnTicketsInOrder() throws Exception
Expand All @@ -60,7 +64,7 @@ public void shouldReturnTicketsInOrder() throws Exception
processing.processors( processorCount - processing.processors( 0 ) );

// WHEN
Future<Void> assertions = asserter.execute( new WorkerCommand<Void,Void>()
Future<Void> assertions = t2.execute( new WorkerCommand<Void,Void>()
{
@Override
public Void doWork( Void state ) throws Exception
Expand All @@ -77,11 +81,62 @@ public Void doWork( Void state ) throws Exception
} );
for ( int i = 0; i < items; i++ )
{
processing.submit( i, i );
processing.submit( i );
}
processing.shutdown( true );

// THEN
assertions.get();
}

@Test
public void shouldNotBeAbleToSubmitTooFarAhead() throws Exception
{
// GIVEN
TicketedProcessing<StringJob,Void,Integer> processing = new TicketedProcessing<>( "Parser", 2,
(job,state) ->
{
awaitLatch( job.latch );
return parseInt( job.string );
}, () -> null );
processing.processors( 1 );
StringJob firstJob = new StringJob( "1" );
processing.submit( firstJob );
StringJob secondJob = new StringJob( "2" );
processing.submit( secondJob );

// WHEN
StringJob thirdJob = new StringJob( "3" );
thirdJob.latch.countDown();
Future<Void> thirdSubmit = t2.execute( new WorkerCommand<Void,Void>()
{
@Override
public Void doWork( Void state ) throws Exception
{
processing.submit( thirdJob );
return null;
}
} );
t2.get().waitUntilThreadState( Thread.State.TIMED_WAITING, Thread.State.WAITING );
firstJob.latch.countDown();
assertEquals( 1, processing.next().intValue() );
thirdSubmit.get();
secondJob.latch.countDown();
assertEquals( 2, processing.next().intValue() );
assertEquals( 3, processing.next().intValue() );

// THEN
processing.shutdown( true );
}

private static class StringJob
{
final String string;
final CountDownLatch latch = new CountDownLatch( 1 );

StringJob( String string )
{
this.string = string;
}
}
}

0 comments on commit 07dd135

Please sign in to comment.