Skip to content

Commit

Permalink
Decorate all the Raft messages with StoreId
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Needham committed Jun 16, 2016
1 parent 8ff6bd9 commit 2651e8c
Show file tree
Hide file tree
Showing 37 changed files with 254 additions and 214 deletions.
Expand Up @@ -94,4 +94,9 @@ public Flushable prepareForFlush()
{
return null;
}

public ByteBuf buffer()
{
return delegate;
}
}
Expand Up @@ -27,6 +27,7 @@
import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.raft.RaftMessages.RaftMessage;
import org.neo4j.coreedge.raft.net.Inbound.MessageHandler;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

Expand All @@ -36,17 +37,15 @@ public class BatchingMessageHandler<MEMBER> implements Runnable, MessageHandler<
{
private final Log log;
private final MessageHandler<RaftMessage<MEMBER>> innerHandler;
private final LocalDatabase localDatabase;

private final BlockingQueue<RaftMessage<MEMBER>> messageQueue;
private final int maxBatch;
private final List<RaftMessage<MEMBER>> batch;

public BatchingMessageHandler( MessageHandler<RaftMessage<MEMBER>> innerHandler, LogProvider logProvider,
int queueSize, int maxBatch, LocalDatabase localDatabase )
int queueSize, int maxBatch )
{
this.innerHandler = innerHandler;
this.localDatabase = localDatabase;
this.log = logProvider.getLog( getClass() );
this.maxBatch = maxBatch;

Expand All @@ -55,9 +54,9 @@ public BatchingMessageHandler( MessageHandler<RaftMessage<MEMBER>> innerHandler,
}

@Override
public boolean validate( RaftMessage<MEMBER> message )
public boolean validate( RaftMessage<MEMBER> message, StoreId storeId )
{
return innerHandler.validate( message );
return innerHandler.validate( message, storeId );
}

@Override
Expand Down Expand Up @@ -115,7 +114,7 @@ private void collateAndHandleBatch( List<RaftMessage<MEMBER>> batch )

if ( batchRequest == null )
{
batchRequest = new RaftMessages.NewEntry.Batch<>( batch.size(), localDatabase.storeId() );
batchRequest = new RaftMessages.NewEntry.Batch<>( batch.size() );
}
batchRequest.add( newEntryRequest.content() );
}
Expand Down
Expand Up @@ -51,6 +51,7 @@
import org.neo4j.coreedge.server.core.NotMyselfSelectionStrategy;
import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy;
import org.neo4j.kernel.impl.store.MismatchingStoreIdException;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.store.kvstore.Rotation;
import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.kernel.internal.DatabaseHealth;
Expand Down Expand Up @@ -309,12 +310,11 @@ else if ( oldLeader != null && !oldLeader.equals( outcome.getLeader() ) )
}

