Make the PageSwapper use longs and ints instead of Page objects
for reading and writing data to/from pages in memory, to/from sections in
files.
chrisvest committed May 26, 2017
1 parent 81d661f commit 8fada65
Showing 11 changed files with 273 additions and 217 deletions.
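Every PageSwapper method that previously took a Page (or Page[]) now takes the raw memory address as a long (or a long[] of addresses) together with the buffer size as an int. A condensed sketch of the interface change, with the signatures taken from the diff below (javadoc and the remaining interface methods omitted):

public interface PageSwapper
{
    // Before: page memory was reached through Page objects.
    // long read( long filePageId, Page page ) throws IOException;
    // long read( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException;
    // long write( long filePageId, Page page ) throws IOException;
    // long write( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException;

    // After: callers pass the raw memory address(es) plus the buffer size directly.
    long read( long filePageId, long bufferAddress, int bufferSize ) throws IOException;
    long read( long startFilePageId, long[] bufferAddresses, int bufferSize, int arrayOffset, int length ) throws IOException;
    long write( long filePageId, long bufferAddress, int bufferSize ) throws IOException;
    long write( long startFilePageId, long[] bufferAddresses, int bufferSize, int arrayOffset, int length ) throws IOException;
}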
19 changes: 9 additions & 10 deletions community/io/src/main/java/org/neo4j/io/pagecache/PageSwapper.java
@@ -32,7 +32,7 @@ public interface PageSwapper
{
/**
* Read the page with the given filePageId, from the concrete file on the
* file system, into the given page.
* file system, into the page given by the bufferAddress and the bufferSize.
* <p>
* Returns the number of bytes read in from the file. May be zero if the
* requested page was beyond the end of the file. If less than the file
@@ -43,10 +43,10 @@ public interface PageSwapper
* interrupted. If this happens, then the implementation must reopen the
* channel and the operation must be retried.
*/
long read( long filePageId, Page page ) throws IOException;
long read( long filePageId, long bufferAddress, int bufferSize ) throws IOException;

/**
* Read pages from the file into the given pages, starting from the given startFilePageId.
* Read pages from the file into the pages given by the bufferAddresses, starting from the given startFilePageId.
* <p>
* Returns the number of bytes read in from the file. May be zero if the
* requested startFilePageId was beyond the end of the file. If the file does not have enough data
@@ -61,11 +61,12 @@ public interface PageSwapper
* interrupted. If this happens, then the implementation must reopen the
* channel and the operation must be retried.
*/
long read( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException;
long read( long startFilePageId, long[] bufferAddresses, int bufferSize, int arrayOffset, int length ) throws IOException;

/**
* Write the contents of the given page, to the concrete file on the file
* system, at the located indicated by the given filePageId.
* Write the contents of the page given by the bufferAddress and the bufferSize,
* to the concrete file on the file system, at the location indicated by the given
* filePageId.
* <p>
* Returns the number of bytes written to the file.
* <p>
@@ -74,7 +75,7 @@ public interface PageSwapper
* interrupted. If this happens, then the implementation must reopen the
* channel and the operation must be retried.
*/
long write( long filePageId, Page page ) throws IOException;
long write( long filePageId, long bufferAddress, int bufferSize ) throws IOException;

/**
* Write the contents of the given pages, to the concrete file on the file system,
@@ -92,7 +93,7 @@ public interface PageSwapper
* interrupted. If this happens, then the implementation must reopen the
* channel and the operation must be retried.
*/
long write( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException;
long write( long startFilePageId, long[] bufferAddresses, int bufferSize, int arrayOffset, int length ) throws IOException;

/**
* Notification that a page has been evicted, used to clean up state in structures
@@ -141,8 +142,6 @@ public interface PageSwapper
* Truncation may occur concurrently with writes, in which case both operations will appear to be atomic, such that
* either the write happens before the truncation and is lost, or the file is truncated and the write then extends
* the file with any zero padding and the written data.
*
* @throws IOException
*/
void truncate() throws IOException;
}
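As a concrete illustration of the single-page read contract above (read up to a full page at the page's file offset, zero-fill whatever the file does not cover, and return the number of bytes actually read), here is a minimal sketch. A real swapper wraps the raw bufferAddress in a direct ByteBuffer, as the implementation further down does; in this sketch a plain ByteBuffer stands in for that wrapped page memory, and the file offset is reduced to a simple multiplication.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class ReadContractSketch
{
    // Read the file page into 'page' (a stand-in for the memory at bufferAddress),
    // zero-fill any remainder, and report how many bytes came from the file.
    static long read( FileChannel channel, long filePageId, ByteBuffer page, int filePageSize )
            throws IOException
    {
        long fileOffset = filePageId * filePageSize; // simplified pageIdToPosition
        page.clear().limit( filePageSize );
        int readTotal = 0;
        int read;
        do
        {
            read = channel.read( page, fileOffset + readTotal );
        }
        while ( read != -1 && ( readTotal += read ) < filePageSize );

        // Zero-fill the rest, so a short read never leaves stale bytes in the page.
        while ( page.position() < filePageSize )
        {
            page.put( (byte) 0 );
        }
        return readTotal; // zero if the requested page lies beyond the end of the file
    }
}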
@@ -226,14 +226,12 @@ private static int stripe( long filePageId )
return (int) (filePageId >>> channelStripeShift) & channelStripeMask;
}

private int swapIn( StoreChannel channel, Page page, long fileOffset, int filePageSize ) throws IOException
private int swapIn( StoreChannel channel, long bufferAddress, int bufferSize, long fileOffset, int filePageSize ) throws IOException
{
int cachePageSize = page.size();
long address = page.address();
int readTotal = 0;
try
{
ByteBuffer bufferProxy = proxy( address, filePageSize );
ByteBuffer bufferProxy = proxy( bufferAddress, filePageSize );
int read;
do
{
@@ -242,10 +240,10 @@ private int swapIn( StoreChannel channel, Page page, long fileOffset, int filePa
while ( read != -1 && (readTotal += read) < filePageSize );

// Zero-fill the rest.
assert readTotal >= 0 && filePageSize <= cachePageSize && readTotal <= filePageSize :
format( "pointer = %h, readTotal = %s, length = %s, page size = %s", address, readTotal,
filePageSize, cachePageSize );
UnsafeUtil.setMemory( address + readTotal, filePageSize - readTotal, MuninnPageCache.ZERO_BYTE );
assert readTotal >= 0 && filePageSize <= bufferSize && readTotal <= filePageSize : format(
"pointer = %h, readTotal = %s, length = %s, page size = %s",
bufferAddress, readTotal, filePageSize, bufferSize );
UnsafeUtil.setMemory( bufferAddress + readTotal, filePageSize - readTotal, MuninnPageCache.ZERO_BYTE );
return readTotal;
}
catch ( IOException e )
@@ -261,12 +259,11 @@ private int swapIn( StoreChannel channel, Page page, long fileOffset, int filePa
}
}

private int swapOut( Page page, long fileOffset, StoreChannel channel ) throws IOException
private int swapOut( long bufferAddress, long fileOffset, StoreChannel channel ) throws IOException
{
long address = page.address();
try
{
ByteBuffer bufferProxy = proxy( address, filePageSize );
ByteBuffer bufferProxy = proxy( bufferAddress, filePageSize );
channel.writeAll( bufferProxy, fileOffset );
}
catch ( IOException e )
@@ -280,24 +277,24 @@ private int swapOut( Page page, long fileOffset, StoreChannel channel ) throws I
return filePageSize;
}

private void clear( Page page )
private void clear( long bufferAddress, int bufferSize )
{
UnsafeUtil.setMemory( page.address(), page.size(), MuninnPageCache.ZERO_BYTE );
UnsafeUtil.setMemory( bufferAddress, bufferSize, MuninnPageCache.ZERO_BYTE );
}

@Override
public long read( long filePageId, Page page ) throws IOException
public long read( long filePageId, long bufferAddress, int bufferSize ) throws IOException
{
long fileOffset = pageIdToPosition( filePageId );
try
{
if ( fileOffset < getCurrentFileSize() )
{
return swapIn( channel( filePageId ), page, fileOffset, filePageSize );
return swapIn( channel( filePageId ), bufferAddress, bufferSize, fileOffset, filePageSize );
}
else
{
clear( page );
clear( bufferAddress, bufferSize );
}
}
catch ( ClosedChannelException e )
@@ -308,7 +305,7 @@ public long read( long filePageId, Page page ) throws IOException
tryReopen( filePageId, e );
boolean interrupted = Thread.interrupted();
// Recurse because this is hopefully a very rare occurrence.
long bytesRead = read( filePageId, page );
long bytesRead = read( filePageId, bufferAddress, bufferSize );
if ( interrupted )
{
Thread.currentThread().interrupt();
@@ -319,13 +316,13 @@ public long read( long filePageId, Page page ) throws IOException
}

@Override
public long read( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException
public long read( long startFilePageId, long[] bufferAddresses, int bufferSize, int arrayOffset, int length ) throws IOException
{
if ( positionLockGetter != null && hasPositionLock )
{
try
{
return readPositionedVectoredToFileChannel( startFilePageId, pages, arrayOffset, length );
return readPositionedVectoredToFileChannel( startFilePageId, bufferAddresses, arrayOffset, length );
}
catch ( IOException ioe )
{
@@ -337,22 +334,22 @@ public long read( long startFilePageId, Page[] pages, int arrayOffset, int lengt
// isn't exactly an IOException. Instead, we'll try our fallback code and see what it says.
}
}
return readPositionedVectoredFallback( startFilePageId, pages, arrayOffset, length );
return readPositionedVectoredFallback( startFilePageId, bufferAddresses, bufferSize, arrayOffset, length );
}

private long readPositionedVectoredToFileChannel(
long startFilePageId, Page[] pages, int arrayOffset, int length ) throws Exception
long startFilePageId, long[] bufferAddresses, int arrayOffset, int length ) throws Exception
{
long fileOffset = pageIdToPosition( startFilePageId );
FileChannel channel = unwrappedChannel( startFilePageId );
ByteBuffer[] srcs = convertToByteBuffers( pages, arrayOffset, length );
ByteBuffer[] srcs = convertToByteBuffers( bufferAddresses, arrayOffset, length );
long bytesRead = lockPositionReadVector(
startFilePageId, channel, fileOffset, srcs );
if ( bytesRead == -1 )
{
for ( Page page : pages )
for ( long address : bufferAddresses )
{
UnsafeUtil.setMemory( page.address(), filePageSize, MuninnPageCache.ZERO_BYTE );
UnsafeUtil.setMemory( address, filePageSize, MuninnPageCache.ZERO_BYTE );
}
return 0;
}
@@ -363,9 +360,8 @@ else if ( bytesRead < ((long) filePageSize) * length )
int pagesNeedingZeroing = length - pagesRead;
for ( int i = 0; i < pagesNeedingZeroing; i++ )
{
Page page = pages[arrayOffset + pagesRead + i];
long address = bufferAddresses[arrayOffset + pagesRead + i];
long bytesToZero = filePageSize;
long address = page.address();
if ( i == 0 )
{
address += bytesReadIntoLastReadPage;
@@ -415,25 +411,26 @@ private long lockPositionReadVector(
}

private int readPositionedVectoredFallback(
long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException
long startFilePageId, long[] bufferAddresses, int bufferSize, int arrayOffset, int length ) throws IOException
{
int bytes = 0;
for ( int i = 0; i < length; i++ )
{
bytes += read( startFilePageId + i, pages[arrayOffset + i] );
long address = bufferAddresses[arrayOffset + i];
bytes += read( startFilePageId + i, address, bufferSize );
}
return bytes;
}

@Override
public long write( long filePageId, Page page ) throws IOException
public long write( long filePageId, long bufferAddress, int bufferSize ) throws IOException
{
long fileOffset = pageIdToPosition( filePageId );
increaseFileSizeTo( fileOffset + filePageSize );
try
{
StoreChannel channel = channel( filePageId );
return swapOut( page, fileOffset, channel );
return swapOut( bufferAddress, fileOffset, channel );
}
catch ( ClosedChannelException e )
{
Expand All @@ -443,7 +440,7 @@ public long write( long filePageId, Page page ) throws IOException
tryReopen( filePageId, e );
boolean interrupted = Thread.interrupted();
// Recurse because this is hopefully a very rare occurrence.
long bytesWritten = write( filePageId, page );
long bytesWritten = write( filePageId, bufferAddress, bufferSize );
if ( interrupted )
{
Thread.currentThread().interrupt();
@@ -453,13 +450,13 @@ public long write( long filePageId, Page page ) throws IOException
}

@Override
public long write( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException
public long write( long startFilePageId, long[] bufferAddresses, int bufferSize, int arrayOffset, int length ) throws IOException
{
if ( positionLockGetter != null && hasPositionLock )
{
try
{
return writePositionedVectoredToFileChannel( startFilePageId, pages, arrayOffset, length );
return writePositionedVectoredToFileChannel( startFilePageId, bufferAddresses, arrayOffset, length );
}
catch ( IOException ioe )
{
@@ -471,26 +468,26 @@ public long write( long startFilePageId, Page[] pages, int arrayOffset, int leng
// isn't exactly an IOException. Instead, we'll try our fallback code and see what it says.
}
}
return writePositionVectoredFallback( startFilePageId, pages, arrayOffset, length );
return writePositionVectoredFallback( startFilePageId, bufferAddresses, bufferSize, arrayOffset, length );
}

private long writePositionedVectoredToFileChannel(
long startFilePageId, Page[] pages, int arrayOffset, int length ) throws Exception
long startFilePageId, long[] bufferAddresses, int arrayOffset, int length ) throws Exception
{
long fileOffset = pageIdToPosition( startFilePageId );
increaseFileSizeTo( fileOffset + (((long) filePageSize) * length) );
FileChannel channel = unwrappedChannel( startFilePageId );
ByteBuffer[] srcs = convertToByteBuffers( pages, arrayOffset, length );
ByteBuffer[] srcs = convertToByteBuffers( bufferAddresses, arrayOffset, length );
return lockPositionWriteVector( startFilePageId, channel, fileOffset, srcs );
}

private ByteBuffer[] convertToByteBuffers( Page[] pages, int arrayOffset, int length ) throws Exception
private ByteBuffer[] convertToByteBuffers( long[] bufferAddresses, int arrayOffset, int length ) throws Exception
{
ByteBuffer[] buffers = new ByteBuffer[length];
for ( int i = 0; i < length; i++ )
{
Page page = pages[arrayOffset + i];
buffers[i] = UnsafeUtil.newDirectByteBuffer( page.address(), filePageSize );
long address = bufferAddresses[arrayOffset + i];
buffers[i] = UnsafeUtil.newDirectByteBuffer( address, filePageSize );
}
return buffers;
}
@@ -550,13 +547,14 @@ private Object positionLock( FileChannel channel )
}
}

private int writePositionVectoredFallback( long startFilePageId, Page[] pages, int arrayOffset, int length )
private int writePositionVectoredFallback( long startFilePageId, long[] bufferAddresses, int bufferSize, int arrayOffset, int length )
throws IOException
{
int bytes = 0;
for ( int i = 0; i < length; i++ )
{
bytes += write( startFilePageId + i, pages[arrayOffset + i] );
long address = bufferAddresses[arrayOffset + i];
bytes += write( startFilePageId + i, address, bufferSize );
}
return bytes;
}
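The single-page read and write above share the recovery pattern that the interface javadoc requires: when a ClosedChannelException signals that the channel was closed (typically because the reading or writing thread was interrupted), the swapper reopens the channel, retries the operation, and restores the thread's interrupt flag afterwards. A stripped-down sketch of that pattern, with doRead and tryReopen as stand-ins for the real helpers:

import java.io.IOException;
import java.nio.channels.ClosedChannelException;

// Sketch of the reopen-and-retry pattern that read(...) and write(...) above share.
// doRead(...) and tryReopen(...) are abstract stand-ins for the real helpers.
abstract class RetryOnInterruptSketch
{
    abstract long doRead( long filePageId, long bufferAddress, int bufferSize ) throws IOException;

    abstract void tryReopen( long filePageId, ClosedChannelException cause ) throws IOException;

    long read( long filePageId, long bufferAddress, int bufferSize ) throws IOException
    {
        try
        {
            return doRead( filePageId, bufferAddress, bufferSize );
        }
        catch ( ClosedChannelException e )
        {
            // The channel was most likely closed by an interrupt; reopen it and retry.
            tryReopen( filePageId, e );
            boolean interrupted = Thread.interrupted(); // clear the flag so the retry can do I/O
            long bytesRead = read( filePageId, bufferAddress, bufferSize );
            if ( interrupted )
            {
                Thread.currentThread().interrupt();     // hand the interrupt status back to the caller
            }
            return bytesRead;
        }
    }
}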
@@ -151,7 +151,7 @@ private void doFlush(
FlushEvent event = flushOpportunity.beginFlush( filePageId, getCachePageId(), swapper );
try
{
long bytesWritten = swapper.write( filePageId, this );
long bytesWritten = swapper.write( filePageId, address(), size() );
markAsClean();
event.addBytesWritten( bytesWritten );
event.done();
@@ -189,7 +189,7 @@ public void fault(
// the file page, so any subsequent thread that finds the page in their
// translation table will re-do the page fault.
this.filePageId = filePageId; // Page now considered isLoaded()
long bytesRead = swapper.read( filePageId, this );
long bytesRead = swapper.read( filePageId, address(), size() );
faultEvent.addBytesRead( bytesRead );
faultEvent.setCachePageId( getCachePageId() );
this.swapper = swapper; // Page now considered isBoundTo( swapper, filePageId )
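On the page side the call sites change in the obvious way: rather than handing itself to the swapper, the page passes its native memory address and its size. Schematically, using the accessors visible in the diff above:

// Flush path (doFlush): write this page's memory out to its file page.
long bytesWritten = swapper.write( filePageId, address(), size() );

// Fault path (fault): read the file page into this page's memory. The filePageId is
// published before the read and the swapper binding only after it, so a concurrent
// reader that races the fault will simply re-do the page fault.
long bytesRead = swapper.read( filePageId, address(), size() );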
@@ -330,6 +330,7 @@ void flushAndForceInternal( FlushEventOpportunity flushOpportunity, boolean forC
{
// TODO it'd be awesome if, on Linux, we'd call sync_file_range(2) instead of fsync
MuninnPage[] pages = new MuninnPage[translationTableChunkSize];
long[] bufferAddresses = new long[translationTableChunkSize];
long filePageId = -1; // Start at -1 because we increment at the *start* of the chunk-loop iteration.
long limiterStamp = IOLimiter.INITIAL_STAMP;
Object[][] tt = this.translationTable;
@@ -385,14 +386,14 @@ else if ( forClosing )
}
if ( pagesGrabbed > 0 )
{
vectoredFlush( pages, pagesGrabbed, flushOpportunity, forClosing );
vectoredFlush( pages, bufferAddresses, pagesGrabbed, flushOpportunity, forClosing );
limiterStamp = limiter.maybeLimitIO( limiterStamp, pagesGrabbed, this );
pagesGrabbed = 0;
}
}
if ( pagesGrabbed > 0 )
{
vectoredFlush( pages, pagesGrabbed, flushOpportunity, forClosing );
vectoredFlush( pages, bufferAddresses, pagesGrabbed, flushOpportunity, forClosing );
limiterStamp = limiter.maybeLimitIO( limiterStamp, pagesGrabbed, this );
}
}
@@ -401,7 +402,8 @@ else if ( forClosing )
}

private void vectoredFlush(
MuninnPage[] pages, int pagesGrabbed, FlushEventOpportunity flushOpportunity, boolean forClosing )
MuninnPage[] pages, long[] bufferAddresses, int pagesGrabbed, FlushEventOpportunity flushOpportunity,
boolean forClosing )
throws IOException
{
FlushEvent flush = null;
Expand All @@ -410,17 +412,20 @@ private void vectoredFlush(
// Write the pages vector
MuninnPage firstPage = pages[0];
long startFilePageId = firstPage.getFilePageId();
int cachePageSize = firstPage.size();

// Mark the flushed pages as clean before our flush, so concurrent page writes can mark it as dirty and
// we'll be able to write those changes out on the next flush.
for ( int j = 0; j < pagesGrabbed; j++ )
{
// If the flush fails, we'll undo this
pages[j].markAsClean();
// copy over the native page memory address
bufferAddresses[j] = pages[j].address();
}

flush = flushOpportunity.beginFlush( startFilePageId, firstPage.getCachePageId(), swapper );
long bytesWritten = swapper.write( startFilePageId, pages, 0, pagesGrabbed );
long bytesWritten = swapper.write( startFilePageId, bufferAddresses, cachePageSize, 0, pagesGrabbed );

// Update the flush event
flush.addBytesWritten( bytesWritten );
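The vectored flush now keeps a reusable long[] of native page addresses alongside the MuninnPage[]: before handing a batch to the swapper it marks each page clean and copies its memory address into the array, then issues a single vectored write for the whole run of consecutive file pages, passing the cache page size as the buffer size. A condensed sketch of that gather-and-write step (PageLike is a stand-in for MuninnPage with only the accessors the sketch needs; the swapper is the PageSwapper interface from the first file in this diff; event handling and the failure path are omitted):

import java.io.IOException;
import org.neo4j.io.pagecache.PageSwapper;

final class VectoredFlushSketch
{
    interface PageLike
    {
        long address();
        void markAsClean();
    }

    static long flushBatch( PageSwapper swapper, long startFilePageId, PageLike[] pages,
            long[] bufferAddresses, int cachePageSize, int pagesGrabbed ) throws IOException
    {
        // Mark clean first so concurrent writers can re-dirty the page for the next flush,
        // then copy each page's native memory address into the reusable array.
        for ( int j = 0; j < pagesGrabbed; j++ )
        {
            pages[j].markAsClean();
            bufferAddresses[j] = pages[j].address();
        }
        // One positioned, vectored write covering the whole run of consecutive file pages.
        return swapper.write( startFilePageId, bufferAddresses, cachePageSize, 0, pagesGrabbed );
    }
}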
