From 5d4c4dc2146d8238808ac6f42d14fb9224e9a75b Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Fri, 5 Jul 2024 15:47:16 +0200 Subject: [PATCH] fix(core): fix processing of execution killed with tenant (#4173) --- .../models/executions/ExecutionKilled.java | 16 ++++++++++++++ .../executions/ExecutionKilledExecution.java | 21 +++---------------- .../core/services/ExecutionService.java | 1 + .../io/kestra/jdbc/runner/JdbcExecutor.java | 1 + 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/io/kestra/core/models/executions/ExecutionKilled.java b/core/src/main/java/io/kestra/core/models/executions/ExecutionKilled.java index ac0e6819bc..db3be8bb73 100644 --- a/core/src/main/java/io/kestra/core/models/executions/ExecutionKilled.java +++ b/core/src/main/java/io/kestra/core/models/executions/ExecutionKilled.java @@ -9,6 +9,22 @@ import lombok.ToString; import lombok.experimental.SuperBuilder; +/** + * The Kestra event for killing an execution. A {@link ExecutionKilled} can be in two states: + *

+ *

+ *  - {@link State#REQUESTED}: The event was requested either by an Executor or by an external request.
+ *  - {@link State#EXECUTED}: The event was consumed and processed by the Executor.
+ *  
+ * + * A {@link ExecutionKilled} will always transit from {@link State#REQUESTED} to {@link State#EXECUTED} + * regardless of whether the associated execution exist or not to ensure that Workers will be notified for the tasks + * to be killed no matter what the circumstances. + *

+ * IMPORTANT: A {@link ExecutionKilled} is considered to be a fire-and-forget event. As a result, we do not manage a + * COMPLETED state, i.e., the Executor will never wait for Workers to process an executed {@link ExecutionKilled} + * before considering an execution to be KILLED. + */ @Getter @SuperBuilder @EqualsAndHashCode diff --git a/core/src/main/java/io/kestra/core/models/executions/ExecutionKilledExecution.java b/core/src/main/java/io/kestra/core/models/executions/ExecutionKilledExecution.java index d01f5868a6..bd6daca313 100644 --- a/core/src/main/java/io/kestra/core/models/executions/ExecutionKilledExecution.java +++ b/core/src/main/java/io/kestra/core/models/executions/ExecutionKilledExecution.java @@ -8,22 +8,6 @@ import lombok.*; import lombok.experimental.SuperBuilder; -/** - * The Kestra event for killing an execution. A {@link ExecutionKilledExecution} can be in two states: - *

- *

- *  - {@link State#REQUESTED}: The event was requested either by an Executor or by an external request.
- *  - {@link State#EXECUTED}: The event was consumed and processed by the Executor.
- *  
- * - * A {@link ExecutionKilledExecution} will always transit from {@link State#REQUESTED} to {@link State#EXECUTED} - * regardless of whether the associated execution exist or not to ensure that Workers will be notified for the tasks - * to be killed no matter what the circumstances. - *

- * IMPORTANT: A {@link ExecutionKilledExecution} is considered to be a fire-and-forget event. As a result, we do not manage a - * COMPLETED state, i.e., the Executor will never wait for Workers to process an executed {@link ExecutionKilledExecution} - * before considering an execution to be KILLED. - */ @Getter @SuperBuilder @EqualsAndHashCode @@ -47,8 +31,9 @@ public class ExecutionKilledExecution extends ExecutionKilled implements TenantI Boolean isOnKillCascade; public boolean isEqual(WorkerTask workerTask) { - return (workerTask.getTaskRun().getTenantId() == null || (workerTask.getTaskRun().getTenantId() != null && workerTask.getTaskRun().getTenantId().equals(this.tenantId))) && - workerTask.getTaskRun().getExecutionId().equals(this.executionId); + String taskTenantId = workerTask.getTaskRun().getTenantId(); + String taskExecutionId = workerTask.getTaskRun().getExecutionId(); + return (taskTenantId == null || taskTenantId.equals(this.tenantId)) && taskExecutionId.equals(this.executionId); } @Override diff --git a/core/src/main/java/io/kestra/core/services/ExecutionService.java b/core/src/main/java/io/kestra/core/services/ExecutionService.java index 8969ece9f0..017be7a7fc 100644 --- a/core/src/main/java/io/kestra/core/services/ExecutionService.java +++ b/core/src/main/java/io/kestra/core/services/ExecutionService.java @@ -478,6 +478,7 @@ public Flux killSubflowExecutions(final String tenantI .executionId(childExecution.getId()) .isOnKillCascade(true) .state(ExecutionKilled.State.REQUESTED) // Event will be reentrant in the Executor. + .tenantId(tenantId) .build() ); } diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index c036bc2872..95bfa94169 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -706,6 +706,7 @@ private void killQueue(Either either) .executionId(killedExecution.getExecutionId()) .isOnKillCascade(false) .state(ExecutionKilled.State.EXECUTED) + .tenantId(killedExecution.getTenantId()) .build() );