Skip to content

Commit

Permalink
feat(core): Coordinate manual workflow activation and deactivation in…
Browse files Browse the repository at this point in the history
… multi-main scenario (#7643)

Followup to #7566 | Story: https://linear.app/n8n/issue/PAY-926

### Manual workflow activation and deactivation

In a multi-main scenario, if the user manually activates or deactivates
a workflow, the process (whether leader or follower) that handles the
PATCH request and updates its internal state should send a message into
the command channel, so that all other main processes update their
internal state accordingly:

- Add to `ActiveWorkflows` if activating
- Remove from `ActiveWorkflows` if deactivating
- Remove and re-add to `ActiveWorkflows` if the update did not change
activation status.

After updating their internal state, if activating or deactivating, the
recipient main processes should push a message to all connected
frontends so that these can update their stores and so reflect the value
in the UI.

### Workflow activation errors

On failure to activate a workflow, the main instance should record the
error in Redis - main instances should always pull activation errors
from Redis in a multi-main scenario.

### Leadership change

On leadership change...

- The old leader should stop pruning and the new leader should start
pruning.
- The old leader should remove trigger- and poller-based workflows and
the new leader should add them.
  • Loading branch information
ivov committed Nov 17, 2023
1 parent b3a3f16 commit 4c40825
Show file tree
Hide file tree
Showing 33 changed files with 637 additions and 334 deletions.
52 changes: 52 additions & 0 deletions packages/cli/src/ActivationErrors.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { Service } from 'typedi';
import { CacheService } from './services/cache.service';
import { jsonParse } from 'n8n-workflow';

type ActivationErrors = {
[workflowId: string]: string; // error message
};

@Service()
export class ActivationErrorsService {
private readonly cacheKey = 'workflow-activation-errors';

constructor(private readonly cacheService: CacheService) {}

async set(workflowId: string, errorMessage: string) {
const errors = await this.getAll();

errors[workflowId] = errorMessage;

await this.cacheService.set(this.cacheKey, JSON.stringify(errors));
}

async unset(workflowId: string) {
const errors = await this.getAll();

if (Object.keys(errors).length === 0) return;

delete errors[workflowId];

await this.cacheService.set(this.cacheKey, JSON.stringify(errors));
}

async get(workflowId: string) {
const errors = await this.getAll();

if (Object.keys(errors).length === 0) return null;

return errors[workflowId];
}

async getAll() {
const errors = await this.cacheService.get<string>(this.cacheKey);

if (!errors) return {};

return jsonParse<ActivationErrors>(errors);
}

async clearAll() {
await this.cacheService.delete(this.cacheKey);
}
}
100 changes: 36 additions & 64 deletions packages/cli/src/ActiveWorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */

import Container, { Service } from 'typedi';
import { Service } from 'typedi';
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
import config from '@/config';

import type {
ExecutionError,
Expand Down Expand Up @@ -64,8 +65,8 @@ import { WebhookService } from './services/webhook.service';
import { Logger } from './Logger';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import config from '@/config';
import type { MultiMainInstancePublisher } from './services/orchestration/main/MultiMainInstance.publisher.ee';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { ActivationErrorsService } from '@/ActivationErrors.service';

const WEBHOOK_PROD_UNREGISTERED_HINT =
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
Expand All @@ -74,15 +75,6 @@ const WEBHOOK_PROD_UNREGISTERED_HINT =
export class ActiveWorkflowRunner implements IWebhookManager {
activeWorkflows = new ActiveWorkflows();

private activationErrors: {
[workflowId: string]: {
time: number; // ms
error: {
message: string;
};
};
} = {};

private queuedActivations: {
[workflowId: string]: {
activationMode: WorkflowActivateMode;
Expand All @@ -92,11 +84,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
};
} = {};

isMultiMainScenario =
config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled');

multiMainInstancePublisher: MultiMainInstancePublisher | undefined;

constructor(
private readonly logger: Logger,
private readonly activeExecutions: ActiveExecutions,
Expand All @@ -105,17 +92,13 @@ export class ActiveWorkflowRunner implements IWebhookManager {
private readonly webhookService: WebhookService,
private readonly workflowRepository: WorkflowRepository,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly multiMainSetup: MultiMainSetup,
private readonly activationErrorsService: ActivationErrorsService,
) {}

async init() {
if (this.isMultiMainScenario) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);

this.multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);

await this.multiMainInstancePublisher.init();
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
await this.multiMainSetup.init();
}

await this.addActiveWorkflows('init');
Expand Down Expand Up @@ -272,6 +255,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
async allActiveInStorage(user?: User) {
const isFullAccess = !user || user.globalRole.name === 'owner';

const activationErrors = await this.activationErrorsService.getAll();

if (isFullAccess) {
const activeWorkflows = await this.workflowRepository.find({
select: ['id'],
Expand All @@ -280,7 +265,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {

return activeWorkflows
.map((workflow) => workflow.id)
.filter((workflowId) => !this.activationErrors[workflowId]);
.filter((workflowId) => !activationErrors[workflowId]);
}

const where = whereClause({
Expand All @@ -304,7 +289,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {

return sharings
.map((sharing) => sharing.workflowId)
.filter((workflowId) => !this.activationErrors[workflowId]);
.filter((workflowId) => !activationErrors[workflowId]);
}

/**
Expand All @@ -325,8 +310,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
/**
* Return error if there was a problem activating the workflow
*/
getActivationError(workflowId: string) {
return this.activationErrors[workflowId];
async getActivationError(workflowId: string) {
return this.activationErrorsService.get(workflowId);
}

/**
Expand Down Expand Up @@ -612,12 +597,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
// Remove the workflow as "active"

void this.activeWorkflows.remove(workflowData.id);
this.activationErrors[workflowData.id] = {
time: new Date().getTime(),
error: {
message: error.message,
},
};

void this.activationErrorsService.set(workflowData.id, error.message);

// Run Error Workflow if defined
const activationError = new WorkflowActivationError(
Expand Down Expand Up @@ -709,15 +690,15 @@ export class ActiveWorkflowRunner implements IWebhookManager {
this.logger.verbose('Finished activating workflows (startup)');
}

async addAllTriggerAndPollerBasedWorkflows() {
this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...');
async clearAllActivationErrors() {
await this.activationErrorsService.clearAll();
}

async addAllTriggerAndPollerBasedWorkflows() {
await this.addActiveWorkflows('leadershipChange');
}

async removeAllTriggerAndPollerBasedWorkflows() {
this.logger.debug('[Leadership change] Removing all trigger- and poller-based workflows...');

await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
}

Expand Down Expand Up @@ -750,12 +731,12 @@ export class ActiveWorkflowRunner implements IWebhookManager {
let shouldAddWebhooks = true;
let shouldAddTriggersAndPollers = true;

if (this.isMultiMainScenario && activationMode !== 'leadershipChange') {
shouldAddWebhooks = this.multiMainInstancePublisher?.isLeader ?? false;
shouldAddTriggersAndPollers = this.multiMainInstancePublisher?.isLeader ?? false;
if (this.multiMainSetup.isEnabled && activationMode !== 'leadershipChange') {
shouldAddWebhooks = this.multiMainSetup.isLeader;
shouldAddTriggersAndPollers = this.multiMainSetup.isLeader;
}

if (this.isMultiMainScenario && activationMode === 'leadershipChange') {
if (this.multiMainSetup.isEnabled && activationMode === 'leadershipChange') {
shouldAddWebhooks = false;
shouldAddTriggersAndPollers = true;
}
Expand Down Expand Up @@ -795,17 +776,13 @@ export class ActiveWorkflowRunner implements IWebhookManager {
const additionalData = await WorkflowExecuteAdditionalData.getBase(sharing.user.id);

if (shouldAddWebhooks) {
this.logger.debug('============');
this.logger.debug(`Adding webhooks for workflow "${dbWorkflow.display()}"`);
this.logger.debug('============');
this.logger.debug(`Adding webhooks for workflow ${dbWorkflow.display()}`);

await this.addWebhooks(workflow, additionalData, 'trigger', activationMode);
}

if (shouldAddTriggersAndPollers) {
this.logger.debug('============');
this.logger.debug(`Adding triggers and pollers for workflow "${dbWorkflow.display()}"`);
this.logger.debug('============');
this.logger.debug(`Adding triggers and pollers for workflow ${dbWorkflow.display()}`);

await this.addTriggersAndPollers(dbWorkflow, workflow, {
activationMode,
Expand All @@ -817,21 +794,15 @@ export class ActiveWorkflowRunner implements IWebhookManager {
// Workflow got now successfully activated so make sure nothing is left in the queue
this.removeQueuedWorkflowActivation(workflowId);

if (this.activationErrors[workflowId]) {
delete this.activationErrors[workflowId];
}
await this.activationErrorsService.unset(workflowId);

const triggerCount = this.countTriggers(workflow, additionalData);
await WorkflowsService.updateWorkflowTriggerCount(workflow.id, triggerCount);
} catch (error) {
this.activationErrors[workflowId] = {
time: new Date().getTime(),
error: {
message: error.message,
},
};
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
await this.activationErrorsService.set(workflowId, error.message);

throw error;
throw e;
}

// If for example webhooks get created it sometimes has to save the
Expand Down Expand Up @@ -950,10 +921,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
);
}

if (this.activationErrors[workflowId] !== undefined) {
// If there were any activation errors delete them
delete this.activationErrors[workflowId];
}
await this.activationErrorsService.unset(workflowId);

if (this.queuedActivations[workflowId] !== undefined) {
this.removeQueuedWorkflowActivation(workflowId);
Expand Down Expand Up @@ -1016,4 +984,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
});
}
}

async removeActivationError(workflowId: string) {
await this.activationErrorsService.unset(workflowId);
}
}
4 changes: 2 additions & 2 deletions packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
import { License } from '@/License';
import { InternalHooks } from '@/InternalHooks';
import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';

@Service()
export class ExternalSecretsManager {
Expand Down Expand Up @@ -82,7 +82,7 @@ export class ExternalSecretsManager {
}

async broadcastReloadExternalSecretsProviders() {
await Container.get(SingleMainInstancePublisher).broadcastReloadExternalSecretsProviders();
await Container.get(SingleMainSetup).broadcastReloadExternalSecretsProviders();
}

private decryptSecretsSettings(value: string): ExternalSecretsSettings {
Expand Down
34 changes: 33 additions & 1 deletion packages/cli/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ export interface IDiagnosticInfo {
ldap_allowed: boolean;
saml_enabled: boolean;
binary_data_s3: boolean;
multi_main_setup_enabled: boolean;
licensePlanName?: string;
licenseTenantId?: number;
}
Expand Down Expand Up @@ -469,7 +470,25 @@ export type IPushData =
| PushDataNodeDescriptionUpdated
| PushDataExecutionRecovered
| PushDataActiveWorkflowUsersChanged
| PushDataWorkerStatusMessage;
| PushDataWorkerStatusMessage
| PushDataWorkflowActivated
| PushDataWorkflowDeactivated
| PushDataWorkflowFailedToActivate;

type PushDataWorkflowFailedToActivate = {
data: IWorkflowFailedToActivate;
type: 'workflowFailedToActivate';
};

type PushDataWorkflowActivated = {
data: IActiveWorkflowChanged;
type: 'workflowActivated';
};

type PushDataWorkflowDeactivated = {
data: IActiveWorkflowChanged;
type: 'workflowDeactivated';
};

type PushDataActiveWorkflowUsersChanged = {
data: IActiveWorkflowUsersChanged;
Expand Down Expand Up @@ -536,11 +555,24 @@ export interface IActiveWorkflowUser {
lastSeen: Date;
}

export interface IActiveWorkflowAdded {
workflowId: Workflow['id'];
}

export interface IActiveWorkflowUsersChanged {
workflowId: Workflow['id'];
activeUsers: IActiveWorkflowUser[];
}

interface IActiveWorkflowChanged {
workflowId: Workflow['id'];
}

interface IWorkflowFailedToActivate {
workflowId: Workflow['id'];
errorMessage: string;
}

export interface IPushDataExecutionRecovered {
executionId: string;
}
Expand Down

0 comments on commit 4c40825

Please sign in to comment.