Skip to content

Commit

Permalink
Wait for checkpointing to complete on stop
Browse files Browse the repository at this point in the history
This is necessary to avoid a race between the checkpointing thread
and the shutdown of the database.
  • Loading branch information
davidegrohmann committed Aug 21, 2015
1 parent 1ccac7d commit a965f3f
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 4 deletions.
Expand Up @@ -198,18 +198,24 @@ public boolean test( T item )

/**
 * Waits until a value obtained from {@code supplier} is accepted by {@code predicate}.
 * <p>
 * The supplier and predicate are combined into a single boolean condition, and the waiting itself is delegated
 * to {@link #await(Supplier, long, TimeUnit)}.
 *
 * @param supplier source of the values to test
 * @param predicate test that a supplied value has to pass for the wait to end
 * @param timeout maximum time to wait, expressed in {@code unit}
 * @param unit time unit of {@code timeout}
 * @throws TimeoutException if no supplied value was accepted before the timeout elapsed
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public static <TYPE> void await( Supplier<TYPE> supplier, Predicate<TYPE> predicate, long timeout, TimeUnit unit )
        throws TimeoutException, InterruptedException
{
    Supplier<Boolean> accepted = Suppliers.compose( supplier, predicate );
    await( accepted, timeout, unit );
}

/**
 * Polls {@code condition} until it yields {@code true} or the timeout elapses.
 * <p>
 * The condition is always evaluated at least once. Between evaluations the calling thread sleeps for one
 * hundredth of the timeout, but never for less than one millisecond.
 *
 * @param condition supplier that is evaluated repeatedly; waiting ends as soon as it returns {@code true}
 * @param timeout maximum time to wait, expressed in {@code unit}
 * @param unit time unit of {@code timeout}
 * @throws TimeoutException if the deadline passes without the condition having returned {@code true}
 * @throws InterruptedException if the calling thread is interrupted while sleeping between evaluations
 */
public static void await( Supplier<Boolean> condition, long timeout, TimeUnit unit )
        throws TimeoutException, InterruptedException
{
    // convert once; both the poll interval and the deadline are derived from the same value
    long timeoutMillis = unit.toMillis( timeout );
    long sleep = Math.max( timeoutMillis / 100, 1 );
    long deadline = System.currentTimeMillis() + timeoutMillis;
    do
    {
        if ( condition.get() )
        {
            return;
        }
        Thread.sleep( sleep );
    }
    while ( System.currentTimeMillis() < deadline );
    throw new TimeoutException( "Waited for " + timeout + " " + unit + ", but " + condition + " was not accepted." );
}
}
12 changes: 12 additions & 0 deletions community/function/src/main/java/org/neo4j/function/Suppliers.java
Expand Up @@ -116,4 +116,16 @@ public T get()
}
};
}

/**
 * Combines a supplier and a predicate into a supplier of the predicate's verdict.
 * <p>
 * Every call to {@code get()} on the returned supplier fetches a fresh value from {@code input} and tests it
 * with {@code predicate}; nothing is cached between calls.
 *
 * @param input supplier providing the values to test
 * @param predicate test applied to each supplied value
 * @param <T> type of the supplied values
 * @return a supplier yielding the result of testing the current value of {@code input}
 */
public static <T> Supplier<Boolean> compose( final Supplier<T> input, final Predicate<T> predicate )
{
    final Supplier<Boolean> composed = new Supplier<Boolean>()
    {
        @Override
        public Boolean get()
        {
            final T value = input.get();
            return predicate.test( value );
        }
    };
    return composed;
}
}
Expand Up @@ -21,6 +21,8 @@

import java.io.IOException;

import org.neo4j.function.Predicates;
import org.neo4j.function.Supplier;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand All @@ -38,6 +40,13 @@ public class CheckPointScheduler extends LifecycleAdapter
@Override
public void run()
{
checkPointing = true;

if ( stopped )
{
return;
}

try
{
checkPointer.checkPointIfNeeded();
Expand All @@ -47,6 +56,7 @@ public void run()
// no need to reschedule since the check pointer has raised a kernel panic and a shutdown is expected
throw new UnderlyingStorageException( e );
}
checkPointing = false;

// reschedule only if it is not stopped
if ( !stopped )
Expand All @@ -57,7 +67,16 @@ public void run()
};

