Skip to content

Commit

Permalink
Ensure that cache profiles are copied in online backup.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed Feb 22, 2018
1 parent 2d563d5 commit ff776f7
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 21 deletions.
Expand Up @@ -31,7 +31,7 @@

/**
* JUnit @Rule for configuring, creating and managing an EmbeddedGraphDatabase instance.
*
* <p>
* The database instance is created lazily, so configurations can be injected prior to calling
* {@link #getGraphDatabaseAPI()}.
*/
Expand Down Expand Up @@ -79,4 +79,15 @@ public Statement apply( Statement base, Description description )
{
return testDirectory.apply( super.apply( base, description ), description );
}

/**
* Get the inner {@link TestDirectory} instance that is used to prepare the store directory for this database.
* <p>
* <strong>Note:</strong> There is no need to add a {@link org.junit.Rule} annotation on this {@link TestDirectory}
* instance.
*/
public TestDirectory getTestDirectory()
{
return testDirectory;
}
}
Expand Up @@ -27,6 +27,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.StandardCopyOption;
import java.util.Collection;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.BlockingQueue;
Expand All @@ -37,12 +38,15 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.neo4j.graphdb.Resource;
import org.neo4j.io.IOUtils;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.kernel.impl.transaction.state.NeoStoreFileListing;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.storageengine.api.StoreFileMetadata;

import static org.neo4j.io.pagecache.PagedFile.PF_NO_FAULT;
import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_READ_LOCK;
Expand All @@ -57,7 +61,7 @@
* These cacheprof files are compressed bitmaps where each raised bit indicates that the page identified by the
* bit-index was in memory.
*/
public class PageCacheWarmer
public class PageCacheWarmer implements NeoStoreFileListing.StoreFileProvider
{
public static final String SUFFIX_CACHEPROF = ".cacheprof";
public static final String SUFFIX_CACHEPROF_TMP = ".cacheprof.tmp";
Expand Down Expand Up @@ -93,6 +97,32 @@ private ExecutorService buildExecutorService( JobScheduler scheduler )
threadFactory, rejectionPolicy );
}

@Override
public synchronized Resource addFilesTo( Collection<StoreFileMetadata> coll ) throws IOException
{
if ( stopped )
{
return Resource.EMPTY;
}
List<PagedFile> files = pageCache.listExistingMappings();
try
{
for ( PagedFile file : files )
{
File profileFile = profileOutputFileFinal( file );
if ( fs.fileExists( profileFile ) )
{
coll.add( new StoreFileMetadata( profileFile, 1, false ) );
}
}
}
finally
{
IOUtils.closeAll( files );
}
return Resource.EMPTY;
}

