Skip to content

Commit

Permalink
Fixes for status endpoint
Browse files Browse the repository at this point in the history
Nullable fields are not included in status response
Improved flaky tests around code
  • Loading branch information
phughk committed Sep 7, 2018
1 parent a0920e0 commit 837ed25
Show file tree
Hide file tree
Showing 12 changed files with 27 additions and 108 deletions.
Expand Up @@ -53,7 +53,6 @@ public class BatchingTxApplier extends LifecycleAdapter
private final PullRequestMonitor monitor; private final PullRequestMonitor monitor;
private final PageCursorTracerSupplier pageCursorTracerSupplier; private final PageCursorTracerSupplier pageCursorTracerSupplier;
private final VersionContextSupplier versionContextSupplier; private final VersionContextSupplier versionContextSupplier;
private final ReadReplicaLastAppliedTransactionMonitor lastAppliedTransactionMonitor;
private final CommandIndexTracker commandIndexTracker; private final CommandIndexTracker commandIndexTracker;
private final Log log; private final Log log;


Expand All @@ -75,7 +74,6 @@ public BatchingTxApplier( int maxBatchSize, Supplier<TransactionIdStore> txIdSto
this.pageCursorTracerSupplier = pageCursorTracerSupplier; this.pageCursorTracerSupplier = pageCursorTracerSupplier;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.monitor = monitors.newMonitor( PullRequestMonitor.class ); this.monitor = monitors.newMonitor( PullRequestMonitor.class );
this.lastAppliedTransactionMonitor = monitors.newMonitor( ReadReplicaLastAppliedTransactionMonitor.class );
this.versionContextSupplier = versionContextSupplier; this.versionContextSupplier = versionContextSupplier;
this.commandIndexTracker = commandIndexTracker; this.commandIndexTracker = commandIndexTracker;
} }
Expand All @@ -89,7 +87,6 @@ public void start()
{ {
commitProcess.commit( first, NULL, EXTERNAL ); commitProcess.commit( first, NULL, EXTERNAL );
pageCursorTracerSupplier.get().reportEvents(); // Report paging metrics for the commit pageCursorTracerSupplier.get().reportEvents(); // Report paging metrics for the commit
lastAppliedTransactionMonitor.applyTransaction( last.transactionId() );
long lastAppliedRaftLogIndex = LogIndexTxHeaderEncoding.decodeLogIndexFromTxHeader( last.transactionRepresentation().additionalHeader() ); long lastAppliedRaftLogIndex = LogIndexTxHeaderEncoding.decodeLogIndexFromTxHeader( last.transactionRepresentation().additionalHeader() );
commandIndexTracker.setAppliedCommandIndex( lastAppliedRaftLogIndex ); commandIndexTracker.setAppliedCommandIndex( lastAppliedRaftLogIndex );
} ); } );
Expand Down

This file was deleted.

This file was deleted.

Expand Up @@ -149,7 +149,7 @@ public ConsensusModule( MemberId myself, final PlatformModule platformModule,
config.get( refuse_to_be_leader ), config.get( refuse_to_be_leader ),
supportsPreVoting, platformModule.monitors ); supportsPreVoting, platformModule.monitors );


DurationSinceLastMessageMonitor durationSinceLastMessageMonitor = new DurationSinceLastMessageMonitor( logProvider ); DurationSinceLastMessageMonitor durationSinceLastMessageMonitor = new DurationSinceLastMessageMonitor();
platformModule.monitors.addMonitorListener( durationSinceLastMessageMonitor ); platformModule.monitors.addMonitorListener( durationSinceLastMessageMonitor );
platformModule.dependencies.satisfyDependency( durationSinceLastMessageMonitor ); platformModule.dependencies.satisfyDependency( durationSinceLastMessageMonitor );


