Skip to content

Commit

Permalink
Refactoring and cleanups in the PageCacheWarmer.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed Mar 20, 2018
1 parent a3cc22f commit 07e4767
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 115 deletions.
Expand Up @@ -19,11 +19,9 @@
*/
package org.neo4j.kernel.impl.pagecache;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
Expand All @@ -37,14 +35,9 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.neo4j.graphdb.Resource;
import org.neo4j.io.IOUtils;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.OpenMode;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;
Expand All @@ -53,7 +46,7 @@
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.storageengine.api.StoreFileMetadata;

import static java.lang.Long.toHexString;
import static java.util.Comparator.naturalOrder;
import static org.neo4j.io.pagecache.PagedFile.PF_NO_FAULT;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_READ_LOCK;

Expand Down Expand Up @@ -110,7 +103,7 @@ public synchronized Resource addFilesTo( Collection<StoreFileMetadata> coll ) th
Profile[] existingProfiles = findExistingProfiles( files );
for ( Profile profile : existingProfiles )
{
coll.add( new StoreFileMetadata( profile.profileFile, 1, false ) );
coll.add( new StoreFileMetadata( profile.file(), 1, false ) );
}
refCounts.incrementRefCounts( existingProfiles );
return () ->
Expand Down Expand Up @@ -171,7 +164,7 @@ private long reheat( PagedFile file, Profile[] existingProfiles ) throws IOExcep