public void stop()
{
stopped = true;
Expand Down
Expand Up @@ -22,13 +22,15 @@
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.Format;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.transaction.state.NeoStoreFileListing;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.scheduler.JobScheduler;
Expand All @@ -43,19 +45,21 @@ class PageCacheWarmerKernelExtension extends LifecycleAdapter

private final JobScheduler scheduler;
private final AvailabilityGuard availabilityGuard;
private final Supplier<NeoStoreFileListing> fileListing;
private final Log log;
private final PageCacheWarmerMonitor monitor;
private final Config config;
private final PageCacheWarmer pageCacheWarmer;
private final AtomicBoolean profilingStarted;
private volatile JobScheduler.JobHandle profileHandle;

PageCacheWarmerKernelExtension( JobScheduler scheduler, AvailabilityGuard availabilityGuard,
PageCache pageCache, FileSystemAbstraction fs, Log log,
PageCacheWarmerMonitor monitor, Config config )
PageCacheWarmerKernelExtension(
JobScheduler scheduler, AvailabilityGuard availabilityGuard, PageCache pageCache, FileSystemAbstraction fs,
Supplier<NeoStoreFileListing> fileListing, Log log, PageCacheWarmerMonitor monitor, Config config )
{
this.scheduler = scheduler;
this.availabilityGuard = availabilityGuard;
this.fileListing = fileListing;
this.log = log;
this.monitor = monitor;
this.config = config;
Expand All @@ -69,6 +73,7 @@ public void start() throws Throwable
if ( ENABLED )
{
scheduleTryReheat();
fileListing.get().registerStoreFileProvider( pageCacheWarmer );
}
}

Expand Down
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.kernel.impl.pagecache;

import java.util.function.Supplier;

import org.neo4j.helpers.Service;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
Expand All @@ -27,6 +29,7 @@
import org.neo4j.kernel.extension.KernelExtensionFactory;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.spi.KernelContext;
import org.neo4j.kernel.impl.transaction.state.NeoStoreFileListing;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
Expand All @@ -46,6 +49,8 @@ public interface Dependencies

FileSystemAbstraction fileSystemAbstraction();

NeoStoreFileListing fileListing();

LogService logService();

Monitors monitors();
Expand All @@ -65,10 +70,12 @@ public Lifecycle newInstance( KernelContext context, Dependencies deps )
AvailabilityGuard availabilityGuard = deps.availabilityGuard();
PageCache pageCache = deps.pageCache();
FileSystemAbstraction fs = deps.fileSystemAbstraction();
Supplier<NeoStoreFileListing> fileListing = deps::fileListing;
LogService logService = deps.logService();
Log log = logService.getInternalLog( PageCacheWarmer.class );
PageCacheWarmerMonitor monitor = deps.monitors().newMonitor( PageCacheWarmerMonitor.class );
Config config = deps.config();
return new PageCacheWarmerKernelExtension( scheduler, availabilityGuard, pageCache, fs, log, monitor, config );
return new PageCacheWarmerKernelExtension(
scheduler, availabilityGuard, pageCache, fs, fileListing, log, monitor, config );
}
}
Expand Up @@ -21,11 +21,11 @@

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

import java.io.File;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.backup.OnlineBackup;
import org.neo4j.concurrent.BinaryLatch;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
Expand All @@ -38,51 +38,52 @@
import org.neo4j.kernel.impl.pagecache.PageCacheWarmerMonitor;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.metrics.MetricsSettings;
import org.neo4j.ports.allocation.PortAuthority;
import org.neo4j.test.rule.DatabaseRule;
import org.neo4j.test.rule.EnterpriseDatabaseRule;
import org.neo4j.test.rule.SuppressOutput;
import org.neo4j.test.rule.TestDirectory;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertTrue;
import static org.neo4j.metrics.MetricsTestHelper.metricsCsv;
import static org.neo4j.metrics.MetricsTestHelper.readLongValue;
import static org.neo4j.metrics.source.db.PageCacheMetrics.PC_PAGE_FAULTS;
import static org.neo4j.test.assertion.Assert.assertEventually;

public class PageCacheWarmupEnterpriseEditionIT
{
private TestDirectory dir = TestDirectory.testDirectory();
private EnterpriseDatabaseRule db = new EnterpriseDatabaseRule().startLazily();
@Rule
public RuleChain rules = RuleChain.outerRule( dir ).around( db );
public SuppressOutput suppressOutput = SuppressOutput.suppressAll();
@Rule
public EnterpriseDatabaseRule db = new EnterpriseDatabaseRule().startLazily();
private TestDirectory dir = db.getTestDirectory();

@Test
public void warmupMustReloadHotPagesAfterRestartAndFaultsMustBeVisibleViaMetrics() throws Exception
private void createTestData()
{
File metricsDirectory = dir.directory( "metrics" );
db.setConfig( MetricsSettings.metricsEnabled, Settings.FALSE )
.setConfig( OnlineBackupSettings.online_backup_enabled, Settings.FALSE )
.setConfig( GraphDatabaseSettings.pagecache_warmup_profiling_interval, "100ms" );
db.ensureStarted();

try ( Transaction tx = db.beginTx() )
{
Label label = Label.label( "Label" );
RelationshipType relationshipType = RelationshipType.withName( "REL" );
long[] largeValue = new long[1024];
for ( int i = 0; i < 100; i++ )
for ( int i = 0; i < 1000; i++ )
{
Node node = db.createNode( label );
node.setProperty( "Niels", "Borh" );
node.setProperty( "Albert", largeValue );
for ( int j = 0; j < 10; j++ )
for ( int j = 0; j < 30; j++ )
{
Relationship rel = node.createRelationshipTo( node, relationshipType );
rel.setProperty( "Max", "Planck" );
}
}
tx.success();
}
}

private long waitForCacheProfile()
{
AtomicLong pageCount = new AtomicLong();
BinaryLatch profileLatch = new BinaryLatch();
db.resolveDependency( Monitors.class ).addMonitorListener( new PageCacheWarmerMonitor()
Expand All @@ -100,14 +101,60 @@ public void profileCompleted( long elapsedMillis, long pagesInMemory )
}
} );
profileLatch.await();
return pageCount.get();
}

