Skip to content

Commit

Permalink
Allow PageSwappers to control how IO is performed.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed May 25, 2015
1 parent cbd5ee1 commit 44a4d7d
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 207 deletions.
53 changes: 6 additions & 47 deletions community/io/src/main/java/org/neo4j/io/pagecache/Page.java
Expand Up @@ -19,10 +19,6 @@
*/
package org.neo4j.io.pagecache;

import java.io.IOException;

import org.neo4j.io.fs.StoreChannel;

/**
* A page in the page cache. Always represents a concrete page in memory, and may
* represent a particular page in a file, if that file-page has been swapped into the
Expand All @@ -31,53 +27,16 @@
public interface Page
{
/**
* Swap a file-page into memory.
*
* The file-page location is given by the offset into the file represented by the StoreChannel, and the length.
*
* The contents of the page in the file is unchanged by this operation.
* Get the size of the cache page in bytes.
*
* May throw an AssertionError or a RuntimeException if the given length is greater than the cache-page size.
*
* Returns the number of bytes read in from the file. May be zero if the
* requested page was beyond the end of the file. If less than the file
* page size, then the rest of the page will contain zeros.
*
* @throws IOException If the file could not be read from.
* For instance, if the file has been closed, moved or deleted,
* or if the requested range of data goes beyond the length of the file.
* The possible causes of an IOException is platform dependent.
* If the channel has been closed, then it must be
* reopened and the swapIn operation must be retried.
* Don't access memory beyond address() + size().
*/
int swapIn( StoreChannel channel, long offset, int length ) throws IOException;
int size();

/**
* Swap the page out to storage.
*
* The contents of the current page is written to the file represented by the given StoreChannel, at the
* location expressed by the given offset into the file, and the length.
* Get the memory address of the beginning of the page.
*
* The contents of the page in memory are unchanged by this operation.
*
* May throw an AssertionError or a RuntimeException if the length is greater than the cache-page size.
*
* If the offset is greater than the length of the file, the space in between will be filled with null-bytes.
*
* Note: This only performs the write on the OS level.
* No forcing of the StoreChannel is implied.
*
* @throws IOException If the file could not be written to.
* For instance, if the storage device is out of space,
* or the file has been moved or deleted.
* The possible causes of an IOException is platform dependent.
* If the channel has been closed, then it must be
* reopened and the swapIn operation must be retried.
*/
void swapOut( StoreChannel channel, long offset, int length ) throws IOException;

/**
* Set the byte contents of this page to be all zeros.
* Don't access memory beyond address() + size().
*/
void clear();
long address();
}
Expand Up @@ -21,6 +21,7 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;

import org.neo4j.io.fs.FileSystemAbstraction;
Expand All @@ -29,8 +30,11 @@
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PageEvictionCallback;
import org.neo4j.io.pagecache.PageSwapper;
import org.neo4j.io.pagecache.impl.muninn.MuninnPageCache;
import org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil;

import static java.lang.String.format;

