Skip to content

Commit

Permalink
fix(core): Add check that queue is defined and remove cyclic dependen…
Browse files Browse the repository at this point in the history
…cy (#7404)

In a rare edge case an undefined queue could be returned - this should
not happen and now an error is thrown.
Also using the opportunity to remove a cyclic dependency from the Queue.
  • Loading branch information
flipswitchingmonkey committed Oct 13, 2023
1 parent 609f083 commit 45f2ef3
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 33 deletions.
21 changes: 19 additions & 2 deletions packages/cli/src/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import type Bull from 'bull';
import { Service } from 'typedi';
import type { ExecutionError, IExecuteResponsePromiseData } from 'n8n-workflow';
import { ActiveExecutions } from '@/ActiveExecutions';
import * as WebhookHelpers from '@/WebhookHelpers';
import { decodeWebhookResponse } from '@/helpers/decodeWebhookResponse';

import {
getRedisClusterClient,
getRedisClusterNodes,
Expand Down Expand Up @@ -62,7 +63,7 @@ export class Queue {
this.jobQueue.on('global:progress', (jobId, progress: WebhookResponse) => {
this.activeExecutions.resolveResponsePromise(
progress.executionId,
WebhookHelpers.decodeWebhookResponse(progress.response),
decodeWebhookResponse(progress.response),
);
});
}
Expand All @@ -79,7 +80,23 @@ export class Queue {
return this.jobQueue.getJobs(jobTypes);
}

async process(concurrency: number, fn: Bull.ProcessCallbackFunction<JobData>): Promise<void> {
return this.jobQueue.process(concurrency, fn);
}

async ping(): Promise<string> {
return this.jobQueue.client.ping();
}

async pause(isLocal?: boolean): Promise<void> {
return this.jobQueue.pause(isLocal);
}

getBullObjectInstance(): JobQueue {
if (this.jobQueue === undefined) {
// if queue is not initialized yet throw an error, since we do not want to hand around an undefined queue
throw new Error('Queue is not initialized yet!');
}
return this.jobQueue;
}

Expand Down
17 changes: 0 additions & 17 deletions packages/cli/src/WebhookHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,23 +160,6 @@ export function getWorkflowWebhooks(
return returnData;
}

export function decodeWebhookResponse(
response: IExecuteResponsePromiseData,
): IExecuteResponsePromiseData {
if (
typeof response === 'object' &&
typeof response.body === 'object' &&
(response.body as IDataObject)['__@N8nEncodedBuffer@__']
) {
response.body = Buffer.from(
(response.body as IDataObject)['__@N8nEncodedBuffer@__'] as string,
BINARY_ENCODING,
);
}

return response;
}

export function encodeWebhookResponse(
response: IExecuteResponsePromiseData,
): IExecuteResponsePromiseData {
Expand Down
11 changes: 5 additions & 6 deletions packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import type {
IWorkflowExecutionDataProcessWithExecution,
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import type { Job, JobData, JobQueue, JobResponse } from '@/Queue';
import type { Job, JobData, JobResponse } from '@/Queue';
// eslint-disable-next-line import/no-cycle
import { Queue } from '@/Queue';
import * as WebhookHelpers from '@/WebhookHelpers';
import { decodeWebhookResponse } from '@/helpers/decodeWebhookResponse';
// eslint-disable-next-line import/no-cycle
import * as WorkflowHelpers from '@/WorkflowHelpers';
// eslint-disable-next-line import/no-cycle
Expand All @@ -60,7 +60,7 @@ export class WorkflowRunner {

push: Push;

jobQueue: JobQueue;
jobQueue: Queue;

constructor() {
this.push = Container.get(Push);
Expand Down Expand Up @@ -172,8 +172,7 @@ export class WorkflowRunner {
await initErrorHandling();

if (executionsMode === 'queue') {
const queue = Container.get(Queue);
this.jobQueue = queue.getBullObjectInstance();
this.jobQueue = Container.get(Queue);
}

if (executionsMode === 'queue' && data.executionMode !== 'manual') {
Expand Down Expand Up @@ -733,7 +732,7 @@ export class WorkflowRunner {
this.activeExecutions.remove(executionId, message.data.runData);
} else if (message.type === 'sendResponse') {
if (responsePromise) {
responsePromise.resolve(WebhookHelpers.decodeWebhookResponse(message.data.response));
responsePromise.resolve(decodeWebhookResponse(message.data.response));
}
} else if (message.type === 'sendDataToUI') {
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
Expand Down
15 changes: 7 additions & 8 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'
import { PermissionChecker } from '@/UserManagement/PermissionChecker';

import config from '@/config';
import type { Job, JobId, JobQueue, JobResponse, WebhookResponse } from '@/Queue';
import type { Job, JobId, JobResponse, WebhookResponse } from '@/Queue';
import { Queue } from '@/Queue';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { N8N_VERSION } from '@/constants';
Expand Down Expand Up @@ -61,7 +61,7 @@ export class Worker extends BaseCommand {
[jobId: string]: WorkerJobStatusSummary;
} = {};

static jobQueue: JobQueue;
static jobQueue: Queue;

redisSubscriber: RedisServicePubSubSubscriber;

Expand Down Expand Up @@ -335,15 +335,14 @@ export class Worker extends BaseCommand {
`Opening Redis connection to listen to messages with timeout ${redisConnectionTimeoutLimit}`,
);

const queue = Container.get(Queue);
await queue.init();
Worker.jobQueue = Container.get(Queue);
await Worker.jobQueue.init();
this.logger.debug('Queue singleton ready');
Worker.jobQueue = queue.getBullObjectInstance();
void Worker.jobQueue.process(flags.concurrency, async (job) =>
this.runJob(job, this.nodeTypes),
);

Worker.jobQueue.on('global:progress', (jobId: JobId, progress) => {
Worker.jobQueue.getBullObjectInstance().on('global:progress', (jobId: JobId, progress) => {
// Progress of a job got updated which does get used
// to communicate that a job got canceled.

Expand All @@ -359,7 +358,7 @@ export class Worker extends BaseCommand {

let lastTimer = 0;
let cumulativeTimeout = 0;
Worker.jobQueue.on('error', (error: Error) => {
Worker.jobQueue.getBullObjectInstance().on('error', (error: Error) => {
if (error.toString().includes('ECONNREFUSED')) {
const now = Date.now();
if (now - lastTimer > 30000) {
Expand Down Expand Up @@ -423,7 +422,7 @@ export class Worker extends BaseCommand {
// if it loses the connection to redis
try {
// Redis ping
await Worker.jobQueue.client.ping();
await Worker.jobQueue.ping();
} catch (e) {
LoggerProxy.error('No Redis connection!', e as Error);
const error = new ResponseHelper.ServiceUnavailableError('No Redis connection!');
Expand Down
18 changes: 18 additions & 0 deletions packages/cli/src/helpers/decodeWebhookResponse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { BINARY_ENCODING, type IDataObject, type IExecuteResponsePromiseData } from 'n8n-workflow';

export function decodeWebhookResponse(
response: IExecuteResponsePromiseData,
): IExecuteResponsePromiseData {
if (
typeof response === 'object' &&
typeof response.body === 'object' &&
(response.body as IDataObject)['__@N8nEncodedBuffer@__']
) {
response.body = Buffer.from(
(response.body as IDataObject)['__@N8nEncodedBuffer@__'] as string,
BINARY_ENCODING,
);
}

return response;
}

0 comments on commit 45f2ef3

Please sign in to comment.