Skip to content

Commit

Permalink
Merge pull request #2091 from jakewins/1.9-learnit
Browse files Browse the repository at this point in the history
Fixes problem with re-electing master on message loss
  • Loading branch information
rickardoberg committed Mar 18, 2014
2 parents 768b644 + e2876d8 commit 5d9e836
Show file tree
Hide file tree
Showing 28 changed files with 340 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
import org.jboss.netty.logging.InternalLoggerFactory;

import org.neo4j.cluster.BindingListener;
import org.neo4j.cluster.InstanceId;
import org.neo4j.cluster.ClusterMonitor;
import org.neo4j.cluster.ClusterSettings;
import org.neo4j.cluster.ExecutorLifecycleAdapter;
import org.neo4j.cluster.InstanceId;
import org.neo4j.cluster.MultiPaxosServerFactory;
import org.neo4j.cluster.ProtocolServer;
import org.neo4j.cluster.StateMachines;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,45 @@ public <MESSAGETYPE extends MessageType> Message<MESSAGETYPE> copyHeadersTo( Mes
return message;
}

/**
 * Compares this message with another object.
 * <p>
 * Two messages are considered equal when they are instances of exactly the
 * same class and their headers, message type and payload are pairwise equal.
 * A {@code null} field only matches another {@code null} field.
 *
 * @param o the object to compare against, may be {@code null}
 * @return {@code true} if {@code o} is an equal message, {@code false} otherwise
 */
@Override
public boolean equals( Object o )
{
    if ( o == this )
    {
        return true;
    }
    if ( o == null || o.getClass() != getClass() )
    {
        return false;
    }

    Message<?> other = (Message<?>) o;

    // Fields are checked in the order headers -> messageType -> payload,
    // stopping at the first mismatch.
    return (headers == null ? other.headers == null : headers.equals( other.headers ))
            && (messageType == null ? other.messageType == null : messageType.equals( other.messageType ))
            && (payload == null ? other.payload == null : payload.equals( other.payload ));
}

/**
 * Hash code derived from the message type, payload and headers (in that
 * order), combined with the usual 31-multiplier scheme. A {@code null}
 * field contributes 0.
 *
 * @return hash code consistent with {@link #equals(Object)}
 */
