Skip to content

Commit

Permalink
Make the PageCacheNumberArrays thread-safe
Browse files Browse the repository at this point in the history
This is done by explicitly opening a new page cursor for each operation.
  • Loading branch information
ragadeeshu committed Jul 17, 2017
1 parent 7d29f1b commit 277b2ec
Show file tree
Hide file tree
Showing 8 changed files with 451 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;

import static org.neo4j.io.pagecache.PagedFile.PF_NO_GROW;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_READ_LOCK;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_WRITE_LOCK;

public class PageCacheByteArray extends PageCacheNumberArray<ByteArray> implements ByteArray
{
private final byte[] defaultValue;
Expand All @@ -34,7 +38,7 @@ public PageCacheByteArray( PagedFile pagedFile, long length, byte[] defaultValue
// '0' default value means we skip filling it out in super
super( pagedFile, defaultValue.length, length, 0, base );
this.defaultValue = defaultValue;
setDefaultValue( writeCursor( 0 ), -1 );
setDefaultValue( -1 );
}

@Override
Expand Down Expand Up @@ -62,9 +66,9 @@ public void get( long index, byte[] into )
{
long pageId = pageId( index );
int offset = offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_READ_LOCK ) )
{
PageCursor cursor = readCursor( pageId );
cursor.next();
do
{
for ( int i = 0; i < into.length; i++ )
Expand All @@ -86,9 +90,9 @@ public byte getByte( long index, int offset )
{
long pageId = pageId( index );
offset += offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_READ_LOCK ) )
{
PageCursor cursor = readCursor( pageId );
cursor.next();
byte result;
do
{
Expand All @@ -109,9 +113,9 @@ public short getShort( long index, int offset )
{
long pageId = pageId( index );
offset += offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_READ_LOCK ) )
{
PageCursor cursor = readCursor( pageId );
cursor.next();
short result;
do
{
Expand All @@ -132,9 +136,9 @@ public int getInt( long index, int offset )
{
long pageId = pageId( index );
offset += offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_READ_LOCK ) )
{
PageCursor cursor = readCursor( pageId );
cursor.next();
int result;
do
{
Expand All @@ -155,9 +159,9 @@ public long get6ByteLong( long index, int offset )
{
long pageId = pageId( index );
offset += offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_READ_LOCK ) )
{
PageCursor cursor = readCursor( pageId );
cursor.next();
long result;
do
{
Expand All @@ -180,9 +184,9 @@ public long getLong( long index, int offset )
{
long pageId = pageId( index );
offset += offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_READ_LOCK ) )
{
PageCursor cursor = readCursor( pageId );
cursor.next();
long result;
do
{
Expand All @@ -204,9 +208,9 @@ public void set( long index, byte[] value )
assert value.length == entrySize;
long pageId = pageId( index );
int offset = offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_WRITE_LOCK | PF_NO_GROW ); )
{
PageCursor cursor = writeCursor( pageId );
cursor.next();
for ( int i = 0; i < value.length; i++ )
{
cursor.putByte( offset + i, value[i] );
Expand All @@ -224,9 +228,9 @@ public void setByte( long index, int offset, byte value )
{
long pageId = pageId( index );
offset += offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_WRITE_LOCK | PF_NO_GROW ); )
{
PageCursor cursor = writeCursor( pageId );
cursor.next();
cursor.putByte( offset, value );
checkBounds( cursor );
}
Expand All @@ -241,9 +245,9 @@ public void setShort( long index, int offset, short value )
{
long pageId = pageId( index );
offset += offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_WRITE_LOCK | PF_NO_GROW ); )
{
PageCursor cursor = writeCursor( pageId );
cursor.next();
cursor.putShort( offset, value );
checkBounds( cursor );
}
Expand All @@ -258,9 +262,9 @@ public void setInt( long index, int offset, int value )
{
long pageId = pageId( index );
offset += offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_WRITE_LOCK | PF_NO_GROW ); )
{
PageCursor cursor = writeCursor( pageId );
cursor.next();
cursor.putInt( offset, value );
checkBounds( cursor );
}
Expand All @@ -275,9 +279,9 @@ public void set6ByteLong( long index, int offset, long value )
{
long pageId = pageId( index );
offset += offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_WRITE_LOCK | PF_NO_GROW ); )
{
PageCursor cursor = writeCursor( pageId );
cursor.next();
cursor.putInt( offset, (int) value );
cursor.putShort( offset + Integer.BYTES, (short) (value >>> 32) );
checkBounds( cursor );
Expand All @@ -293,9 +297,9 @@ public void setLong( long index, int offset, long value )
{
long pageId = pageId( index );
offset += offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_WRITE_LOCK | PF_NO_GROW ); )
{
PageCursor cursor = writeCursor( pageId );
cursor.next();
cursor.putLong( offset, value );
checkBounds( cursor );
}
Expand All @@ -311,9 +315,9 @@ public int get3ByteInt( long index, int offset )

long pageId = pageId( index );
offset += offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_READ_LOCK ) )
{
PageCursor cursor = readCursor( pageId );
cursor.next();
int result;
do
{
Expand All @@ -336,9 +340,9 @@ public void set3ByteInt( long index, int offset, int value )
{
long pageId = pageId( index );
offset += offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_WRITE_LOCK | PF_NO_GROW ); )
{
PageCursor cursor = writeCursor( pageId );
cursor.next();
cursor.putShort( offset, (short) value );
cursor.putByte( offset + Short.BYTES, (byte) (value >>> Short.SIZE) );
checkBounds( cursor );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;

import static org.neo4j.io.pagecache.PagedFile.PF_NO_GROW;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_READ_LOCK;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_WRITE_LOCK;

public class PageCacheIntArray extends PageCacheNumberArray<IntArray> implements IntArray
{
public PageCacheIntArray( PagedFile pagedFile, long length, long defaultValue, long base ) throws IOException
Expand All @@ -37,9 +41,9 @@ public int get( long index )
{
long pageId = pageId( index );
int offset = offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_READ_LOCK ) )
{
PageCursor cursor = readCursor( pageId );
cursor.next();
int result;
do
{
Expand All @@ -60,9 +64,9 @@ public void set( long index, int value )
{
long pageId = pageId( index );
int offset = offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_WRITE_LOCK | PF_NO_GROW ); )
{
PageCursor cursor = writeCursor( pageId );
cursor.next();
cursor.putInt( offset, value );
checkBounds( cursor );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;

import static org.neo4j.io.pagecache.PagedFile.PF_NO_GROW;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_READ_LOCK;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_WRITE_LOCK;

public class PageCacheLongArray extends PageCacheNumberArray<LongArray> implements LongArray
{
public PageCacheLongArray( PagedFile pagedFile, long length, long defaultValue, long base ) throws IOException
Expand All @@ -37,9 +41,9 @@ public long get( long index )
{
long pageId = pageId( index );
int offset = offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_READ_LOCK ) )
{
PageCursor cursor = readCursor( pageId );
cursor.next();
long result;
do
{
Expand All @@ -60,9 +64,9 @@ public void set( long index, long value )
{
long pageId = pageId( index );
int offset = offset( index );
try
try ( PageCursor cursor = pagedFile.io( pageId, PF_SHARED_WRITE_LOCK | PF_NO_GROW ) )
{
PageCursor cursor = writeCursor( pageId );
cursor.next();
cursor.putLong( offset, value );
checkBounds( cursor );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,24 @@

import static java.lang.Math.toIntExact;
import static org.neo4j.io.pagecache.PagedFile.PF_NO_GROW;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_READ_LOCK;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_WRITE_LOCK;

// todo make safe for concurrent access
public abstract class PageCacheNumberArray<N extends NumberArray<N>> implements NumberArray<N>
{
protected final PagedFile pagedFile;
protected final int entriesPerPage;
protected final int entrySize;
private final PageCursor readCursor;
private final PageCursor writeCursor;
private final long length;
private final long defaultValue;
private final long base;
private boolean closed;

public PageCacheNumberArray( PagedFile pagedFile, int entrySize, long length,
protected PageCacheNumberArray( PagedFile pagedFile, int entrySize, long length,
long defaultValue, long base ) throws IOException
{
this.pagedFile = pagedFile;
this.entrySize = entrySize;
this.entriesPerPage = pagedFile.pageSize() / entrySize;
this.readCursor = pagedFile.io( 0, PF_SHARED_READ_LOCK );
this.writeCursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK | PF_NO_GROW );
this.length = length;
this.defaultValue = defaultValue;
this.base = base;
Expand All @@ -62,13 +56,16 @@ public PageCacheNumberArray( PagedFile pagedFile, int entrySize, long length,

if ( defaultValue != 0 )
{
setDefaultValue( writeCursor, defaultValue );
setDefaultValue( defaultValue );
}
}

private void setLength( PageCursor cursor, long length ) throws IOException
{
goTo( cursor, ( length - 1 ) / entriesPerPage );
if ( !cursor.next( (length - 1) / entriesPerPage ) )
{
throw new IllegalStateException();
}
}

protected long pageId( long index )
Expand All @@ -86,22 +83,25 @@ private long rebase( long index )
return index - base;
}

protected void setDefaultValue( PageCursor writeCursor, long defaultValue ) throws IOException
protected void setDefaultValue( long defaultValue ) throws IOException
{
if ( entrySize == Integer.BYTES )
{
defaultValue |= defaultValue << 32;
}
goTo( writeCursor, 0 );
int pageSize = pagedFile.pageSize();
fillPageWithDefaultValue( writeCursor, defaultValue, pageSize );
if ( pageId( length - 1 ) > 0 )
try ( PageCursor writeCursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK | PF_NO_GROW ) )
{
try ( PageCursor cursor = pagedFile.io( 1, PF_NO_GROW | PF_SHARED_WRITE_LOCK ) )
writeCursor.next();
int pageSize = pagedFile.pageSize();
fillPageWithDefaultValue( writeCursor, defaultValue, pageSize );
if ( pageId( length - 1 ) > 0 )
{
while ( cursor.next() )
try ( PageCursor cursor = pagedFile.io( 1, PF_NO_GROW | PF_SHARED_WRITE_LOCK ) )
{
writeCursor.copyTo( 0, cursor, 0, pageSize );
while ( cursor.next() )
{
writeCursor.copyTo( 0, cursor, 0, pageSize );
}
}
}
}
Expand All @@ -127,7 +127,7 @@ public void clear()
{
try
{
setDefaultValue( writeCursor, defaultValue );
setDefaultValue( defaultValue );
}
catch ( IOException e )
{
Expand All @@ -144,8 +144,6 @@ public void close()
}
try
{
readCursor.close();
writeCursor.close();
pagedFile.close();
}
catch ( IOException e )
Expand Down Expand Up @@ -177,27 +175,4 @@ protected void checkBounds( PageCursor cursor )
throw new IllegalStateException();
}
}

protected PageCursor goTo( PageCursor cursor, long pageId ) throws IOException
{
if ( !cursor.next( pageId ) )
{
throw new IllegalStateException();
}
return cursor;
}

protected PageCursor writeCursor( long pageId ) throws IOException
{
return goTo( writeCursor, pageId );
}

protected PageCursor readCursor( long pageId ) throws IOException
{
if ( writeCursor.getCurrentPageId() == pageId )
{
return writeCursor;
}
return goTo( readCursor, pageId );
}
}

0 comments on commit 277b2ec

Please sign in to comment.