-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
SharedLock.java
372 lines (342 loc) · 12.6 KB
/
SharedLock.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.enterprise.lock.forseti;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.neo4j.kernel.impl.util.collection.SimpleBitSet;
/**
 * A Forseti share lock. Can be upgraded to an update lock, which will block new attempts at acquiring shared lock,
 * but will allow existing holders to complete.
 * <p>
 * The lock state is a single {@link AtomicInteger}: the low 31 bits count references (holders) and the sign bit
 * is the update-lock flag. Holders themselves are tracked in a small set of lazily created
 * {@link AtomicReferenceArray}s so that acquiring and releasing never needs a monitor on the hot path.
 */
class SharedLock implements ForsetiLockManager.Lock
{
    /**
     * The update lock flag is inlined into the ref count integer, in order to allow common CAS operations across
     * both the update flag and the refCount simultaneously. This avoids a nasty series of race conditions, but
     * makes the reference counting code much more complicated. May be worth revisiting.
     */
    private static final int UPDATE_LOCK_FLAG = 1 << 31;

    /**
     * No more holders than this allowed, don't change this without changing the sizing of
     * {@link #clientsHoldingThisLock}.
     */
    private static final int MAX_HOLDERS = 4680;

    // TODO Investigate inlining and padding the refCount.
    // TODO My gut feeling tells me there's a high chance of false-sharing
    // TODO on these unpadded AtomicIntegers.
    /** Reference count in the low 31 bits, {@link #UPDATE_LOCK_FLAG} in the sign bit. Starts at one holder. */
    private final AtomicInteger refCount = new AtomicInteger( 1 );

    /**
     * When reading this, keep in mind the main design goals here: Releasing and acquiring this lock should not require
     * synchronization, and the lock should have as low of a memory footprint as possible.
     * <p>
     * An array of arrays containing references to clients holding this lock. Each client can only show up once.
     * When the lock is created only the first reference array is created (so the last three slots in the outer array
     * are empty). The outer array is populated when the reference arrays are filled up, with exponentially larger
     * reference arrays:
     * <p>
     * clientsHoldingThisLock[0] = 8 slots
     * clientsHoldingThisLock[1] = 64 slots
     * clientsHoldingThisLock[2] = 512 slots
     * clientsHoldingThisLock[3] = 4096 slots
     * <p>
     * Allowing a total of 4680 transactions holding the same shared lock simultaneously.
     * <p>
     * This data structure was chosen over using regular resizing of the array, because we need to be able to increase
     * the size of the array without requiring synchronization between threads writing to the array and threads trying
     * to resize (since the threads writing to the array are on one of the hottest code paths in the database).
     * <p>
     * This data structure is, however, not optimal, since it requires O(n) at worst to search for a slot and to remove
     * a client from the array. This should be revisited in the future.
     */
    // Generic array creation is not expressible in Java; only AtomicReferenceArray<ForsetiClient> is ever stored.
    @SuppressWarnings( "unchecked" )
    private final AtomicReferenceArray<ForsetiClient>[] clientsHoldingThisLock = new AtomicReferenceArray[4];

    /**
     * Client that holds the update lock, if any. This is a plain field: it is written after winning the
     * update-flag CAS and read without synchronization by {@link #toString()}, so that read is best-effort.
     */
    private ForsetiClient updateHolder;

    /**
     * Creates the lock with {@code client} registered as its first holder (the ref count starts at one).
     *
     * @param client the client creating, and thereby holding, this lock
     */
    SharedLock( ForsetiClient client )
    {
        addClientHoldingLock( client );
    }

    /**
     * Tries to register {@code client} as a holder of this shared lock.
     *
     * @param client the client that wants to share the lock
     * @return {@code true} if the client was added as a new holder; {@code false} if no reference could be taken
     * (ref count is zero, the update flag is set, or {@link #MAX_HOLDERS} is reached) or if the client is already
     * registered as a holder
     */
    public boolean acquire( ForsetiClient client )
    {
        // First, bump refcount to make sure no one drops this lock on the floor
        if ( !acquireReference() )
        {
            return false;
        }

        if ( clientHoldsThisLock( client ) )
        {
            // Already registered as a holder: hand back the reference taken above to keep the counter in sync.
            releaseReference();
            return false;
        }

        // Not a holder yet, claim a slot among the clients holding this lock.
        return addClientHoldingLock( client );
    }

    /**
     * Removes {@code client} from the holders and drops its reference.
     *
     * @param client the client giving up the lock
     * @return {@code true} if this released the last reference
     * @throws IllegalStateException if the client is not registered as a holder
     */
    public boolean release( ForsetiClient client )
    {
        removeClientHoldingLock( client );
        return releaseReference();
    }

