Skip to content

Commit

Permalink
New tests for core-edge startup order.
Browse files Browse the repository at this point in the history
Add clear assertions for which services are required to start
after one another.
  • Loading branch information
apcj committed Dec 1, 2015
1 parent ee5454f commit 32b33e9
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 105 deletions.
Expand Up @@ -23,81 +23,43 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.raft.membership.MembershipWaiter;
import org.neo4j.coreedge.catchup.CatchupServer;
import org.neo4j.coreedge.raft.RaftInstance;
import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.RaftServer;
import org.neo4j.coreedge.raft.ScheduledTimeoutService;
import org.neo4j.coreedge.raft.membership.MembershipWaiter;
import org.neo4j.coreedge.raft.replication.id.ReplicatedIdGeneratorFactory;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.raft.ScheduledTimeoutService;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.lifecycle.LifecycleException;

import org.neo4j.coreedge.catchup.CatchupServer;

import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class CoreServerStartupProcess implements Lifecycle
public class CoreServerStartupProcess
{
private final LifeSupport services = new LifeSupport();

public CoreServerStartupProcess( LocalDatabase localDatabase, DataSourceManager dataSourceManager,
ReplicatedIdGeneratorFactory idGeneratorFactory,
RaftInstance<CoreMember> raft, RaftLog raftLog, RaftServer<CoreMember> raftServer,
CatchupServer catchupServer,
ScheduledTimeoutService raftTimeoutService,
MembershipWaiter<CoreMember> membershipWaiter,
long joinCatchupTimeout )
public static LifeSupport createLifeSupport( DataSourceManager dataSourceManager,
ReplicatedIdGeneratorFactory idGeneratorFactory,
RaftInstance<CoreMember> raft, RaftServer<CoreMember> raftServer,
CatchupServer catchupServer,
ScheduledTimeoutService raftTimeoutService,
MembershipWaiter<CoreMember> membershipWaiter,
long joinCatchupTimeout, DeleteStoreOnStartUp deleteStoreOnStartUp,
RaftLogReplay raftLogReplay )
{
services.add( new LifecycleAdapter() {
@Override
public void start() throws Throwable
{
localDatabase.deleteStore();
}
});
LifeSupport services = new LifeSupport();
services.add( deleteStoreOnStartUp );
services.add( dataSourceManager );
services.add( idGeneratorFactory );
services.add( new LifecycleAdapter( ) {
@Override
public void start() throws Throwable
{
raftLog.replay();
}
} );
services.add( raftLogReplay );
services.add( raftServer );
services.add( raftTimeoutService );
services.add( catchupServer );
services.add( raftTimeoutService );
services.add( new MembershipWaiterLifecycle<>(membershipWaiter, joinCatchupTimeout, raft ) );
}

@Override
public void init() throws LifecycleException
{
services.init();
}

@Override
public void start() throws LifecycleException
{
services.start();
}

@Override
public void stop() throws LifecycleException
{
services.stop();
}

@Override
public void shutdown() throws LifecycleException
{
services.shutdown();
return services;
}

private static class MembershipWaiterLifecycle<MEMBER> extends LifecycleAdapter
Expand Down Expand Up @@ -133,4 +95,5 @@ public void start() throws Throwable
}
}
}

}
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.server.core;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

public class DeleteStoreOnStartUp extends LifecycleAdapter
{
private final LocalDatabase localDatabase;

public DeleteStoreOnStartUp( LocalDatabase localDatabase )
{
this.localDatabase = localDatabase;
}

@Override
public void start() throws Throwable
{
localDatabase.deleteStore();
}
}
Expand Up @@ -258,10 +258,12 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
new CheckpointerSupplier( platformModule.dependencies ),
config.get( CoreEdgeClusterSettings.transaction_listen_address ) );

life.add( new CoreServerStartupProcess( localDatabase,
platformModule.dataSourceManager, replicatedIdGeneratorFactory, raft, raftLog, raftServer,
life.add( CoreServerStartupProcess.createLifeSupport(
platformModule.dataSourceManager, replicatedIdGeneratorFactory, raft, raftServer,
catchupServer, raftTimeoutService, membershipWaiter,
config.get( CoreEdgeClusterSettings.join_catch_up_timeout ) ) );
config.get( CoreEdgeClusterSettings.join_catch_up_timeout ) ,
new DeleteStoreOnStartUp( localDatabase ), new RaftLogReplay( raftLog )
));
}

public boolean isLeader()
Expand Down
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.server.core;

import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

public class RaftLogReplay extends LifecycleAdapter
{
private final RaftLog raftLog;

public RaftLogReplay( RaftLog raftLog )
{
this.raftLog = raftLog;
}

@Override
public void start() throws Throwable
{
raftLog.replay();
}
}
Expand Up @@ -19,82 +19,116 @@
*/
package org.neo4j.coreedge;

import java.util.concurrent.CompletableFuture;
import java.util.List;

import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Test;

import org.neo4j.coreedge.catchup.CatchupServer;
import org.neo4j.coreedge.raft.membership.MembershipWaiter;
import org.neo4j.coreedge.raft.replication.id.ReplicatedIdGeneratorFactory;
import org.neo4j.coreedge.raft.RaftInstance;
import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.RaftServer;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.raft.ScheduledTimeoutService;
import org.neo4j.coreedge.raft.membership.MembershipWaiter;
import org.neo4j.coreedge.raft.replication.id.ReplicatedIdGeneratorFactory;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.core.CoreServerStartupProcess;
import org.neo4j.coreedge.raft.ScheduledTimeoutService;
import org.neo4j.coreedge.server.core.DeleteStoreOnStartUp;
import org.neo4j.coreedge.server.core.RaftLogReplay;
import org.neo4j.helpers.collection.IteratorUtil;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.Lifecycle;

