Skip to content

Commit

Permalink
Move streamFilesRecursive to FSA
Browse files Browse the repository at this point in the history
  • Loading branch information
ragadeeshu committed May 24, 2017
1 parent febeb00 commit c3498d7
Show file tree
Hide file tree
Showing 32 changed files with 912 additions and 1,150 deletions.
Expand Up @@ -40,6 +40,7 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Stream;


import org.neo4j.io.IOUtils; import org.neo4j.io.IOUtils;
import org.neo4j.io.fs.watcher.DefaultFileSystemWatcher; import org.neo4j.io.fs.watcher.DefaultFileSystemWatcher;
Expand Down Expand Up @@ -218,6 +219,12 @@ public void deleteFileOrThrow( File file ) throws IOException
Files.delete( file.toPath() ); Files.delete( file.toPath() );
} }


/**
* Streams file handles for the given directory (or single file) by delegating to
* {@link StreamFilesRecursive#streamFilesRecursive(File, FileSystemAbstraction)}, passing this
* file system as the one the traversal and the returned handles operate on.
*/
@Override
public Stream<FileHandle> streamFilesRecursive( File directory ) throws IOException
{
return StreamFilesRecursive.streamFilesRecursive( directory, this );
}

protected StoreFileChannel getStoreFileChannel( FileChannel channel ) protected StoreFileChannel getStoreFileChannel( FileChannel channel )
{ {
return new StoreFileChannel( channel ); return new StoreFileChannel( channel );
Expand Down
Expand Up @@ -297,6 +297,12 @@ public void deleteFileOrThrow( File file ) throws IOException
Files.delete( path( file ) ); Files.delete( path( file ) );
} }


/**
* Streams file handles for the given directory (or single file) by delegating to
* {@link StreamFilesRecursive#streamFilesRecursive(File, FileSystemAbstraction)}, passing this
* file system as the one the traversal and the returned handles operate on.
*/
@Override
public Stream<FileHandle> streamFilesRecursive( File directory ) throws IOException
{
return StreamFilesRecursive.streamFilesRecursive( directory, this );
}

@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
Expand Down
Expand Up @@ -17,19 +17,17 @@
* You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
package org.neo4j.io.pagecache; package org.neo4j.io.fs;


import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.nio.file.CopyOption; import java.nio.file.CopyOption;
import java.util.function.Consumer; import java.util.function.Consumer;


import org.neo4j.io.fs.FileUtils;

/** /**
* A handle to a file as seen by the page cache. The file may or may not be mapped. * A handle to a file as seen by the page cache. The file may or may not be mapped.
* @see PageCache#streamFilesRecursive(File) * @see FileSystemAbstraction#streamFilesRecursive(File)
*/ */
public interface FileHandle public interface FileHandle
{ {
Expand Down Expand Up @@ -102,7 +100,8 @@ static Consumer<FileHandle> handleRename( File to )


/** /**
* Get a {@link File} object for the abstract path name that this file handle represents, and that is * Get a {@link File} object for the abstract path name that this file handle represents, and that is
* <em>relative</em> to the base path that was passed into the {@link PageCache#streamFilesRecursive(File)} method. * <em>relative</em> to the base path that was passed into the
* {@link FileSystemAbstraction#streamFilesRecursive(File)} method.
* <p> * <p>
* This method is otherwise behaviourally the same as {@link #getFile()}. * This method is otherwise behaviourally the same as {@link #getFile()}.
* *
Expand Down
Expand Up @@ -29,7 +29,9 @@
import java.io.Writer; import java.io.Writer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.file.CopyOption; import java.nio.file.CopyOption;
import java.nio.file.NoSuchFileException;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Stream;
import java.util.zip.ZipOutputStream; import java.util.zip.ZipOutputStream;


import org.neo4j.io.fs.watcher.FileWatcher; import org.neo4j.io.fs.watcher.FileWatcher;
Expand All @@ -40,6 +42,7 @@ public interface FileSystemAbstraction extends Closeable
/** /**
* Create file watcher that provides possibilities to monitor directories on underlying file system * Create file watcher that provides possibilities to monitor directories on underlying file system
* abstraction * abstraction
*
* @return specific file system abstract watcher * @return specific file system abstract watcher
* @throws IOException in case exception occur during file watcher creation * @throws IOException in case exception occur during file watcher creation
*/ */
Expand Down Expand Up @@ -83,7 +86,7 @@ public interface FileSystemAbstraction extends Closeable


void copyRecursively( File fromDirectory, File toDirectory ) throws IOException; void copyRecursively( File fromDirectory, File toDirectory ) throws IOException;


<K extends ThirdPartyFileSystem> K getOrCreateThirdPartyFileSystem( Class<K> clazz, Function<Class<K>, K> creator ); <K extends ThirdPartyFileSystem> K getOrCreateThirdPartyFileSystem( Class<K> clazz, Function<Class<K>,K> creator );


void truncate( File path, long size ) throws IOException; void truncate( File path, long size ) throws IOException;


Expand All @@ -97,4 +100,27 @@ interface ThirdPartyFileSystem extends Closeable


void dumpToZip( ZipOutputStream zip, byte[] scratchPad ) throws IOException; void dumpToZip( ZipOutputStream zip, byte[] scratchPad ) throws IOException;
} }

