Skip to content

Commit

Permalink
Merge pull request #6238 from martinfurmanski/lock-token
Browse files Browse the repository at this point in the history
Refactor leader-only lock manager around the token concept
  • Loading branch information
apcj committed Jan 19, 2016
2 parents 78a6133 + 0224f99 commit 12324ac
Show file tree
Hide file tree
Showing 25 changed files with 567 additions and 433 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ void visit( ResourceType resourceType, long resourceId, String description, long

interface Client extends ResourceLocker, AutoCloseable
{
/**
* Represents the fact that no lock session is used because no locks are taken.
*/
int NO_LOCK_SESSION_ID = -1;

/**
* Can be grabbed when there are no locks or only share locks on a resource. If the lock cannot be acquired,
* behavior is specified by the {@link WaitStrategy} for the given {@link ResourceType}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.impl.transaction;

import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.storageengine.api.CommandStream;

Expand Down Expand Up @@ -61,7 +62,7 @@ public interface TransactionRepresentation extends CommandStream
long getTimeCommitted();

/**
* @return the identifier for the lock session associated with this transaction, or {-1} if none.
* @return the identifier for the lock session associated with this transaction, or {@value Locks.Client#NO_LOCK_SESSION_ID} if none.
* This is only used for slave commits.
*/
int getLockSessionId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransaction;
import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransactionSerializer;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.core.locks.ReplicatedLockRequest;
import org.neo4j.coreedge.server.core.locks.ReplicatedLockTokenRequest;

public class CoreReplicatedContentMarshal implements ReplicatedContentMarshal<ByteBuf>
{
Expand All @@ -46,7 +46,7 @@ public class CoreReplicatedContentMarshal implements ReplicatedContentMarshal<By
private static final byte SEED_STORE_ID_TYPE = 3;
private static final byte TOKEN_REQUEST_TYPE = 4;
private static final byte NEW_LEADER_BARRIER_TYPE = 5;
private static final byte LOCK_REQUEST = 6;
private static final byte LOCK_TOKEN_REQUEST = 6;

@Override
public void serialize( ReplicatedContent content, ByteBuf buffer ) throws MarshallingException
Expand Down Expand Up @@ -80,10 +80,10 @@ else if ( content instanceof NewLeaderBarrier )
{
buffer.writeByte( NEW_LEADER_BARRIER_TYPE );
}
else if( content instanceof ReplicatedLockRequest )
else if( content instanceof ReplicatedLockTokenRequest )
{
buffer.writeByte( LOCK_REQUEST );
ReplicatedLockRequestSerializer.serialize( (ReplicatedLockRequest<CoreMember>) content, buffer );
buffer.writeByte( LOCK_TOKEN_REQUEST );
ReplicatedLockTokenSerializer.serialize( (ReplicatedLockTokenRequest<CoreMember>) content, buffer );
}
else
{
Expand Down Expand Up @@ -121,8 +121,8 @@ public ReplicatedContent deserialize( ByteBuf buffer ) throws MarshallingExcepti
case NEW_LEADER_BARRIER_TYPE:
content = new NewLeaderBarrier();
break;
case LOCK_REQUEST:
content = ReplicatedLockRequestSerializer.deserialize( buffer );
case LOCK_TOKEN_REQUEST:
content = ReplicatedLockTokenSerializer.deserialize( buffer );
break;
default:
throw new MarshallingException( String.format( "Unknown content type 0x%x", type ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@
import io.netty.buffer.ByteBuf;

import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.core.locks.ReplicatedLockRequest;
import org.neo4j.coreedge.server.core.locks.ReplicatedLockTokenRequest;

public class ReplicatedLockRequestSerializer
public class ReplicatedLockTokenSerializer
{
public static void serialize( ReplicatedLockRequest<CoreMember> content, ByteBuf buffer )
public static void serialize( ReplicatedLockTokenRequest<CoreMember> tokenRequest, ByteBuf buffer )
{
buffer.writeInt( content.requestedLockSessionId() );
new CoreMember.CoreMemberMarshal().marshal( content.owner(), buffer );
buffer.writeInt( tokenRequest.id() );
new CoreMember.CoreMemberMarshal().marshal( tokenRequest.owner(), buffer );
}

public static ReplicatedLockRequest<CoreMember> deserialize( ByteBuf buffer )
public static ReplicatedLockTokenRequest<CoreMember> deserialize( ByteBuf buffer )
{
int requestedLockSessionId = buffer.readInt();
int candidateId = buffer.readInt();
CoreMember owner = new CoreMember.CoreMemberMarshal().unmarshal( buffer );

return new ReplicatedLockRequest<>( owner, requestedLockSessionId );
return new ReplicatedLockTokenRequest<>( owner, candidateId );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@
import org.neo4j.coreedge.raft.replication.Replicator;
import org.neo4j.coreedge.raft.replication.session.GlobalSession;
import org.neo4j.coreedge.raft.replication.session.GlobalSessionTracker;
import org.neo4j.coreedge.server.core.locks.CurrentReplicatedLockState;
import org.neo4j.coreedge.server.core.locks.LockTokenManager;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.storageengine.api.TransactionApplicationMode;
Expand All @@ -41,19 +42,19 @@ public class ReplicatedTransactionStateMachine implements Replicator.ReplicatedC
{
private final GlobalSessionTracker sessionTracker;
private final GlobalSession myGlobalSession;
private final CurrentReplicatedLockState currentReplicatedLockState;
private final LockTokenManager lockTokenManager;
private final TransactionCommitProcess commitProcess;
private final CommittingTransactions transactionFutures;
private long lastCommittedIndex = -1;

public ReplicatedTransactionStateMachine( TransactionCommitProcess commitProcess,
GlobalSession myGlobalSession,
CurrentReplicatedLockState currentReplicatedLockState,
LockTokenManager lockTokenManager,
CommittingTransactions transactionFutures )
{
this.commitProcess = commitProcess;
this.myGlobalSession = myGlobalSession;
this.currentReplicatedLockState = currentReplicatedLockState;
this.lockTokenManager = lockTokenManager;
this.transactionFutures = transactionFutures;
this.sessionTracker = new GlobalSessionTracker();
}
Expand Down Expand Up @@ -94,14 +95,15 @@ private void handleTransaction( ReplicatedTransaction replicatedTx, long logInde
Optional.ofNullable( transactionFutures.retrieve( replicatedTx.localOperationId() ) ) :
Optional.<CommittingTransaction>empty();

int currentLockSessionId = currentReplicatedLockState.currentLockSession().id();
int currentTokenId = lockTokenManager.currentToken().id();
int txLockSessionId = tx.getLockSessionId();
if ( currentLockSessionId != txLockSessionId )

if ( currentTokenId != txLockSessionId && txLockSessionId != Locks.Client.NO_LOCK_SESSION_ID )
{
future.ifPresent( txFuture -> txFuture.notifyCommitFailed( new TransactionFailureException( LockSessionInvalid,
"The lock session in the cluster has changed: " +
"[current lock session id:%d, tx lock session id:%d]",
currentLockSessionId, txLockSessionId ) ) );
currentTokenId, txLockSessionId ) ) );
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ public String toString()
public static final Setting<Long> token_creation_timeout =
setting( "core_edge.token_creation_timeout", DURATION, "1s" );

@Description("Time out waiting for the leader locking token")
public static final Setting<Long> leader_lock_token_timeout =
setting( "core_edge.leader_lock_token_timeout", DURATION, "1s" );

@Description("Expected size of core cluster")
public static final Setting<Integer> expected_core_cluster_size =
setting( "core_edge.expected_core_cluster_size", INTEGER, "3" );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@
import org.neo4j.coreedge.server.ExpiryScheduler;
import org.neo4j.coreedge.server.ListenSocketAddress;
import org.neo4j.coreedge.server.SenderService;
import org.neo4j.coreedge.server.core.locks.CurrentReplicatedLockState;
import org.neo4j.coreedge.server.core.locks.LockTokenManager;
import org.neo4j.coreedge.server.core.locks.LeaderOnlyLockManager;
import org.neo4j.coreedge.server.core.locks.ReplicatedLockStateMachine;
import org.neo4j.coreedge.server.core.locks.ReplicatedLockTokenStateMachine;
import org.neo4j.coreedge.server.logging.BetterMessageLogger;
import org.neo4j.coreedge.server.logging.MessageLogger;
import org.neo4j.graphdb.DependencyResolver;
Expand Down Expand Up @@ -237,10 +237,10 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,

LocalSessionPool localSessionPool = new LocalSessionPool( myself );

ReplicatedLockStateMachine<CoreMember> replicatedLockStateMachine = new ReplicatedLockStateMachine<>( myself,
replicator );
ReplicatedLockTokenStateMachine<CoreMember> replicatedLockTokenStateMachine =
new ReplicatedLockTokenStateMachine<>( replicator );

commitProcessFactory = createCommitProcessFactory( replicator, localSessionPool, replicatedLockStateMachine,
commitProcessFactory = createCommitProcessFactory( replicator, localSessionPool, replicatedLockTokenStateMachine,
dependencies, logging );

final IdAllocationState idAllocationState;
Expand All @@ -255,11 +255,9 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
throw new RuntimeException( e );
}


ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( myself,
idAllocationState );


replicator.subscribe( idAllocationStateMachine );

// TODO: AllocationChunk should be configurable and per type. The retry timeout should also be configurable.
Expand Down Expand Up @@ -314,8 +312,9 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
channelInitializer ) );
channelInitializer.setOwner( coreToCoreClient );

long leaderLockTokenTimeout = config.get( CoreEdgeClusterSettings.leader_lock_token_timeout );
lockManager = dependencies.satisfyDependency( createLockManager( config, logging, replicator, myself,
replicatedLockStateMachine, raft ) );
replicatedLockTokenStateMachine, raft, leaderLockTokenTimeout ) );

CatchupServer catchupServer = new CatchupServer( logProvider,
new StoreIdSupplier( platformModule ),
Expand Down Expand Up @@ -359,7 +358,7 @@ private File createClusterStateDirectory( File dir, FileSystemAbstraction fileSy

public static CommitProcessFactory createCommitProcessFactory( final Replicator replicator,
final LocalSessionPool localSessionPool,
final CurrentReplicatedLockState
final LockTokenManager
currentReplicatedLockState,
final Dependencies dependencies,
final LogService logging )
Expand Down Expand Up @@ -412,7 +411,6 @@ private static RaftInstance<CoreMember> createRaft( LifeSupport life,

CoreMemberSetBuilder memberSetBuilder = new CoreMemberSetBuilder();


Replicator localReplicator = new LeaderOnlyReplicator<>( myself, myself.getRaftAddress(), outbound );

RaftMembershipManager<CoreMember> raftMembershipManager = new RaftMembershipManager<>( localReplicator,
Expand Down Expand Up @@ -470,12 +468,12 @@ protected ReplicatedIdGeneratorFactory createIdGeneratorFactory( FileSystemAbstr
}

protected Locks createLockManager( final Config config, final LogService logging, final Replicator replicator,
CoreMember myself, ReplicatedLockStateMachine<CoreMember>
replicatedLockStateMachine, LeaderLocator<CoreMember> leaderLocator )
CoreMember myself, LockTokenManager lockTokenManager,
LeaderLocator<CoreMember> leaderLocator, long leaderLockTokenTimeout )
{
Locks local = CommunityEditionModule.createLockManager( config, logging );

return new LeaderOnlyLockManager<>( myself, replicator, leaderLocator, local, replicatedLockStateMachine );
return new LeaderOnlyLockManager<>( myself, replicator, leaderLocator, local, lockTokenManager, leaderLockTokenTimeout );
}

protected TransactionHeaderInformationFactory createHeaderInformationFactory()
Expand Down

0 comments on commit 12324ac

Please sign in to comment.