// Handle that stop() cancels; presumably refers to the currently scheduled check point job
// (the assignment is not visible in this part of the file).
private volatile JobScheduler.JobHandle handle;
// When true, the job returns without check pointing and does not reschedule itself.
private volatile boolean stopped;
// Set to true when the job starts running and reset to false after checkPointIfNeeded() returns without throwing.
// NOTE(review): the job's early-return path for a stopped scheduler does not appear to reset this flag — confirm.
private volatile boolean checkPointing;
// Condition that stop() waits on: yields true once no check point is in progress.
private final Supplier<Boolean> checkPointingCondition = new Supplier<Boolean>()
{
    @Override
    public Boolean get()
    {
        return !checkPointing;
    }
};

public CheckPointScheduler( CheckPointer checkPointer, JobScheduler scheduler, long recurringPeriodMillis )
{
Expand All @@ -80,5 +99,6 @@ public void stop() throws Throwable
{
handle.cancel( false );
}
Predicates.await( checkPointingCondition, recurringPeriodMillis, MILLISECONDS );
}
}
Expand Up @@ -21,13 +21,19 @@

import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.neo4j.test.DoubleLatch;
import org.neo4j.test.OnDemandJobScheduler;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -77,7 +83,7 @@ public void shouldRescheduleTheJobAfterARun() throws Throwable
// then
verify( jobScheduler, times( 2 ) ).schedule( eq( checkPoint ), any( Runnable.class ),
eq( 20l ), eq( TimeUnit.MILLISECONDS ) );
verify( checkPointer, times( 1 )).checkPointIfNeeded();
verify( checkPointer, times( 1 ) ).checkPointIfNeeded();
assertEquals( scheduledJob, jobScheduler.getJob() );
}

Expand All @@ -99,4 +105,79 @@ public void shouldNotRescheduleAJobWhenStopped() throws Throwable
// then
assertNull( jobScheduler.getJob() );
}

/**
 * Verifies that {@code stop()} does not return while a check point is still running, and that it does return
 * once that check point has finished.
 */
@Test
public void shouldWaitOnStopUntilTheRunningCheckpointIsDone() throws Throwable
{
    // given
    final AtomicReference<Throwable> ex = new AtomicReference<>();
    final AtomicBoolean stoppedCompleted = new AtomicBoolean();
    final DoubleLatch checkPointerLatch = new DoubleLatch( 1 );
    // a check pointer that parks on the latch, so the test decides when the check point completes
    CheckPointer checkPointer = new CheckPointer()
    {
        @Override
        public void checkPointIfNeeded() throws IOException
        {
            checkPointerLatch.start();
            checkPointerLatch.awaitFinish();
        }

        @Override
        public void forceCheckPoint() throws IOException
        {
            // not used by this test
        }
    };

    final CheckPointScheduler scheduler = new CheckPointScheduler( checkPointer, jobScheduler, 20L );

    // when
    scheduler.start();

    Thread runCheckPointer = new Thread()
    {
        @Override
        public void run()
        {
            jobScheduler.runJob();
        }
    };
    runCheckPointer.start();

    // do not call stop() before the check point is actually in progress
    checkPointerLatch.awaitStart();

    Thread stopper = new Thread()
    {
        @Override
        public void run()
        {
            try
            {
                scheduler.stop();
                stoppedCompleted.set( true );
            }
            catch ( Throwable throwable )
            {
                ex.set( throwable );
            }
        }
    };

    stopper.start();

    // give stop() a chance to return early in case it does not wait for the check point
    Thread.sleep( 10 );

    // then
    assertFalse( stoppedCompleted.get() );

    checkPointerLatch.finish();
    runCheckPointer.join();

    // join before asserting, so the outcome of stop() is observed deterministically instead of after a fixed sleep
    stopper.join();

    // checked first so that a failure in stop() is reported with its cause
    assertNull( ex.get() );
    assertTrue( stoppedCompleted.get() );
}
}

0 comments on commit a965f3f

Please sign in to comment.