Skip to content

Commit

Permalink
Coordinate local lock managers via replicated lock.
Browse files Browse the repository at this point in the history
Each member of the cluster uses its own LeaderOnlyLockManager,
which wraps a local Locks instance.

To prevent conflicts between the local Locks instances, before
issuing any locks, each server must first obtain a replicated
exclusive lock via ReplicatedLockStateMachine.

Since replication is only allowed from the leader, this means
that only the leader is able to obtain a replicated lock, and
therefore only the leader can issue locks.
  • Loading branch information
martinfurmanski authored and apcj committed Dec 12, 2015
1 parent 4403517 commit 6e98dea
Show file tree
Hide file tree
Showing 12 changed files with 668 additions and 118 deletions.
Expand Up @@ -35,6 +35,8 @@
import org.neo4j.coreedge.raft.replication.token.ReplicatedTokenRequestSerializer;
import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransaction;
import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransactionSerializer;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.core.ReplicatedLockRequest;

public class CoreReplicatedContentMarshal implements ReplicatedContentMarshal<ByteBuf>
{
Expand All @@ -44,6 +46,7 @@ public class CoreReplicatedContentMarshal implements ReplicatedContentMarshal<By
private static final byte SEED_STORE_ID_TYPE = 3;
private static final byte TOKEN_REQUEST_TYPE = 4;
private static final byte NEW_LEADER_BARRIER_TYPE = 5;
private static final byte LOCK_REQUEST = 6;

@Override
public void serialize( ReplicatedContent content, ByteBuf buffer ) throws MarshallingException
Expand Down Expand Up @@ -77,6 +80,11 @@ else if ( content instanceof NewLeaderBarrier )
{
buffer.writeByte( NEW_LEADER_BARRIER_TYPE );
}
else if( content instanceof ReplicatedLockRequest )
{
buffer.writeByte( LOCK_REQUEST );
ReplicatedLockRequestSerializer.serialize( (ReplicatedLockRequest<CoreMember>) content, buffer );
}
else
{
throw new IllegalArgumentException( "Unknown content type " + content.getClass() );
Expand Down Expand Up @@ -113,6 +121,9 @@ public ReplicatedContent deserialize( ByteBuf buffer ) throws MarshallingExcepti
case NEW_LEADER_BARRIER_TYPE:
content = new NewLeaderBarrier();
break;
case LOCK_REQUEST:
content = ReplicatedLockRequestSerializer.deserialize( buffer );
break;
default:
throw new MarshallingException( String.format( "Unknown content type 0x%x", type ) );
}
Expand Down
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.net;

import io.netty.buffer.ByteBuf;

import org.neo4j.coreedge.raft.membership.CoreMemberMarshal;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.core.ReplicatedLockRequest;

/**
 * Converts {@link ReplicatedLockRequest} instances to and from their representation in a {@link ByteBuf}.
 *
 * Layout, in order: the requested lock session id written as an int, followed by the owner
 * in whatever form {@link CoreMemberMarshal} produces. Both directions must keep this order.
 */
public class ReplicatedLockRequestSerializer
{
    /**
     * Appends the given lock request to the buffer.
     *
     * @param content the lock request to write
     * @param buffer the buffer that receives the session id followed by the owner
     */
    public static void serialize( ReplicatedLockRequest<CoreMember> content, ByteBuf buffer )
    {
        // Session id goes first; deserialize() reads it back in the same position.
        final int sessionId = content.requestedLockSessionId();
        buffer.writeInt( sessionId );

        final CoreMember requester = content.owner();
        CoreMemberMarshal.serialize( requester, buffer );
    }

    /**
     * Reads a lock request from the buffer, consuming the session id and then the owner.
     *
     * @param buffer the buffer positioned at the start of a serialized lock request
     * @return the lock request rebuilt from the buffer contents
     */
    public static ReplicatedLockRequest<CoreMember> deserialize( ByteBuf buffer )
    {
        // The id must be consumed before the owner, mirroring the write order in serialize().
        final int sessionId = buffer.readInt();
        return new ReplicatedLockRequest<>( CoreMemberMarshal.deserialize( buffer ), sessionId );
    }
}
Expand Up @@ -89,7 +89,6 @@ public void start()
{
throw new IllegalStateException( "lastCommittedIndex must be set before start." );
}
tokenCache.clear();
replicator.subscribe( this );
}