Expand Down
Expand Up @@ -30,24 +30,15 @@
public class DurationSinceLastMessageMonitor implements RaftMessageTimerResetMonitor public class DurationSinceLastMessageMonitor implements RaftMessageTimerResetMonitor
{ {
private long lastMessageNanos = -1; private long lastMessageNanos = -1;
private final Log log;

public DurationSinceLastMessageMonitor( LogProvider logProvider )
{
log = logProvider.getLog( DurationSinceLastMessageMonitor.class );
log.info( "Created duration tracker" );
}


@Override @Override
public void timerReset() public void timerReset()
{ {
lastMessageNanos = System.nanoTime(); lastMessageNanos = System.nanoTime();
log.info( "Timer reset" );
} }


public Duration durationSinceLastMessage() public Duration durationSinceLastMessage()
{ {
log.info( "Duration retrieved" );
return Duration.ofNanos( System.nanoTime() - lastMessageNanos ); return Duration.ofNanos( System.nanoTime() - lastMessageNanos );
} }
} }
Expand Down
Expand Up @@ -49,8 +49,6 @@
import org.neo4j.causalclustering.catchup.storecopy.StoreFiles; import org.neo4j.causalclustering.catchup.storecopy.StoreFiles;
import org.neo4j.causalclustering.catchup.tx.BatchingTxApplier; import org.neo4j.causalclustering.catchup.tx.BatchingTxApplier;
import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess; import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess;
import org.neo4j.causalclustering.catchup.tx.ReadReplicaLastAppliedTransactionMonitor;
import org.neo4j.causalclustering.catchup.tx.TrackingLastAppliedTransactionMonitor;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory; import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory;
import org.neo4j.causalclustering.catchup.tx.TxPullClient; import org.neo4j.causalclustering.catchup.tx.TxPullClient;
import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.CausalClusteringSettings;
Expand Down Expand Up @@ -276,11 +274,6 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule,
LifeSupport txPulling = new LifeSupport(); LifeSupport txPulling = new LifeSupport();
int maxBatchSize = config.get( CausalClusteringSettings.read_replica_transaction_applier_batch_size ); int maxBatchSize = config.get( CausalClusteringSettings.read_replica_transaction_applier_batch_size );


// Used in the web interface to correctly report the transaction id
TrackingLastAppliedTransactionMonitor readReplicaLastAppliedTransactionMonitor = new TrackingLastAppliedTransactionMonitor();
dependencies.satisfyDependency( readReplicaLastAppliedTransactionMonitor );
platformModule.monitors.addMonitorListener( readReplicaLastAppliedTransactionMonitor );

CommandIndexTracker commandIndexTracker = platformModule.dependencies.satisfyDependency( new CommandIndexTracker() ); CommandIndexTracker commandIndexTracker = platformModule.dependencies.satisfyDependency( new CommandIndexTracker() );
BatchingTxApplier batchingTxApplier = new BatchingTxApplier( BatchingTxApplier batchingTxApplier = new BatchingTxApplier(
maxBatchSize, () -> localDatabase.dataSource().getDependencyResolver().resolveDependency( TransactionIdStore.class ), writableCommitProcess, maxBatchSize, () -> localDatabase.dataSource().getDependencyResolver().resolveDependency( TransactionIdStore.class ), writableCommitProcess,
Expand Down
Expand Up @@ -23,8 +23,10 @@
package org.neo4j.server.rest.causalclustering; package org.neo4j.server.rest.causalclustering;


import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.annotate.JsonSerialize;


import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import java.util.Collection; import java.util.Collection;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -54,7 +56,7 @@ public Response discover()
} }


