Skip to content

Commit

Permalink
Batch tx applications when notified of new committed entries in raft
Browse files Browse the repository at this point in the history
Instead of applying one request at the time now we try to groip
requests in batches and apply them together if possible.

Note that the code will make sure to not mix application of tx with
other requests since it could be unsafe.
  • Loading branch information
davidegrohmann committed Jun 7, 2016
1 parent bf9db07 commit 70a7a7f
Show file tree
Hide file tree
Showing 24 changed files with 523 additions and 216 deletions.
Expand Up @@ -43,16 +43,16 @@ public TransactionQueue( int maxSize, Applier applier )
this.applier = applier;
}

public void queue( TransactionToApply transacion ) throws Exception
public void queue( TransactionToApply transaction ) throws Exception
{
if ( size == 0 )
{
first = last = transacion;
first = last = transaction;
}
else
{
last.next( transacion );
last = transacion;
last.next( transaction );
last = transaction;
}

if ( ++size == maxSize )
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.kernel.impl.api;

import java.io.IOException;
import java.util.function.Consumer;

import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
Expand Down Expand Up @@ -66,6 +67,8 @@ public class TransactionToApply implements CommandsToApply
// These fields are provided by commit process/storage engine
private Commitment commitment;

private Consumer<Long> callback;

/**
* Used when committing a transaction that hasn't already gotten a transaction id assigned.
*/
Expand Down Expand Up @@ -123,11 +126,20 @@ public void commitment( Commitment commitment, long transactionId )
{
this.commitment = commitment;
this.transactionId = transactionId;
if ( callback != null )
{
commitment.onClosed( callback );
}
}

@Override
public TransactionToApply next()
{
return nextTransactionInBatch;
}

public void onClose( Consumer<Long> callback )
{
this.callback = callback;
}
}
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.kernel.impl.transaction.log;

import java.util.function.Consumer;

import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent;
Expand Down Expand Up @@ -54,6 +56,12 @@ public boolean hasLegacyIndexChanges()
{
return false;
}

@Override
public void onClosed( Consumer<Long> callback )
{
throw new UnsupportedOperationException();
}
};

/**
Expand All @@ -75,4 +83,10 @@ public boolean hasLegacyIndexChanges()
* @return whether or not this transaction contains legacy index changes.
*/
boolean hasLegacyIndexChanges();

/**
* Register a callback to be triggered when the commitment has been published as closed
* @param callback The callback to be invoked when the commitment is published as closed
*/
void onClosed( Consumer<Long> callback );
}
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.kernel.impl.transaction.log;

import java.util.function.Consumer;

class TransactionCommitment implements Commitment
{
private final boolean hasLegacyIndexChanges;
Expand All @@ -27,6 +29,7 @@ class TransactionCommitment implements Commitment
private final LogPosition logPosition;
private final TransactionIdStore transactionIdStore;
private boolean markedAsCommitted;
private Consumer<Long> callback;

TransactionCommitment( boolean hasLegacyIndexChanges, long transactionId, long transactionChecksum,
LogPosition logPosition, TransactionIdStore transactionIdStore )
Expand All @@ -50,6 +53,10 @@ public void publishAsClosed()
{
transactionIdStore.transactionClosed( transactionId,
logPosition.getLogVersion(), logPosition.getByteOffset() );
if ( callback != null )
{
callback.accept( transactionId );
}
}

@Override
Expand All @@ -63,4 +70,10 @@ public boolean hasLegacyIndexChanges()
{
return hasLegacyIndexChanges;
}

@Override
public void onClosed( Consumer<Long> callback )
{
this.callback = callback;
}
}
Expand Up @@ -50,8 +50,7 @@ public class SchemaIndexWaitingAcceptanceTest
protected void configure( GraphDatabaseFactory databaseFactory )
{
List<KernelExtensionFactory<?>> extensions;
extensions = Collections.<KernelExtensionFactory<?>>singletonList( singleInstanceSchemaIndexProviderFactory(
"test", provider ) );
extensions = Collections.<KernelExtensionFactory<?>>singletonList( singleInstanceSchemaIndexProviderFactory( "test", provider ) );
((TestGraphDatabaseFactory) databaseFactory).addKernelExtensions( extensions );
}
};
Expand Down
Expand Up @@ -19,13 +19,16 @@
*/
package org.neo4j.kernel.impl.transaction.log;

import java.util.function.Consumer;

