Skip to content

Commit

Permalink
core-edge: dump cluster state tool
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski authored and apcj committed Jul 26, 2016
1 parent f2e3aaf commit 4e06d5f
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 33 deletions.
Expand Up @@ -93,8 +93,8 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C
try
{
lastFlushedStorage = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "last-flushed-state" ),
"last-flushed", new LongIndexMarshal(), config.get( CoreEdgeClusterSettings.last_flushed_state_size ),
new DurableStateStorage<>( fileSystem, clusterStateDirectory, ReplicationModule.LAST_FLUSHED_NAME,
new LongIndexMarshal(), config.get( CoreEdgeClusterSettings.last_flushed_state_size ),
databaseHealthSupplier, logProvider ) );
}
catch ( IOException e )
Expand Down
Expand Up @@ -71,6 +71,9 @@

public class CoreStateMachinesModule
{
public static final String ID_ALLOCATION_NAME = "id-allocation";
public static final String LOCK_TOKEN_NAME = "lock-token";

public final IdGeneratorFactory idGeneratorFactory;
public final IdTypeConfigurationProvider idTypeConfigurationProvider;
public final LabelTokenHolder labelTokenHolder;
Expand All @@ -97,14 +100,14 @@ public CoreStateMachinesModule( MemberId myself, PlatformModule platformModule,
try
{
lockTokenState = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "lock-token-state" ),
"lock-token", new ReplicatedLockTokenState.Marshal( new MemberId.MemberIdMarshal() ),
new DurableStateStorage<>( fileSystem, clusterStateDirectory, LOCK_TOKEN_NAME,
new ReplicatedLockTokenState.Marshal( new MemberId.MemberIdMarshal() ),
config.get( CoreEdgeClusterSettings.replicated_lock_token_state_size ),
databaseHealthSupplier, logProvider ) );

idAllocationState = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "id-allocation-state" ),
"id-allocation", new IdAllocationState.Marshal(),
new DurableStateStorage<>( fileSystem, clusterStateDirectory, ID_ALLOCATION_NAME,
new IdAllocationState.Marshal(),
config.get( CoreEdgeClusterSettings.id_alloc_state_size ), databaseHealthSupplier,
logProvider ) );
}
Expand Down
Expand Up @@ -47,6 +47,9 @@

public class ReplicationModule
{
public static final String LAST_FLUSHED_NAME = "last-flushed";
public static final String SESSION_TRACKER_NAME = "session-tracker";

private final RaftReplicator replicator;
private final ProgressTrackerImpl progressTracker;
private final SessionTracker sessionTracker;
Expand All @@ -60,13 +63,10 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config
DurableStateStorage<GlobalSessionTrackerState> sessionTrackerStorage;
try
{
sessionTrackerStorage = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "session-tracker-state" ),
"session-tracker",
new GlobalSessionTrackerState.Marshal( new MemberId.MemberIdMarshal() ),
config.get( CoreEdgeClusterSettings.global_session_tracker_state_size ),
databaseHealthSupplier, logProvider ) );

sessionTrackerStorage = life.add( new DurableStateStorage<>( fileSystem, clusterStateDirectory,
SESSION_TRACKER_NAME, new GlobalSessionTrackerState.Marshal( new MemberId.MemberIdMarshal() ),
config.get( CoreEdgeClusterSettings.global_session_tracker_state_size ), databaseHealthSupplier,
logProvider ) );
}
catch ( IOException e )
{
Expand Down
Expand Up @@ -72,6 +72,10 @@

public class ConsensusModule
{
public static final String RAFT_MEMBERSHIP_NAME = "membership";
public static final String RAFT_TERM_NAME = "term";
public static final String RAFT_VOTE_NAME = "vote";

private final MonitoredRaftLog raftLog;
private final RaftInstance raftInstance;
private final DelayedRenewableTimeoutService raftTimeoutService;
Expand Down Expand Up @@ -124,24 +128,22 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule,
try
{
StateStorage<TermState> durableTermState = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "term-state" ),
"term-state", new TermState.Marshal(),
config.get( CoreEdgeClusterSettings.term_state_size ), databaseHealthSupplier,
logProvider ) );
new DurableStateStorage<>( fileSystem, clusterStateDirectory, RAFT_TERM_NAME,
new TermState.Marshal(), config.get( CoreEdgeClusterSettings.term_state_size ),
databaseHealthSupplier, logProvider ) );

termState = new MonitoredTermStateStorage( durableTermState, platformModule.monitors );

voteState = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "vote-state" ),
"vote-state", new VoteState.Marshal( new MemberId.MemberIdMarshal() ),
new DurableStateStorage<>( fileSystem, clusterStateDirectory, RAFT_VOTE_NAME,
new VoteState.Marshal( new MemberId.MemberIdMarshal() ),
config.get( CoreEdgeClusterSettings.vote_state_size ), databaseHealthSupplier,
logProvider ) );