Response statusResponse( long lastAppliedRaftIndex, boolean isParticipatingInRaftGroup, Collection<MemberId> votingMembers, boolean isHealthy, Response statusResponse( long lastAppliedRaftIndex, boolean isParticipatingInRaftGroup, Collection<MemberId> votingMembers, boolean isHealthy,
MemberId memberId, MemberId leader, Long millisSinceLastLeaderMessage, boolean isCore ) MemberId memberId, MemberId leader, Duration millisSinceLastLeaderMessage, boolean isCore )
{ {
String jsonObject; String jsonObject;
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
Expand Down
Expand Up @@ -22,11 +22,17 @@
*/ */
package org.neo4j.server.rest.causalclustering; package org.neo4j.server.rest.causalclustering;


import org.codehaus.jackson.map.annotate.JsonSerialize;

import java.time.Duration;
import java.util.Collection; import java.util.Collection;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors; import java.util.stream.Collectors;


import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;


@JsonSerialize( include = JsonSerialize.Inclusion.NON_NULL )
public class ClusterStatusResponse public class ClusterStatusResponse
{ {
private final boolean isCore; private final boolean isCore;
Expand All @@ -39,22 +45,15 @@ public class ClusterStatusResponse
private final Long millisSinceLastLeaderMessage; private final Long millisSinceLastLeaderMessage;


ClusterStatusResponse( long lastAppliedRaftIndex, boolean isParticipatingInRaftGroup, Collection<MemberId> votingMembers, boolean isHealthy, ClusterStatusResponse( long lastAppliedRaftIndex, boolean isParticipatingInRaftGroup, Collection<MemberId> votingMembers, boolean isHealthy,
MemberId memberId, MemberId leader, Long millisSinceLastLeaderMessage, boolean isCore ) MemberId memberId, MemberId leader, Duration millisSinceLastLeaderMessage, boolean isCore )
{ {
this.lastAppliedRaftIndex = lastAppliedRaftIndex; this.lastAppliedRaftIndex = lastAppliedRaftIndex;
this.isParticipatingInRaftGroup = isParticipatingInRaftGroup; this.isParticipatingInRaftGroup = isParticipatingInRaftGroup;
this.votingMembers = votingMembers.stream().map( member -> member.getUuid().toString() ).sorted().collect( Collectors.toList() ); this.votingMembers = votingMembers.stream().map( member -> member.getUuid().toString() ).sorted().collect( Collectors.toList() );
this.isHealthy = isHealthy; this.isHealthy = isHealthy;
this.memberId = memberId.getUuid().toString(); this.memberId = memberId.getUuid().toString();
if ( leader != null ) this.leader = Optional.ofNullable( leader ).map( MemberId::getUuid ).map( UUID::toString ).orElse( null );
{ this.millisSinceLastLeaderMessage = Optional.ofNullable( millisSinceLastLeaderMessage ).map( Duration::toMillis ).orElse( null );
this.leader = leader.getUuid().toString();
}
else
{
this.leader = "";
}
this.millisSinceLastLeaderMessage = millisSinceLastLeaderMessage;
this.isCore = isCore; this.isCore = isCore;
} }


Expand Down
Expand Up @@ -22,6 +22,7 @@
*/ */
package org.neo4j.server.rest.causalclustering; package org.neo4j.server.rest.causalclustering;


import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
Expand Down Expand Up @@ -105,7 +106,7 @@ public Response description()
boolean participatingInRaftGroup = votingMembers.contains( myself ) && Objects.nonNull( leader ); boolean participatingInRaftGroup = votingMembers.contains( myself ) && Objects.nonNull( leader );


long lastAppliedRaftIndex = commandIndexTracker.getAppliedCommandIndex(); long lastAppliedRaftIndex = commandIndexTracker.getAppliedCommandIndex();
long millisSinceLastLeaderMessage = raftMessageTimerResetMonitor.durationSinceLastMessage().toMillis(); Duration millisSinceLastLeaderMessage = raftMessageTimerResetMonitor.durationSinceLastMessage();


return statusResponse( lastAppliedRaftIndex, participatingInRaftGroup, votingMembers, databaseHealth.isHealthy(), myself, leader, return statusResponse( lastAppliedRaftIndex, participatingInRaftGroup, votingMembers, databaseHealth.isHealthy(), myself, leader,
millisSinceLastLeaderMessage, true ); millisSinceLastLeaderMessage, true );
Expand Down
Expand Up @@ -22,6 +22,7 @@
*/ */
package org.neo4j.server.rest.causalclustering; package org.neo4j.server.rest.causalclustering;


import java.time.Duration;
import java.util.Collection; import java.util.Collection;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;


Expand Down Expand Up @@ -94,7 +95,7 @@ public Response description()
.orElse( null ); .orElse( null );
long lastAppliedRaftIndex = commandIndexTracker.getAppliedCommandIndex(); long lastAppliedRaftIndex = commandIndexTracker.getAppliedCommandIndex();
// leader message duration is meaningless for replicas since communication is not guaranteed with leader and transactions are streamed periodically // leader message duration is meaningless for replicas since communication is not guaranteed with leader and transactions are streamed periodically
Long millisSinceLastLeaderMessage = null; Duration millisSinceLastLeaderMessage = null;
return statusResponse( lastAppliedRaftIndex, false, votingMembers, isHealthy, memberId, leader, millisSinceLastLeaderMessage, false ); return statusResponse( lastAppliedRaftIndex, false, votingMembers, isHealthy, memberId, leader, millisSinceLastLeaderMessage, false );
} }
} }
Expand Up @@ -61,6 +61,7 @@
import static javax.ws.rs.core.Response.Status.OK; import static javax.ws.rs.core.Response.Status.OK;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -102,7 +103,7 @@ public void setup() throws Exception
topologyService = dependencyResolver.satisfyDependency( topologyService = dependencyResolver.satisfyDependency(
new FakeTopologyService( Arrays.asList( core2, core3 ), Collections.singleton( replica ), myself, RoleInfo.FOLLOWER ) ); new FakeTopologyService( Arrays.asList( core2, core3 ), Collections.singleton( replica ), myself, RoleInfo.FOLLOWER ) );