public class FakeCommitment implements Commitment
{
public static final int CHECKSUM = 3;
private final long id;
private final TransactionIdStore transactionIdStore;
private boolean committed;
private boolean hasLegacyIndexChanges = false;
private Consumer<Long> callback;

public FakeCommitment( long id, TransactionIdStore transactionIdStore )
{
Expand All @@ -50,6 +53,10 @@ public void publishAsCommitted()
public void publishAsClosed()
{
transactionIdStore.transactionClosed( id, 1, 2 );
if ( callback != null )
{
callback.accept( id );
}
}

@Override
Expand All @@ -68,4 +75,10 @@ public boolean hasLegacyIndexChanges()
{
return hasLegacyIndexChanges;
}

@Override
public void onClosed( Consumer<Long> callback )
{
this.callback = callback;
}
}

This file was deleted.

Expand Up @@ -34,7 +34,7 @@
/**
* A uniquely identifiable operation.
*/
public class DistributedOperation implements ReplicatedContent
public class DistributedOperation implements ReplicatedContent
{
private final ReplicatedContent content;
private final GlobalSession<CoreMember> globalSession;
Expand Down
Expand Up @@ -19,14 +19,12 @@
*/
package org.neo4j.coreedge.raft.replication.id;

import java.util.Optional;
import java.util.function.Consumer;

import org.neo4j.coreedge.raft.replication.tx.CoreReplicatedContent;
import org.neo4j.coreedge.raft.state.CoreStateMachines;
import org.neo4j.coreedge.raft.state.CommandDispatcher;
import org.neo4j.coreedge.raft.state.Result;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.kernel.impl.store.id.IdType;

import static java.lang.String.format;
Expand Down Expand Up @@ -105,8 +103,8 @@ public String toString()
}

@Override
public void dispatch( CoreStateMachines coreStateMachines, long commandIndex, Consumer<Result> callback )
public void dispatch( CommandDispatcher commandDispatcher, long commandIndex, Consumer<Result> callback )
{
coreStateMachines.dispatch( this, commandIndex, callback );
commandDispatcher.dispatch( this, commandIndex, callback );
}
}
Expand Up @@ -20,12 +20,10 @@
package org.neo4j.coreedge.raft.replication.token;

import java.util.Arrays;
import java.util.Optional;
import java.util.function.Consumer;

import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.replication.tx.CoreReplicatedContent;
import org.neo4j.coreedge.raft.state.CoreStateMachines;
import org.neo4j.coreedge.raft.state.CommandDispatcher;
import org.neo4j.coreedge.raft.state.Result;

public class ReplicatedTokenRequest implements CoreReplicatedContent
Expand Down Expand Up @@ -91,8 +89,8 @@ public String toString()
}

@Override
public void dispatch( CoreStateMachines coreStateMachines, long commandIndex, Consumer<Result> callback )
public void dispatch( CommandDispatcher commandDispatcher, long commandIndex, Consumer<Result> callback )
{
coreStateMachines.dispatch( this, commandIndex, callback );
commandDispatcher.dispatch( this, commandIndex, callback );
}
}
Expand Up @@ -22,10 +22,10 @@
import java.util.function.Consumer;

import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.state.CoreStateMachines;
import org.neo4j.coreedge.raft.state.CommandDispatcher;
import org.neo4j.coreedge.raft.state.Result;

public interface CoreReplicatedContent extends ReplicatedContent
{
void dispatch( CoreStateMachines coreStateMachines, long commandIndex, Consumer<Result> callback );
void dispatch( CommandDispatcher commandDispatcher, long commandIndex, Consumer<Result> callback );
}
Expand Up @@ -20,10 +20,9 @@
package org.neo4j.coreedge.raft.replication.tx;

import java.util.Arrays;
import java.util.Optional;
import java.util.function.Consumer;

import org.neo4j.coreedge.raft.state.CoreStateMachines;
import org.neo4j.coreedge.raft.state.CommandDispatcher;
import org.neo4j.coreedge.raft.state.Result;

public class ReplicatedTransaction implements CoreReplicatedContent
Expand All @@ -41,9 +40,9 @@ public byte[] getTxBytes()
}

@Override
public void dispatch( CoreStateMachines coreStateMachines, long commandIndex, Consumer<Result> callback )
public void dispatch( CommandDispatcher commandDispatcher, long commandIndex, Consumer<Result> callback )
{
coreStateMachines.dispatch( this, commandIndex, callback );
commandDispatcher.dispatch( this, commandIndex, callback );
}

@Override
Expand Down

0 comments on commit 70a7a7f

Please sign in to comment.