Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/n8n-io/n8n into node-179-…
Browse files Browse the repository at this point in the history
…microsoft-teams-node-overhaul
  • Loading branch information
michael-radency committed Oct 26, 2023
2 parents b07a37f + ae8c7a6 commit 948d969
Show file tree
Hide file tree
Showing 30 changed files with 209 additions and 182 deletions.
12 changes: 12 additions & 0 deletions packages/cli/BREAKING-CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,18 @@

This list shows all the versions which include breaking changes and how to upgrade.

## 1.15.0

### What changed?

Until now, in main mode, n8n used to deregister webhooks at shutdown and reregister them at startup. Queue mode and the flag `N8N_SKIP_WEBHOOK_DEREGISTRATION_SHUTDOWN` skipped webhook deregistration.

As from now, in both main and queue modes, n8n no longer deregisters webhooks at startup and shutdown, and the flag `N8N_SKIP_WEBHOOK_DEREGISTRATION_SHUTDOWN` is removed. n8n assumes that third-party services will retry unhandled webhook requests.

### When is action necessary?

If using the flag `N8N_SKIP_WEBHOOK_DEREGISTRATION_SHUTDOWN`, note that it no longer has effect and can be removed from your settings.

## 1.9.0

### What changed?
Expand Down
20 changes: 1 addition & 19 deletions packages/cli/src/ActiveWorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import * as ResponseHelper from '@/ResponseHelper';
import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';

import config from '@/config';
import type { User } from '@db/entities/User';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { ActiveExecutions } from '@/ActiveExecutions';
Expand Down Expand Up @@ -101,18 +100,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'],
})) as IWorkflowDb[];

if (!config.getEnv('endpoints.skipWebhooksDeregistrationOnShutdown')) {
// Do not clean up database when skip registration is done.
// This flag is set when n8n is running in scaled mode.
// Impact is minimal, but for a short while, n8n will stop accepting requests.
// Also, users had issues when running multiple "main process"
// instances if many of them start at the same time
// This is not officially supported but there is no reason
// it should not work.
// Clear up active workflow table
await this.webhookService.deleteInstanceWebhooks();
}

if (workflowsData.length !== 0) {
this.logger.info(' ================================');
this.logger.info(' Start Active Workflows:');
Expand Down Expand Up @@ -404,12 +391,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
false,
);
} catch (error) {
if (
activation === 'init' &&
config.getEnv('endpoints.skipWebhooksDeregistrationOnShutdown') &&
error.name === 'QueryFailedError'
) {
// When skipWebhooksDeregistrationOnShutdown is enabled,
if (activation === 'init' && error.name === 'QueryFailedError') {
// n8n does not remove the registered webhooks on exit.
// This means that further initializations will always fail
// when inserting to database. This is why we ignore this error
Expand Down
8 changes: 0 additions & 8 deletions packages/cli/src/TestWebhooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,4 @@ export class TestWebhooks implements IWebhookManager {

return foundWebhook;
}

/**
* Removes all the currently active test webhooks
*/
async removeAll(): Promise<void> {
const workflows = Object.values(this.testWebhookData).map(({ workflow }) => workflow);
return this.activeWebhooks.removeAll(workflows);
}
}
55 changes: 15 additions & 40 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import {
updateExistingExecution,
} from './executionLifecycleHooks/shared/sharedHookFunctions';
import { restoreBinaryDataId } from './executionLifecycleHooks/restoreBinaryDataId';
import { toSaveSettings } from './executionLifecycleHooks/toSaveSettings';
import { Logger } from './Logger';

const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType');
Expand Down Expand Up @@ -508,14 +509,9 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
}
}

const workflowSettings = this.workflowData.settings;
let saveManualExecutions = config.getEnv('executions.saveDataManualExecutions');
if (workflowSettings?.saveManualExecutions !== undefined) {
// Apply to workflow override
saveManualExecutions = workflowSettings.saveManualExecutions as boolean;
}
const saveSettings = toSaveSettings(this.workflowData.settings);