raftMessageTimerResetMonitor = dependencyResolver.satisfyDependency( new DurationSinceLastMessageMonitor( logProvider ) ); raftMessageTimerResetMonitor = dependencyResolver.satisfyDependency( new DurationSinceLastMessageMonitor() );
raftMachine = dependencyResolver.satisfyDependency( mock( RaftMachine.class ) ); raftMachine = dependencyResolver.satisfyDependency( mock( RaftMachine.class ) );
commandIndexTracker = dependencyResolver.satisfyDependency( new CommandIndexTracker() ); commandIndexTracker = dependencyResolver.satisfyDependency( new CommandIndexTracker() );


Expand Down Expand Up @@ -176,12 +177,13 @@ public void testAnswersWhenFollower()
} }


@Test @Test
public void expectedStatusFieldsAreIncluded() throws IOException, NoLeaderFoundException public void expectedStatusFieldsAreIncluded() throws IOException, NoLeaderFoundException, InterruptedException
{ {
// given ideal normal conditions // given ideal normal conditions
commandIndexTracker.setAppliedCommandIndex( 123 ); commandIndexTracker.setAppliedCommandIndex( 123 );
when( raftMachine.getLeader() ).thenReturn( core2 ); when( raftMachine.getLeader() ).thenReturn( core2 );
raftMessageTimerResetMonitor.timerReset(); raftMessageTimerResetMonitor.timerReset();
Thread.sleep( 1 ); // Sometimes the test can be fast. This guarantees at least 1 ms since message received


// and helpers // and helpers
List<String> votingMembers = List<String> votingMembers =
Expand All @@ -206,7 +208,7 @@ public void expectedStatusFieldsAreIncluded() throws IOException, NoLeaderFoundE
public void notParticipatingInRaftGroupWhenNotInVoterSet() throws IOException public void notParticipatingInRaftGroupWhenNotInVoterSet() throws IOException
{ {
// given not in voting set // given not in voting set
topologyService.replaceWithRole( core2, RoleInfo.LEADER ); // TODO necessary? topologyService.replaceWithRole( core2, RoleInfo.LEADER );
when( raftMembershipManager.votingMembers() ).thenReturn( new HashSet<>( Arrays.asList( core2, core3 ) ) ); when( raftMembershipManager.votingMembers() ).thenReturn( new HashSet<>( Arrays.asList( core2, core3 ) ) );


// when // when
Expand Down Expand Up @@ -236,7 +238,6 @@ public void databaseHealthIsReflected() throws IOException
{ {
// given database is not healthy // given database is not healthy
databaseHealth.panic( new RuntimeException() ); databaseHealth.panic( new RuntimeException() );
topologyService.replaceWithRole( myself, RoleInfo.LEADER ); // TODO necessary?


// when // when
Response description = status.description(); Response description = status.description();
Expand All @@ -247,7 +248,7 @@ public void databaseHealthIsReflected() throws IOException
} }


@Test @Test
public void leaderIsEmptyStringIfNonExistent() throws IOException public void leaderNotIncludedIfUnknown() throws IOException
{ {
// given no leader // given no leader
topologyService.replaceWithRole( null, RoleInfo.LEADER ); topologyService.replaceWithRole( null, RoleInfo.LEADER );
Expand All @@ -257,7 +258,7 @@ public void leaderIsEmptyStringIfNonExistent() throws IOException


// then // then
Map<String,Object> response = responseAsMap( description ); Map<String,Object> response = responseAsMap( description );
assertThat( response, containsAndEquals( "leader", "" ) ); assertFalse( description.getEntity().toString(), response.containsKey( "leader" ) );
} }


static RaftMembershipManager fakeRaftMembershipManager( Set<MemberId> votingMembers ) static RaftMembershipManager fakeRaftMembershipManager( Set<MemberId> votingMembers )
Expand Down
Expand Up @@ -54,6 +54,7 @@
import static javax.ws.rs.core.Response.Status.NOT_FOUND; import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK; import static javax.ws.rs.core.Response.Status.OK;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -157,10 +158,10 @@ public void includesMemberId() throws IOException
} }


@Test @Test
public void leaderIsReported() throws IOException public void leaderIsOptional() throws IOException
{ {
Response description = status.description(); Response description = status.description();
assertEquals( "", responseAsMap( description ).get( "leader" ) ); assertFalse( responseAsMap( description ).containsKey( "leader" ) );


MemberId selectedLead = topologyService.allCoreServers() MemberId selectedLead = topologyService.allCoreServers()
.members() .members()
Expand Down

0 comments on commit 837ed25

Please sign in to comment.