/**
* A simple PageSwapper implementation that directs all page swapping to a
* single file on the file system.
Expand Down Expand Up @@ -63,6 +67,34 @@ private static int defaultChannelStripePower()
private static final long fileSizeOffset =
UnsafeUtil.getFieldOffset( SingleFilePageSwapper.class, "fileSize" );

private static final ThreadLocal<ByteBuffer> proxyCache = new ThreadLocal<>();

private static ByteBuffer proxy( long buffer, int bufferLength ) throws IOException
{
ByteBuffer buf = proxyCache.get();
if ( buf != null )
{
UnsafeUtil.initDirectByteBuffer( buf, buffer, bufferLength );
return buf;
}
return createAndGetNewBuffer( buffer, bufferLength );
}

private static ByteBuffer createAndGetNewBuffer( long buffer, int bufferLength ) throws IOException
{
ByteBuffer buf;
try
{
buf = UnsafeUtil.newDirectByteBuffer( buffer, bufferLength );
}
catch ( Exception e )
{
throw new IOException( e );
}
proxyCache.set( buf );
return buf;
}

private final FileSystemAbstraction fs;
private final File file;
private final int filePageSize;
Expand Down Expand Up @@ -111,28 +143,88 @@ private long getCurrentFileSize()

private StoreChannel channel( long filePageId )
{
int stripe = channelStripe( filePageId );
int stripe = stripe( filePageId );
return channels[stripe];
}

private static int channelStripe( long filePageId )
private static int stripe( long filePageId )
{
return (int) (filePageId >>> channelStripeShift) & channelStripeMask;
}

private int swapIn( StoreChannel channel, Page page, long fileOffset, int filePageSize ) throws IOException
{
int cachePageSize = page.size();
long address = page.address();
int readTotal = 0;
try
{
ByteBuffer bufferProxy = proxy( address, filePageSize );
bufferProxy.position( 0 );
int read;
do
{
read = channel.read( bufferProxy, fileOffset + readTotal );
}
while ( read != -1 && (readTotal += read) < filePageSize );

// Zero-fill the rest.
assert readTotal >= 0 && filePageSize <= cachePageSize && readTotal <= filePageSize: format(
"pointer = %h, readTotal = %s, length = %s, page size = %s",
address, readTotal, filePageSize, cachePageSize );
UnsafeUtil.setMemory( address + readTotal, filePageSize - readTotal, MuninnPageCache.ZERO_BYTE );
return readTotal;
}
catch ( IOException e )
{
throw e;
}
catch ( Throwable e )
{
String msg = format(
"Read failed after %s of %s bytes from fileOffset %s",
readTotal, filePageSize, fileOffset );
throw new IOException( msg, e );
}
}

private int swapOut( Page page, long fileOffset, StoreChannel channel ) throws IOException
{
try
{
ByteBuffer bufferProxy = proxy( page.address(), filePageSize );
bufferProxy.position( 0 );
channel.writeAll( bufferProxy, fileOffset );
}
catch ( IOException e )
{
throw e;
}
catch ( Throwable e )
{
throw new IOException( e );
}
return filePageSize;
}

private void clear( Page page )
{
UnsafeUtil.setMemory( page.address(), page.size(), MuninnPageCache.ZERO_BYTE );
}

@Override
public int read( long filePageId, Page page ) throws IOException
{
long offset = pageIdToPosition( filePageId );
long fileOffset = pageIdToPosition( filePageId );
try
{
if ( offset < getCurrentFileSize() )
if ( fileOffset < getCurrentFileSize() )
{
return page.swapIn( channel( filePageId ), offset, filePageSize );
return swapIn( channel( filePageId ), page, fileOffset, filePageSize );
}
else
{
page.clear();
clear( page );
}
}
catch ( ClosedChannelException e )
Expand All @@ -156,12 +248,12 @@ public int read( long filePageId, Page page ) throws IOException
@Override
public int write( long filePageId, Page page ) throws IOException
{
long offset = pageIdToPosition( filePageId );
increaseFileSizeTo( offset + filePageSize );
long fileOffset = pageIdToPosition( filePageId );
increaseFileSizeTo( fileOffset + filePageSize );
try
{
page.swapOut( channel( filePageId ), offset, filePageSize );
return filePageSize;
StoreChannel channel = channel( filePageId );
return swapOut( page, fileOffset, channel );
}
catch ( ClosedChannelException e )
{
Expand Down Expand Up @@ -235,7 +327,7 @@ public int hashCode()
*/
private synchronized void tryReopen( long filePageId, ClosedChannelException closedException ) throws ClosedChannelException
{
int stripe = channelStripe( filePageId );
int stripe = stripe( filePageId );
StoreChannel channel = channels[stripe];
if ( channel.isOpen() )
{
Expand Down

0 comments on commit 44a4d7d

Please sign in to comment.