์๋ฐ 7์ด ๋ฑ์ฅํ๊ธฐ ์ ์๋ ๋ฐ์ดํฐ ์ปฌ๋ ์ ์ ๋ณ๋ ฌ๋ก ์ฒ๋ฆฌํ๊ธฐ๊ฐ ์ด๋ ค์ ๋ค.
- ๋ฐ์ดํฐ๋ฅผ ์๋ธ ํํธ๋ก ๋ถํ ํ๋ค.
- ๋ถํ ๋ ์๋ธ ํํธ๋ฅผ ๊ฐ๊ฐ์ ์ค๋ ๋๋ก ํ ๋นํ๋ค.
- ์ค๋ ๋๋ก ํ ๋นํ ๋ค์์๋ ์๋์น ์์ ๋ ์ด์ค ์ปจ๋์ ์ด ๋ฐ์ํ์ง ์๋๋ก ์ ์ ํ ๋๊ธฐํ๋ฅผ ์ถ๊ฐํด์ผ ํ๋ค.
- ๊ฒฐ๊ณผ๋ฅผ ํฉ์ณ์ผํ๋ค.
์์ ๊ณผ์ ์ ์ฝ๊ฒ ๋ณ๋ ฌํ๋ฅผ ์ํํ๋ฉด์ ์๋ฌ๋ฅผ ์ต์ํํ ์ ์๋๋ก ํฌํฌ/์กฐ์ธ ํ๋ ์์ํฌ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ค.
๋ณ๋ ฌ ์คํธ๋ฆผ์ด ๋ด๋ถ์ ์ผ๋ก ์ด๋ป๊ฒ ์ฒ๋ฆฌ๋๋์ง, ์ฌ๋ฌ ์ฒญํฌ๋ก ๋ถํ ํ๋ ๋ฐฉ๋ฒ์ ์์๋ณด๊ณ ์ปค์คํ
Spliterator๋ฅผ ๊ตฌํํด๋ณด์.
๋ณ๋ ฌ ์คํธ๋ฆผ์ด๋ ๊ฐ๊ฐ์ ์ค๋ ๋์์ ์ฒ๋ฆฌํ ์ ์๋๋ก ์คํธ๋ฆผ ์์๋ฅผ ์ฌ๋ฌ ์ฒญํฌ๋ก ๋ถํ ํ ์คํธ๋ฆผ์ด๋ค.
parallelStream
์ ํธ์ถํ๋ฉด ๋ณ๋ ฌ ์คํธ๋ฆผ ์ด ์์ฑ๋๋ค. ๋ฐ๋ผ์ ๋ณ๋ ฌ ์คํธ๋ฆผ์ ์ด์ฉํ๋ฉด ๋ชจ๋ ๋ฉํฐ์ฝ์ด ํ๋ก์ธ์๊ฐ ๊ฐ๊ฐ์ ์ฒญํฌ๋ฅผ ์ฒ๋ฆฌํ๋๋ก ํ ๋นํ ์ ์๋ค.
private long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
}
์ฌ์ค ์์ฐจ ์คํธ๋ฆผ์ parallel
์ ํธ์ถํด๋ ์คํธ๋ฆผ ์์ฒด์๋ ์๋ฌด ๋ณํ๋ ์ผ์ด๋์ง ์๋๋ค.
๋ด๋ถ์ ์ผ๋ก๋ ์ฐ์ฐ์ด ๋ณ๋ ฌ๋ก ์ํํด์ผ ํจ์ ์๋ฏธํ๋ ๋ถ๋ฆฌ์ธ ํ๋๊ทธ๊ฐ ์ค์ ๋๋ค.
@Override
public final boolean isParallel() {
return sourceStage.parallel;
}
์ด๋ค ์ฐ์ฐ์ ๋ณ๋ ฌ๋ก ์คํํ๊ณ ์ด๋ค ์ฐ์ฐ์ ์์ฐจ๋ก ์คํํ ์ง ์์ฐจ โ๏ธ ๋ณ๋ ฌ
์ ์ ์ดํ ์ ์๋ค.
๋ณ๋ ฌ ์คํธ๋ฆผ์์ ์ฌ์ฉํ๋ ์ค๋ ๋ ํ ์ค์
์คํธ๋ฆผ์parallel
๋ฉ์๋์์ ๋ณ๋ ฌ๋ก ์์ ์ ์ํํ๋ ์ค๋ ๋๋ ์ด๋์ ์์ฑ๋๋ ๊ฒ์ด๋ฉฐ, ๋ช๊ฐ๋ ์์ฑ๋๋์ง ๊ทธ๋ฆฌ๊ณ ๊ทธ ๊ณผ์ ์ ์ด๋ป๊ฒ ์ปค์คํฐ๋ง์ด์ฆ ํ ์ ์๋์ง ๊ถ๊ธํ ๊ฒ์ด๋ค.
๋ณ๋ ฌ ์คํธ๋ฆผ์ ๋ด๋ถ์ ์ผ๋ก ForJoinPool ์ ์ฌ์ฉํ๋ค. ๊ธฐ๋ณธ์ ์ผ๋ก ForJoinPool ์ ํ๋ก์ธ์ ์, ์ฆRuntime.getRuntime().availableProcessors()
๊ฐ ๋ฐํํ๋ ๊ฐ์ ์์ํ๋ ์ค๋ ๋๋ฅผ ๊ฐ๋๋ค.
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
์์ ์ค์ ์ฝ๋๋ ์ ์ญ์ ์ผ๋ก ์ค์ ํ๋ ๋ฐฉ๋ฒ์ด๋ฏ๋ก ๋ชจ๋ ๋ณ๋ ฌ ์คํธ๋ฆผ ์ฐ์ฐ์ ์ํฅ์ ์ค๋ค.
ํ๋์ ๋ณ๋ ฌ ์คํธ๋ฆผ์ ์ฌ์ฉํ ์ ์๋ ํน์ ํ ๊ฐ์ ์ง์ ํ๊ธด ์ด๋ ต๊ณ , ์ผ๋ฐ์ ์ผ๋ก ๊ธฐ๊ธฐ์ ํ๋ก์ธ์ ์์ ๊ฐ์ผ๋ฏ๋ก ํน๋ณํ ์ด์ ๊ฐ ์๋ค๋ฉด ๊ธฐ๋ณธ๊ฐ์ ๊ทธ๋๋ก ์ฌ์ฉํ๋ ๊ฒ์ ๊ถ์ฅํ๋ค.
๋ณ๋ ฌ ํ๋ก๊ทธ๋๋ฐ์ ์ค์ฉ (์๋ฅผ ๋ค์ด ๋ณ๋ ฌ๊ณผ ๊ฑฐ๋ฆฌ๊ฐ ๋จผ ๋ฐ๋ณต ์์
)์ ํ๋ฉด ์คํ๋ ค ์ค๋ ๋๋ฅผ ํ ๋นํ๋ ์ค๋ฒํค๋๋ง ์ฆ๊ฐํ๊ฒ ๋์ด ์ ์ฒด ํ๋ก๊ทธ๋จ์ ์ฑ๋ฅ์ด ๋ ๋๋น ์ง ์๋ ์๋ค.
๋ง๋ฒ๊ฐ์ parallel์ ํธ์ถํ๋ฉด ๋ด๋ถ์ ์ผ๋ก ์ด๋ค ์ผ์ด ์ผ์ด๋๋์ง ๊ผญ ์ดํดํด์ผ ํ๋ค.
๋ณ๋ ฌํ๋ ๊ณต์ง๊ฐ ์๋๋ผ๋ ์ฌ์ค์ ๊ธฐ์ตํ๋ผ.
์คํธ๋ฆผ์ ์ฌ๊ท์ ์ผ๋ก ๋ถํ ํด์ผ ํ๊ณ , ๊ฐ ์๋ธ์คํธ๋ฆผ์ ์๋ก ๋ค๋ฅธ ์ค๋ ๋์ ๋ฆฌ๋์ฑ ์ฐ์ฐ์ผ๋ก ํ ๋นํ๊ณ , ์ด๋ค ๊ฒฐ๊ณผ๋ฅผ ํ๋์ ๊ฐ์ผ๋ก ํฉ์ณ์ผ ํ๋ค.
๋ฉํฐ์ฝ์ด ๊ฐ์ ๋ฐ์ดํฐ ์ด๋์ ์๊ฐ๋ณด๋ค ๋น์ผ ์์ ์ด๋ค.
๋ฐ๋ผ์ ์ฝ์ด ๊ฐ์ ๋ฐ์ดํฐ ์ ์ก ์๊ฐ๋ณด๋ค ํจ์ฌ ์ค๋ ๊ฑธ๋ฆฌ๋ ์์ ๋ง ๋ณ๋ ฌ๋ก ๋ค๋ฅธ ์ฝ์ด์์ ์ํํ๋ ๊ฒ์ด ๋ฐ๋์งํ๋ค.
- ๋ณ๋ ฌ ์คํธ๋ฆผ์ผ๋ก ๋ฐ๊พธ๋ ๊ฒ์ด ๋ฅ์ฌ๊ฐ ์๋๋ค. ๊ผญ ์ง์ ์ธก์ ํด์ผ ํ๋ค.
- ๋ฐ์ฑ์ ์ฃผ์ํ๋ผ. ์๋ ๋ฐ์ฑ๊ณผ ์ธ๋ฐ์ฑ์ ์ฑ๋ฅ์ ํฌ๊ฒ ์ ํ์ํฌ ์ ์๋ ์์๋ค. ๊ธฐ๋ณธํ ํนํ ์คํธ๋ฆผ์ ํ์ธํ๋ผ.
- ์์ฐจ ์คํธ๋ฆผ๋ณด๋ค ๋ณ๋ ฌ ์คํธ๋ฆผ์์ ์ฑ๋ฅ์ด ๋จ์ด์ง๋ ์ฐ์ฐ์ด ์๋ค.
limit
,findFirst
์ฒ๋ผ ์์์ ์์์ ์์กดํ๋ ์ฐ์ฐ์ ๋ณ๋ ฌ ์คํธ๋ฆผ์์ ์ํํ๋ ค๋ฉด ๋น์ผ ๋น์ฉ์ ์น๋ค์ผ ํ๋ค. - ์คํธ๋ฆผ์์ ์ํํ๋ ์ ์ฒด ํ์ดํ๋ผ์ธ ์ฐ์ฐ ๋น์ฉ์ ๊ณ ๋ คํ๋ผ.
- ์๋์ ๋ฐ์ดํฐ์์๋ ๋ณ๋ ฌ ์คํธ๋ฆผ์ด ๋์ ๋์ง ์๋๋ค.
- ์คํธ๋ฆผ์ ๊ตฌ์ฑํ๋ ์๋ฃ๊ตฌ์กฐ๊ฐ ์ ์ ํ์ง ํ์ธํ๋ผ.
- ์คํธ๋ฆผ์ ํน์ฑ๊ณผ ํ์ดํ๋ผ์ธ์ ์ค๊ฐ ์ฐ์ฐ์ด ์คํธ๋ฆผ์ ํน์ฑ์ ์ด๋ป๊ฒ ๋ฐ๊พธ๋์ง์ ๋ฐ๋ผ ๋ถํด ๊ณผ์ ์ ์ฑ๋ฅ์ด ๋ฌ๋ผ์ง ์ ์๋ค. ํํฐ ์ฐ์ฐ์ผ๋ก ์ธํด ์คํธ๋ฆผ์ ๊ธธ์ด๋ฅผ ์์ธกํ ์ ์์ผ๋ฉด ์คํธ๋ฆผ์ ๋ณ๋ ฌ ์ฒ๋ฆฌํ ์ ์์์ง ์ ์ ์๊ฒ ๋๋ค.
- ์ต์ข
์ฐ์ฐ์ ๋ณํฉ ๊ณผ์ (
combiner
) ๋น์ฉ์ ์ดํด๋ณด๋ผ.
public long sideEffectSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).forEach(accumulator::add);
return accumulator.total;
}
public class Accumulator {
public long total = 0;
public void add(long value) { total += value; }
}
์์ ์ฝ๋๋ฅผ ๋ค์์ ์ค๋ ๋์์ ๋์์ ์คํํ๋ฉด ๋ฐ์ดํฐ ๋ ์ด์ค ๋ฌธ์ ๊ฐ ์ผ์ด๋๋ค.
๋๊ธฐํ๋ก ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๋ค๋ณด๋ฉด ๊ฒฐ๊ตญ ๋ณ๋ ฌํ๋ผ๋ ํน์ฑ์ด ์์ด์ ธ ๋ฒ๋ฆฌ๊ธฐ ๋๋ฌธ์ ๋ณ๋ ฌ ์คํธ๋ฆผ๊ณผ ๋ณ๋ ฌ ๊ณ์ฐ์์๋ ๊ณต์ ๋ ๊ฐ๋ณ ์ํ๋ฅผ ํผํด์ผ ํ๋ค๋ ์ฌ์ค์ ๋ช
์ฌํด์ผ ํ๋ค.
ํฌํฌ/์กฐ์ธ ํ๋ ์์ํฌ๋ ๋ณ๋ ฌํํ ์ ์๋ ์์
์ ์ฌ๊ท์ ์ผ๋ก
์์ ์์
์ผ๋ก ๋ถํ ํ ๋ค์์ ์๋ธ ํ์คํฌ ๊ฐ๊ฐ์ ๊ฒฐ๊ณผ๋ฅผ ํฉ์ณ์ ์ ์ฒด ๊ฒฐ๊ณผ๋ฅผ ๋ง๋ค๋๋ก ์ค๊ณ๋์๋ค.
์๋ธ ํ์คํฌ๋ฅผ ์ค๋ ๋ ํ(ForkJoinPool
)์ ์์
์ ์ค๋ ๋์ ๋ถ์ฐ ํ ๋นํ๋ ExecutorService
์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ๋ค.
์ค๋ ๋ ํ์ ์ด์ฉํ๋ ค๋ฉด RecursiveTask<R>
์ ์๋ธํด๋์ค๋ฅผ ๋ง๋ค์ด์ผ ํ๋ค.
์ฌ๊ธฐ์ R
์ ๋ณ๋ ฌํ๋ ํ์คํธ๊ฐ ์์ฑํ๋ ๊ฒฐ๊ณผ ํ์ ๋๋ ๊ฒฐ๊ณผ๊ฐ ์์ ๋๋ RecursiveAction
ํ์์ด๋ค.
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
/**
* ๊ณ์ฐ ๊ฒฐ๊ณผ
*/
V result;
/**
* ์ด ์์
์ผ๋ก ์ํ๋๋ ์ฃผ์ ๊ณ์ฐ
*/
protected abstract V compute();
protected final boolean exec() {
result = compute();
return true;
}
...
}
RecursiveTask
๋ฅผ ์ ์ํ๋ ค๋ฉด ์ถ์ ๋ฉ์๋์ธ compute
๋ฅผ ๊ตฌํํด์ผ ํ๋ค.
๋ํ์ ์ธ ๊ตฌํ์ฒด๋ก๋ DualPivotQuicksort.RunMerger ๊ฐ ์๋ค.
ํด๋น compute
๋ฉ์๋๋ ํ์คํฌ๋ฅผ ์๋ธํ์คํฌ๋ก ๋ถํ ํ๋ ๋ก์ง๊ณผ ๋ ์ด์ ๋ถํ ํ ์ ์์ ๋ ๊ฐ๋ณ ์๋ธํ์คํฌ์ ๊ฒฐ๊ณผ๋ฅผ ์์ฐํ ์๊ณ ๋ฆฌ์ฆ์ ์ ์ํ๋ค.
๋ถํ ํ ์ ๋ณต ์๊ณ ๋ฆฌ์ฆ์ ๋ณ๋ ฌํ ๋ฒ์ ์ด๋ค.
if (ํ์คํฌ๊ฐ ์ถฉ๋ถํ ์๊ฑฐ๋ ๋ ์ด์ ๋ถํ ํ ์ ์์ผ๋ฉด) {
์์ฐจ์ ์ผ๋ก ํ์คํธ ๊ณ์ฐ
} else {
ํ์คํฌ๋ฅผ ๋ ์๋ธํ์คํฌ๋ก ๋ถํ
ํ์คํฌ๊ฐ ๋ค์ ์๋ธํ์คํฌ๋ก ๋ถํ ๋๋๋ก ์ด ๋ฉ์๋๋ฅผ ์ฌ๊ท์ ์ผ๋ก ํธ์ถํจ
๋ชจ๋ ์๋ธํ์คํฌ์ ์ฐ์ฐ์ด ์๋ฃ๋ ๋ ๊น์ง ๊ธฐ๋ค๋ฆผ
๊ฐ ์๋ธํ์คํฌ์ ๊ฒฐ๊ณผ๋ฅผ ํฉ์นจ
}
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private final int end;
public static final long THRESHOLD = 10_000;
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
return computeSequentially();
}
System.out.printf("%s start: %d, end: %d\n", Thread.currentThread().getName(), start, end);
ForkJoinSumCalculator leftTask =
new ForkJoinSumCalculator(numbers, start, start + length / 2);
leftTask.fork(); // ForkJoinPool์ ๋ค๋ฅธ ์ค๋ ๋๋ก ์๋ก ์์ฑํ ํ์คํฌ๋ฅผ ๋น๋๊ธฐ๋ก ์คํํ๋ค.
ForkJoinSumCalculator rightTask =
new ForkJoinSumCalculator(numbers, start + length / 2, end);
Long rightResult = rightTask.compute(); // ๋ ๋ฒ์งธ ์๋ธํ์คํฌ๋ฅผ ๋๊ธฐ ์คํํ๋ค. ์ด๋ ์ถ๊ฐ๋ก ๋ถํ ์ด ์ผ์ด๋ ์ ์๋ค.
Long leftResult = leftTask.join(); // ์ฒซ ๋ฒ์งธ ์๋ธํ์คํฌ์ ๊ฒฐ๊ณผ๋ฅผ ์ฝ๊ฑฐ๋ ์์ง ๊ฒฐ๊ณผ๊ฐ ์์ผ๋ฉด ๊ธฐ๋ค๋ฆฐ๋ค.
return leftResult + rightResult;
}
private Long computeSequentially() {
long sum = 0;
for (int i = start ; i < end ; i++) {
sum += numbers[i];
}
return sum;
}
}
@Test
void forkJoin() {
long[] numbers = LongStream.rangeClosed(1, 100_000).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
Long result = new ForkJoinPool().invoke(task);
Assertions.assertThat(result).isEqualTo(5000050000L);
}
// ForkJoinPool-1-worker-1 start: 0, end: 100000
// ForkJoinPool-1-worker-1 start: 50000, end: 100000
// ForkJoinPool-1-worker-2 start: 0, end: 50000
// ForkJoinPool-1-worker-1 start: 75000, end: 100000
// ForkJoinPool-1-worker-1 start: 87500, end: 100000
// ForkJoinPool-1-worker-2 start: 25000, end: 50000
// ForkJoinPool-1-worker-1 start: 75000, end: 87500
// ForkJoinPool-1-worker-3 start: 0, end: 25000
// ForkJoinPool-1-worker-4 start: 50000, end: 75000
// ForkJoinPool-1-worker-2 start: 37500, end: 50000
// ForkJoinPool-1-worker-4 start: 62500, end: 75000
// ForkJoinPool-1-worker-4 start: 50000, end: 62500
// ForkJoinPool-1-worker-3 start: 12500, end: 25000
// ForkJoinPool-1-worker-5 start: 25000, end: 37500
// ForkJoinPool-1-worker-6 start: 0, end: 12500
์์ฑํ ํ์คํฌ๋ฅผ ์๋ก์ด ForkJoinPool
์ invoke
๋ฉ์๋๋ก ์ ๋ฌํ์ฌ ForkJoinSumCalculator
์์ ์ ์ํ ํ์คํฌ์ ๊ฒฐ๊ณผ๋ก ๋ฐํ๋ฐ๋๋ค.
์ผ๋ฐ์ ์ผ๋ก ์ ํ๋ฆฌ์ผ์ด์
์์๋ ๋ ์ด์์ ForkJoinPool
์ ์ฌ์ฉํ์ง ์๋๋ค.
ํด๋น ์ค๋ ๋ ํ์ ํ ๋ฒ๋ง ์ธ์คํด์คํํด์ ์ ์ ํ๋์ ์ฑ๊ธํค์ผ๋ก ์ ์ฅํ๋ค.
ForkJoinPool์ ๋ง๋ค๋ฉด์ ์ธ์๊ฐ ์๋ ๋ํดํธ ์์ฑ์๋ฅผ ์ด์ฉํ๋๋ฐ, ์ด๋ JVM์์ ์ด์ฉํ ์ ์๋ ๋ชจ๋ ํ๋ก์ธ์๊ฐ ์์ ๋กญ๊ฒ ํ์ ์ ๊ทผํ ์ ์์์ ์๋ฏธํ๋ค.
๋ ์ ํํ๊ฒ๋ Runtime.availableProcessors์ ๋ฐํ๊ฐ์ผ๋ก ํ์ ์ฌ์ฉํ ์ค๋ ๋ ์๋ฅผ ๊ฒฐ์ ํ๋ค.
'์ฌ์ฉํ ์ ์๋ ํ๋ก์ธ์'๋ผ๋ ์ด๋ฆ๊ณผ ๋ฌ๋ฆฌ ์ค์ ํ๋ก์ธ์์ธ์ ํ์ดํผ์ค๋ ๋ฉ๊ณผ ๊ด๋ จ๋ ๊ฐ์ ํ๋ก์ธ์๋ ๊ฐ์์ ํฌํจ๋๋ค.
join
๋ฉ์๋๋ฅผ ํ์คํฌ์ ํธ์ถํ๋ฉด ํ์คํฌ๊ฐ ์์ฐํ๋ ๊ฒฐ๊ณผ๊ฐ ์ค๋น๋ ๋๊น์ง ํธ์ถ์๋ฅผ ๋ธ๋ก์ํจ๋ค.- ๋ฐ๋ผ์ ๋ ์๋ธํ์คํฌ๊ฐ ๋ชจ๋ ์์๋ ๋ค์์
join
์ ํธ์ถํด์ผ ํ๋ค. - ๊ทธ๋ ์ง ์์ผ๋ฉด ๊ฐ๊ฐ์ ์๋ธํ์คํฌ๊ฐ ๋ค๋ฅธ ํ์คํฌ๊ฐ ๋๋๊ธธ ๊ธฐ๋ผ๋๋ ์ต์ ์ ์ํฉ์ด ๋ฐ์ํ๋ค.
- ๋ฐ๋ผ์ ๋ ์๋ธํ์คํฌ๊ฐ ๋ชจ๋ ์์๋ ๋ค์์
RecursiveTask
๋ด์์๋ForkJoinPool
์invoke
๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ง ๋ง์์ผ ํ๋ค.- ๋์
compute
๋fork
๋ฉ์๋๋ฅผ ์ง์ ํธ์ถํ ์ ์๋ค. - ์์ฐจ ์ฝ๋์์ ๋ณ๋ ฌ ๊ณ์ฐ์ ์์ํ ๋๋ง
invoke
๋ฅผ ์ฌ์ฉํ๋ค.
- ๋์
- ์๋ธํ์คํฌ์
fork
๋ฉ์๋๋ฅผ ํธ์ถํด์ForkJoinPool
์ ์ผ์ ์ ์กฐ์ ํ ์ ์๋ค.- ์ผ์ชฝ ์์
๊ณผ ์ค๋ฅธ์ชฝ ์์
๋ชจ๋์
fork
๋ฉ์๋๋ฅผ ํธ์ถํ๋ ๊ฒ์ด ์์ฐ์ค๋ฌ์ธ ๊ฒ ๊ฐ์ง๋ง ํ์ชฝ ์์ ์๋fork
๋ฅผ ํธ์ถํ๋ ๊ฒ ๋ณด๋ค๋compute
๋ฅผ ํธ์ถํ๋ ๊ฒ์ด ํจ์จ์ ์ด๋ค. - ๊ทธ๋ฌ๋ฉด ๋ ์๋ธํ์คํฌ์ ํ ํ์คํฌ์๋ ๊ฐ์ ์ค๋ ๋๋ฅผ ์ฌ์ฌ์ฉํ ์ ์์ผ๋ฏ๋ก ํ์์ ๋ถํ์ํ ํ์คํฌ๋ฅผ ํ ๋นํ๋ ์ค๋ฒํค๋๋ฅผ ํผํ ์ ์๋ค.
- ์ผ์ชฝ ์์
๊ณผ ์ค๋ฅธ์ชฝ ์์
๋ชจ๋์
- ํฌํฌ/์กฐ์ธ ํ๋ ์์ํฌ๋ฅผ ์ด์ฉํ๋ ๋ณ๋ ฌ ๊ณ์ฐ์ ๋๋ฒ๊น ํ๊ธฐ ์ด๋ ต๋ค.
- ๋ณ๋ ฌ ์คํธ๋ฆผ์์ ์ดํด๋ณธ ๊ฒ์ฒ๋ผ ๋ฉํฐ์ฝ์ด์ ํฌํฌ/์กฐ์ธ ํ๋ ์์ํฌ๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ด ์์ฐจ ์ฒ๋ฆฌ๋ณด๋ค ๋ฌด์กฐ๊ฑด ๋น ๋ฅผ ๊ฒ์ด๋ ์๊ฐ์ ๋ฒ๋ ค์ผ ํ๋ค.
- ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ก ์ฑ๋ฅ์ ๊ฐ์ ํ๋ ค๋ฉด ํ์คํฌ๋ฅผ ์ฌ๋ฌ ๋ ๋ฆฝ์ ์ธ ์๋ธํ์คํฌ๋ก ๋ถํ ํ ์ ์์ด์ผ ํ๋ค.
- ๊ฐ ์๋ธํ์คํฌ์ ์คํ ์๊ฐ์ ์๋ก์ด ํ์คํฌ๋ฅผ ํฌํนํ๋๋ฐ ๋๋ ์๊ฐ๋ณด๋ค ๊ธธ์ด์ผ ํ๋ค.
- JIT ์ปดํ์ผ๋ฌ์ ์ํด ์ต์ ํ๋๋ ค๋ฉด ์์ ๋๋ ์คํ ๊ณผ์ ์ ๊ฑฐ์ณ์ผ ํ๋ฏ๋ก ํ๋ก๊ทธ๋จ์ ์ฌ๋ฌ ๋ฒ ์คํํ ๊ฒฐ๊ณผ๋ฅผ ์ธก์ ํด์ผ ํ๋ค.
์์ ์์ ์์ ์ซ์๊ฐ 10_000๊ฐ
์ดํ์ด๋ฉด ๋ถํ ์ ์ค๋จํ๋๋ฐ, ๋ง์ฝ ์ฒ๋ง ๊ฐ ํญ๋ชฉ์ ํฌํจํ๋ ๋ฐฐ์ด์ด์๋ค๋ฉด ForkJoinSumCalculator
๋ ์ฒ ๊ฐ ์ด์์ ์๋ธ ํ์คํฌ๋ฅผ ํฌํฌํ ๊ฒ์ด๋ค.
์ฝ์ด์ ๊ฐ์์ ๋นํด ์๋ธํ์คํฌ๊ฐ ์ฒ ๊ฐ ์ด์์ผ๋ก ๋๋ฌด ๋ง์ ์์๋ง ๋ญ๋นํ๋ ๊ฒ ๊ฐ์ ๋ณด์ผ ์ ์๋ค.
์ค์ ๋ก๋ ์ฝ์ด ๊ฐ์์ ๊ด๊ณ์์ด ์ ์ ํ ํฌ๊ธฐ๋ก ๋ถํ ๋ ๋ง์ ํ์คํฌ๋ฅผ ํฌํนํ๋ ๊ฒ์ด ๋ฐ๋์งํ๋ค.
์ด๋ก ์ ์ผ๋ก๋ ์ฝ์ด ๊ฐ์๋งํผ ๋ณ๋ ฌํ๋ ํ์คํฌ๋ก ์์
๋ถํ๋ฅผ ๋ถํ ํ๋ฉด ๋ชจ๋ CPU ์ฝ์ด์์ ํ์คํฌ๋ฅผ ์คํํ ๊ฒ์ด๊ณ ํฌ๊ธฐ๊ฐ ๊ฐ์ ๊ฐ๊ฐ์ ํ์คํฌ๋ ๊ฐ์ ์๊ฐ์ ์ข
๋ฃ๋ ๊ฒ์ด๋ผ ์๊ฐํ ์ ์๋ค.
ํ์ง๋ง ์ค๋ฌด์์๋ ์ด ์์ ๋ณด๋ค ๋ณต์กํ ๊ณ์ฐ์ด ์ฌ์ฉ๋๊ธฐ ๋๋ฌธ์ ๊ฐ๊ฐ์ ์๋ธํ์คํฌ์ ์์
์๋ฃ ์๊ฐ์ด ํฌ๊ฒ ๋ฌ๋ผ์ง ์ ์๋ค.
ํฌํฌ/์กฐ์ธ ํ๋ ์์ํฌ์์๋ ์์
ํ์น๊ธฐ ๋ผ๋ ๊ธฐ๋ฒ์ผ๋ก ์ด ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๋ค.
์์
ํ์น๊ธฐ ๊ธฐ๋ฒ์์๋ ForkJoinPool
์ ๋ชจ๋ ์ค๋ ๋๋ฅผ ๊ฑฐ์ ๊ณต์ ํ๊ฒ ๋ถํ ํ๋ค.
๊ฐ๊ฐ์ ์ค๋ ๋๋ ์์ ์๊ฒ ํ ๋น๋ ํ์คํฌ๋ฅผ ํฌํจํ๋ ์ด์ค ์ฐ๊ฒฐ ๋ฆฌ์คํธ๋ฅผ ์ฐธ์กฐํ๋ฉด์ ์์
์ด ๋๋ ๋๋ง๋ค ํ์ ํค๋์์ ๋ค๋ฅธ ํ์คํฌ๋ฅผ ๊ฐ์ ธ์์ ์์
์ ์ฒ๋ฆฌ
ํ๋ค.
์ฆ, ํ ์ผ์ด ๋ค ๋๋ ์ค๋ ๋๋ ์ ํด ์ํ๋ก ๋ฐ๋๋ ๊ฒ์ด ์๋๋ผ ๋ค๋ฅธ ์ค๋ ๋ ํ์ ๊ผฌ๋ฆฌ์์ ์์
์ ํ์ณ์จ๋ค.
๋ชจ๋ ํ์คํฌ๊ฐ ์์
์ ๋๋ผ ๋๊น์ง (๋ชจ๋ ํ๊ฐ ๋น ๋๊น์ง) ์ด ๊ณผ์ ์ ๋ฐ๋ณตํ๋ค.
๋ฐ๋ผ์ ํ์คํฌ์ ํฌ๊ธฐ๋ฅผ ์๊ฒ ๋๋์ด์ผ ์์
์ ์ค๋ ๋ ๊ฐ์ ์์
๋ถํ๋ฅผ ๋น์ทํ ์์ค์ผ๋ก ์ ์งํ ์ ์๋ค.
ํ์ ์๋ ์์
์ ์ค๋ ๋์ ํ์คํฌ๋ฅผ ์ฌ๋ถ๋ฐฐํ๊ณ ๊ท ํ์ ๋ง์ถ ๋ ์์
ํ์น๊ธฐ ์๊ณ ๋ฆฌ์ฆ์ ์ฌ์ฉํ๋ค.
์์
์์ ํ์ ์๋ ํ์คํฌ๋ฅผ ๋ ๊ฐ์ ์๋ธ ํ์คํฌ๋ก ๋ถํ ํ์ ๋ ๋ ์ค ํ๋์ ํ์คํฌ๋ฅผ ๋ค๋ฅธ ์ ํด ์์
์๊ฐ ํ์ณ๊ฐ ์ ์๋ค.
public class ForkJoinWorkerThread extends Thread {
/*
* ForkJoinWorkerThreads๋ ForkJoinPools์ ์ํด ๊ด๋ฆฌ๋๊ณ ForkJoinTasks๋ฅผ ์ํํฉ๋๋ค.
* ์ค๋ช
์ ForkJoinPool ํด๋์ค์ ๋ด๋ถ ๋ฌธ์๋ฅผ ์ฐธ์กฐํ์ธ์.
* ์ด ํด๋์ค๋ ํ๊ณผ WorkQueue์ ๋ํ ๋งํฌ๋ง ์ ์ง ๊ด๋ฆฌํฉ๋๋ค.
*/
final ForkJoinPool pool; // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
...
}
์๋ฐ 8์ Spliterator ๋ถํ ํ ์ ์๋ ๋ฐ๋ณต์ ๋ผ๋ ์๋ก์ด ์ธํฐํ์ด์ค๋ฅผ ์ ๊ณตํ๋ค.
Iterator์ ๊ฐ์ด ์์ค์ ์์ ํ์ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ ์ ์ ๊ฐ์ง๋ง Spliterator๋ ๋ณ๋ ฌ ์์
์ ํนํ๋์ด ์๋ค.
์๋ฐ 8์ ์ปฌ๋ ์ ํ๋ ์์ํฌ์ ํฌํจ๋ ๋ชจ๋ ์๋ฃ๊ตฌ์กฐ์ ์ฌ์ฉํ ์ ์๋ ๋ํดํธ Spilterator ๊ตฌํ์ ์ ๊ณตํ๋ค.
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
T
๋ Spilterator์์ ํ์ํ๋ ์์์ ํ์์ ๊ฐ๋ฆฌํจ๋ค.tryAdvance
๋ฉ์๋๋ Spliterator์ ์์๋ฅผ ํ๋์ฌ ์์ฐจ์ ์ผ๋ก ์๋นํ๋ฉด์(์ฆ, ์ผ๋ฐ์ ์ธ Iterator ๋์๊ณผ ๊ฐ๋ค.) ํ์ํด์ผ ํ ์์๊ฐ ๋จ์์์ผ๋ฉด ์ฐธ์ ๋ฐํํ๋ค.trySplit
๋ฉ์๋๋ Spliterator์ ์ผ๋ถ ์์๋ฅผ ๋ถํ ํด์ ๋ ๋ฒ์งธ Spliterator๋ฅผ ์์ฑํ๋ ๋ฉ์๋๋ค.estimateSize
๋ก ํ์ํด์ผ ํ ์์ ์ ์ ๋ณด๋ฅผ ์ ๊ณตํ ์ ์๋ค. ํนํ ํ์ํด์ผ ํ ์์ ์๊ฐ ์ ํํ์ง ์๋๋ผ๋ ์ ๊ณต๋ ๊ฐ์ ์ด์ฉํด์ ๋ ์ฝ๊ณ ๊ณตํํ๊ฒ Spliterator๋ฅผ ๋ถํ ํ ์ ์๋ค.
์์ ์ด๋ฏธ์ง์ ๊ฐ์ด ์คํธ๋ฆผ์ ์ฌ๋ฌ ์คํธ๋ฆผ์ผ๋ก ๋ถํ ํ๋ ๊ณผ์ ์ ์ฌ๊ท์ ์ผ๋ก ์ผ์ด๋๋ค.
trySplit
์ ๊ฒฐ๊ณผ๊ฐ null์ด ๋ ๋๊น์ง(๋ ์ด์ ์๋ฃ๊ตฌ์กฐ๋ฅผ ๋ถํ ํ ์ ์์) ์ด ๊ณผ์ ์ ๋ฐ๋ณตํ๋ค.
๊ทธ๋ฆฌ๊ณ characteristics
๋ผ๋ ์ถ์ ๋ฉ์๋๋ฅผ ์ ์ํ์ฌ Spliterator์ ํน์ฑ ์ ๋ณด๋ฅผ ๋ฐํํ ์ ์๋ค.
/**
* ์คํธ๋ฆผ์ ๋ฆฌ๋์ฑ์ ์คํํ๋ฉด์ ๋จ์ด ์๋ฅผ ๊ณ์ฐํ ์ ์๋ค.
* ์ง๊ธ๊น์ง ๋ฐ๊ฒฌํ ๋จ์ด ์๋ฅผ ๊ณ์ฐํ๋ counter์ ๋ง์ง๋ง ๋ฌธ์๊ฐ ๊ณต๋ฐฑ์ด์๋์ง ์ฌ๋ถ๋ฅผ ๊ธฐ์ตํ๋ lastSpace ๋ณ์๊ฐ ์๋ค.
*/
public class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
/**
* WordCounter ํด๋์ค๋ฅผ ์ด๋ค ์ํ๋ก ์์ฑํ ๊ฒ์ธ์ง ์ ์
* ํ๋ผ๋ฏธํฐ๊ฐ ๊ณต๋ฐฑ์ด๋ผ๋ฉด lastSpace ์ ๊ฐ์ ๋ณด๊ณ ์ด๋ค ์ํ๋ก ์ ์ดํ ์ง ์ ์ํ ๊ฒ์ด๋ค.
*/
public WordCounter accumulate(Character c) {
if (Character.isWhitespace(c)) {
return lastSpace ? this : new WordCounter(counter, true);
} else {
return lastSpace ? new WordCounter(counter + 1, false) : this;
}
}
/**
* ๋ฌธ์์ด ์๋ธ ์คํธ๋ฆผ์ ์ฒ๋ฆฌํ WordCounter์ ๊ฒฐ๊ณผ๋ฅผ ํฉ์น๋ค.
* WordCounter์ ๋ด๋ถ counter๊ฐ์ ์๋ก ํฉ์น๋ค.
*/
public WordCounter combine(WordCounter wordCounter) {
return new WordCounter(
counter + wordCounter.counter,
wordCounter.lastSpace
);
}
public int getCounter() {
return this.counter;
}
}
public class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentIndex = 0;
public WordCounterSpliterator(String string) {
this.string = string;
}
/**
* ๋ฌธ์์ด์์ ํ์ฌ ์ธ๋ฑ์ค์ ํด๋นํ๋ ๋ฌธ์๋ฅผ Consumer์ ์ ๊ณตํ ๋ค์์ ์ธ๋ฑ์ค๋ฅผ ์ฆ๊ฐ์ํจ๋ค.
* ์ธ์๋ก ์ ๋ฌ๋ Consumer๋ ์คํธ๋ฆผ์ ํ์ํ๋ฉด์ ์ ์ฉํด์ผ ํ๋ ํจ์ ์งํฉ์ด ์์
์ ์ฒ๋ฆฌํ ์ ์๋๋ก ์๋นํ ๋ฌธ์๋ฅผ ์ ๋ฌํ๋ ์๋ฐ ๋ด๋ถ ํด๋์ค๋ค.
* ์ฌ๊ธฐ์์๋ ์คํธ๋ฆผ์ ํ์ํ๋ฉด์ ํ๋์ ๋ฆฌ๋์ฑ ํจ์, ์ฆ WordCounter.accumulate ๋ฉ์๋๋ง ์ ์ฉํ๋ค.
* ์๋ก์ด ์ปค์ ์์น๊ฐ ์ ์ฒด ๋ฌธ์์ด ๊ธธ์ด๋ณด๋ค ์์ผ๋ฉด ์ฐธ์ ๋ฐํํ๋ฉฐ ๋ฐ๋ณต ํ์ํด์ผ ํ ๋ฌธ์๊ฐ ๋จ์์์์ ์๋ฏธํ๋ค.
*/
@Override
public boolean tryAdvance(Consumer<? super Character> action) {
action.accept(string.charAt(currentIndex++)); // ํ์ฌ ๋ฌธ์๋ฅผ ์๋น
return currentIndex < string.length(); // ์๋นํ ๋ฌธ์๊ฐ ๋จ์์์ผ๋ฉด true๋ฅผ ๋ฆฌํด
}
/**
* ๋ฐ๋ณต๋ ์๋ฃ๊ตฌ์กฐ๋ฅผ ๋ถํ ํ๋ ๋ก์ง์ ํฌํจํ๋ฏ๋ก ๊ฐ์ฅ ์ค์ํ ๋ฉ์๋๋ค.
* RecursiveTask.compute ๋ฉ์๋ ์์ ํ๋ ๊ฒ ์ฒ๋ผ ์ฐ์ ๋ถํ ๋์์ ์ค๋จํ ํ๊ณ๋ฅผ ์ค์ ํด์ผ ํ๋ค.
* ์ฌ๊ธฐ์๋ 10๊ฐ์ ๋ฌธ์๋ฅผ ์ฌ์ฉํ์ง๋ง ์ค์ ์์๋ ๋๋ฌด ๋ง์ ํ์คํฌ๋ฅผ ๋ง๋ค์ง ์๋๋ก ๋ ๋์ ํ๊ณ๊ฐ์ ์ฌ์ฉํด์ผ ํ ๊ฒ์ด๋ค.
* ๋ถํ ์ด ํ์ํ ์ํฉ์์๋ ํ์ฑํด์ผ ํ ๋ฌธ์์ด ์ฒญํฌ์ ์ค๊ฐ ์์น๋ฅผ ๊ธฐ์ค์ผ๋ก ๋ถํ ํ๋๋ก ์ง์ํ๋ค.
* ์ด๋ ๋จ์ด ์ค๊ฐ์ ๋ถํ ํ์ง ์๋๋ก ๋น ๋ฌธ์๊ฐ ๋์ฌ๋๊น์ง ๋ถํ ์์น๋ฅผ ์ด๋์ํจ๋ค.
* ๋ถํ ํ ์์น๋ฅผ ์ฐพ์์ผ๋ฉด ์๋ก์ด Spliterator๋ฅผ ๋ง๋ ๋ค.
* ์๋ก ๋ง๋ Spliterator๋ ํ์ฌ ์์น currentIndex ๋ถํฐ ๋ถํ ๋ ์์น๊น์ง์ ๋ฌธ์๋ฅผ ํ์ํ๋ค.
*/
@Override
public Spliterator<Character> trySplit() {
int currentSize = string.length() - currentIndex;
if (currentSize < 10) {
return null; // ํ์ฑํ ๋ฌธ์์ด์ ์์ฐจ ์ฒ๋ฆฌํ ์ ์์ ๋งํผ ์ถฉ๋ถํ ์์์ก์์ ์๋ฆฌ๋ null์ ๋ฐํ
}
for (int splitPos = currentSize / 2 + currentIndex;
splitPos < string.length() ; splitPos++) { // ๋ถํ ์์ ์์น๋ฅผ ์ค๊ฐ์ผ๋ก ์ง์
if (Character.isWhitespace(string.charAt(splitPos))) { // ๋ค์ ๊ณต๋ฐฑ์ด ๋์ฌ๋๊น์ง ๋ถํ ์์น๋ฅผ ๋ค๋ก ์ด๋
// ์ฒ์๋ถํฐ ๋ถํ ์์น๊น์ง ๋ฌธ์์ด์ ํ์ฑํ ์๋ก์ด WordCountSpliterator ๋ฅผ ์์ฑ
Spliterator<Character> spliterator =
new WordCounterSpliterator(string.substring(currentIndex, splitPos));
currentIndex = splitPos; // ์์ ์์น๋ฅผ ๋ถํ ์์น๋ก ์ค์
return spliterator; // ๊ณต๋ฐฑ์ ์ฐพ์๊ณ ๋ฌธ์์ด์ ๋ถ๋ฆฌํ์ผ๋ฏ๋ก ๋ฃจํ๋ฅผ ์ข
๋ฃ
}
}
return null;
}
/**
* Spliterator๊ฐ ํ์ฑํ ๋ฌธ์์ด ์ ์ฒด ๊ธธ์ด์ ํ์ฌ ๋ฐ๋ณต ์ค์ธ ์์น์ ์ฐจ์ด๋ค.
*/
@Override
public long estimateSize() {
return string.length() - currentIndex;
}
@Override
public int characteristics() {
return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
}
}
@Test
void whiteSpaceCount() {
// 1. ์ฐจ๋ก๋๋ก ํ์
final String content = "Lorem Ipsum is simply dummy text of the printing and typesetting industry.";
int count = this.countWordsIteratively(content);
Assertions.assertThat(count).isEqualTo(12);
// 2. Stream์ ๋ฆฌ๋์ค ์ฐ์ฐ
Stream<Character> stream = IntStream.range(0, content.length()).mapToObj(content::charAt);
WordCounter reduce = stream.reduce(
new WordCounter(0, true),
WordCounter::accumulate,
WordCounter::combine
);
Assertions.assertThat(reduce.getCounter()).isEqualTo(12);
// 3. ๋ณ๋ ฌ๋ก ์ฐ์ฐ
WordCounterSpliterator wordCounterSpliterator = new WordCounterSpliterator(content);
Stream<Character> stream2 = StreamSupport.stream(wordCounterSpliterator, true); // ๋ณ๋ ฌ ์คํธ๋ฆผ ์์ฑ ์ฌ๋ถ true
WordCounter reduce2 = stream2.reduce(
new WordCounter(0, true),
WordCounter::accumulate,
WordCounter::combine
);
Assertions.assertThat(reduce2.getCounter()).isEqualTo(12);
}
ํฌํฌ/์กฐ์ธ ํ๋ ์์ํฌ ์์๋ ๋ณ๋ ฌํํ ์ ์๋ ํ์คํฌ๋ฅผ ์์ ํ์คํฌ๋ก ๋ถํ ํ ๋ค์์ ๋ถํ ๋ ํ์คํฌ๋ฅผ ๊ฐ๊ฐ์ ์ค๋ ๋๋ก ์คํํ๋ฉฐ ์๋ธํ์คํฌ ๊ฐ๊ฐ์ ๊ฒฐ๊ณผ๋ฅผ ํฉ์ณ์ ์ต์ข
๊ฒฐ๊ณผ๋ฅผ ์์ฐํ๋ค.
Spliterator ๋ ํ์ํ๋ ค๋ ๋ฐ์ดํฐ๋ฅผ ํฌํจํ๋ ์คํธ๋ฆผ์ ์ด๋ป๊ฒ ๋ณ๋ ฌํํ ๊ฒ์ธ์ง ์ ์ํ๋ค.