Skip to content

Commit

Permalink
Log the first heartbeat after a series of missed heartbeats
Browse files Browse the repository at this point in the history
Previously we logged every heartbeat message. That was too much logging
for the system to handle, so it was removed. This change introduces
logging of only the interesting heartbeat messages: the first heartbeat
that arrives after one or more beats have been missed. We already log
heartbeat timeouts, so also logging when the heartbeat returns is
sufficient for being able to reason about the heartbeats.
  • Loading branch information
thobe committed Nov 11, 2015
1 parent 62a2537 commit ffa5c68
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 17 deletions.
Expand Up @@ -70,14 +70,22 @@ public static <MESSAGETYPE extends MessageType> Message<MESSAGETYPE> timeout( ME
public static <MESSAGETYPE extends MessageType> Message<MESSAGETYPE> timeout( MESSAGETYPE message, public static <MESSAGETYPE extends MessageType> Message<MESSAGETYPE> timeout( MESSAGETYPE message,
Message<?> causedBy, Object payload ) Message<?> causedBy, Object payload )
{ {
return causedBy.copyHeadersTo( new Message<MESSAGETYPE>( message, payload ), Message.CONVERSATION_ID, Message<MESSAGETYPE> timeout = causedBy.copyHeadersTo( new Message<>( message, payload ),
Message.CREATED_BY ); Message.CONVERSATION_ID, Message.CREATED_BY );
int timeoutCount = 0;
if ( causedBy.hasHeader( TIMEOUT_COUNT ) )
{
timeoutCount = Integer.parseInt( causedBy.getHeader( TIMEOUT_COUNT ) ) + 1;
}
timeout.setHeader( TIMEOUT_COUNT, "" + timeoutCount );
return timeout;
} }




// Standard headers // Standard headers
public static final String CONVERSATION_ID = "conversation-id"; public static final String CONVERSATION_ID = "conversation-id";
public static final String CREATED_BY = "created-by"; public static final String CREATED_BY = "created-by";
public static final String TIMEOUT_COUNT = "timeout-count";
public static final String FROM = "from"; public static final String FROM = "from";
public static final String TO = "to"; public static final String TO = "to";
public static final String INSTANCE_ID = "instance-id"; public static final String INSTANCE_ID = "instance-id";
Expand Down
Expand Up @@ -26,5 +26,7 @@ public interface TimeoutsContext
{ {
void setTimeout( Object key, Message<? extends MessageType> timeoutMessage ); void setTimeout( Object key, Message<? extends MessageType> timeoutMessage );


void cancelTimeout( Object key ); Message<? extends MessageType> cancelTimeout( Object key );

long getTimeoutFor( Message<? extends MessageType> timeoutMessage );
} }
Expand Up @@ -91,9 +91,15 @@ public void setTimeout( Object key, Message<? extends MessageType> timeoutMessag
} }


@Override @Override
public void cancelTimeout( Object key ) public long getTimeoutFor( Message<? extends MessageType> timeoutMessage )
{ {
timeouts.cancelTimeout( key ); return timeouts.getTimeoutFor( timeoutMessage );
}

@Override
public Message<? extends MessageType> cancelTimeout( Object key )
{
return timeouts.cancelTimeout( key );
} }


// ConfigurationContext // ConfigurationContext
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.neo4j.cluster.InstanceId; import org.neo4j.cluster.InstanceId;
import org.neo4j.cluster.com.message.Message; import org.neo4j.cluster.com.message.Message;
import org.neo4j.cluster.com.message.MessageHolder; import org.neo4j.cluster.com.message.MessageHolder;
import org.neo4j.cluster.com.message.MessageType;
import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.LearnerMessage; import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.LearnerMessage;
import org.neo4j.cluster.statemachine.State; import org.neo4j.cluster.statemachine.State;


Expand Down Expand Up @@ -122,11 +123,7 @@ public HeartbeatState handle( HeartbeatContext context,
} }
} }


context.cancelTimeout( HeartbeatMessage.i_am_alive + "-" + resetTimeout( context, message, state );
state.getServer() );
context.setTimeout( HeartbeatMessage.i_am_alive + "-" +
state.getServer(), timeout( HeartbeatMessage.timed_out, message, state
.getServer() ) );


