TODO tidyup - mostly removing unnecessary catch(Throwable)
Mark Needham committed Jun 15, 2016
1 parent fb895b3 commit 8400cfd
Showing 3 changed files with 70 additions and 110 deletions.
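The theme of the change, per the commit message, is removing blanket catch ( Throwable ) blocks and the throws Throwable clauses they existed to contain. A minimal, self-contained sketch of that before/after shape (the class and method names here are illustrative, not the Neo4j code):

import java.time.Clock;

// Illustrative only: the "before" shape swallows everything, the "after" shape
// lets failures propagate to whatever scheduled the task.
class TimeoutTaskSketch implements Runnable
{
    private final Clock clock = Clock.systemUTC();

    // Before: a blanket catch ( Throwable ) hides programming errors.
    public void runSwallowingErrors()
    {
        try
        {
            doWork( clock.millis() );
        }
        catch ( Throwable e )
        {
            System.err.println( "Error handling timeouts: " + e );
        }
    }

    // After: no catch ( Throwable ); an unexpected failure surfaces to the caller.
    @Override
    public void run()
    {
        doWork( clock.millis() );
    }

    private void doWork( long now )
    {
        // timeout bookkeeping would go here
    }
}

The hunks below show the actual instances of this tidy-up in the job scheduler, the timeout service, and the RAFT log shipper.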
@@ -51,27 +51,13 @@ public void init()
     @Override
     public Executor executor( final Group group )
     {
-        return new Executor()
-        {
-            @Override
-            public void execute( Runnable command )
-            {
-                schedule( group, command );
-            }
-        };
+        return job -> schedule( group, job );
     }
 
     @Override
     public ThreadFactory threadFactory( final Group group )
     {
-        return new ThreadFactory()
-        {
-            @Override
-            public Thread newThread( Runnable r )
-            {
-                return createNewThread( group, r, NO_METADATA );
-            }
-        };
+        return job -> createNewThread( group, job, NO_METADATA );
     }
 
     @Override
@@ -130,6 +116,11 @@ public JobHandle schedule( Group group, final Runnable runnable, long initialDelay
         }
     }
 
+    @Override
+    public void stop()
+    {
+    }
+
     @Override
     public void shutdown()
     {
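The first file's change works because java.util.concurrent.Executor and ThreadFactory each declare a single abstract method, so the anonymous inner classes can be replaced by lambdas with identical behaviour. A minimal, standalone illustration of that conversion (not the scheduler code itself):

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;

public class LambdaConversionSketch
{
    public static void main( String[] args ) throws InterruptedException
    {
        // Executor.execute( Runnable ) is the interface's only abstract method, so a lambda fits.
        Executor executor = job -> new Thread( job ).start();

        // Likewise ThreadFactory.newThread( Runnable ).
        ThreadFactory factory = job -> new Thread( job, "sketch-worker" );

        executor.execute( () -> System.out.println( "executed on " + Thread.currentThread().getName() ) );

        Thread thread = factory.newThread( () -> System.out.println( "factory thread running" ) );
        thread.start();
        thread.join();
    }
}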
@@ -61,7 +61,7 @@ public class DelayedRenewableTimeoutService extends LifecycleAdapter implements
     private final Clock clock;
     private final Log log;
     private final Random random;
-    private final JobScheduler scheduler;
+    private final Neo4jJobScheduler scheduler;
     private JobScheduler.JobHandle jobHandle;
 
     public DelayedRenewableTimeoutService( Clock clock, LogProvider logProvider )
@@ -118,69 +118,60 @@ private long calcTimeoutTimestamp( long milliseconds, long randomRange )
     @Override
     public synchronized void run()
     {
-        try
-        {
-            long now = clock.millis();
-            Collection<ScheduledRenewableTimeout> triggered = new LinkedList<>();
-
-            synchronized ( timeouts )
-            {
-                // Handle renewals
-                ScheduledRenewableTimeout renew;
-                while ( (renew = pendingRenewals.poll()) != null )
-                {
-                    timeouts.remove( renew );
-                    renew.setTimeoutTimestamp( calcTimeoutTimestamp( renew.timeoutLength, renew.randomRange ) );
-                    timeouts.add( renew );
-                }
-
-                // Trigger timeouts
-                for ( ScheduledRenewableTimeout timeout : timeouts )
-                {
-                    if ( timeout.shouldTrigger( now ) )
-                    {
-                        triggered.add( timeout );
-                    }
-                    else
-                    {
-                        // Since the timeouts are sorted, the first timeout we hit that should not be triggered means
-                        // there are no others that should either, so we bail.
-                        break;
-                    }
-                }
-            }
-
-            for ( ScheduledRenewableTimeout timeout : triggered )
-            {
-                timeout.trigger();
-            }
-
-            synchronized ( timeouts )
-            {
-                timeouts.removeAll( triggered );
-            }
-        }
-        catch ( Throwable e )
-        {
-            log.error( "Error handling timeouts", e );
-        }
+        long now = clock.millis();
+        Collection<ScheduledRenewableTimeout> triggered = new LinkedList<>();
+
+        synchronized ( timeouts )
+        {
+            // Handle renewals
+            ScheduledRenewableTimeout renew;
+            while ( (renew = pendingRenewals.poll()) != null )
+            {
+                timeouts.remove( renew );
+                renew.setTimeoutTimestamp( calcTimeoutTimestamp( renew.timeoutLength, renew.randomRange ) );
+                timeouts.add( renew );
+            }
+
+            // Trigger timeouts
+            for ( ScheduledRenewableTimeout timeout : timeouts )
+            {
+                if ( timeout.shouldTrigger( now ) )
+                {
+                    triggered.add( timeout );
+                }
+                else
+                {
+                    // Since the timeouts are sorted, the first timeout we hit that should not be triggered means
+                    // there are no others that should either, so we bail.
+                    break;
+                }
+            }
+        }
+
+        triggered.forEach( ScheduledRenewableTimeout::trigger );
+
+        synchronized ( timeouts )
+        {
+            timeouts.removeAll( triggered );
+        }
     }
 
     @Override
-    public void init() throws Throwable
+    public void init()
     {
         scheduler.init();
     }
 
     @Override
-    public void start() throws Throwable
+    public void start()
     {
         jobHandle = scheduler.scheduleRecurring( new JobScheduler.Group( "Scheduler", POOLED ), this, TIMER_RESOLUTION,
                 TIMER_RESOLUTION_UNIT );
     }
 
     @Override
-    public void stop() throws Throwable
+    public void stop()
     {
         jobHandle.cancel( false );
         scheduler.stop();
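The run() method above keeps its structure: renewals are applied and expired entries collected while holding the timeouts lock, the callbacks fire outside the lock, and the sorted order lets the scan stop at the first entry that is not yet due. A simplified, self-contained sketch of that pattern (the names and the fixed renewal length are illustrative, not the Neo4j types):

import java.time.Clock;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

class TimeoutTickSketch
{
    static final AtomicLong IDS = new AtomicLong();

    static class Timeout implements Comparable<Timeout>
    {
        final long id = IDS.incrementAndGet();
        long deadline;
        final Runnable handler;

        Timeout( long deadline, Runnable handler )
        {
            this.deadline = deadline;
            this.handler = handler;
        }

        boolean shouldTrigger( long now )
        {
            return now >= deadline;
        }

        @Override
        public int compareTo( Timeout other )
        {
            int byDeadline = Long.compare( deadline, other.deadline );
            return byDeadline != 0 ? byDeadline : Long.compare( id, other.id );
        }
    }

    final SortedSet<Timeout> timeouts = new TreeSet<>();
    final Queue<Timeout> pendingRenewals = new ConcurrentLinkedQueue<>();
    final Clock clock = Clock.systemUTC();

    void tick()
    {
        long now = clock.millis();
        Collection<Timeout> triggered = new LinkedList<>();

        synchronized ( timeouts )
        {
            // Re-insert renewed timeouts so the sorted set reflects their new deadlines.
            Timeout renew;
            while ( (renew = pendingRenewals.poll()) != null )
            {
                timeouts.remove( renew );
                renew.deadline = now + 1_000;
                timeouts.add( renew );
            }

            // Sorted by deadline: stop at the first entry that is not yet due.
            for ( Timeout timeout : timeouts )
            {
                if ( !timeout.shouldTrigger( now ) )
                {
                    break;
                }
                triggered.add( timeout );
            }
        }

        // Fire callbacks outside the lock so a slow handler cannot block renewals.
        triggered.forEach( t -> t.handler.run() );

        synchronized ( timeouts )
        {
            timeouts.removeAll( triggered );
        }
    }
}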
@@ -41,6 +41,7 @@
 import static java.lang.String.format;
 
 import static org.neo4j.coreedge.raft.RenewableTimeoutService.RenewableTimeout;
+import static org.neo4j.coreedge.raft.replication.shipping.RaftLogShipper.Mode.PIPELINE;
 import static org.neo4j.coreedge.raft.replication.shipping.RaftLogShipper.Timeouts.RESEND;
 
 /// Optimizations
@@ -155,25 +156,11 @@ public Object identity()
     public synchronized void start()
     {
         log.info( "Starting log shipper: %s", statusAsString() );
-
-        try
-        {
-            timeoutService = new DelayedRenewableTimeoutService( clock, logProvider );
-            timeoutService.init();
-            timeoutService.start();
-        }
-        catch ( Throwable e )
-        {
-            // TODO: Think about how to handle this. We cannot be allowed to throw when
-            // TODO: starting the log shippers from the main RAFT handling. The timeout
-            // TODO: service is a LifeCycle.
-            // TODO: Should we have and use one system level timeout service instead?
-
-            log.error( "Failed to start log shipper " + statusAsString(), e );
-        }
-
+        timeoutService = new DelayedRenewableTimeoutService( clock, logProvider );
+        timeoutService.init();
+        timeoutService.start();
         sendSingle( raftLog.appendIndex(), lastLeaderContext );
     }
 
     public synchronized void stop()
     {
@@ -222,7 +209,7 @@ public synchronized void onMatch( long newMatchIndex, LeaderContext leaderContext )
                 if ( sendNextBatchAfterMatch( leaderContext ) )
                 {
                     log.info( "%s: caught up after mismatch, moving to PIPELINE mode", statusAsString() );
-                    mode = Mode.PIPELINE;
+                    mode = PIPELINE;
                 }
                 else
                 {
@@ -236,7 +223,7 @@ public synchronized void onMatch( long newMatchIndex, LeaderContext leaderContext )
                 if ( sendNextBatchAfterMatch( leaderContext ) )
                 {
                     log.info( "%s: caught up, moving to PIPELINE mode", statusAsString() );
-                    mode = Mode.PIPELINE;
+                    mode = PIPELINE;
                 }
             }
             break;
@@ -300,49 +287,40 @@ public synchronized void onCommitUpdate( LeaderContext leaderContext )
 
     private synchronized void onScheduledTimeoutExpiry()
     {
-        try
-        {
-            if ( timedOut() )
-            {
-                onTimeout();
-            }
-            else if ( timeoutAbsoluteMillis != 0 )
-            {
-                long timeLeft = timeoutAbsoluteMillis - clock.millis();
-
-                if ( timeLeft > 0 )
-                {
-                    scheduleTimeout( timeLeft );
-                }
-                else
-                {
-                    onTimeout();
-                }
-            }
-        }
-        catch ( Throwable e )
-        {
-            log.error( "Exception during timeout handling: " + statusAsString(), e );
-        }
+        if ( timedOut() )
+        {
+            onTimeout();
+            return;
+        }
+
+        if ( timeoutAbsoluteMillis <= 0 )
+        {
+            return;
+        }
+
+        long timeLeft = timeoutAbsoluteMillis - clock.millis();
+        if ( timeLeft > 0 )
+        {
+            scheduleTimeout( timeLeft );
+        }
+        else
+        {
+            onTimeout();
+        }
     }
 
-    private void onTimeout() throws IOException
+    private void onTimeout()
     {
-        switch ( mode )
-        {
-            case PIPELINE:
-                /* we leave pipelined mode here, because the follower seems
-                 * unresponsive and we do not want to spam it with new entries */
-                log.info( "%s: timed out, moving to CATCHUP mode", statusAsString() );
-                mode = Mode.CATCHUP;
-                /* fallthrough */
-            case CATCHUP:
-            case MISMATCH:
-                if ( lastLeaderContext != null )
-                {
-                    sendSingle( lastSentIndex, lastLeaderContext );
-                }
-                break;
-        }
+        if ( mode == PIPELINE )
+        {
+            /* The follower seems unresponsive and we do not want to spam it with new entries */
+            log.info( "%s: timed out, moving to CATCHUP mode", statusAsString() );
+            mode = Mode.CATCHUP;
+        }
+
+        if ( lastLeaderContext != null )
+        {
+            sendSingle( lastSentIndex, lastLeaderContext );
+        }
     }
 
@@ -417,8 +395,8 @@ private void sendSingle( long logIndex, LeaderContext leaderContext )
         sendRange( logIndex, logIndex, leaderContext );
     }
 
-    private void sendNewEntries( long prevLogIndex, long prevLogTerm, RaftLogEntry[] newEntries, LeaderContext
-            leaderContext )
+    private void sendNewEntries( long prevLogIndex, long prevLogTerm, RaftLogEntry[] newEntries,
+            LeaderContext leaderContext )
     {
         scheduleTimeout( retryTimeMillis );
 
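The onTimeout() rewrite above relies on the old switch's fallthrough: every case ended up resending the last entry, so the switch collapses to a guarded demotion followed by an unconditional resend. A small standalone sketch of that equivalence, assuming Mode has exactly the three values used in the shipper (the names here are illustrative):

// Sketch of why a fallthrough switch collapses to two sequential ifs.
enum Mode { PIPELINE, CATCHUP, MISMATCH }

class FallthroughSketch
{
    Mode mode = Mode.PIPELINE;

    // Before: PIPELINE falls through, so every case ends up resending.
    void onTimeoutWithSwitch()
    {
        switch ( mode )
        {
            case PIPELINE:
                mode = Mode.CATCHUP;
                /* fallthrough */
            case CATCHUP:
            case MISMATCH:
                resend();
                break;
        }
    }

    // After: the PIPELINE-only demotion is guarded, the resend is unconditional.
    void onTimeoutWithIf()
    {
        if ( mode == Mode.PIPELINE )
        {
            mode = Mode.CATCHUP;
        }
        resend();
    }

    void resend()
    {
        // send the last entry again
    }
}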