@Override
public int hashCode()
{
    int typeHash = messageType == null ? 0 : messageType.hashCode();
    int payloadHash = payload == null ? 0 : payload.hashCode();
    int headersHash = headers == null ? 0 : headers.hashCode();

    return 31 * (31 * typeHash + payloadHash) + headersHash;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

public interface ConfigurationContext
{
org.neo4j.cluster.InstanceId getMyId();
InstanceId getMyId();

List<URI> getMemberURIs();

Expand All @@ -37,7 +37,7 @@ public interface ConfigurationContext

Map<InstanceId,URI> getMembers();

org.neo4j.cluster.InstanceId getCoordinator();
InstanceId getCoordinator();

URI getUriForId( InstanceId id );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ public AtomicBroadcastState handle( AtomicBroadcastContext context,
else
{
outgoing.offer( message.copyHeadersTo( internal( ProposerMessage.propose,
message.getPayload() ), Message.CONVERSATION_ID, InstanceId.INSTANCE ) );
message.getPayload() ), Message.CONVERSATION_ID, org.neo4j.cluster.protocol
.atomicbroadcast.multipaxos.InstanceId.INSTANCE ) );
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
class BiasedWinnerStrategy implements MultiPaxosContext.WinnerStrategy
{
private ClusterContext electionContext;
private final org.neo4j.cluster.InstanceId biasedNode;
private final InstanceId biasedNode;
private final boolean nodePromoted;

private BiasedWinnerStrategy( ClusterContext electionContext, InstanceId biasedNode, boolean nodePromoted )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,23 @@ public interface LearnerContext

long getLastKnownLearnedInstanceInCluster();

void setLastKnownLearnedInstanceInCluster( long lastKnownLearnedInstanceInCluster );

void learnedInstanceId( long instanceId );

boolean hasDeliveredAllKnownInstances();

void leave();

PaxosInstance getPaxosInstance( InstanceId instanceId );
PaxosInstance getPaxosInstance( org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId instanceId );

AtomicBroadcastSerializer newSerializer();

Iterable<org.neo4j.cluster.InstanceId> getAlive();

void setNextInstanceId( long id );

void notifyLearnMiss( InstanceId instanceId );
void notifyLearnMiss( org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId instanceId );

org.neo4j.cluster.InstanceId getLastKnownAliveUpToDateInstance();

void setLastKnownLearnedInstanceInCluster( long lastKnownLearnedInstanceInCluster, org.neo4j.cluster.InstanceId instanceId );
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,22 @@ public LearnRequestState()
{
}

/**
 * This state carries no data that takes part in the comparison, so any two
 * instances of exactly the same class are equal.
 *
 * @param obj the object to compare against, may be {@code null}
 * @return {@code true} if {@code obj} is non-null and of the same runtime class
 */
@Override
public boolean equals( Object obj )
{
    return obj != null && obj.getClass() == getClass();
}

/**
 * Constant hash code. {@link #equals(Object)} treats every instance of the
 * same class as equal, so all instances must share one hash value.
 *
 * @return always 1
 */
@Override
public int hashCode()
{
return 1;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public LearnerState handle( LearnerContext context,
outgoing.offer( message.copyHeadersTo( Message.respond( LearnerMessage.learnFailed,
message,
new LearnerMessage.LearnFailedState() ), org.neo4j.cluster.protocol
.atomicbroadcast.multipaxos.InstanceId.INSTANCE ) );
.atomicbroadcast.multipaxos.InstanceId.INSTANCE ) );
}
break;
}
Expand Down Expand Up @@ -259,22 +259,22 @@ public LearnerState handle( LearnerContext context,
PaxosInstance instance = context.getPaxosInstance( id );
if ( !instance.isState( PaxosInstance.State.closed ) && !instance.isState( PaxosInstance.State.delivered ) )
{
for ( org.neo4j.cluster.InstanceId node : context.getAlive() )
{
URI nodeUri = context.getUriForId( node );
if ( !node.equals( context.getMyId() ) )
{
outgoing.offer( Message.to( LearnerMessage.learnRequest, nodeUri,
new LearnerMessage.LearnRequestState() ).setHeader(
org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId.INSTANCE,
id.toString() ) );
break;
}
}
URI nodeUri = context.getUriForId( context.getLastKnownAliveUpToDateInstance() );


outgoing.offer( Message.to( LearnerMessage.learnRequest,
nodeUri,
new LearnerMessage.LearnRequestState() ).setHeader(
org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId.INSTANCE,
id.toString() ) );
context.setTimeout( "learn",
Message.timeout( LearnerMessage.learnTimedout, message ) );
break;
}
}

context.setLastKnownLearnedInstanceInCluster( catchUpTo );
context.setLastKnownLearnedInstanceInCluster( catchUpTo,
context.getIdForUri( new URI(message.getHeader( Message.FROM )) ) );
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ public class MultiPaxosContext
private ClusterConfiguration configuration;
private URI boundAt;
private long lastKnownLearnedInstanceInCluster = -1;
private org.neo4j.cluster.InstanceId lastKnownAliveUpToDateInstance;
private final ObjectInputStreamFactory objectInputStreamFactory;
private final ObjectOutputStreamFactory objectOutputStreamFactory;
private long nextInstanceId = 0;

private long nextInstanceId = 0;
private final ClusterContext clusterContext;
private final ProposerContext proposerContext;
private final AcceptorContext acceptorContext;
Expand Down Expand Up @@ -228,7 +229,7 @@ public org.neo4j.cluster.InstanceId getCoordinator()
}

