Skip to content

Commit

Permalink
Cleaner lifecycle of GBPTree.Writer
Browse files Browse the repository at this point in the history
Initialize SingleWriter now always leave writer in a fully
consistent state, meaning no additional cleanup is
necessary from the outside. In all cases when writer is
not successfully initialized, some exception will be thrown.
  • Loading branch information
burqen committed Apr 24, 2017
1 parent dfd202b commit 53b3892
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 60 deletions.
Expand Up @@ -237,12 +237,6 @@ default void startupState( boolean clean )
*/ */
private final Lock writerCheckpointMutex = new ReentrantLock(); private final Lock writerCheckpointMutex = new ReentrantLock();


/**
* Currently an index only supports one concurrent writer and so this boolean will act as
* guard so that only one thread can have it at any given time.
*/
private final AtomicBoolean writerTaken = new AtomicBoolean();

/** /**
* Page size, i.e. tree node size, of the tree nodes in this tree. The page size is determined on * Page size, i.e. tree node size, of the tree nodes in this tree. The page size is determined on
* tree creation, stored in meta page and read when opening tree later. * tree creation, stored in meta page and read when opening tree later.
Expand Down Expand Up @@ -790,8 +784,6 @@ public void close() throws IOException
return; return;
} }


// Force close on writer to not risk deadlock on pagedFile.close()
writer.close();
if ( !changesSinceLastCheckpoint ) if ( !changesSinceLastCheckpoint )
{ {
clean = true; clean = true;
Expand All @@ -818,39 +810,9 @@ public void close() throws IOException
*/ */
public Writer<KEY,VALUE> writer() throws IOException public Writer<KEY,VALUE> writer() throws IOException
{ {
if ( !writerTaken.compareAndSet( false, true ) ) writer.initialize();
{ changesSinceLastCheckpoint = true;
throw new IllegalStateException( "Writer in " + this + " is already acquired by someone else. " + return writer;
"Only a single writer is allowed. The writer will become available as soon as " +
"acquired writer is closed" );
}

writerCheckpointMutex.lock();
boolean success = false;
try
{
writer.take();
success = true;
changesSinceLastCheckpoint = true;
return writer;
}
finally
{
if ( !success )
{
releaseWriter();
}
}
}

private void releaseWriter()
{
writerCheckpointMutex.unlock();
if ( !writerTaken.compareAndSet( true, false ) )
{
throw new IllegalStateException( "Tried to give back the writer of " + this +
", but somebody else already did" );
}
} }


private void setRoot( long rootId, long rootGeneration ) private void setRoot( long rootId, long rootGeneration )
Expand Down Expand Up @@ -961,6 +923,11 @@ private TreeInconsistencyException appendTreeInformation( TreeInconsistencyExcep


private class SingleWriter implements Writer<KEY,VALUE> private class SingleWriter implements Writer<KEY,VALUE>
{ {
/**
* Currently an index only supports one concurrent writer and so this boolean will act as
* guard so that only one writer ever exist.
*/
private final AtomicBoolean writerTaken = new AtomicBoolean();
private final InternalTreeLogic<KEY,VALUE> treeLogic; private final InternalTreeLogic<KEY,VALUE> treeLogic;
private final StructurePropagation<KEY> structurePropagation; private final StructurePropagation<KEY> structurePropagation;
private PageCursor cursor; private PageCursor cursor;
Expand All @@ -970,27 +937,64 @@ private class SingleWriter implements Writer<KEY,VALUE>
private long stableGeneration; private long stableGeneration;
private long unstableGeneration; private long unstableGeneration;



SingleWriter( InternalTreeLogic<KEY,VALUE> treeLogic ) SingleWriter( InternalTreeLogic<KEY,VALUE> treeLogic )
{ {
this.structurePropagation = new StructurePropagation<>( layout.newKey(), layout.newKey(), layout.newKey() ); this.structurePropagation = new StructurePropagation<>( layout.newKey(), layout.newKey(), layout.newKey() );
this.treeLogic = treeLogic; this.treeLogic = treeLogic;
} }


void take() throws IOException /**
* When leaving initialize, writer should be in a fully consistent state.
* <p>
* Either fully initialized:
* <ul>
* <li>{@link #writerTaken} - true</li>
* <li>{@link #writerCheckpointMutex} - locked</li>
* <li>{@link #cursor} - not null</li>
* </ul>
* Of fully closed:
* <ul>
* <li>{@link #writerTaken} - false</li>
* <li>{@link #writerCheckpointMutex} - unlocked</li>
* <li>{@link #cursor} - null</li>
* </ul>
*
* @throws IOException if fail to open {@link PageCursor}
*/
void initialize() throws IOException
{ {
cursor = openRootCursor( PagedFile.PF_SHARED_WRITE_LOCK ); if ( !writerTaken.compareAndSet( false, true ) )
stableGeneration = stableGeneration( generation ); {
unstableGeneration = unstableGeneration( generation ); throw new IllegalStateException( "Writer in " + this + " is already acquired by someone else. " +
"Only a single writer is allowed. The writer will become available as soon as " +
"acquired writer is closed" );
}

boolean success = false;
writerCheckpointMutex.lock();
try try
{ {
cursor = openRootCursor( PagedFile.PF_SHARED_WRITE_LOCK );
stableGeneration = stableGeneration( generation );
unstableGeneration = unstableGeneration( generation );
PointerChecking.assertNoSuccessor( cursor, stableGeneration, unstableGeneration ); PointerChecking.assertNoSuccessor( cursor, stableGeneration, unstableGeneration );
treeLogic.initialize( cursor );
success = true;
} }
catch ( TreeInconsistencyException e ) catch ( TreeInconsistencyException e )
{ {
closeCursor();
throw appendTreeInformation( e ); throw appendTreeInformation( e );
} }
treeLogic.initialize( cursor ); finally
{
if ( !success )
{
closeCursor();
writerTaken.set( false );
writerCheckpointMutex.unlock();
}
}
} }


@Override @Override
Expand Down Expand Up @@ -1070,19 +1074,22 @@ public VALUE remove( KEY key ) throws IOException
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
if ( cursor == null ) if ( !writerTaken.compareAndSet( true, false ) )
{ {
return; throw new IllegalStateException( "Tried to close writer of " + GBPTree.this +
", but writer is already closed." );
} }

closeCursor(); closeCursor();
releaseWriter(); writerCheckpointMutex.unlock();
} }


private void closeCursor() private void closeCursor()
{ {
cursor.close(); if ( cursor != null )
cursor = null; {
cursor.close();
cursor = null;
}
} }
} }
} }
Expand Up @@ -67,6 +67,7 @@
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
Expand Down Expand Up @@ -396,6 +397,8 @@ public void shouldReturnNoResultsOnEmptyIndex() throws Exception
} }
} }


