
Commit

Fix core edge metrics
The commit index listener was never notified about new commit
indexes. This change fixes the issue by moving the listener
notification into CoreState.
davidegrohmann committed May 18, 2016
1 parent f9f6891 commit d204f72
Showing 6 changed files with 72 additions and 63 deletions.
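In short, CoreState now obtains a RaftLogCommitIndexMonitor from the shared Monitors and notifies it whenever the commit index advances, instead of leaving that to MonitoredRaftLog (which now only tracks the append index). A condensed sketch of the resulting CoreState pieces, stitched together from the diff below, with unrelated fields and constructor parameters elided:

    // Created in the CoreState constructor from the injected Monitors instance.
    this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass() );

    @Override
    public synchronized void notifyCommitted( long commitIndex )
    {
        assert this.lastSeenCommitIndex <= commitIndex;
        if ( this.lastSeenCommitIndex < commitIndex )
        {
            this.lastSeenCommitIndex = commitIndex;
            submitApplyJob( commitIndex );
            // New: the commit index metric is updated here, together with the
            // apply job, so the listener is no longer left un-notified.
            commitIndexMonitor.commitIndex( commitIndex );
        }
    }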
@@ -28,23 +28,13 @@

public class MonitoredRaftLog implements RaftLog
{
public static final String APPEND_INDEX_TAG = "appendIndex";
public static final String COMMIT_INDEX_TAG = "commitIndex";

private final RaftLog delegate;
private final RaftLogAppendIndexMonitor appendIndexMonitor;
private final RaftLogCommitIndexMonitor commitIndexMonitor;

public MonitoredRaftLog( RaftLog delegate, Monitors monitors )
{
this.delegate = delegate;
this.appendIndexMonitor = monitors.newMonitor( RaftLogAppendIndexMonitor.class, getClass(), APPEND_INDEX_TAG );
this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass(), COMMIT_INDEX_TAG );
}

public RaftLog delegate()
{
return delegate;
this.appendIndexMonitor = monitors.newMonitor( RaftLogAppendIndexMonitor.class, getClass() );
}

@Override
@@ -29,6 +29,7 @@
import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.RaftLogCompactedException;
import org.neo4j.coreedge.raft.log.RaftLogCursor;
import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor;
import org.neo4j.coreedge.raft.log.pruning.LogPruner;
import org.neo4j.coreedge.raft.replication.DistributedOperation;
import org.neo4j.coreedge.raft.replication.ProgressTracker;
@@ -39,12 +40,12 @@
import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

public class CoreState extends LifecycleAdapter implements RaftStateMachine, LogPruner
{

private static final long NOTHING = -1;
private CoreStateMachines coreStateMachines;
private final RaftLog raftLog;
@@ -65,6 +66,7 @@ public class CoreState extends LifecycleAdapter implements RaftStateMachine, Log
private final CoreServerSelectionStrategy selectionStrategy;

private final CoreStateDownloader downloader;
private final RaftLogCommitIndexMonitor commitIndexMonitor;

public CoreState(
RaftLog raftLog,
@@ -77,7 +79,8 @@ public CoreState(
StateStorage<GlobalSessionTrackerState<CoreMember>> sessionStorage,
CoreServerSelectionStrategy selectionStrategy,
CoreStateApplier applier,
CoreStateDownloader downloader )
CoreStateDownloader downloader,
Monitors monitors )
{
this.raftLog = raftLog;
this.lastFlushedStorage = lastFlushedStorage;
@@ -90,6 +93,7 @@ public CoreState(
this.selectionStrategy = selectionStrategy;
this.log = logProvider.getLog( getClass() );
this.dbHealth = dbHealth;
this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass() );
}

public synchronized void setStateMachine( CoreStateMachines coreStateMachines, long lastApplied )
@@ -101,10 +105,12 @@ public synchronized void setStateMachine( CoreStateMachines coreStateMachines, l
@Override
public synchronized void notifyCommitted( long commitIndex )
{
if ( this.lastSeenCommitIndex != commitIndex )
assert this.lastSeenCommitIndex <= commitIndex;
if ( this.lastSeenCommitIndex < commitIndex )
{
this.lastSeenCommitIndex = commitIndex;
submitApplyJob( commitIndex );
commitIndexMonitor.commitIndex( commitIndex );
}
}

@@ -274,8 +274,8 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
final DelayedRenewableTimeoutService raftTimeoutService =
new DelayedRenewableTimeoutService( Clock.systemUTC(), logProvider );

RaftLog underlyingLog = createRaftLog( config, life, fileSystem, clusterStateDirectory, marshal, logProvider,
databaseHealthSupplier );
RaftLog underlyingLog = createRaftLog(
config, life, fileSystem, clusterStateDirectory, marshal, logProvider, databaseHealthSupplier );

MonitoredRaftLog raftLog = new MonitoredRaftLog( underlyingLog, platformModule.monitors );

@@ -335,7 +335,8 @@ fileSystem, new File( clusterStateDirectory, "session-tracker-state" ), "session
coreState = new CoreState(
raftLog, config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ),
databaseHealthSupplier, logProvider, progressTracker, lastFlushedStorage, lastApplyingStorage,
sessionTrackerStorage, new NotMyselfSelectionStrategy( discoveryService, myself ), applier, downloader );
sessionTrackerStorage, new NotMyselfSelectionStrategy( discoveryService, myself ), applier,
downloader, platformModule.monitors );

raft = createRaft( life, loggingOutbound, discoveryService, config, messageLogger, raftLog,
coreState, fileSystem, clusterStateDirectory, myself, logProvider, raftServer,
@@ -44,11 +44,13 @@
import org.neo4j.coreedge.server.CoreEdgeClusterSettings;
import org.neo4j.coreedge.server.core.CoreGraphDatabase;
import org.neo4j.coreedge.server.edge.EdgeGraphDatabase;
import org.neo4j.dbms.DatabaseManagementSystemSettings;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionFailureException;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.GraphDatabaseDependencies;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.store.format.standard.StandardV3_0;

@@ -180,9 +182,6 @@ private Map<String,String> serverParams( String serverType, int serverId, String
params.put( ClusterSettings.cluster_name.name(), CLUSTER_NAME );
params.put( ClusterSettings.server_id.name(), String.valueOf( serverId ) );
params.put( CoreEdgeClusterSettings.initial_core_cluster_members.name(), initialHosts );
params.put( "metrics.csv.enabled", "true" );
params.put( "metrics.neo4j.core_edge.enabled", "true" );
params.put( "metrics.csv.path", "metrics" );
return params;
}

@@ -27,6 +27,7 @@
import org.neo4j.coreedge.raft.NewLeaderBarrier;
import org.neo4j.coreedge.raft.log.InMemoryRaftLog;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor;
import org.neo4j.coreedge.raft.replication.DistributedOperation;
import org.neo4j.coreedge.raft.replication.ProgressTrackerImpl;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
@@ -39,6 +40,7 @@
import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy;
import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.NullLogProvider;

import static junit.framework.TestCase.assertEquals;
@@ -65,13 +67,15 @@ public class CoreStateTest
private final int flushEvery = 10;

private final CoreStateApplier applier = new CoreStateApplier( NullLogProvider.getInstance() );
private final Monitors monitors = new Monitors();
private final CoreState coreState = new CoreState( raftLog, flushEvery, () -> dbHealth, NullLogProvider.getInstance(),
new ProgressTrackerImpl( globalSession ), lastFlushedStorage, lastApplyingStorage, sessionStorage,
mock( CoreServerSelectionStrategy.class ), applier, mock( CoreStateDownloader.class ) );
mock( CoreServerSelectionStrategy.class ), applier, mock( CoreStateDownloader.class ), monitors );

private ReplicatedTransaction nullTx = new ReplicatedTransaction( null );

private final CoreStateMachines txStateMachine = txStateMachinesMock();

private CoreStateMachines txStateMachinesMock()
{
CoreStateMachines stateMachines = mock( CoreStateMachines.class );
@@ -97,6 +101,8 @@ private synchronized ReplicatedContent operation( CoreReplicatedContent tx )
public void shouldApplyCommittedCommand() throws Exception
{
// given
RaftLogCommitIndexMonitor listener = mock( RaftLogCommitIndexMonitor.class );
monitors.addMonitorListener( listener );
coreState.setStateMachine( txStateMachine, -1 );
coreState.start();

Expand All @@ -111,6 +117,7 @@ public void shouldApplyCommittedCommand() throws Exception
verify( txStateMachine ).dispatch( nullTx, 0 );
verify( txStateMachine ).dispatch( nullTx, 1 );
verify( txStateMachine ).dispatch( nullTx, 2 );
verify( listener ).commitIndex( 2 );
}

@Test
@@ -30,13 +30,18 @@
import org.neo4j.coreedge.discovery.Cluster;
import org.neo4j.coreedge.server.core.CoreGraphDatabase;
import org.neo4j.coreedge.server.edge.EdgeGraphDatabase;
import org.neo4j.cypher.internal.compiler.v3_1.planner.logical.Metrics;
import org.neo4j.function.ThrowingSupplier;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.metrics.source.coreedge.CoreMetrics;
import org.neo4j.test.coreedge.ClusterRule;
import org.neo4j.test.rule.TargetDirectory;

import static java.util.concurrent.TimeUnit.SECONDS;
@@ -46,7 +51,9 @@
import static org.junit.Assert.assertEquals;
import static org.neo4j.coreedge.server.CoreEdgeClusterSettings.raft_advertised_address;
import static org.neo4j.graphdb.Label.label;
import static org.neo4j.helpers.collection.Iterables.concat;
import static org.neo4j.helpers.collection.Iterables.count;
import static org.neo4j.metrics.MetricsSettings.csvPath;
import static org.neo4j.metrics.MetricsTestHelper.metricsCsv;
import static org.neo4j.metrics.MetricsTestHelper.readLongValue;
import static org.neo4j.metrics.source.coreedge.EdgeMetrics.PULL_UPDATES;
@@ -56,8 +63,18 @@

public class CoreEdgeMetricsIT
{
private static final int TIMEOUT = 15;

@Rule
public final TargetDirectory.TestDirectory dir = TargetDirectory.testDirForTest( getClass() );
public final ClusterRule clusterRule = new ClusterRule( getClass() )
.withNumberOfCoreServers( 3 )
.withNumberOfEdgeServers( 1 )
.withSharedCoreParam( MetricsSettings.metricsEnabled, Settings.TRUE )
.withSharedEdgeParam( MetricsSettings.metricsEnabled, Settings.TRUE )
.withSharedCoreParam( MetricsSettings.csvEnabled, Settings.TRUE )
.withSharedEdgeParam( MetricsSettings.csvEnabled, Settings.TRUE )
.withSharedCoreParam( MetricsSettings.csvInterval, "100ms" )
.withSharedEdgeParam( MetricsSettings.csvInterval, "100ms" );

private Cluster cluster;

@@ -74,11 +91,10 @@ public void shutdown() throws ExecutionException, InterruptedException
public void shouldMonitorCoreEdge() throws Exception
{
// given
File dbDir = dir.directory();
cluster = Cluster.start( dbDir, 3, 1 );
cluster = clusterRule.startCluster();

// when
GraphDatabaseService coreDB = cluster.awaitLeader( 5000 );
CoreGraphDatabase coreDB = cluster.awaitLeader( 5000 );

try ( Transaction tx = coreDB.beginTx() )
{
@@ -88,81 +104,71 @@ public void shouldMonitorCoreEdge() throws Exception
}

// then
for ( final CoreGraphDatabase db : cluster.coreServers() )
{
assertAllNodesVisible( db );
}

for ( final EdgeGraphDatabase db : cluster.edgeServers() )
for ( GraphDatabaseAPI db : concat( cluster.coreServers(), cluster.edgeServers() ) )
{
assertAllNodesVisible( db );
}

File coreServerMetricsDir = new File( cluster.getCoreServerById( 0 ).getStoreDir(), "metrics" );
File coreServerMetricsDir = new File( cluster.getCoreServerById( 0 ).getStoreDir(), csvPath.getDefaultValue() );

assertEventually( "append index eventually accurate",
() -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.APPEND_INDEX ) ),
greaterThan( 0L ), 5, TimeUnit.SECONDS );
greaterThan( 0L ), TIMEOUT, TimeUnit.SECONDS );

assertEventually( "commit index eventually accurate",
() -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.COMMIT_INDEX ) ),
greaterThan( 0L ), 5, TimeUnit.SECONDS );
greaterThan( 0L ), TIMEOUT, TimeUnit.SECONDS );

assertEventually( "term eventually accurate",
() -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.TERM ) ),
greaterThanOrEqualTo( 0L ), 5, TimeUnit.SECONDS );
greaterThanOrEqualTo( 0L ), TIMEOUT, TimeUnit.SECONDS );

assertEventually( "leader not found eventually accurate",
() -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.LEADER_NOT_FOUND ) ),
equalTo( 0L ), 5, TimeUnit.SECONDS );

assertEventually( "tx pull requests received eventually accurate",
() ->
{
long total = 0;
for ( final CoreGraphDatabase db : cluster.coreServers() )
{
File metricsDir = new File( db.getStoreDir(), "metrics" );
total += readLongValue( metricsCsv( metricsDir, CoreMetrics.TX_PULL_REQUESTS_RECEIVED ) );
}
return total;
},
greaterThan( 0L ), 5, TimeUnit.SECONDS );
equalTo( 0L ), TIMEOUT, TimeUnit.SECONDS );

assertEventually( "tx pull requests received eventually accurate", () -> {
long total = 0;
for ( final CoreGraphDatabase db : cluster.coreServers() )
{
File metricsDir = new File( db.getStoreDir(), "metrics" );
total += readLongValue( metricsCsv( metricsDir, CoreMetrics.TX_PULL_REQUESTS_RECEIVED ) );
}
return total;
}, greaterThan( 0L ), TIMEOUT, TimeUnit.SECONDS );

assertEventually( "tx retries eventually accurate",
() -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.TX_RETRIES ) ),
equalTo( 0L ), 5, TimeUnit.SECONDS );
() -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.TX_RETRIES ) ), equalTo( 0L ),
TIMEOUT, TimeUnit.SECONDS );

assertEventually( "is leader eventually accurate",
() -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.IS_LEADER ) ),
greaterThanOrEqualTo( 0L ), 5, TimeUnit.SECONDS );
greaterThanOrEqualTo( 0L ), TIMEOUT, TimeUnit.SECONDS );

File edgeServerMetricsDir = new File( cluster.getEdgeServerById( 0 ).getStoreDir(), "metrics" );

assertEventually( "pull update request registered",
() -> readLongValue( metricsCsv( edgeServerMetricsDir, PULL_UPDATES ) ),
greaterThan( 0L ), 5, TimeUnit.SECONDS );
greaterThan( 0L ), TIMEOUT, TimeUnit.SECONDS );

assertEventually( "pull update request registered",
() ->
readLongValue( metricsCsv( edgeServerMetricsDir, PULL_UPDATE_HIGHEST_TX_ID_REQUESTED ) ),
greaterThan( 0L ), 5, TimeUnit.SECONDS );
() -> readLongValue( metricsCsv( edgeServerMetricsDir, PULL_UPDATE_HIGHEST_TX_ID_REQUESTED ) ),
greaterThan( 0L ), TIMEOUT, TimeUnit.SECONDS );

assertEventually( "pull update response received",
() ->
readLongValue( metricsCsv( edgeServerMetricsDir, PULL_UPDATE_HIGHEST_TX_ID_RECEIVED ) ),
greaterThan( 0L ), 5, TimeUnit.SECONDS );
() -> readLongValue( metricsCsv( edgeServerMetricsDir, PULL_UPDATE_HIGHEST_TX_ID_RECEIVED ) ),
greaterThan( 0L ), TIMEOUT, TimeUnit.SECONDS );

assertEventually( "dropped messages eventually accurate",
() -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.DROPPED_MESSAGES ) ),
greaterThanOrEqualTo( 0L ), 5, TimeUnit.SECONDS );
greaterThanOrEqualTo( 0L ), TIMEOUT, TimeUnit.SECONDS );

assertEventually( "queue size eventually accurate",
() -> readLongValue( metricsCsv( coreServerMetricsDir, CoreMetrics.QUEUE_SIZE ) ),
greaterThanOrEqualTo( 0L ), 5, TimeUnit.SECONDS );
greaterThanOrEqualTo( 0L ), TIMEOUT, TimeUnit.SECONDS );
}

private void assertAllNodesVisible( GraphDatabaseFacade db ) throws Exception
private void assertAllNodesVisible( GraphDatabaseAPI db ) throws Exception
{
try ( Transaction tx = db.beginTx() )
{
Expand All @@ -171,7 +177,7 @@ private void assertAllNodesVisible( GraphDatabaseFacade db ) throws Exception
Config config = db.getDependencyResolver().resolveDependency( Config.class );

assertEventually( "node to appear on core server " + config.get( raft_advertised_address ), nodeCount,
greaterThan( 0L ), 15, SECONDS );
greaterThan( 0L ), TIMEOUT, SECONDS );

for ( Node node : db.getAllNodes() )
{
