From a3cc22f47e9a46a9d88f6628fc42fd847a462fc5 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 19 Mar 2018 14:22:15 +0100 Subject: [PATCH] Make sure that page cache profile files are not deleted during online backup and store copy. --- enterprise/kernel/pom.xml | 8 +- .../impl/pagecache/PageCacheWarmer.java | 154 +++++++++++++----- .../PageCacheWarmerKernelExtension.java | 7 +- .../neo4j/kernel/impl/pagecache/Profile.java | 62 +++++++ .../impl/pagecache/ProfileRefCounts.java | 56 +++++++ .../impl/pagecache/PageCacheWarmerTest.java | 66 ++++++++ 6 files changed, 303 insertions(+), 50 deletions(-) create mode 100644 enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/Profile.java create mode 100644 enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/ProfileRefCounts.java diff --git a/enterprise/kernel/pom.xml b/enterprise/kernel/pom.xml index da2b193a623d4..f081b3a4c34fa 100644 --- a/enterprise/kernel/pom.xml +++ b/enterprise/kernel/pom.xml @@ -67,14 +67,14 @@ neo4j-kernel ${project.version} + + org.neo4j neo4j-consistency-check ${project.version} test - - org.neo4j neo4j-kernel @@ -162,8 +162,6 @@ test-jar test - - org.neo4j neo4j-kernel-api @@ -171,6 +169,8 @@ test-jar test + + junit junit diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/PageCacheWarmer.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/PageCacheWarmer.java index be27a6f5657b8..aded0626c3525 100644 --- a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/PageCacheWarmer.java +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/PageCacheWarmer.java @@ -25,7 +25,9 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; @@ -34,9 +36,9 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import java.util.zip.ZipException; import org.neo4j.graphdb.Resource; import org.neo4j.io.IOUtils; @@ -51,6 +53,7 @@ import org.neo4j.scheduler.JobScheduler; import org.neo4j.storageengine.api.StoreFileMetadata; +import static java.lang.Long.toHexString; import static org.neo4j.io.pagecache.PagedFile.PF_NO_FAULT; import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_READ_LOCK; @@ -74,6 +77,7 @@ public class PageCacheWarmer implements NeoStoreFileListing.StoreFileProvider private final PageCache pageCache; private final ExecutorService executor; private final PageLoaderFactory pageLoaderFactory; + private final ProfileRefCounts refCounts; private volatile boolean stopped; PageCacheWarmer( FileSystemAbstraction fs, PageCache pageCache, JobScheduler scheduler ) @@ -82,6 +86,7 @@ public class PageCacheWarmer implements NeoStoreFileListing.StoreFileProvider this.pageCache = pageCache; this.executor = buildExecutorService( scheduler ); this.pageLoaderFactory = new PageLoaderFactory( executor, pageCache ); + this.refCounts = new ProfileRefCounts(); } private ExecutorService buildExecutorService( JobScheduler scheduler ) @@ -102,15 +107,16 @@ public synchronized Resource addFilesTo( Collection coll ) th return Resource.EMPTY; } List files = pageCache.listExistingMappings(); - for ( PagedFile file : files ) + Profile[] existingProfiles = findExistingProfiles( files ); + for ( Profile profile : existingProfiles ) { - File profileFile = profileOutputFileName( file ); - if ( fs.fileExists( profileFile ) ) - { - coll.add( new StoreFileMetadata( profileFile, 1, false ) ); - } + coll.add( new StoreFileMetadata( profile.profileFile, 1, false ) ); } - return Resource.EMPTY; + refCounts.incrementRefCounts( existingProfiles ); + return () -> + { + refCounts.decrementRefCounts( existingProfiles ); + }; } public void stop() @@ -136,11 +142,12 @@ public synchronized OptionalLong reheat() throws IOException { long pagesLoaded = 0; List files = pageCache.listExistingMappings(); + Profile[] existingProfiles = findExistingProfiles( files ); for ( PagedFile file : files ) { try { - pagesLoaded += reheat( file ); + pagesLoaded += reheat( file, existingProfiles ); } catch ( FileIsNotMappedException ignore ) { @@ -150,35 +157,21 @@ public synchronized OptionalLong reheat() throws IOException return stopped ? OptionalLong.empty() : OptionalLong.of( pagesLoaded ); } - private long reheat( PagedFile file ) throws IOException + private long reheat( PagedFile file, Profile[] existingProfiles ) throws IOException { - long pagesLoaded = 0; - File savedProfile = profileOutputFileName( file ); + Optional savedProfile = filterRelevant( existingProfiles, file ) + .sorted( Comparator.reverseOrder() ) // Try most recent profile first. + .filter( this::verifyChecksum ) + .findFirst(); - if ( !fs.fileExists( savedProfile ) ) + if ( !savedProfile.isPresent() ) { - return pagesLoaded; - } - - // First read through the profile to verify its checksum. - try ( InputStream inputStream = compressedInputStream( savedProfile ) ) - { - int b; - do - { - b = inputStream.read(); - } - while ( b != -1 ); - } - catch ( ZipException ignore ) - { - // ZipException is used to indicate checksum failures. - // Let's ignore this file since it's corrupt. - return pagesLoaded; + return 0; } // The file contents checks out. Let's load it in. - try ( InputStream inputStream = compressedInputStream( savedProfile ); + long pagesLoaded = 0; + try ( InputStream inputStream = compressedInputStream( savedProfile.get().profileFile ); PageLoader loader = pageLoaderFactory.getLoader( file ) ) { long pageId = 0; @@ -189,6 +182,7 @@ private long reheat( PagedFile file ) throws IOException { if ( stopped ) { + pageCache.reportEvents(); return pagesLoaded; } if ( (b & 1) == 1 ) @@ -205,6 +199,25 @@ private long reheat( PagedFile file ) throws IOException return pagesLoaded; } + private boolean verifyChecksum( Profile profile ) + { + // Successfully reading through and closing the compressed file implies verifying the gzip checksum. + try ( InputStream inputStream = compressedInputStream( profile.profileFile ) ) + { + int b; + do + { + b = inputStream.read(); + } + while ( b != -1 ); + } + catch ( IOException ignore ) + { + return false; + } + return true; + } + /** * Profile the in-memory data in the page cache, and write it to "cacheprof" file siblings of the mapped files. * @@ -219,25 +232,31 @@ public synchronized OptionalLong profile() throws IOException // profiling in parallel is just not worth it. long pagesInMemory = 0; List files = pageCache.listExistingMappings(); + Profile[] existingProfiles = findExistingProfiles( files ); for ( PagedFile file : files ) { try { - pagesInMemory += profile( file ); + pagesInMemory += profile( file, existingProfiles ); } catch ( FileIsNotMappedException ignore ) { // The database is allowed to map and unmap files while we are profiling the page cache. } + if ( stopped ) + { + pageCache.reportEvents(); + return OptionalLong.empty(); + } } pageCache.reportEvents(); - return stopped ? OptionalLong.empty() : OptionalLong.of( pagesInMemory ); + return OptionalLong.of( pagesInMemory ); } - private long profile( PagedFile file ) throws IOException + private long profile( PagedFile file, Profile[] existingProfiles ) throws IOException { long pagesInMemory = 0; - File outputFile = profileOutputFileName( file ); + File outputFile = profileOutputFileName( file, filterRelevant( existingProfiles, file ) ); try ( OutputStream outputStream = compressedOutputStream( outputFile ); PageCursor cursor = file.io( 0, PF_SHARED_READ_LOCK | PF_NO_FAULT ) ) @@ -246,10 +265,6 @@ private long profile( PagedFile file ) throws IOException int b = 0; for ( ; ; ) { - if ( stopped ) - { - return pagesInMemory; - } if ( !cursor.next() ) { break; // Exit the loop if there are no more pages. @@ -271,6 +286,11 @@ private long profile( PagedFile file ) throws IOException outputStream.flush(); } + // Delete previous profile files. + filterRelevant( existingProfiles, file ) + .filter( profile -> !refCounts.contains( profile ) ) + .forEach( p -> fs.deleteFile( p.profileFile ) ); + return pagesInMemory; } @@ -323,11 +343,61 @@ public void close() throws IOException } } - private File profileOutputFileName( PagedFile file ) + private File profileOutputFileName( PagedFile file, Stream existingProfiles ) { + long lastFileCount = existingProfiles.mapToLong( p -> p.profileCount ).max().orElse( 0L ); + long nextFileCount = lastFileCount + 1L; File mappedFile = file.file(); - String profileOutputName = "." + mappedFile.getName() + SUFFIX_CACHEPROF; File parent = mappedFile.getParentFile(); + String profileOutputName = "." + mappedFile.getName() + "." + toHexString( nextFileCount ) + SUFFIX_CACHEPROF; return new File( parent, profileOutputName ); } + + private Stream filterRelevant( Profile[] profiles, PagedFile pagedFile ) + { + return Stream.of( profiles ).filter( profile -> profile.mappedFileName.equals( pagedFile.file().getName() ) ); + } + + private Profile[] findExistingProfiles( List pagedFiles ) + { + return pagedFiles.stream() + .map( pf -> pf.file().getParentFile() ) + .distinct() + .flatMap( this::listFiles ) + .flatMap( this::parseProfileName ) + .sorted() + .toArray( Profile[]::new ); + } + + private Stream listFiles( File dir ) + { + File[] files = fs.listFiles( dir ); + if ( files == null ) + { + return Stream.empty(); + } + return Stream.of( files ); + } + + private Stream parseProfileName( File profile ) + { + String name = profile.getName(); + if ( !name.startsWith( "." ) && !name.endsWith( SUFFIX_CACHEPROF ) ) + { + return Stream.empty(); + } + int lastDot = name.lastIndexOf( '.' ); + int secondLastDot = name.lastIndexOf( '.', lastDot - 1 ); + String countStr = name.substring( secondLastDot + 1, lastDot ); + try + { + long count = Long.parseLong( countStr, 16 ); + String mappedFileName = name.substring( 1, secondLastDot ); + return Stream.of( new Profile( profile, mappedFileName, count ) ); + } + catch ( NumberFormatException e ) + { + return Stream.empty(); + } + } } diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/PageCacheWarmerKernelExtension.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/PageCacheWarmerKernelExtension.java index 4d6797cab5811..b915a9b3430cd 100644 --- a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/PageCacheWarmerKernelExtension.java +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/PageCacheWarmerKernelExtension.java @@ -19,7 +19,6 @@ */ package org.neo4j.kernel.impl.pagecache; -import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -68,7 +67,7 @@ class PageCacheWarmerKernelExtension extends LifecycleAdapter } @Override - public void start() throws Throwable + public void start() { if ( ENABLED ) { @@ -108,7 +107,7 @@ private void doReheat() " to load " + pagesLoaded + " pages." ); } ); } - catch ( IOException e ) + catch ( Exception e ) { log.debug( "Active page cache warmup failed, " + "so it may take longer for the cache to be populated with hot data.", e ); @@ -143,7 +142,7 @@ private void doProfile() ", and found " + pagesInMemory + " pages in memory." ); }); } - catch ( IOException e ) + catch ( Exception e ) { log.debug( "Page cache profiling failed, so no new profile of what data is hot or not was produced. " + "This may reduce the effectiveness of a future page cache warmup process.", e ); diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/Profile.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/Profile.java new file mode 100644 index 0000000000000..f2dac5010f41d --- /dev/null +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/Profile.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.pagecache; + +import java.io.File; +import java.util.Objects; + +final class Profile implements Comparable +{ + final File profileFile; + final String mappedFileName; + final long profileCount; + + Profile( File profileFile, String mappedFileName, long profileCount ) + { + Objects.requireNonNull( profileFile ); + Objects.requireNonNull( mappedFileName ); + this.profileFile = profileFile; + this.mappedFileName = mappedFileName; + this.profileCount = profileCount; + } + + @Override + public int compareTo( Profile that ) + { + return Long.compare( profileCount, that.profileCount ); + } + + @Override + public boolean equals( Object o ) + { + if ( o instanceof Profile ) + { + Profile profile = (Profile) o; + return profileFile.equals( profile.profileFile ); + } + return false; + } + + @Override + public int hashCode() + { + return profileFile.hashCode(); + } +} diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/ProfileRefCounts.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/ProfileRefCounts.java new file mode 100644 index 0000000000000..c869aa386ef1b --- /dev/null +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/pagecache/ProfileRefCounts.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.pagecache; + +import org.apache.commons.lang3.mutable.MutableInt; + +import java.util.HashMap; +import java.util.Map; + +class ProfileRefCounts +{ + private final Map fileBag; + + public ProfileRefCounts() + { + fileBag = new HashMap<>(); + } + + synchronized void incrementRefCounts( Profile[] profiles ) + { + for ( Profile profile : profiles ) + { + fileBag.computeIfAbsent( profile, p -> new MutableInt() ).increment(); + } + } + + synchronized void decrementRefCounts( Profile[] profiles ) + { + for ( Profile profile : profiles ) + { + fileBag.computeIfPresent( profile, (p,i) -> i.decrementAndGet() == 0 ? null : i ); + } + } + + synchronized boolean contains( Profile profile ) + { + return fileBag.containsKey( profile ); + } +} diff --git a/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/pagecache/PageCacheWarmerTest.java b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/pagecache/PageCacheWarmerTest.java index 5077656d71a34..0b95eefb511de 100644 --- a/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/pagecache/PageCacheWarmerTest.java +++ b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/pagecache/PageCacheWarmerTest.java @@ -28,12 +28,15 @@ import java.io.File; import java.io.IOException; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.ThreadLocalRandom; import org.neo4j.collection.primitive.Primitive; import org.neo4j.collection.primitive.PrimitiveIntIterator; import org.neo4j.collection.primitive.PrimitiveIntSet; +import org.neo4j.graphdb.Resource; import org.neo4j.io.ByteUnit; import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCursor; @@ -44,12 +47,15 @@ import org.neo4j.kernel.impl.scheduler.CentralJobScheduler; import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.scheduler.JobScheduler; +import org.neo4j.storageengine.api.StoreFileMetadata; import org.neo4j.test.rule.PageCacheRule; import org.neo4j.test.rule.TestDirectory; import org.neo4j.test.rule.fs.EphemeralFileSystemRule; import org.neo4j.test.rule.fs.FileSystemRule; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -195,6 +201,66 @@ public void reheatingMustWorkOnLargeNumberOfPages() throws Exception } } + @SuppressWarnings( "unused" ) + @Test + public void profileMustNotDeleteFilesCurrentlyExposedViaFileListing() throws Exception + { + try ( PageCache pageCache = pageCacheRule.getPageCache( fs, cfg ); + PagedFile pf = pageCache.map( file, pageCache.pageSize(), StandardOpenOption.CREATE ) ) + { + try ( PageCursor writer = pf.io( 0, PagedFile.PF_SHARED_WRITE_LOCK ) ) + { + assertTrue( writer.next( 1 ) ); + assertTrue( writer.next( 3 ) ); + } + pf.flushAndForce(); + PageCacheWarmer warmer = new PageCacheWarmer( fs, pageCache, scheduler ); + warmer.profile(); + warmer.profile(); + warmer.profile(); + + List fileListing = new ArrayList<>(); + try ( Resource firstListing = warmer.addFilesTo( fileListing ) ) + { + warmer.profile(); + warmer.profile(); + + // The files in the file listing cannot be deleted while the listing is in use. + assertThat( fileListing.size(), greaterThan( 0 ) ); + assertFilesExists( fileListing ); + warmer.profile(); + try ( Resource secondListing = warmer.addFilesTo( new ArrayList<>( ) ) ) + { + warmer.profile(); + // This must hold even when there are file listings overlapping in time. + assertFilesExists( fileListing ); + } + warmer.profile(); + // And continue to hold after other overlapping listing finishes. + assertFilesExists( fileListing ); + } + // Once we are done with the file listing, profile should remove those files. + warmer.profile(); + assertFilesNotExists( fileListing ); + } + } + + private void assertFilesExists( List fileListing ) + { + for ( StoreFileMetadata fileMetadata : fileListing ) + { + assertTrue( fs.fileExists( fileMetadata.file() ) ); + } + } + + private void assertFilesNotExists( List fileListing ) + { + for ( StoreFileMetadata fileMetadata : fileListing ) + { + assertFalse( fs.fileExists( fileMetadata.file() ) ); + } + } + private int[] randomSortedPageIds( int maxPagesInMemory ) { PrimitiveIntSet setIds = Primitive.intSet();