/* Lifecycle tests */

@Test @Test
public void shouldNotBeAbleToAcquireModifierTwice() throws Exception public void shouldNotBeAbleToAcquireModifierTwice() throws Exception
{ {
Expand All @@ -415,12 +418,15 @@ public void shouldNotBeAbleToAcquireModifierTwice() throws Exception
// THEN good // THEN good
} }


// Should be able to close old writer
writer.close(); writer.close();
// And open and closing a new one
index.writer().close();
} }
} }


@Test @Test
public void shouldAllowClosingWriterMultipleTimes() throws Exception public void shouldNotAllowClosingWriterMultipleTimes() throws Exception
{ {
// GIVEN // GIVEN
try ( GBPTree<MutableLong,MutableLong> index = index().build() ) try ( GBPTree<MutableLong,MutableLong> index = index().build() )
Expand All @@ -429,13 +435,73 @@ public void shouldAllowClosingWriterMultipleTimes() throws Exception
writer.put( new MutableLong( 0 ), new MutableLong( 1 ) ); writer.put( new MutableLong( 0 ), new MutableLong( 1 ) );
writer.close(); writer.close();


try
{
// WHEN
writer.close();
fail( "Should have failed" );
}
catch ( IllegalStateException e )
{
// THEN
assertThat( e.getMessage(), containsString( "already closed" ) );
}
}
}

@Test
public void failureDuringInitializeWriterShouldNotFailNextInitialize() throws Exception
{
// GIVEN
IOException no = new IOException( "No" );
AtomicBoolean throwOnNextIO = new AtomicBoolean();
PageCache controlledPageCache = pageCacheThatThrowOnIOWhenToldTo( no, throwOnNextIO );
try ( GBPTree<MutableLong, MutableLong> index = index().with( controlledPageCache ).build() )
{
// WHEN // WHEN
writer.close(); assert throwOnNextIO.compareAndSet( false, true );
try ( Writer<MutableLong, MutableLong> ignored = index.writer() )
{
fail( "Expected to throw" );
}
catch ( IOException e )
{
assertSame( no, e );
}


// THEN that should be OK // THEN
try ( Writer<MutableLong, MutableLong> writer = index.writer() )
{
writer.put( new MutableLong( 1 ), new MutableLong( 1 ) );
}
} }
} }


private PageCache pageCacheThatThrowOnIOWhenToldTo( final IOException e, final AtomicBoolean throwOnNextIO )
{
return new DelegatingPageCache( createPageCache( 256 ) )
{
@Override
public PagedFile map( File file, int pageSize, OpenOption... openOptions ) throws IOException
{
return new DelegatingPagedFile( super.map( file, pageSize, openOptions ) )
{
@Override
public PageCursor io( long pageId, int pf_flags ) throws IOException
{
if ( throwOnNextIO.get() )
{
throwOnNextIO.set( false );
assert e != null;
throw e;
}
return super.io( pageId, pf_flags );
}
};
}
};
}

@Test @Test
public void shouldAllowClosingTreeMultipleTimes() throws Exception public void shouldAllowClosingTreeMultipleTimes() throws Exception
{ {
Expand Down Expand Up @@ -546,7 +612,7 @@ private void verifyHeaderDataAfterClose( BiConsumer<GBPTree<MutableLong,MutableL


/* Check-pointing tests */ /* Check-pointing tests */


@Test @Test( timeout = 5_000L )
public void checkPointShouldLockOutWriter() throws Exception public void checkPointShouldLockOutWriter() throws Exception
{ {
// GIVEN // GIVEN
Expand Down Expand Up @@ -575,7 +641,7 @@ public void checkPointShouldLockOutWriter() throws Exception
} }
} }


@Test @Test( timeout = 5_000L )
public void checkPointShouldWaitForWriter() throws Exception public void checkPointShouldWaitForWriter() throws Exception
{ {
// GIVEN // GIVEN
Expand All @@ -602,7 +668,7 @@ public void checkPointShouldWaitForWriter() throws Exception
} }
} }


@Test @Test( timeout = 5_000L )
public void closeShouldLockOutWriter() throws Exception public void closeShouldLockOutWriter() throws Exception
{ {
// GIVEN // GIVEN
Expand Down Expand Up @@ -667,7 +733,7 @@ public void close() throws IOException
}; };
} }


@Test @Test( timeout = 5_000L )
public void closeShouldWaitForWriter() throws Exception public void closeShouldWaitForWriter() throws Exception
{ {
// GIVEN // GIVEN
Expand Down

0 comments on commit 53b3892

Please sign in to comment.