-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
LeaderOnlyLockManager.java
261 lines (230 loc) · 9.29 KB
/
LeaderOnlyLockManager.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.core.state.machines.locks;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.neo4j.coreedge.core.consensus.LeaderLocator;
import org.neo4j.coreedge.core.consensus.NoLeaderFoundException;
import org.neo4j.coreedge.core.replication.Replicator;
import org.neo4j.coreedge.core.state.machines.tx.ReplicatedTransactionStateMachine;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException;
import org.neo4j.storageengine.api.lock.ResourceType;
import static org.neo4j.kernel.api.exceptions.Status.Cluster.NoLeaderAvailable;
import static org.neo4j.kernel.api.exceptions.Status.Cluster.NotALeader;
import static org.neo4j.kernel.api.exceptions.Status.Transaction.Interrupted;
/**
 * Each member of the cluster uses its own {@link LeaderOnlyLockManager} which wraps a local {@link Locks} manager.
 * The validity of local lock managers is synchronized by using a token which gets requested by each server as necessary
 * and if the request is granted then the associated id can be used to identify a unique lock session in the cluster.
 * <p/>
 * The fundamental strategy is to only allow locks on the leader. This has the benefit of minimizing the synchronization
 * to only concern the single token but it also means that non-leaders should not even attempt to request the token or
 * significant churn of this single resource will lead to a high level of aborted transactions.
 * <p/>
 * The token requests carry a candidate id and they get ordered with respect to the transactions in the consensus
 * machinery. The latest request which gets accepted (see {@link ReplicatedTransactionStateMachine}) defines the
 * currently valid lock session id in this ordering. Each transaction that uses locking gets marked with a lock session
 * id that was valid at the time of acquiring it, but by the time a transaction commits that id might no longer be
 * valid, in which case the transaction is rejected and fails.
 * <p/>
 * The {@link ReplicatedLockTokenStateMachine} handles the token requests and considers only one to be valid at a time.
 * Meanwhile, {@link ReplicatedTransactionStateMachine} rejects any transactions that get committed under an
 * invalid token.
 * <p/>
 * Only exclusive locks require the token; shared locks are delegated straight to the local lock client.
 */
// TODO: Fix lock exception usage when lock exception hierarchy has been fixed.
public class LeaderOnlyLockManager implements Locks
{
    public static final String LOCK_NOT_ON_LEADER_ERROR_MESSAGE = "Should only attempt to take locks when leader.";

    private final MemberId myself;
    private final Replicator replicator;
    private final LeaderLocator leaderLocator;
    private final Locks localLocks;
    private final ReplicatedLockTokenStateMachine lockTokenStateMachine;

    public LeaderOnlyLockManager( MemberId myself, Replicator replicator, LeaderLocator leaderLocator, Locks localLocks,
            ReplicatedLockTokenStateMachine lockTokenStateMachine )
    {
        this.myself = myself;
        this.replicator = replicator;
        this.leaderLocator = leaderLocator;
        this.localLocks = localLocks;
        this.lockTokenStateMachine = lockTokenStateMachine;
    }

    /**
     * Creates a lock client that wraps a new client from the local lock manager. The returned client acquires the
     * cluster lock token before taking exclusive locks.
     */
    @Override
    public Locks.Client newClient()
    {
        return new LeaderOnlyLockClient( localLocks.newClient() );
    }

    /**
     * Acquires a valid token id owned by us or throws.
     * <p/>
     * If the current token is already owned by this member its id is returned directly. Otherwise this member must be
     * the leader; it then replicates a request for the next candidate token id and waits for the outcome.
     * <p/>
     * This method is {@code synchronized}, so concurrent callers on this manager are serialized, and the monitor is
     * held while blocking on the replication result.
     *
     * @return the id of a lock token owned by this member.
     * @throws AcquireLockTimeoutException if no leader is known, this member is not the leader, the token request was
     * not accepted, replication failed, or the calling thread was interrupted.
     */
    private synchronized int acquireTokenOrThrow()
    {
        LockToken currentToken = lockTokenStateMachine.currentToken();
        if ( myself.equals( currentToken.owner() ) )
        {
            return currentToken.id();
        }

        /* If we are not the leader then we will not even attempt to get the token,
           since only the leader should take locks. */
        ensureLeader();

        ReplicatedLockTokenRequest lockTokenRequest =
                new ReplicatedLockTokenRequest( myself, LockToken.nextCandidateId( currentToken.id() ) );

        Future<Object> future;
        try
        {
            future = replicator.replicate( lockTokenRequest, true );
        }
        catch ( InterruptedException e )
        {
            // Restore the interrupt status (as the handler below does) so callers can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new AcquireLockTimeoutException( e, "Interrupted acquiring token.", Interrupted );
        }

        try
        {
            // The replicated outcome tells us whether our candidate request was the one accepted.
            boolean success = (boolean) future.get();
            if ( success )
            {
                return lockTokenRequest.id();
            }
            throw new AcquireLockTimeoutException( "Failed to acquire lock token. Was taken by another candidate.",
                    NotALeader );
        }
        catch ( ExecutionException e )
        {
            throw new AcquireLockTimeoutException( e, "Failed to acquire lock token.", NotALeader );
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt();
            throw new AcquireLockTimeoutException( e, "Failed to acquire lock token.", Interrupted );
        }
    }

