Skip to content

Commit

Permalink
More core-edge metrics
Browse files Browse the repository at this point in the history
* Number of times LeaderNotFound exception is received
* Number of TX pull requests core servers receive
* Number of times transactions are retried
* Who is leader?
  • Loading branch information
Mark Needham committed Jan 19, 2016
1 parent 12324ac commit 564eb85
Show file tree
Hide file tree
Showing 23 changed files with 491 additions and 68 deletions.
Expand Up @@ -51,12 +51,14 @@
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer; import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


public class CatchupServer extends LifecycleAdapter public class CatchupServer extends LifecycleAdapter
{ {
private final LogProvider logProvider; private final LogProvider logProvider;
private Monitors monitors;


private final Supplier<StoreId> storeIdSupplier; private final Supplier<StoreId> storeIdSupplier;
private final Supplier<TransactionIdStore> transactionIdStoreSupplier; private final Supplier<TransactionIdStore> transactionIdStoreSupplier;
Expand All @@ -77,13 +79,14 @@ public CatchupServer( LogProvider logProvider,
Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier, Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier,
Supplier<NeoStoreDataSource> dataSourceSupplier, Supplier<NeoStoreDataSource> dataSourceSupplier,
Supplier<CheckPointer> checkPointerSupplier, Supplier<CheckPointer> checkPointerSupplier,
ListenSocketAddress listenAddress ) ListenSocketAddress listenAddress, Monitors monitors )
{ {
this.listenAddress = listenAddress; this.listenAddress = listenAddress;
this.transactionIdStoreSupplier = transactionIdStoreSupplier; this.transactionIdStoreSupplier = transactionIdStoreSupplier;
this.storeIdSupplier = storeIdSupplier; this.storeIdSupplier = storeIdSupplier;
this.logicalTransactionStoreSupplier = logicalTransactionStoreSupplier; this.logicalTransactionStoreSupplier = logicalTransactionStoreSupplier;
this.logProvider = logProvider; this.logProvider = logProvider;
this.monitors = monitors;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.dataSourceSupplier = dataSourceSupplier; this.dataSourceSupplier = dataSourceSupplier;
this.checkPointerSupplier = checkPointerSupplier; this.checkPointerSupplier = checkPointerSupplier;
Expand Down Expand Up @@ -125,7 +128,8 @@ protected void initChannel( SocketChannel ch ) throws Exception


pipeline.addLast( new TxPullRequestDecoder( protocol ) ); pipeline.addLast( new TxPullRequestDecoder( protocol ) );
pipeline.addLast( new TxPullRequestHandler( protocol, storeIdSupplier, pipeline.addLast( new TxPullRequestHandler( protocol, storeIdSupplier,
transactionIdStoreSupplier, logicalTransactionStoreSupplier ) ); transactionIdStoreSupplier, logicalTransactionStoreSupplier,
monitors) );


pipeline.addLast( new ChunkedWriteHandler() ); pipeline.addLast( new ChunkedWriteHandler() );
pipeline.addLast( new GetStoreRequestDecoder( protocol ) ); pipeline.addLast( new GetStoreRequestDecoder( protocol ) );
Expand Down
Expand Up @@ -34,23 +34,27 @@
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.monitoring.Monitors;


public class TxPullRequestHandler extends SimpleChannelInboundHandler<TxPullRequest> public class TxPullRequestHandler extends SimpleChannelInboundHandler<TxPullRequest>
{ {
private final CatchupServerProtocol protocol; private final CatchupServerProtocol protocol;
private final StoreId storeId; private final StoreId storeId;
private final TransactionIdStore transactionIdStore; private final TransactionIdStore transactionIdStore;
private final LogicalTransactionStore logicalTransactionStore; private final LogicalTransactionStore logicalTransactionStore;
private final TxPullRequestsMonitor monitor;


public TxPullRequestHandler( CatchupServerProtocol protocol, public TxPullRequestHandler( CatchupServerProtocol protocol,
Supplier<StoreId> storeIdSupplier, Supplier<StoreId> storeIdSupplier,
Supplier<TransactionIdStore> transactionIdStoreSupplier, Supplier<TransactionIdStore> transactionIdStoreSupplier,
Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier ) Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier,
Monitors monitors )
{ {
this.protocol = protocol; this.protocol = protocol;
this.storeId = storeIdSupplier.get(); this.storeId = storeIdSupplier.get();
this.transactionIdStore = transactionIdStoreSupplier.get(); this.transactionIdStore = transactionIdStoreSupplier.get();
this.logicalTransactionStore = logicalTransactionStoreSupplier.get(); this.logicalTransactionStore = logicalTransactionStoreSupplier.get();
this.monitor = monitors.newMonitor( TxPullRequestsMonitor.class );
} }


@Override @Override
Expand Down Expand Up @@ -79,6 +83,7 @@ protected void channelRead0( ChannelHandlerContext ctx, final TxPullRequest msg
ctx.write( new TxStreamFinishedResponse( endTxId ) ); ctx.write( new TxStreamFinishedResponse( endTxId ) );
ctx.flush(); ctx.flush();


monitor.increment();
protocol.expect( NextMessage.MESSAGE_TYPE ); protocol.expect( NextMessage.MESSAGE_TYPE );
} }


Expand Down
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup.tx.core;

public interface TxPullRequestsMonitor
{
long txPullRequestsReceived();
void increment();
}
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup.tx.core;

public interface TxRetryMonitor
{
long transactionsRetries();
void retry();
}
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft;

public interface CoreEdgeMetaData
{
boolean isLeader();
}
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft;

public interface CoreMetaData
{
boolean isLeader();
}
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft;

public interface LeaderNotFoundMonitor
{
long leaderNotFoundExceptions();
void increment();
}
Expand Up @@ -40,6 +40,7 @@
import org.neo4j.coreedge.raft.state.vote.VoteState; import org.neo4j.coreedge.raft.state.vote.VoteState;
import org.neo4j.helpers.Clock; import org.neo4j.helpers.Clock;
import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


Expand Down Expand Up @@ -68,8 +69,11 @@
* *
* @param <MEMBER> The membership type. * @param <MEMBER> The membership type.
*/ */
public class RaftInstance<MEMBER> implements LeaderLocator<MEMBER>, Inbound.MessageHandler public class RaftInstance<MEMBER> implements LeaderLocator<MEMBER>, Inbound.MessageHandler, CoreMetaData
{ {

private final LeaderNotFoundMonitor leaderNotFoundMonitor;

public enum Timeouts implements RenewableTimeoutService.TimeoutName public enum Timeouts implements RenewableTimeoutService.TimeoutName
{ {
ELECTION, HEARTBEAT ELECTION, HEARTBEAT
Expand Down Expand Up @@ -103,7 +107,7 @@ public RaftInstance( MEMBER myself, TermState termState, VoteState<MEMBER> voteS
LogProvider logProvider, RaftMembershipManager<MEMBER> membershipManager, LogProvider logProvider, RaftMembershipManager<MEMBER> membershipManager,
RaftLogShippingManager<MEMBER> logShipping, RaftLogShippingManager<MEMBER> logShipping,
Supplier<DatabaseHealth> databaseHealthSupplier, Supplier<DatabaseHealth> databaseHealthSupplier,
Clock clock ) Clock clock, Monitors monitors )


{ {
this.myself = myself; this.myself = myself;
Expand All @@ -124,6 +128,8 @@ public RaftInstance( MEMBER myself, TermState termState, VoteState<MEMBER> voteS


this.state = new RaftState<>( myself, termState, membershipManager, entryLog, voteState ); this.state = new RaftState<>( myself, termState, membershipManager, entryLog, voteState );


leaderNotFoundMonitor = monitors.newMonitor( LeaderNotFoundMonitor.class );

initTimers(); initTimers();


inbound.registerHandler( this ); inbound.registerHandler( this );
Expand Down Expand Up @@ -191,6 +197,7 @@ public MEMBER getLeader() throws NoLeaderTimeoutException


if ( state.leader() == null ) if ( state.leader() == null )
{ {
leaderNotFoundMonitor.increment();
throw new NoLeaderTimeoutException(); throw new NoLeaderTimeoutException();
} }


Expand Down Expand Up @@ -270,6 +277,7 @@ public synchronized void handle( Serializable incomingMessage )
} }
} }


@Override
public boolean isLeader() public boolean isLeader()
{ {
return currentRole == LEADER; return currentRole == LEADER;
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;


import org.neo4j.coreedge.catchup.tx.core.TxRetryMonitor;
import org.neo4j.coreedge.raft.replication.Replicator; import org.neo4j.coreedge.raft.replication.Replicator;
import org.neo4j.coreedge.raft.replication.Replicator.ReplicationFailedException; import org.neo4j.coreedge.raft.replication.Replicator.ReplicationFailedException;
import org.neo4j.coreedge.raft.replication.session.LocalSessionPool; import org.neo4j.coreedge.raft.replication.session.LocalSessionPool;
Expand All @@ -33,6 +34,7 @@
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent; import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.storageengine.api.TransactionApplicationMode; import org.neo4j.storageengine.api.TransactionApplicationMode;


Expand All @@ -47,18 +49,20 @@ public class ReplicatedTransactionCommitProcess extends LifecycleAdapter impleme
private final LocalSessionPool sessionPool; private final LocalSessionPool sessionPool;
private final Log log; private final Log log;
private final CommittingTransactions txFutures; private final CommittingTransactions txFutures;
private final TxRetryMonitor txRetryMonitor;


public ReplicatedTransactionCommitProcess( Replicator replicator, LocalSessionPool sessionPool, public ReplicatedTransactionCommitProcess( Replicator replicator, LocalSessionPool sessionPool,
Replicator.ReplicatedContentListener replicatedTxListener, Replicator.ReplicatedContentListener replicatedTxListener,
long retryIntervalMillis, LogService logging, long retryIntervalMillis, LogService logging,
CommittingTransactions txFutures ) CommittingTransactions txFutures, Monitors monitors)
{ {
this.sessionPool = sessionPool; this.sessionPool = sessionPool;
this.replicatedTxListener = replicatedTxListener; this.replicatedTxListener = replicatedTxListener;
this.replicator = replicator; this.replicator = replicator;
this.retryIntervalMillis = retryIntervalMillis; this.retryIntervalMillis = retryIntervalMillis;
this.log = logging.getInternalLog( getClass() ); this.log = logging.getInternalLog( getClass() );
this.txFutures = txFutures; this.txFutures = txFutures;
txRetryMonitor = monitors.newMonitor( TxRetryMonitor.class );
replicator.subscribe( this.replicatedTxListener ); replicator.subscribe( this.replicatedTxListener );
} }


Expand Down Expand Up @@ -99,6 +103,7 @@ public long commit( final TransactionToApply tx,
} }
log.warn( "Transaction replication failed, but a previous attempt may have succeeded," + log.warn( "Transaction replication failed, but a previous attempt may have succeeded," +
"so commit process must keep waiting for possible success.", e ); "so commit process must keep waiting for possible success.", e );
txRetryMonitor.retry();
} }


try try
Expand All @@ -112,11 +117,13 @@ public long commit( final TransactionToApply tx,
{ {
interrupted = true; interrupted = true;
log.info( "Replication of %s was interrupted; retrying.", operationContext ); log.info( "Replication of %s was interrupted; retrying.", operationContext );
txRetryMonitor.retry();
} }
catch ( TimeoutException e ) catch ( TimeoutException e )
{ {
log.info( "Replication of %s timed out after %d %s; retrying.", log.info( "Replication of %s timed out after %d %s; retrying.",
operationContext, retryIntervalMillis, TimeUnit.MILLISECONDS ); operationContext, retryIntervalMillis, TimeUnit.MILLISECONDS );
txRetryMonitor.retry();
} }
} }
} }
Expand Down

0 comments on commit 564eb85

Please sign in to comment.