    /**
     * Asks every current holder to copy its wait list into {@code waitList}.
     *
     * @param waitList bit set passed to each holder's {@code copyWaitListTo}
     */
    @Override
    public void copyHolderWaitListsInto( SimpleBitSet waitList )
    {
        for ( AtomicReferenceArray<ForsetiClient> holders : clientsHoldingThisLock )
        {
            if ( holders == null )
            {
                continue;
            }
            for ( int j = 0; j < holders.length(); j++ )
            {
                ForsetiClient client = holders.get( j );
                if ( client != null )
                {
                    client.copyWaitListTo( waitList );
                }
            }
        }
    }

    /**
     * Looks for a holder that reports it is waiting for the given client.
     *
     * @param clientId id of the client about to wait on this lock
     * @return the id of the first holder for which {@code isWaitingFor( clientId )} is true, or {@code -1} if none
     */
    @Override
    public int detectDeadlock( int clientId )
    {
        for ( AtomicReferenceArray<ForsetiClient> holders : clientsHoldingThisLock )
        {
            if ( holders == null )
            {
                continue;
            }
            for ( int j = 0; j < holders.length(); j++ )
            {
                ForsetiClient client = holders.get( j );
                if ( client != null && client.isWaitingFor( clientId ) )
                {
                    return client.id();
                }
            }
        }
        return -1;
    }

    /**
     * Tries to set the update flag on this lock, which makes further {@link #acquire(ForsetiClient)} calls fail.
     *
     * @param client the client that becomes the update holder on success
     * @return {@code true} if the flag was set by this call; {@code false} if the ref count is zero or the flag
     * is already set
     */
    public boolean tryAcquireUpdateLock( ForsetiClient client )
    {
        while ( true )
        {
            int refs = refCount.get();
            // UPDATE_LOCK flips the sign bit, so refs will be < 0 if it is an update lock.
            if ( refs <= 0 )
            {
                return false;
            }
            if ( refCount.compareAndSet( refs, refs | UPDATE_LOCK_FLAG ) )
            {
                updateHolder = client;
                return true;
            }
        }
    }

    /**
     * Clears the update flag, leaving the reference count untouched.
     *
     * @param client not used by this implementation
     */
    public void releaseUpdateLock( ForsetiClient client )
    {
        // Forget the holder before the flag is released, so a subsequent update holder is never wiped out.
        // Clearing is idempotent, so it does not need to be repeated on every CAS retry.
        cleanUpdateHolder();
        while ( true )
        {
            int refs = refCount.get();
            if ( refCount.compareAndSet( refs, refs & ~UPDATE_LOCK_FLAG ) )
            {
                return;
            }
        }
    }

    /** Forgets the recorded update holder without touching the update flag. */
    public void cleanUpdateHolder()
    {
        updateHolder = null;
    }

    /**
     * @return the current reference count with the update flag masked out
     */
    public int numberOfHolders()
    {
        return refCount.get() & ~UPDATE_LOCK_FLAG;
    }

    /**
     * @return {@code true} if the update flag is currently set
     */
    public boolean isUpdateLock()
    {
        return (refCount.get() & UPDATE_LOCK_FLAG) == UPDATE_LOCK_FLAG;
    }

    /**
     * @return {@code "SharedLock[...]"} containing the wait list description of every current holder,
     * separated by {@code ", "}
     */
    @Override
    public String describeWaitList()
    {
        StringBuilder sb = new StringBuilder( "SharedLock[" );
        // Tracked across all holder arrays, so entries coming from different arrays are separated as well.
        boolean first = true;
        for ( AtomicReferenceArray<ForsetiClient> holders : clientsHoldingThisLock )
        {
            if ( holders == null )
            {
                continue;
            }
            for ( int j = 0; j < holders.length(); j++ )
            {
                ForsetiClient current = holders.get( j );
                if ( current != null )
                {
                    sb.append( first ? "" : ", " ).append( current.describeWaitList() );
                    first = false;
                }
            }
        }
        return sb.append( "]" ).toString();
    }

    /**
     * Adds every current holder of this lock to {@code owners}.
     *
     * @param owners set receiving the holders
     */
    @Override
    public void collectOwners( Set<ForsetiClient> owners )
    {
        for ( AtomicReferenceArray<ForsetiClient> ownerArray : clientsHoldingThisLock )
        {
            if ( ownerArray == null )
            {
                continue;
            }
            int len = ownerArray.length();
            for ( int i = 0; i < len; i++ )
            {
                ForsetiClient owner = ownerArray.get( i );
                if ( owner != null )
                {
                    owners.add( owner );
                }
            }
        }
    }