/**
* Return a stream of {@link FileHandle file handles} for every file in the given directory, and its
* sub-directories.
* <p>
* Alternatively, if the {@link File} given as an argument refers to a file instead of a directory, then a stream
* will be returned with a file handle for just that file.
* <p>
* The stream is based on a snapshot of the file tree, so changes made to the tree using the returned file handles
* will not be reflected in the stream.
* <p>
* No directories will be returned. Only files. If a file handle ends up leaving a directory empty through a
* rename or a delete, then the empty directory will automatically be deleted as well.
* Likewise, if a file is moved to a path where not all of the directories in the path exist, then those missing
* directories will be created prior to the file rename.
*
* @param directory The base directory to start streaming files from, or the specific individual file to stream.
* @return A stream of all files in the tree.
* @throws NoSuchFileException If the given base directory or file does not exist.
* @throws IOException If an I/O error occurs, possibly with the canonicalisation of the paths.
*/
Stream<FileHandle> streamFilesRecursive( File directory ) throws IOException;

} }
@@ -0,0 +1,94 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.io.fs;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.NoSuchFileException;
import java.util.List;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;

/**
 * Static helper holding an implementation of {@link FileSystemAbstraction#streamFilesRecursive(File)} that
 * works purely through the methods of a given {@link FileSystemAbstraction}.
 */
public class StreamFilesRecursive
{
    private StreamFilesRecursive()
    {
        // This is a helper class, do not instantiate it.
    }

    /**
     * Static implementation of {@link FileSystemAbstraction#streamFilesRecursive(File)} that does not require
     * any external state, other than what is presented through the given {@link FileSystemAbstraction}.
     * <p>
     * Return a stream of {@link FileHandle file handles} for every file in the given directory, and its
     * sub-directories.
     * <p>
     * Alternatively, if the {@link File} given as an argument refers to a file instead of a directory, then a stream
     * will be returned with a file handle for just that file.
     * <p>
     * The stream is based on a snapshot of the file tree, so changes made to the tree using the returned file handles
     * will not be reflected in the stream.
     * <p>
     * No directories will be returned. Only files. If a file handle ends up leaving a directory empty through a
     * rename or a delete, then the empty directory will automatically be deleted as well.
     * Likewise, if a file is moved to a path where not all of the directories in the path exist, then those missing
     * directories will be created prior to the file rename.
     * <p>
     * The given path is canonicalised once, and that canonical path is used both as the starting point of the
     * traversal and as the base directory of every returned handle, so that the handles' relative paths and
     * empty-directory clean-up are computed against the same path form as the files themselves.
     *
     * @param directory The base directory to start streaming files from, or the specific individual file to stream.
     * @param fs The {@link FileSystemAbstraction} to use for manipulating files.
     * @return A {@link Stream} of {@link FileHandle}s
     * @throws NoSuchFileException If the given base directory or file does not exist.
     * @throws IOException If an I/O error occurs, possibly with the canonicalisation of the paths.
     */
    public static Stream<FileHandle> streamFilesRecursive( File directory, FileSystemAbstraction fs ) throws IOException
    {
        // Canonicalise up front: the snapshot below is built from this path, so the handles must use the very
        // same path as their base. Handing them the raw argument would make their prefix arithmetic and their
        // "stop at the base directory" check operate on a different path form than the files they wrap.
        File canonicalDirectory = directory.getCanonicalFile();
        try
        {
            // We grab a snapshot of the file tree to avoid seeing the same file twice or more due to renames.
            // Collecting eagerly also guarantees that any UncheckedIOException surfaces inside this try block.
            List<File> snapshot = streamFilesRecursiveInner( canonicalDirectory, fs ).collect( toList() );
            return snapshot.stream().map( f -> new WrappingFileHandle( f, canonicalDirectory, fs ) );
        }
        catch ( UncheckedIOException e )
        {
            // We sneak checked IOExceptions through UncheckedIOExceptions due to our use of streams and lambdas.
            throw e.getCause();
        }
    }

