Skip to content

Commit

Permalink
Verify that cache profiles are copied in offline backup and Causal Cl…
Browse files Browse the repository at this point in the history
…uster store copies.
  • Loading branch information
chrisvest committed Feb 22, 2018
1 parent ff776f7 commit ff79cb4
Show file tree
Hide file tree
Showing 3 changed files with 274 additions and 62 deletions.
@@ -0,0 +1,108 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.causalclustering.discovery.Cluster;
import org.neo4j.causalclustering.discovery.ClusterMember;
import org.neo4j.causalclustering.discovery.CoreClusterMember;
import org.neo4j.concurrent.BinaryLatch;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.impl.pagecache.PageCacheWarmerMonitor;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.test.causalclustering.ClusterRule;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

public class PageCacheWarmupCcIT extends PageCacheWarmupTestSupport
{
@Rule
public ClusterRule clusterRule = new ClusterRule()
.withNumberOfReadReplicas( 0 )
.withSharedCoreParam( GraphDatabaseSettings.pagecache_warmup_profiling_interval, "100ms" )
.withSharedReadReplicaParam( GraphDatabaseSettings.pagecache_warmup_profiling_interval, "100ms" );

private Cluster cluster;

@Before
public void setup() throws Exception
{
cluster = clusterRule.startCluster();
}

private long warmUpCluster() throws TimeoutException
{
CoreClusterMember leader = cluster.awaitLeader();
createTestData( leader.database() );
long pagesInMemory = waitForCacheProfile( leader.database() );
for ( CoreClusterMember member : cluster.coreMembers() )
{
waitForCacheProfile( member.database() );
}
return pagesInMemory;
}

private void verifyWarmupHappensAfterStoreCopy( ClusterMember member, long pagesInMemory )
{
AtomicLong pagesLoadedInWarmup = new AtomicLong();
BinaryLatch warmupLatch = new BinaryLatch();
Monitors monitors = member.monitors();
monitors.addMonitorListener( new PageCacheWarmerMonitor()
{
@Override
public void warmupCompleted( long elapsedMillis, long pagesLoaded )
{
pagesLoadedInWarmup.set( pagesInMemory );
warmupLatch.release();
}

@Override
public void profileCompleted( long elapsedMillis, long pagesInMemory )
{
}
} );
member.start();
warmupLatch.await();
assertThat( pagesLoadedInWarmup.get(), is( pagesInMemory ) );
}

@Test
public void cacheProfilesMustBeIncludedInStoreCopyToCore() throws Exception
{
long pagesInMemory = warmUpCluster();
ClusterMember member = cluster.addCoreMemberWithId( 4 );
verifyWarmupHappensAfterStoreCopy( member, pagesInMemory );
}

@Test
public void cacheProfilesMustBeIncludedInStoreCopyToReadReplica() throws Exception
{
long pagesInMemory = warmUpCluster();
ClusterMember member = cluster.addReadReplicaWithId( 4 );
verifyWarmupHappensAfterStoreCopy( member, pagesInMemory );
}
}
Expand Up @@ -23,20 +23,17 @@
import org.junit.Test;

import java.io.File;
import java.util.concurrent.atomic.AtomicLong;
import java.nio.file.Path;

import org.neo4j.backup.OnlineBackup;
import org.neo4j.concurrent.BinaryLatch;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.Transaction;
import org.neo4j.commandline.admin.AdminTool;
import org.neo4j.commandline.admin.BlockerLocator;
import org.neo4j.commandline.admin.CommandLocator;
import org.neo4j.commandline.admin.RealOutsideWorld;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.io.fs.FileUtils;
import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings;
import org.neo4j.kernel.impl.pagecache.PageCacheWarmerMonitor;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.metrics.MetricsSettings;
import org.neo4j.ports.allocation.PortAuthority;
import org.neo4j.test.rule.DatabaseRule;
Expand All @@ -46,62 +43,27 @@

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.neo4j.metrics.MetricsTestHelper.metricsCsv;
import static org.neo4j.metrics.MetricsTestHelper.readLongValue;
import static org.neo4j.metrics.source.db.PageCacheMetrics.PC_PAGE_FAULTS;
import static org.neo4j.test.assertion.Assert.assertEventually;