Expand All @@ -102,8 +101,8 @@ public void stop() throws Throwable
@Override
public void setInitialTokens( List<TOKEN> tokens ) throws NonUniqueTokenException
{
// TODO: There is no need to initialize tokens until we have implemented raft log compression.
// TODO: at the moment, we receive all token allocations since the beginning of time.
tokenCache.clear();
tokenCache.putAll( tokens );
}

@Override
Expand Down
Expand Up @@ -20,49 +20,46 @@
package org.neo4j.coreedge.raft.replication.tx;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

import org.neo4j.concurrent.CompletableFuture;
import org.neo4j.coreedge.raft.NewLeaderBarrier;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.replication.Replicator;
import org.neo4j.coreedge.raft.replication.session.GlobalSession;
import org.neo4j.coreedge.raft.replication.session.GlobalSessionTracker;
import org.neo4j.coreedge.raft.replication.session.LocalOperationId;
import org.neo4j.coreedge.server.core.CurrentReplicatedLockState;
import org.neo4j.graphdb.TransientFailureException;
import org.neo4j.graphdb.TransientTransactionFailureException;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.api.TransactionApplicationMode;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.impl.util.Dependencies;

import static org.neo4j.coreedge.raft.replication.tx.LogIndexTxHeaderEncoding.encodeLogIndexAsTxHeader;

public class ReplicatedTransactionStateMachine implements Replicator.ReplicatedContentListener
{
private final GlobalSessionTracker sessionTracker;
private final GlobalSession myGlobalSession;
private final Dependencies dependencies;
private final CurrentReplicatedLockState currentReplicatedLockState;
private final TransactionCommitProcess commitProcess;
private final Map<LocalOperationId, FutureTxId> outstanding = new ConcurrentHashMap<>();
private long lastCommittedIndex = -1;
private long lastCommittedTxId; // Maintains the last committed tx id, used to set the next field
private long reignStartTxId; // Maintains the last txid committed under the previous service assignment

public ReplicatedTransactionStateMachine( TransactionCommitProcess commitProcess,
GlobalSession myGlobalSession, Dependencies dependencies )
GlobalSession myGlobalSession,
CurrentReplicatedLockState currentReplicatedLockState )
{
this.commitProcess = commitProcess;
this.myGlobalSession = myGlobalSession;
this.dependencies = dependencies;
this.currentReplicatedLockState = currentReplicatedLockState;
this.sessionTracker = new GlobalSessionTracker();
}

Expand All @@ -80,26 +77,6 @@ public synchronized void onReplicated( ReplicatedContent content, long logIndex
{
handleTransaction( (ReplicatedTransaction) content, logIndex );
}
if ( content instanceof NewLeaderBarrier )
{
try
{
reignStartTxId = appendBarrierTx( logIndex );
}
catch ( TransactionFailureException e )
{
throw new RuntimeException( e );
}
}
}

private long appendBarrierTx( long logIndex ) throws TransactionFailureException
{
PhysicalTransactionRepresentation dummyTx = new PhysicalTransactionRepresentation( Collections.emptyList() );
// TODO we need to set the "-1"'s below to useful values
dummyTx.setHeader( encodeLogIndexAsTxHeader( logIndex ), -1, -1, -1, -1, -1, -1 );

return commitProcess.commit( new TransactionToApply( dummyTx ), CommitEvent.NULL, TransactionApplicationMode.EXTERNAL );
}