    /**
     * Describes the lock as either {@code UpdateLock{...}} or {@code SharedLock{...}}. The ref count is read
     * exactly once, so the chosen form and the printed count always come from the same snapshot.
     */
    @Override
    public String toString()
    {
        int refs = refCount.get();
        if ( (refs & UPDATE_LOCK_FLAG) == UPDATE_LOCK_FLAG )
        {
            return "UpdateLock{" +
                    "objectId=" + System.identityHashCode( this ) +
                    ", refCount=" + (refs & ~UPDATE_LOCK_FLAG) +
                    ", holder=" + updateHolder +
                    '}';
        }
        return "SharedLock{" +
                "objectId=" + System.identityHashCode( this ) +
                ", refCount=" + refs +
                '}';
    }

    /**
     * Clears the slot occupied by {@code client}.
     *
     * @throws IllegalStateException if no slot holds a client equal to {@code client}
     */
    private void removeClientHoldingLock( ForsetiClient client )
    {
        for ( AtomicReferenceArray<ForsetiClient> holders : clientsHoldingThisLock )
        {
            if ( holders == null )
            {
                // Holder arrays are created in order, so a missing one ends the search.
                break;
            }
            for ( int j = 0; j < holders.length(); j++ )
            {
                ForsetiClient current = holders.get( j );
                if ( current != null && current.equals( client ) )
                {
                    holders.set( j, null );
                    return;
                }
            }
        }
        throw new IllegalStateException(
                client + " asked to be removed from holder list, but it does not hold " + this );
    }

    /**
     * Claims a free slot for {@code client}, creating further holder arrays on demand. Keeps rescanning until a
     * slot has been claimed.
     *
     * @return always {@code true}
     */
    private boolean addClientHoldingLock( ForsetiClient client )
    {
        while ( true )
        {
            for ( int i = 0; i < clientsHoldingThisLock.length; i++ )
            {
                AtomicReferenceArray<ForsetiClient> holders = clientsHoldingThisLock[i];
                if ( holders == null )
                {
                    holders = addHolderArray( i );
                }

                for ( int j = 0; j < holders.length(); j++ )
                {
                    // TODO This means we do CAS on each entry, very likely hitting a lot of failures until we
                    // TODO find a slot. We should look into better strategies here.
                    // TODO One such strategy could be binary searching for a free slot, and then linear scan
                    // TODO after that if the CAS fails on the slot we found with binary search.
                    if ( holders.get( j ) == null && holders.compareAndSet( j, null, client ) )
                    {
                        return true;
                    }
                }
            }
        }
    }

    /**
     * Increments the reference count.
     *
     * @return {@code true} if a reference was taken; {@code false} if the count is zero, the update flag is set,
     * or {@link #MAX_HOLDERS} has been reached
     */
    private boolean acquireReference()
    {
        while ( true )
        {
            int refs = refCount.get();
            // UPDATE_LOCK flips the sign bit, so refs will be < 0 if it is an update lock.
            if ( refs <= 0 || refs >= MAX_HOLDERS )
            {
                return false;
            }
            if ( refCount.compareAndSet( refs, refs + 1 ) )
            {
                return true;
            }
        }
    }

    /**
     * Decrements the reference count while preserving the update flag.
     *
     * @return {@code true} if the count (ignoring the update flag) reached zero
     */
    private boolean releaseReference()
    {
        while ( true )
        {
            int refAndUpdateFlag = refCount.get();
            int updateFlag = refAndUpdateFlag & UPDATE_LOCK_FLAG;
            int newRefCount = (refAndUpdateFlag & ~UPDATE_LOCK_FLAG) - 1;
            if ( refCount.compareAndSet( refAndUpdateFlag, newRefCount | updateFlag ) )
            {
                return newRefCount == 0;
            }
        }
    }

    /**
     * Returns the holder array at {@code slot}, creating it first if it does not exist yet. Creation is
     * serialized on this lock's monitor so that only one array is ever installed per slot.
     *
     * @param slot index into {@link #clientsHoldingThisLock}
     * @return the (possibly newly created) holder array for that slot
     */
    private synchronized AtomicReferenceArray<ForsetiClient> addHolderArray( int slot )
    {
        if ( clientsHoldingThisLock[slot] == null )
        {
            // 8^(slot + 1) slots: 8, 64, 512, 4096. Pure integer arithmetic, no floating point round trip.
            clientsHoldingThisLock[slot] = new AtomicReferenceArray<>( 8 << (3 * slot) );
        }
        return clientsHoldingThisLock[slot];
    }

    /**
     * @return {@code true} if some slot currently holds a client equal to {@code client}
     */
    private boolean clientHoldsThisLock( ForsetiClient client )
    {
        for ( AtomicReferenceArray<ForsetiClient> holders : clientsHoldingThisLock )
        {
            if ( holders == null )
            {
                continue;
            }
            for ( int j = 0; j < holders.length(); j++ )
            {
                ForsetiClient current = holders.get( j );
                if ( current != null && current.equals( client ) )
                {
                    return true;
                }
            }
        }
        return false;
    }
}