public class PageCacheWarmupEnterpriseEditionIT
public class PageCacheWarmupEnterpriseEditionIT extends PageCacheWarmupTestSupport
{
@Rule
public SuppressOutput suppressOutput = SuppressOutput.suppressAll();
@Rule
public EnterpriseDatabaseRule db = new EnterpriseDatabaseRule().startLazily();
private TestDirectory dir = db.getTestDirectory();

private void createTestData()
private void verifyEventuallyWarmsUp( long pagesInMemory, File metricsDirectory ) throws Exception
{
try ( Transaction tx = db.beginTx() )
{
Label label = Label.label( "Label" );
RelationshipType relationshipType = RelationshipType.withName( "REL" );
long[] largeValue = new long[1024];
for ( int i = 0; i < 1000; i++ )
{
Node node = db.createNode( label );
node.setProperty( "Niels", "Borh" );
node.setProperty( "Albert", largeValue );
for ( int j = 0; j < 30; j++ )
{
Relationship rel = node.createRelationshipTo( node, relationshipType );
rel.setProperty( "Max", "Planck" );
}
}
tx.success();
}
}

private long waitForCacheProfile()
{
AtomicLong pageCount = new AtomicLong();
BinaryLatch profileLatch = new BinaryLatch();
db.resolveDependency( Monitors.class ).addMonitorListener( new PageCacheWarmerMonitor()
{
@Override
public void warmupCompleted( long elapsedMillis, long pagesLoaded )
{
}

@Override
public void profileCompleted( long elapsedMillis, long pagesInMemory )
{
pageCount.set( pagesInMemory );
profileLatch.release();
}
} );
profileLatch.await();
return pageCount.get();
assertEventually( "Metrics report should include page cache page faults",
() -> readLongValue( metricsCsv( metricsDirectory, PC_PAGE_FAULTS ) ),
greaterThanOrEqualTo( pagesInMemory ), 20, SECONDS );
}

@Test
Expand All @@ -113,18 +75,16 @@ public void warmupMustReloadHotPagesAfterRestartAndFaultsMustBeVisibleViaMetrics
.setConfig( GraphDatabaseSettings.pagecache_warmup_profiling_interval, "100ms" );
db.ensureStarted();

createTestData();
long pagesInMemory = waitForCacheProfile();
createTestData( db );
long pagesInMemory = waitForCacheProfile( db );

db.restartDatabase(
MetricsSettings.neoPageCacheEnabled.name(), Settings.TRUE,
MetricsSettings.csvEnabled.name(), Settings.TRUE,
MetricsSettings.csvInterval.name(), "100ms",
MetricsSettings.csvPath.name(), metricsDirectory.getAbsolutePath() );

assertEventually( "Metrics report should include page cache page faults",
() -> readLongValue( metricsCsv( metricsDirectory, PC_PAGE_FAULTS ) ),
greaterThanOrEqualTo( pagesInMemory ), 20, SECONDS );
verifyEventuallyWarmsUp( pagesInMemory, metricsDirectory );
}

@Test
Expand All @@ -137,8 +97,8 @@ public void cacheProfilesMustBeIncludedInOnlineBackups() throws Exception
.setConfig( GraphDatabaseSettings.pagecache_warmup_profiling_interval, "100ms" );
db.ensureStarted();

createTestData();
long pagesInMemory = waitForCacheProfile();
createTestData( db );
long pagesInMemory = waitForCacheProfile( db );

File metricsDirectory = dir.cleanDirectory( "metrics" );
File backupDir = dir.cleanDirectory( "backup" );
Expand All @@ -153,10 +113,60 @@ public void cacheProfilesMustBeIncludedInOnlineBackups() throws Exception
MetricsSettings.neoPageCacheEnabled.name(), Settings.TRUE,
MetricsSettings.csvEnabled.name(), Settings.TRUE,
MetricsSettings.csvInterval.name(), "100ms",
MetricsSettings.csvPath.name(), metricsDirectory.getAbsolutePath());
MetricsSettings.csvPath.name(), metricsDirectory.getAbsolutePath() );

assertEventually( "Metrics report should include page cache page faults",
() -> readLongValue( metricsCsv( metricsDirectory, PC_PAGE_FAULTS ) ),
greaterThanOrEqualTo( pagesInMemory ), 5, SECONDS );
verifyEventuallyWarmsUp( pagesInMemory, metricsDirectory );
}

