-
Notifications
You must be signed in to change notification settings - Fork 1
/
CappuccinoTest.java
237 lines (187 loc) · 7.93 KB
/
CappuccinoTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
package no.systek.dataflow;
import no.systek.dataflow.steps.CollectorStep;
import no.systek.dataflow.steps.PairJoinStep;
import no.systek.dataflow.types.*;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
public class CappuccinoTest extends AbstractStepTest {
@Test
public void makeOneCappuccino() {
    // Seeded generator: the simulated heater draws its temperatures from a repeatable sequence.
    final Random rng = new Random(203);

    /*
     * Step graph that produces a single cappuccino:
     *
     * TapWater          GrindBeans      FoamMilk
     *    |                  |              |
     *    v                  |              |
     * HeatWater <------+    |              |
     *    |             |    |              |
     *    v             |    |              |
     * HotEnough? --no--+    |              |
     *    |                  |              |
     *   yes                 |              |
     *    |                  v              |
     *    +--------------> brew             |
     *                       |              |
     *                       v              |
     *                  Cappuccino <--------+
     *
     * credit: http://stackoverflow.com/questions/10855045/executing-dependent-tasks-in-parallel-in-java
     */
    TapWater tap = new TapWater();
    FoamMilk milkFoamer = new FoamMilk();
    GrindBeans grinder = new GrindBeans();
    HotEnough temperatureCheck = new HotEnough();
    CoffeeBrewer brewer = new CoffeeBrewer();
    // Every heating pass ignores the incoming water and yields fresh water with a temperature in 90..99.
    Step<Water, Water> heater = Steps.newParallel(coldWater -> {
        Water heated = new Water(90 + rng.nextInt(10));
        return work(heated, "Heating water");
    });
    CappuccinoStep barista = new CappuccinoStep();

    // Wire the graph, starting at the final step and walking back towards the sources.
    barista.dependsOnLeft(brewer.output());
    barista.dependsOnRight(milkFoamer.output());
    brewer.dependsOnLeft(grinder.output());
    brewer.dependsOnRight(temperatureCheck.ifTrue());
    temperatureCheck.dependsOn(heater.output());
    // Water that fails the temperature check is fed back into the heater.
    heater.dependsOn(temperatureCheck.ifFalse());
    heater.dependsOn(tap.output());

    // The only expectation: executing the graph for one order yields a non-null result.
    assertThat(
        stepExecutor.execute(barista, new Order("CappuccinoOrder")),
        notNullValue());
}
@Test
public void makeMultipleCappuccino() {
    // Seeded generator shared by every heating pass.
    final Random rng = new Random(203);

    /*
     * Several cappuccinos are made in parallel. The water boiler can heat water for two cups at once,
     * so a collector in front of it waits until two inputs are available.
     *
     * OrderSplitter
     *    |
     *    |
     *    +--> GrindBeans-------------------------------------------+
     *    |                                                         |
     *    +--> TapWater                                             |
     *    |       |                                                 |
     *    |       v                                                 v
     *    |    Collect(2)----> HeatWater----> HotEnough?---yes---> Brew
     *    |       ^                               |                 |
     *    |       +-------------------------no----+                 |
     *    |                                                         v
     *    +--> FoamMilk--------------------------------------> Cappuccino
     *
     */
    TapWater tap = new TapWater();
    FoamMilk milkFoamer = new FoamMilk();
    GrindBeans grinder = new GrindBeans();
    HotEnough temperatureCheck = new HotEnough();
    CoffeeBrewer brewer = new CoffeeBrewer();
    // Replaces every water in the batch with fresh water whose temperature lies in 90..99.
    Step<List<Water>, Water> heater = Steps.newParallelListStep(batch -> {
        List<Water> heated = batch.stream()
            .map(cold -> new Water(90 + rng.nextInt(10)))
            .collect(Collectors.toList());
        return work(heated, "heating multiple waters at once (" + batch.size() + ")");
    });
    CollectorStep<Water> waterBatcher = Steps.newCollector(2);
    // Pass-through list step: hands the incoming orders on unchanged.
    Step<List<Order>, Order> splitter = Steps.newParallelListStep(incoming -> incoming);
    CappuccinoStep barista = new CappuccinoStep();

    // Wire the graph, starting at the final step and walking back towards the order splitter.
    barista.dependsOnLeft(brewer.output());
    barista.dependsOnRight(milkFoamer.output());
    brewer.dependsOnLeft(grinder.output());
    brewer.dependsOnRight(temperatureCheck.ifTrue());
    temperatureCheck.dependsOn(heater.output());
    // The heater is shared: it is fed by the collector, which is created with a size of 2.
    heater.dependsOn(waterBatcher.output());
    // Water that fails the temperature check goes back to the collector for another pass.
    waterBatcher.dependsOn(temperatureCheck.ifFalse());
    waterBatcher.dependsOn(tap.output());
    tap.dependsOn(splitter.output());
    grinder.dependsOn(splitter.output());
    milkFoamer.dependsOn(splitter.output());

    List<Cappuccino> served = stepExecutor.executeList(barista, Arrays.asList(
        new Order("Order1"),
        new Order("order2"),
        new Order("order3")));

    // Three orders in, three cappuccinos out.
    assertThat(served.size(), is(3));
}
/**
 * Simulates a slow task: logs the start, blocks the calling thread for one second, logs completion and
 * hands {@code doneValue} back unchanged.
 *
 * @param doneValue value to return once the simulated work has finished
 * @param task      human-readable task description, used as the prefix of both log lines
 * @param <T>       type of the value passed through
 * @return {@code doneValue}, untouched
 * @throws RuntimeException wrapping the {@link InterruptedException} when the thread is interrupted
 *                          while sleeping; the thread's interrupt flag is set again before throwing
 */
private static <T> T work(T doneValue, String task) {
    log(task, "...");
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        // Thread.sleep clears the interrupt status when it throws. Restore it so that callers further
        // up the stack (e.g. an executor shutting down) can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
    log(task, "...done");
    return doneValue;
}
/**
 * Prints one line to standard output: the current thread's name, a colon, and the given fragments
 * joined by single spaces.
 *
 * @param msg fragments to join into the logged message
 */
private static void log(String... msg) {
    final String threadName = Thread.currentThread().getName();
    final String text = String.join(" ", msg);
    System.out.println(threadName + ": " + text);
}
/*
* Defining small and reusable steps for the tests
*/
/**
 * Conditional step over {@code Water}: the condition holds only when the water's temperature is
 * strictly greater than 95. Each evaluation is logged.
 */
private static class HotEnough extends Steps.SimpleConditionalStep<Water> {