@Override
public boolean validate( RaftMessages.RaftMessage<MEMBER> incomingMessage )
public boolean validate( RaftMessages.RaftMessage<MEMBER> incomingMessage, StoreId storeId )
{
try
{
Outcome<MEMBER> outcome = currentRole.handler.validate( incomingMessage, state, log, localDatabase );

Outcome<MEMBER> outcome = currentRole.handler.validate( incomingMessage, storeId, state, localDatabase );
boolean processable = outcome.isProcessable();
if ( !processable )
{
Expand Down
Expand Up @@ -25,13 +25,14 @@
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.RaftState;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.logging.Log;

public interface RaftMessageHandler
{
<MEMBER> Outcome<MEMBER> handle( RaftMessages.RaftMessage<MEMBER> message, ReadableRaftState<MEMBER> context,
Log log, LocalDatabase localDatabase ) throws IOException;

<MEMBER> Outcome<MEMBER> validate( RaftMessages.RaftMessage<MEMBER> message, RaftState<MEMBER> context,
Log log, LocalDatabase localDatabase );
<MEMBER> Outcome<MEMBER> validate( RaftMessages.RaftMessage<MEMBER> message, StoreId storeId,
RaftState<MEMBER> context, LocalDatabase localDatabase );
}
Expand Up @@ -25,6 +25,8 @@
import java.util.List;
import java.util.Objects;

import com.sun.org.apache.xml.internal.security.algorithms.MessageDigestAlgorithm;

import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
Expand Down Expand Up @@ -60,7 +62,6 @@ interface RaftMessage<MEMBER> extends Message
{
MEMBER from();
Type type();
StoreId storeId();
}

class Directed<MEMBER>
Expand Down Expand Up @@ -100,10 +101,9 @@ class Request<MEMBER> extends BaseMessage<MEMBER>
private long lastLogIndex;
private long lastLogTerm;

public Request( MEMBER from, long term, MEMBER candidate, long lastLogIndex, long lastLogTerm,
StoreId storeId)
public Request( MEMBER from, long term, MEMBER candidate, long lastLogIndex, long lastLogTerm )
{
super( from, Type.VOTE_REQUEST, storeId );
super( from, Type.VOTE_REQUEST );
this.term = term;
this.candidate = candidate;
this.lastLogIndex = lastLogIndex;
Expand Down Expand Up @@ -171,9 +171,9 @@ class Response<MEMBER> extends BaseMessage<MEMBER>
private long term;
private boolean voteGranted;

public Response( MEMBER from, long term, boolean voteGranted, StoreId storeId )
public Response( MEMBER from, long term, boolean voteGranted )
{
super( from, Type.VOTE_RESPONSE, storeId );
super( from, Type.VOTE_RESPONSE );
this.term = term;
this.voteGranted = voteGranted;
}
Expand Down Expand Up @@ -233,9 +233,9 @@ class Request<MEMBER> extends BaseMessage<MEMBER>
private long leaderCommit;

public Request( MEMBER from, long leaderTerm, long prevLogIndex, long prevLogTerm,
RaftLogEntry[] entries, long leaderCommit, StoreId storeId )
RaftLogEntry[] entries, long leaderCommit )
{
super( from, Type.APPEND_ENTRIES_REQUEST, storeId );
super( from, Type.APPEND_ENTRIES_REQUEST );
Objects.requireNonNull( entries );
assert !((prevLogIndex == -1 && prevLogTerm != -1) || (prevLogTerm == -1 && prevLogIndex != -1)) :
format( "prevLogIndex was %d and prevLogTerm was %d", prevLogIndex, prevLogTerm );
Expand Down Expand Up @@ -312,10 +312,9 @@ class Response<MEMBER> extends BaseMessage<MEMBER>
private long matchIndex;
private long appendIndex;

public Response( MEMBER from, long term, boolean success, long matchIndex, long appendIndex,
StoreId storeId )
public Response( MEMBER from, long term, boolean success, long matchIndex, long appendIndex )
{
super( from, Type.APPEND_ENTRIES_RESPONSE, storeId );
super( from, Type.APPEND_ENTRIES_RESPONSE );
this.term = term;
this.success = success;
this.matchIndex = matchIndex;
Expand Down Expand Up @@ -373,8 +372,8 @@ public int hashCode()
@Override
public String toString()
{
return format( "AppendEntries.Response from %s {term=%d, storeId=%s, success=%s, matchIndex=%d, appendIndex=%d}",
from, term, storeId(), success, matchIndex, appendIndex );
return format( "AppendEntries.Response from %s {term=%d, success=%s, matchIndex=%d, appendIndex=%d}",
from, term, success, matchIndex, appendIndex );
}
}
}
Expand All @@ -385,9 +384,9 @@ class Heartbeat<MEMBER> extends BaseMessage<MEMBER>
private long commitIndex;
private long commitIndexTerm;

public Heartbeat( MEMBER from, long leaderTerm, long commitIndex, long commitIndexTerm, StoreId storeId )
public Heartbeat( MEMBER from, long leaderTerm, long commitIndex, long commitIndexTerm )
{
super( from, Type.HEARTBEAT, storeId );
super( from, Type.HEARTBEAT );
this.leaderTerm = leaderTerm;
this.commitIndex = commitIndex;
this.commitIndexTerm = commitIndexTerm;
Expand Down Expand Up @@ -454,9 +453,9 @@ class LogCompactionInfo<MEMBER> extends BaseMessage<MEMBER>
private long leaderTerm;
private long prevIndex;

public LogCompactionInfo( MEMBER from, long leaderTerm, long prevIndex, StoreId storeId )
public LogCompactionInfo( MEMBER from, long leaderTerm, long prevIndex )
{
super( from, Type.LOG_COMPACTION_INFO, storeId );
super( from, Type.LOG_COMPACTION_INFO );
this.leaderTerm = leaderTerm;
this.prevIndex = prevIndex;
}
Expand Down Expand Up @@ -515,7 +514,7 @@ class Election<MEMBER> extends BaseMessage<MEMBER>
{
public Election( MEMBER from )
{
super( from, Type.ELECTION_TIMEOUT, null );
super( from, Type.ELECTION_TIMEOUT );
}

@Override
Expand All @@ -529,7 +528,7 @@ class Heartbeat<MEMBER> extends BaseMessage<MEMBER>
{
public Heartbeat( MEMBER from )
{
super( from, Type.HEARTBEAT_TIMEOUT, null );
super( from, Type.HEARTBEAT_TIMEOUT );
}

@Override
Expand All @@ -546,9 +545,9 @@ class Request<MEMBER> extends BaseMessage<MEMBER>
{
private ReplicatedContent content;

public Request( MEMBER from, ReplicatedContent content, StoreId storeId )
public Request( MEMBER from, ReplicatedContent content )
{
super( from, Type.NEW_ENTRY_REQUEST, storeId );
super( from, Type.NEW_ENTRY_REQUEST );
this.content = content;
}

Expand Down Expand Up @@ -591,9 +590,9 @@ class Batch<MEMBER> extends BaseMessage<MEMBER>
{
private List<ReplicatedContent> list;

public Batch( int batchSize, StoreId storeId )
public Batch( int batchSize )
{
super( null, Type.NEW_BATCH_REQUEST, storeId );
super( null, Type.NEW_BATCH_REQUEST );
list = new ArrayList<>( batchSize );
}

Expand Down Expand Up @@ -636,17 +635,61 @@ public List<ReplicatedContent> contents()
}
}

class StoreIdAwareMessage<MEMBER> implements Message
{
private final StoreId storeId;
private final RaftMessage<MEMBER> message;

public StoreIdAwareMessage( StoreId storeId, RaftMessage<MEMBER> message )
{
Objects.requireNonNull( message );
this.storeId = storeId;
this.message = message;
}

public StoreId storeId()
{
return storeId;
}

public RaftMessage<MEMBER> message()
{
return message;
}

@Override
public boolean equals( Object o )
{
if ( this == o )
{
return true;
}
if ( o == null || getClass() != o.getClass() )
{
return false;
}
StoreIdAwareMessage<?> that = (StoreIdAwareMessage<?>) o;
return Objects.equals( message, that.message ) &&
(storeId == that.storeId || (storeId != null && storeId.theRealEquals( that.storeId )));
}

@Override
public int hashCode()
{
int result = 31 + (storeId == null ? 0 : storeId.theRealHashCode());
return 31 * result + Objects.hash( message );
}
}

abstract class BaseMessage<MEMBER> implements RaftMessage<MEMBER>
{
protected MEMBER from;
private Type type;
private StoreId storeId;

public BaseMessage( MEMBER from, Type type, StoreId storeId )
public BaseMessage( MEMBER from, Type type )
{
this.from = from;
this.type = type;
this.storeId = storeId;
}

@Override
Expand All @@ -661,12 +704,6 @@ public Type type()
return type;
}

@Override
public StoreId storeId()
{
return storeId;
}

@Override
public boolean equals( Object o )
{
Expand All @@ -679,16 +716,13 @@ public boolean equals( Object o )
return false;
}
BaseMessage<?> that = (BaseMessage<?>) o;
return Objects.equals( from, that.from ) &&
type == that.type &&
((storeId == that.storeId) || (storeId != null && storeId.theRealEquals( that.storeId )));
return Objects.equals( from, that.from ) && type == that.type;
}

@Override
public int hashCode()
{
int result = storeId == null ? 0 : storeId.theRealHashCode();
return 31 * result + Objects.hash( from, type );
return Objects.hash( from, type );
}
}
}
Expand Up @@ -42,6 +42,7 @@
import org.neo4j.coreedge.server.ListenSocketAddress;
import org.neo4j.coreedge.server.logging.ExceptionLoggingHandler;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
Expand Down Expand Up @@ -123,13 +124,15 @@ public void registerHandler( Inbound.MessageHandler<RaftMessages.RaftMessage<MEM
this.messageHandler = handler;
}

private class RaftMessageHandler extends SimpleChannelInboundHandler<RaftMessages.RaftMessage<MEMBER>>
private class RaftMessageHandler extends SimpleChannelInboundHandler<RaftMessages.StoreIdAwareMessage<MEMBER>>
{
@Override
protected void channelRead0( ChannelHandlerContext channelHandlerContext,
RaftMessages.RaftMessage<MEMBER> message ) throws Exception
RaftMessages.StoreIdAwareMessage<MEMBER> storeIdAwareMessage ) throws Exception
{
if ( messageHandler.validate( message ) )
RaftMessages.RaftMessage<MEMBER> message = storeIdAwareMessage.message();
StoreId storeId = storeIdAwareMessage.storeId();
if ( messageHandler.validate( message, storeId ) )
{
messageHandler.handle( message );
}
Expand Down
Expand Up @@ -20,14 +20,15 @@
package org.neo4j.coreedge.raft.net;

import org.neo4j.coreedge.network.Message;
import org.neo4j.kernel.impl.store.StoreId;

public interface Inbound<M extends Message>
{
void registerHandler( MessageHandler<M> handler );

interface MessageHandler<M extends Message>
{
boolean validate( M message );
boolean validate( M message, StoreId storeId );

void handle( M message );
}
Expand Down

0 comments on commit 2651e8c

Please sign in to comment.