if (isManualMode && !saveManualExecutions && !fullRunData.waitTill) {
if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) {
await Container.get(ExecutionRepository).hardDelete({
workflowId: this.workflowData.id as string,
executionId: this.executionId,
Expand All @@ -524,24 +520,12 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
return;
}

// Check config to know if execution should be saved or not
let saveDataErrorExecution = config.getEnv('executions.saveDataOnError') as string;
let saveDataSuccessExecution = config.getEnv('executions.saveDataOnSuccess') as string;
if (this.workflowData.settings !== undefined) {
saveDataErrorExecution =
(this.workflowData.settings.saveDataErrorExecution as string) ||
saveDataErrorExecution;
saveDataSuccessExecution =
(this.workflowData.settings.saveDataSuccessExecution as string) ||
saveDataSuccessExecution;
}

const workflowStatusFinal = determineFinalExecutionStatus(fullRunData);
const executionStatus = determineFinalExecutionStatus(fullRunData);
const shouldNotSave =
(executionStatus === 'success' && !saveSettings.success) ||
(executionStatus !== 'success' && !saveSettings.error);

if (
(workflowStatusFinal === 'success' && saveDataSuccessExecution === 'none') ||
(workflowStatusFinal !== 'success' && saveDataErrorExecution === 'none')
) {
if (shouldNotSave) {
if (!fullRunData.waitTill && !isManualMode) {
executeErrorWorkflow(
this.workflowData,
Expand All @@ -564,7 +548,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
const fullExecutionData = prepareExecutionDataForDbUpdate({
runData: fullRunData,
workflowData: this.workflowData,
workflowStatusFinal,
workflowStatusFinal: executionStatus,
retryOf: this.retryOf,
});

Expand Down Expand Up @@ -1135,23 +1119,14 @@ export function getWorkflowHooksWorkerMain(
fullRunData: IRun,
newStaticData: IDataObject,
): Promise<void> {
// Check config to know if execution should be saved or not
let saveDataErrorExecution = config.getEnv('executions.saveDataOnError') as string;
let saveDataSuccessExecution = config.getEnv('executions.saveDataOnSuccess') as string;
if (this.workflowData.settings !== undefined) {
saveDataErrorExecution =
(this.workflowData.settings.saveDataErrorExecution as string) || saveDataErrorExecution;
saveDataSuccessExecution =
(this.workflowData.settings.saveDataSuccessExecution as string) ||
saveDataSuccessExecution;
}
const executionStatus = determineFinalExecutionStatus(fullRunData);
const saveSettings = toSaveSettings(this.workflowData.settings);

const workflowStatusFinal = determineFinalExecutionStatus(fullRunData);
const shouldNotSave =
(executionStatus === 'success' && !saveSettings.success) ||
(executionStatus !== 'success' && !saveSettings.error);

if (
(workflowStatusFinal === 'success' && saveDataSuccessExecution === 'none') ||
(workflowStatusFinal !== 'success' && saveDataErrorExecution === 'none')
) {
if (shouldNotSave) {
await Container.get(ExecutionRepository).hardDelete({
workflowId: this.workflowData.id as string,
executionId: this.executionId,
Expand Down
6 changes: 3 additions & 3 deletions packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ export class WorkflowRunner {
/**
* The process did send a hook message so execute the appropriate hook
*/
processHookMessage(workflowHooks: WorkflowHooks, hookData: IProcessMessageDataHook) {
void workflowHooks.executeHookFunctions(hookData.hook, hookData.parameters);
async processHookMessage(workflowHooks: WorkflowHooks, hookData: IProcessMessageDataHook) {
await workflowHooks.executeHookFunctions(hookData.hook, hookData.parameters);
}

/**
Expand Down Expand Up @@ -777,7 +777,7 @@ export class WorkflowRunner {
workflowHooks,
);
} else if (message.type === 'processHook') {
this.processHookMessage(workflowHooks, message.data as IProcessMessageDataHook);
await this.processHookMessage(workflowHooks, message.data as IProcessMessageDataHook);
} else if (message.type === 'timeout') {
// Execution timed out and its process has been terminated
const timeoutError = new WorkflowOperationError('Workflow execution timed out!');
Expand Down
6 changes: 6 additions & 0 deletions packages/cli/src/commands/BaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ export abstract class BaseCommand extends Command {
);
}

if (process.env.N8N_SKIP_WEBHOOK_DEREGISTRATION_SHUTDOWN) {
this.logger.warn(
'The flag to skip webhook deregistration N8N_SKIP_WEBHOOK_DEREGISTRATION_SHUTDOWN has been removed. n8n no longer deregisters webhooks at startup and shutdown, in main and queue mode.',
);
}

await Container.get(PostHogClient).init();
await Container.get(InternalHooks).init();
}
Expand Down
16 changes: 0 additions & 16 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import * as Db from '@/Db';
import * as GenericHelpers from '@/GenericHelpers';
import { Server } from '@/Server';
import { TestWebhooks } from '@/TestWebhooks';
import { EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR } from '@/constants';
import { eventBus } from '@/eventbus';
import { BaseCommand } from './BaseCommand';
Expand Down Expand Up @@ -115,21 +114,6 @@ export class Start extends BaseCommand {

await Container.get(InternalHooks).onN8nStop();

const skipWebhookDeregistration = config.getEnv(
'endpoints.skipWebhooksDeregistrationOnShutdown',
);

const removePromises = [];
if (!skipWebhookDeregistration) {
removePromises.push(this.activeWorkflowRunner.removeAll());
}

// Remove all test webhooks
const testWebhooks = Container.get(TestWebhooks);
removePromises.push(testWebhooks.removeAll());

await Promise.all(removePromises);

// Wait for active workflow executions to finish
const activeExecutionsInstance = Container.get(ActiveExecutions);
let executingWorkflows = activeExecutionsInstance.getActiveExecutions();
Expand Down
16 changes: 8 additions & 8 deletions packages/cli/src/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ export const schema = {
},
rejectUnauthorized: {
doc: 'If unauthorized SSL connections should be rejected',
format: 'Boolean',
format: Boolean,
default: true,
env: 'DB_POSTGRESDB_SSL_REJECT_UNAUTHORIZED',
},
Expand Down Expand Up @@ -215,7 +215,7 @@ export const schema = {
},
onboardingFlowDisabled: {
doc: 'Show onboarding flow in new workflow',
format: 'Boolean',
format: Boolean,
default: false,
env: 'N8N_ONBOARDING_FLOW_DISABLED',
},
Expand Down Expand Up @@ -288,7 +288,7 @@ export const schema = {
},
saveExecutionProgress: {
doc: 'Whether or not to save progress for each node executed',
format: 'Boolean',
format: Boolean,
default: false,
env: 'EXECUTIONS_DATA_SAVE_ON_PROGRESS',
},
Expand All @@ -300,7 +300,7 @@ export const schema = {
// in the editor.
saveDataManualExecutions: {
doc: 'Save data of executions when started manually via editor',
format: 'Boolean',
format: Boolean,
default: true,
env: 'EXECUTIONS_DATA_SAVE_MANUAL_EXECUTIONS',
},
Expand All @@ -312,7 +312,7 @@ export const schema = {
// a future version.
pruneData: {
doc: 'Delete data of past executions on a rolling basis',
format: 'Boolean',
format: Boolean,
default: true,
env: 'EXECUTIONS_DATA_PRUNE',
},
Expand Down Expand Up @@ -358,7 +358,7 @@ export const schema = {
health: {
active: {
doc: 'If health checks should be enabled',
format: 'Boolean',
format: Boolean,
default: false,
env: 'QUEUE_HEALTH_CHECK_ACTIVE',
},
Expand Down Expand Up @@ -420,7 +420,7 @@ export const schema = {
env: 'QUEUE_BULL_REDIS_CLUSTER_NODES',
},
tls: {
format: 'Boolean',
format: Boolean,
default: false,
env: 'QUEUE_BULL_REDIS_TLS',
doc: 'Enable TLS on Redis connections. Default: false',
Expand Down Expand Up @@ -558,7 +558,7 @@ export const schema = {
},
metrics: {
enable: {
format: 'Boolean',
format: Boolean,
default: false,
env: 'N8N_METRICS',
doc: 'Enable /metrics endpoint. Default: false',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export function determineFinalExecutionStatus(runData: IRun): ExecutionStatus {
const workflowHasCrashed = runData.status === 'crashed';
const workflowWasCanceled = runData.status === 'canceled';
const workflowDidSucceed =
!runData.data.resultData.error && !workflowHasCrashed && !workflowWasCanceled;
!runData.data.resultData?.error && !workflowHasCrashed && !workflowWasCanceled;
let workflowStatusFinal: ExecutionStatus = workflowDidSucceed ? 'success' : 'failed';
if (workflowHasCrashed) workflowStatusFinal = 'crashed';
if (workflowWasCanceled) workflowStatusFinal = 'canceled';
Expand Down
20 changes: 20 additions & 0 deletions packages/cli/src/executionLifecycleHooks/toSaveSettings.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import config from '@/config';
import type { IWorkflowSettings } from 'n8n-workflow';

const DEFAULTS = {
ERROR: config.getEnv('executions.saveDataOnError'),
SUCCESS: config.getEnv('executions.saveDataOnSuccess'),
MANUAL: config.getEnv('executions.saveDataManualExecutions'),
};

/**
* Return whether a workflow execution is configured to be saved or not,
* for error executions, success executions, and manual executions.
*/
export function toSaveSettings(workflowSettings: IWorkflowSettings = {}) {
return {
error: workflowSettings.saveDataErrorExecution !== 'none' ?? DEFAULTS.ERROR !== 'none',
success: workflowSettings.saveDataSuccessExecution !== 'none' ?? DEFAULTS.SUCCESS !== 'none',
manual: workflowSettings?.saveManualExecutions ?? DEFAULTS.MANUAL,
};
}
6 changes: 0 additions & 6 deletions packages/cli/src/services/webhook.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,6 @@ export class WebhookService {
return this.deleteWebhooks(webhooks);
}

async deleteInstanceWebhooks() {
const webhooks = await this.webhookRepository.find();

return this.deleteWebhooks(webhooks);
}

private async deleteWebhooks(webhooks: WebhookEntity[]) {
void this.cacheService.deleteMany(webhooks.map((w) => w.cacheKey));

Expand Down
2 changes: 0 additions & 2 deletions packages/cli/test/unit/ActiveWorkflowRunner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ describe('ActiveWorkflowRunner', () => {
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(0);
expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled();
expect(webhookService.deleteInstanceWebhooks).toHaveBeenCalled();
expect(externalHooks.run).toHaveBeenCalledTimes(1);
});

Expand All @@ -171,7 +170,6 @@ describe('ActiveWorkflowRunner', () => {
databaseActiveWorkflowsCount,
);
expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled();
expect(webhookService.deleteInstanceWebhooks).toHaveBeenCalled();
expect(externalHooks.run).toHaveBeenCalled();
});

Expand Down
Loading

0 comments on commit 948d969

Please sign in to comment.