Skip to content

Commit

Permalink
fix(core): Use AbortController to notify nodes to abort execution (#6141
Browse files Browse the repository at this point in the history
)

and add support for cancelling ongoing operations inside a node.

---------
Co-authored-by: Oleg Ivaniv <me@olegivaniv.com>
  • Loading branch information
netroy committed Nov 24, 2023
1 parent 0ec67da commit d2c18c5
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 58 deletions.
24 changes: 4 additions & 20 deletions cypress/e2e/19-execution.cy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ describe('Execution', () => {
.canvasNodeByName('Manual')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters
.canvasNodeByName('Wait')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters
.canvasNodeByName('Set')
.within(() => cy.get('.fa-check'))
Expand Down Expand Up @@ -112,10 +108,6 @@ describe('Execution', () => {
.canvasNodeByName('Manual')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters
.canvasNodeByName('Wait')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters
.canvasNodeByName('Wait')
.within(() => cy.get('.fa-sync-alt').should('not.visible'));
Expand All @@ -128,8 +120,8 @@ describe('Execution', () => {
workflowPage.getters.clearExecutionDataButton().click();
workflowPage.getters.clearExecutionDataButton().should('not.exist');

// Check success toast (works because Cypress waits enough for the element to show after the http request node has finished)
workflowPage.getters.successToast().should('be.visible');
// Check warning toast (works because Cypress waits enough for the element to show after the http request node has finished)
workflowPage.getters.warningToast().should('be.visible');
});

it('should test webhook workflow', () => {
Expand Down Expand Up @@ -191,10 +183,6 @@ describe('Execution', () => {
.canvasNodeByName('Webhook')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters
.canvasNodeByName('Wait')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters
.canvasNodeByName('Set')
.within(() => cy.get('.fa-check'))
Expand Down Expand Up @@ -267,10 +255,6 @@ describe('Execution', () => {
.canvasNodeByName('Webhook')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters
.canvasNodeByName('Wait')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters
.canvasNodeByName('Wait')
.within(() => cy.get('.fa-sync-alt').should('not.visible'));
Expand All @@ -283,7 +267,7 @@ describe('Execution', () => {
workflowPage.getters.clearExecutionDataButton().click();
workflowPage.getters.clearExecutionDataButton().should('not.exist');

// Check success toast (works because Cypress waits enough for the element to show after the http request node has finished)
workflowPage.getters.successToast().should('be.visible');
// Check warning toast (works because Cypress waits enough for the element to show after the http request node has finished)
workflowPage.getters.warningToast().should('be.visible');
});
});
1 change: 1 addition & 0 deletions cypress/pages/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export class WorkflowPage extends BasePage {
return cy.get(this.getters.getEndpointSelector('plus', nodeName, index));
},
successToast: () => cy.get('.el-notification:has(.el-notification--success)'),
warningToast: () => cy.get('.el-notification:has(.el-notification--warning)'),
errorToast: () => cy.get('.el-notification:has(.el-notification--error)'),
activatorSwitch: () => cy.getByTestId('workflow-activate-switch'),
workflowMenu: () => cy.getByTestId('workflow-menu'),
Expand Down
17 changes: 17 additions & 0 deletions packages/core/src/NodeExecuteFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2508,6 +2508,19 @@ const getCommonWorkflowFunctions = (
prepareOutputData: async (outputData) => [outputData],
});

const executionCancellationFunctions = (
abortSignal?: AbortSignal,
): Pick<IExecuteFunctions, 'onExecutionCancellation' | 'getExecutionCancelSignal'> => ({
getExecutionCancelSignal: () => abortSignal,
onExecutionCancellation: (handler) => {
const fn = () => {
abortSignal?.removeEventListener('abort', fn);
handler();
};
abortSignal?.addEventListener('abort', fn);
},
});

const getRequestHelperFunctions = (
workflow: Workflow,
node: INode,
Expand Down Expand Up @@ -3087,10 +3100,12 @@ export function getExecuteFunctions(
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node) => {
return {
...getCommonWorkflowFunctions(workflow, node, additionalData),
...executionCancellationFunctions(abortSignal),
getMode: () => mode,
getCredentials: async (type, itemIndex) =>
getCredentials(
Expand Down Expand Up @@ -3512,10 +3527,12 @@ export function getExecuteSingleFunctions(
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteSingleFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node, itemIndex) => {
return {
...getCommonWorkflowFunctions(workflow, node, additionalData),
...executionCancellationFunctions(abortSignal),
continueOnFail: () => continueOnFail(node),
evaluateExpression: (expression: string, evaluateItemIndex: number | undefined) => {
evaluateItemIndex = evaluateItemIndex === undefined ? itemIndex : evaluateItemIndex;
Expand Down
46 changes: 20 additions & 26 deletions packages/core/src/WorkflowExecute.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
/* eslint-disable @typescript-eslint/prefer-optional-chain */

/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */

/* eslint-disable @typescript-eslint/prefer-nullish-coalescing */
import { setMaxListeners } from 'events';
import PCancelable from 'p-cancelable';

import type {
Expand Down Expand Up @@ -44,23 +43,14 @@ import get from 'lodash/get';
import * as NodeExecuteFunctions from './NodeExecuteFunctions';

export class WorkflowExecute {
runExecutionData: IRunExecutionData;

private additionalData: IWorkflowExecuteAdditionalData;

private mode: WorkflowExecuteMode;
private status: ExecutionStatus = 'new';

private status: ExecutionStatus;
private readonly abortController = new AbortController();

constructor(
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
runExecutionData?: IRunExecutionData,
) {
this.additionalData = additionalData;
this.mode = mode;
this.status = 'new';
this.runExecutionData = runExecutionData || {
private readonly additionalData: IWorkflowExecuteAdditionalData,
private readonly mode: WorkflowExecuteMode,
private runExecutionData: IRunExecutionData = {
startData: {},
resultData: {
runData: {},
Expand All @@ -73,8 +63,8 @@ export class WorkflowExecute {
waitingExecution: {},
waitingExecutionSource: {},
},
};
}
},
) {}

/**
* Executes the given workflow.
Expand Down Expand Up @@ -830,11 +820,16 @@ export class WorkflowExecute {
let closeFunction: Promise<void> | undefined;

return new PCancelable(async (resolve, reject, onCancel) => {
let gotCancel = false;
// Let as many nodes listen to the abort signal, without getting the MaxListenersExceededWarning
setMaxListeners(Infinity, this.abortController.signal);

onCancel.shouldReject = false;
onCancel(() => {
gotCancel = true;
this.status = 'canceled';
this.abortController.abort();
const fullRunData = this.getFullRunData(startedAt);
void this.executeHook('workflowExecuteAfter', [fullRunData]);
setTimeout(() => resolve(fullRunData), 10);
});

const returnPromise = (async () => {
Expand Down Expand Up @@ -881,10 +876,10 @@ export class WorkflowExecute {
this.additionalData.executionTimeoutTimestamp !== undefined &&
Date.now() >= this.additionalData.executionTimeoutTimestamp
) {
gotCancel = true;
this.status = 'canceled';
}

if (gotCancel) {
if (this.status === 'canceled') {
return;
}

Expand Down Expand Up @@ -1014,9 +1009,6 @@ export class WorkflowExecute {
}

for (let tryIndex = 0; tryIndex < maxTries; tryIndex++) {
if (gotCancel) {
return;
}
try {
if (tryIndex !== 0) {
// Reset executionError from previous error try
Expand Down Expand Up @@ -1052,6 +1044,7 @@ export class WorkflowExecute {
this.additionalData,
NodeExecuteFunctions,
this.mode,
this.abortController.signal,
);
nodeSuccessData = runNodeData.data;

Expand Down Expand Up @@ -1089,6 +1082,7 @@ export class WorkflowExecute {
this.additionalData,
executionData,
this.mode,
this.abortController.signal,
);
const dataProxy = executeFunctions.getWorkflowDataProxy(0);

Expand Down Expand Up @@ -1644,7 +1638,7 @@ export class WorkflowExecute {
return;
})()
.then(async () => {
if (gotCancel && executionError === undefined) {
if (this.status === 'canceled' && executionError === undefined) {
return this.processSuccessExecution(
startedAt,
workflow,
Expand Down
20 changes: 16 additions & 4 deletions packages/editor-ui/src/mixins/pushConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ export const pushConnection = defineComponent({
return false;
}

if (this.workflowsStore.activeExecutionId !== pushData.executionId) {
const { activeExecutionId } = this.workflowsStore;
if (activeExecutionId !== pushData.executionId) {
// The workflow which did finish execution did either not get started
// by this session or we do not have the execution id yet.
if (isRetry !== true) {
Expand All @@ -285,10 +286,17 @@ export const pushConnection = defineComponent({

let runDataExecutedErrorMessage = this.getExecutionError(runDataExecuted.data);

if (pushData.data.status === 'crashed') {
if (runDataExecuted.status === 'crashed') {
runDataExecutedErrorMessage = this.$locale.baseText(
'pushConnection.executionFailed.message',
);
} else if (runDataExecuted.status === 'canceled') {
runDataExecutedErrorMessage = this.$locale.baseText(
'executionsList.showMessage.stopExecution.message',
{
interpolate: { activeExecutionId },
},
);
}

const lineNumber = runDataExecuted?.data?.resultData?.error?.lineNumber;
Expand Down Expand Up @@ -389,7 +397,11 @@ export const pushConnection = defineComponent({
});
} else {
let title: string;
if (runDataExecuted.data.resultData.lastNodeExecuted) {
let type = 'error';
if (runDataExecuted.status === 'canceled') {
title = this.$locale.baseText('nodeView.showMessage.stopExecutionTry.title');
type = 'warning';
} else if (runDataExecuted.data.resultData.lastNodeExecuted) {
title = `Problem in node ‘${runDataExecuted.data.resultData.lastNodeExecuted}‘`;
} else {
title = 'Problem executing workflow';
Expand All @@ -398,7 +410,7 @@ export const pushConnection = defineComponent({
this.showMessage({
title,
message: runDataExecutedErrorMessage,
type: 'error',
type,
duration: 0,
dangerouslyUseHTMLString: true,
});
Expand Down
4 changes: 0 additions & 4 deletions packages/editor-ui/src/views/NodeView.vue
Original file line number Diff line number Diff line change
Expand Up @@ -1562,10 +1562,6 @@ export default defineComponent({
try {
this.stopExecutionInProgress = true;
await this.workflowsStore.stopCurrentExecution(executionId);
this.showMessage({
title: this.$locale.baseText('nodeView.showMessage.stopExecutionTry.title'),
type: 'success',
});
} catch (error) {
// Execution stop might fail when the execution has already finished. Let's treat this here.
const execution = await this.workflowsStore.getExecution(executionId);
Expand Down
7 changes: 3 additions & 4 deletions packages/nodes-base/nodes/Wait/Wait.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,9 @@ export class Wait extends Webhook {
if (waitValue < 65000) {
// If wait time is shorter than 65 seconds leave execution active because
// we just check the database every 60 seconds.
return new Promise((resolve, _reject) => {
setTimeout(() => {
resolve([context.getInputData()]);
}, waitValue);
return new Promise((resolve) => {
const timer = setTimeout(() => resolve([context.getInputData()]), waitValue);
context.onExecutionCancellation(() => clearTimeout(timer));
});
}

Expand Down
4 changes: 4 additions & 0 deletions packages/workflow/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ export interface IGetExecuteFunctions {
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteFunctions;
}

Expand All @@ -437,6 +438,7 @@ export interface IGetExecuteSingleFunctions {
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteSingleFunctions;
}

Expand Down Expand Up @@ -776,6 +778,8 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & {
getExecuteData(): IExecuteData;
getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData;
getInputSourceData(inputIndex?: number, inputName?: string): ISourceData;
getExecutionCancelSignal(): AbortSignal | undefined;
onExecutionCancellation(handler: () => unknown): void;
};

// TODO: Create later own type only for Config-Nodes
Expand Down
3 changes: 3 additions & 0 deletions packages/workflow/src/RoutingNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ export class RoutingNode {
executeData: IExecuteData,
nodeExecuteFunctions: INodeExecuteFunctions,
credentialsDecrypted?: ICredentialsDecrypted,
abortSignal?: AbortSignal,
): Promise<INodeExecutionData[][] | null | undefined> {
const items = inputData.main[0] as INodeExecutionData[];
const returnData: INodeExecutionData[] = [];
Expand All @@ -99,6 +100,7 @@ export class RoutingNode {
this.additionalData,
executeData,
this.mode,
abortSignal,
);

let credentials: ICredentialDataDecryptedObject | undefined;
Expand Down Expand Up @@ -136,6 +138,7 @@ export class RoutingNode {
this.additionalData,
executeData,
this.mode,
abortSignal,
);
const requestData: DeclarativeRestApiSettings.ResultOptions = {
options: {
Expand Down
4 changes: 4 additions & 0 deletions packages/workflow/src/Workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,7 @@ export class Workflow {
additionalData: IWorkflowExecuteAdditionalData,
nodeExecuteFunctions: INodeExecuteFunctions,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): Promise<IRunNodeResponse> {
const { node } = executionData;
let inputData = executionData.data;
Expand Down Expand Up @@ -1303,6 +1304,7 @@ export class Workflow {
additionalData,
executionData,
mode,
abortSignal,
);
const data =
nodeType instanceof Node
Expand Down Expand Up @@ -1385,6 +1387,8 @@ export class Workflow {
nodeType,
executionData,
nodeExecuteFunctions,
undefined,
abortSignal,
),
};
}
Expand Down

0 comments on commit d2c18c5

Please sign in to comment.