Skip to content

Commit e5f6b5e

Browse files
mcollovaticlaude
andauthored
fix: use dedicated executor for process stream consumption (#22997) (#23002)
Replace usage of ForkJoinPool.commonPool() in consumeProcessStreams() with a dedicated cached thread pool to prevent common pool exhaustion during parallel Maven builds. The common pool has limited parallelism (typically CPU cores - 1) and when multiple frontend processes run concurrently, blocking I/O operations can exhaust all available threads, causing timeouts. The fix uses a cached thread pool with daemon threads: - Creates threads on demand for I/O-bound operations - Daemon threads don't prevent JVM shutdown when the build completes - Idle threads are automatically reclaimed after 60 seconds - Named "vaadin-stream-consumer-N" for easier debugging Fixes #22756 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 265dd16 commit e5f6b5e

File tree

2 files changed

+178
-5
lines changed

2 files changed

+178
-5
lines changed

flow-server/src/main/java/com/vaadin/flow/server/frontend/FrontendUtils.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
import java.util.Optional;
3636
import java.util.concurrent.CompletableFuture;
3737
import java.util.concurrent.ExecutionException;
38-
import java.util.function.Function;
38+
import java.util.concurrent.ExecutorService;
39+
import java.util.concurrent.Executors;
3940
import java.util.function.UnaryOperator;
4041
import java.util.regex.Matcher;
4142
import java.util.regex.Pattern;
@@ -353,6 +354,24 @@ public class FrontendUtils {
353354
"(?<=(?:const|let|var) routes)(:\\s?\\w*\\[\\s?])?\\s?=\\s?\\[([\\s\\S]*?)(?=\\.{3}serverSideRoutes)",
354355
Pattern.MULTILINE);
355356

357+
/**
358+
* Executor for I/O-bound operations like reading process streams.
359+
* <p>
360+
* Uses daemon threads to avoid blocking JVM shutdown - since this is
361+
* primarily used during build-time operations, the executor doesn't need
362+
* explicit lifecycle management. Idle threads are automatically reclaimed
363+
* after 60 seconds, and any remaining threads terminate when the JVM exits.
364+
*
365+
* @see <a href="https://github.com/vaadin/flow/issues/22756">Issue
366+
* #22756</a>
367+
*/
368+
private static final ExecutorService STREAM_EXECUTOR = Executors
369+
.newCachedThreadPool(r -> {
370+
Thread thread = new Thread(r, "vaadin-stream-consumer");
371+
thread.setDaemon(true); // Won't prevent JVM exit
372+
return thread;
373+
});
374+
356375
/**
357376
* Only static stuff here.
358377
*/
@@ -950,10 +969,12 @@ public static String executeCommand(List<String> command,
950969
*/
951970
public static CompletableFuture<Pair<String, String>> consumeProcessStreams(
952971
Process process) {
953-
CompletableFuture<String> stdOut = CompletableFuture
954-
.supplyAsync(() -> streamToString(process.getInputStream()));
955-
CompletableFuture<String> stdErr = CompletableFuture
956-
.supplyAsync(() -> streamToString(process.getErrorStream()));
972+
CompletableFuture<String> stdOut = CompletableFuture.supplyAsync(
973+
() -> streamToString(process.getInputStream()),
974+
STREAM_EXECUTOR);
975+
CompletableFuture<String> stdErr = CompletableFuture.supplyAsync(
976+
() -> streamToString(process.getErrorStream()),
977+
STREAM_EXECUTOR);
957978
return CompletableFuture.allOf(stdOut, stdErr).thenApply(
958979
unused -> new Pair<>(stdOut.getNow(""), stdErr.getNow("")));
959980
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Copyright 2000-2025 Vaadin Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.vaadin.flow.server.frontend;
17+
18+
import java.nio.file.Paths;
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.ForkJoinPool;
24+
import java.util.concurrent.TimeUnit;
25+
26+
import org.junit.After;
27+
import org.junit.Assert;
28+
import org.junit.Test;
29+
30+
import com.vaadin.flow.internal.Pair;
31+
32+
/**
33+
* Regression test for https://github.com/vaadin/flow/issues/22756
34+
* <p>
35+
* Tests that {@link FrontendUtils#consumeProcessStreams(Process)} does not
36+
* depend on {@link ForkJoinPool#commonPool()} and can complete even when the
37+
* common pool is exhausted.
38+
*/
39+
public class ForkJoinPoolExhaustionTest {
40+
41+
private final List<CompletableFuture<?>> blockingTasks = new ArrayList<>();
42+
43+
@After
44+
public void cleanup() {
45+
// Cancel all blocking tasks to avoid affecting other tests
46+
blockingTasks.forEach(f -> f.cancel(true));
47+
blockingTasks.clear();
48+
}
49+
50+
/**
51+
* Test that consumeProcessStreams can complete even when
52+
* ForkJoinPool.commonPool() is exhausted by other blocking tasks.
53+
* <p>
54+
* This test verifies that consumeProcessStreams uses a dedicated executor
55+
* (virtual threads) instead of the common pool.
56+
*/
57+
@Test
58+
public void consumeProcessStreams_shouldNotBeBlockedByExhaustedCommonPool()
59+
throws Exception {
60+
// Step 1: Saturate the ForkJoinPool.commonPool() with blocking tasks
61+
// We need to submit more blocking tasks than the pool's parallelism
62+
// to ensure all threads are occupied
63+
int parallelism = ForkJoinPool.commonPool().getParallelism();
64+
int numBlockingTasks = parallelism + 2;
65+
66+
// Use a latch to ensure the pool is saturated. We wait for
67+
// 'parallelism'
68+
// tasks to start (the max that can run concurrently). The +2 extra
69+
// tasks
70+
// will be queued, ensuring the pool is fully occupied.
71+
CountDownLatch poolSaturated = new CountDownLatch(parallelism);
72+
73+
// Submit (parallelism + 2) blocking tasks to ensure the pool is fully
74+
// saturated. The +2 accounts for potential compensation threads.
75+
for (int i = 0; i < numBlockingTasks; i++) {
76+
CompletableFuture<?> blocker = CompletableFuture.runAsync(() -> {
77+
try {
78+
poolSaturated.countDown(); // Signal that this task has
79+
// started
80+
// Block for 10 seconds (longer than our test timeout)
81+
Thread.sleep(10_000);
82+
} catch (InterruptedException e) {
83+
Thread.currentThread().interrupt();
84+
}
85+
}); // Uses commonPool by default
86+
blockingTasks.add(blocker);
87+
}
88+
89+
// Wait until the pool is saturated (more reliable than fixed sleep)
90+
Assert.assertTrue("Pool didn't saturate in time",
91+
poolSaturated.await(5, TimeUnit.SECONDS));
92+
93+
// Step 2: Start a fast process that outputs immediately and exits
94+
List<String> cmd = List.of(
95+
Paths.get(System.getProperty("java.home"), "bin", "java")
96+
.toFile().getAbsolutePath(),
97+
"-cp", System.getProperty("java.class.path"),
98+
FastTestExecutable.class.getName());
99+
100+
Process process = new ProcessBuilder(cmd).start();
101+
102+
// Step 3: Call consumeProcessStreams and wait with a reasonable timeout
103+
// If the implementation uses commonPool, this will timeout because all
104+
// threads are blocked.
105+
// If the implementation uses a dedicated executor, this will complete
106+
// quickly.
107+
CompletableFuture<Pair<String, String>> streamsFuture = FrontendUtils
108+
.consumeProcessStreams(process);
109+
110+
// Wait for the process to complete (should be nearly instant)
111+
boolean processCompleted = process.waitFor(2, TimeUnit.SECONDS);
112+
Assert.assertTrue("Process should complete within 2 seconds",
113+
processCompleted);
114+
115+
// Step 4: Try to get the streams with a 2-second timeout
116+
// This is the key assertion - with the buggy code this will timeout
117+
try {
118+
Pair<String, String> streams = streamsFuture.get(2,
119+
TimeUnit.SECONDS);
120+
121+
// Verify the output was captured correctly
122+
String stdOut = streams.getFirst();
123+
Assert.assertTrue(
124+
"Expected stdout to contain test output, but was: "
125+
+ stdOut,
126+
stdOut.contains("FastTestExecutable completed"));
127+
String stdErr = streams.getSecond();
128+
Assert.assertTrue(
129+
"Expected stdout to contain test output, but was: "
130+
+ stdErr,
131+
stdErr.contains("FastTestExecutable writing to stderr"));
132+
133+
} catch (java.util.concurrent.TimeoutException e) {
134+
Assert.fail("consumeProcessStreams should not be blocked by "
135+
+ "exhausted ForkJoinPool.commonPool(). "
136+
+ "This indicates the implementation incorrectly uses "
137+
+ "the common pool instead of a dedicated executor. "
138+
+ "See https://github.com/vaadin/flow/issues/22756");
139+
}
140+
}
141+
142+
/**
143+
* A simple test executable that outputs to stdout and exits immediately.
144+
* Used to test that consumeProcessStreams can capture output quickly.
145+
*/
146+
public static class FastTestExecutable {
147+
public static void main(String... args) {
148+
System.err.println("FastTestExecutable writing to stderr");
149+
System.out.println("FastTestExecutable completed");
150+
}
151+
}
152+
}

0 commit comments

Comments
 (0)