Handle extreme race condition in RingRecentBuffer
fickludd committed Mar 29, 2019
1 parent e71a554 commit 3709ba7
Showing 6 changed files with 172 additions and 63 deletions.
@@ -49,12 +49,40 @@ class DataCollectorMetaAcceptanceTest extends ExecutionEngineFunSuite {

// then
res.toList.head should beMapContaining(
"section" -> "META",
"data" -> beMapContaining(
"graphToken" -> "myGraphToken",
"system" -> beSystemData
)
"section" -> "META",
"data" -> beMapContaining(
"graphToken" -> "myGraphToken",
"system" -> beSystemData
)
)
}

test("should get internal data on retrieve('META')") {
// when
val res = execute("CALL db.stats.retrieve('META')")

// then
res.toList.head should beMapContaining(
"section" -> "META",
"data" -> beMapContaining(
"graphToken" -> null,
"internal" -> beInternalData
)
)
}

test("should get internal data on retrieveAllAnonymized") {
// when
val res = execute("CALL db.stats.retrieveAllAnonymized('myGraphToken')")

// then
res.toList.head should beMapContaining(
"section" -> "META",
"data" -> beMapContaining(
"graphToken" -> "myGraphToken",
"internal" -> beInternalData
)
)
}

private val beSystemData =
@@ -81,4 +109,9 @@ class DataCollectorMetaAcceptanceTest extends ExecutionEngineFunSuite {
"userTimezone" -> TimeZone.getDefault.getID,
"fileEncoding" -> System.getProperty( "file.encoding" )
)

private val beInternalData =
beMapContaining(
"numSilentQueryCollectionMisses" -> 0L
)
}
@@ -63,7 +63,7 @@ public Stream<RetrieveResult> retrieve( @Name( value = "section" ) String sectio
return TokensSection.retrieve( dataCollector.kernel );

case Sections.META:
return MetaSection.retrieve( null, dataCollector.kernel );
return MetaSection.retrieve( null, dataCollector.kernel, dataCollector.queryCollector.nSilentQueryDrops() );

case Sections.QUERIES:
return QueriesSection.retrieve( dataCollector.queryCollector.getData(),
@@ -87,7 +87,7 @@ public Stream<RetrieveResult> retrieveAllAnonymized( @Name( value = "graphToken"
throw new InvalidArgumentsException( "Graph token must be a non-empty string" );
}

return Stream.of( MetaSection.retrieve( graphToken, dataCollector.kernel ),
return Stream.of( MetaSection.retrieve( graphToken, dataCollector.kernel, dataCollector.queryCollector.nSilentQueryDrops() ),
GraphCountsSection.retrieve( dataCollector.kernel, Anonymizer.IDS ),
QueriesSection.retrieve( dataCollector.queryCollector.getData(),
new IdAnonymizer( transaction.tokenRead() ),
@@ -45,7 +45,7 @@ private MetaSection()
{ // only static methods
}

static Stream<RetrieveResult> retrieve( String graphToken, Kernel kernel ) throws TransactionFailureException
static Stream<RetrieveResult> retrieve( String graphToken, Kernel kernel, long nSilentQueryDrops ) throws TransactionFailureException
{
Map<String, Object> systemData = new HashMap<>();
systemData.put( "jvmMemoryFree", Runtime.getRuntime().freeMemory() );
@@ -76,10 +76,15 @@ static Stream<RetrieveResult> retrieve( String graphToken, Kernel kernel ) throw
systemData.put( "userTimezone", TimeZone.getDefault().getID() );
systemData.put( "fileEncoding", System.getProperty( "file.encoding" ) );

Map<String, Object> internalData = new HashMap<>();
internalData.put( "numSilentQueryCollectionMisses", nSilentQueryDrops );

Map<String, Object> metaData = new HashMap<>();
metaData.put( "graphToken", graphToken );
metaData.put( "retrieveTime", ZonedDateTime.now() );
metaData.put( "system", systemData );
metaData.put( "internal", internalData );

TokensSection.putTokenCounts( metaData, kernel );

return Stream.of( new RetrieveResult( Sections.META, metaData ) );
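For completeness, here is a hedged sketch of how a consumer of the META section could surface the new counter. The helper name and the null-handling are assumptions for illustration; only the "internal" and "numSilentQueryCollectionMisses" keys come from the change above.

import java.util.Map;

// Hypothetical helper (not part of the commit): returns true if a META data map,
// shaped like the one built in MetaSection.retrieve above, reports silent query drops.
final class MetaInternalCheck
{
    static boolean hasSilentDrops( Map<String,Object> metaData )
    {
        @SuppressWarnings( "unchecked" )
        Map<String,Object> internal = (Map<String,Object>) metaData.get( "internal" );
        Object misses = internal == null ? null : internal.get( "numSilentQueryCollectionMisses" );
        return misses instanceof Long && (Long) misses > 0L;
    }
}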
@@ -57,6 +57,11 @@ class QueryCollector extends CollectorStateMachine<Iterator<QuerySnapshot>> impl
queries = new RingRecentBuffer<>( 13 );
}

long nSilentQueryDrops()
{
return queries.nSilentQueryDrops();
}

// CollectorStateMachine

@Override
@@ -33,6 +33,7 @@ public class RingRecentBuffer<T> implements RecentBuffer<T>

private final AtomicLong produceCount;
private final AtomicLong consumeCount;
private final AtomicLong dropEvents;

public RingRecentBuffer( int bitSize )
{
@@ -44,10 +45,17 @@ public RingRecentBuffer( int bitSize )
for ( int i = 0; i < size; i++ )
{
data[i] = new VolatileRef<>();
data[i].produceNumber = i - size;
}

produceCount = new AtomicLong( 0 );
consumeCount = new AtomicLong( 0 );
dropEvents = new AtomicLong( 0 );
}

long nSilentQueryDrops()
{
return dropEvents.get();
}

/* ---- many producers ---- */
@@ -58,8 +66,42 @@ public void produce( T t )
long produceNumber = produceCount.getAndIncrement();
int offset = (int) (produceNumber & mask);
VolatileRef<T> volatileRef = data[offset];
volatileRef.ref = t;
volatileRef.produceNumber = produceNumber;
if ( assertPreviousCompleted( produceNumber, volatileRef ) )
{
volatileRef.ref = t;
volatileRef.produceNumber = produceNumber;
}
else
{
// If the previous produce still hasn't completed after all the yields in
// `assertPreviousCompleted`, we drop `t` rather than risk disturbing db operation.
// We increment dropEvents so that the RecentBuffer consumer can detect that
// there has been a drop.
dropEvents.incrementAndGet();
}
}

private boolean assertPreviousCompleted( long produceNumber, VolatileRef<T> volatileRef )
{
int attempts = 100;
long prevProduceNumber = volatileRef.produceNumber;
while ( prevProduceNumber != produceNumber - size && attempts > 0 )
{
// Coming in here is expected to be very rare, because it means that producers have
// circled all the way around the ring buffer while the producer `size` elements ago
// still hasn't finished writing its slot. We sleep briefly (effectively yielding) and
// hope the previous produce is done when we get back.
try
{
Thread.sleep( 0, 1000 );
}
catch ( InterruptedException e )
{
// continue
}
prevProduceNumber = volatileRef.produceNumber;
attempts--;
}
return attempts > 0;
}

/* ---- single consumer ---- */
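For readers skimming the diff, here is a minimal, self-contained sketch of the bounded-wait-then-drop pattern introduced above. The class, method, and field names are illustrative, not taken from the commit: each producer claims a slot, waits a bounded number of short sleeps for the producer one lap behind to finish publishing, and otherwise drops its element and counts the drop so the consumer side can detect it.

import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the bounded-wait-then-drop pattern; consumer side omitted.
class BoundedWaitRing<T>
{
    private static class Slot<T>
    {
        volatile T ref;
        volatile long produceNumber;
    }

    private final Slot<T>[] slots;
    private final int size;
    private final int mask;
    private final AtomicLong produceCount = new AtomicLong();
    private final AtomicLong dropEvents = new AtomicLong();

    @SuppressWarnings( "unchecked" )
    BoundedWaitRing( int bitSize )
    {
        size = 1 << bitSize;
        mask = size - 1;
        slots = new Slot[size];
        for ( int i = 0; i < size; i++ )
        {
            slots[i] = new Slot<>();
            // Pre-date each slot by one lap so the first pass over the ring is never blocked.
            slots[i].produceNumber = i - size;
        }
    }

    void produce( T t )
    {
        long produceNumber = produceCount.getAndIncrement();
        Slot<T> slot = slots[(int) (produceNumber & mask)];
        if ( previousLapCompleted( produceNumber, slot ) )
        {
            slot.ref = t;
            slot.produceNumber = produceNumber;
        }
        else
        {
            // Rather than block or overwrite a half-written slot, drop the element
            // and record the drop so a consumer can detect that data went missing.
            dropEvents.incrementAndGet();
        }
    }

    private boolean previousLapCompleted( long produceNumber, Slot<T> slot )
    {
        int attempts = 100;
        // The slot is safe to reuse once the producer that used it one lap ago has
        // published its produce number; this is almost always already the case.
        while ( slot.produceNumber != produceNumber - size && attempts > 0 )
        {
            try
            {
                Thread.sleep( 0, 1000 ); // ~1 microsecond; effectively a yield
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt();
            }
            attempts--;
        }
        return attempts > 0;
    }

    long drops()
    {
        return dropEvents.get();
    }
}

Under normal operation the wait loop exits immediately, so the extra cost on the produce path is a single volatile read; the drop counter only moves in the extreme case where a slot could not be reclaimed within the bounded wait.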
@@ -33,6 +33,7 @@
import java.util.function.LongConsumer;

import static java.lang.String.format;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@@ -60,6 +61,8 @@ void shouldJustWork()
buffer.produce( i );
}
buffer.foreach( Assertions::assertNotNull );

assertEquals( 0, buffer.nSilentQueryDrops() );
}

@Test
@@ -72,25 +75,32 @@ void shouldNotReadSameElementTwice() throws ExecutionException, InterruptedExcep
RingRecentBuffer<Long> buffer = new RingRecentBuffer<>( bufferBitSize );
ExecutorService executor = Executors.newFixedThreadPool( 2 );

UniqueElementsConsumer consumer = new UniqueElementsConsumer();

// when
// producer thread
CountDownLatch latch = new CountDownLatch( 1 );
Future<?> produce = executor.submit( stressUntil( latch, buffer::produce ) );

// consumer thread
Future<?> consume = executor.submit( stress( n, i -> {
consumer.reset();
buffer.foreach( consumer );
assertTrue( consumer.values.size() <= bufferSize, format( "Should see at most %d elements", bufferSize ) );
} ) );

// then without illegal transitions or exceptions
consume.get();
latch.countDown();
produce.get();
executor.shutdown();
try
{
UniqueElementsConsumer consumer = new UniqueElementsConsumer();

// when
// producer thread
CountDownLatch latch = new CountDownLatch( 1 );
Future<?> produce = executor.submit( stressUntil( latch, buffer::produce ) );

// consumer thread
Future<?> consume = executor.submit( stress( n, i -> {
consumer.reset();
buffer.foreach( consumer );
assertTrue( consumer.values.size() <= bufferSize, format( "Should see at most %d elements", bufferSize ) );
} ) );

// then without illegal transitions or exceptions
consume.get();
latch.countDown();
produce.get();
}
finally
{
executor.shutdown();
}
assertEquals( 0, buffer.nSilentQueryDrops() );
}

@Test
@@ -102,21 +112,28 @@ void shouldNeverReadUnwrittenElements() throws ExecutionException, InterruptedEx
RingRecentBuffer<Long> buffer = new RingRecentBuffer<>( bufferBitSize );
ExecutorService executor = Executors.newFixedThreadPool( 2 );

// when
// producer thread
CountDownLatch latch = new CountDownLatch( 1 );
Future<?> produce = executor.submit( stressUntil( latch, buffer::produce ) );
// consumer thread
Future<?> consume = executor.submit( stress( n, i -> {
buffer.clear();
buffer.foreach( Assertions::assertNotNull );
} ) );

// then without illegal transitions or exceptions
consume.get();
latch.countDown();
produce.get();
executor.shutdown();
try
{
// when
// producer thread
CountDownLatch latch = new CountDownLatch( 1 );
Future<?> produce = executor.submit( stressUntil( latch, buffer::produce ) );
// consumer thread
Future<?> consume = executor.submit( stress( n, i -> {
buffer.clear();
buffer.foreach( Assertions::assertNotNull );
} ) );

// then without illegal transitions or exceptions
consume.get();
latch.countDown();
produce.get();
}
finally
{
executor.shutdown();
}
assertEquals( 0, buffer.nSilentQueryDrops() );
}

@Test
@@ -128,25 +145,32 @@ void shouldWorkWithManyConcurrentProducers() throws ExecutionException, Interrup
RingRecentBuffer<Long> buffer = new RingRecentBuffer<>( bufferBitSize );
ExecutorService executor = Executors.newFixedThreadPool( 4 );

// when
// producer threads
CountDownLatch latch = new CountDownLatch( 1 );
Future<?> produce1 = executor.submit( stressUntil( latch, buffer::produce ) );
Future<?> produce2 = executor.submit( stressUntil( latch, buffer::produce ) );
Future<?> produce3 = executor.submit( stressUntil( latch, buffer::produce ) );
// consumer thread
Future<?> consume = executor.submit( stress( n, i -> {
buffer.clear();
buffer.foreach( Assertions::assertNotNull );
} ) );

// then without illegal transitions or exceptions
consume.get();
latch.countDown();
produce1.get();
produce2.get();
produce3.get();
executor.shutdown();
try
{
// when
// producer threads
CountDownLatch latch = new CountDownLatch( 1 );
Future<?> produce1 = executor.submit( stressUntil( latch, buffer::produce ) );
Future<?> produce2 = executor.submit( stressUntil( latch, buffer::produce ) );
Future<?> produce3 = executor.submit( stressUntil( latch, buffer::produce ) );
// consumer thread
Future<?> consume = executor.submit( stress( n, i -> {
buffer.clear();
buffer.foreach( Assertions::assertNotNull );
} ) );

// then without illegal transitions or exceptions
consume.get();
latch.countDown();
produce1.get();
produce2.get();
produce3.get();
}
finally
{
executor.shutdown();
}
assertEquals( 0, buffer.nSilentQueryDrops() );
}

private <T> Runnable stress( int n, LongConsumer action )
