Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Implement inter-main communication for test webhooks in multi-main setup #8267

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions packages/cli/src/TestWebhooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ import type {
import { Push } from '@/push';
import { NodeTypes } from '@/NodeTypes';
import * as WebhookHelpers from '@/WebhookHelpers';
import { TIME } from '@/constants';
import { TEST_WEBHOOK_TIMEOUT } from '@/constants';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { WorkflowMissingIdError } from '@/errors/workflow-missing-id.error';
import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found.error';
import * as NodeExecuteFunctions from 'n8n-core';
import { removeTrailingSlash } from './utils';
import { TestWebhookRegistrationsService } from '@/services/test-webhook-registrations.service';
import { MultiMainSetup } from './services/orchestration/main/MultiMainSetup.ee';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';

@Service()
Expand All @@ -32,6 +33,7 @@ export class TestWebhooks implements IWebhookManager {
private readonly push: Push,
private readonly nodeTypes: NodeTypes,
private readonly registrations: TestWebhookRegistrationsService,
private readonly multiMainSetup: MultiMainSetup,
) {}

private timeouts: { [webhookKey: string]: NodeJS.Timeout } = {};
Expand Down Expand Up @@ -89,7 +91,6 @@ export class TestWebhooks implements IWebhookManager {
}

const { destinationNode, sessionId, workflowEntity } = registration;
const timeout = this.timeouts[key];

const workflow = this.toWorkflow(workflowEntity);

Expand Down Expand Up @@ -135,15 +136,34 @@ export class TestWebhooks implements IWebhookManager {
}
} catch {}

// Delete webhook also if an error is thrown
if (timeout) clearTimeout(timeout);
/**
* Multi-main setup: In a manual webhook execution, the main process that
* handles a webhook might not be the same as the main process that created
* the webhook. If so, after the test webhook has been successfully executed,
* the handler process commands the creator process to clear its test webhooks.
*/
if (
this.multiMainSetup.isEnabled &&
sessionId &&
!this.push.getBackend().hasSessionId(sessionId)
) {
const payload = { webhookKey: key, workflowEntity, sessionId };
void this.multiMainSetup.publish('clear-test-webhooks', payload);
return;
}

await this.registrations.deregisterAll();
this.clearTimeout(key);
ivov marked this conversation as resolved.
Show resolved Hide resolved

await this.deactivateWebhooks(workflow);
});
}

clearTimeout(key: string) {
const timeout = this.timeouts[key];

if (timeout) clearTimeout(timeout);
}

