Skip to content
Browse files

PersistenceWindowPool (and friends) bug fixes and cleanup:

o Proper closing of windows, not just closing for the sake of preventing
marking as in use in all places.
o Proper handover of PersistenceRows to other threads if marked deciding
whether to close or not.
o Simplified state in PersistenceRow and LockableWindow.
o Reimplemented releasing a PersistenceRow to force unlock being the last
operation
o PersistenceRows are marked as in use in the constructor to avoid an
issue where a row could be shared too early
o activeRowWindows map has got Long as key instead of Integer to prevent
wrap-around of big integers.
o Fixed a potential problem in LockableWindow#unLock where a locking
thread might have registered itself more than once in a row preventing
another thread waiting in line to lock to become notified (via interrupt)

Initially we set out to fix BufferOverflow/UnderflowExceptions arising from
closed or contemporary closing PersistenceRow that leaked out by mistake.
In the meantime we cleaned up the code and fixed other problems that the
changes exposed.

co-authors: Alistair Jones, Tobias Lindaaker
  • Loading branch information...
1 parent abb1bf2 commit ee5c630c694226d694f56171ed6dd9a248ea0f49 @tinwelint tinwelint committed Aug 21, 2012
View
7 kernel/src/main/java/org/neo4j/kernel/impl/nioneo/store/AbstractPersistenceWindow.java
@@ -117,13 +117,6 @@ private void writeContents()
}
@Override
- protected synchronized void writeOutAndClose()
- {
- writeContents();
- closed = true;
- }
-
- @Override
public int size()
{
return windowSize;
View
5 kernel/src/main/java/org/neo4j/kernel/impl/nioneo/store/Buffer.java
@@ -72,6 +72,11 @@ public ByteBuffer getBuffer()
{
return buf;
}
+
+ public void reset()
+ {
+ buf.clear();
+ }
/**
* Sets the offset from persistence window position in the underlying byte
View
47 kernel/src/main/java/org/neo4j/kernel/impl/nioneo/store/LockableWindow.java
@@ -31,13 +31,12 @@
*/
abstract class LockableWindow implements PersistenceWindow
{
- private OperationType type = null;
private final FileChannel fileChannel;
private Thread lockingThread = null;
private final LinkedList<LockElement> waitingThreadList =
new LinkedList<LockElement>();
- private int lockCount = 0;
+ private boolean locked;
private int marked = 0;
protected boolean closed;
@@ -56,20 +55,14 @@ FileChannel getFileChannel()
return fileChannel;
}
- OperationType getOperationType()
- {
- return type;
- }
-
/**
* Writes out any changes to the underlying {@link FileChannel} and is then
* considered unusable.
*/
- protected abstract void writeOutAndClose();
-
- void setOperationType( OperationType type )
+ protected final void writeOutAndClose()
{
- this.type = type;
+ force();
+ close();
}
/**
@@ -95,11 +88,11 @@ synchronized boolean markAsInUse()
}
}
- synchronized void lock()
+ synchronized void lock( OperationType operationType )
{
Thread currentThread = Thread.currentThread();
LockElement le = new LockElement( currentThread );
- while ( lockCount > 0 && lockingThread != currentThread )
+ while ( locked && lockingThread != currentThread )
{
waitingThreadList.addFirst( le );
try
@@ -111,7 +104,7 @@ synchronized void lock()
Thread.interrupted();
}
}
- lockCount++;
+ locked = true;
lockingThread = currentThread;
le.movedOn = true;
marked--;
@@ -120,34 +113,34 @@ synchronized void lock()
synchronized void unLock()
{
Thread currentThread = Thread.currentThread();
- if ( lockCount == 0 )
+ if ( !locked )
{
throw new LockException( "" + currentThread
+ " don't have window lock on " + this );
}
- lockCount--;
- if ( lockCount == 0 )
+ locked = false;
+ lockingThread = null;
+ while ( !waitingThreadList.isEmpty() )
{
- lockingThread = null;
- if ( waitingThreadList.size() > 0 )
+ LockElement le = waitingThreadList.removeLast();
+ if ( !le.movedOn )
{
- LockElement le = waitingThreadList.removeLast();
- if ( !le.movedOn )
- {
- le.thread.interrupt();
- }
+ le.thread.interrupt();
+ break;
}
}
}
- synchronized boolean isFree()
+ private boolean isFree( boolean assumingOwnerUnlockedIt )
{
- return lockCount == 0 && marked == 0;
+ return assumingOwnerUnlockedIt ?
+ marked == 0 : // excluding myself (the owner) no other must have marked this window
+ marked == 0 && !locked; // no one must have this marked and it mustn't be locked
}
synchronized boolean writeOutAndCloseIfFree( boolean readOnly )
{
- if ( isFree() )
+ if ( isFree( lockingThread == Thread.currentThread() ) )
{
if ( !readOnly )
writeOutAndClose();
View
10 kernel/src/main/java/org/neo4j/kernel/impl/nioneo/store/MappedPersistenceWindow.java
@@ -103,15 +103,6 @@ public boolean equals( Object o )
return position() == ((MappedPersistenceWindow) o).position();
}
- @Override
- protected synchronized void writeOutAndClose()
- {
- ((java.nio.MappedByteBuffer) buffer.getBuffer()).force();
- buffer.close();
- position = -1;
- closed = true;
- }
-
private volatile int hashCode = 0;
@Override
@@ -135,6 +126,7 @@ public String toString()
public synchronized void close()
{
buffer.close();
+ position = -1;
closed = true;
}
View
105 kernel/src/main/java/org/neo4j/kernel/impl/nioneo/store/PersistenceRow.java
@@ -31,6 +31,7 @@
*/
class PersistenceRow extends LockableWindow
{
+ private State bufferState = State.EMPTY;
private int recordSize = -1;
private final long position;
private final Buffer buffer;
@@ -45,7 +46,31 @@
this.position = position;
this.recordSize = recordSize;
this.buffer = new Buffer( this, ByteBuffer.allocate( recordSize ) );
- // this.buffer.setByteBuffer( ByteBuffer.allocate( recordSize ) );
+ markAsInUse();
+ }
+
+ @Override
+ void lock( OperationType operationType )
+ {
+ super.lock( operationType );
+ boolean success = false;
+ try
+ {
+ bufferState = bufferState.transition( operationType, this );
+ success = true;
+ }
+ finally
+ {
+ if ( !success )
+ {
+ unLock();
+ }
+ }
+ }
+
+ public boolean isDirty()
+ {
+ return bufferState == State.DIRTY;
}
@Override
@@ -77,27 +102,61 @@ public long position()
return position;
}
+ private static enum State
+ {
+ EMPTY
+ {
+ @Override
+ State transition( OperationType operationType, PersistenceRow persistenceRow )
+ {
+ switch ( operationType)
+ {
+ case READ:
+ persistenceRow.readFullWindow();
+ return CLEAN;
+ case WRITE:
+ return DIRTY;
+ default:
+ throw new IllegalStateException( "Unknown operation type: " + operationType );
+ }
+ }
+ },
+ CLEAN
+ {
+ @Override
+ State transition( OperationType operationType, PersistenceRow persistenceRow )
+ {
+ switch ( operationType)
+ {
+ case READ:
+ return CLEAN;
+ case WRITE:
+ return DIRTY;
+ default:
+ throw new IllegalStateException( "Unknown operation type: " + operationType );
+ }
+ }
+ },
+ DIRTY
+ {
+ @Override
+ State transition( OperationType operationType, PersistenceRow persistenceRow )
+ {
+ return DIRTY;
+ }
+ };
+
+ abstract State transition( OperationType operationType, PersistenceRow persistenceRow );
+ }
+
void readFullWindow()
{
try
{
- // long fileSize = getFileChannel().size();
- // long recordCount = fileSize / recordSize;
- // possible last element not written completely, therefore if
- // fileSize % recordSize can be non 0 and we check > instead of >=
- // if ( position > recordCount )
- // {
- // use new buffer since it will contain only zeros
- // return;
- // }
ByteBuffer byteBuffer = buffer.getBuffer();
byteBuffer.clear();
- int count = getFileChannel().read( byteBuffer,
- position * recordSize );
-// if ( position < recordCount )
-// {
-// assert count == recordSize;
-// }
+ int count = getFileChannel().read( byteBuffer, position * recordSize );
+ assert count == recordSize;
byteBuffer.clear();
}
catch ( IOException e )
@@ -110,7 +169,7 @@ void readFullWindow()
private void writeContents()
{
ByteBuffer byteBuffer = buffer.getBuffer();
- if ( getOperationType() == OperationType.WRITE )
+ if ( isDirty() )
{
byteBuffer.clear();
try
@@ -127,13 +186,6 @@ private void writeContents()
}
byteBuffer.clear();
}
-
- @Override
- protected synchronized void writeOutAndClose()
- {
- writeContents();
- closed = true;
- }
@Override
public int size()
@@ -175,4 +227,9 @@ public synchronized void close()
buffer.close();
closed = true;
}
+
+ public void reset()
+ {
+ buffer.reset();
+ }
}
View
63 kernel/src/main/java/org/neo4j/kernel/impl/nioneo/store/PersistenceWindowPool.java
@@ -52,8 +52,8 @@
// == recordSize
private final int blockSize;
private FileChannel fileChannel;
- private final ConcurrentMap<Integer,PersistenceRow> activeRowWindows =
- new ConcurrentHashMap<Integer,PersistenceRow>();
+ private final ConcurrentMap<Long,PersistenceRow> activeRowWindows =
+ new ConcurrentHashMap<Long,PersistenceRow>();
private long availableMem = 0;
private long memUsed = 0;
private int brickCount = 0;
@@ -126,7 +126,6 @@ public PersistenceWindow acquire( long position, OperationType operationType )
{
refreshBricks();
}
- boolean readFullRow = false;
while ( window == null )
{
if ( brickSize > 0 )
@@ -159,12 +158,9 @@ public PersistenceWindow acquire( long position, OperationType operationType )
miss++;
brickMiss++;
- if ( operationType == OperationType.READ )
- readFullRow = true;
-
// Lock-free implementation of instantiating an active window for this position
// See if there's already an active window for this position
- PersistenceRow dpw = activeRowWindows.get( (int) position );
+ PersistenceRow dpw = activeRowWindows.get( position );
if ( dpw != null && dpw.markAsInUse() )
{ // ... there was and we managed to mark it as in use
window = dpw;
@@ -175,48 +171,28 @@ public PersistenceWindow acquire( long position, OperationType operationType )
// closed right before we managed to mark it as in use.
// Either way instantiate a new active window for this position
dpw = new PersistenceRow( position, blockSize, fileChannel );
- PersistenceRow existing = activeRowWindows.putIfAbsent( (int) position, dpw );
- if ( existing == null && dpw.markAsInUse() )
+ PersistenceRow existing = activeRowWindows.putIfAbsent( position, dpw );
+ if ( existing == null )
{
- // No one else made it here before us. We also marked it as in use
- // before anyone else potentially acquired and released it
+ // No other thread managed to create an active window for
+ // this position before us.
window = dpw;
}
else
{
- // Someone else put it there before us, or managed to get and release it
- // after we put it but before we marked it. Close this row
- // which was unnecessarily opened.
+ // Someone else put it there before us. Close this row
+ // which was unnecessarily opened. The next go in this loop
+ // will get that one instead.
dpw.close();
- readFullRow = false;
}
}
else
{
hit++;
}
}
-
- window.lock();
- if ( readFullRow )
- {
- PersistenceRow dpw = (PersistenceRow) window;
- boolean success = false;
- try
- {
- dpw.readFullWindow();
- success = true;
- }
- finally
- {
- if ( !success )
- {
- activeRowWindows.remove( (int) dpw.position() );
- window.unLock();
- }
- }
- }
- window.setOperationType( operationType );
+
+ window.lock( operationType );
return window;
}
@@ -250,19 +226,22 @@ public void release( PersistenceWindow window )
if ( window instanceof PersistenceRow )
{
PersistenceRow dpw = (PersistenceRow) window;
- dpw.writeOutAndClose();
// If the corresponding window has been instantiated while we had
// this active row we need to hand over the changes to that
// window if the window isn't memory mapped.
- if ( brickSize > 0 && dpw.getOperationType() == OperationType.WRITE )
+ if ( brickSize > 0 && dpw.isDirty() )
applyChangesToWindowIfNecessary( dpw );
- dpw.unLock();
- if ( dpw.isFree() )
+ if ( dpw.writeOutAndCloseIfFree( readOnly ) )
{
- activeRowWindows.remove( (int) dpw.position(), dpw );
+ activeRowWindows.remove( dpw.position(), dpw );
}
+ else
+ {
+ dpw.reset();
+ }
+ dpw.unLock();
}
else
{
@@ -280,7 +259,7 @@ private void applyChangesToWindowIfNecessary( PersistenceRow dpw )
{
// There is a non-mapped brick window here, let's have it
// know about my changes.
- existingBrickWindow.lock();
+ existingBrickWindow.lock( OperationType.WRITE );
try
{
existingBrickWindow.acceptContents( dpw );
View
19 kernel/src/test/java/org/neo4j/kernel/impl/nioneo/store/MappedPersistenceWindowTest.java
@@ -24,6 +24,8 @@
import org.junit.Rule;
import org.junit.Test;
+import org.neo4j.test.OtherThreadExecutor;
+import org.neo4j.test.OtherThreadExecutor.WorkerCommand;
import org.neo4j.test.ResourceCollection;
import org.neo4j.test.TargetDirectory;
@@ -77,16 +79,27 @@ public void shouldNotCloseLockedWindow() throws Exception
// given
String filename = new File( directory.directory(), "mapped.file" ).getAbsolutePath();
RandomAccessFile file = resources.add( new RandomAccessFile( filename, "rw" ) );
- MappedPersistenceWindow window = new MappedPersistenceWindow( 0, 8, 16, file.getChannel(), READ_WRITE );
+ final MappedPersistenceWindow window = new MappedPersistenceWindow( 0, 8, 16, file.getChannel(), READ_WRITE );
window.markAsInUse();
- window.lock();
+ OtherThreadExecutor<Void> executor = new OtherThreadExecutor<Void>(null);
+ executor.execute( new WorkerCommand<Void, Void>()
+ {
+ @Override
+ public Void doWork( Void state )
+ {
+ window.lock( OperationType.WRITE );
+ return null;
+ }
+ } );
// when
boolean wasClosed = window.writeOutAndCloseIfFree( false );
// then
assertFalse( wasClosed );
+
+ executor.shutdown();
}
@Test
@@ -98,7 +111,7 @@ public void shouldCloseReleasedWindow() throws Exception
MappedPersistenceWindow window = new MappedPersistenceWindow( 0, 8, 16, file.getChannel(), READ_WRITE );
window.markAsInUse();
- window.lock();
+ window.lock( OperationType.WRITE );
window.unLock();
// when
View
86 kernel/src/test/java/org/neo4j/kernel/impl/nioneo/store/PersistenceWindowPoolTest.java
@@ -19,16 +19,23 @@
*/
package org.neo4j.kernel.impl.nioneo.store;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+import static org.neo4j.helpers.Exceptions.launderedException;
+
import java.io.File;
import java.io.RandomAccessFile;
+import java.util.concurrent.Future;
import org.junit.Rule;
import org.junit.Test;
+import org.neo4j.test.OtherThreadExecutor;
+import org.neo4j.test.OtherThreadExecutor.WorkerCommand;
import org.neo4j.test.ResourceCollection;
import org.neo4j.test.TargetDirectory;
-import static org.junit.Assert.assertNotSame;
-
public class PersistenceWindowPoolTest
{
private static final TargetDirectory target = TargetDirectory.forTest( MappedPersistenceWindowTest.class );
@@ -54,4 +61,79 @@ public void shouldBeAbleToReAcquireReleasedWindow() throws Exception
// then
assertNotSame( initialWindow, window );
}
+
+ @Test
+ public void handOverDirtyPersistenceRowToReaderShouldWriteWhenClosing() throws Exception
+ {
+ String filename = new File( target.graphDbDir( true ), "dirty" ).getAbsolutePath();
+ RandomAccessFile file = resources.add( new RandomAccessFile( filename, "rw" ) );
+ final int blockSize = 8;
+ final PersistenceWindowPool pool = new PersistenceWindowPool( "test.store", blockSize, file.getChannel(), 0, false, false );
+
+ // The gist:
+ // T1 acquires position 0 as WRITE
+ // T2 would like to acquire position 0 as READ, marks it and goes to wait in lock()
+ // T1 writes stuff to the buffer and releases it
+ // T2 gets the PR handed over from T1, reads and verifies that it got the changes made by T1
+ // T2 releases it
+ // Verify that what T1 wrote is on disk
+
+ final PersistenceWindow t1Row = pool.acquire( 0, OperationType.WRITE );
+ OtherThreadExecutor<Void> otherThread = new OtherThreadExecutor<Void>( null );
+ Future<Throwable> future = otherThread.executeDontWait( new WorkerCommand<Void, Throwable>()
+ {
+ @Override
+ public Throwable doWork( Void state )
+ {
+ PersistenceWindow t2Row = pool.acquire( 0, OperationType.READ ); // Will block until t1Row is released.
+ try
+ {
+ assertTrue( t1Row == t2Row );
+ assertBufferContents( blockSize, t2Row );
+ return null;
+ }
+ catch ( Throwable t )
+ {
+ return t;
+ }
+ finally
+ {
+ pool.release( t2Row );
+ }
+ }
+ } );
+ try
+ {
+ writeBufferContents( blockSize, t1Row );
+ otherThread.waitUntilWaiting();
+ }
+ finally
+ {
+ pool.release( t1Row );
+ }
+ Throwable failure = future.get();
+ if ( failure != null )
+ throw launderedException( failure );
+
+ PersistenceWindow row = pool.acquire( 0, OperationType.READ );
+ assertFalse( t1Row == row );
+ assertBufferContents( blockSize, row );
+
+ pool.close();
+ otherThread.shutdown();
+ }
+
+ private void writeBufferContents( final int blockSize, final PersistenceWindow t1Row )
+ {
+ Buffer buffer = t1Row.getBuffer();
+ for ( int i = 0; i < blockSize; i++ )
+ buffer.put( (byte) i );
+ }
+
+ private void assertBufferContents( final int blockSize, PersistenceWindow row )
+ {
+ Buffer buffer = row.getBuffer();
+ for ( int i = 0; i < blockSize; i++ )
+ assertEquals( (byte)i, buffer.get() );
+ }
}

0 comments on commit ee5c630

Please sign in to comment.
Something went wrong with that request. Please try again.