diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/PageCacheWarmer.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/PageCacheWarmer.java index aded0626c3525..d859a7b356a39 100644 --- a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/PageCacheWarmer.java +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/PageCacheWarmer.java @@ -19,11 +19,9 @@ */ package org.neo4j.kernel.impl.pagecache; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.Comparator; import java.util.List; @@ -37,14 +35,9 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; import org.neo4j.graphdb.Resource; -import org.neo4j.io.IOUtils; import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.io.fs.OpenMode; -import org.neo4j.io.fs.StoreChannel; import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCursor; import org.neo4j.io.pagecache.PagedFile; @@ -53,7 +46,7 @@ import org.neo4j.scheduler.JobScheduler; import org.neo4j.storageengine.api.StoreFileMetadata; -import static java.lang.Long.toHexString; +import static java.util.Comparator.naturalOrder; import static org.neo4j.io.pagecache.PagedFile.PF_NO_FAULT; import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_READ_LOCK; @@ -110,7 +103,7 @@ public synchronized Resource addFilesTo( Collection coll ) th Profile[] existingProfiles = findExistingProfiles( files ); for ( Profile profile : existingProfiles ) { - coll.add( new StoreFileMetadata( profile.profileFile, 1, false ) ); + coll.add( new StoreFileMetadata( profile.file(), 1, false ) ); } refCounts.incrementRefCounts( existingProfiles ); return () -> @@ -171,7 +164,7 @@ private long reheat( PagedFile file, Profile[] existingProfiles ) throws IOExcep // The file contents checks out. Let's load it in. long pagesLoaded = 0; - try ( InputStream inputStream = compressedInputStream( savedProfile.get().profileFile ); + try ( InputStream inputStream = savedProfile.get().read( fs ); PageLoader loader = pageLoaderFactory.getLoader( file ) ) { long pageId = 0; @@ -202,7 +195,7 @@ private long reheat( PagedFile file, Profile[] existingProfiles ) throws IOExcep private boolean verifyChecksum( Profile profile ) { // Successfully reading through and closing the compressed file implies verifying the gzip checksum. - try ( InputStream inputStream = compressedInputStream( profile.profileFile ) ) + try ( InputStream inputStream = profile.read( fs ) ) { int b; do @@ -256,9 +249,12 @@ public synchronized OptionalLong profile() throws IOException private long profile( PagedFile file, Profile[] existingProfiles ) throws IOException { long pagesInMemory = 0; - File outputFile = profileOutputFileName( file, filterRelevant( existingProfiles, file ) ); + Profile nextProfile = filterRelevant( existingProfiles, file ) + .max( naturalOrder() ) + .map( Profile::next ) + .orElse( Profile.first( file.file() ) ); - try ( OutputStream outputStream = compressedOutputStream( outputFile ); + try ( OutputStream outputStream = nextProfile.write( fs ); PageCursor cursor = file.io( 0, PF_SHARED_READ_LOCK | PF_NO_FAULT ) ) { int stepper = 0; @@ -289,73 +285,14 @@ private long profile( PagedFile file, Profile[] existingProfiles ) throws IOExce // Delete previous profile files. filterRelevant( existingProfiles, file ) .filter( profile -> !refCounts.contains( profile ) ) - .forEach( p -> fs.deleteFile( p.profileFile ) ); + .forEach( p -> p.delete( fs ) ); return pagesInMemory; } - private InputStream compressedInputStream( File input ) throws IOException - { - InputStream source = fs.openAsInputStream( input ); - try - { - return new GZIPInputStream( source ); - } - catch ( IOException e ) - { - IOUtils.closeAllSilently( source ); - throw new IOException( "Exception when building decompressor.", e ); - } - } - - private OutputStream compressedOutputStream( File output ) throws IOException - { - StoreChannel channel = fs.open( output, OpenMode.READ_WRITE ); - ByteBuffer buf = ByteBuffer.allocate( 1 ); - OutputStream sink = new OutputStream() - { - @Override - public void write( int b ) throws IOException - { - buf.put( (byte) b ); - buf.flip(); - channel.write( buf ); - buf.flip(); - } - - @Override - public void close() throws IOException - { - channel.truncate( channel.position() ); - channel.close(); - } - }; - try - { - return new GZIPOutputStream( sink ); - } - catch ( IOException e ) - { - // We close the channel instead of the sink here, because we don't want to truncate the file if we fail - // to open the gzip output stream. - IOUtils.closeAllSilently( channel ); - throw new IOException( "Exception when building compressor.", e ); - } - } - - private File profileOutputFileName( PagedFile file, Stream existingProfiles ) - { - long lastFileCount = existingProfiles.mapToLong( p -> p.profileCount ).max().orElse( 0L ); - long nextFileCount = lastFileCount + 1L; - File mappedFile = file.file(); - File parent = mappedFile.getParentFile(); - String profileOutputName = "." + mappedFile.getName() + "." + toHexString( nextFileCount ) + SUFFIX_CACHEPROF; - return new File( parent, profileOutputName ); - } - private Stream filterRelevant( Profile[] profiles, PagedFile pagedFile ) { - return Stream.of( profiles ).filter( profile -> profile.mappedFileName.equals( pagedFile.file().getName() ) ); + return Stream.of( profiles ).filter( Profile.relevantTo( pagedFile ) ); } private Profile[] findExistingProfiles( List pagedFiles ) @@ -363,41 +300,8 @@ private Profile[] findExistingProfiles( List pagedFiles ) return pagedFiles.stream() .map( pf -> pf.file().getParentFile() ) .distinct() - .flatMap( this::listFiles ) - .flatMap( this::parseProfileName ) + .flatMap( dir -> Profile.findProfilesInDirectory( fs, dir ) ) .sorted() .toArray( Profile[]::new ); } - - private Stream listFiles( File dir ) - { - File[] files = fs.listFiles( dir ); - if ( files == null ) - { - return Stream.empty(); - } - return Stream.of( files ); - } - - private Stream parseProfileName( File profile ) - { - String name = profile.getName(); - if ( !name.startsWith( "." ) && !name.endsWith( SUFFIX_CACHEPROF ) ) - { - return Stream.empty(); - } - int lastDot = name.lastIndexOf( '.' ); - int secondLastDot = name.lastIndexOf( '.', lastDot - 1 ); - String countStr = name.substring( secondLastDot + 1, lastDot ); - try - { - long count = Long.parseLong( countStr, 16 ); - String mappedFileName = name.substring( 1, secondLastDot ); - return Stream.of( new Profile( profile, mappedFileName, count ) ); - } - catch ( NumberFormatException e ) - { - return Stream.empty(); - } - } } diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/Profile.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/Profile.java index f2dac5010f41d..8600cb2f8c6f0 100644 --- a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/Profile.java +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/Profile.java @@ -20,27 +20,41 @@ package org.neo4j.kernel.impl.pagecache; import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.Objects; +import java.util.function.Predicate; +import java.util.stream.Stream; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import org.neo4j.io.IOUtils; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.PagedFile; + +import static org.neo4j.kernel.impl.pagecache.PageCacheWarmer.SUFFIX_CACHEPROF; final class Profile implements Comparable { - final File profileFile; - final String mappedFileName; - final long profileCount; + private final File profileFile; + private final File pagedFile; + private final long profileCount; - Profile( File profileFile, String mappedFileName, long profileCount ) + private Profile( File profileFile, File pagedFile, long profileCount ) { Objects.requireNonNull( profileFile ); - Objects.requireNonNull( mappedFileName ); + Objects.requireNonNull( pagedFile ); this.profileFile = profileFile; - this.mappedFileName = mappedFileName; + this.pagedFile = pagedFile; this.profileCount = profileCount; } @Override public int compareTo( Profile that ) { - return Long.compare( profileCount, that.profileCount ); + int compare = profileFile.compareTo( that.profileFile ); + return compare == 0 ? Long.compare( profileCount, that.profileCount ) : compare; } @Override @@ -59,4 +73,98 @@ public int hashCode() { return profileFile.hashCode(); } + + File file() + { + return profileFile; + } + + void delete( FileSystemAbstraction fs ) + { + fs.deleteFile( profileFile ); + } + + InputStream read( FileSystemAbstraction fs ) throws IOException + { + InputStream source = fs.openAsInputStream( profileFile ); + try + { + return new GZIPInputStream( source ); + } + catch ( IOException e ) + { + IOUtils.closeAllSilently( source ); + throw new IOException( "Exception when building decompressor.", e ); + } + } + + OutputStream write( FileSystemAbstraction fs ) throws IOException + { + OutputStream sink = fs.openAsOutputStream( profileFile, false ); + try + { + return new GZIPOutputStream( sink ); + } + catch ( IOException e ) + { + IOUtils.closeAllSilently( sink ); + throw new IOException( "Exception when building compressor.", e ); + } + } + + Profile next() + { + long nextCount = profileCount + 1L; + return new Profile( profileName( pagedFile, nextCount ), pagedFile, nextCount ); + } + + static Profile first( File file ) + { + return new Profile( profileName( file, 0 ), file, 0 ); + } + + private static File profileName( File file, long count ) + { + String name = file.getName(); + File dir = file.getParentFile(); + return new File( dir, "." + name + "." + Long.toHexString( count ) + SUFFIX_CACHEPROF ); + } + + static Predicate relevantTo( PagedFile pagedFile ) + { + return p -> p.pagedFile.equals( pagedFile.file() ); + } + + static Stream findProfilesInDirectory( FileSystemAbstraction fs, File dir ) + { + File[] files = fs.listFiles( dir ); + if ( files == null ) + { + return Stream.empty(); + } + return Stream.of( files ).flatMap( Profile::parseProfileName ); + } + + private static Stream parseProfileName( File profile ) + { + File dir = profile.getParentFile(); + String name = profile.getName(); + if ( !name.startsWith( "." ) && !name.endsWith( SUFFIX_CACHEPROF ) ) + { + return Stream.empty(); + } + int lastDot = name.lastIndexOf( '.' ); + int secondLastDot = name.lastIndexOf( '.', lastDot - 1 ); + String countStr = name.substring( secondLastDot + 1, lastDot ); + try + { + long count = Long.parseLong( countStr, 16 ); + String mappedFileName = name.substring( 1, secondLastDot ); + return Stream.of( new Profile( profile, new File( dir, mappedFileName ), count ) ); + } + catch ( NumberFormatException e ) + { + return Stream.empty(); + } + } }