Skip to content

Commit

Permalink
StoreCopyServer now report hasData correctly depending on file size
Browse files Browse the repository at this point in the history
Add fileSize() to PagedFile interface
  • Loading branch information
burqen authored and chrisvest committed Sep 16, 2016
1 parent 667a9b4 commit 5f00d4e
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 11 deletions.
Expand Up @@ -130,6 +130,11 @@ public interface PagedFile extends AutoCloseable
*/
int pageSize();

/**
* Size of file, in bytes.
*/
long fileSize() throws IOException;

/**
* Flush all dirty pages into the file channel, and force the file channel to disk.
*/
Expand Down
Expand Up @@ -172,6 +172,17 @@ public int pageSize()
return filePageSize;
}

@Override
public long fileSize()
{
final long lastPageId = getLastPageId();
if ( lastPageId < 0 )
{
return 0L;
}
return (lastPageId + 1) * pageSize();
}

File file()
{
return swapper.file();
Expand Down
Expand Up @@ -69,6 +69,13 @@ public int pageSize()
return delegate.pageSize();
}

@Override
public long fileSize() throws IOException
{
adversary.injectFailure( IllegalStateException.class );
return delegate.fileSize();
}

@Override
public void flushAndForce() throws IOException
{
Expand Down
Expand Up @@ -52,6 +52,12 @@ public int pageSize()
return delegate.pageSize();
}

@Override
public long fileSize() throws IOException
{
return delegate.fileSize();
}

public void close() throws IOException
{
delegate.close();
Expand Down
Expand Up @@ -53,6 +53,17 @@ public int pageSize()
return exposedPageSize;
}

@Override
public long fileSize() throws IOException
{
final long lastPageId = getLastPageId();
if ( lastPageId < 0 )
{
return 0L;
}
return (lastPageId + 1) * pageSize();
}

@Override
public void flushAndForce() throws IOException
{
Expand Down
Expand Up @@ -165,20 +165,25 @@ public RequestContext flushStoresAndStreamStoreFiles( String triggerName, StoreW

// Read from paged file if mapping exists. Otherwise read through file system.
final Optional<PagedFile> optionalPagedFile = pageCache.getExistingMapping( file );
try ( ReadableByteChannel fileChannel = optionalPagedFile.isPresent() ?
optionalPagedFile.get().openReadableByteChannel() :
fileSystem.open( file, "r" ) )
if ( optionalPagedFile.isPresent() )
{
monitor.startStreamingStoreFile( file );
writer.write( relativePath( storeDirectory, file ), fileChannel,
temporaryBuffer, file.length() > 0, recordSize );
monitor.finishStreamingStoreFile( file );
PagedFile pagedFile = optionalPagedFile.get();
long fileSize = pagedFile.fileSize();
try ( ReadableByteChannel fileChannel = pagedFile.openReadableByteChannel() )
{
doWrite( writer, temporaryBuffer, file, recordSize, fileChannel, fileSize );
}
finally
{
pagedFile.close();
}
}
finally
else
{
if ( optionalPagedFile.isPresent() )
try ( ReadableByteChannel fileChannel = fileSystem.open( file, "r" ) )
{
optionalPagedFile.get().close();
long fileSize = file.length();
doWrite( writer, temporaryBuffer, file, recordSize, fileChannel, fileSize );
}
}
}
Expand All @@ -195,4 +200,13 @@ public RequestContext flushStoresAndStreamStoreFiles( String triggerName, StoreW
throw new ServerFailureException( e );
}
}

private void doWrite( StoreWriter writer, ByteBuffer temporaryBuffer, File file, int recordSize,
ReadableByteChannel fileChannel, long fileSize ) throws IOException
{
monitor.startStreamingStoreFile( file );
writer.write( relativePath( storeDirectory, file ), fileChannel,
temporaryBuffer, fileSize > 0, recordSize );
monitor.finishStreamingStoreFile( file );
}
}
Expand Up @@ -60,7 +60,6 @@ public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBu
totalWritten += 4;
totalWritten += buffer.write( data );
buffer.close();

}
return totalWritten;
}
Expand Down

0 comments on commit 5f00d4e

Please sign in to comment.