@Test
public void warmupMustReloadHotPagesAfterRestartAndFaultsMustBeVisibleViaMetrics() throws Exception
{
File metricsDirectory = dir.directory( "metrics" );
db.setConfig( MetricsSettings.metricsEnabled, Settings.FALSE )
.setConfig( OnlineBackupSettings.online_backup_enabled, Settings.FALSE )
.setConfig( GraphDatabaseSettings.pagecache_warmup_profiling_interval, "100ms" );
db.ensureStarted();

createTestData();
long pagesInMemory = waitForCacheProfile();

db.restartDatabase(
MetricsSettings.neoPageCacheEnabled.name(), Settings.TRUE,
MetricsSettings.csvEnabled.name(), Settings.TRUE,
MetricsSettings.csvInterval.name(), "100ms",
MetricsSettings.csvPath.name(), metricsDirectory.getAbsolutePath() );

long pagesInMemory = pageCount.get();
assertEventually( "Metrics report should include page cache page faults",
() -> readLongValue( metricsCsv( metricsDirectory, PC_PAGE_FAULTS ) ),
greaterThanOrEqualTo( pagesInMemory ), 20, SECONDS );
}

@Test
public void cacheProfilesMustBeIncludedInOnlineBackups() throws Exception
{
int backupPort = PortAuthority.allocatePort();
db.setConfig( MetricsSettings.metricsEnabled, Settings.FALSE )
.setConfig( OnlineBackupSettings.online_backup_enabled, Settings.TRUE )
.setConfig( OnlineBackupSettings.online_backup_server, "localhost:" + backupPort )
.setConfig( GraphDatabaseSettings.pagecache_warmup_profiling_interval, "100ms" );
db.ensureStarted();

createTestData();
long pagesInMemory = waitForCacheProfile();

File metricsDirectory = dir.cleanDirectory( "metrics" );
File backupDir = dir.cleanDirectory( "backup" );
assertTrue( OnlineBackup.from( "localhost", backupPort ).backup( backupDir ).isConsistent() );
DatabaseRule.RestartAction useBackupDir = ( fs, storeDir ) ->
{
fs.deleteRecursively( storeDir );
fs.copyRecursively( backupDir, storeDir );
};
db.restartDatabase( useBackupDir,
OnlineBackupSettings.online_backup_enabled.name(), Settings.FALSE,
MetricsSettings.neoPageCacheEnabled.name(), Settings.TRUE,
MetricsSettings.csvEnabled.name(), Settings.TRUE,
MetricsSettings.csvInterval.name(), "100ms",
MetricsSettings.csvPath.name(), metricsDirectory.getAbsolutePath());

assertEventually( "Metrics report should include page cache page faults",
() -> readLongValue( metricsCsv( metricsDirectory, PC_PAGE_FAULTS ) ),
greaterThanOrEqualTo( pagesInMemory ), 5, SECONDS );
Expand Down

0 comments on commit ff776f7

Please sign in to comment.