Skip to content

Commit

Permalink
Counts raft replication events. More precisely counts:
Browse files Browse the repository at this point in the history
new replication, number of attempts, successful and failed.

Also logs number of attempts if more than 1
  • Loading branch information
RagnarW committed May 31, 2018
1 parent 77162c7 commit 279fe7e
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 64 deletions.
Expand Up @@ -87,8 +87,7 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config
progressRetryStrategy,
leaderRetryStrategy,
platformModule.availabilityGuard,
logProvider,
replicationLimit ) );
logProvider, replicationLimit, platformModule.monitors ) );
}

public RaftReplicator getReplicator()
Expand Down
Expand Up @@ -30,6 +30,8 @@
import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.replication.monitoring.LoggingReplicationMonitor;
import org.neo4j.causalclustering.core.replication.monitoring.ReplicationMonitor;
import org.neo4j.causalclustering.core.replication.session.LocalSessionPool;
import org.neo4j.causalclustering.core.replication.session.OperationContext;
import org.neo4j.causalclustering.helper.TimeoutStrategy;
Expand All @@ -38,6 +40,7 @@
import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

Expand All @@ -56,10 +59,11 @@ public class RaftReplicator extends LifecycleAdapter implements Replicator, Lead
private final TimeoutStrategy leaderTimeoutStrategy;
private final Log log;
private final Throttler throttler;
private final ReplicationMonitor replicationMonitor;

public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound<MemberId,RaftMessages.RaftMessage> outbound, LocalSessionPool sessionPool,
ProgressTracker progressTracker, TimeoutStrategy progressTimeoutStrategy, TimeoutStrategy leaderTimeoutStrategy,
AvailabilityGuard availabilityGuard, LogProvider logProvider, long replicationLimit )
AvailabilityGuard availabilityGuard, LogProvider logProvider, long replicationLimit, Monitors monitors )
{
this.me = me;
this.outbound = outbound;
Expand All @@ -72,6 +76,8 @@ public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound<Member
this.leaderLocator = leaderLocator;
leaderLocator.registerListener( this );
log = logProvider.getLog( getClass() );
monitors.addMonitorListener( new LoggingReplicationMonitor( log ) );
this.replicationMonitor = monitors.newMonitor( ReplicationMonitor.class );
}