import static org.mockito.Matchers.any;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import static org.neo4j.coreedge.CoreServerStartupProcessTest.LifeSupportMatcherBuilder.startsComponent;

public class CoreServerStartupProcessTest
{
@Test
public void startShouldDeleteStoreAndStartNewDatabase() throws Throwable
public void raftTimeOutServiceTriggersMessagesSentToAnotherServer() throws Exception
{
// given
LocalDatabase localDatabase = mock( LocalDatabase.class );
DataSourceManager dataSourceManager = mock( DataSourceManager.class );
ReplicatedIdGeneratorFactory idGeneratorFactory = mock( ReplicatedIdGeneratorFactory.class );
RaftLog raftLog = mock( RaftLog.class );
RaftServer<CoreMember> raftServer = mock( RaftServer.class );
CatchupServer catchupServer = mock( CatchupServer.class );
ScheduledTimeoutService timeoutService = mock( ScheduledTimeoutService.class );
ScheduledTimeoutService raftTimeoutService = mock( ScheduledTimeoutService.class );
MembershipWaiter<CoreMember> membershipWaiter = mock( MembershipWaiter.class );
when(membershipWaiter.waitUntilCaughtUpMember( any(ReadableRaftState.class) ))
.thenReturn( mock( CompletableFuture.class ) );
RaftInstance raftInstance = mock( RaftInstance.class );

DeleteStoreOnStartUp deleteStoreOnStartUp = mock( DeleteStoreOnStartUp.class );
RaftLogReplay raftLogReplay = mock( RaftLogReplay.class );

LifeSupport lifeSupport = CoreServerStartupProcess.createLifeSupport( dataSourceManager,
idGeneratorFactory, raftInstance, raftServer, catchupServer, raftTimeoutService,
membershipWaiter, 0, deleteStoreOnStartUp, raftLogReplay );

CoreServerStartupProcess leaderProcess = new CoreServerStartupProcess(
localDatabase, dataSourceManager, idGeneratorFactory, mock( RaftInstance.class ), raftLog, raftServer,
catchupServer, timeoutService, membershipWaiter, 1000 );
assertThat( lifeSupport, startsComponent( raftTimeoutService ).after( raftServer )
.because( "server need to be ready to handle responses generated by timeout events" ) );

// when
leaderProcess.start();
assertThat( lifeSupport, startsComponent( raftTimeoutService ).after( raftLogReplay )
.because( "elections which must request votes from the latest known voting members" ) );

// then
verify( localDatabase ).deleteStore();
verify( dataSourceManager ).start();
verify( idGeneratorFactory ).start();
assertThat( lifeSupport, startsComponent( raftLogReplay ).after( dataSourceManager )
.because( "transactions are replayed from the RAFT log into the data source" ) );

assertThat( lifeSupport, startsComponent( idGeneratorFactory ).after( dataSourceManager )
.because( "IDs are generated into the data source" ) );
}

@Test
public void stopShouldStopDatabase() throws Throwable
static class LifeSupportMatcher extends TypeSafeMatcher<LifeSupport>
{
// given
LocalDatabase localDatabase = mock( LocalDatabase.class );
DataSourceManager dataSourceManager = mock( DataSourceManager.class );
ReplicatedIdGeneratorFactory idGeneratorFactory = mock( ReplicatedIdGeneratorFactory.class );
RaftLog raftLog = mock( RaftLog.class );
RaftServer<CoreMember> raftServer = mock( RaftServer.class );
CatchupServer catchupServer = mock( CatchupServer.class );
ScheduledTimeoutService timeoutService = mock( ScheduledTimeoutService.class );
MembershipWaiter<CoreMember> membershipListener = mock( MembershipWaiter.class );
when(membershipListener.waitUntilCaughtUpMember( any(ReadableRaftState.class) )).thenReturn( mock(CompletableFuture.class) );
private final Lifecycle component1;
private final Lifecycle component2;
private final String reason;

public LifeSupportMatcher( Lifecycle component1, Lifecycle component2, String reason )
{
this.component1 = component1;
this.component2 = component2;
this.reason = reason;
}

@Override
protected boolean matchesSafely( LifeSupport lifeSupport )
{
List<Lifecycle> lifeCycles = IteratorUtil.asList( lifeSupport.getLifecycleInstances() );
return lifeCycles.indexOf( component2 ) < lifeCycles.indexOf( component1 );
}

@Override
public void describeTo( Description description )
{
description.appendText( component1.toString().replaceAll( "Mock for (.*), hashCode.*", "$1" ) );
description.appendText( " starts after " );
description.appendText( component2.toString().replaceAll( "Mock for (.*), hashCode.*", "$1" ) );
description.appendText( " because " + this.reason );
}
}

static class LifeSupportMatcherBuilder
{
private Lifecycle component1;
private Lifecycle component2;

CoreServerStartupProcess leaderProcess = new CoreServerStartupProcess(
localDatabase, dataSourceManager, idGeneratorFactory, mock( RaftInstance.class ), raftLog, raftServer,
catchupServer, timeoutService, membershipListener, 1000 );
public static LifeSupportMatcherBuilder startsComponent( Lifecycle component )
{
LifeSupportMatcherBuilder builder = new LifeSupportMatcherBuilder();
builder.component1 = component;
return builder;
}

// when
leaderProcess.start();
leaderProcess.stop();
public LifeSupportMatcherBuilder after( Lifecycle component2 )
{
this.component2 = component2;
return this;
}

// then
verify( dataSourceManager ).stop();
verify( idGeneratorFactory ).stop();
public LifeSupportMatcher because( String reason )
{
return new LifeSupportMatcher( component1, component2, reason );
}
}
}

0 comments on commit 32b33e9

Please sign in to comment.