Skip to content

Commit

Permalink
Make sure that page cache profile files are not deleted during online backup and store copy.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed Mar 20, 2018
1 parent 1be2c6d commit a3cc22f
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 50 deletions.
8 changes: 4 additions & 4 deletions enterprise/kernel/pom.xml
Expand Up @@ -67,14 +67,14 @@
<artifactId>neo4j-kernel</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Neo4j test dependencies -->
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-consistency-check</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<!-- Neo4j test dependencies -->
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-kernel</artifactId>
Expand Down Expand Up @@ -162,15 +162,15 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-kernel-api</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Expand Up @@ -25,7 +25,9 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
Expand All @@ -34,9 +36,9 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipException;

import org.neo4j.graphdb.Resource;
import org.neo4j.io.IOUtils;
Expand All @@ -51,6 +53,7 @@
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.storageengine.api.StoreFileMetadata;

import static java.lang.Long.toHexString;
import static org.neo4j.io.pagecache.PagedFile.PF_NO_FAULT;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_READ_LOCK;

Expand All @@ -74,6 +77,7 @@ public class PageCacheWarmer implements NeoStoreFileListing.StoreFileProvider
private final PageCache pageCache;
private final ExecutorService executor;
private final PageLoaderFactory pageLoaderFactory;
private final ProfileRefCounts refCounts;
private volatile boolean stopped;

PageCacheWarmer( FileSystemAbstraction fs, PageCache pageCache, JobScheduler scheduler )
Expand All @@ -82,6 +86,7 @@ public class PageCacheWarmer implements NeoStoreFileListing.StoreFileProvider
this.pageCache = pageCache;
this.executor = buildExecutorService( scheduler );
this.pageLoaderFactory = new PageLoaderFactory( executor, pageCache );
this.refCounts = new ProfileRefCounts();
}

private ExecutorService buildExecutorService( JobScheduler scheduler )
Expand All @@ -102,15 +107,16 @@ public synchronized Resource addFilesTo( Collection<StoreFileMetadata> coll ) th
return Resource.EMPTY;
}
List<PagedFile> files = pageCache.listExistingMappings();
for ( PagedFile file : files )
Profile[] existingProfiles = findExistingProfiles( files );
for ( Profile profile : existingProfiles )
{
File profileFile = profileOutputFileName( file );
if ( fs.fileExists( profileFile ) )
{
coll.add( new StoreFileMetadata( profileFile, 1, false ) );
}
coll.add( new StoreFileMetadata( profile.profileFile, 1, false ) );
}
return Resource.EMPTY;
refCounts.incrementRefCounts( existingProfiles );
return () ->
{
refCounts.decrementRefCounts( existingProfiles );
};
}