@Test
public void cacheProfilesMustBeIncludedInOfflineBackups() throws Exception
{
db.setConfig( MetricsSettings.metricsEnabled, Settings.FALSE )
.setConfig( OnlineBackupSettings.online_backup_enabled, Settings.FALSE )
.setConfig( GraphDatabaseSettings.pagecache_warmup_profiling_interval, "100ms" );
db.ensureStarted();
createTestData( db );
long pagesInMemory = waitForCacheProfile( db );

db.shutdownAndKeepStore();

AdminTool adminTool = new AdminTool(
CommandLocator.fromServiceLocator(),
BlockerLocator.fromServiceLocator(),
new RealOutsideWorld()
{
@Override
public void exit( int status )
{
assertThat( "exit code", status, is( 0 ) );
}
},
true );
File storeDir = db.getStoreDir();
File data = dir.cleanDirectory( "data" );
File databases = new File( data, "databases" );
File graphdb = new File( databases, "graph.db" );
assertTrue( graphdb.mkdirs() );
FileUtils.copyRecursively( storeDir, graphdb );
FileUtils.deleteRecursively( storeDir );
Path homePath = data.toPath().getParent();
File dumpDir = dir.cleanDirectory( "dump-dir" );
adminTool.execute( homePath, homePath, "dump", "--database=graph.db", "--to=" + dumpDir );

FileUtils.deleteRecursively( graphdb );
File dumpFile = new File( dumpDir, "graph.db.dump" );
adminTool.execute( homePath, homePath, "load", "--database=graph.db", "--from=" + dumpFile );
FileUtils.copyRecursively( graphdb, storeDir );
FileUtils.deleteRecursively( graphdb );

File metricsDirectory = dir.cleanDirectory( "metrics" );
db.ensureStarted(
OnlineBackupSettings.online_backup_enabled.name(), Settings.FALSE,
MetricsSettings.neoPageCacheEnabled.name(), Settings.TRUE,
MetricsSettings.csvEnabled.name(), Settings.TRUE,
MetricsSettings.csvInterval.name(), "100ms",
MetricsSettings.csvPath.name(), metricsDirectory.getAbsolutePath() );

verifyEventuallyWarmsUp( pagesInMemory, metricsDirectory );
}
}
@@ -0,0 +1,94 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel;

import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.concurrent.BinaryLatch;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.impl.pagecache.PageCacheWarmerMonitor;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.monitoring.Monitors;

class PageCacheWarmupTestSupport
{
void createTestData( GraphDatabaseService db )
{
try ( Transaction tx = db.beginTx() )
{
Label label = Label.label( "Label" );
RelationshipType relationshipType = RelationshipType.withName( "REL" );
long[] largeValue = new long[1024];
for ( int i = 0; i < 1000; i++ )
{
Node node = db.createNode( label );
node.setProperty( "Niels", "Borh" );
node.setProperty( "Albert", largeValue );
for ( int j = 0; j < 30; j++ )
{
Relationship rel = node.createRelationshipTo( node, relationshipType );
rel.setProperty( "Max", "Planck" );
}
}
tx.success();
}
}

long waitForCacheProfile( GraphDatabaseAPI db )
{
AtomicLong pageCount = new AtomicLong();
BinaryLatch profileLatch = new BinaryLatch();
PageCacheWarmerMonitor listener = new AwaitProfileMonitor( pageCount, profileLatch );
Monitors monitors = db.getDependencyResolver().resolveDependency( Monitors.class );
monitors.addMonitorListener( listener );
profileLatch.await();
monitors.removeMonitorListener( listener );
return pageCount.get();
}

private static class AwaitProfileMonitor implements PageCacheWarmerMonitor
{
private final AtomicLong pageCount;
private final BinaryLatch profileLatch;

AwaitProfileMonitor( AtomicLong pageCount, BinaryLatch profileLatch )
{
this.pageCount = pageCount;
this.profileLatch = profileLatch;
}

@Override
public void warmupCompleted( long elapsedMillis, long pagesLoaded )
{
}

@Override
public void profileCompleted( long elapsedMillis, long pagesInMemory )
{
pageCount.set( pagesInMemory );
profileLatch.release();
}
}
}

0 comments on commit ff79cb4

Please sign in to comment.