Skip to content

Commit

Permalink
core-edge: Batching inbound message queue for RAFT.
Browse files Browse the repository at this point in the history
RAFT message processing now runs on a dedicated thread, distinct from the
Netty networking thread. The two threads are connected by a queue, and new entry
requests are batched together before being processed by the leader. This leads to
batching both when appending to the RAFT log and when sending new entries.
  • Loading branch information
martinfurmanski committed May 31, 2016
1 parent 50f03cb commit fbd6105
Show file tree
Hide file tree
Showing 20 changed files with 634 additions and 36 deletions.
Expand Up @@ -21,6 +21,7 @@

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -170,6 +171,8 @@ class Groups
/**
 * Handle to a scheduled job, through which the job can be cancelled or waited upon.
 */
interface JobHandle
{
/**
 * Requests cancellation of the job.
 *
 * @param mayInterruptIfRunning presumably whether the thread running the job may be interrupted;
 * the exact effect is implementation-specific
 */
void cancel( boolean mayInterruptIfRunning );

/**
 * Waits for the job to terminate. How the wait is performed is up to the implementation.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws ExecutionException declared so that implementations may report a failure of the job itself
 */
void waitTermination() throws InterruptedException, ExecutionException;
}

/** Expose a group scheduler as an {@link Executor} */
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.kernel.impl.util;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -198,6 +199,12 @@ public void cancel( boolean mayInterruptIfRunning )
{
job.cancel( mayInterruptIfRunning );
}

/**
 * Waits for the job by calling {@code get()} on it and discarding any result.
 * NOTE(review): {@code job} is presumably a {@link java.util.concurrent.Future}, in which case this
 * blocks until completion and surfaces a failure of the job as an {@link ExecutionException} - confirm
 * against the field declaration.
 */
@Override
public void waitTermination() throws InterruptedException, ExecutionException
{
job.get();
}
}

private static class SingleThreadHandle implements JobHandle
Expand All @@ -217,5 +224,11 @@ public void cancel( boolean mayInterruptIfRunning )
thread.interrupt();
}
}

/**
 * Waits for the job by joining the thread that runs it.
 *
 * @throws InterruptedException if the calling thread is interrupted while joining
 */
@Override
public void waitTermination() throws InterruptedException
{
thread.join();
}
}
}
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.test;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -107,5 +108,11 @@ public void cancel( boolean mayInterruptIfRunning )
{
job = null;
}

/**
 * Returns immediately without waiting for anything.
 */