    /**
     * Lazily walks the tree below the given path.
     *
     * @param directory the directory to list, or a plain file.
     * @param fs the file system used for listing and for existence/directory checks.
     * @return a stream of the non-directory entries found; if the path cannot be listed but exists, a stream of
     * just that path.
     * @throws UncheckedIOException wrapping a {@link NoSuchFileException} if the path cannot be listed and does
     * not exist.
     */
    private static Stream<File> streamFilesRecursiveInner( File directory, FileSystemAbstraction fs )
    {
        File[] files = fs.listFiles( directory );
        if ( files == null )
        {
            // A null listing means "not a listable directory": either the path is missing, or it is a file.
            if ( !fs.fileExists( directory ) )
            {
                throw new UncheckedIOException( new NoSuchFileException( directory.getPath() ) );
            }
            return Stream.of( directory );
        }
        else
        {
            return Stream.of( files )
                    .flatMap( f -> fs.isDirectory( f ) ? streamFilesRecursiveInner( f, fs ) : Stream.of( f ) );
        }
    }
}
91 changes: 91 additions & 0 deletions community/io/src/main/java/org/neo4j/io/fs/WrappingFileHandle.java
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.io.fs;

import java.io.File;
import java.io.IOException;
import java.nio.file.CopyOption;

/**
 * A {@link FileHandle} that performs its operations on a single file through a {@link FileSystemAbstraction},
 * and that removes directories left empty by a rename or delete, up to and including the base directory.
 */
class WrappingFileHandle implements FileHandle
{
    private final File file;
    private final File baseDirectory;
    private final FileSystemAbstraction fs;

    /**
     * @param file the file this handle represents.
     * @param baseDirectory the base path that relative paths and empty-directory clean-up are bounded by.
     * @param fs the file system used to carry out renames, deletes, listings and directory creation.
     */
    WrappingFileHandle( File file, File baseDirectory, FileSystemAbstraction fs )
    {
        this.file = file;
        this.baseDirectory = baseDirectory;
        this.fs = fs;
    }

    @Override
    public File getFile()
    {
        return file;
    }

    /**
     * Strips the base directory path from the front of this handle's file path.
     * <p>
     * NOTE(review): if the file path is identical to the base path (and the base has a parent), the computed
     * prefix is one character longer than the file path and {@code substring} will throw - confirm whether
     * that situation can reach this method.
     */
    @Override
    public File getRelativeFile()
    {
        String basePath = baseDirectory.getPath();
        // A base with a parent does not end in a separator, so one extra character has to be skipped.
        // A base without a parent is cut at exactly its own length.
        int prefixLength = baseDirectory.getParent() == null ? basePath.length() : basePath.length() + 1;
        String relativePath = file.getPath().substring( prefixLength );
        return new File( relativePath );
    }

    /**
     * Moves the file to the canonical form of {@code to}, creating the target's parent directories first and
     * cleaning up directories that the move left empty afterwards.
     */
    @Override
    public void rename( File to, CopyOption... options ) throws IOException
    {
        // Remember where the file lived before it is moved, so that location can be tidied up afterwards.
        File sourceDirectory = file.getParentFile();
        File target = to.getCanonicalFile();
        File targetDirectory = target.getParentFile();
        fs.mkdirs( targetDirectory );
        fs.renameFile( file, target, options );
        removeEmptyParent( sourceDirectory );
    }

    /**
     * Deletes the file, then cleans up directories that the deletion left empty.
     */
    @Override
    public void delete() throws IOException
    {
        File containingDirectory = file.getParentFile();
        fs.deleteFileOrThrow( file );
        removeEmptyParent( containingDirectory );
    }

    /**
     * Walks upwards from the given directory, deleting each directory that lists as empty. The walk ends at
     * the first directory that cannot be listed or still has entries, or once the base directory itself has
     * been processed.
     */
    private void removeEmptyParent( File parentFile )
    {
        // The walk stops when it reaches the parent of the base directory, so the base directory itself is
        // still eligible for removal. That parent is 'null' when the base is a top directory, which is fine
        // because 'File.equals(null)' simply answers 'false'.
        File stopAt = baseDirectory.getParentFile();
        for ( File current = parentFile; current != null && !current.equals( stopAt );
                current = current.getParentFile() )
        {
            File[] entries = fs.listFiles( current );
            boolean listableAndEmpty = entries != null && entries.length == 0;
            if ( !listableAndEmpty )
            {
                return;
            }
            fs.deleteFile( current );
        }
    }
}
26 changes: 5 additions & 21 deletions community/io/src/main/java/org/neo4j/io/pagecache/PageCache.java
Expand Up @@ -21,11 +21,11 @@


