-
Notifications
You must be signed in to change notification settings - Fork 0
/
CyclicBarrier.java
443 lines (417 loc) · 15.6 KB
/
CyclicBarrier.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* A synchronization aid that allows a set of threads to all wait for
* each other to reach a common barrier point. CyclicBarriers are
* useful in programs involving a fixed sized party of threads that
* must occasionally wait for each other. The barrier is called
* <em>cyclic</em> because it can be re-used after the waiting threads
* are released.
*
* <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
* that is run once per barrier point, after the last thread in the party
* arrives, but before any threads are released.
* This <em>barrier action</em> is useful
* for updating shared-state before any of the parties continue.
*
* <p><b>Sample usage:</b> Here is an example of using a barrier in a
* parallel decomposition design:
*
* <pre> {@code
* class Solver {
* final int N;
* final float[][] data;
* final CyclicBarrier barrier;
*
* class Worker implements Runnable {
* int myRow;
* Worker(int row) { myRow = row; }
* public void run() {
* while (!done()) {
* processRow(myRow);
*
* try {
* barrier.await();
* } catch (InterruptedException ex) {
* return;
* } catch (BrokenBarrierException ex) {
* return;
* }
* }
* }
* }
*
* public Solver(float[][] matrix) {
* data = matrix;
* N = matrix.length;
* Runnable barrierAction =
* new Runnable() { public void run() { mergeRows(...); }};
* barrier = new CyclicBarrier(N, barrierAction);
*
* List<Thread> threads = new ArrayList<Thread>(N);
* for (int i = 0; i < N; i++) {
* Thread thread = new Thread(new Worker(i));
* threads.add(thread);
* thread.start();
* }
*
* // wait until done
* for (Thread thread : threads)
* thread.join();
* }
* }}</pre>
*
* Here, each worker thread processes a row of the matrix then waits at the
* barrier until all rows have been processed. When all rows are processed
* the supplied {@link Runnable} barrier action is executed and merges the
* rows. If the merger
* determines that a solution has been found then {@code done()} will return
* {@code true} and each worker will terminate.
*
* <p>If the barrier action does not rely on the parties being suspended when
* it is executed, then any of the threads in the party could execute that
* action when it is released. To facilitate this, each invocation of
* {@link #await} returns the arrival index of that thread at the barrier.
* You can then choose which thread should execute the barrier action, for
* example:
* <pre> {@code
* if (barrier.await() == 0) {
* // log the completion of this iteration
* }}</pre>
*
* <p>The {@code CyclicBarrier} uses an all-or-none breakage model
* for failed synchronization attempts: If a thread leaves a barrier
* point prematurely because of interruption, failure, or timeout, all
* other threads waiting at that barrier point will also leave
* abnormally via {@link BrokenBarrierException} (or
* {@link InterruptedException} if they too were interrupted at about
* the same time).
*
* <p>Memory consistency effects: Actions in a thread prior to calling
* {@code await()}
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* actions that are part of the barrier action, which in turn
* <i>happen-before</i> actions following a successful return from the
* corresponding {@code await()} in other threads.
*
* @since 1.5
* @see CountDownLatch
*
* @author Doug Lea
*/
/**
* 循环的栅栏
*/
public class CyclicBarrier {
/**
* Each use of the barrier is represented as a generation instance.
* The generation changes whenever the barrier is tripped, or
* is reset. There can be many generations associated with threads
* using the barrier - due to the non-deterministic way the lock
* may be allocated to waiting threads - but only one of these
* can be active at a time (the one to which {@code count} applies)
* and all the rest are either broken or tripped.
* There need not be an active generation if there has been a break
* but no subsequent reset.
*/
//因为CyclicBarrier是可以循环在用的,用Generation 区分出每一代
private static class Generation {
//表示这一代是否因为异常被破坏
boolean broken = false;
}
//CyclicBarrier 是基于 ReentrantLock 和 Condition 实现的
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
/**
* 表示栅栏控制几个线程,当收到parties个信号后,栅栏才会打开
*/
private final int parties;
//栅栏打开后的一个钩子函数,可以为null
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
//count用来计数,表示还需要收到多少个信号栅栏才会打开,初始值等于parites,每收到一个信号,count减1
private int count;
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
private void nextGeneration() {
// signal completion of last generation
//唤醒等待队列中的节点,全部移动到同步队列
trip.signalAll();
// set up next generation
//重置count 和 generation
count = parties;
generation = new Generation();
}
/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
//将generation的broken标识设置为true,让同一代的其他等待线程在唤醒时也能发现代被破坏,而非整成唤醒
generation.broken = true;
//重置count
count = parties;
//唤醒其他等待队列中的节点
trip.signalAll();
}
/**
* Main barrier code, covering the various policies.
*/
/**
* 栅栏控制的主要过程
* @param timed
* @param nanos
* @return
* @throws InterruptedException
* @throws BrokenBarrierException
* @throws TimeoutException
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//进入等待前,先用局部变量保存代的信息
final Generation g = generation;
//如果代已经被破坏,则直接抛出异常
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {//如果线程已经被中断
//破坏栅栏,抛出异常
breakBarrier();
throw new InterruptedException();
}
//收到一次信号后,count 减1
int index = --count;
if (index == 0) { //当count等于0时,说明此时需要打开栅栏
boolean ranAction = false;
try {
//如果有设置勾子,则执行勾子
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//旧一代结束,产生新一代
nextGeneration();
//返回
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
//信号量还没满
for (;;) {
try {
//如果没有设置超时,则通过condition进入等待
if (!timed)
trip.await();
else if (nanos > 0L) //否则,进入设置超时时间的等待
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
//进入这里的情形是 其他等待的线程(非最后一个给出信号量的线程)从这里返回
//因为此时generation已经在最后一个信号量给出时 生成了新的一代
if (g != generation)
return index;
//等待超时设置
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
/**
* 构造函数
* @param parties 表示栅栏控制几个信号量
* @param barrierAction 栅栏的勾子函数
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and
* does not perform a predefined action when the barrier is tripped.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
/**
* 构造函数
* @param parties
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
/**
* Returns the number of parties required to trip this barrier.
*
* @return the number of parties required to trip this barrier
*/
public int getParties() {
return parties;
}
/**
* 线程调用await方法后,会进入等待,直到N个线程都调用了await()方法(N 等于 count),等待的线程全被唤醒
* @return
* @throws InterruptedException
* @throws BrokenBarrierException
*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) { //await方法没有设置超时时间,因此不可能抛出超时异常,这里只是为了调整dowait方法的异常签名
throw new Error(toe); // cannot happen
}
}
/**
* 带超时时间的等待
* @param timeout
* @param unit
* @return
* @throws InterruptedException
* @throws BrokenBarrierException
* @throws TimeoutException
*/
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
/**
* Queries if this barrier is in a broken state.
*
* @return {@code true} if one or more parties broke out of this
* barrier due to interruption or timeout since
* construction or the last reset, or a barrier action
* failed due to an exception; {@code false} otherwise.
*/
/**
* 检查代是否被破坏
* @return
*/
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
/**
* Resets the barrier to its initial state. If any parties are
* currently waiting at the barrier, they will return with a
* {@link BrokenBarrierException}. Note that resets <em>after</em>
* a breakage has occurred for other reasons can be complicated to
* carry out; threads need to re-synchronize in some other way,
* and choose one to perform the reset. It may be preferable to
* instead create a new barrier for subsequent use.
*/
/**
* 重置栅栏,
* 正常结束的代会自动重置,这里用来控制外部重置代(当前代算异常结束)
*/
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
/**
* 获取正在等待信号量的线程
* @return
*/
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
}