Skip to content

Commit

Permalink
Static marshal for member ID
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Jun 11, 2018
1 parent 093a873 commit 20e2b87
Show file tree
Hide file tree
Showing 20 changed files with 40 additions and 39 deletions.
Expand Up @@ -62,7 +62,7 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config

DurableStateStorage<GlobalSessionTrackerState> sessionTrackerStorage;
sessionTrackerStorage = life.add( new DurableStateStorage<>( fileSystem, clusterStateDirectory,
SESSION_TRACKER_NAME, new GlobalSessionTrackerState.Marshal( new MemberId.Marshal() ),
SESSION_TRACKER_NAME, new GlobalSessionTrackerState.Marshal( MemberId.MARSHAL ),
config.get( CausalClusteringSettings.global_session_tracker_state_size ), logProvider ) );

sessionTracker = new SessionTracker( sessionTrackerStorage );
Expand Down
Expand Up @@ -48,7 +48,7 @@ public class IdentityModule
Log log = logProvider.getLog( getClass() );

SimpleStorage<MemberId> memberIdStorage = new SimpleFileStorage<>( fileSystem, clusterStateDirectory,
CORE_MEMBER_ID_NAME, new MemberId.Marshal(), logProvider );
CORE_MEMBER_ID_NAME, MemberId.MARSHAL, logProvider );