@Override
public URI getUriForId(org.neo4j.cluster.InstanceId node )
public URI getUriForId( org.neo4j.cluster.InstanceId node )
{
return configuration.getUriForId( node );
}
Expand Down Expand Up @@ -287,18 +288,18 @@ private class ProposerContextImpl

// ProposerContext
final Deque<Message> pendingValues = new LinkedList<Message>();
final Map<InstanceId, Message> bookedInstances = new HashMap<InstanceId, Message>();
final Map<org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId, Message> bookedInstances = new HashMap<org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId, Message>();

@Override
public InstanceId newInstanceId()
public org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId newInstanceId()
{
// Never propose something lower than last received instance id
if ( lastKnownLearnedInstanceInCluster >= nextInstanceId )
{
nextInstanceId = lastKnownLearnedInstanceInCluster + 1;
}

return new InstanceId( nextInstanceId++ );
return new org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId( nextInstanceId++ );
}

@Override
Expand All @@ -312,7 +313,7 @@ public void leave()
}

@Override
public void bookInstance( InstanceId instanceId, Message message )
public void bookInstance( org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId instanceId, Message message )
{
if ( message.getPayload() == null )
{
Expand All @@ -322,7 +323,7 @@ public void bookInstance( InstanceId instanceId, Message message )
}

@Override
public PaxosInstance getPaxosInstance( InstanceId instanceId )
public PaxosInstance getPaxosInstance( org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId instanceId )
{
return paxosInstances.getPaxosInstance( instanceId );
}
Expand Down Expand Up @@ -352,13 +353,13 @@ public boolean canBookInstance()
}

@Override
public Message getBookedInstance( InstanceId id )
public Message getBookedInstance( org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId id )
{
return bookedInstances.get( id );
}