public void stop()
Expand All @@ -136,11 +142,12 @@ public synchronized OptionalLong reheat() throws IOException
{
long pagesLoaded = 0;
List<PagedFile> files = pageCache.listExistingMappings();
Profile[] existingProfiles = findExistingProfiles( files );
for ( PagedFile file : files )
{
try
{
pagesLoaded += reheat( file );
pagesLoaded += reheat( file, existingProfiles );
}
catch ( FileIsNotMappedException ignore )
{
Expand All @@ -150,35 +157,21 @@ public synchronized OptionalLong reheat() throws IOException
return stopped ? OptionalLong.empty() : OptionalLong.of( pagesLoaded );
}

private long reheat( PagedFile file ) throws IOException
private long reheat( PagedFile file, Profile[] existingProfiles ) throws IOException
{
long pagesLoaded = 0;
File savedProfile = profileOutputFileName( file );
Optional<Profile> savedProfile = filterRelevant( existingProfiles, file )
.sorted( Comparator.reverseOrder() ) // Try most recent profile first.
.filter( this::verifyChecksum )
.findFirst();

if ( !fs.fileExists( savedProfile ) )
if ( !savedProfile.isPresent() )
{
return pagesLoaded;
}

// First read through the profile to verify its checksum.
try ( InputStream inputStream = compressedInputStream( savedProfile ) )
{
int b;
do
{
b = inputStream.read();
}
while ( b != -1 );
}
catch ( ZipException ignore )
{
// ZipException is used to indicate checksum failures.
// Let's ignore this file since it's corrupt.
return pagesLoaded;
return 0;
}

// The file contents checks out. Let's load it in.
try ( InputStream inputStream = compressedInputStream( savedProfile );
long pagesLoaded = 0;
try ( InputStream inputStream = compressedInputStream( savedProfile.get().profileFile );
PageLoader loader = pageLoaderFactory.getLoader( file ) )
{
long pageId = 0;
Expand All @@ -189,6 +182,7 @@ private long reheat( PagedFile file ) throws IOException
{
if ( stopped )
{
pageCache.reportEvents();
return pagesLoaded;
}
if ( (b & 1) == 1 )
Expand All @@ -205,6 +199,25 @@ private long reheat( PagedFile file ) throws IOException
return pagesLoaded;
}

/**
 * Verify the integrity of the given profile file by reading it through to the end.
 * <p>
 * Successfully reading through and closing the compressed stream implies that the gzip
 * checksum in the file trailer has been validated.
 *
 * @param profile the profile whose backing file should be checked.
 * @return {@code true} if the file decompressed cleanly, {@code false} if it is corrupt or unreadable.
 */
private boolean verifyChecksum( Profile profile )
{
    try ( InputStream inputStream = compressedInputStream( profile.profileFile ) )
    {
        // Drain the stream in bulk rather than one byte at a time; the gzip checksum
        // is verified as a side effect of reaching end-of-stream without error.
        byte[] scratch = new byte[4096];
        while ( inputStream.read( scratch ) != -1 )
        {
            // Discard the data; we only care that decompression succeeds.
        }
    }
    catch ( IOException ignore )
    {
        // Corrupt or truncated profile file; callers will simply skip it.
        return false;
    }
    return true;
}

/**
* Profile the in-memory data in the page cache, and write it to "cacheprof" file siblings of the mapped files.
*
Expand All @@ -219,25 +232,31 @@ public synchronized OptionalLong profile() throws IOException
// profiling in parallel is just not worth it.
long pagesInMemory = 0;
List<PagedFile> files = pageCache.listExistingMappings();
Profile[] existingProfiles = findExistingProfiles( files );
for ( PagedFile file : files )
{
try
{
pagesInMemory += profile( file );
pagesInMemory += profile( file, existingProfiles );
}
catch ( FileIsNotMappedException ignore )
{
// The database is allowed to map and unmap files while we are profiling the page cache.
}
if ( stopped )
{
pageCache.reportEvents();
return OptionalLong.empty();
}
}
pageCache.reportEvents();
return stopped ? OptionalLong.empty() : OptionalLong.of( pagesInMemory );
return OptionalLong.of( pagesInMemory );
}

private long profile( PagedFile file ) throws IOException
private long profile( PagedFile file, Profile[] existingProfiles ) throws IOException
{
long pagesInMemory = 0;
File outputFile = profileOutputFileName( file );
File outputFile = profileOutputFileName( file, filterRelevant( existingProfiles, file ) );

try ( OutputStream outputStream = compressedOutputStream( outputFile );
PageCursor cursor = file.io( 0, PF_SHARED_READ_LOCK | PF_NO_FAULT ) )
Expand All @@ -246,10 +265,6 @@ private long profile( PagedFile file ) throws IOException
int b = 0;
for ( ; ; )
{
if ( stopped )
{
return pagesInMemory;
}
if ( !cursor.next() )
{
break; // Exit the loop if there are no more pages.
Expand All @@ -271,6 +286,11 @@ private long profile( PagedFile file ) throws IOException
outputStream.flush();
}

// Delete previous profile files.
filterRelevant( existingProfiles, file )
.filter( profile -> !refCounts.contains( profile ) )
.forEach( p -> fs.deleteFile( p.profileFile ) );

return pagesInMemory;
}

Expand Down Expand Up @@ -323,11 +343,61 @@ public void close() throws IOException
}
}

private File profileOutputFileName( PagedFile file )
private File profileOutputFileName( PagedFile file, Stream<Profile> existingProfiles )
{
long lastFileCount = existingProfiles.mapToLong( p -> p.profileCount ).max().orElse( 0L );
long nextFileCount = lastFileCount + 1L;
File mappedFile = file.file();
String profileOutputName = "." + mappedFile.getName() + SUFFIX_CACHEPROF;
File parent = mappedFile.getParentFile();
String profileOutputName = "." + mappedFile.getName() + "." + toHexString( nextFileCount ) + SUFFIX_CACHEPROF;
return new File( parent, profileOutputName );
}

/**
 * Pick out the profiles that belong to the given mapped file.
 *
 * @param profiles all known profile files.
 * @param pagedFile the mapped file to match against.
 * @return a stream of the profiles recorded for that mapped file.
 */
private Stream<Profile> filterRelevant( Profile[] profiles, PagedFile pagedFile )
{
    String mappedName = pagedFile.file().getName();
    return Stream.of( profiles ).filter( profile -> profile.mappedFileName.equals( mappedName ) );
}

/**
 * Scan the directories of the given mapped files for existing profile files.
 *
 * @param pagedFiles the currently mapped files whose directories should be searched.
 * @return all discovered profiles, in their natural sorted order.
 */
private Profile[] findExistingProfiles( List<PagedFile> pagedFiles )
{
    // Each directory is only scanned once, even if it hosts several mapped files.
    Stream<File> distinctDirectories = pagedFiles.stream()
            .map( pagedFile -> pagedFile.file().getParentFile() )
            .distinct();
    Stream<Profile> profiles = distinctDirectories
            .flatMap( this::listFiles )
            .flatMap( this::parseProfileName );
    return profiles.sorted().toArray( Profile[]::new );
}

/**
 * Stream the contents of the given directory, or nothing if it cannot be listed.
 *
 * @param dir the directory to list.
 * @return a stream of its files; empty when the listing fails (e.g. the directory is gone).
 */
private Stream<File> listFiles( File dir )
{
    File[] entries = fs.listFiles( dir );
    // A null listing means the directory could not be read; treat it as empty.
    return entries == null ? Stream.empty() : Stream.of( entries );
}

/**
 * Attempt to parse the given file as a page cache profile file.
 * <p>
 * Profile files follow the naming pattern
 * {@code "." + mappedFileName + "." + hexCount + SUFFIX_CACHEPROF},
 * as produced by {@code profileOutputFileName}.
 *
 * @param profile a candidate file found next to a mapped store file.
 * @return a single-element stream with the parsed {@link Profile}, or an empty stream
 * if the file does not follow the profile naming convention.
 */
private Stream<Profile> parseProfileName( File profile )
{
    String name = profile.getName();
    // A profile file must BOTH start with a dot AND carry the profile suffix.
    // NOTE: this was previously '&&', which only rejected files failing both
    // criteria, letting e.g. "store.a.cacheprof" (no leading dot) through.
    if ( !name.startsWith( "." ) || !name.endsWith( SUFFIX_CACHEPROF ) )
    {
        return Stream.empty();
    }
    int lastDot = name.lastIndexOf( '.' );
    int secondLastDot = name.lastIndexOf( '.', lastDot - 1 );
    if ( secondLastDot < 1 )
    {
        // No room for both a mapped file name and a count component; without this
        // guard, names like ".x.cacheprof" would make substring(1, 0) throw below.
        return Stream.empty();
    }
    String countStr = name.substring( secondLastDot + 1, lastDot );
    try
    {
        long count = Long.parseLong( countStr, 16 ); // The count component is hex-encoded.
        String mappedFileName = name.substring( 1, secondLastDot );
        return Stream.of( new Profile( profile, mappedFileName, count ) );
    }
    catch ( NumberFormatException e )
    {
        // The middle component is not a hex number, so this is not a profile file.
        return Stream.empty();
    }
}
}
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.kernel.impl.pagecache;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
Expand Down Expand Up @@ -68,7 +67,7 @@ class PageCacheWarmerKernelExtension extends LifecycleAdapter
}

@Override
public void start() throws Throwable
public void start()
{
if ( ENABLED )
{
Expand Down Expand Up @@ -108,7 +107,7 @@ private void doReheat()
" to load " + pagesLoaded + " pages." );
} );
}
catch ( IOException e )
catch ( Exception e )
{
log.debug( "Active page cache warmup failed, " +
"so it may take longer for the cache to be populated with hot data.", e );
Expand Down Expand Up @@ -143,7 +142,7 @@ private void doProfile()
", and found " + pagesInMemory + " pages in memory." );
});
}
catch ( IOException e )
catch ( Exception e )
{
log.debug( "Page cache profiling failed, so no new profile of what data is hot or not was produced. " +
"This may reduce the effectiveness of a future page cache warmup process.", e );
Expand Down

0 comments on commit a3cc22f

Please sign in to comment.