raftMembershipStorage = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "membership-state" ),
"membership-state", new RaftMembershipState.Marshal(),
config.get( CoreEdgeClusterSettings.raft_membership_state_size ), databaseHealthSupplier,
logProvider ) );
new DurableStateStorage<>( fileSystem, clusterStateDirectory, RAFT_MEMBERSHIP_NAME,
new RaftMembershipState.Marshal(), config.get( CoreEdgeClusterSettings.raft_membership_state_size ),
databaseHealthSupplier, logProvider ) );
}
catch ( IOException e )
{
Expand Down
Expand Up @@ -45,7 +45,12 @@ public class DurableStateStorage<STATE> extends LifecycleAdapter implements Stat

private PhysicalFlushableChannel currentStoreChannel;

public DurableStateStorage( FileSystemAbstraction fileSystemAbstraction, File stateDir, String name,
private File stateDir( File baseDir, String name )
{
return new File( baseDir, name + "-state" );
}

public DurableStateStorage( FileSystemAbstraction fileSystemAbstraction, File baseDir, String name,
StateMarshal<STATE> marshal, int numberOfEntriesBeforeRotation,
Supplier<DatabaseHealth> databaseHealthSupplier, LogProvider logProvider )
throws IOException
Expand All @@ -56,8 +61,8 @@ public DurableStateStorage( FileSystemAbstraction fileSystemAbstraction, File st
this.numberOfEntriesBeforeRotation = numberOfEntriesBeforeRotation;
this.databaseHealthSupplier = databaseHealthSupplier;

fileA = new File( stateDir, name + ".a" );
fileB = new File( stateDir, name + ".b" );
fileA = new File( stateDir( baseDir, name ), name + ".a" );
fileB = new File( stateDir( baseDir, name ), name + ".b" );

StateRecoveryManager<STATE> recoveryManager =
new StateRecoveryManager<>( fileSystemAbstraction, marshal );
Expand Down Expand Up @@ -109,7 +114,7 @@ public synchronized void persistStoreData( STATE state ) throws IOException
}
}

protected void switchStoreFile() throws IOException
void switchStoreFile() throws IOException
{
currentStoreChannel.close();

Expand Down
Expand Up @@ -98,4 +98,12 @@ public long ordinal( TermState state )
return state.currentTerm();
}
}

@Override
public String toString()
{
return "TermState{" +
"term=" + term +
'}';
}
}
Expand Up @@ -120,4 +120,13 @@ public long ordinal( VoteState state )
return state.term();
}
}

@Override
public String toString()
{
return "VoteState{" +
"votedFor=" + votedFor +
", term=" + term +
'}';
}
}
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.server.core;

import java.io.File;
import java.io.IOException;

import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState;
import org.neo4j.coreedge.raft.state.DurableStateStorage;
import org.neo4j.coreedge.raft.state.LongIndexMarshal;
import org.neo4j.coreedge.raft.state.StateMarshal;
import org.neo4j.coreedge.raft.state.id_allocation.IdAllocationState;
import org.neo4j.coreedge.raft.state.membership.RaftMembershipState;
import org.neo4j.coreedge.raft.state.term.TermState;
import org.neo4j.coreedge.raft.state.vote.VoteState;
import org.neo4j.coreedge.server.MemberId.MemberIdMarshal;
import org.neo4j.coreedge.server.core.locks.ReplicatedLockTokenState;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.logging.NullLogProvider;

import static org.neo4j.coreedge.CoreStateMachinesModule.ID_ALLOCATION_NAME;
import static org.neo4j.coreedge.CoreStateMachinesModule.LOCK_TOKEN_NAME;
import static org.neo4j.coreedge.ReplicationModule.LAST_FLUSHED_NAME;
import static org.neo4j.coreedge.ReplicationModule.SESSION_TRACKER_NAME;
import static org.neo4j.coreedge.raft.ConsensusModule.RAFT_MEMBERSHIP_NAME;
import static org.neo4j.coreedge.raft.ConsensusModule.RAFT_TERM_NAME;
import static org.neo4j.coreedge.raft.ConsensusModule.RAFT_VOTE_NAME;
import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.CLUSTER_STATE_DIRECTORY_NAME;
import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.CORE_MEMBER_ID_NAME;