    HotEnough() {
        super(null);
    }

    /**
     * @param water the water to check
     * @return {@code true} when {@code water.temperature} exceeds 95
     */
    @Override
    boolean test(Water water) {
        final boolean hotEnough = 95 < water.temperature;
        log("Water hot enough? " + hotEnough);
        return hotEnough;
    }
}
/**
 * Step that turns an {@code Order} into freshly tapped {@code Water} created with the value 18,
 * after one simulated unit of work.
 */
private static class TapWater extends Steps.SimpleStep<Order, Water> {

    TapWater() {
        super(null, Integer.MAX_VALUE);
    }

    /**
     * @param order the order the water is tapped for; only used in the log message
     * @return a new {@code Water(18)}
     */
    @Override
    Water execute(Order order) {
        final Water fromTap = new Water(18);
        return work(fromTap, "tapping water for " + order);
    }
}
/**
 * Join step that pairs {@code GrindedCoffee} (left input) with {@code Water} (right input) and brews
 * them into a {@code BlackCoffee}.
 */
private static class CoffeeBrewer extends PairJoinStep<GrindedCoffee, Water, BlackCoffee> {

    CoffeeBrewer() {
        super(null);
    }

    /**
     * @param grindedCoffee the left-hand input
     * @param water         the right-hand input
     * @return a new {@code BlackCoffee} built from both inputs, after one simulated unit of work
     */
    @Override
    protected BlackCoffee join(GrindedCoffee grindedCoffee, Water water) {
        final BlackCoffee brewed = new BlackCoffee(grindedCoffee, water);
        return work(brewed, "brewing coffee");
    }

    /**
     * @param input an incoming value of either side
     * @return {@code true} when {@code input} is a {@code GrindedCoffee}; {@code false} otherwise,
     *         including for {@code null}
     */
    @Override
    protected boolean isLeft(Object input) {
        return GrindedCoffee.class.isInstance(input);
    }
}
/**
 * Join step that pairs {@code BlackCoffee} (left input) with {@code FoamedMilk} (right input) and
 * combines them into a {@code Cappuccino}.
 */
private static class CappuccinoStep extends PairJoinStep<BlackCoffee, FoamedMilk, Cappuccino> {

    CappuccinoStep() {
        super(null);
    }

    /**
     * @param blackCoffee the left-hand input
     * @param foamedMilk  the right-hand input
     * @return a new {@code Cappuccino} built from both inputs, after one simulated unit of work
     */
    @Override
    protected Cappuccino join(BlackCoffee blackCoffee, FoamedMilk foamedMilk) {
        final Cappuccino finished = new Cappuccino(blackCoffee, foamedMilk);
        return work(finished, "making cappuccino");
    }

    /**
     * @param input an incoming value of either side
     * @return {@code true} when {@code input} is a {@code BlackCoffee}; {@code false} otherwise,
     *         including for {@code null}
     */
    @Override
    protected boolean isLeft(Object input) {
        return BlackCoffee.class.isInstance(input);
    }
}
private final class GrindBeans extends Steps.SimpleStep<Order, GrindedCoffee> {
GrindBeans() {
super(null, Integer.MAX_VALUE);
}
@Override
GrindedCoffee execute(Order order) {
return work(new GrindedCoffee(order), "grinding coffee for " + order);
}
}
private final class FoamMilk extends Steps.SimpleStep<Order, FoamedMilk> {
FoamMilk() {
super(null, Integer.MAX_VALUE);
}
@Override
FoamedMilk execute(Order order) {
return work(new FoamedMilk(order), "foaming milk for " + order);
}
}
}