@Override
public void waitTermination() throws InterruptedException, ExecutionException
{
// on demand
// NOTE(review): presumably jobs of this test scheduler only run when explicitly triggered, so
// there is never a background execution to wait for - confirm against the enclosing class.
}
}
}
@@ -0,0 +1,123 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.neo4j.coreedge.raft.RaftMessages.RaftMessage;
import org.neo4j.coreedge.raft.net.Inbound.MessageHandler;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
 * Buffers incoming RAFT messages in a bounded queue so that they can be processed on a different
 * thread than the one delivering them, and collates queued new entry requests into a single
 * {@link RaftMessages.NewEntry.Batch} before passing them on.
 * <p>
 * {@link #handle(RaftMessage)} is the producer side and only enqueues. {@link #run()} is the consumer
 * side: each invocation processes at most {@code maxBatch} queued messages and is meant to be invoked
 * repeatedly. {@link #run()} reuses an internal scratch list without synchronization, so it must not
 * be invoked concurrently from several threads.
 */
public class BatchingMessageHandler<MEMBER> implements Runnable, MessageHandler<RaftMessage<MEMBER>>
{
    private final Log log;
    private final MessageHandler<RaftMessage<MEMBER>> innerHandler;

    private final BlockingQueue<RaftMessage<MEMBER>> messageQueue;
    private final int maxBatch;
    /** Scratch list reused by every {@link #run()} invocation to avoid re-allocating per batch. */
    private final List<RaftMessage<MEMBER>> batch;

    /**
     * @param innerHandler the handler which ultimately processes the (possibly batched) messages
     * @param logProvider provider of the log used for warnings
     * @param queueSize capacity of the inbound queue; producers block while it is full
     * @param maxBatch maximum number of messages taken off the queue in one {@link #run()} invocation
     */
    public BatchingMessageHandler( MessageHandler<RaftMessage<MEMBER>> innerHandler, LogProvider logProvider,
            int queueSize, int maxBatch )
    {
        this.innerHandler = innerHandler;
        this.log = logProvider.getLog( getClass() );
        this.maxBatch = maxBatch;

        this.batch = new ArrayList<>( maxBatch );
        this.messageQueue = new ArrayBlockingQueue<>( queueSize );
    }

    /**
     * Enqueues the message for later processing, blocking while the queue is full.
     * <p>
     * If the calling thread is interrupted while waiting, the message is not enqueued, a warning is
     * logged and the thread's interrupt status is restored so that the caller can still observe it.
     *
     * @param message the message to enqueue
     */
    @Override
    public void handle( RaftMessage<MEMBER> message )
    {
        try
        {
            messageQueue.put( message );
        }
        catch ( InterruptedException e )
        {
            log.warn( "Not expecting to be interrupted.", e );
            // put() cleared the interrupt status when it threw; re-assert it rather than swallowing
            // the interruption on behalf of the calling thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Processes one round of queued messages. Waits up to one second for a message; if none arrives
     * this returns without doing anything. A lone message is passed straight to the inner handler,
     * otherwise up to {@code maxBatch} messages in total are drained and collated.
     */
    @Override
    public void run()
    {
        RaftMessage<MEMBER> message = null;
        try
        {
            message = messageQueue.poll( 1, SECONDS );
        }
        catch ( InterruptedException e )
        {
            // NOTE(review): the interrupt status is deliberately NOT re-asserted here. When this
            // Runnable is driven by a plain loop (as ContinuousJob does), a sticky interrupt flag
            // would make every subsequent poll fail immediately and turn the loop into a busy spin.
            log.warn( "Not expecting to be interrupted.", e );
        }

        if ( message != null )
        {
            if ( messageQueue.isEmpty() )
            {
                // Nothing else is waiting, so there is nothing to batch with.
                innerHandler.handle( message );
            }
            else
            {
                batch.clear();
                batch.add( message );
                // One slot is already taken by the message polled above.
                messageQueue.drainTo( batch, maxBatch - 1 );

                collateAndHandleBatch( batch );
            }
        }
    }

    /**
     * Forwards every message which is not a new entry request to the inner handler right away, in
     * list order, while collecting the content of all new entry requests into a single batch message
     * which is handed to the inner handler last. New entry requests are therefore processed after the
     * other messages of the same round.
     */
    private void collateAndHandleBatch( List<RaftMessage<MEMBER>> batch )
    {
        RaftMessages.NewEntry.Batch<MEMBER> batchRequest = null;

        for ( RaftMessage<MEMBER> message : batch )
        {
            if ( message instanceof RaftMessages.NewEntry.Request )
            {
                RaftMessages.NewEntry.Request newEntryRequest = (RaftMessages.NewEntry.Request) message;

                if ( batchRequest == null )
                {
                    // Created lazily; sized for the worst case where every message is a new entry.
                    batchRequest = new RaftMessages.NewEntry.Batch<>( batch.size() );
                }
                batchRequest.add( newEntryRequest.content() );
            }
            else
            {
                innerHandler.handle( message );
            }
        }

        if ( batchRequest != null )
        {
            innerHandler.handle( batchRequest );
        }
    }
}
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft;

import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

/**
 * Invokes the supplied task continuously when started. The supplied task
 * should be short since the abort flag is checked in between invocations.
 * <p>
 * {@link #start()} schedules a looping job on the supplied scheduler and group; {@link #stop()}
 * clears the run flag and then waits on the job handle, so a long-running or blocking task delays
 * {@link #stop()} accordingly.
 */
public class ContinuousJob extends LifecycleAdapter
{
    private final AbortableJob abortableJob = new AbortableJob();
    private final JobScheduler scheduler;
    private final JobScheduler.Group group;
    private final Runnable task;

    /** Handle of the currently scheduled job, or {@code null} if {@link #start()} has not succeeded. */
    private JobScheduler.JobHandle jobHandle;

    /**
     * @param scheduler the scheduler on which the looping job is scheduled
     * @param group the scheduling group passed to the scheduler
     * @param task the task to invoke repeatedly
     */
    public ContinuousJob( JobScheduler scheduler, JobScheduler.Group group, Runnable task )
    {
        this.scheduler = scheduler;
        this.group = group;
        this.task = task;
    }

    /**
     * Raises the run flag and schedules the looping job.
     */
    @Override
    public void start() throws Throwable
    {
        // The flag must be set before scheduling, otherwise the job could observe it as false
        // and exit immediately.
        abortableJob.keepRunning = true;
        jobHandle = scheduler.schedule( group, abortableJob );
    }

    /**
     * Clears the run flag and waits on the job handle for the scheduled job to terminate. Does not
     * wait if no job was ever scheduled.
     */
    @Override
    public void stop() throws Throwable
    {
        abortableJob.keepRunning = false;
        // Guard against stop() without a preceding successful start(), which would otherwise
        // fail with a NullPointerException.
        if ( jobHandle != null )
        {
            jobHandle.waitTermination();
        }
    }

    /**
     * Runs the task in a loop for as long as the run flag is set. The flag is volatile so that a
     * change made by the thread calling {@link #stop()} is seen by the thread executing the loop.
     */
    private class AbortableJob implements Runnable
    {
        private volatile boolean keepRunning;

        @Override
        public void run()
        {
            while ( keepRunning )
            {
                task.run();
            }
        }
    }
}
Expand Up @@ -110,7 +110,7 @@ public RaftInstance( MEMBER myself, StateStorage<TermState> termStorage,
StateStorage<VoteState<MEMBER>> voteStorage, RaftLog entryLog,
RaftStateMachine raftStateMachine, long electionTimeout, long heartbeatInterval,
RenewableTimeoutService renewableTimeoutService,
final Inbound<RaftMessages.RaftMessage<MEMBER>> inbound, final Outbound<MEMBER> outbound,
final Outbound<MEMBER> outbound,
LogProvider logProvider, RaftMembershipManager<MEMBER> membershipManager,
RaftLogShippingManager<MEMBER> logShipping,
Supplier<DatabaseHealth> databaseHealthSupplier,
Expand All @@ -136,8 +136,6 @@ public RaftInstance( MEMBER myself, StateStorage<TermState> termStorage,
leaderNotFoundMonitor = monitors.newMonitor( LeaderNotFoundMonitor.class );

initTimers();

inbound.registerHandler( this );
}

private void initTimers()
Expand Down
Expand Up @@ -19,7 +19,10 @@
*/
package org.neo4j.coreedge.raft;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import org.neo4j.coreedge.network.Message;
Expand Down Expand Up @@ -48,6 +51,7 @@ enum Type
// TODO: Refactor, these are client-facing messages / api. Perhaps not public and instantiated through an api
// TODO: method instead?
NEW_ENTRY_REQUEST,
NEW_BATCH_REQUEST,
NEW_MEMBERSHIP_TARGET,
}

Expand Down Expand Up @@ -580,6 +584,54 @@ public ReplicatedContent content()
return content;
}
}

/**
 * A message of type {@link Type#NEW_BATCH_REQUEST} carrying the contents of several new entry
 * requests so that they can be processed together.
 * <p>
 * The message is created empty and filled through {@link #add(ReplicatedContent)}. It is constructed
 * with a {@code null} first argument to the base message.
 * NOTE(review): that argument is presumably the sender, which a collated batch does not have -
 * confirm against BaseMessage.
 */
class Batch<MEMBER> extends BaseMessage<MEMBER>
{
    // Assigned once in the constructor; final prevents accidental reassignment and gives the
    // reference final-field publication guarantees when the batch is handed to another thread.
    private final List<ReplicatedContent> list;

    /**
     * @param batchSize the expected number of contents, used as the initial capacity of the backing list
     */
    public Batch( int batchSize )
    {
        super( null, Type.NEW_BATCH_REQUEST );
        list = new ArrayList<>( batchSize );
    }

    /**
     * Appends the content to the end of this batch.
     *
     * @param content the content to append
     */
    public void add( ReplicatedContent content )
    {
        list.add( content );
    }

    @Override
    public boolean equals( Object o )
    {
        if ( this == o )
        {
            return true;
        }
        if ( o == null || getClass() != o.getClass() )
        {
            return false;
        }
        if ( !super.equals( o ) )
        {
            return false;
        }
        Batch<?> batch = (Batch<?>) o;
        return Objects.equals( list, batch.list );
    }

    @Override
    public int hashCode()
    {
        return Objects.hash( super.hashCode(), list );
    }

    @Override
    public String toString()
    {
        return "Batch{" +
               "list=" + list +
               '}';
    }

    /**
     * @return a read-only view of the contents in the order they were added; later calls to
     * {@link #add(ReplicatedContent)} are visible through the returned view
     */
    public List<ReplicatedContent> contents()
    {
        return Collections.unmodifiableList( list );
    }
}
}

abstract class BaseMessage<MEMBER> implements RaftMessage<MEMBER>
Expand Down
Expand Up @@ -28,6 +28,7 @@
public class RaftLogEntry
{
public static final RaftLogEntry[] empty = new RaftLogEntry[0];

private final long term;
private final ReplicatedContent content;

Expand Down

0 comments on commit fbd6105

Please sign in to comment.