Skip to content

Commit

Permalink
Guarantee release cleanerLock in GBPTree
Browse files Browse the repository at this point in the history
In CleanupJob, processing the job and releasing the cleaner lock
is separated into run and close such that a RecoveryCleanupWorkCollector
can close a CleanupJob and thus release the cleaner lock without
actually running the cleanup job. This makes it possible to open a dirty
GBPTree and write a new header without starting cleanup jobs, and
without risking a deadlock between the checkpoint (needed for writing the header)
and a cleanup job.
  • Loading branch information
burqen committed Jan 9, 2018
1 parent 73c2a6f commit f5bf7c5
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 16 deletions.
Expand Up @@ -79,9 +79,16 @@ private static class RecoveryPreventingCollector extends LifecycleAdapter implem
@Override
public void add( CleanupJob job )
{
if ( job.needed() )
try
{
throw new IllegalStateException( "Consistency checker should not do recovery" );
if ( job.needed() )
{
throw new IllegalStateException( "Consistency checker should not do recovery" );
}
}
finally
{
job.close();
}
}
}
Expand Down
Expand Up @@ -42,6 +42,11 @@ public interface CleanupJob extends Runnable
*/
Exception getCause();

/**
* Mark this job as closed and clean up all its resources.
*/
void close();

/**
* A {@link CleanupJob} that doesn't need cleaning, i.e. it's already clean.
*/
Expand Down Expand Up @@ -69,5 +74,10 @@ public Exception getCause()
{
return null;
}

@Override
public void close()
{ // no-op: an already-clean job holds no lock or other resources to release
}
};
}
Expand Up @@ -56,6 +56,12 @@ public Exception getCause()
return failure;
}

@Override
public void close()
{
// Release the cleaner lock held for this job. Per the CleanupJob contract,
// the owning RecoveryCleanupWorkCollector must call close() after run()
// (or instead of run(), when the job is skipped) so the lock is always released.
gbpTreeLock.cleanerUnlock();
}

@Override
public void run()
{
Expand All @@ -68,9 +74,5 @@ public void run()
{
failure = e;
}
finally
{
gbpTreeLock.cleanerUnlock();
}
}
}
Expand Up @@ -80,7 +80,14 @@ private Runnable allJobs()
CleanupJob job;
while ( (job = jobs.poll()) != null )
{
job.run();
try
{
job.run();
}
finally
{
job.close();
}
}
};
}
Expand Down
Expand Up @@ -29,6 +29,8 @@
* this, implementing class must be ready to receive new jobs through {@link #add(CleanupJob)}.
* <p>
* Jobs may be processed during {@link #add(CleanupJob) add} or {@link Lifecycle#start() start}.
* <p>
* Take full responsibility for closing added {@link CleanupJob CleanupJobs} as soon as possible after run.
*/
public interface RecoveryCleanupWorkCollector extends Lifecycle
{
Expand All @@ -48,7 +50,7 @@ public interface RecoveryCleanupWorkCollector extends Lifecycle
/**
* Ignore all clean jobs.
*/
RecoveryCleanupWorkCollector NULL = new NullRecoveryCleanupWorkCollector();
RecoveryCleanupWorkCollector IGNORE = new IgnoringRecoveryCleanupWorkCollector();

/**
* {@link RecoveryCleanupWorkCollector} which runs added {@link CleanupJob} as part of the {@link #add(CleanupJob)}
Expand All @@ -59,18 +61,26 @@ class ImmediateRecoveryCleanupWorkCollector extends LifecycleAdapter implements
@Override
public void add( CleanupJob job )
{
job.run();
try
{
job.run();
}
finally
{
job.close();
}
}
}

/**
* {@link RecoveryCleanupWorkCollector} ignoring all {@link CleanupJob} added to it.
*/
class NullRecoveryCleanupWorkCollector extends LifecycleAdapter implements RecoveryCleanupWorkCollector
class IgnoringRecoveryCleanupWorkCollector extends LifecycleAdapter implements RecoveryCleanupWorkCollector
{
@Override
public void add( CleanupJob job )
{ // no-op
{
job.close();
}
}
}
Expand Up @@ -625,6 +625,21 @@ private void verifyHeaderDataAfterClose( BiConsumer<GBPTree<MutableLong,MutableL
verifyHeader( pageCache, expectedHeader );
}

@Test( timeout = 10_000 )
public void writeHeaderInDirtyTreeMustNotDeadlock() throws Exception
{
// GIVEN a dirty (not cleanly closed) tree, opened with a collector that only
// closes cleanup jobs without running them — so no cleanup precedes checkpoint.
PageCache pageCache = createPageCache( 256 );
makeDirty( pageCache );

// Explicit charset so the written and verified bytes are platform independent
// (String.getBytes() with no argument uses the platform default charset).
byte[] expectedHeader = "failed".getBytes( java.nio.charset.StandardCharsets.UTF_8 );
Consumer<PageCursor> headerWriter = pc -> pc.putBytes( expectedHeader );
try ( GBPTree<MutableLong,MutableLong> index = index( pageCache ).with( RecoveryCleanupWorkCollector.IGNORE ).build() )
{
// WHEN writing a new header via checkpoint; must finish within the test
// timeout, i.e. must not deadlock waiting on a cleanup job that never runs.
index.checkpoint( IOLimiter.unlimited(), headerWriter );
}

// THEN the header must have been written out.
verifyHeader( pageCache, expectedHeader );
}

private void verifyHeader( PageCache pageCache, byte[] expectedHeader ) throws IOException
{
// WHEN
Expand Down Expand Up @@ -1605,8 +1620,15 @@ public void start() throws Throwable
CleanupJob job;
while ( (job = jobs.poll()) != null )
{
job.run();
startedJobs.add( job );
try
{
job.run();
startedJobs.add( job );
}
finally
{
job.close();
}
}
}

Expand Down Expand Up @@ -1689,7 +1711,12 @@ private void maybeBlock() throws IOException

private void makeDirty() throws IOException
{
try ( GBPTree<MutableLong,MutableLong> index = index().build() )
makeDirty( createPageCache( DEFAULT_PAGE_SIZE ) );
}

private void makeDirty( PageCache pageCache ) throws IOException
{
try ( GBPTree<MutableLong,MutableLong> index = index( pageCache ).build() )
{
// Make dirty
index.writer().close();
Expand Down
Expand Up @@ -184,6 +184,11 @@ public Exception getCause()
return null;
}

@Override
public void close()
{ // no-op: this implementation holds no resources to release
}

@Override
public void run()
{
Expand Down
Expand Up @@ -211,7 +211,7 @@ private void ensureTreeInstantiated() throws IOException
{
if ( tree == null )
{
instantiateTree( RecoveryCleanupWorkCollector.NULL, NO_HEADER_WRITER );
instantiateTree( RecoveryCleanupWorkCollector.IGNORE, NO_HEADER_WRITER );
}
}

Expand Down
Expand Up @@ -200,7 +200,7 @@ public void reportProgressOnNativeIndexPopulation() throws IOException
private NativeLabelScanStore getNativeLabelScanStore( File dir, boolean readOnly )
{
return new NativeLabelScanStore( pageCache, dir, FullStoreChangeStream.EMPTY, readOnly, new Monitors(),
RecoveryCleanupWorkCollector.NULL );
RecoveryCleanupWorkCollector.IGNORE );
}

private void initializeNativeLabelScanStoreWithContent( File dir ) throws IOException
Expand Down

0 comments on commit f5bf7c5

Please sign in to comment.