// Check if this server knows something that we don't // Check if this server knows something that we don't
if ( message.hasHeader( "last-learned" ) ) if ( message.hasHeader( "last-learned" ) )
Expand Down Expand Up @@ -267,5 +264,24 @@ public HeartbeatState handle( HeartbeatContext context,


return this; return this;
} }

/**
 * Replaces the pending heartbeat timeout for the server described by {@code state} with a fresh one.
 * <p>
 * If the timeout being cancelled carries a {@link Message#TIMEOUT_COUNT} header greater than zero,
 * a debug message is written stating how many timeouts were missed and the total time they
 * represent (timeout count multiplied by the value of {@code context.getTimeoutFor}).
 *
 * @param context the heartbeat context used to cancel and set timeouts and to obtain the log
 * @param message the heartbeat message that caused the reset; the new timeout message is derived from it
 * @param state the payload of the heartbeat, identifying the server the timeout is keyed on
 */
private void resetTimeout( HeartbeatContext context, Message<HeartbeatMessage> message,
        HeartbeatMessage.IAmAliveState state )
{
    String timeoutKey = HeartbeatMessage.i_am_alive + "-" + state.getServer();
    Message<? extends MessageType> cancelled = context.cancelTimeout( timeoutKey );

    // No previous timeout, or one without the header, counts as zero missed beats.
    int missedBeats = 0;
    if ( cancelled != null && cancelled.hasHeader( Message.TIMEOUT_COUNT ) )
    {
        missedBeats = Integer.parseInt( cancelled.getHeader( Message.TIMEOUT_COUNT ) );
    }

    if ( missedBeats > 0 )
    {
        long interval = context.getTimeoutFor( cancelled );
        context.getInternalLog( HeartbeatState.class ).debug(
                "Received " + state + " after missing " + missedBeats +
                " (" + interval * missedBeats + "ms)" );
    }

    // Always re-arm the timeout, whether or not anything was logged.
    context.setTimeout( timeoutKey, timeout( HeartbeatMessage.timed_out, message, state.getServer() ) );
}
} }
} }
Expand Up @@ -26,6 +26,9 @@
*/ */
public interface TimeoutStrategy public interface TimeoutStrategy
{ {
/**
* @return the timeout (in milliseconds) for the given message.
*/
long timeoutFor( Message message ); long timeoutFor( Message message );


void timeoutTriggered( Message timeoutMessage ); void timeoutTriggered( Message timeoutMessage );
Expand Down
Expand Up @@ -72,19 +72,26 @@ public void setTimeout( Object key, Message<? extends MessageType> timeoutMessag
timeouts.put( key, new Timeout( timeoutAt, timeoutMessage ) ); timeouts.put( key, new Timeout( timeoutAt, timeoutMessage ) );
} }


/**
 * Asks the configured timeout strategy which timeout applies to the given message.
 *
 * @param timeoutMessage the timeout message to look up
 * @return the value of {@code timeoutStrategy.timeoutFor( timeoutMessage )}, which
 * {@code TimeoutStrategy} documents as being in milliseconds
 */
public long getTimeoutFor( Message<? extends MessageType> timeoutMessage )
{
    final long timeoutMillis = timeoutStrategy.timeoutFor( timeoutMessage );
    return timeoutMillis;
}

/** /**
* Cancel a timeout corresponding to a particular key. Use the same key * Cancel a timeout corresponding to a particular key. Use the same key
* that was used to set it up. * that was used to set it up.
* *
* @param key * @param key
*/ */
public void cancelTimeout( Object key ) public Message<? extends MessageType> cancelTimeout( Object key )
{ {
Timeout timeout = timeouts.remove( key ); Timeout timeout = timeouts.remove( key );
if ( timeout != null ) if ( timeout != null )
{ {
timeoutStrategy.timeoutCancelled( timeout.timeoutMessage ); timeoutStrategy.timeoutCancelled( timeout.timeoutMessage );
return timeout.getTimeoutMessage();
} }
return null;
} }


/** /**
Expand Down
Expand Up @@ -19,16 +19,21 @@
*/ */
package org.neo4j.cluster.protocol.heartbeat; package org.neo4j.cluster.protocol.heartbeat;


import java.net.URI;
import java.util.concurrent.Executor;

import org.junit.Test; import org.junit.Test;
import org.mockito.Matchers; import org.mockito.Matchers;
import org.mockito.Mockito; import org.mockito.Mockito;


import java.net.URI; import org.neo4j.cluster.DelayedDirectExecutor;
import java.util.concurrent.Executor;

import org.neo4j.cluster.InstanceId; import org.neo4j.cluster.InstanceId;
import org.neo4j.cluster.StateMachines;
import org.neo4j.cluster.com.message.Message; import org.neo4j.cluster.com.message.Message;
import org.neo4j.cluster.com.message.MessageHolder; import org.neo4j.cluster.com.message.MessageHolder;
import org.neo4j.cluster.com.message.MessageSender;
import org.neo4j.cluster.com.message.MessageSource;
import org.neo4j.cluster.protocol.MessageArgumentMatcher;
import org.neo4j.cluster.protocol.atomicbroadcast.ObjectInputStreamFactory; import org.neo4j.cluster.protocol.atomicbroadcast.ObjectInputStreamFactory;
import org.neo4j.cluster.protocol.atomicbroadcast.ObjectOutputStreamFactory; import org.neo4j.cluster.protocol.atomicbroadcast.ObjectOutputStreamFactory;
import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.AcceptorInstanceStore; import org.neo4j.cluster.protocol.atomicbroadcast.multipaxos.AcceptorInstanceStore;
Expand All @@ -37,20 +42,26 @@
import org.neo4j.cluster.protocol.cluster.ClusterConfiguration; import org.neo4j.cluster.protocol.cluster.ClusterConfiguration;
import org.neo4j.cluster.protocol.election.ElectionCredentialsProvider; import org.neo4j.cluster.protocol.election.ElectionCredentialsProvider;
import org.neo4j.cluster.protocol.election.ElectionRole; import org.neo4j.cluster.protocol.election.ElectionRole;
import org.neo4j.cluster.protocol.MessageArgumentMatcher; import org.neo4j.cluster.statemachine.StateMachine;
import org.neo4j.cluster.timeout.TimeoutStrategy;
import org.neo4j.cluster.timeout.Timeouts; import org.neo4j.cluster.timeout.Timeouts;
import org.neo4j.helpers.collection.Iterables; import org.neo4j.helpers.collection.Iterables;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.logging.NullLogService; import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.logging.SimpleLogService;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.NullLogProvider; import org.neo4j.logging.NullLogProvider;


import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.neo4j.logging.AssertableLogProvider.inLog;


public class HeartbeatStateTest public class HeartbeatStateTest
{ {
Expand Down Expand Up @@ -162,4 +173,82 @@ public void shouldAddInstanceIdHeaderInCatchUpMessages() throws Throwable
verify( holder, times( 1 ) ).offer( Matchers.argThat( new MessageArgumentMatcher<LearnerMessage>() verify( holder, times( 1 ) ).offer( Matchers.argThat( new MessageArgumentMatcher<LearnerMessage>()
.onMessageType( LearnerMessage.catchUp ).withHeader( Message.INSTANCE_ID, "2" ) ) ); .onMessageType( LearnerMessage.catchUp ).withHeader( Message.INSTANCE_ID, "2" ) ) );
} }

/**
 * Verifies that the first {@code i_am_alive} received after a run of heartbeat timeouts is logged
 * at debug level, together with the number of missed timeouts and the time they add up to.
 */
@Test
public void shouldLogFirstHeartbeatAfterTimeout() throws Throwable
{
    // given
    InstanceId instanceId = new InstanceId( 1 ), otherInstance = new InstanceId( 2 );
    ClusterConfiguration configuration = new ClusterConfiguration( "whatever", NullLogProvider.getInstance(),
            "cluster://1", "cluster://2" );
    configuration.getMembers().put( otherInstance, URI.create( "cluster://2" ) );
    AssertableLogProvider userLog = new AssertableLogProvider( true );
    AssertableLogProvider internalLog = new AssertableLogProvider( true );
    LogService logging = new SimpleLogService( userLog, internalLog );
    // The strategy is mocked so the test decides the timeout length; the Timeouts instance is real
    // and is driven by the explicit tick() calls below.
    TimeoutStrategy timeoutStrategy = mock( TimeoutStrategy.class );
    Timeouts timeouts = new Timeouts( timeoutStrategy );

    MultiPaxosContext context = new MultiPaxosContext(
            instanceId,
            Iterables.<ElectionRole,ElectionRole>iterable( new ElectionRole( "coordinator" ) ),
            configuration,
            mock( Executor.class ),
            logging,
            mock( ObjectInputStreamFactory.class ),
            mock( ObjectOutputStreamFactory.class ),
            mock( AcceptorInstanceStore.class ),
            timeouts,
            mock( ElectionCredentialsProvider.class ) );

    StateMachines stateMachines = new StateMachines(
            logging.getInternalLogProvider(),
            mock( StateMachines.Monitor.class ),
            mock( MessageSource.class ),
            mock( MessageSender.class ),
            timeouts,
            mock( DelayedDirectExecutor.class ),
            // Runs every command inline on the calling thread.
            new Executor()
            {
                @Override
                public void execute( Runnable command )
                {
                    command.run();
                }
            },
            instanceId );
    stateMachines.addStateMachine(
            new StateMachine( context.getHeartbeatContext(), HeartbeatMessage.class, HeartbeatState.start,
                    logging.getInternalLogProvider() ) );

    timeouts.tick( 0 );
    // Every timeout is 5 (ms); upper-case suffix so the literal cannot be misread as 51.
    when( timeoutStrategy.timeoutFor( any( Message.class ) ) ).thenReturn( 5L );

    // when
    stateMachines.process( Message.internal( HeartbeatMessage.join ) );
    stateMachines.process(
            Message.internal( HeartbeatMessage.i_am_alive, new HeartbeatMessage.IAmAliveState( otherInstance ) )
                    .setHeader( Message.CREATED_BY, otherInstance.toString() ) );
    // Advance the clock to 15 without any further heartbeat from instance 2.
    for ( int i = 1; i <= 15; i++ )
    {
        timeouts.tick( i );
    }

    // then
    // Three heartbeat timeouts are expected to have fired.
    verify( timeoutStrategy, times( 3 ) ).timeoutTriggered( argThat( new MessageArgumentMatcher<>()
            .onMessageType( HeartbeatMessage.timed_out ) ) );
    internalLog.assertExactly(
            inLog( HeartbeatState.class ).debug( "Received timed out for server 2" ),
            inLog( HeartbeatContext.class ).info( "1(me) is now suspecting 2" ),
            inLog( HeartbeatState.class ).debug( "Received timed out for server 2" ),
            inLog( HeartbeatState.class ).debug( "Received timed out for server 2" ) );
    // Start from an empty log so the final assertion only sees the returning heartbeat.
    internalLog.clear();

    // when
    stateMachines.process(
            Message.internal( HeartbeatMessage.i_am_alive, new HeartbeatMessage.IAmAliveState( otherInstance ) )
                    .setHeader( Message.CREATED_BY, otherInstance.toString() ) );

    // then
    // 3 missed timeouts * 5ms each = 15ms.
    internalLog.assertExactly( inLog( HeartbeatState.class ).debug( "Received i_am_alive[2] after missing 3 (15ms)" ) );
}
} }
Expand Up @@ -69,9 +69,14 @@ public void setTimeout( Object key, Message<? extends MessageType> timeoutMessag
} }


@Override @Override
public void cancelTimeout( Object key ) public Message<? extends MessageType> cancelTimeout( Object key )
{ {
timeouts.remove( key ); Pair<ProverTimeout,Long> timeout = timeouts.remove( key );
if ( timeout != null )
{
return timeout.first().getTimeoutMessage();
}
return null;
} }


@Override @Override
Expand Down

0 comments on commit ffa5c68

Please sign in to comment.