Skip to content

Commit

Permalink
Make the PageSwapper read and write methods return the byte counts as…
Browse files Browse the repository at this point in the history
… longs instead of ints

This is necessary now that the vectored methods can cover more than one page, though it won't make any difference in practice.

The non-vectored methods also return long byte counts for consistency.
  • Loading branch information
chrisvest committed Aug 7, 2015
1 parent 04f1379 commit 99f2bd4
Show file tree
Hide file tree
Showing 13 changed files with 86 additions and 82 deletions.
Expand Up @@ -43,8 +43,7 @@ public interface PageSwapper
* interrupted. If this happens, then the implementation must reopen the
* channel and the operation must be retried.
*/
// TODO all these read and write methods should return long instead of int
int read( long filePageId, Page page ) throws IOException;
long read( long filePageId, Page page ) throws IOException;

/**
* Read pages from the file into the given pages, starting from the given startFilePageId.
Expand All @@ -62,7 +61,7 @@ public interface PageSwapper
* interrupted. If this happens, then the implementation must reopen the
* channel and the operation must be retried.
*/
int read( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException;
long read( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException;

/**
* Write the contents of the given page, to the concrete file on the file
Expand All @@ -75,7 +74,7 @@ public interface PageSwapper
* interrupted. If this happens, then implementation must reopen the
* channel and the operation must be retried.
*/
int write( long filePageId, Page page ) throws IOException;
long write( long filePageId, Page page ) throws IOException;

/**
* Write the contents of the given pages, to the concrete file on the file system,
Expand All @@ -93,7 +92,7 @@ public interface PageSwapper
* interrupted. If this happens, then implementation must reopen the
* channel and the operation must be retried.
*/
int write( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException;
long write( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException;

/**
* Notification that a page has been evicted, used to clean up state in structures
Expand Down
Expand Up @@ -285,7 +285,7 @@ private void clear( Page page )
}

@Override
public int read( long filePageId, Page page ) throws IOException
public long read( long filePageId, Page page ) throws IOException
{
long fileOffset = pageIdToPosition( filePageId );
try
Expand All @@ -307,7 +307,7 @@ public int read( long filePageId, Page page ) throws IOException
tryReopen( filePageId, e );
boolean interrupted = Thread.interrupted();
// Recurse because this is hopefully a very rare occurrence.
int bytesRead = read( filePageId, page );
long bytesRead = read( filePageId, page );
if ( interrupted )
{
Thread.currentThread().interrupt();
Expand All @@ -318,7 +318,7 @@ public int read( long filePageId, Page page ) throws IOException
}

@Override
public int read( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException
public long read( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException
{
if ( positionLockGetter != null && hasPositionLock )
{
Expand All @@ -339,13 +339,13 @@ public int read( long startFilePageId, Page[] pages, int arrayOffset, int length
return readPositionedVectoredFallback( startFilePageId, pages, arrayOffset, length );
}

private int readPositionedVectoredToFileChannel(
private long readPositionedVectoredToFileChannel(
long startFilePageId, Page[] pages, int arrayOffset, int length ) throws Exception
{
long fileOffset = pageIdToPosition( startFilePageId );
FileChannel channel = unwrappedChannel( startFilePageId );
ByteBuffer[] srcs = convertToByteBuffers( pages, arrayOffset, length );
int bytesRead = lockPositionReadVector( startFilePageId, channel, fileOffset, srcs );
long bytesRead = lockPositionReadVector( startFilePageId, channel, fileOffset, srcs );
if ( bytesRead == -1 )
{
for ( Page page : pages )
Expand All @@ -356,8 +356,8 @@ private int readPositionedVectoredToFileChannel(
}
else if ( bytesRead < filePageSize * length )
{
int pagesRead = bytesRead / filePageSize;
int bytesReadIntoLastReadPage = bytesRead % filePageSize;
int pagesRead = (int) (bytesRead / filePageSize);
int bytesReadIntoLastReadPage = (int) (bytesRead % filePageSize);
int pagesNeedingZeroing = length - pagesRead;
for ( int i = 0; i < pagesNeedingZeroing; i++ )
{
Expand All @@ -375,15 +375,15 @@ else if ( bytesRead < filePageSize * length )
return bytesRead;
}

private int lockPositionReadVector(
private long lockPositionReadVector(
long filePageId, FileChannel channel, long fileOffset, ByteBuffer[] srcs ) throws IOException
{
try
{
synchronized ( positionLock( channel ) )
{
channel.position( fileOffset );
return (int) channel.read( srcs );
return channel.read( srcs );
}
}
catch ( ClosedChannelException e )
Expand All @@ -395,7 +395,7 @@ private int lockPositionReadVector(
boolean interrupted = Thread.interrupted();
// Recurse because this is hopefully a very rare occurrence.
channel = unwrappedChannel( filePageId );
int bytesWritten = lockPositionReadVector( filePageId, channel, fileOffset, srcs );
long bytesWritten = lockPositionReadVector( filePageId, channel, fileOffset, srcs );
if ( interrupted )
{
Thread.currentThread().interrupt();
Expand All @@ -416,7 +416,7 @@ private int readPositionedVectoredFallback(
}

@Override
public int write( long filePageId, Page page ) throws IOException
public long write( long filePageId, Page page ) throws IOException
{
long fileOffset = pageIdToPosition( filePageId );
increaseFileSizeTo( fileOffset + filePageSize );
Expand All @@ -433,7 +433,7 @@ public int write( long filePageId, Page page ) throws IOException
tryReopen( filePageId, e );
boolean interrupted = Thread.interrupted();
// Recurse because this is hopefully a very rare occurrence.
int bytesWritten = write( filePageId, page );
long bytesWritten = write( filePageId, page );
if ( interrupted )
{
Thread.currentThread().interrupt();
Expand All @@ -443,7 +443,7 @@ public int write( long filePageId, Page page ) throws IOException
}

@Override
public int write( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException
public long write( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException
{
if ( positionLockGetter != null && hasPositionLock )
{
Expand All @@ -464,7 +464,7 @@ public int write( long startFilePageId, Page[] pages, int arrayOffset, int lengt
return writePositionVectoredFallback( startFilePageId, pages, arrayOffset, length );
}

private int writePositionedVectoredToFileChannel(
private long writePositionedVectoredToFileChannel(
long startFilePageId, Page[] pages, int arrayOffset, int length ) throws Exception
{
long fileOffset = pageIdToPosition( startFilePageId );
Expand All @@ -476,13 +476,13 @@ private int writePositionedVectoredToFileChannel(

private ByteBuffer[] convertToByteBuffers( Page[] pages, int arrayOffset, int length ) throws Exception
{
ByteBuffer[] srcs = new ByteBuffer[length];
ByteBuffer[] buffers = new ByteBuffer[length];
for ( int i = 0; i < length; i++ )
{
Page page = pages[arrayOffset + i];
srcs[i] = UnsafeUtil.newDirectByteBuffer( page.address(), filePageSize );
buffers[i] = UnsafeUtil.newDirectByteBuffer( page.address(), filePageSize );
}
return srcs;
return buffers;
}

private FileChannel unwrappedChannel( long startFilePageId )
Expand All @@ -491,15 +491,15 @@ private FileChannel unwrappedChannel( long startFilePageId )
return StoreFileChannel.unwrap( storeChannel );
}

private int lockPositionWriteVector(
private long lockPositionWriteVector(
long filePageId, FileChannel channel, long fileOffset, ByteBuffer[] srcs ) throws IOException
{
try
{
synchronized ( positionLock( channel ) )
{
channel.position( fileOffset );
return (int) channel.write( srcs );
return channel.write( srcs );
}
}
catch ( ClosedChannelException e )
Expand All @@ -511,7 +511,7 @@ private int lockPositionWriteVector(
boolean interrupted = Thread.interrupted();
// Recurse because this is hopefully a very rare occurrence.
channel = unwrappedChannel( filePageId );
int bytesWritten = lockPositionWriteVector( filePageId, channel, fileOffset, srcs );
long bytesWritten = lockPositionWriteVector( filePageId, channel, fileOffset, srcs );
if ( interrupted )
{
Thread.currentThread().interrupt();
Expand Down
Expand Up @@ -341,7 +341,7 @@ private void doFlush(
FlushEvent event = flushOpportunity.beginFlush( filePageId, getCachePageId(), swapper );
try
{
int bytesWritten = swapper.write( filePageId, this );
long bytesWritten = swapper.write( filePageId, this );
markAsClean();
event.addBytesWritten( bytesWritten );
event.done();
Expand Down Expand Up @@ -380,7 +380,7 @@ public void fault(
// the file page, so any subsequent thread that finds the page in their
// translation table will re-do the page fault.
this.filePageId = filePageId; // Page now considered isLoaded()
int bytesRead = swapper.read( filePageId, this );
long bytesRead = swapper.read( filePageId, this );
faultEvent.addBytesRead( bytesRead );
faultEvent.setCachePageId( getCachePageId() );
this.swapper = swapper; // Page now considered isBoundTo( swapper, filePageId )
Expand Down
Expand Up @@ -244,7 +244,7 @@ private int vectoredFlush(
MuninnPage firstPage = pages[0];
long startFilePageId = firstPage.getFilePageId();
flush = flushOpportunity.beginFlush( startFilePageId, firstPage.getCachePageId(), swapper );
int bytesWritten = swapper.write( startFilePageId, pages, 0, pagesGrabbed );
long bytesWritten = swapper.write( startFilePageId, pages, 0, pagesGrabbed );
flush.addBytesWritten( bytesWritten );
flush.addPagesFlushed( pagesGrabbed );
flush.done();
Expand Down
Expand Up @@ -94,7 +94,7 @@ public static void enablePinUnpinTracing()
private final FlushEvent flushEvent = new FlushEvent()
{
@Override
public void addBytesWritten( int bytes )
public void addBytesWritten( long bytes )
{
bytesWritten.getAndAdd( bytes );
}
Expand Down Expand Up @@ -179,7 +179,7 @@ public void close()
private final PageFaultEvent pageFaultEvent = new PageFaultEvent()
{
@Override
public void addBytesRead( int bytes )
public void addBytesRead( long bytes )
{
bytesRead.getAndAdd( bytes );
}
Expand Down
Expand Up @@ -32,7 +32,7 @@ public interface FlushEvent
FlushEvent NULL = new FlushEvent()
{
@Override
public void addBytesWritten( int bytes )
public void addBytesWritten( long bytes )
{
}

Expand All @@ -55,7 +55,7 @@ public void addPagesFlushed( int pageCount )
/**
* Add up a number of bytes that has been written to the file.
*/
void addBytesWritten( int bytes );
void addBytesWritten( long bytes );

/**
* The page flush has completed successfully.
Expand Down
Expand Up @@ -30,7 +30,7 @@ public interface PageFaultEvent
PageFaultEvent NULL = new PageFaultEvent()
{
@Override
public void addBytesRead( int bytes )
public void addBytesRead( long bytes )
{
}

Expand Down Expand Up @@ -59,7 +59,7 @@ public void setCachePageId( int cachePageId )
/**
* Add up a number of bytes that has been read from the backing file into the free page being bound.
*/
void addBytesRead( int bytes );
void addBytesRead( long bytes );

/**
* The id of the cache page that is being faulted into.
Expand Down
Expand Up @@ -36,7 +36,7 @@ public DelegatingPageSwapper( PageSwapper delegate )
this.delegate = delegate;
}

public int read( long filePageId, Page page ) throws IOException
public long read( long filePageId, Page page ) throws IOException
{
return delegate.read( filePageId, page );
}
Expand All @@ -61,7 +61,7 @@ public File file()
return delegate.file();
}

public int write( long filePageId, Page page ) throws IOException
public long write( long filePageId, Page page ) throws IOException
{
return delegate.write( filePageId, page );
}
Expand All @@ -76,12 +76,12 @@ public void truncate() throws IOException
delegate.truncate();
}

public int read( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException
public long read( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException
{
return delegate.read( startFilePageId, pages, arrayOffset, length );
}

public int write( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException
public long write( long startFilePageId, Page[] pages, int arrayOffset, int length ) throws IOException
{
return delegate.write( startFilePageId, pages, arrayOffset, length );
}
Expand Down
Expand Up @@ -3797,7 +3797,7 @@ public PageSwapper createPageSwapper(
return new DelegatingPageSwapper( delegate )
{
@Override
public int write( long filePageId, Page page ) throws IOException
public long write( long filePageId, Page page ) throws IOException
{
try
{
Expand Down

0 comments on commit 99f2bd4

Please sign in to comment.