fix: resolve governance workflow deadlock and remove WorkflowTransactionManager #26452

Merged
yan-3005 merged 12 commits into main from ram/fix-wf--failures on Apr 6, 2026

yan-3005 (Contributor) commented Mar 12, 2026

…ionManager

Describe your changes:

Fixes

I worked on ... because ...

Type of change:

  • Bug fix
  • Improvement
  • New feature
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation

Checklist:

  • I have read the CONTRIBUTING document.
  • My PR title is Fixes <issue-number>: <short explanation>
  • I have commented on my code, particularly in hard-to-understand areas.
  • For JSON Schema changes: I updated the migration scripts or explained why it is not needed.

Summary by Gitar

  • Workflow deadlock fix:
    • Moved terminateDuplicateInstances call outside JDBI transaction in FilterEntityImpl to async execution, preventing FK constraint violation from concurrent Flowable job operations.
    • Added check in WorkflowHandler to skip deletion of processes with no active user tasks, preventing lock-order deadlock on PostgreSQL.
  • Removed WorkflowTransactionManager:
    • Deleted 329-line class that attempted distributed transaction coordination; functionality delegated back to superclass methods in WorkflowDefinitionResource.
  • Simplified workflow state queries:
    • Added listAllStatesForInstance() method to CollectionDAO and WorkflowInstanceStateRepository for direct state retrieval without pagination.
    • Refactored markInstanceStatesAsFailed() and markInstanceAsFinished() to use new direct query instead of paginated queries with workflow definition lookups.

This will update automatically on new commits.

yan-3005 self-assigned this Mar 12, 2026
yan-3005 added the "safe to test" and "backend" labels Mar 12, 2026
yan-3005 changed the title from "[ON'T MERGE - WILL RUN MULTIPLE TIMES TO VERIFY THE FIX] fix: resolve governance workflow deadlock and remove WorkflowTransact…" to "[DON'T MERGE - WILL RUN MULTIPLE TIMES TO VERIFY THE FIX] fix: resolve governance workflow deadlock and remove WorkflowTransact…" Mar 12, 2026
@yan-3005
Contributor Author

gitar bot fix the review comments, run mvn spotless:apply on the root of the repo before pushing

Co-authored-by: yan-3005 <yan-3005@users.noreply.github.com>

gitar-bot Bot commented Mar 12, 2026

gitar bot fix the review comments, run mvn spotless:apply on the root of the repo before pushing

Done — applied all three review fixes and ran mvn spotless:apply:

  • Delete endpoints: delete and deleteByName in WorkflowDefinitionResource now delegate to the parent EntityResource helpers, restoring proper DeleteResponse<T> bodies and change-event headers.
  • Async termination: Added .exceptionally() to the CompletableFuture.runAsync() in FilterEntityImpl so failures are logged with a full stack trace instead of being silently dropped.
  • LIMIT on query: Added LIMIT 1000 to listAllStatesForInstance in CollectionDAO.

Copilot AI review requested due to automatic review settings March 23, 2026 10:47

Copilot AI left a comment

Pull request overview

This PR targets governance workflow stability by changing how duplicate workflow instances are terminated (to avoid DB deadlocks/FK violations) and by simplifying how workflow instance states are queried/updated, while removing an unused transaction-manager abstraction from the workflow definition resource layer.

Changes:

  • Move Flowable process-instance deletion out of the JDBI transaction path and add guards to avoid deleting auto-completing processes.
  • Terminate duplicate instances asynchronously from the trigger delegate to avoid transactional conflicts during signal handling.
  • Simplify instance-state retrieval by adding a DAO method to list all states for a workflow instance and removing the unused WorkflowTransactionManager.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.

Summary per file

| File | Description |
| --- | --- |
| openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowDefinitionResource.java | Reverts workflow definition CRUD to the base EntityResource implementations (removes transaction-manager usage). |
| openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceStateRepository.java | Refactors failure-marking to use a new “list all states for instance” query. |
| openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceRepository.java | Uses the new state listing method to derive final workflow instance status. |
| openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java | Adds listAllStatesForInstance query on the workflow instance state time-series table. |
| openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImpl.java | Runs duplicate-termination asynchronously to avoid Flowable/MySQL transactional conflicts. |
| openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowTransactionManager.java | Removes the unused transaction manager class. |
| openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java | Performs Flowable deletions outside JDBI transaction and adds a “skip delete if no active user tasks” guard. |

Comment on lines +96 to +105
CompletableFuture.runAsync(
        () ->
            WorkflowHandler.getInstance()
                .terminateDuplicateInstances(
                    workflowName, entityLinkStrFinal, processInstanceId))
    .exceptionally(
        ex -> {
          log.error("Async termination of duplicate instances failed", ex);
          return null;
        });

Copilot AI Mar 23, 2026

CompletableFuture.runAsync(...) is using the ForkJoinPool common pool. This code path can block on DB/Flowable work, so using the common pool risks thread starvation and unpredictable throughput under load. Use the service's managed executor (e.g., AsyncService.getInstance().getExecutorService() or another dedicated executor used elsewhere in the server) and consider wrapping the runnable with the existing context-propagation helper if needed.
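
One way to act on this suggestion is to hand the blocking work to a dedicated, bounded pool instead of the common pool. The sketch below is illustrative only: `DedicatedExecutorSketch`, `TERMINATION_EXECUTOR`, `terminateAsync`, the pool size of 2, and the `failureSink` parameter are assumptions rather than names from this PR, and the server's own managed executor would replace the pool created here if one is available.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the trigger delegate; none of these names come from the PR.
class DedicatedExecutorSketch {
  // Small bounded pool reserved for blocking termination work (size 2 is an assumption).
  static final ExecutorService TERMINATION_EXECUTOR =
      Executors.newFixedThreadPool(
          2,
          runnable -> {
            Thread thread = new Thread(runnable, "workflow-termination");
            thread.setDaemon(true); // do not keep the JVM alive for background cleanup
            return thread;
          });

  // Runs the blocking task on the dedicated pool and records any failure
  // instead of silently dropping it.
  static CompletableFuture<Void> terminateAsync(
      Runnable blockingTask, AtomicReference<Throwable> failureSink) {
    return CompletableFuture.runAsync(blockingTask, TERMINATION_EXECUTOR)
        .exceptionally(
            ex -> {
              failureSink.set(ex); // real code would log with the full stack trace
              return null;
            });
  }
}
```

In `FilterEntityImpl` the runnable would wrap the `terminateDuplicateInstances` call, and the `exceptionally` branch would log the exception rather than store it.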

Copilot uses AI. Check for mistakes.
Comment on lines +63 to +70
+ List<WorkflowInstanceState> states =
+     workflowInstanceStateRepository.listAllStatesForInstance(workflowInstanceId);

- do {
-   ResultList<WorkflowInstanceState> workflowInstanceStates =
-       workflowInstanceStateRepository.listWorkflowInstanceStateForInstance(
-           workflowDefinition.getName(),
-           workflowInstanceId,
-           offset,
-           workflowInstance.getStartedAt(),
-           workflowInstance.getEndedAt(),
-           100,
-           false);
-
-   if (workflowInstanceStates.getData().stream()
-       .anyMatch(
-           workflowInstanceState ->
-               workflowInstanceState
-                   .getStatus()
-                   .equals(WorkflowInstance.WorkflowStatus.FAILURE))) {
-     workflowStatus = WorkflowInstance.WorkflowStatus.FAILURE;
-     break;
-   }
-
-   offset = workflowInstanceStates.getPaging().getAfter();
- } while (offset != null);
+ WorkflowInstance.WorkflowStatus workflowStatus = WorkflowInstance.WorkflowStatus.FINISHED;
+ if (states.stream()
+     .anyMatch(s -> s.getStatus().equals(WorkflowInstance.WorkflowStatus.FAILURE))) {
+   workflowStatus = WorkflowInstance.WorkflowStatus.FAILURE;
+ }

Copilot AI Mar 23, 2026

updateWorkflowInstance now loads all states for an instance into memory just to determine whether any state is FAILURE. For instances with many state records this is a potential memory/latency regression vs the previous paginated/early-exit approach. Consider adding a DAO/repository method that short-circuits (e.g., fetch only statuses, or page through results and break on first failure) so this method doesn’t require reading every state JSON blob.
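
The "page through results and break on first failure" option could look roughly like the sketch below. It is not the repository's API: `EarlyExitScanSketch`, `Status`, `hasFailure`, and the `(offset, limit)` page fetcher are hypothetical stand-ins, and the fetcher is assumed to return only statuses rather than full state JSON.

```java
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical sketch; the names here do not come from the PR.
class EarlyExitScanSketch {
  enum Status {
    RUNNING,
    FINISHED,
    FAILURE
  }

  // Pages through statuses and stops at the first FAILURE, so later pages are never fetched.
  // pageFetcher takes (offset, limit) and returns at most `limit` statuses.
  static boolean hasFailure(
      BiFunction<Integer, Integer, List<Status>> pageFetcher, int pageSize) {
    if (pageSize <= 0) {
      throw new IllegalArgumentException("pageSize must be positive");
    }
    int offset = 0;
    while (true) {
      List<Status> page = pageFetcher.apply(offset, pageSize);
      if (page.contains(Status.FAILURE)) {
        return true; // early exit: no need to read the remaining pages
      }
      if (page.size() < pageSize) {
        return false; // a short page means there are no more rows
      }
      offset += page.size();
    }
  }
}
```

Memory use is bounded by one page at a time, which keeps the early-exit behaviour of the old loop while still dropping the workflow definition lookup.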

yan-3005 changed the title from "[DON'T MERGE - WILL RUN MULTIPLE TIMES TO VERIFY THE FIX] fix: resolve governance workflow deadlock and remove WorkflowTransact…" to "fix: resolve governance workflow deadlock and remove WorkflowTransactionManager" Mar 30, 2026
Copilot AI review requested due to automatic review settings March 30, 2026 12:00

gitar-bot Bot commented Mar 30, 2026

Code Review ⚠️ Changes requested 8 resolved / 10 findings

Resolves multiple workflow deadlock issues and removes the deprecated WorkflowTransactionManager, but introduces an unbounded query that loads all workflow states into memory and a decimal-to-float conversion that can silently produce infinity for large values.

⚠️ Performance: Unbounded query loads all workflow states into memory

📄 openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java:8961 📄 openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceRepository.java:63 📄 openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceStateRepository.java:185

The new listAllStatesForInstance query in CollectionDAO.java has no LIMIT clause and loads all JSON blobs into a List<String> in memory. In WorkflowInstanceRepository.updateWorkflowInstance(), this is called on every workflow completion just to check if any state has FAILURE status — loading potentially thousands of large JSON objects when a simple SELECT 1 ... WHERE status = 'FAILURE' LIMIT 1 would suffice.

In markInstanceStatesAsFailed(), the full list is loaded and then each state is updated individually (N+1 pattern), compounding the memory and DB cost.

The old code at least paginated with a limit of 100 and broke early on finding a failure. The new code removes all pagination with no size guard.

Suggested fix
// For updateWorkflowInstance, use a targeted query instead.
// Note: json_extract is MySQL syntax; a PostgreSQL deployment would need
// an equivalent expression such as json ->> 'status'.
@SqlQuery(
    "SELECT 1 FROM workflow_instance_state_time_series "
    + "WHERE workflowInstanceId = :workflowInstanceId "
    + "AND json_extract(json, '$.status') = 'FAILURE' LIMIT 1")
Optional<Integer> hasFailedState(
    @Bind("workflowInstanceId") String workflowInstanceId);

// For markInstanceStatesAsFailed, consider a bulk UPDATE statement
// instead of loading all rows and updating one-by-one.

💡 Edge Case: Decimal-to-float can silently produce infinity for huge values

At line 110, float(value) on a finite Decimal with magnitude beyond float range (e.g. Decimal('1e999')) returns float('inf') without raising an exception. This bypasses the is_finite() guard and produces a non-JSON-safe infinity value, which would then fail Jackson parsing on the Java side.

In practice, database NUMERIC/DECIMAL columns are unlikely to hold values this extreme, so impact is very low.

Suggested fix
if isinstance(value, decimal.Decimal):
    if not value.is_finite():
        return str(value)
    f = float(value)
    return f if math.isfinite(f) else str(value)

✅ 8 resolved
Bug: Delete endpoints return empty body, breaking API contract

📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowDefinitionResource.java:475 📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowDefinitionResource.java:545 📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowDefinitionResource.java:363 📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowDefinitionResource.java:445 📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowDefinitionResource.java:477
The custom delete and deleteByName methods return Response.ok().build() with no entity body and no X-OpenMetadata-Change-Type header. Every other EntityResource subclass delegates to the parent's delete() helper which returns a DeleteResponse<T> containing the deleted entity and the change-event header. Clients or internal callers that rely on reading the deleted entity from the response (e.g., UserResource reads the response entity to decrypt it) will receive null. This also means no change event is emitted for workflow definition deletions.

Consider delegating to the parent's delete() helper and wrapping it with retry, or at minimum returning the DeleteResponse from repository.delete().

Bug: Fire-and-forget async termination silently drops failures

📄 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImpl.java:96 📄 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImpl.java:83
In FilterEntityImpl, CompletableFuture.runAsync() is used to terminate duplicate instances, but the returned future is discarded — no .exceptionally(), .handle(), or .whenComplete() is chained. If terminateDuplicateInstances fails, the outer catch only logs at WARN level with e.getMessage() (no stack trace), making debugging very difficult. More critically, duplicate workflow instances will continue running if termination fails, which is the exact problem this PR aims to solve.

Additionally, since this runs on the common ForkJoinPool, a slow Flowable deleteProcessInstance call could starve other async tasks in the application.

Performance: listAllStatesForInstance has no result limit

📄 openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java:8962
The new listAllStatesForInstance DAO query fetches all rows without any LIMIT clause. The previous implementation capped results at 1000. While workflow instances typically have a bounded number of states, a pathological case (e.g., a workflow stuck in a retry loop creating many state entries) could return an unexpectedly large result set.

Quality: Retry config (RETRY, RETRY_CONFIG) is dead code — never used

📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowDefinitionResource.java:77 📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowDefinitionResource.java:83 📄 openmetadata-service/src/main/java/org/openmetadata/service/resources/governance/WorkflowDefinitionResource.java:85
RETRY_CONFIG, RETRY, and the isTransientDatabaseError helper method are declared as static fields/methods in WorkflowDefinitionResource but are never referenced anywhere in the class. The resource methods (create, patch, delete, createOrUpdate) all delegate to superclass methods that don't use the retry. This adds unused dependencies (resilience4j, ExceptionUtils) and misleads readers into thinking retry logic is active.

Either wrap the super calls with Retry.decorateRunnable(RETRY, ...) or remove the dead code and its imports.

Edge Case: LIMIT 1000 query replaces paginated loop — truncation risk

📄 openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java:8964 📄 openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowInstanceRepository.java:63
The new listAllStatesForInstance SQL query uses a hard LIMIT 1000 without pagination, replacing the old do-while pagination loop in updateWorkflowInstance that could handle arbitrarily many states. If a workflow instance ever exceeds 1000 states, updateWorkflowInstance could miss a FAILURE state and incorrectly mark the instance as FINISHED, and markInstanceStatesAsFailed would leave states beyond 1000 in their original status.

Current workflows have ~23 stages max so this is theoretical today, but the silent truncation removes a safety guarantee the old code had. Consider adding a log warning if the result list size equals 1000 (indicating possible truncation), or implementing simple pagination.
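
The log-warning option mentioned above might be sketched as follows. This is a hypothetical helper, not code from the PR: `TruncationGuardSketch`, `STATE_QUERY_LIMIT`, and both method names are illustrative, and `java.util.logging` is used only to keep the example self-contained.

```java
import java.util.List;
import java.util.logging.Logger;

// Hypothetical helper; the class, method, and constant names are illustrative only.
class TruncationGuardSketch {
  private static final Logger LOG = Logger.getLogger(TruncationGuardSketch.class.getName());

  // Assumed to mirror the LIMIT used by the DAO query.
  static final int STATE_QUERY_LIMIT = 1000;

  // A result that fills the limit may have been cut off by the query.
  static boolean isPossiblyTruncated(List<?> rows, int limit) {
    return rows.size() >= limit;
  }

  // Returns the rows unchanged, logging a warning when truncation is possible.
  static <T> List<T> warnIfTruncated(List<T> rows, String workflowInstanceId) {
    if (isPossiblyTruncated(rows, STATE_QUERY_LIMIT)) {
      LOG.warning(
          "State query for workflow instance "
              + workflowInstanceId
              + " returned "
              + rows.size()
              + " rows (limit "
              + STATE_QUERY_LIMIT
              + "); later states may be missing");
    }
    return rows;
  }
}
```

A warning does not restore the old guarantee, but it makes silent truncation visible in the logs if an instance ever reaches the limit.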

...and 3 more resolved from earlier reviews

Copilot AI left a comment

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.

        processId, "Terminated due to conflicting workflow instance");
  } catch (FlowableObjectNotFoundException e) {
    LOG.debug(
        "Process instance {} already completed before termination, skipping", processId);

Copilot AI Mar 30, 2026

The Flowable termination loop only handles FlowableObjectNotFoundException. Any other runtime exception from deleteProcessInstance (e.g., transient DB/lock errors) will abort the method before the JDBI transaction runs, leaving conflicting instances neither terminated nor marked as failed. To make this more robust, catch/log unexpected exceptions per-instance and continue so the state-marking transaction still executes.

Suggested change
-     "Process instance {} already completed before termination, skipping", processId);
+     "Process instance {} already completed before termination, skipping", processId);
+ } catch (RuntimeException e) {
+   LOG.error(
+       "Failed to terminate process instance {} for conflicting instance {}. Continuing with remaining instances.",
+       processId,
+       instance.getId(),
+       e);
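
The per-instance catch-and-continue pattern can be shown in isolation with the sketch below. It does not use Flowable: `ContinueOnFailureSketch`, `terminateAll`, and the `terminator` callback are hypothetical stand-ins for the loop around `deleteProcessInstance`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch; the names here do not come from the PR.
class ContinueOnFailureSketch {
  // Attempts termination for every id. A failure on one id is recorded and the loop
  // continues, so the caller can still run its follow-up step (for example, the
  // state-marking transaction). Returns the ids whose termination failed.
  static List<String> terminateAll(List<String> processIds, Consumer<String> terminator) {
    List<String> failed = new ArrayList<>();
    for (String processId : processIds) {
      try {
        terminator.accept(processId);
      } catch (RuntimeException e) {
        failed.add(processId); // real code would log the exception with its stack trace
      }
    }
    return failed;
  }
}
```

Returning the failed ids is a design choice of this sketch; it gives the caller something to log or retry without aborting the remaining instances.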

@sonarqubecloud

Labels

backend, safe to test

4 participants