private void handleTransaction( ReplicatedTransaction replicatedTx, long logIndex )
Expand All @@ -121,10 +98,10 @@ private void handleTransaction( ReplicatedTransaction replicatedTx, long logInde
Optional.ofNullable( outstanding.remove( replicatedTx.localOperationId() ) ) :
Optional.<CompletableFuture<Long>>empty();

if ( tx.getLatestCommittedTxWhenStarted() < reignStartTxId )
if ( currentReplicatedLockState.currentLockSession().id() != tx.getLockSessionId() )
{
future.ifPresent( txFuture -> txFuture.completeExceptionally( new TransientTransactionFailureException(
"Attempt to commit transaction that was started on a different leader term. " +
"Attempt to commit transaction that was started on a different leader. " +
"Please retry the transaction." ) ) );
return;
}
Expand Down
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.server.core;

/**
 * Exposes the lock session that is currently in effect.
 */
public interface CurrentReplicatedLockState
{
    /**
     * @return the lock session currently in effect
     */
    LockSession currentLockSession();

    /**
     * A lock session, identified by an integer id.
     */
    interface LockSession
    {
        /**
         * NOTE(review): presumably true when the session is held by the local member -
         * confirm against the implementing classes.
         *
         * @return whether this session is "mine"
         */
        boolean isMine();

        /**
         * @return the integer identifier of this session
         */
        int id();
    }
}
Expand Up @@ -197,7 +197,9 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,

LocalSessionPool localSessionPool = new LocalSessionPool( myself );

commitProcessFactory = createCommitProcessFactory( replicator, localSessionPool, dependencies, SYSTEM_CLOCK );
ReplicatedLockStateMachine replicatedLockStateMachine = new ReplicatedLockStateMachine( myself, replicator );

commitProcessFactory = createCommitProcessFactory( replicator, localSessionPool, replicatedLockStateMachine, dependencies, SYSTEM_CLOCK );

ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( myself, new InMemoryIdAllocationStateStore() );
replicator.subscribe( idAllocationStateMachine );
Expand Down Expand Up @@ -252,7 +254,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
channelInitializer ) );
channelInitializer.setOwner( coreToCoreClient );

lockManager = dependencies.satisfyDependency( createLockManager( config, logging ) );
lockManager = dependencies.satisfyDependency( createLockManager( config, logging, replicator, myself, replicatedLockStateMachine ) );

LocalDatabase localDatabase =
new LocalDatabase( platformModule.storeDir,
Expand Down Expand Up @@ -303,6 +305,7 @@ private File createRaftLogsDirectory( File dir, FileSystemAbstraction fileSystem

public static CommitProcessFactory createCommitProcessFactory( final Replicator replicator,
final LocalSessionPool localSessionPool,
CurrentReplicatedLockState currentReplicatedLockState,
final Dependencies dependencies,
final Clock clock )
{
Expand All @@ -312,7 +315,7 @@ public static CommitProcessFactory createCommitProcessFactory( final Replicator
dependencies.satisfyDependencies( localCommit );

ReplicatedTransactionStateMachine replicatedTxStateMachine = new ReplicatedTransactionStateMachine(
localCommit, localSessionPool.getGlobalSession(), dependencies );
localCommit, localSessionPool.getGlobalSession(), currentReplicatedLockState );

dependencies.satisfyDependencies( replicatedTxStateMachine );

Expand Down Expand Up @@ -414,11 +417,12 @@ protected ReplicatedIdGeneratorFactory createIdGeneratorFactory( FileSystemAbstr
return new ReplicatedIdGeneratorFactory( fileSystem, idRangeAcquirer, logProvider );
}

protected Locks createLockManager( final Config config, final LogService logging )
protected Locks createLockManager( final Config config, final LogService logging, final Replicator replicator,
CoreMember myself, ReplicatedLockStateMachine replicatedLockStateMachine )
{
Locks local = CommunityEditionModule.createLockManager( config, logging );

return local;
return new LeaderOnlyLockManager( myself, replicator, local, replicatedLockStateMachine );
}

protected TransactionHeaderInformationFactory createHeaderInformationFactory()
Expand Down

0 comments on commit 6e98dea

Please sign in to comment.