async getWebhookMethods(path: string) {
const allKeys = await this.registrations.getAllKeys();

Expand Down Expand Up @@ -207,7 +227,7 @@ export class TestWebhooks implements IWebhookManager {
return false; // no webhooks found to start a workflow
}

const timeout = setTimeout(async () => this.cancelWebhook(workflow.id), 2 * TIME.MINUTE);
const timeout = setTimeout(async () => this.cancelWebhook(workflow.id), TEST_WEBHOOK_TIMEOUT);

for (const webhook of webhooks) {
const key = this.registrations.toKey(webhook);
Expand Down Expand Up @@ -265,13 +285,11 @@ export class TestWebhooks implements IWebhookManager {

const { sessionId, workflowEntity } = registration;

const timeout = this.timeouts[key];

const workflow = this.toWorkflow(workflowEntity);

if (workflowEntity.id !== workflowId) continue;

clearTimeout(timeout);
this.clearTimeout(key);

if (sessionId !== undefined) {
try {
Expand Down Expand Up @@ -354,13 +372,13 @@ export class TestWebhooks implements IWebhookManager {
if (staticData) workflow.staticData = staticData;

await workflow.deleteWebhook(webhook, NodeExecuteFunctions, 'internal', 'update');

await this.registrations.deregister(webhook);
}

await this.registrations.deregisterAll();
}

/**
* Convert a `WorkflowEntity` from `typeorm` to a `Workflow` from `n8n-workflow`.
* Convert a `WorkflowEntity` from `typeorm` to a temporary `Workflow` from `n8n-workflow`.
*/
toWorkflow(workflowEntity: IWorkflowDb) {
return new Workflow({
Expand Down
6 changes: 5 additions & 1 deletion packages/cli/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,12 @@ export const TIME = {
MINUTE: 60 * 1000,
HOUR: 60 * 60 * 1000,
DAY: 24 * 60 * 60 * 1000,
};
} as const;

export const MIN_PASSWORD_CHAR_LENGTH = 8;

export const MAX_PASSWORD_CHAR_LENGTH = 64;

export const TEST_WEBHOOK_TIMEOUT = 2 * TIME.MINUTE;

export const TEST_WEBHOOK_TIMEOUT_BUFFER = 30 * TIME.SECOND;
75 changes: 45 additions & 30 deletions packages/cli/src/push/abstract.push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { assert, jsonStringify } from 'n8n-workflow';
import type { IPushDataType } from '@/Interfaces';
import type { Logger } from '@/Logger';
import type { User } from '@db/entities/User';
import type { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';

/**
* Abstract class for two-way push communication.
Expand All @@ -16,77 +17,90 @@ export abstract class AbstractPush<T> extends EventEmitter {
protected userIdBySessionId: Record<string, string> = {};

protected abstract close(connection: T): void;
protected abstract sendToOne(connection: T, data: string): void;
protected abstract sendToOneConnection(connection: T, data: string): void;

constructor(protected readonly logger: Logger) {
constructor(
protected readonly logger: Logger,
private readonly multiMainSetup: MultiMainSetup,
) {
super();
}

protected add(sessionId: string, userId: User['id'], connection: T): void {
protected add(sessionId: string, userId: User['id'], connection: T) {
const { connections, userIdBySessionId: userIdsBySessionId } = this;
this.logger.debug('Add editor-UI session', { sessionId });

const existingConnection = connections[sessionId];

if (existingConnection) {
// Make sure to remove existing connection with the same id
// Make sure to remove existing connection with the same ID
this.close(existingConnection);
}

connections[sessionId] = connection;
userIdsBySessionId[sessionId] = userId;
}

protected onMessageReceived(sessionId: string, msg: unknown): void {
protected onMessageReceived(sessionId: string, msg: unknown) {
this.logger.debug('Received message from editor-UI', { sessionId, msg });

const userId = this.userIdBySessionId[sessionId];
this.emit('message', {
sessionId,
userId,
msg,
});

this.emit('message', { sessionId, userId, msg });
}

protected remove(sessionId?: string): void {
if (sessionId !== undefined) {
this.logger.debug('Remove editor-UI session', { sessionId });
delete this.connections[sessionId];
delete this.userIdBySessionId[sessionId];
}
protected remove(sessionId?: string) {
if (!sessionId) return;

this.logger.debug('Removed editor-UI session', { sessionId });

delete this.connections[sessionId];
delete this.userIdBySessionId[sessionId];
}

private sendToSessions<D>(type: IPushDataType, data: D, sessionIds: string[]) {
private sendToSessions(type: IPushDataType, data: unknown, sessionIds: string[]) {
this.logger.debug(`Send data of type "${type}" to editor-UI`, {
dataType: type,
sessionIds: sessionIds.join(', '),
});

const sendData = jsonStringify({ type, data }, { replaceCircularRefs: true });
const stringifiedPayload = jsonStringify({ type, data }, { replaceCircularRefs: true });

for (const sessionId of sessionIds) {
const connection = this.connections[sessionId];
assert(connection);
this.sendToOne(connection, sendData);
this.sendToOneConnection(connection, stringifiedPayload);
}
}

broadcast<D>(type: IPushDataType, data?: D) {
sendToAllSessions(type: IPushDataType, data?: unknown) {
this.sendToSessions(type, data, Object.keys(this.connections));
}

send<D>(type: IPushDataType, data: D, sessionId: string) {
const { connections } = this;
if (connections[sessionId] === undefined) {
sendToOneSession(type: IPushDataType, data: unknown, sessionId: string) {
/**
* Multi-main setup: In a manual webhook execution, the main process that
* handles a webhook might not be the same as the main process that created
* the webhook. If so, the handler process commands the creator process to
* relay the former's execution lifecyle events to the creator's frontend.
*/
if (this.multiMainSetup.isEnabled && !this.hasSessionId(sessionId)) {
const payload = { type, args: data, sessionId };

void this.multiMainSetup.publish('relay-execution-lifecycle-event', payload);

return;
}

if (this.connections[sessionId] === undefined) {
this.logger.error(`The session "${sessionId}" is not registered.`, { sessionId });
return;
}

this.sendToSessions(type, data, [sessionId]);
}

/**
* Sends the given data to given users' connections
*/
sendToUsers<D>(type: IPushDataType, data: D, userIds: Array<User['id']>) {
sendToUsers(type: IPushDataType, data: unknown, userIds: Array<User['id']>) {
const { connections } = this;
const userSessionIds = Object.keys(connections).filter((sessionId) =>
userIds.includes(this.userIdBySessionId[sessionId]),
Expand All @@ -95,9 +109,6 @@ export abstract class AbstractPush<T> extends EventEmitter {
this.sendToSessions(type, data, userSessionIds);
}

/**
* Closes all push existing connections
*/
closeAllConnections() {
for (const sessionId in this.connections) {
// Signal the connection that we want to close it.
Expand All @@ -107,4 +118,8 @@ export abstract class AbstractPush<T> extends EventEmitter {
this.close(this.connections[sessionId]);
}
}

hasSessionId(sessionId: string) {
return this.connections[sessionId] !== undefined;
}
}
17 changes: 8 additions & 9 deletions packages/cli/src/push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,15 @@ export class Push extends EventEmitter {
constructor() {
super();

if (useWebSockets) {
this.backend.on('message', (msg) => this.emit('message', msg));
}
if (useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg));
}

handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) {
const {
userId,
query: { sessionId },
} = req;

if (req.ws) {
(this.backend as WebSocketPush).add(sessionId, userId, req.ws);
} else if (!useWebSockets) {
Expand All @@ -56,24 +55,24 @@ export class Push extends EventEmitter {
this.emit('editorUiConnected', sessionId);
}

broadcast<D>(type: IPushDataType, data?: D) {
this.backend.broadcast(type, data);
broadcast(type: IPushDataType, data?: unknown) {
this.backend.sendToAllSessions(type, data);
}

send<D>(type: IPushDataType, data: D, sessionId: string) {
this.backend.send(type, data, sessionId);
send(type: IPushDataType, data: unknown, sessionId: string) {
this.backend.sendToOneSession(type, data, sessionId);
}

getBackend() {
return this.backend;
}

sendToUsers<D>(type: IPushDataType, data: D, userIds: Array<User['id']>) {
sendToUsers(type: IPushDataType, data: unknown, userIds: Array<User['id']>) {
this.backend.sendToUsers(type, data, userIds);
}

@OnShutdown()
onShutdown(): void {
onShutdown() {
this.backend.closeAllConnections();
}
}
Expand Down
10 changes: 6 additions & 4 deletions packages/cli/src/push/sse.push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Logger } from '@/Logger';
import { AbstractPush } from './abstract.push';
import type { PushRequest, PushResponse } from './types';
import type { User } from '@db/entities/User';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';

type Connection = { req: PushRequest; res: PushResponse };

Expand All @@ -13,8 +14,9 @@ export class SSEPush extends AbstractPush<Connection> {

readonly connections: Record<string, Connection> = {};

constructor(logger: Logger) {
super(logger);
constructor(logger: Logger, multiMainSetup: MultiMainSetup) {
super(logger, multiMainSetup);

this.channel.on('disconnect', (channel, { req }) => {
this.remove(req?.query?.sessionId);
});
Expand All @@ -25,12 +27,12 @@ export class SSEPush extends AbstractPush<Connection> {
this.channel.addClient(connection.req, connection.res);
}

protected close({ res }: Connection): void {
protected close({ res }: Connection) {
res.end();
this.channel.removeClient(res);
}

protected sendToOne(connection: Connection, data: string): void {
protected sendToOneConnection(connection: Connection, data: string) {
this.channel.send(data, [connection.res]);
}
}
7 changes: 4 additions & 3 deletions packages/cli/src/push/websocket.push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ import { Service } from 'typedi';
import { Logger } from '@/Logger';
import { AbstractPush } from './abstract.push';
import type { User } from '@db/entities/User';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';

function heartbeat(this: WebSocket) {
this.isAlive = true;
}

@Service()
export class WebSocketPush extends AbstractPush<WebSocket> {
constructor(logger: Logger) {
super(logger);
constructor(logger: Logger, multiMainSetup: MultiMainSetup) {
super(logger, multiMainSetup);

// Ping all connected clients every 60 seconds
setInterval(() => this.pingAll(), 60 * 1000);
Expand Down Expand Up @@ -51,7 +52,7 @@ export class WebSocketPush extends AbstractPush<WebSocket> {
connection.close();
}

protected sendToOne(connection: WebSocket, data: string): void {
protected sendToOneConnection(connection: WebSocket, data: string): void {
connection.send(data);
}

Expand Down
Loading
Loading