Skip to content

Commit

Permalink
StoreCopyCheckPointMutext handles after-effects of failure
Browse files Browse the repository at this point in the history
So that state gets updated correctly after failure, otherwise the mutext would
be in a broken state after a failure in store-copy "before" action happened.
  • Loading branch information
tinwelint committed Feb 1, 2017
1 parent 3b5ae61 commit afd2f2a
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 17 deletions.
Expand Up @@ -102,32 +102,39 @@ public Resource storeCopy( ThrowingAction<IOException> beforeFirstConcurrentStor
{
Lock readLock = lock.readLock();
boolean firstConcurrentRead = incrementCount() == 0;
if ( firstConcurrentRead )
boolean success = false;
try
{
try
if ( firstConcurrentRead )
{
beforeFirstConcurrentStoreCopy.apply();
try
{
beforeFirstConcurrentStoreCopy.apply();
}
catch ( Throwable e )
{
storeCopyActionError = e;
throw launderedException( IOException.class, e );
}
storeCopyActionCompleted = true;
}
catch ( Throwable e )
else
{
storeCopyActionError = e;
throw launderedException( IOException.class, e );
// Wait for the "before" first store copy to complete
waitForFirstStoreCopyActionToComplete();
}
readLock.lock();
storeCopyActionCompleted = true;
success = true;
}
else
finally
{
// Wait for the "before" first store copy to complete
while ( !storeCopyActionCompleted )
if ( success )
{
if ( storeCopyActionError != null )
{
throw new IOException( "Co-operative action before store-copy failed", storeCopyActionError );
}
parkAWhile();
readLock.lock();
}
else
{
decrementCount();
}
readLock.lock();
}

return () ->
Expand All @@ -138,6 +145,18 @@ public Resource storeCopy( ThrowingAction<IOException> beforeFirstConcurrentStor
};
}

private void waitForFirstStoreCopyActionToComplete() throws IOException
{
while ( !storeCopyActionCompleted )
{
if ( storeCopyActionError != null )
{
throw new IOException( "Co-operative action before store-copy failed", storeCopyActionError );
}
parkAWhile();
}
}

private synchronized void decrementCount()
{
storeCopyCount--;
Expand Down
Expand Up @@ -23,18 +23,27 @@
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

import org.neo4j.function.ThrowingAction;
import org.neo4j.graphdb.Resource;
import org.neo4j.test.Barrier;
import org.neo4j.test.Race;
import org.neo4j.test.rule.concurrent.OtherThreadRule;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -46,9 +55,12 @@
public class StoreCopyCheckPointMutexTest
{
private static final ThrowingAction<IOException> NO_OP = () -> {};
private static final ThrowingAction<IOException> ASSERT_NOT_CALLED = () -> fail( "Should not be called" );

@Rule
public final OtherThreadRule<Void> t2 = new OtherThreadRule<>( "T2" );
@Rule
public final OtherThreadRule<Void> t3 = new OtherThreadRule<>( "T3" );

private final StoreCopyCheckPointMutex mutex = new StoreCopyCheckPointMutex();

Expand Down Expand Up @@ -191,6 +203,74 @@ public void shouldHandleMultipleConcurrentStoreCopyRequests() throws Throwable
assertThat( action.count(), lessThan( threads ) );
}

@Test
public void shouldPropagateStoreCopyActionFailureToOtherStoreCopyRequests() throws Exception
{
// GIVEN
Barrier.Control barrier = new Barrier.Control();
IOException controlledFailure = new IOException( "My own fault" );
AtomicReference<Future<Object>> secondRequest = new AtomicReference<>();
ThrowingAction<IOException> controllableAndFailingAction = new ThrowingAction<IOException>()
{
@Override
public void apply() throws IOException
{
// Now that we know we're first, start the second request...
secondRequest.set( t3.execute( state -> mutex.storeCopy( ASSERT_NOT_CALLED ) ) );
// ...and wait for it to reach its destination
barrier.awaitUninterruptibly();
try
{
// OK, second request has made progress into the request, so we can now produce our failure
throw controlledFailure;
}
finally
{
barrier.release();
}
}
};

Future<Object> firstRequest = t2.execute( state -> mutex.storeCopy( controllableAndFailingAction ) );
while ( secondRequest.get() == null )
{
parkARandomWhile();
}
t3.get().waitUntilWaiting( details -> details.isAt( StoreCopyCheckPointMutex.class,
"waitForFirstStoreCopyActionToComplete" ) );

// WHEN
barrier.reached();

// THEN
try
{
firstRequest.get();
}
catch ( ExecutionException e )
{
assertSame( controlledFailure, e.getCause() );
}
try
{
secondRequest.get().get();
}
catch ( ExecutionException e )
{
Throwable cooperativeActionFailure = e.getCause();
assertThat( cooperativeActionFailure.getMessage(), containsString( "Co-operative" ) );
assertSame( controlledFailure, cooperativeActionFailure.getCause() );
}

// WHEN afterwards trying another store-copy
CountingAction action = new CountingAction();
try ( Resource lock = mutex.storeCopy( action ) )
{
// THEN
assertEquals( 1, action.count() );
}
}

private static void parkARandomWhile()
{
LockSupport.parkNanos( MILLISECONDS.toNanos( ThreadLocalRandom.current().nextInt( 10 ) ) );
Expand Down

0 comments on commit afd2f2a

Please sign in to comment.