-
Notifications
You must be signed in to change notification settings - Fork 20
/
PhaserExample.java
113 lines (97 loc) · 3.71 KB
/
PhaserExample.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package com.schlimm.java7.concurrency.phaser;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;
/**
 * Demonstrates {@link Phaser} as a reusable synchronization barrier.
 *
 * <p>Every task gets its own worker thread. All workers wait at the barrier until the main thread has
 * started the last of them; then they run their task, meet at the barrier again, and run the task a
 * second time before the phaser terminates. The phaser state is printed to {@code System.out} at each
 * step so the registered/arrived/unarrived counters can be followed.
 */
public class PhaserExample {

    /** Default pause after starting a worker, giving it time to reach the barrier before the state dump. */
    private static final long DEFAULT_SETTLE_MILLIS = 500L;

    /**
     * Runs every task twice on its own thread, using the default pause of 500 ms after each thread start.
     *
     * @param tasks the tasks to run; one worker thread is started per element
     * @throws InterruptedException if the calling thread is interrupted while pausing
     */
    void runTasks(List<Runnable> tasks) throws InterruptedException {
        runTasks(tasks, DEFAULT_SETTLE_MILLIS);
    }

    /**
     * Runs every task twice on its own thread, synchronizing the workers with a {@link Phaser}.
     *
     * <p>The method returns as soon as the calling thread has arrived at the barrier and deregistered; it
     * does not wait for the workers to finish.
     *
     * @param tasks        the tasks to run; one worker thread is started per element
     * @param settleMillis how long to pause after starting each worker before dumping the phaser state;
     *                     only affects what the "After arrival" dump shows, not the synchronization
     * @throws IllegalArgumentException if {@code settleMillis} is negative
     * @throws InterruptedException     if the calling thread is interrupted while pausing
     */
    void runTasks(List<Runnable> tasks, long settleMillis) throws InterruptedException {
        // Reject a bad pause up front: Thread.sleep() would otherwise throw only after the first worker
        // has been started, leaving that worker blocked at the barrier.
        if (settleMillis < 0) {
            throw new IllegalArgumentException("settleMillis must not be negative: " + settleMillis);
        }

        // The phaser starts with one registered party: the calling (main) thread.
        final Phaser phaser = new Phaser(1) {
            /**
             * Invoked when all registered parties have reached the barrier. Returning {@code true}
             * terminates the phaser: further registrations have no effect and the synchronization methods
             * return immediately without waiting. Returning {@code false} starts the next phase:
             *
             * <pre>
             * -> set unarrived parties = registered parties
             * -> set arrived parties = 0
             * -> set phase = phase + 1
             * </pre>
             *
             * which makes all parties go through another iteration (cycle).
             */
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                dumpPhaserState("On advance", this);
                // Continue after phase 0, terminate after phase 1: the phaser cycles once (= 2 iterations).
                // It also terminates as soon as no party is registered any more.
                return phase >= 1 || registeredParties == 0;
            }
        };
        dumpPhaserState("After phaser init", phaser);

        // Create and start one worker thread per task.
        for (final Runnable task : tasks) {
            // One more party that has to arrive before the phase can advance.
            phaser.register();
            dumpPhaserState("After register", phaser);
            new Thread() {
                @Override
                public void run() {
                    try {
                        do {
                            // Wait until arrived parties = registered parties; the arrival of the last
                            // party invokes onAdvance() and releases everybody.
                            phaser.arriveAndAwaitAdvance();
                            task.run();
                        } while (!phaser.isTerminated());
                    } finally {
                        // If the task threw, this worker must leave the phaser, otherwise the remaining
                        // workers would wait for it forever. After a normal exit the phaser is already
                        // terminated and this call has no effect.
                        phaser.arriveAndDeregister();
                    }
                }
            }.start();
            Thread.sleep(settleMillis);
            dumpPhaserState("After arrival", phaser);
        }

        // When the final party of a phase arrives, onAdvance() is invoked and the phase advances. "The
        // phase advances" means that all threads reached the barrier, are therefore synchronized, and can
        // continue processing.
        dumpPhaserState("Before main thread arrives and deregisters", phaser);

        // The arrival and deregistration of the main thread lets the workers start: once they have all
        // arrived too, the arrived parties equal the registered parties.
        phaser.arriveAndDeregister();
        dumpPhaserState("After main thread arrived and deregistered", phaser);
        System.out.println("Main thread will terminate ...");
    }

    /**
     * Prints the party counters and the current phase of the given phaser to {@code System.out}.
     *
     * @param when   label describing the point in the program at which the state is dumped
     * @param phaser the phaser to inspect
     */
    private void dumpPhaserState(String when, Phaser phaser) {
        System.out.println(when + " -> Registered: " + phaser.getRegisteredParties() + " - Unarrived: "
                + phaser.getUnarrivedParties() + " - Arrived: " + phaser.getArrivedParties() + " - Phase: "
                + phaser.getPhase());
    }

    /**
     * Runs the demo with two CPU-bound tasks.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while the workers are being started
     */
    public static void main(String[] args) throws InterruptedException {
        List<Runnable> tasks = new ArrayList<>();
        for (int taskIndex = 0; taskIndex < 2; taskIndex++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + ":go :" + new Date());
                    // Busy work that keeps the thread occupied for a while; the values are not used.
                    int a = 0, b = 1;
                    for (int step = 0; step < 2000000000; step++) {
                        a = a + b;
                        b = a - b;
                    }
                    System.out.println(Thread.currentThread().getName() + ":done:" + new Date());
                }
            };
            tasks.add(runnable);
        }
        new PhaserExample().runTasks(tasks);
    }
}