import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.nio.file.OpenOption; import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Stream;
import org.neo4j.io.fs.FileSystemAbstraction;


/** /**
* A page caching mechanism that allows caching multiple files and accessing their data * A page caching mechanism that allows caching multiple files and accessing their data
Expand Down Expand Up @@ -110,24 +110,8 @@ public interface PageCache extends AutoCloseable
int maxCachedPages(); int maxCachedPages();


/** /**
* Return a stream of {@link FileHandle file handles} for every file in the given directory, and its * Get the {@link FileSystemAbstraction} that represents the filesystem where the paged files reside.
* sub-directories. * @return the filesystem that the page cache is using.
* <p>
* Alternatively, if the {@link File} given as an argument refers to a file instead of a directory, then a stream
* will be returned with a file handle for just that file.
* <p>
* The stream is based on a snapshot of the file tree, so changes made to the tree using the returned file handles
* will not be reflected in the stream.
* <p>
* No directories will be returned. Only files. If a file handle ends up leaving a directory empty through a
* rename or a delete, then the empty directory will automatically be deleted as well.
* Likewise, if a file is moved to a path where not all of the directories in the path exists, then those missing
* directories will be created prior to the file rename.
*
* @param directory The base directory to start streaming files from, or the specific individual file to stream.
* @return A stream of all files in the tree.
* @throws NoSuchFileException If the given base directory or file does not exists.
* @throws IOException If an I/O error occurs, possibly with the canonicalisation of the paths.
*/ */
Stream<FileHandle> streamFilesRecursive( File directory ) throws IOException; FileSystemAbstraction getCachedFileSystem();
} }
Expand Up @@ -47,6 +47,11 @@ public interface PageSwapperFactory
*/ */
void open( FileSystemAbstraction fs, Configuration config ); void open( FileSystemAbstraction fs, Configuration config );


/**
* Get the {@link FileSystemAbstraction} that represents the underlying storage for the page swapper.
*
* @return the {@link FileSystemAbstraction} representing the underlying storage.
*/
FileSystemAbstraction getFileSystemAbstraction();

/** /**
* Get the name of this PageSwapperFactory implementation, for configuration purpose. * Get the name of this PageSwapperFactory implementation, for configuration purpose.
*/ */
Expand Down Expand Up @@ -105,33 +110,6 @@ PageSwapper createPageSwapper(
*/ */
void syncDevice() throws IOException; void syncDevice() throws IOException;


/**
* Return a stream of {@link FileHandle file handles} for every file in the given directory, and its
* sub-directories.
* <p>
* Alternatively, if the {@link File} given as an argument refers to a file instead of a directory, then a stream
* will be returned with a file handle for just that file.
* <p>
* The stream is based on a snapshot of the file tree, so changes made to the tree using the returned file handles
* will not be reflected in the stream.
* <p>
* No directories will be returned. Only files. If a file handle ends up leaving a directory empty through a
* rename or a delete, then the empty directory will automatically be deleted as well.
* Likewise, if a file is moved to a path where not all of the directories in the path exist, then those missing
* directories will be created prior to the file rename.
* <p>
* This method forms the basis of the implementation of the {@link PageCache#streamFilesRecursive(File)} method.
* The key difference is that the stream and file handles are oblivious about which files are mapped or not.
* Thus, the returned {@link FileHandle file handles} will never throw any
* {@link org.neo4j.io.pagecache.impl.FileIsMappedException}s.
*
* @param directory The base directory to start streaming files from, or the specific individual file to stream.
* @return A stream of all files in the tree.
* @throws NoSuchFileException If the given base directory or file does not exist.
* @throws IOException If an I/O error occurs, possibly with the canonicalisation of the paths.
*/
Stream<FileHandle> streamFilesRecursive( File directory ) throws IOException;

/** /**
* Close and release any resources associated with this PageSwapperFactory, that it may have opened or acquired * Close and release any resources associated with this PageSwapperFactory, that it may have opened or acquired
* during its construction or use. * during its construction or use.
Expand Down

0 comments on commit c3498d7

Please sign in to comment.