fix(reindex): unwedge orchestrator and recover stale Quartz job on crash #28699
mohityadav766 wants to merge 5 commits into
A distributed reindex that crashes or wedges mid-run could leave the
SearchIndexApp permanently un-retriggerable ("Job is already running,
please wait for it to complete.") until a manual pod restart, even though
the job row was already FAILED and the distributed lock released.
Two independent causes, both fixed:
1. Orchestrator hang (local path, AppScheduler:308). execute() runs the
reindex synchronously on the Quartz worker thread, which parked on an
unbounded workerLatch.await(). A worker wedged on a degraded search
backend never counted down the latch, so the thread never returned and
getCurrentlyExecutingJobs() kept reporting the app as running.
Fix: awaitWorkers() polls job state every 5s; on terminal/STOPPING it
forces stop() (shutdownNow interrupts wedged workers) and returns,
letting the existing finally do the bounded drain. No wall-clock cap —
a healthy multi-hour reindex is never terminal, so it keeps waiting.
2. Stale Quartz entry (cross-pod path, AppScheduler:333). The on-demand
job is non-durable, so a crash leaves a persisted QRTZ_* JobDetail;
because the store is clustered, a retrigger from any pod then throws
ObjectAlreadyExistsException even when nothing runs. The old code
rethrew unconditionally without checking whether the app was running.
Fix: scheduleOnDemandJob() consults the DB-backed AppRunRecord
(cross-pod truth) — genuinely active runs rethrow; stale entries are
cleared and rescheduled once. Fail-safe: if the run record can't be
read, treat as active so a live job is never wrongly cleared.
Tests: DistributedSearchIndexExecutorTest (+2), AppSchedulerTest (+2).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
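The stale-entry recovery described in cause 2 can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's code: `FakeJobStore`, `AlreadyExistsException`, and `RunRecordReader` are hypothetical stand-ins for the clustered Quartz store, Quartz's `ObjectAlreadyExistsException`, and the DB-backed `AppRunRecord` lookup, so the flow can run without Quartz on the classpath.

```java
import java.util.HashSet;
import java.util.Set;

/** Sketch only: every type here is a hypothetical stand-in, not an OpenMetadata or Quartz class. */
final class StaleEntryRecoverySketch {

  /** Stand-in for Quartz's ObjectAlreadyExistsException. */
  static final class AlreadyExistsException extends Exception {
    AlreadyExistsException(String jobName) {
      super("Job already exists: " + jobName);
    }
  }

  /** Stand-in for the clustered job store: at most one entry per job name. */
  static final class FakeJobStore {
    private final Set<String> jobs = new HashSet<>();

    void schedule(String jobName) throws AlreadyExistsException {
      if (!jobs.add(jobName)) {
        throw new AlreadyExistsException(jobName);
      }
    }

    void clear(String jobName) {
      jobs.remove(jobName);
    }
  }

  /** Stand-in for reading the latest run record; may throw if the DB is unreachable. */
  interface RunRecordReader {
    boolean latestRunIsActive(String appName) throws Exception;
  }

  /** Fail-safe: an unreadable run record counts as active, so a live job is never cleared. */
  static boolean hasActiveRun(RunRecordReader reader, String appName) {
    try {
      return reader.latestRunIsActive(appName);
    } catch (Exception e) {
      return true;
    }
  }

  /** Schedules the job; on a collision, clears a stale entry and retries exactly once. */
  static void scheduleOnDemand(FakeJobStore store, RunRecordReader reader, String appName)
      throws AlreadyExistsException {
    try {
      store.schedule(appName);
    } catch (AlreadyExistsException ex) {
      if (hasActiveRun(reader, appName)) {
        throw ex; // genuinely running (or unknown): keep the "already running" rejection
      }
      store.clear(appName); // stale entry left behind by a crashed pod
      store.schedule(appName); // a second collision propagates to the caller
    }
  }
}
```

With a stale entry in the store and a reader that reports no active run, `scheduleOnDemand` clears the entry and reschedules. With a reader that reports an active run, or one that throws, the original exception is rethrown and the entry is left untouched.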
Pull request overview
This PR improves resilience of distributed reindex orchestration by (1) preventing the Quartz execution thread from hanging indefinitely when a worker wedges, and (2) recovering from stale Quartz JobDetail/trigger entries that can block retriggers after a pod crash in a clustered Quartz setup.
Changes:
- Replace unbounded `CountDownLatch.await()` with a poll loop that can unwind and force `stop()` once the job becomes terminal/STOPPING.
- Add stale Quartz entry recovery on `ObjectAlreadyExistsException` by consulting the latest DB-backed `AppRunRecord` and clearing/rescheduling when the run is not active.
- Add unit tests covering both the orchestrator unwind behavior and the stale Quartz-entry recovery behavior.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java | Adds stale Quartz-entry recovery logic for on-demand app scheduling based on latest AppRunRecord. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedSearchIndexExecutor.java | Prevents indefinite orchestrator waits by polling latch completion and checking job terminal/stopping state. |
| openmetadata-service/src/test/java/org/openmetadata/service/apps/scheduler/AppSchedulerTest.java | Adds tests validating stale Quartz entry cleanup vs. rethrow when a run is genuinely active. |
| openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedSearchIndexExecutorTest.java | Adds tests ensuring the orchestrator unwinds on terminal job state even if workers never finish. |
    } catch (ObjectAlreadyExistsException ex) {
      if (hasActiveAppRun(application)) {
        throw ex;
      }
…ng, log clarity

- ACTIVE_ERROR is a terminal AppRunRecord status (per OmAppJobListener), so the hand-rolled active-status set wrongly treated it as active and would leave retriggers wedged. Reuse OmAppJobListener.isTerminalStatus (now public) as the single source of truth instead.
- Gate stale Quartz-entry recovery to non-concurrent jobs only. Concurrent jobs use a unique identity per run, so a collision is not a stale entry and the app-wide latest run record is not a reliable signal.
- On a recovery reschedule that collides again (cross-pod race), let the ObjectAlreadyExistsException propagate to the standard "already running" message rather than disrupting the job another pod just scheduled.
- awaitWorkers now returns whether workers drained normally; the caller logs "All workers completed" only on a normal drain, and a distinct warning on forced unwind, so stuck workers are easier to diagnose.

Tests: AppSchedulerTest (+2: ACTIVE_ERROR stale recovery, concurrent collision rethrow without clearing).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
🔴 Playwright Results: 1 failure(s), 14 flaky
✅ 4264 passed · ❌ 1 failed · 🟡 14 flaky · ⏭️ 89 skipped
Follow-up review caught that reusing OmAppJobListener.isTerminalStatus
(which classifies ACTIVE_ERROR as terminal for run-timing purposes) was
unsafe here: ACTIVE_ERROR is an in-flight status — apps set it while still
progressing (CacheWarmupApp) and jobWasExecuted only normalizes it to
FAILED when the run actually finishes; crash recovery
(markRunningEntriesFailed*) only flips 'running', never 'activeError'.
Treating it as terminal could make a retrigger delete a job another pod
is genuinely running.
Use a dedicated TERMINAL_RUN_STATUSES set {SUCCESS, FAILED, STOPPED,
COMPLETED}; any other status (incl. ACTIVE_ERROR) counts as a live run we
must not clear. Erring toward "active" is the safe direction — a stale
entry is recoverable, deleting a live job is not. Revert isTerminalStatus
back to private.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
    .map(
        run ->
            run.getStatus() != null && !TERMINAL_RUN_STATUSES.contains(run.getStatus()))
    .orElse(false);
    /** Statuses that mean a run has finished; anything else (incl. ACTIVE_ERROR) is in-flight. */
    private static final Set<AppRunRecord.Status> TERMINAL_RUN_STATUSES =
        Set.of(
            AppRunRecord.Status.SUCCESS,
            AppRunRecord.Status.FAILED,
            AppRunRecord.Status.STOPPED,
            AppRunRecord.Status.COMPLETED);
     * <p>"Active" is defined by {@link #TERMINAL_RUN_STATUSES}: any non-terminal status (including
     * {@code ACTIVE_ERROR}, which is in-flight — set by apps that are still progressing and only
     * normalized to {@code FAILED} when the run actually finishes) is treated as a live run we must
     * not clear. Erring toward "active" is deliberate: leaving a stale entry is recoverable, while
     * deleting a job another pod is genuinely running risks a duplicate/disrupted execution.
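As a small worked illustration of that classification, the sketch below mirrors the terminal-set check on a hypothetical `Status` enum. The enum is an assumption limited to status names mentioned in this PR (`RUNNING` is inferred from the 'running' value in the commit message); the real type is `AppRunRecord.Status`, and the real check reads the status from a run record rather than from a bare `Optional`.

```java
import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;

/** Sketch only: Status is a hypothetical stand-in for AppRunRecord.Status. */
final class ActiveRunCheckSketch {

  enum Status { RUNNING, ACTIVE_ERROR, SUCCESS, FAILED, STOPPED, COMPLETED }

  /** Statuses that mean a run has finished; everything else counts as in-flight. */
  static final Set<Status> TERMINAL =
      EnumSet.of(Status.SUCCESS, Status.FAILED, Status.STOPPED, Status.COMPLETED);

  /** No run record at all means nothing is active; any non-terminal status means active. */
  static boolean isActive(Optional<Status> latestStatus) {
    return latestStatus.map(status -> !TERMINAL.contains(status)).orElse(false);
  }
}
```

Under these assumptions, `isActive(Optional.of(Status.ACTIVE_ERROR))` is true, `isActive(Optional.of(Status.FAILED))` is false, and `isActive(Optional.empty())` is false.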
    private boolean awaitWorkers(CountDownLatch workerLatch, UUID jobId) throws InterruptedException {
      boolean drained = false;
      boolean done = false;
      while (!done) {
        drained = workerLatch.await(LATCH_POLL_INTERVAL_SECONDS, TimeUnit.SECONDS);
        if (drained) {
          done = true;
        } else if (isJobTerminalOrStopping(jobId)) {
          LOG.warn(
              "Job {} is terminal/stopping but workers have not drained; forcing executor "
                  + "shutdown so the orchestrator can unwind",
              jobId);
          stop();
          done = true;
        }
      }
      return drained;
    }
…rrors

The orchestrator's awaitWorkers loop polls coordinator.getJob() every cycle to detect a terminal/STOPPING transition. The previous workerLatch.await() was DB-independent; the polling added thousands of getJob() reads on the orchestrator's critical path over a multi-hour reindex, any one of which could throw (connection reset, pool exhaustion) and tear the job down via the finally block.

- isJobTerminalOrStopping now wraps the read in try/catch and treats a read failure as non-terminal, so a transient DB blip keeps the orchestrator waiting and the wedge unwinds on the next clean poll (mirrors the hasActiveAppRun fail-safe).
- Widen the re-check cadence 5s -> 15s (3x less steady DB load; unwind is not latency-critical) and make it an injectable instance field so tests stay fast instead of slowing 3x.

Tests: +2 (transient-read keeps waiting then unwinds; isJobTerminalOrStopping treats an unreadable state as non-terminal); existing unwind test injects a 1s interval. All 54 pass.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
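The fail-safe read and the injectable poll interval described in this commit could look roughly like the sketch below. It is an illustration under assumptions, not the PR's code: the class name, the `BooleanSupplier` standing in for the `coordinator.getJob()` state read, and the `Runnable` standing in for `stop()` are all hypothetical, and the interval is in milliseconds only to keep the example fast.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Sketch only: names and collaborators are hypothetical stand-ins. */
final class AwaitWorkersSketch {
  private final long pollIntervalMillis; // injectable so tests can use a short interval
  private final BooleanSupplier terminalOrStoppingRead; // stand-in for the DB read; may throw
  private final Runnable stopAction; // stand-in for stop(), which interrupts wedged workers

  AwaitWorkersSketch(
      long pollIntervalMillis, BooleanSupplier terminalOrStoppingRead, Runnable stopAction) {
    this.pollIntervalMillis = pollIntervalMillis;
    this.terminalOrStoppingRead = terminalOrStoppingRead;
    this.stopAction = stopAction;
  }

  /** A failed read is treated as non-terminal, so a transient DB error keeps the loop waiting. */
  boolean isJobTerminalOrStopping() {
    try {
      return terminalOrStoppingRead.getAsBoolean();
    } catch (RuntimeException e) {
      return false;
    }
  }

  /** Returns true if workers drained normally, false if the loop forced a stop. */
  boolean awaitWorkers(CountDownLatch workerLatch) throws InterruptedException {
    while (true) {
      if (workerLatch.await(pollIntervalMillis, TimeUnit.MILLISECONDS)) {
        return true; // all workers counted down
      }
      if (isJobTerminalOrStopping()) {
        stopAction.run(); // job is over but workers are stuck: force the unwind
        return false;
      }
    }
  }
}
```

If the first state read throws and the second reports a terminal job, a latch that is never counted down still unwinds after two polls with exactly one forced stop. A latch that is already at zero returns true without reading the job state at all.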
Code Review ✅ Approved 3 resolved / 3 findings
Implements a fail-safe poll loop for the reindex orchestrator and a robust stale-job recovery mechanism, resolving thread-hanging and cross-pod Quartz blocking issues.
✅ 3 resolved
✅ Edge Case: Stale-job recovery is a non-atomic check-then-act across pods
✅ Bug: awaitWorkers still hangs if a wedged worker leaves job non-terminal
✅ Edge Case: ACTIVE_ERROR run treated as stale can clear a live cross-pod job



Problem
A distributed reindex that crashes or wedges mid-run could leave `SearchIndexApp` permanently un-retriggerable (every attempt rejected with "Job is already running, please wait for it to complete.") until a manual pod restart, even though the job row was already `FAILED` and the distributed lock released.

Observed in production (nbn-dev): the OM pod restarted mid-reindex during a cluster upgrade; afterward the job showed `FAILED` but retriggers were blocked, and only a pod restart cleared it.

Root cause: two independent paths

1. Orchestrator hang (local, `AppScheduler:308`). `SearchIndexApp.execute()` runs the reindex synchronously on the Quartz worker thread, which parked on an unbounded `workerLatch.await()`. When a worker wedged on a degraded search backend (the `DeadlineTimeoutException` in the incident), it never counted down the latch, so the thread never returned and `getCurrentlyExecutingJobs()` kept reporting the app as running. The DB-side lock/recovery (`JobRecoveryManager`) correctly marked the job `FAILED` and released the lock, but it can't kill a wedged JVM thread, so status said "free" while Quartz said "running."

2. Stale Quartz entry (cross-pod, `AppScheduler:333`). The on-demand job is non-durable, so a crash leaves a persisted `QRTZ_*` JobDetail. Because the store is clustered, a retrigger from any pod then throws `ObjectAlreadyExistsException`. The old code rethrew unconditionally without checking whether the app was actually running, so a stale entry blocked retriggers indefinitely.

Fix

`DistributedSearchIndexExecutor.awaitWorkers()` replaces the unbounded await with a 5s poll loop. While the job keeps progressing it is never terminal, so it simply keeps waiting, with no wall-clock cap (a healthy reindex can legitimately run for hours). The moment the job goes terminal/STOPPING, it forces `stop()` (which `shutdownNow()`-interrupts wedged workers) and returns; the existing `finally` performs the bounded drain. `PartitionWorker` was already stop-aware between batches.

`AppScheduler.scheduleOnDemandJob()` recovers a stale Quartz entry: on `ObjectAlreadyExistsException` it consults the DB-backed `AppRunRecord` (cross-pod truth). Genuinely active runs are rethrown; stale entries are cleared (`deleteJob` + `unscheduleJob`) and rescheduled once. Fail-safe: if the run record can't be read, it treats the app as active so a live job is never wrongly cleared. This is generic: it helps all non-concurrent on-demand apps.

Tests

`DistributedSearchIndexExecutorTest` (+2): orchestrator unwinds when the job is terminal even if a worker never finishes; `isJobTerminalOrStopping` state coverage.

`AppSchedulerTest` (+2): stale entry (terminal run record) → cleared and rescheduled; genuinely active run → rethrown, not cleared.

All green: `DistributedSearchIndexExecutorTest` 37/37, `AppSchedulerTest` 14/14. `mvn spotless:apply` run.

🤖 Generated with Claude Code