Skip to content

Commit

Permalink
Add a method to the page cache to test whether the backing page cache
Browse files Browse the repository at this point in the history
supports regular file operations.
  • Loading branch information
klaren committed Mar 14, 2018
1 parent 642755c commit 54494b7
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,5 +122,4 @@ interface ThirdPartyFileSystem extends Closeable
* @throws IOException If an I/O error occurs, possibly with the canonicalisation of the paths.
*/
Stream<FileHandle> streamFilesRecursive( File directory ) throws IOException;

}
10 changes: 10 additions & 0 deletions community/io/src/main/java/org/neo4j/io/pagecache/PageCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,14 @@ public interface PageCache extends AutoCloseable
* @return the filesystem that the page cache is using.
*/
FileSystemAbstraction getCachedFileSystem();

/**
* Check if the backing {@link FileSystemAbstraction file system} supports regular file operations or not.
* <p>
* E.g. the file system for block device will not work with generic open and read/write calls and all operations
* needs to be done through the page cache.
*
* @return {@code true} if the backing file system supports regular file operations.
*/
boolean fileSystemSupportsFileOperations();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCache;
Expand Down Expand Up @@ -639,6 +640,13 @@ public FileSystemAbstraction getCachedFileSystem()
return swapperFactory.getFileSystemAbstraction();
}

@Override
public boolean fileSystemSupportsFileOperations()
{
// Default filesystem supports direct file access.
return getCachedFileSystem() instanceof DefaultFileSystemAbstraction;
}

int getPageCacheId()
{
return pageCacheId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,7 @@ public Optional<PagedFile> getExistingMapping( File file ) throws IOException
{
adversary.injectFailure( IOException.class, SecurityException.class );
final Optional<PagedFile> optional = delegate.getExistingMapping( file );
if ( optional.isPresent() )
{
return Optional.of( new AdversarialPagedFile( optional.get(), adversary ) );
}
return optional;
return optional.map( pagedFile -> new AdversarialPagedFile( pagedFile, adversary ) );
}

@Override
Expand Down Expand Up @@ -132,4 +128,10 @@ public FileSystemAbstraction getCachedFileSystem()
{
return delegate.getCachedFileSystem();
}

@Override
public boolean fileSystemSupportsFileOperations()
{
return delegate.fileSystemSupportsFileOperations();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import java.nio.file.OpenOption;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

import org.neo4j.io.fs.FileHandle;
import org.neo4j.io.fs.FileSystemAbstraction;

public class DelegatingPageCache implements PageCache
Expand All @@ -38,6 +36,7 @@ public DelegatingPageCache( PageCache delegate )
this.delegate = delegate;
}

@Override
public PagedFile map( File file, int pageSize, OpenOption... openOptions ) throws IOException
{
return delegate.map( file, pageSize, openOptions );
Expand All @@ -61,11 +60,13 @@ public int pageSize()
return delegate.pageSize();
}

@Override
public void close()
{
delegate.close();
}

@Override
public long maxCachedPages()
{
return delegate.maxCachedPages();
Expand All @@ -77,13 +78,21 @@ public FileSystemAbstraction getCachedFileSystem()
return delegate.getCachedFileSystem();
}

@Override
public void flushAndForce( IOLimiter limiter ) throws IOException
{
delegate.flushAndForce( limiter );
}

@Override
public void flushAndForce() throws IOException
{
delegate.flushAndForce();
}

@Override
public boolean fileSystemSupportsFileOperations()
{
return delegate.fileSystemSupportsFileOperations();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Objects;
import java.util.Optional;

import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
Expand All @@ -52,8 +51,7 @@ class StoreResource implements Closeable

ReadableByteChannel open() throws IOException
{
boolean isDefaultFileSystem = pageCache.getCachedFileSystem() instanceof DefaultFileSystemAbstraction;
if ( !isDefaultFileSystem )
if ( !pageCache.fileSystemSupportsFileOperations() )
{
Optional<PagedFile> existingMapping = pageCache.getExistingMapping( file );
if ( existingMapping.isPresent() )
Expand All @@ -64,6 +62,7 @@ ReadableByteChannel open() throws IOException
}
}
}

return fs.open( file, "r" );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Map;

import org.neo4j.causalclustering.catchup.tx.FileCopyMonitor;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
Expand Down Expand Up @@ -65,10 +64,7 @@ public void write( String destination, int requiredAlignment, byte[] data ) thro

fileCopyMonitor.copyFile( fileName );

// Default filesystem supports writing of data through the file system directly
boolean isDefaultFileSystem = pageCache.getCachedFileSystem() instanceof DefaultFileSystemAbstraction;

if ( !isDefaultFileSystem && StoreType.shouldBeManagedByPageCache( destination ) )
if ( !pageCache.fileSystemSupportsFileOperations() && StoreType.shouldBeManagedByPageCache( destination ) )
{
WritableByteChannel channel = channels.get( destination );
if ( channel == null )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ public FileSystemAbstraction getCachedFileSystem()
return delegate.getCachedFileSystem();
}

@Override
public boolean fileSystemSupportsFileOperations()
{
return delegate.fileSystemSupportsFileOperations();
}

/**
* Create a GraphDatabaseFactory that will build EmbeddedGraphDatabase instances that all use the given page cache.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.neo4j.graphdb.Resource;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.io.ByteUnit;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
Expand Down Expand Up @@ -176,8 +175,7 @@ public RequestContext flushStoresAndStreamStoreFiles( String triggerName, StoreW
File file = meta.file();
int recordSize = meta.recordSize();

boolean isDefaultFileSystem = pageCache.getCachedFileSystem() instanceof DefaultFileSystemAbstraction;
if ( !isDefaultFileSystem )
if ( !pageCache.fileSystemSupportsFileOperations() )
{
// Read from paged file if mapping exists. Otherwise read through file system.
// A file is mapped if it is a store, and we have a running database, which will be the case for
Expand All @@ -196,7 +194,7 @@ public RequestContext flushStoresAndStreamStoreFiles( String triggerName, StoreW
}
}
}
// in the case where isDefaultFileSystem == true we always read the file from disk

try ( ReadableByteChannel fileChannel = fileSystem.open( file, "r" ) )
{
long fileSize = fileSystem.getFileSize( file );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.nio.channels.WritableByteChannel;
import java.util.List;

import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.PageCache;
Expand Down Expand Up @@ -70,12 +69,9 @@ public long write( String path, ReadableByteChannel data, ByteBuffer temporaryBu
monitor.startReceivingStoreFile( file );
try
{
// Default filesystem supports writing of data through the file system directly
boolean isDefaultFileSystem = pageCache.getCachedFileSystem() instanceof DefaultFileSystemAbstraction;

// Note that we don't bother checking if the page cache already has a mapping for the given file.
// The reason is that we are copying to a temporary store location, and then we'll move the files later.
if ( !isDefaultFileSystem && StoreType.shouldBeManagedByPageCache( filename ) )
if ( !pageCache.fileSystemSupportsFileOperations() && StoreType.shouldBeManagedByPageCache( filename ) )
{
int filePageSize = filePageSize( requiredElementAlignment );
try ( PagedFile pagedFile = pageCache.map( file, filePageSize, CREATE, WRITE ) )
Expand Down

0 comments on commit 54494b7

Please sign in to comment.