try
{
Expand Down
Expand Up @@ -110,7 +110,7 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule,
termState = new MonitoredTermStateStorage( durableTermState, platformModule.monitors );

voteState = life.add( new DurableStateStorage<>( fileSystem, clusterStateDirectory, RAFT_VOTE_NAME,
new VoteState.Marshal( new MemberId.Marshal() ), config.get( CausalClusteringSettings.vote_state_size ),
new VoteState.Marshal( MemberId.MARSHAL ), config.get( CausalClusteringSettings.vote_state_size ),
logProvider ) );

raftMembershipStorage = life.add(
Expand Down
Expand Up @@ -57,7 +57,7 @@ public static void marshal( MemberIdSet memberSet, WritableChannel channel ) thr
Set<MemberId> members = memberSet.getMembers();
channel.putInt( members.size() );

MemberId.Marshal memberIdMarshal = new MemberId.Marshal();
MemberId.Marshal memberIdMarshal = MemberId.MARSHAL;

for ( MemberId member : members )
{
Expand All @@ -70,7 +70,7 @@ public static MemberIdSet unmarshal( ReadableChannel channel ) throws IOExceptio
HashSet<MemberId> members = new HashSet<>();
int memberCount = channel.getInt();

MemberId.Marshal memberIdMarshal = new MemberId.Marshal();
MemberId.Marshal memberIdMarshal = MemberId.MARSHAL;

for ( int i = 0; i < memberCount; i++ )
{
Expand Down
Expand Up @@ -89,7 +89,7 @@ public String toString()

public static class Marshal extends SafeStateMarshal<MembershipEntry>
{
MemberId.Marshal memberMarshal = new MemberId.Marshal();
MemberId.Marshal memberMarshal = MemberId.MARSHAL;

@Override
public MembershipEntry startState()
Expand Down
Expand Up @@ -88,7 +88,7 @@ public ByteBufAwareMarshal serialize()
{
channel1.putLong( globalSession().sessionId().getMostSignificantBits() );
channel1.putLong( globalSession().sessionId().getLeastSignificantBits() );
new MemberId.Marshal().marshal( globalSession().owner(), channel1 );
MemberId.MARSHAL.marshal( globalSession().owner(), channel1 );

channel1.putLong( operationId.localSessionId() );
channel1.putLong( operationId.sequenceNumber() );
Expand All @@ -99,7 +99,7 @@ public static ContentBuilder<ReplicatedContent> deserialize( ReadableChannel cha
{
long mostSigBits = channel.getLong();
long leastSigBits = channel.getLong();
MemberId owner = new MemberId.Marshal().unmarshal( channel );
MemberId owner = MemberId.MARSHAL.unmarshal( channel );
GlobalSession globalSession = new GlobalSession( new UUID( mostSigBits, leastSigBits ), owner );

long localSessionId = channel.getLong();
Expand Down
Expand Up @@ -38,7 +38,6 @@
import org.neo4j.causalclustering.core.state.storage.SimpleStorage;
import org.neo4j.causalclustering.core.state.storage.StateMarshal;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.MemberId.Marshal;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.configuration.Config;
Expand Down Expand Up @@ -91,22 +90,22 @@ public static void main( String[] args ) throws IOException, ClusterStateExcepti
void dump() throws IOException
{
SimpleStorage<MemberId> memberIdStorage = new SimpleFileStorage<>( fs, clusterStateDirectory, CORE_MEMBER_ID_NAME,
new Marshal(), NullLogProvider.getInstance() );
MemberId.MARSHAL, NullLogProvider.getInstance() );
if ( memberIdStorage.exists() )
{
MemberId memberId = memberIdStorage.readState();
out.println( CORE_MEMBER_ID_NAME + ": " + memberId );
}

dumpState( LAST_FLUSHED_NAME, new LongIndexMarshal() );
dumpState( LOCK_TOKEN_NAME, new ReplicatedLockTokenState.Marshal( new Marshal() ) );
dumpState( LOCK_TOKEN_NAME, new ReplicatedLockTokenState.Marshal( MemberId.MARSHAL ) );
dumpState( ID_ALLOCATION_NAME, new IdAllocationState.Marshal() );
dumpState( SESSION_TRACKER_NAME, new GlobalSessionTrackerState.Marshal( new Marshal() ) );
dumpState( SESSION_TRACKER_NAME, new GlobalSessionTrackerState.Marshal( MemberId.MARSHAL ) );

/* raft state */
dumpState( RAFT_MEMBERSHIP_NAME, new RaftMembershipState.Marshal() );
dumpState( RAFT_TERM_NAME, new TermState.Marshal() );
dumpState( RAFT_VOTE_NAME, new VoteState.Marshal( new Marshal() ) );
dumpState( RAFT_VOTE_NAME, new VoteState.Marshal( MemberId.MARSHAL ) );
}

private void dumpState( String name, StateMarshal<?> marshal )
Expand Down
Expand Up @@ -125,7 +125,7 @@ public CoreStateMachinesModule( MemberId myself, PlatformModule platformModule,

lockTokenState = life.add(
new DurableStateStorage<>( fileSystem, clusterStateDirectory, LOCK_TOKEN_NAME,
new ReplicatedLockTokenState.Marshal( new MemberId.Marshal() ),
new ReplicatedLockTokenState.Marshal( MemberId.MARSHAL ),
config.get( replicated_lock_token_state_size ), logProvider ) );

idAllocationState = life.add(
Expand Down
Expand Up @@ -40,15 +40,15 @@ private ReplicatedIdAllocationRequestSerializer()
public static void marshal( ReplicatedIdAllocationRequest idRangeRequest, WritableChannel channel )
throws IOException
{
new MemberId.Marshal().marshal( idRangeRequest.owner(), channel );
MemberId.MARSHAL.marshal( idRangeRequest.owner(), channel );
channel.putInt( idRangeRequest.idType().ordinal() );
channel.putLong( idRangeRequest.idRangeStart() );
channel.putInt( idRangeRequest.idRangeLength() );
}

public static ReplicatedIdAllocationRequest unmarshal( ReadableChannel channel ) throws IOException, EndOfStreamException
{
MemberId owner = new MemberId.Marshal().unmarshal( channel );
MemberId owner = MemberId.MARSHAL.unmarshal( channel );
IdType idType = IdType.values()[ channel.getInt() ];
long idRangeStart = channel.getLong();
int idRangeLength = channel.getInt();
Expand Down
Expand Up @@ -39,13 +39,13 @@ private ReplicatedLockTokenSerializer()
public static void marshal( ReplicatedLockTokenRequest tokenRequest, WritableChannel channel ) throws IOException
{
channel.putInt( tokenRequest.id() );
new MemberId.Marshal().marshal( tokenRequest.owner(), channel );
MemberId.MARSHAL.marshal( tokenRequest.owner(), channel );
}

public static ReplicatedLockTokenRequest unmarshal( ReadableChannel channel ) throws IOException, EndOfStreamException
{
int candidateId = channel.getInt();
MemberId owner = new MemberId.Marshal().unmarshal( channel );
MemberId owner = MemberId.MARSHAL.unmarshal( channel );

return new ReplicatedLockTokenRequest( owner, candidateId );
}
Expand Down
Expand Up @@ -30,8 +30,8 @@

public enum CoreStateType
{
LOCK_TOKEN( new ReplicatedLockTokenState.Marshal( new MemberId.Marshal() ) ),
SESSION_TRACKER( new GlobalSessionTrackerState.Marshal( new MemberId.Marshal() ) ),
LOCK_TOKEN( new ReplicatedLockTokenState.Marshal( MemberId.MARSHAL ) ),
SESSION_TRACKER( new GlobalSessionTrackerState.Marshal( MemberId.MARSHAL ) ),
ID_ALLOCATION( new IdAllocationState.Marshal() ),
RAFT_CORE_STATE( new RaftCoreState.Marshal() );

Expand Down
Expand Up @@ -24,7 +24,6 @@

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.Objects;
import java.util.UUID;

Expand All @@ -37,6 +36,8 @@
// TODO: basics for Serializable
public class MemberId implements Serializable
{
public static final Marshal MARSHAL = new Marshal();

private final UUID uuid;
private final String shortName;

Expand Down Expand Up @@ -89,7 +90,9 @@ public int hashCode()
*/
public static class Marshal extends SafeStateMarshal<MemberId>
{
private static final Charset UTF8 = Charset.forName("UTF-8");
private Marshal()
{
}

@Override
public void marshal( MemberId memberId, WritableChannel channel ) throws IOException
Expand Down
Expand Up @@ -174,7 +174,7 @@ else if ( messageType.equals( LOG_COMPACTION_INFO ) )

private MemberId retrieveMember( ReadableChannel buffer ) throws IOException, EndOfStreamException
{
MemberId.Marshal memberIdMarshal = new MemberId.Marshal();
MemberId.Marshal memberIdMarshal = MemberId.MARSHAL;
return memberIdMarshal.unmarshal( buffer );
}
}
Expand Up @@ -50,7 +50,7 @@ public synchronized void encode( ChannelHandlerContext ctx,
{
RaftMessages.RaftMessage message = decoratedMessage.message();
ClusterId clusterId = decoratedMessage.clusterId();
MemberId.Marshal memberMarshal = new MemberId.Marshal();
MemberId.Marshal memberMarshal = MemberId.MARSHAL;

NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( out );
ClusterId.Marshal.INSTANCE.marshal( clusterId, channel );
Expand Down
Expand Up @@ -183,8 +183,7 @@ Optional<RaftMessages.ClusterIdAwareMessage> maybeCompose( Clock clock, Queue<Lo

private MemberId retrieveMember( ReadableChannel buffer ) throws IOException, EndOfStreamException
{
MemberId.Marshal memberIdMarshal = new MemberId.Marshal();
return memberIdMarshal.unmarshal( buffer );
return MemberId.MARSHAL.unmarshal( buffer );
}

interface LazyComposer
Expand Down
Expand Up @@ -38,7 +38,7 @@ protected void encode( ChannelHandlerContext ctx, RaftMessages.ClusterIdAwareMes
{
RaftMessages.RaftMessage message = decoratedMessage.message();
ClusterId clusterId = decoratedMessage.clusterId();
MemberId.Marshal memberMarshal = new MemberId.Marshal();
MemberId.Marshal memberMarshal = MemberId.MARSHAL;

NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( out );
ClusterId.Marshal.INSTANCE.marshal( clusterId, channel );
Expand Down
Expand Up @@ -42,7 +42,7 @@ public class MemberIdMarshalTest
public void shouldSerializeAndDeserialize() throws Exception
{
// given
MemberId.Marshal marshal = new MemberId.Marshal();
MemberId.Marshal marshal = MemberId.MARSHAL;

final MemberId member = new MemberId( UUID.randomUUID() );

Expand All @@ -60,7 +60,7 @@ public void shouldThrowExceptionForHalfWrittenInstance() throws Exception
{
// given
// a CoreMember and a ByteBuffer to write it to
MemberId.Marshal marshal = new MemberId.Marshal();
MemberId.Marshal marshal = MemberId.MARSHAL;
final MemberId aRealMember = new MemberId( UUID.randomUUID() );

ByteBuf buffer = Unpooled.buffer( 1000 );
Expand Down
Expand Up @@ -89,18 +89,18 @@ public void shouldDumpClusterState() throws Exception
private void createStates() throws IOException
{
SimpleStorage<MemberId> memberIdStorage = new SimpleFileStorage<>( fsa.get(), clusterStateDirectory.get(),
CORE_MEMBER_ID_NAME, new MemberId.Marshal(), NullLogProvider.getInstance() );
CORE_MEMBER_ID_NAME, MemberId.MARSHAL, NullLogProvider.getInstance() );
memberIdStorage.writeState( new MemberId( UUID.randomUUID() ) );

createDurableState( LAST_FLUSHED_NAME, new LongIndexMarshal() );
createDurableState( LOCK_TOKEN_NAME, new ReplicatedLockTokenState.Marshal( new MemberId.Marshal() ) );
createDurableState( LOCK_TOKEN_NAME, new ReplicatedLockTokenState.Marshal( MemberId.MARSHAL ) );
createDurableState( ID_ALLOCATION_NAME, new IdAllocationState.Marshal() );
createDurableState( SESSION_TRACKER_NAME, new GlobalSessionTrackerState.Marshal( new MemberId.Marshal() ) );
createDurableState( SESSION_TRACKER_NAME, new GlobalSessionTrackerState.Marshal( MemberId.MARSHAL ) );

/* raft state */
createDurableState( RAFT_MEMBERSHIP_NAME, new RaftMembershipState.Marshal() );
createDurableState( RAFT_TERM_NAME, new TermState.Marshal() );
createDurableState( RAFT_VOTE_NAME, new VoteState.Marshal( new MemberId.Marshal() ) );
createDurableState( RAFT_VOTE_NAME, new VoteState.Marshal( MemberId.MARSHAL ) );
}

private <T> void createDurableState( String name, StateMarshal<T> marshal )
Expand Down
Expand Up @@ -192,15 +192,15 @@ public void shouldPersistAndRecoverState() throws Exception
fsa.mkdir( testDir.directory() );

StateMarshal<ReplicatedLockTokenState> marshal =
new ReplicatedLockTokenState.Marshal( new MemberId.Marshal() );
new ReplicatedLockTokenState.Marshal( MemberId.MARSHAL );

MemberId memberA = member( 0 );
MemberId memberB = member( 1 );
int candidateId;

DurableStateStorage<ReplicatedLockTokenState> storage = new DurableStateStorage<>( fsa, testDir.directory(),
"state", marshal, 100, NullLogProvider.getInstance() );
try ( Lifespan lifespan = new Lifespan( storage ) )
try ( Lifespan ignored = new Lifespan( storage ) )
{
ReplicatedLockTokenStateMachine stateMachine = new ReplicatedLockTokenStateMachine( storage );

Expand All @@ -217,7 +217,7 @@ public void shouldPersistAndRecoverState() throws Exception
// then
DurableStateStorage<ReplicatedLockTokenState> storage2 = new DurableStateStorage<>(
fsa, testDir.directory(), "state", marshal, 100, NullLogProvider.getInstance() );
try ( Lifespan lifespan = new Lifespan( storage2 ) )
try ( Lifespan ignored = new Lifespan( storage2 ) )
{
ReplicatedLockTokenState initialState = storage2.getInitialState();

Expand All @@ -234,12 +234,12 @@ public void shouldBeIdempotent()
fsa.mkdir( testDir.directory() );

StateMarshal<ReplicatedLockTokenState> marshal =
new ReplicatedLockTokenState.Marshal( new MemberId.Marshal() );
new ReplicatedLockTokenState.Marshal( MemberId.MARSHAL );

DurableStateStorage<ReplicatedLockTokenState> storage = new DurableStateStorage<>( fsa, testDir.directory(),
"state", marshal, 100, NullLogProvider.getInstance() );

try ( Lifespan lifespan = new Lifespan( storage ) )
try ( Lifespan ignored = new Lifespan( storage ) )
{
ReplicatedLockTokenStateMachine stateMachine = new ReplicatedLockTokenStateMachine( storage );

Expand Down
Expand Up @@ -44,7 +44,7 @@ public void shouldWriteAndReadState() throws Exception
{
// given
SimpleStorage<MemberId> storage = new SimpleFileStorage<>( fsa.get(), new File( "state-dir" ),
"member-id-a", new MemberId.Marshal(), NullLogProvider.getInstance() );
"member-id-a", MemberId.MARSHAL, NullLogProvider.getInstance() );

// when
MemberId idA = new MemberId( UUID.randomUUID() );
Expand Down

0 comments on commit 20e2b87

Please sign in to comment.