Skip to content

Commit

Permalink
Merge pull request #8540 from chrisvest/3.2-pc-javadocs
Browse files Browse the repository at this point in the history
Simplify write page cursor usage
  • Loading branch information
burqen committed Jan 2, 2017
2 parents 5426e15 + f86bd5d commit c5e4ac0
Show file tree
Hide file tree
Showing 15 changed files with 228 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,12 @@ public void zapPage()
{
Arrays.fill( buffer.array(), (byte) 0 );
}

@Override
public boolean isWriteLocked()
{
// Because we allow writes; they can't possibly conflict because this class is meant to be used by only one
// thread at a time anyway.
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -381,4 +381,10 @@ public void zapPage()
{
current.zapPage();
}

@Override
public boolean isWriteLocked()
{
return current == null || current.isWriteLocked();
}
}
28 changes: 23 additions & 5 deletions community/io/src/main/java/org/neo4j/io/pagecache/PageCursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,17 @@ public abstract class PageCursor implements AutoCloseable
public abstract void close();

/**
* Returns true if the page has entered an inconsistent state since the
* last call to next() or shouldRetry().
* If this method returns true, the in-page offset of the cursor will be
* reset to zero.
* Returns true if the page has entered an inconsistent state since the last call to next() or shouldRetry().
* <p>
* If this method returns true, the in-page offset of the cursor will be reset to zero.
* <p>
* Note that {@link PagedFile#PF_SHARED_WRITE_LOCK write locked} cursors never conflict with each other, nor with
* eviction, and thus technically don't require a {@code shouldRetry} check. This method always returns
* {@code false} for write-locking cursors.
* <p>
* Cursors that are {@link PagedFile#PF_SHARED_READ_LOCK read locked} must <em>always</em> perform their reads in a
* {@code shouldRetry} loop, and avoid interpreting the data they read until {@code shouldRetry} has confirmed that
* the reads were consistent.
*
* @throws IOException If the page was evicted while doing IO, the cursor will have
* to do a page fault to get the page back.
Expand All @@ -281,6 +288,10 @@ public abstract class PageCursor implements AutoCloseable
* <p>
* If the length reaches beyond the end of either cursor, then only as many bytes as are available in this cursor,
* or can fit in the target cursor, are actually copied.
* <p>
* <strong>Note</strong> that {@code copyTo} is only guaranteed to work when both target and source cursor are from
* the <em>same</em> page cache implementation. Using wrappers, delegates or mixing cursor implementations may
* produce unspecified errors.
*
* @param sourceOffset The offset into this page to copy from.
* @param targetCursor The cursor the data will be copied to.
Expand Down Expand Up @@ -319,7 +330,8 @@ public abstract class PageCursor implements AutoCloseable
* message, unless the error has gotten cleared by a {@link #shouldRetry()} call that returned {@code true},
* a call to {@link #next()} or {@link #next(long)}, or the cursor is closed.
*
* @param message The message of the {@link CursorException} that {@link #checkAndClearCursorException()} will throw.
* @param message The message of the {@link CursorException} that {@link #checkAndClearCursorException()} will
* throw.
*/
public abstract void setCursorException( String message );

Expand All @@ -346,4 +358,10 @@ public abstract class PageCursor implements AutoCloseable
* Sets all bytes in this page to zero, as if this page was newly allocated at the end of the file.
*/
public abstract void zapPage();