    /**
     * Throws unless the leader locator reports this member as the current leader.
     *
     * @throws AcquireLockTimeoutException with {@code NoLeaderAvailable} if no leader could be found, or with
     * {@code NotALeader} if another member is the leader.
     */
    private void ensureLeader()
    {
        MemberId leader;
        try
        {
            leader = leaderLocator.getLeader();
        }
        catch ( NoLeaderFoundException e )
        {
            throw new AcquireLockTimeoutException( e, "Could not acquire lock token.", NoLeaderAvailable );
        }

        if ( !leader.equals( myself ) )
        {
            throw new AcquireLockTimeoutException( LOCK_NOT_ON_LEADER_ERROR_MESSAGE, NotALeader );
        }
    }

    /** Delegates to the wrapped local lock manager. */
    @Override
    public void accept( Visitor visitor )
    {
        localLocks.accept( visitor );
    }

    /** Closes the wrapped local lock manager. */
    @Override
    public void close()
    {
        localLocks.close();
    }

    /**
     * The LeaderOnlyLockClient delegates to a local lock client for taking locks, but makes
     * sure that it holds the cluster locking token before actually taking exclusive locks. If the token
     * is lost during a locking session then a transaction will either fail on a subsequent
     * local locking operation or during commit time.
     */
    private class LeaderOnlyLockClient implements Locks.Client
    {
        private final Client localClient;

        /** Token id this client locked under, or {@link LockToken#INVALID_LOCK_TOKEN_ID} until one is acquired. */
        private int lockTokenId = LockToken.INVALID_LOCK_TOKEN_ID;

        LeaderOnlyLockClient( Client localClient )
        {
            this.localClient = localClient;
        }

        /**
         * This ensures that a valid token was held at some point in time. It acquires a token on first use.
         * On later calls it throws if the token this client acquired is no longer the current one.
         *
         * @throws AcquireLockTimeoutException if the token could not be acquired or has since been lost.
         */
        private void ensureHoldingToken()
        {
            if ( lockTokenId == LockToken.INVALID_LOCK_TOKEN_ID )
            {
                lockTokenId = acquireTokenOrThrow();
            }
            else if ( lockTokenId != lockTokenStateMachine.currentToken().id() )
            {
                throw new AcquireLockTimeoutException( "Local instance lost lock token.", NotALeader );
            }
        }

        /** Shared locks do not require the cluster token and go straight to the local client. */
        @Override
        public void acquireShared( ResourceType resourceType, long... resourceId ) throws AcquireLockTimeoutException
        {
            localClient.acquireShared( resourceType, resourceId );
        }

        /** Ensures the cluster token is held, then takes the exclusive lock locally. */
        @Override
        public void acquireExclusive( ResourceType resourceType, long... resourceId ) throws AcquireLockTimeoutException
        {
            ensureHoldingToken();
            localClient.acquireExclusive( resourceType, resourceId );
        }

        /**
         * Ensures the cluster token is held, then tries the exclusive lock locally. A missing or lost token raises
         * an exception rather than returning {@code false}.
         */
        @Override
        public boolean tryExclusiveLock( ResourceType resourceType, long resourceId )
        {
            ensureHoldingToken();
            return localClient.tryExclusiveLock( resourceType, resourceId );
        }

        @Override
        public boolean trySharedLock( ResourceType resourceType, long resourceId )
        {
            return localClient.trySharedLock( resourceType, resourceId );
        }

        @Override
        public void releaseShared( ResourceType resourceType, long resourceId )
        {
            localClient.releaseShared( resourceType, resourceId );
        }

        @Override
        public void releaseExclusive( ResourceType resourceType, long resourceId )
        {
            localClient.releaseExclusive( resourceType, resourceId );
        }

        @Override
        public void stop()
        {
            localClient.stop();
        }

        @Override
        public void close()
        {
            localClient.close();
        }

        /**
         * @return the token id this client acquired, or {@link LockToken#INVALID_LOCK_TOKEN_ID} if no exclusive lock
         * has been requested through this client yet.
         */
        @Override
        public int getLockSessionId()
        {
            return lockTokenId;
        }
    }
}