public class DumpClusterState
{
public static void main( String[] args ) throws IOException
{
FileSystemAbstraction fs = new DefaultFileSystemAbstraction();
File baseDir = new File( args[0], CLUSTER_STATE_DIRECTORY_NAME );

/* core state */
dumpState( fs, baseDir, LAST_FLUSHED_NAME, new LongIndexMarshal() );
dumpState( fs, baseDir, CORE_MEMBER_ID_NAME, new MemberIdMarshal() );

dumpState( fs, baseDir, LOCK_TOKEN_NAME, new ReplicatedLockTokenState.Marshal( new MemberIdMarshal() ) );
dumpState( fs, baseDir, ID_ALLOCATION_NAME, new IdAllocationState.Marshal() );
dumpState( fs, baseDir, SESSION_TRACKER_NAME, new GlobalSessionTrackerState.Marshal( new MemberIdMarshal() ) );

/* raft state */
dumpState( fs, baseDir, RAFT_MEMBERSHIP_NAME, new RaftMembershipState.Marshal() );
dumpState( fs, baseDir, RAFT_TERM_NAME, new TermState.Marshal() );
dumpState( fs, baseDir, RAFT_VOTE_NAME, new VoteState.Marshal( new MemberIdMarshal() ) );
}

private static void dumpState( FileSystemAbstraction fs, File baseDir, String name, StateMarshal<?> marshal ) throws IOException
{
DurableStateStorage<?> storage = new DurableStateStorage<>(
fs, baseDir, name, marshal, 1024, null, NullLogProvider.getInstance() );

System.out.println( name + ": " + storage.getInitialState() );
storage.shutdown();
}
}
Expand Up @@ -97,6 +97,7 @@
public class EnterpriseCoreEditionModule extends EditionModule
{
public static final String CLUSTER_STATE_DIRECTORY_NAME = "cluster-state";
public static final String CORE_MEMBER_ID_NAME = "core-member-id";

private final ConsensusModule consensusModule;
private final CoreTopologyService discoveryService;
Expand Down Expand Up @@ -142,7 +143,7 @@ public void registerProcedures( Procedures procedures )
try
{
StateStorage<MemberId> idStorage = life.add( new DurableStateStorage<>(
fileSystem, clusterStateDirectory, "raft-member-id", new MemberIdMarshal(), 1,
fileSystem, clusterStateDirectory, CORE_MEMBER_ID_NAME, new MemberIdMarshal(), 1,
databaseHealthSupplier, logProvider ) );
MemberId member = idStorage.getInitialState();
if ( member == null )
Expand Down
Expand Up @@ -98,7 +98,7 @@ public void shouldProperlyRecoveryAfterCrashOnFileCreationDuringRotation() throw
FileSystemAbstraction.class.getMethod( "truncate", File.class, long.class ) ),
normalFSA );
SelectiveFileSystemAbstraction combinedFSA = new SelectiveFileSystemAbstraction(
new File( testDir.directory(), "long.a" ), breakingFSA, normalFSA );
new File( new File( testDir.directory(), "long-state" ), "long.a" ), breakingFSA, normalFSA );

LongState persistedState = new LongState( combinedFSA, testDir.directory(), 14 );
long lastValue = 0;
Expand Down Expand Up @@ -140,7 +140,7 @@ public void shouldProperlyRecoveryAfterCrashOnFileForceDuringWrite() throws Exce
StoreChannel.class.getMethod( "force", boolean.class ) ),
normalFSA );
SelectiveFileSystemAbstraction combinedFSA = new SelectiveFileSystemAbstraction(
new File( testDir.directory(), "long.a" ), breakingFSA, normalFSA );
new File( new File( testDir.directory(), "long-state" ), "long.a" ), breakingFSA, normalFSA );

LongState persistedState = new LongState( combinedFSA, testDir.directory(), 14 );
long lastValue = 0;
Expand Down Expand Up @@ -210,7 +210,7 @@ public void shouldProperlyRecoveryAfterCloseOnActiveFileDuringRotation() throws
StoreChannel.class.getMethod( "close" ) ),
normalFSA );
SelectiveFileSystemAbstraction combinedFSA = new SelectiveFileSystemAbstraction(
new File( testDir.directory(), "long.a" ), breakingFSA, normalFSA );
new File( new File( testDir.directory(), "long-state" ), "long.a" ), breakingFSA, normalFSA );

LongState persistedState = new LongState( combinedFSA, testDir.directory(), 14 );
long lastValue = 0;
Expand Down
Expand Up @@ -200,12 +200,12 @@ public long ordinal( AtomicInteger atomicInteger )

private File stateFileA()
{
return new File( testDir.directory(), "state.a" );
return new File( new File( testDir.directory(), "state-state" ), "state.a" );
}

private File stateFileB()
{
return new File( testDir.directory(), "state.b" );
return new File( new File( testDir.directory(), "state-state" ), "state.b" );
}

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit 4e06d5f

Please sign in to comment.