@Override
Expand Down Expand Up @@ -106,58 +112,69 @@ public Future<Object> replicate( ReplicatedContent command, boolean trackResult

private Future<Object> replicate0( ReplicatedContent command, boolean trackResult, MemberId leader ) throws ReplicationFailureException
{
assertNoLeaderSwitch( leader );

OperationContext session = sessionPool.acquireSession();

DistributedOperation operation = new DistributedOperation( command, session.globalSession(), session.localOperationId() );
Progress progress = progressTracker.start( operation );

TimeoutStrategy.Timeout progressTimeout = progressTimeoutStrategy.newTimeout();
TimeoutStrategy.Timeout leaderTimeout = leaderTimeoutStrategy.newTimeout();
replicationMonitor.startReplication( command );
try
{
do
{
assertDatabaseNotShutdown();
try
{
// blocking at least until the send has succeeded or failed before retrying
outbound.send( leader, new RaftMessages.NewEntry.Request( me, operation ), true );
assertNoLeaderSwitch( leader );

leaderTimeout = leaderTimeoutStrategy.newTimeout();
OperationContext session = sessionPool.acquireSession();

progress.awaitReplication( progressTimeout.getMillis() );
progressTimeout.increment();
leader = leaderLocator.getLeader();
}
catch ( NoLeaderFoundException e )
DistributedOperation operation = new DistributedOperation( command, session.globalSession(), session.localOperationId() );
Progress progress = progressTracker.start( operation );

TimeoutStrategy.Timeout progressTimeout = progressTimeoutStrategy.newTimeout();
TimeoutStrategy.Timeout leaderTimeout = leaderTimeoutStrategy.newTimeout();
try
{
do
{
log.debug( "Could not replicate operation " + operation + " because no leader was found. Retrying.", e );
Thread.sleep( leaderTimeout.getMillis() );
leaderTimeout.increment();
replicationMonitor.replicationAttempt();
assertDatabaseNotShutdown();
try
{
// blocking at least until the send has succeeded or failed before retrying
outbound.send( leader, new RaftMessages.NewEntry.Request( me, operation ), true );

leaderTimeout = leaderTimeoutStrategy.newTimeout();

progress.awaitReplication( progressTimeout.getMillis() );
progressTimeout.increment();
leader = leaderLocator.getLeader();
}
catch ( NoLeaderFoundException e )
{
log.debug( "Could not replicate operation " + operation + " because no leader was found. Retrying.", e );
Thread.sleep( leaderTimeout.getMillis() );
leaderTimeout.increment();
}
}
while ( !progress.isReplicated() );
}
catch ( InterruptedException e )
{
progressTracker.abort( operation );
throw new ReplicationFailureException( "Interrupted while replicating", e );
}
while ( !progress.isReplicated() );
}
catch ( InterruptedException e )
{
progressTracker.abort( operation );
throw new ReplicationFailureException( "Interrupted while replicating", e );
}

BiConsumer<Object,Throwable> cleanup = ( ignored1, ignored2 ) -> sessionPool.releaseSession( session );
BiConsumer<Object,Throwable> cleanup = ( ignored1, ignored2 ) -> sessionPool.releaseSession( session );

if ( trackResult )
{
progress.futureResult().whenComplete( cleanup );
if ( trackResult )
{
progress.futureResult().whenComplete( cleanup );
}
else
{
cleanup.accept( null, null );
}
replicationMonitor.successfulReplication();
return progress.futureResult();
}
else
catch ( Throwable t )
{
cleanup.accept( null, null );
replicationMonitor.failedReplication( t );
throw t;
}

return progress.futureResult();
}

private void assertNoLeaderSwitch( MemberId originalLeader ) throws ReplicationFailureException
Expand Down
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j Enterprise Edition. The included source
* code can be redistributed and/or modified under the terms of the
* GNU AFFERO GENERAL PUBLIC LICENSE Version 3
* (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
* Commons Clause, as found in the associated LICENSE.txt file.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* Neo4j object code can be licensed independently from the source
* under separate terms from the AGPL. Inquiries can be directed to:
* licensing@neo4j.com
*
* More information is also available at:
* https://neo4j.com/licensing/
*/
package org.neo4j.causalclustering.core.replication.monitoring;

import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.logging.Log;

import static java.lang.String.format;

/**
 * A {@link ReplicationMonitor} that logs replication progress. Warns on every retry
 * after the first attempt (including the content being replicated) and logs an error
 * when replication ultimately fails.
 * <p>
 * Not thread-safe: {@code attempts} and {@code current} are unsynchronized, so this
 * assumes monitor callbacks for a given replication happen on one thread —
 * NOTE(review): confirm against the caller.
 */
public class LoggingReplicationMonitor implements ReplicationMonitor
{
    private final Log log;
    // Number of attempts made for the replication currently in flight.
    private int attempts;
    // The content currently being replicated; included in retry warnings for context.
    private ReplicatedContent current;

    public LoggingReplicationMonitor( Log log )
    {
        this.log = log;
    }

    @Override
    public void startReplication( ReplicatedContent command )
    {
        clear( command );
    }

    @Override
    public void replicationAttempt()
    {
        // The first attempt (attempts == 0) is the normal path; only warn on retries.
        if ( attempts > 0 )
        {
            log.warn( format( "Failed to replicate content. Attempt %d, Content: %s", attempts, current ) );
        }
        attempts++;
    }

    @Override
    public void successfulReplication()
    {
        clear( null );
    }

    @Override
    public void failedReplication( Throwable t )
    {
        clear( null );
        log.error( "Failed to replicate content", t );
    }

    // Resets per-replication state: remembers the (possibly null) content and zeroes the attempt count.
    private void clear( ReplicatedContent o )
    {
        current = o;
        attempts = 0;
    }
}
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j Enterprise Edition. The included source
* code can be redistributed and/or modified under the terms of the
* GNU AFFERO GENERAL PUBLIC LICENSE Version 3
* (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
* Commons Clause, as found in the associated LICENSE.txt file.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* Neo4j object code can be licensed independently from the source
* under separate terms from the AGPL. Inquiries can be directed to:
* licensing@neo4j.com
*
* More information is also available at:
* https://neo4j.com/licensing/
*/
package org.neo4j.causalclustering.core.replication.monitoring;

import org.neo4j.causalclustering.core.replication.ReplicatedContent;

public interface ReplicationMonitor
{
    /**
     * Called when replication of the given content begins.
     *
     * @param command the content about to be replicated.
     */
    void startReplication( ReplicatedContent command );

    /**
     * Called once per attempt to replicate the current content, including the first.
     */
    void replicationAttempt();

    /**
     * Called when the current replication completes successfully.
     */
    void successfulReplication();

    /**
     * Called when the current replication fails permanently.
     *
     * @param t the failure cause.
     */
    void failedReplication( Throwable t );
}

0 comments on commit 279fe7e

Please sign in to comment.