Skip to content

Commit

Permalink
Failing test
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed Sep 21, 2016
1 parent fb2c889 commit 2ca2dbe
Showing 1 changed file with 89 additions and 21 deletions.
Expand Up @@ -37,6 +37,7 @@
import org.neo4j.coreedge.core.consensus.log.segmented.FileNames;
import org.neo4j.coreedge.core.consensus.roles.Role;
import org.neo4j.coreedge.discovery.Cluster;
import org.neo4j.coreedge.discovery.ClusterMember;
import org.neo4j.coreedge.discovery.CoreClusterMember;
import org.neo4j.coreedge.discovery.EdgeClusterMember;
import org.neo4j.coreedge.discovery.HazelcastDiscoveryServiceFactory;
Expand All @@ -47,6 +48,7 @@
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionFailureException;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.graphdb.security.WriteOperationsNotAllowedException;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileUtils;
Expand All @@ -56,6 +58,7 @@
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.format.highlimit.HighLimit;
import org.neo4j.kernel.impl.store.format.standard.StandardV3_0;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.lifecycle.LifecycleException;
import org.neo4j.logging.Log;
Expand All @@ -69,6 +72,7 @@
import static java.util.stream.Collectors.toSet;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is;
Expand All @@ -88,9 +92,7 @@ public class EdgeServerReplicationIT
{
@Rule
public final ClusterRule clusterRule =
new ClusterRule( getClass() )
.withNumberOfCoreMembers( 3 )
.withNumberOfEdgeMembers( 1 )
new ClusterRule( getClass() ).withNumberOfCoreMembers( 3 ).withNumberOfEdgeMembers( 1 )
.withDiscoveryServiceFactory( new HazelcastDiscoveryServiceFactory() );

@Test
Expand Down Expand Up @@ -141,18 +143,13 @@ public void shouldEventuallyPullTransactionDownToAllEdgeServers() throws Excepti
int nodesBeforeEdgeServerStarts = 1;

// when
executeOnLeaderWithRetry( db -> {
for ( int i = 0; i < nodesBeforeEdgeServerStarts; i++ )
{
Node node = db.createNode();
node.setProperty( "foobar", "baz_bat" );
}
}, cluster );
executeOnLeaderWithRetry( db -> createData( db, nodesBeforeEdgeServerStarts ), cluster );

cluster.addEdgeMemberWithId( 0 ).start();

// when
executeOnLeaderWithRetry( db -> {
executeOnLeaderWithRetry( db ->
{
Node node = db.createNode();
node.setProperty( "foobar", "baz_bat" );
}, cluster );
Expand All @@ -178,7 +175,8 @@ public void shouldEventuallyPullTransactionDownToAllEdgeServers() throws Excepti
}

@Test
public void shouldShutdownRatherThanPullUpdatesFromCoreMemberWithDifferentStoreIdIfLocalStoreIsNonEmpty() throws Exception
public void shouldShutdownRatherThanPullUpdatesFromCoreMemberWithDifferentStoreIdIfLocalStoreIsNonEmpty()
throws Exception
{
Cluster cluster = clusterRule.withNumberOfEdgeMembers( 0 ).startCluster();

Expand Down Expand Up @@ -231,8 +229,10 @@ public void anEdgeServerShouldBeAbleToRejoinTheCluster() throws Exception

awaitEx( () -> edgesUpToDateAsTheLeader( cluster.awaitLeader(), cluster.edgeMembers() ), 1, TimeUnit.MINUTES );

List<File> coreStoreDirs = cluster.coreMembers().stream().map( CoreClusterMember::storeDir ).collect( toList() );
List<File> edgeStoreDirs = cluster.edgeMembers().stream().map( EdgeClusterMember::storeDir ).collect( toList() );
List<File> coreStoreDirs =
cluster.coreMembers().stream().map( CoreClusterMember::storeDir ).collect( toList() );
List<File> edgeStoreDirs =
cluster.edgeMembers().stream().map( EdgeClusterMember::storeDir ).collect( toList() );

cluster.shutdown();

Expand All @@ -259,7 +259,59 @@ public void edgeServersShouldRestartIfTheWholeClusterIsRestarted() throws Except
}
}

private boolean edgesUpToDateAsTheLeader( CoreClusterMember leader, Collection<EdgeClusterMember> edgeClusterMembers )
@Test
public void shouldBeAbleToDownloadToANewStoreAfterPruning() throws Exception
{
// given
Map<String,String> params = stringMap( GraphDatabaseSettings.keep_logical_logs.name(), "keep_none",
GraphDatabaseSettings.logical_log_rotation_threshold.name(), "1M",
GraphDatabaseSettings.check_point_interval_time.name(), "100ms" );

Cluster cluster = clusterRule.withSharedCoreParams( params ).startCluster();

cluster.coreTx( ( db, tx ) ->
{
createData( db, 10 );
tx.success();
} );

awaitEx( () -> edgesUpToDateAsTheLeader( cluster.awaitLeader(), cluster.edgeMembers() ), 1, TimeUnit.MINUTES );

EdgeClusterMember edgeClusterMember = cluster.getEdgeMemberById( 0 );
long highestEdgeLogVersion = physicalLogFiles( edgeClusterMember ).getHighestLogVersion();

// when
edgeClusterMember.shutdown();

CoreClusterMember core;
do
{
core = cluster.coreTx( ( db, tx ) ->
{
createData( db, 1_000 );
tx.success();
} );

}
while ( physicalLogFiles( core ).getLowestLogVersion() <= highestEdgeLogVersion );

edgeClusterMember.start();

// then
awaitEx( () -> edgesUpToDateAsTheLeader( cluster.awaitLeader(), cluster.edgeMembers() ), 1, TimeUnit.MINUTES );

assertEventually( "The edge member has the same data as the core members",
() -> DbRepresentation.of( edgeClusterMember.database() ),
equalTo( DbRepresentation.of( cluster.awaitLeader().database() ) ), 10, TimeUnit.SECONDS );
}

private PhysicalLogFiles physicalLogFiles( ClusterMember clusterMember )
{
return clusterMember.database().getDependencyResolver().resolveDependency( PhysicalLogFiles.class );
}

private boolean edgesUpToDateAsTheLeader( CoreClusterMember leader,
Collection<EdgeClusterMember> edgeClusterMembers )
{
long leaderTxId = lastClosedTransactionId( leader.database() );
return edgeClusterMembers.stream().map( EdgeClusterMember::database ).map( this::lastClosedTransactionId )
Expand Down Expand Up @@ -317,7 +369,8 @@ public void shouldBeAbleToCopyStoresFromCoreToEdge() throws Exception
Cluster cluster = clusterRule.withNumberOfEdgeMembers( 0 ).withSharedCoreParams( params )
.withRecordFormat( HighLimit.NAME ).startCluster();

cluster.coreTx( ( db, tx ) -> {
cluster.coreTx( ( db, tx ) ->
{
Node node = db.createNode( Label.label( "L" ) );
for ( int i = 0; i < 10; i++ )
{
Expand All @@ -331,7 +384,8 @@ public void shouldBeAbleToCopyStoresFromCoreToEdge() throws Exception
CoreClusterMember coreGraphDatabase = null;
for ( int j = 0; j < 2; j++ )
{
coreGraphDatabase = cluster.coreTx( ( db, tx ) -> {
coreGraphDatabase = cluster.coreTx( ( db, tx ) ->
{
Node node = db.createNode( Label.label( "L" ) );
for ( int i = 0; i < 10; i++ )
{
Expand All @@ -342,8 +396,8 @@ public void shouldBeAbleToCopyStoresFromCoreToEdge() throws Exception
}

File storeDir = coreGraphDatabase.storeDir();
assertEventually( "pruning happened", () -> versionBy( storeDir, Math::min ), greaterThan( baseVersion ),
5, SECONDS );
assertEventually( "pruning happened", () -> versionBy( storeDir, Math::min ), greaterThan( baseVersion ), 5,
SECONDS );

// when
cluster.addEdgeMemberWithIdAndRecordFormat( 42, HighLimit.NAME ).start();
Expand All @@ -366,7 +420,8 @@ private long versionBy( File storeDir, BinaryOperator<Long> operator )

private void executeOnLeaderWithRetry( Workload workload, Cluster cluster ) throws Exception
{
assertEventually( "Executed on leader", () -> {
assertEventually( "Executed on leader", () ->
{
try
{
CoreGraphDatabase coreDB = cluster.awaitLeader( 5000 ).database();
Expand All @@ -393,10 +448,23 @@ private interface Workload

private void createData( GraphDatabaseService db )
{
for ( int i = 0; i < 10; i++ )
createData( db, 10 );
}

private void createData( GraphDatabaseService db, int amount )
{
for ( int i = 0; i < amount; i++ )
{
Node node = db.createNode();
node.setProperty( "foobar", "baz_bat" );
node.setProperty( "foobar1", "baz_bat" );
node.setProperty( "foobar2", "baz_bat" );
node.setProperty( "foobar3", "baz_bat" );
node.setProperty( "foobar4", "baz_bat" );
node.setProperty( "foobar5", "baz_bat" );
node.setProperty( "foobar6", "baz_bat" );
node.setProperty( "foobar7", "baz_bat" );
node.setProperty( "foobar8", "baz_bat" );
}
}
}

0 comments on commit 2ca2dbe

Please sign in to comment.