feat(workflows): add onConflict trigger policy with retrigger-as-branch#26865
feat(workflows): add onConflict trigger policy with retrigger-as-branch#26865IceS2 wants to merge 40 commits intofeat/incident-lifecycle-workflowfrom
Conversation
Adds a generic, configurable-status human task node for governance workflows. The node creates an OM Task, waits for status transitions via IntermediateCatchEvent messages, and routes based on terminal vs non-terminal statuses. Key components: - ManualTask.java: BPMN subprocess builder (setup → wait → route → close) - SetupDelegate/SetupImpl: Task creation, idempotent on cycle re-entry - CheckTerminalDelegate: Validates status against template - CloseTaskDelegate/CloseTaskImpl: Closes task on terminal status - SetResultDelegate: Propagates status to parent for edge routing - ManualTaskTemplateResolver: Template-based status configuration - ManualTaskDefinition JSON schema + nodeType/nodeSubType registration The node is domain-agnostic — incident/approval behavior lives in the workflow graph around the node, not inside it.
Remove inputNamespaceMapExpr and configMapExpr from BaseDelegate. Each delegate now declares its own Expression fields, preventing NullPointerExceptions in delegates that don't use these fields (e.g., SetResultDelegate, CheckTerminalDelegate, CloseTaskDelegate).
- Fix: isAlreadyClosed now only checks task.getResolution() != null. Previously it also checked terminalStatuses.contains(currentStatus), which always returned true when CloseTask runs (the PATCH already set the terminal status), leaving tasks permanently unresolved. - Remove unused terminalStatuses parameter from closeTask/CloseTaskDelegate - Rename taskCreated → taskAlreadyExists for clarity: the variable means "should we enter the message-waiting phase" (true on re-entry, false on first creation)
Implements the bridge that connects Task status changes to the ManualTask workflow node via Flowable message delivery. Bridge chain: TaskRepository.postUpdate() detects status change → TaskWorkflowHandler.transitionManualTaskStatus() (sends updatedBy) → WorkflowHandler.sendManualTaskMessage() with async exponential retry Key design decisions: - postUpdate wrapped in try-catch: workflow failures never break PATCH - Async retry via ScheduledExecutorService + resilience4j IntervalFunction: 500ms → 1s → 2s → 4s → 5s cap (~12.5s total coverage) - First attempt synchronous (fast path), retries non-blocking - CloseTaskImpl uses actual user from PATCH, falls back to governance-bot Also includes: - WorkflowDefinitionRepository/WorkflowInstanceStateRepository updates - CollectionDAO, ListFilter, EntityResource supporting changes - SQL migration (2.0.0) for stageResult generated column - ManualTaskWorkflowTest: full E2E lifecycle test
- Fix: catch FlowableOptimisticLockingException in tryDeliverMessage and return false to trigger retry (concurrent modification means another thread may have consumed the subscription) - Fix: nonTerminalReachable BFS now iterates the full edges list at every step, not just the unfiltered outgoingEdges map. Prevents following terminal-condition edges from intermediate nodes. - Refactor: hoist IntervalFunction to a static final constant instead of recreating on each retry. Made interval fields final. - Fix: remove IF NOT EXISTS from PostgreSQL migration for consistency with MySQL pattern (Flyway handles migration idempotency)
…tConsumer Add Entity.TASK to validEntityTypes, detect workflow-managed task status changes via isWorkflowManagedTaskStatusChange, and enqueue them to the outbox table via enqueueTaskMessage before the existing signal broadcast.
Add 4 reflection-based unit tests for isWorkflowManagedTaskStatusChange covering early-return conditions: non-update events, non-task entity types, missing changeDescription, and non-status field changes.
Remove the TaskRepository.postUpdate override that synchronously called TransitionManualTaskStatus, the TaskWorkflowHandler.transitionManualTaskStatus method it depended on, and the WorkflowHandler async retry infrastructure (sendManualTaskMessage, scheduleMessageRetry, tryDeliverMessage, and their backing constants and ScheduledExecutorService). Task status transitions are now delivered exclusively via the Transactional Outbox pattern.
…andler Start the drainer after the process engine is built in the constructor. Restart it when initializeNewProcessEngine() rebuilds the engine at runtime. Shut it down gracefully via WorkflowHandler.shutdown(), which is called from ManagedShutdown.stop() in OpenMetadataApplication.
…tency The E2E test must tolerate up to 10s CE poll + 30s drainer poll plus margin. Raise all Awaitility atMost() values to 90 seconds.
…roadcast disruption A DB failure during outbox INSERT should not prevent the signal broadcast path from executing. Log the error and continue.
…an older row SKIP LOCKED skips individual locked rows, not entire task groups. Without this guard, Worker B could grab a newer status while Worker A still holds the oldest — violating per-task ordering. The fix queries the absolute oldest createdAt (no lock) and skips the task if the locked row is newer.
…ycle Collapse findDistinctPendingTaskIds + per-task findAndLockOldestPending + per-task findOldestPendingCreatedAt into a single findAndLockAllOldestPending query using MIN(createdAt) JOIN with FOR UPDATE SKIP LOCKED. Per-task ordering is preserved naturally: if the oldest row for a task is locked by another worker, the JOIN produces no match for that task.
C2: Replace MIN(createdAt) JOIN with ROW_NUMBER() PARTITION BY taskId
to guarantee exactly one row per task even with identical timestamps.
C3: Split bulk-lock transaction into bulk-read (no lock) + per-entry
transactions. Row locks now held only during single-entry delivery,
not the entire batch. Flowable API calls no longer inside a DB
transaction holding locks on other rows.
I2: Add MAX_ATTEMPTS=100. Entries exceeding this are excluded from the
drain query and effectively dead-lettered for investigation.
I3: Call cleanupDelivered() at end of each drain cycle with 7-day
retention to prevent unbounded table growth.
I4: Extract workflowInstanceId from ChangeEvent entity payload instead
of fetching from DB. Eliminates extra round trip per task status
change event.
I5: Move signal broadcast before outbox enqueue so it always fires.
Wrap enqueue in resilience4j retry (3 attempts) for transient DB
errors. Unhandled failure propagates to event publisher for retry.
Add LIMIT 500 with ORDER BY attempts ASC, createdAt ASC to prevent unbounded result sets and prioritize fresh messages over stuck ones. Separate cleanup into its own try-catch for cleaner error diagnostics.
…OutboxIT Move E2E test to integration-tests module where the full application stack is running (CE pipeline, schedulers, drainer). The test verifies the complete outbox delivery pipeline through observable outcomes: 1. Deploy workflow → create table → workflow triggers → task created 2. PATCH task InProgress → PATCH task Completed 3. Poll workflow instance until FINISHED status 4. Assert stage results contain expected status transitions
Strangler Fig bridge that syncs Task lifecycle events to TCRS records, enabling workflow-managed incidents to keep the existing incident UI/API working while Tasks become the source of truth. Components: - aboutEntityLink field on Task schema (EntityLink format with testCase FQN + incident stateId), backed by generated DB columns + index - IncidentTcrsSyncHandler: lifecycle handler mapping Task events to TCRS records (New, Ack, Assigned, Resolved) with entity relationships - TCRS guard in openOrAssignTask() to skip Task creation when a workflow-managed Task already exists for the incident - SetupImpl builds aboutEntityLink for incident task types - testCaseStatus added to WorkflowTriggerFields enum - E2E integration test verifying full pipeline
…al status check - Rename specificUsers to specificAssignees using EntityLink strings (e.g. <#E::user::alice>, <#E::team::engineers>) to support both users and teams, consistent with the existing user task pattern - CloseTaskImpl.isAlreadyClosed() now checks terminal status from the resolved template instead of checking resolution != null
…incident-tcrs-sync-hook
…pen-metadata/OpenMetadata into feat/ilw-item2-incident-tcrs-sync-hook
Adds configurable duplicate trigger handling for governance workflows. When the same entity triggers a workflow again while an instance is already running, the trigger's onConflict policy controls the behavior: - restart (default): terminate old instance, start new (existing behavior) - skip: keep existing instance, ignore new event - forward: deliver 'retrigger' status to the active ManualTask The retrigger status flows through the existing IntermediateCatchEvent path — zero BPMN changes needed. ManualTask exposes 'retrigger' as a routable branch only when the trigger uses onConflict=forward, allowing workflow authors to wire it via normal graph edges (self-loop for incidents, back-to-decision-tree for approvals).
❌ Lint Check Failed — ESLint + Prettier (core-components)The following files have style issues that need to be fixed: Fix locally (fast — changed files only): cd openmetadata-ui-core-components/src/main/resources/ui
yarn ui-checkstyle:changedOr to fix all files: |
✅ TypeScript Types Auto-UpdatedThe generated TypeScript types have been automatically updated based on JSON schema changes in this PR. |
|
The Python checkstyle failed. Please run You can install the pre-commit hooks with |
Code Review 👍 Approved with suggestions 2 resolved / 3 findingsIntroduces onConflict trigger policy with retrigger-as-branch capability, resolving head-of-line blocking in outbox drainer and failed delivery termination issues. Consider addressing the extractStringValue crash on single-quote string input in IncidentTcrsSyncHandler. 💡 Bug: extractStringValue crashes on single-quote string inputIn This is called on Suggested fix✅ 2 resolved✅ Edge Case: onConflict=forward doesn't terminate old instance on failed delivery
✅ Performance: Outbox drainer head-of-line blocking for ~50 min on persistent failures
🤖 Prompt for agentsOptionsDisplay: compact → Showing less information. Comment with these commands to change:
Was this helpful? React with 👍 / 👎 | Gitar |
| private String extractStringValue(Object value) { | ||
| if (value instanceof String s) { | ||
| return s.startsWith("\"") ? s.substring(1, s.length() - 1) : s; | ||
| } | ||
| return value != null ? value.toString() : null; | ||
| } |
There was a problem hiding this comment.
💡 Bug: extractStringValue crashes on single-quote string input
In IncidentTcrsSyncHandler.extractStringValue(), if the input string is a single " character, s.substring(1, s.length() - 1) becomes s.substring(1, 0) which throws StringIndexOutOfBoundsException. Similarly, a string like "value (starts with quote, doesn't end with one) will incorrectly strip the last character.
This is called on fc.getNewValue() from ChangeDescription field changes, which could contain unexpected formats.
Suggested fix:
private String extractStringValue(Object value) {
if (value instanceof String s) {
if (s.length() >= 2 && s.startsWith(""") && s.endsWith(""")) {
return s.substring(1, s.length() - 1);
}
return s;
}
return value != null ? value.toString() : null;
}
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
Summary
onConflictpolicy toeventBasedEntityTriggerconfig with three modes:restart(default, existing behavior),skip(keep existing instance), andforward(deliver retrigger status to active ManualTask)forwardmode delivers a"retrigger"status through the ManualTask's existing IntermediateCatchEvent path — zero BPMN structural changes needed"retrigger"as a routable branch only when the trigger usesonConflict=forward, controlled viaretriggerEnabledon WorkflowConfiguration (set automatically by the MainWorkflow compiler)Why
When the same entity triggers a workflow again while an instance is already running (e.g., test case fails again during open incident, glossary term edited while in review), the current behavior always kills the old instance and starts a new one. This destroys in-progress human task state for incident workflows. The
onConflictpolicy lets each workflow declare the correct behavior.Changes
eventBasedEntityTrigger.jsononConflictenum: restart/skip/forwardworkflowDefinition.jsonretriggerEnabledboolean on WorkflowConfigurationMainWorkflow.javaretriggerEnabledfrom trigger's onConflict at compile timeManualTask.javaEventBasedEntityTrigger.javaFilterEntityImpl.javaWorkflowHandler.javahasRunningInstance()+forwardRetrigger()using Flowable'svariableValueEqualsManualTaskOutboxIT.javaTest plan
onConflict_restart_terminatesOldWorkflow— second trigger terminates old workflow (status → FAILURE)onConflict_skip_keepsExistingWorkflow— second trigger ignored, same task + workflow preserved, instance count stays 1onConflict_forward_deliversRetriggerToManualTask— retrigger delivered, ManualTask re-entered, same task stays openoutboxDeliversStatusChangesInOrder_andSyncsTcrs— existing outbox pipeline + TCRS sync unchanged