/**
* @return {@code true} if this page cursor was opened with {@link PagedFile#PF_SHARED_WRITE_LOCK},
* {@code false} otherwise.
*/
public abstract boolean isWriteLocked();
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public interface PagedFile extends AutoCloseable
* Note that write locks are <em>not</em> exclusive. You must use other means to coordinate access to the data on
* the pages. The write lock only means that the page will not be concurrently evicted.
* <p>
* Note also that write locks exclude eviction. So since we can assume that write locks never make conflicting
* modifications (higher level locks should ensure this), it is safe to perform page writes without a
* {@link PageCursor#shouldRetry() shouldRetry} loop. The {@code shouldRetry} method on write locking cursors
* always returns {@code false}.
* <p>
* This cannot be combined with {@link #PF_SHARED_READ_LOCK}.
*/
int PF_SHARED_WRITE_LOCK = 1 << 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,12 @@ public void zapPage()
second.zapPage();
}

@Override
public boolean isWriteLocked()
{
return first.isWriteLocked() && second.isWriteLocked();
}

/**
* Build a CompositePageCursor that is a view of the first page cursor from its current offset through the given
* length, concatenated with the second cursor from its current offset through the given length. The offsets are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,12 @@ public void zapPage()
delegate.zapPage();
}

@Override
public boolean isWriteLocked()
{
return delegate.isWriteLocked();
}

public DelegatingPageCursor( PageCursor delegate )
{
this.delegate = delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,11 @@ public int write( ByteBuffer src ) throws IOException
}
bytesLeftInCurrentPage = cursor.getCurrentPageSize();
}
int position = src.position();
int remaining = Math.min( src.remaining(), bytesLeftInCurrentPage );
int offset = cursor.getOffset();
do
for ( int i = 0; i < remaining; i++ )
{
src.position( position );
cursor.setOffset( offset );
for ( int i = 0; i < remaining; i++ )
{
cursor.putByte( src.get() );
}
cursor.putByte( src.get() );
}
while ( cursor.shouldRetry() );
bytesLeftInCurrentPage -= remaining;
return remaining;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.neo4j.io.pagecache.tracing.PinEvent;
import org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil;

import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_WRITE_LOCK;
import static org.neo4j.unsafe.impl.internal.dragons.FeatureToggles.flag;

abstract class MuninnPageCursor extends PageCursor
Expand Down Expand Up @@ -791,4 +792,10 @@ public void zapPage()
UnsafeUtil.setMemory( pointer, pageSize, (byte) 0 );
}
}

@Override
public boolean isWriteLocked()
{
return (pf_flags & PF_SHARED_WRITE_LOCK) == PF_SHARED_WRITE_LOCK;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,12 @@ public void zapPage()
throw new IllegalStateException( "Cannot write using read cursor" );
}

@Override
public boolean isWriteLocked()
{
return delegate.isWriteLocked();
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,10 @@ public void zapPage()
{
delegate.zapPage();
}

@Override
public boolean isWriteLocked()
{
return true;
}
}
121 changes: 51 additions & 70 deletions community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,7 @@ public void writesFlushedFromPageFileMustBeExternallyObservable() throws IOExcep
{
while ( cursor.getCurrentPageId() < endPageId && cursor.next() )
{
do
{
writeRecords( cursor );
} while ( cursor.shouldRetry() );
writeRecords( cursor );
}
}

Expand Down Expand Up @@ -445,10 +442,7 @@ public void writesFlushedFromPageCacheMustBeExternallyObservable() throws IOExce
{
while ( cursor.getCurrentPageId() < endPageId && cursor.next() )
{
do
{
writeRecords( cursor );
} while ( cursor.shouldRetry() );
writeRecords( cursor );
}
} // closing the PagedFile implies flushing because it was the last reference

Expand Down Expand Up @@ -726,10 +720,7 @@ public void dirtyPagesMustBeFlushedWhenTheCacheIsClosed() throws IOException
{
while ( cursor.getCurrentPageId() < endPageId && cursor.next() )
{
do
{
writeRecords( cursor );
} while ( cursor.shouldRetry() );
writeRecords( cursor );
}
}
finally
Expand Down Expand Up @@ -953,39 +944,29 @@ public void nextMustResetTheCursorOffset() throws IOException
try ( PageCursor cursor = pagedFile.io( 0L, PF_SHARED_WRITE_LOCK ) )
{
assertTrue( cursor.next() );
do
{
cursor.setOffset( 0 );
cursor.putByte( (byte) 1 );
cursor.putByte( (byte) 2 );
cursor.putByte( (byte) 3 );
cursor.putByte( (byte) 4 );
} while ( cursor.shouldRetry() );
cursor.setOffset( 0 );
cursor.putByte( (byte) 1 );
cursor.putByte( (byte) 2 );
cursor.putByte( (byte) 3 );
cursor.putByte( (byte) 4 );

assertTrue( cursor.next() );
do
{
cursor.setOffset( 0 );
cursor.putByte( (byte) 5 );
cursor.putByte( (byte) 6 );
cursor.putByte( (byte) 7 );
cursor.putByte( (byte) 8 );
} while ( cursor.shouldRetry() );
cursor.setOffset( 0 );
cursor.putByte( (byte) 5 );
cursor.putByte( (byte) 6 );
cursor.putByte( (byte) 7 );
cursor.putByte( (byte) 8 );
}

try ( PageCursor cursor = pagedFile.io( 0L, PF_SHARED_WRITE_LOCK ) )
{
byte[] bytes = new byte[4];
assertTrue( cursor.next() );
do
{
cursor.getBytes( bytes );
} while ( cursor.shouldRetry() );
cursor.getBytes( bytes );
assertThat( bytes, byteArray( new byte[]{1, 2, 3, 4} ) );

assertTrue( cursor.next() );
do
{
cursor.getBytes( bytes );
} while ( cursor.shouldRetry() );
cursor.getBytes( bytes );
assertThat( bytes, byteArray( new byte[]{5, 6, 7, 8} ) );
}
pagedFile.close();
Expand Down Expand Up @@ -1521,10 +1502,7 @@ public void newlyWrittenPagesMustBeAccessibleWithNoGrow() throws IOException
for ( int i = 0; i < pagesToAdd; i++ )
{
assertTrue( cursor.next() );
do
{
writeRecords( cursor );
} while ( cursor.shouldRetry() );
writeRecords( cursor );
}
}

Expand Down Expand Up @@ -1596,12 +1574,7 @@ public void retryMustResetCursorOffset() throws Exception
{
if ( cursor.next() )
{
do
{
cursor.putByte( expectedByte );
// Give some hint to scheduler to give some CPU cycles to the read thread below
Thread.yield();
} while ( cursor.shouldRetry() );
cursor.putByte( expectedByte );
}
}

Expand All @@ -1613,11 +1586,8 @@ public void retryMustResetCursorOffset() throws Exception
{
if ( cursor.next() )
{
do
{
cursor.setOffset( recordSize );
cursor.putByte( (byte) 14 );
} while ( cursor.shouldRetry() );
cursor.setOffset( recordSize );
cursor.putByte( (byte) 14 );
}
startLatch.countDown();
}
Expand Down Expand Up @@ -1705,20 +1675,11 @@ public void pagesAddedWithNextWithPageIdMustBeAccessibleWithNoGrowSpecified() th
try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) )
{
assertTrue( cursor.next( 2 ) );
do
{
writeRecords( cursor );
} while ( cursor.shouldRetry() );
writeRecords( cursor );
assertTrue( cursor.next( 0 ) );
do
{
writeRecords( cursor );
} while ( cursor.shouldRetry() );
writeRecords( cursor );
assertTrue( cursor.next( 1 ) );
do
{
writeRecords( cursor );
} while ( cursor.shouldRetry() );
writeRecords( cursor );
}

try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK | PF_NO_GROW ) )
Expand Down Expand Up @@ -2102,13 +2063,11 @@ public void evictionMustFlushPagesToTheRightFiles() throws IOException
cursorReady2 = cursor.next();
if ( cursorReady2 )
{
do {
for ( int i = 0; i < filePageSize2; i++ )
{
cursor.putByte( (byte) 'b' );
}
for ( int i = 0; i < filePageSize2; i++ )
{
cursor.putByte( (byte) 'b' );
}
while ( cursor.shouldRetry() );
assertFalse( cursor.shouldRetry() );
}
pageId2++;
}
Expand Down Expand Up @@ -4615,7 +4574,7 @@ public void openingLinkedCursorOnClosedCursorMustThrow() throws Exception
// all good
}

PageCursor reader = pf.io( 0, PF_SHARED_WRITE_LOCK );
PageCursor reader = pf.io( 0, PF_SHARED_READ_LOCK );
assertTrue( reader.next() );
reader.close();
try
Expand Down Expand Up @@ -5420,4 +5379,26 @@ public void shouldZeroAllBytesOnClear() throws Exception
}
}
}

@Test
public void isWriteLockingMustBeTrueForCursorOpenedWithSharedWriteLock() throws Exception
{
configureStandardPageCache();
try ( PagedFile pf = pageCache.map( file( "a" ), filePageSize );
PageCursor cursor = pf.io( 0, PF_SHARED_WRITE_LOCK ) )
{
assertTrue( cursor.isWriteLocked() );
}
}

@Test
public void isWriteLockingMustBeFalseForCursorOpenedWithSharedReadLock() throws Exception
{
configureStandardPageCache();
try ( PagedFile pf = pageCache.map( file( "a" ), filePageSize );
PageCursor cursor = pf.io( 0, PF_SHARED_READ_LOCK ) )
{
assertFalse( cursor.isWriteLocked() );
}
}
}

0 comments on commit c5e4ac0

Please sign in to comment.