}
}
});
participantThread =
⚠️ Edge Case: Race condition: joinAndProcessJob can overwrite active thread ref
joinAndProcessJob() guards against concurrent calls with participating.compareAndSet(false, true), and onJobDiscovered() checks participating.get() before calling it. However, there's a race window in the finally block of the spawned thread (line 215-216):
finally {
  participantThread = null; // line 215
  participating.set(false); // line 216
}

Between setting participantThread = null and participating.set(false), the onJobDiscovered callback (running on a different thread) could see participating == true, skip the call, and miss a job notification. More concerning: if a new onJobDiscovered fires right after participating.set(false) on line 216 but before the virtual thread fully exits, the new call to joinAndProcessJob will set participantThread to a new thread while the old one is still in its finally block.
The more critical scenario is during stop(): stop() reads participantThread into a local variable, and then the thread's finally block nulls it out. The stop() method correctly captures the reference via Thread thread = participantThread (line 121), so the interrupt/join itself is safe. But the ordering of cleanup in the finally block should set participating.set(false) last (after all other cleanup including resetting the notifier) to minimize the race window. Currently participantThread = null is set first, then participating, which is the right relative order for those two — but currentJobId = null should also be set before participating goes false, which it is. So this is mostly a theoretical concern, but worth noting the window exists.
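The ordering discussed above can be sketched in isolation. This is a minimal illustration, not the PR's code: the class name ParticipantSketch, the demo() helper, and the use of a plain daemon thread instead of a virtual thread are all assumptions made so the sketch runs on older JDKs. Only the field names participating and participantThread come from the review.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the participant class; names are illustrative only.
final class ParticipantSketch {
  private final AtomicBoolean participating = new AtomicBoolean(false);
  private volatile Thread participantThread;

  /** Starts the job unless one is already running; returns true if it started. */
  boolean joinAndProcessJob(Runnable work) {
    if (!participating.compareAndSet(false, true)) {
      return false; // another job is active, same guard as in the review
    }
    Thread worker = new Thread(() -> {
      try {
        work.run();
      } finally {
        // Clear the thread reference first and flip the flag last, so a new
        // caller that wins the compareAndSet cannot have its reference nulled
        // by this exiting thread.
        participantThread = null;
        participating.set(false);
      }
    }, "participant-sketch");
    worker.setDaemon(true);
    participantThread = worker; // published before start(), so finally cannot run first
    worker.start();
    return true;
  }

  /** Interrupts the active job, if any; returns true if no job is left running. */
  boolean stop(long joinMillis) {
    Thread thread = participantThread; // capture once; the worker may null the field
    if (thread == null) {
      return true;
    }
    thread.interrupt();
    try {
      thread.join(joinMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return !thread.isAlive();
  }

  /** Runs one start / reject / stop / restart cycle and reports whether it behaved as expected. */
  static boolean demo() {
    ParticipantSketch participant = new ParticipantSketch();
    CountDownLatch started = new CountDownLatch(1);
    Runnable longJob = () -> {
      started.countDown();
      try {
        Thread.sleep(60_000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    boolean first = participant.joinAndProcessJob(longJob);
    try {
      started.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    boolean second = participant.joinAndProcessJob(longJob); // rejected while active
    boolean stopped = participant.stop(5_000);
    boolean third = participant.joinAndProcessJob(() -> {}); // accepted after cleanup
    return first && !second && stopped && third;
  }

  public static void main(String[] args) {
    System.out.println("cycle behaved as expected: " + demo());
  }
}
```

Capturing the reference into a local in stop() is what keeps the interrupt and join safe even when the worker clears the field concurrently.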
.withTimestamp(System.currentTimeMillis());
write(changeEvent);
});
AsyncService.getInstance()
💡 Bug: Swallowed exceptions from writeAuthEvent after pool migration
The writeAuthEvent method now submits to the executor pool via .submit(), which returns a Future. If the task throws an exception (e.g., database write failure in write(changeEvent)), the exception is silently swallowed because the returned Future is never checked.
With the previous Thread.startVirtualThread(), uncaught exceptions would at least be logged by the default uncaught exception handler. With ExecutorService.submit(), exceptions are captured in the Future and only thrown when Future.get() is called — which never happens here.
Consider using .execute() instead of .submit() so that the virtual thread factory's uncaught exception handler can log failures, or wrap the Runnable with try/catch and explicit logging.
Suggested fix:
AsyncService.getInstance()
.getExecutorService()
.execute(
() -> {
ChangeEvent changeEvent =
new ChangeEvent()
.withId(UUID.randomUUID())
.withEventType(eventType)
.withEntityType(Entity.USER)
.withEntityId(userId)
.withUserName(userName)
.withTimestamp(System.currentTimeMillis());
write(changeEvent);
});
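The difference between the two submission methods can be reproduced with a small standalone sketch. It assumes a single platform-thread pool from java.util.concurrent.Executors rather than the PR's virtual-thread executor, and the class name SubmitVsExecuteDemo and the simulated failure are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; it stands in for the audit-event write path.
final class SubmitVsExecuteDemo {
  // Simulates write(changeEvent) failing, e.g. on a database error.
  private static final Runnable FAILING_WRITE = () -> {
    throw new IllegalStateException("simulated write failure");
  };

  /** execute(): the failure escapes the task and reaches the thread's uncaught-exception handler. */
  static Throwable seenByUncaughtHandler() {
    AtomicReference<Throwable> seen = new AtomicReference<>();
    CountDownLatch handled = new CountDownLatch(1);
    ThreadFactory factory = runnable -> {
      Thread thread = new Thread(runnable, "demo-worker");
      thread.setDaemon(true);
      thread.setUncaughtExceptionHandler((t, error) -> {
        seen.set(error);
        handled.countDown();
      });
      return thread;
    };
    ExecutorService pool = Executors.newSingleThreadExecutor(factory);
    try {
      pool.execute(FAILING_WRITE);
      handled.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdownNow();
    }
    return seen.get();
  }

  /** submit(): the failure is stored in the Future and only surfaces if someone calls get(). */
  static Throwable seenOnlyViaFuture() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      Future<?> future = pool.submit(FAILING_WRITE);
      future.get(); // without this call the exception would never be observed
      return null;
    } catch (ExecutionException e) {
      return e.getCause();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("execute() handler saw: " + seenByUncaughtHandler());
    System.out.println("submit() future held:  " + seenOnlyViaFuture());
  }
}
```

If the call site needs neither a result nor cancellation, execute() keeps failures visible without extra plumbing; an explicit try/catch with logging inside the task works with either method.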
}

@Override
public void execute(Runnable command) {
⚠️ Edge Case: Semaphore acquired after thread creation doesn't bound thread count
The BoundedExecutorService.execute() method calls delegate.execute(wrappedTask) which creates a new virtual thread immediately, and the semaphore.acquire() only happens inside that already-spawned thread. Under burst submission (e.g., a flood of audit events), this allows unbounded virtual thread creation — threads are spawned eagerly and then park on the semaphore.
While parked virtual threads are lightweight (they don't consume OS threads), this defeats the PR's stated goal of preventing "unbounded virtual thread creation." The approach does effectively bound the number of concurrently running tasks, which protects downstream resources like DB connections.
To actually bound thread creation, the semaphore should be acquired before delegating to the underlying executor:
public void execute(Runnable command) {
try {
semaphore.acquire(); // block the CALLER, not the spawned thread
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted waiting for concurrency permit", e);
}
delegate.execute(() -> {
try {
command.run();
} finally {
semaphore.release();
}
});
}

This would apply back-pressure to callers and truly bound the number of created threads. The trade-off is that callers (like the audit log writer) would block until a permit is available, which may be acceptable since they're already running on virtual threads.
Suggested fix:
@Override
public void execute(Runnable command) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted waiting for concurrency permit", e);
}
delegate.execute(
() -> {
try {
command.run();
} finally {
semaphore.release();
}
});
}
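To check the back-pressure claim empirically, the caller-side acquire can be wrapped in a small harness that records peak concurrency. This is a sketch under assumptions: the class name CallerBoundedExecutor and the peakConcurrency helper are hypothetical, a cached platform-thread pool stands in for the virtual-thread delegate, and the sketch throws RejectedExecutionException on interrupt instead of the RuntimeException used in the suggested fix. It also releases the permit if the delegate rejects the task, which the suggested fix does not do.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper that acquires the permit on the calling thread.
final class CallerBoundedExecutor implements Executor {
  private final Executor delegate;
  private final Semaphore permits;

  CallerBoundedExecutor(Executor delegate, int maxConcurrency) {
    this.delegate = delegate;
    this.permits = new Semaphore(maxConcurrency);
  }

  @Override
  public void execute(Runnable command) {
    try {
      permits.acquire(); // blocks the caller, so no thread is created without a permit
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RejectedExecutionException("Interrupted waiting for a permit", e);
    }
    try {
      delegate.execute(() -> {
        try {
          command.run();
        } finally {
          permits.release();
        }
      });
    } catch (RuntimeException e) {
      permits.release(); // the task never started, so return its permit
      throw e;
    }
  }

  /** Submits taskCount short tasks and returns the highest number seen running at once. */
  static int peakConcurrency(int maxConcurrency, int taskCount) {
    ExecutorService pool = Executors.newCachedThreadPool();
    CallerBoundedExecutor bounded = new CallerBoundedExecutor(pool, maxConcurrency);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(taskCount);
    try {
      for (int i = 0; i < taskCount; i++) {
        bounded.execute(() -> {
          peak.accumulateAndGet(running.incrementAndGet(), Math::max);
          try {
            Thread.sleep(10);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          running.decrementAndGet();
          done.countDown();
        });
      }
      done.await(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdownNow();
    }
    return peak.get();
  }

  public static void main(String[] args) {
    System.out.println("peak concurrency with 2 permits: " + peakConcurrency(2, 8));
  }
}
```

Because the submitting loop itself blocks on acquire(), the delegate never holds more live tasks than there are permits, at the cost of stalling the submitter during bursts.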
}

private static int resolveMaxConcurrency() {
String env = System.getenv("ASYNC_SERVICE_MAX_CONCURRENCY");
💡 Quality: Invalid env var ASYNC_SERVICE_MAX_CONCURRENCY silently ignored
When ASYNC_SERVICE_MAX_CONCURRENCY is set but contains an invalid value (non-numeric string, zero, or negative number), the code silently falls through to the CPU/DB-pool heuristic at lines 46-47. This can confuse operators who set the env var expecting explicit control but get different behavior with no feedback.
A log warning would help operators detect misconfiguration quickly, especially since this is the highest-priority configuration source.
Suggested fix:
String env = System.getenv("ASYNC_SERVICE_MAX_CONCURRENCY");
if (env != null) {
try {
int value = Integer.parseInt(env.trim());
if (value > 0) {
return value;
}
LOG.warn("ASYNC_SERVICE_MAX_CONCURRENCY must be positive, got {}; falling back to auto", value);
} catch (NumberFormatException e) {
LOG.warn("ASYNC_SERVICE_MAX_CONCURRENCY is not a valid integer: '{}'; falling back to auto", env);
}
}
🔍 CI failure analysis for ac0ebf5: After semaphore-based concurrency limiting: 2 search index timeout failures. Tests expect entities to appear in 1 minute but semaphore may throttle async indexing tasks, causing delays.

Issue
CI failures after new commit (43d1cba) with 2 tests failed out of 10,714 (0.02% rate):
Both are search index synchronization timeouts.

Root Cause
Semaphore-based concurrency limiting may be throttling async indexing operations. New commit introduced:
Critical calculation for tests:

Details
Background errors (non-fatal):
Key observations:

Relevance Assessment
HIGH CONFIDENCE THIS IS RELATED TO SEMAPHORE:
Evidence:
How semaphore causes delays:
Example scenario:

Recommendation
PRIORITY: Increase Concurrency for Test Environment

Option 1: Environment variable (quickest)
export ASYNC_SERVICE_MAX_CONCURRENCY=32 # in test CI config

Option 2: Test-specific calculation
if (isTestEnvironment()) {
  return Math.max(16, cpuBudget); // Higher for tests
}
return Math.max(4, Math.min(cpuBudget, poolSize / 3)); // Conservative for prod

Option 3: Non-blocking for indexing
// In execute() method
if (!semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
  LOG.warn("Semaphore timeout, running task anyway");
  delegate.execute(command); // Bypass semaphore
  return;
}
// ... rest of wrapped execution

Option 4: Increase test timeouts (treating symptom)

Assessment
The semaphore approach is architecturally sound but needs tuning:
Recommended fix: Set
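To see why a small CI runner might end up with few permits, the two formulas quoted in Option 2 can be evaluated for a few inputs. The input values below are hypothetical, and the analysis does not say how cpuBudget or poolSize are derived, so this sketch treats both as plain integers; the class and method names are illustrative.

```java
// Hypothetical helper that evaluates the two formulas quoted in Option 2.
final class ConcurrencyHeuristicSketch {
  /** The "conservative for prod" formula: floor of 4, capped by CPU budget and a third of the pool. */
  static int prodConcurrency(int cpuBudget, int poolSize) {
    return Math.max(4, Math.min(cpuBudget, poolSize / 3));
  }

  /** The "higher for tests" formula: floor of 16. */
  static int testConcurrency(int cpuBudget) {
    return Math.max(16, cpuBudget);
  }

  public static void main(String[] args) {
    // Each row is {cpuBudget, poolSize}; the values are assumed, not taken from the CI logs.
    int[][] inputs = {{2, 30}, {8, 30}, {16, 30}};
    for (int[] row : inputs) {
      System.out.printf(
          "cpuBudget=%d poolSize=%d -> prod=%d test=%d%n",
          row[0], row[1], prodConcurrency(row[0], row[1]), testConcurrency(row[0]));
    }
  }
}
```

With these assumed inputs, a budget of 2 and a pool of 30 yields 4 permits under the production formula and 16 under the test formula, which is the kind of gap the recommendation is pointing at.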
* Fix Virtual Threads unbounded
* Bound all AsyncService paths with semaphore-wrapped executor

---------

Co-authored-by: Adrià Manero <adria.estivill@getcollate.io>



Describe your changes:
Fixes
I worked on ... because ...
Type of change:
Checklist:
Fixes <issue-number>: <short explanation>

Summary by Gitar
* BoundedExecutorService wrapper enforcing semaphore-based concurrency limits on all async tasks
* DistributedJobParticipant and DistributedSearchIndexExecutor now properly track and interrupt long-running virtual threads with configurable timeouts
* interruptAndJoin() helper method centralizing thread shutdown logic
* execute() and submit() convenience methods for bounded async operations
* AuditLogRepository.writeAuthEvent() from unbounded Thread.startVirtualThread() to bounded AsyncService.execute()

This will update automatically on new commits.