Parallel Streams
Streams are an example of an API that supports parallel computations with minimal changes to the underlying code.
The first example shows that calling the parallel method on a stream results in multiple threads being used to process elements.
private static void printNumbersWithUsedThreads() {
    IntStream.range(0, 10)
        .parallel()
        .mapToObj(n -> "%d: %s".formatted(n, Thread.currentThread().getName()))
        .forEach(System.out::println);
}
The output of this program depends on the system configuration. Here is one example where two different threads are used to print numbers. Note that the numbers (in front of the colon) are not printed in the order they occur in the stream.
2: ForkJoinPool.commonPool-worker-1
3: ForkJoinPool.commonPool-worker-1
4: ForkJoinPool.commonPool-worker-1
0: ForkJoinPool.commonPool-worker-1
1: ForkJoinPool.commonPool-worker-1
8: ForkJoinPool.commonPool-worker-1
9: ForkJoinPool.commonPool-worker-1
7: ForkJoinPool.commonPool-worker-1
5: main
6: main
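If the original order matters, the stream API offers ways to keep it even for parallel streams. The following is a minimal sketch, not part of the original program: the class name OrderedParallelDemo and the method labelsInEncounterOrder are made up for illustration. It relies on forEachOrdered and on collecting to a list, both of which respect the encounter order of the stream, while the elements may still be processed by different threads.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the original program.
class OrderedParallelDemo {

    // Collecting to a list preserves the encounter order of the stream,
    // even though the mapping step may run on several threads.
    static List<String> labelsInEncounterOrder(int bound) {
        return IntStream.range(0, bound)
            .parallel()
            .mapToObj(n -> "%d: %s".formatted(n, Thread.currentThread().getName()))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // forEachOrdered prints the elements in encounter order,
        // which may cost some of the benefit of parallel processing.
        IntStream.range(0, 10)
            .parallel()
            .mapToObj(n -> "%d: %s".formatted(n, Thread.currentThread().getName()))
            .forEachOrdered(System.out::println);
    }
}
```

With this variant the numbers in front of the colon appear in ascending order, while the thread names behind the colon still depend on the system configuration.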
Parallel processing can be useful to increase the performance of CPU-intensive computations. To measure the run time of a computation and print it next to its result, we define the following helper method.
private static void printPerformance(String label, Supplier<?> supplier) {
    Instant start = Instant.now();
    Object result = supplier.get();
    Instant end = Instant.now();
    System.out.println("%s took: %s - result: %s"
        .formatted(label, Duration.between(start, end), result));
}
We can use this method to compare summing a large stream of numbers both sequentially and in parallel.
private static void comparePerformanceOfLargeSum() {
    long bound = 2_000_000_000;
    printPerformance("sequential sum",
        () -> LongStream.range(0, bound).sum());
    printPerformance("parallel sum",
        () -> LongStream.range(0, bound).parallel().sum());
}
As the workload of the sum method can be split easily, the speedup from parallel processing should roughly correspond to the number of cores the system provides.
Here is example output for a system using two cores.
sequential sum took: PT0.944196296S - result: 1999999999000000000
parallel sum took: PT0.46363052S - result: 1999999999000000000
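To relate such measurements to the machine they were taken on, it can help to print how many cores and worker threads are available. The following sketch uses a made-up class name, ParallelismInfo. It assumes the parallel stream runs on the common fork/join pool, as the thread names in the first example suggest. By default, the parallelism of that pool is typically one less than the number of available processors, because the calling thread also takes part in the work.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper class, not part of the original program.
class ParallelismInfo {

    // Number of processors the JVM reports as available.
    static int availableCores() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target number of worker threads in the common fork/join pool.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("available processors: " + availableCores());
        System.out.println("common pool parallelism: " + commonPoolParallelism());
    }
}
```

The printed values depend on the system, so no example output is given here.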
However, the performance benefit is not always so easy to predict. Potential problems are uneven workload and synchronization overhead. As a slightly more complex example, we can implement a prime search that counts the prime numbers below a given bound.
private static void comparePrimeCheckPerformance() {
    int bound = 10_000_000;
    printPerformance("sequential prime search",
        () -> IntStream.range(0, bound)
            .filter(ParallelStreamDemo::isPrime)
            .count());
    printPerformance("parallel prime search",
        () -> IntStream.range(0, bound)
            .parallel()
            .filter(ParallelStreamDemo::isPrime)
            .count());
}
Here is example output for a system using two cores.
sequential prime search took: PT2.07075597S - result: 664579
parallel prime search took: PT1.828978997S - result: 664579
We can see that the parallel prime search is only slightly faster in this run, taking roughly 12% less time than the sequential search. In other runs it is slightly slower, so the benefit is not significant in this case.
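The isPrime method used by the prime search is not shown here. The following is a minimal sketch of what such a check could look like, assuming simple trial division; the class PrimeCheckSketch and its methods are illustrative stand-ins, not the actual ParallelStreamDemo implementation. With trial division, the cost of a check grows with the size of the number, so equally sized parts of the range do not take equally long to process, which is one form of the uneven workload mentioned above.

```java
import java.util.stream.IntStream;

// Hypothetical stand-in for ParallelStreamDemo, assuming trial division.
class PrimeCheckSketch {

    // Returns true if n is prime, testing odd divisors up to the square root of n.
    static boolean isPrime(int n) {
        if (n < 2) {
            return false;
        }
        if (n % 2 == 0) {
            return n == 2;
        }
        // The cast to long avoids int overflow of d * d for large n.
        for (int d = 3; (long) d * d <= n; d += 2) {
            if (n % d == 0) {
                return false;
            }
        }
        return true;
    }

    // Counts the primes below the given bound using a parallel stream.
    static long countPrimesBelow(int bound) {
        return IntStream.range(0, bound)
            .parallel()
            .filter(PrimeCheckSketch::isPrime)
            .count();
    }

    public static void main(String[] args) {
        System.out.println(countPrimesBelow(100)); // prints 25
    }
}
```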
For a more rigorous performance comparison, one should use a dedicated benchmarking tool such as JMH (the Java Microbenchmark Harness).
© Sebastian Fischer 2024 CC BY-SA 4.0