// The file contents checks out. Let's load it in.
long pagesLoaded = 0;
try ( InputStream inputStream = compressedInputStream( savedProfile.get().profileFile );
try ( InputStream inputStream = savedProfile.get().read( fs );
PageLoader loader = pageLoaderFactory.getLoader( file ) )
{
long pageId = 0;
Expand Down Expand Up @@ -202,7 +195,7 @@ private long reheat( PagedFile file, Profile[] existingProfiles ) throws IOExcep
private boolean verifyChecksum( Profile profile )
{
// Successfully reading through and closing the compressed file implies verifying the gzip checksum.
try ( InputStream inputStream = compressedInputStream( profile.profileFile ) )
try ( InputStream inputStream = profile.read( fs ) )
{
int b;
do
Expand Down Expand Up @@ -256,9 +249,12 @@ public synchronized OptionalLong profile() throws IOException
private long profile( PagedFile file, Profile[] existingProfiles ) throws IOException
{
long pagesInMemory = 0;
File outputFile = profileOutputFileName( file, filterRelevant( existingProfiles, file ) );
Profile nextProfile = filterRelevant( existingProfiles, file )
.max( naturalOrder() )
.map( Profile::next )
.orElse( Profile.first( file.file() ) );

try ( OutputStream outputStream = compressedOutputStream( outputFile );
try ( OutputStream outputStream = nextProfile.write( fs );
PageCursor cursor = file.io( 0, PF_SHARED_READ_LOCK | PF_NO_FAULT ) )
{
int stepper = 0;
Expand Down Expand Up @@ -289,115 +285,23 @@ private long profile( PagedFile file, Profile[] existingProfiles ) throws IOExce
// Delete previous profile files.
filterRelevant( existingProfiles, file )
.filter( profile -> !refCounts.contains( profile ) )
.forEach( p -> fs.deleteFile( p.profileFile ) );
.forEach( p -> p.delete( fs ) );

return pagesInMemory;
}

/**
 * Open the given file and wrap it in a GZIP decompressing stream.
 * If the decompressor cannot be constructed, the underlying stream is closed
 * before the failure is propagated, so no file handle is leaked.
 */
private InputStream compressedInputStream( File input ) throws IOException
{
    InputStream rawInput = fs.openAsInputStream( input );
    try
    {
        return new GZIPInputStream( rawInput );
    }
    catch ( IOException e )
    {
        // Reading the gzip header can fail; don't leak the open file handle when it does.
        IOUtils.closeAllSilently( rawInput );
        throw new IOException( "Exception when building decompressor.", e );
    }
}

/**
 * Open the given file for writing and wrap it in a GZIP compressing stream.
 * The channel is bridged to an OutputStream one byte at a time via a single-byte
 * buffer, and the file is truncated to the written length on close.
 */
private OutputStream compressedOutputStream( File output ) throws IOException
{
    StoreChannel channel = fs.open( output, OpenMode.READ_WRITE );
    ByteBuffer buf = ByteBuffer.allocate( 1 );
    OutputStream sink = new OutputStream()
    {
        @Override
        public void write( int b ) throws IOException
        {
            // For a capacity-1 buffer, flip after the channel write resets it for the next byte.
            buf.put( (byte) b );
            buf.flip();
            channel.write( buf );
            buf.flip();
        }

        @Override
        public void close() throws IOException
        {
            // Drop any stale bytes from a previous, longer profile in the same file.
            channel.truncate( channel.position() );
            channel.close();
        }
    };
    try
    {
        return new GZIPOutputStream( sink );
    }
    catch ( IOException e )
    {
        // We close the channel instead of the sink here, because we don't want to truncate the file if we fail
        // to open the gzip output stream.
        IOUtils.closeAllSilently( channel );
        throw new IOException( "Exception when building compressor.", e );
    }
}

/**
 * Derive the file name for the next profile of the given mapped file: one more than
 * the highest count among the existing profiles, or 1 when there are none.
 * The name has the form "." + mapped file name + "." + hex count + SUFFIX_CACHEPROF,
 * placed next to the mapped file.
 */
private File profileOutputFileName( PagedFile file, Stream<Profile> existingProfiles )
{
    long nextFileCount = existingProfiles.mapToLong( p -> p.profileCount ).max().orElse( 0L ) + 1L;
    File mappedFile = file.file();
    String profileOutputName = "." + mappedFile.getName() + "." + toHexString( nextFileCount ) + SUFFIX_CACHEPROF;
    return new File( mappedFile.getParentFile(), profileOutputName );
}

/**
 * Narrow the given profiles down to those recorded for the given mapped file.
 * Fix: the span contained a stale duplicate return statement (diff-extraction artifact)
 * which made the method syntactically invalid; only the current form is kept.
 */
private Stream<Profile> filterRelevant( Profile[] profiles, PagedFile pagedFile )
{
    return Stream.of( profiles ).filter( Profile.relevantTo( pagedFile ) );
}

/**
 * Find all existing profile files for the given mapped files, sorted for deterministic order.
 * Each distinct parent directory is scanned once.
 * Fix: the span interleaved the old stream pipeline (listFiles/parseProfileName) with the
 * new one (Profile.findProfilesInDirectory) — a diff-extraction artifact; only the current
 * pipeline is kept.
 */
private Profile[] findExistingProfiles( List<PagedFile> pagedFiles )
{
    return pagedFiles.stream()
            .map( pf -> pf.file().getParentFile() )
            .distinct()
            .flatMap( dir -> Profile.findProfilesInDirectory( fs, dir ) )
            .sorted()
            .toArray( Profile[]::new );
}

/**
 * Stream the entries of the given directory.
 * The file system abstraction returns null for directories that do not exist
 * or cannot be listed; that case yields an empty stream.
 */
private Stream<File> listFiles( File dir )
{
    File[] entries = fs.listFiles( dir );
    return entries == null ? Stream.empty() : Stream.of( entries );
}

/**
 * Attempt to parse the given file as a profile file, whose name has the form
 * "." + mapped file name + "." + hex count + SUFFIX_CACHEPROF.
 *
 * @return a one-element stream with the parsed Profile, or an empty stream when
 * the name does not match the profile naming scheme.
 */
private Stream<Profile> parseProfileName( File profile )
{
    String name = profile.getName();
    // Bug fix: this used '&&', which only rejected names missing BOTH markers.
    // A name like "foo.ff.cacheprof" (no leading dot) slipped through and produced
    // a bogus Profile. A profile name must have the dot prefix AND the suffix.
    if ( !name.startsWith( "." ) || !name.endsWith( SUFFIX_CACHEPROF ) )
    {
        return Stream.empty();
    }
    int lastDot = name.lastIndexOf( '.' );
    int secondLastDot = name.lastIndexOf( '.', lastDot - 1 );
    if ( secondLastDot < 1 )
    {
        // Not enough room for both a mapped file name and a count, e.g. ".ff.cacheprof".
        // Without this guard, the substring below would throw an uncaught
        // StringIndexOutOfBoundsException (only NumberFormatException is caught).
        return Stream.empty();
    }
    String countStr = name.substring( secondLastDot + 1, lastDot );
    try
    {
        long count = Long.parseLong( countStr, 16 );
        String mappedFileName = name.substring( 1, secondLastDot );
        return Stream.of( new Profile( profile, mappedFileName, count ) );
    }
    catch ( NumberFormatException e )
    {
        // The segment between the last two dots was not a hex number.
        return Stream.empty();
    }
}
}
Expand Up @@ -20,27 +20,41 @@
package org.neo4j.kernel.impl.pagecache;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.neo4j.io.IOUtils;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PagedFile;

import static org.neo4j.kernel.impl.pagecache.PageCacheWarmer.SUFFIX_CACHEPROF;

final class Profile implements Comparable<Profile>
{
final File profileFile;
final String mappedFileName;
final long profileCount;
// The profile file this object represents.
private final File profileFile;
// The mapped store file that this profile was recorded for.
private final File pagedFile;
// Generation counter, encoded as hex in the profile file name.
private final long profileCount;

/**
 * Create a profile handle. Instances are only created through the static factories
 * and next(), so the constructor is private.
 * Fix: the span interleaved the old constructor signature and the old
 * requireNonNull(mappedFileName) line (diff-extraction artifacts) with the current
 * code; only the current form is kept.
 */
private Profile( File profileFile, File pagedFile, long profileCount )
{
    Objects.requireNonNull( profileFile );
    Objects.requireNonNull( pagedFile );
    this.profileFile = profileFile;
    this.pagedFile = pagedFile;
    this.profileCount = profileCount;
}

/**
 * Order profiles by profile file path first, with the generation count as tie-breaker.
 * Fix: the span contained the stale single-field comparison line (diff-extraction
 * artifact) ahead of the current two-level comparison; only the current form is kept.
 *
 * NOTE(review): lexical path order can disagree with numeric count order once counts
 * reach 0x10 (".f.10.cacheprof" sorts before ".f.a.cacheprof" lexically, but 0x10 > 0xa)
 * — confirm this ordering is intended for max(naturalOrder()) callers.
 */
@Override
public int compareTo( Profile that )
{
    int compare = profileFile.compareTo( that.profileFile );
    return compare == 0 ? Long.compare( profileCount, that.profileCount ) : compare;
}

@Override
Expand All @@ -59,4 +73,98 @@ public int hashCode()
{
return profileFile.hashCode();
}

// The profile file backing this Profile.
File file()
{
    return profileFile;
}

// Remove this profile's file from the given file system.
void delete( FileSystemAbstraction fs )
{
    fs.deleteFile( profileFile );
}

/**
 * Open this profile file for reading, transparently decompressing its gzip contents.
 * The underlying stream is closed before the failure is rethrown if the decompressor
 * cannot be constructed, so no file handle is leaked.
 */
InputStream read( FileSystemAbstraction fs ) throws IOException
{
    InputStream in = fs.openAsInputStream( profileFile );
    try
    {
        return new GZIPInputStream( in );
    }
    catch ( IOException e )
    {
        // Reading the gzip header can fail; close the handle before propagating.
        IOUtils.closeAllSilently( in );
        throw new IOException( "Exception when building decompressor.", e );
    }
}

/**
 * Open this profile file for writing (truncating any previous contents),
 * compressing everything written through the returned stream with gzip.
 * The underlying stream is closed before the failure is rethrown if the
 * compressor cannot be constructed.
 */
OutputStream write( FileSystemAbstraction fs ) throws IOException
{
    OutputStream out = fs.openAsOutputStream( profileFile, false );
    try
    {
        return new GZIPOutputStream( out );
    }
    catch ( IOException e )
    {
        // Writing the gzip header can fail; close the handle before propagating.
        IOUtils.closeAllSilently( out );
        throw new IOException( "Exception when building compressor.", e );
    }
}

// The profile that follows this one in generation order for the same mapped file.
Profile next()
{
    long nextCount = 1L + profileCount;
    return new Profile( profileName( pagedFile, nextCount ), pagedFile, nextCount );
}

// The initial (generation zero) profile for the given mapped file.
static Profile first( File file )
{
    long firstCount = 0;
    return new Profile( profileName( file, firstCount ), file, firstCount );
}

// Builds "." + mapped file name + "." + hex count + SUFFIX_CACHEPROF,
// placed in the mapped file's directory.
private static File profileName( File file, long count )
{
    String profileName = "." + file.getName() + "." + Long.toHexString( count ) + SUFFIX_CACHEPROF;
    return new File( file.getParentFile(), profileName );
}

// Predicate matching profiles that were recorded for the given mapped file.
static Predicate<Profile> relevantTo( PagedFile pagedFile )
{
    return p -> p.pagedFile.equals( pagedFile.file() );
}

/**
 * Find all profile files in the given directory.
 * Directories that do not exist, or cannot be listed (the file system returns null),
 * yield an empty stream.
 */
static Stream<Profile> findProfilesInDirectory( FileSystemAbstraction fs, File dir )
{
    File[] entries = fs.listFiles( dir );
    return entries == null
           ? Stream.empty()
           : Stream.of( entries ).flatMap( Profile::parseProfileName );
}

/**
 * Attempt to parse the given file as a profile file, whose name has the form
 * "." + mapped file name + "." + hex count + SUFFIX_CACHEPROF.
 *
 * @return a one-element stream with the parsed Profile, or an empty stream when
 * the name does not match the profile naming scheme.
 */
private static Stream<Profile> parseProfileName( File profile )
{
    File dir = profile.getParentFile();
    String name = profile.getName();
    // Bug fix: this used '&&', which only rejected names missing BOTH markers.
    // A name like "foo.ff.cacheprof" (no leading dot) slipped through and produced
    // a bogus Profile. A profile name must have the dot prefix AND the suffix.
    if ( !name.startsWith( "." ) || !name.endsWith( SUFFIX_CACHEPROF ) )
    {
        return Stream.empty();
    }
    int lastDot = name.lastIndexOf( '.' );
    int secondLastDot = name.lastIndexOf( '.', lastDot - 1 );
    if ( secondLastDot < 1 )
    {
        // Not enough room for both a mapped file name and a count, e.g. ".ff.cacheprof".
        // Without this guard, the substring below would throw an uncaught
        // StringIndexOutOfBoundsException (only NumberFormatException is caught).
        return Stream.empty();
    }
    String countStr = name.substring( secondLastDot + 1, lastDot );
    try
    {
        long count = Long.parseLong( countStr, 16 );
        String mappedFileName = name.substring( 1, secondLastDot );
        return Stream.of( new Profile( profile, new File( dir, mappedFileName ), count ) );
    }
    catch ( NumberFormatException e )
    {
        // The segment between the last two dots was not a hex number.
        return Stream.empty();
    }
}
}

0 comments on commit 07e4767

Please sign in to comment.