@Override
public Message<ProposerMessage> unbookInstance( InstanceId id )
public Message<ProposerMessage> unbookInstance( org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId id )
{
return bookedInstances.remove( id );
}
Expand Down Expand Up @@ -394,7 +395,7 @@ public void patchBookedInstances( ClusterMessage.ConfigurationChangeState value
{
if ( value.getJoin() != null )
{
for ( InstanceId instanceId : bookedInstances.keySet() )
for ( org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId instanceId : bookedInstances.keySet() )
{
PaxosInstance instance = paxosInstances.getPaxosInstance( instanceId );
if ( instance.getAcceptors() != null )
Expand All @@ -415,7 +416,7 @@ public void patchBookedInstances( ClusterMessage.ConfigurationChangeState value
}
else if ( value.getLeave() != null )
{
for ( InstanceId instanceId : bookedInstances.keySet() )
for ( org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId instanceId : bookedInstances.keySet() )
{
PaxosInstance instance = paxosInstances.getPaxosInstance( instanceId );
if ( instance.getAcceptors() != null )
Expand Down Expand Up @@ -476,7 +477,7 @@ public void joining( String name, Iterable<URI> instanceList )

@Override
public void acquiredConfiguration( final Map<org.neo4j.cluster.InstanceId, URI> memberList, final Map<String,
org.neo4j.cluster.InstanceId> roles )
org.neo4j.cluster.InstanceId> roles )
{
configuration.setMembers( memberList );
configuration.setRoles( roles );
Expand Down Expand Up @@ -717,7 +718,7 @@ private class AcceptorContextImpl
implements AcceptorContext
{
@Override
public AcceptorInstance getAcceptorInstance( InstanceId instanceId )
public AcceptorInstance getAcceptorInstance( org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId instanceId )
{
return instanceStore.getAcceptorInstance( instanceId );
}
Expand Down Expand Up @@ -750,7 +751,7 @@ private class LearnerContextImpl
private long lastLearnedInstanceId = -1;

/** To minimize logging, keep track of the latest learn miss, only log when it changes. */
private InstanceId latestLearnMiss = null;
private org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId latestLearnMiss = null;

@Override
public long getLastDeliveredInstanceId()
Expand All @@ -762,7 +763,7 @@ public long getLastDeliveredInstanceId()
public void setLastDeliveredInstanceId( long lastDeliveredInstanceId )
{
this.lastDeliveredInstanceId = lastDeliveredInstanceId;
instanceStore.lastDelivered( new InstanceId( lastDeliveredInstanceId ) );
instanceStore.lastDelivered( new org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId( lastDeliveredInstanceId ) );
}

@Override
Expand All @@ -771,16 +772,32 @@ public long getLastLearnedInstanceId()
return lastLearnedInstanceId;
}


@Override
public long getLastKnownLearnedInstanceInCluster()
{
return lastKnownLearnedInstanceInCluster;
return lastKnownLearnedInstanceInCluster; //To change body of implemented methods use File | Settings | File Templates.
}

/**
 * Records the highest learned instance id seen in the cluster, together with
 * the cluster member that reported it.
 * <p>
 * The stored value only moves forward: an update is applied when the given id
 * is greater than or equal to the stored one, in which case the reporting
 * member is remembered as well. Passing {@code -1} when the stored value is
 * higher resets the stored id to {@code -1}; the remembered member is left
 * unchanged in that case. Any other lower value is ignored.
 *
 * @param lastKnownLearnedInstanceInCluster the learned instance id reported, or -1 to reset
 * @param instanceId the cluster member the id was reported by
 */
@Override
public void setLastKnownLearnedInstanceInCluster( long lastKnownLearnedInstanceInCluster, org.neo4j.cluster.InstanceId instanceId )
{
    final long current = MultiPaxosContext.this.lastKnownLearnedInstanceInCluster;

    if ( lastKnownLearnedInstanceInCluster >= current )
    {
        MultiPaxosContext.this.lastKnownLearnedInstanceInCluster = lastKnownLearnedInstanceInCluster;
        MultiPaxosContext.this.lastKnownAliveUpToDateInstance = instanceId;
        return;
    }

    if ( lastKnownLearnedInstanceInCluster == -1 )
    {
        // Special case for clearing the state
        MultiPaxosContext.this.lastKnownLearnedInstanceInCluster = -1;
    }
}

@Override
public void setLastKnownLearnedInstanceInCluster( long lastKnownLearnedInstanceInCluster )
public org.neo4j.cluster.InstanceId getLastKnownAliveUpToDateInstance()
{
MultiPaxosContext.this.lastKnownLearnedInstanceInCluster = lastKnownLearnedInstanceInCluster;
return lastKnownAliveUpToDateInstance;
}

@Override
Expand Down Expand Up @@ -808,7 +825,7 @@ public void leave()
}

@Override
public PaxosInstance getPaxosInstance( InstanceId instanceId )
public PaxosInstance getPaxosInstance( org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId instanceId )
{
return paxosInstances.getPaxosInstance( instanceId );
}
Expand All @@ -832,7 +849,7 @@ public void setNextInstanceId( long id )
}

@Override
public void notifyLearnMiss( InstanceId instanceId )
public void notifyLearnMiss( org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.InstanceId instanceId )
{
if(latestLearnMiss != instanceId)
{
Expand All @@ -849,8 +866,8 @@ private class HeartbeatContextImpl
// HeartbeatContext
Set<org.neo4j.cluster.InstanceId> failed = new HashSet<org.neo4j.cluster.InstanceId>();

Map<org.neo4j.cluster.InstanceId, Set<org.neo4j.cluster.InstanceId>> nodeSuspicions = new HashMap<org.neo4j
.cluster.InstanceId, Set<org.neo4j.cluster.InstanceId>>();
Map<org.neo4j.cluster.InstanceId, Set<org.neo4j.cluster.InstanceId>> nodeSuspicions = new HashMap<org.neo4j.cluster.InstanceId, Set<org.neo4j.cluster.InstanceId>>();


Iterable<HeartbeatListener> heartBeatListeners = Listeners.newListeners();

Expand Down
Loading

0 comments on commit 5d9e836

Please sign in to comment.