diff --git a/packages/cli/commands/worker.ts b/packages/cli/commands/worker.ts index 30aaf008262a0..7e0a8640cf26c 100644 --- a/packages/cli/commands/worker.ts +++ b/packages/cli/commands/worker.ts @@ -13,23 +13,18 @@ import http from 'http'; import PCancelable from 'p-cancelable'; import { Command, flags } from '@oclif/command'; -import { BinaryDataManager, IBinaryDataConfig, UserSettings, WorkflowExecute } from 'n8n-core'; +import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core'; import { IExecuteResponsePromiseData, INodeTypes, IRun, Workflow, LoggerProxy } from 'n8n-workflow'; import { FindOneOptions, getConnectionManager } from 'typeorm'; -import Bull from 'bull'; import { CredentialsOverwrites, CredentialTypes, Db, ExternalHooks, GenericHelpers, - IBullJobData, - IBullJobResponse, - IBullWebhookResponse, - IExecutionFlattedDb, InternalHooksManager, LoadNodesAndCredentials, NodeTypes, @@ -64,7 +59,7 @@ export class Worker extends Command { [key: string]: PCancelable; } = {}; - static jobQueue: Bull.Queue; + static jobQueue: Queue.JobQueue; static processExistCode = 0; // static activeExecutions = ActiveExecutions.getInstance(); @@ -118,30 +113,28 @@ export class Worker extends Command { process.exit(Worker.processExistCode); } - async runJob(job: Bull.Job, nodeTypes: INodeTypes): Promise { - const jobData = job.data as IBullJobData; - const executionDb = await Db.collections.Execution.findOne(jobData.executionId); + async runJob(job: Queue.Job, nodeTypes: INodeTypes): Promise { + const { executionId, loadStaticData } = job.data; + const executionDb = await Db.collections.Execution.findOne(executionId); if (!executionDb) { LoggerProxy.error( - `Worker failed to find data of execution "${jobData.executionId}" in database. Cannot continue.`, - { - executionId: jobData.executionId, - }, + `Worker failed to find data of execution "${executionId}" in database. Cannot continue.`, + { executionId }, ); throw new Error( - `Unable to find data of execution "${jobData.executionId}" in database. Aborting execution.`, + `Unable to find data of execution "${executionId}" in database. Aborting execution.`, ); } const currentExecutionDb = ResponseHelper.unflattenExecutionData(executionDb); LoggerProxy.info( - `Start job: ${job.id} (Workflow ID: ${currentExecutionDb.workflowData.id} | Execution: ${jobData.executionId})`, + `Start job: ${job.id} (Workflow ID: ${currentExecutionDb.workflowData.id} | Execution: ${executionId})`, ); const workflowOwner = await getWorkflowOwner(currentExecutionDb.workflowData.id!.toString()); let { staticData } = currentExecutionDb.workflowData; - if (jobData.loadStaticData) { + if (loadStaticData) { const findOptions = { select: ['id', 'staticData'], } as FindOneOptions; @@ -154,7 +147,7 @@ export class Worker extends Command { 'Worker execution failed because workflow could not be found in database.', { workflowId: currentExecutionDb.workflowData.id, - executionId: jobData.executionId, + executionId, }, ); throw new Error( @@ -206,14 +199,15 @@ export class Worker extends Command { additionalData.hooks.hookFunctions.sendResponse = [ async (response: IExecuteResponsePromiseData): Promise => { - await job.progress({ - executionId: job.data.executionId as string, + const progress: Queue.WebhookResponse = { + executionId, response: WebhookHelpers.encodeWebhookResponse(response), - } as IBullWebhookResponse); + }; + await job.progress(progress); }, ]; - additionalData.executionId = jobData.executionId; + additionalData.executionId = executionId; let workflowExecute: WorkflowExecute; let workflowRun: PCancelable; diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index 0ebd0f491f3c7..6f0be81979476 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -46,20 +46,6 @@ export interface IActivationError { }; } -export interface IBullJobData { - executionId: string; - loadStaticData: boolean; -} - -export interface IBullJobResponse { - success: boolean; -} - -export interface IBullWebhookResponse { - executionId: string; - response: IExecuteResponsePromiseData; -} - export interface ICustomRequest extends Request { parsedUrl: Url | undefined; } diff --git a/packages/cli/src/Queue.ts b/packages/cli/src/Queue.ts index 9d4314e645ac5..fa74a5ecb8308 100644 --- a/packages/cli/src/Queue.ts +++ b/packages/cli/src/Queue.ts @@ -1,17 +1,32 @@ -/* eslint-disable @typescript-eslint/no-unsafe-member-access */ import Bull from 'bull'; +import { IExecuteResponsePromiseData } from 'n8n-workflow'; import config from '../config'; // eslint-disable-next-line import/no-cycle -import { IBullJobData, IBullWebhookResponse } from './Interfaces'; -// eslint-disable-next-line import/no-cycle import * as ActiveExecutions from './ActiveExecutions'; // eslint-disable-next-line import/no-cycle import * as WebhookHelpers from './WebhookHelpers'; +export type Job = Bull.Job; +export type JobQueue = Bull.Queue; + +export interface JobData { + executionId: string; + loadStaticData: boolean; +} + +export interface JobResponse { + success: boolean; +} + +export interface WebhookResponse { + executionId: string; + response: IExecuteResponsePromiseData; +} + export class Queue { private activeExecutions: ActiveExecutions.ActiveExecutions; - private jobQueue: Bull.Queue; + private jobQueue: JobQueue; constructor() { this.activeExecutions = ActiveExecutions.getInstance(); @@ -26,7 +41,7 @@ export class Queue { // @ts-ignore this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions, enableReadyCheck: false }); - this.jobQueue.on('global:progress', (jobId, progress: IBullWebhookResponse) => { + this.jobQueue.on('global:progress', (jobId, progress: WebhookResponse) => { this.activeExecutions.resolveResponsePromise( // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access progress.executionId, @@ -35,28 +50,28 @@ export class Queue { }); } - async add(jobData: IBullJobData, jobOptions: object): Promise { + async add(jobData: JobData, jobOptions: object): Promise { return this.jobQueue.add(jobData, jobOptions); } - async getJob(jobId: Bull.JobId): Promise { + async getJob(jobId: Bull.JobId): Promise { return this.jobQueue.getJob(jobId); } - async getJobs(jobTypes: Bull.JobStatus[]): Promise { + async getJobs(jobTypes: Bull.JobStatus[]): Promise { return this.jobQueue.getJobs(jobTypes); } - getBullObjectInstance(): Bull.Queue { + getBullObjectInstance(): JobQueue { return this.jobQueue; } /** * - * @param job A Bull.Job instance + * @param job A Job instance * @returns boolean true if we were able to securely stop the job */ - async stopJob(job: Bull.Job): Promise { + async stopJob(job: Job): Promise { if (await job.isActive()) { // Job is already running so tell it to stop await job.progress(-1); diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index a9ddd9f353aed..107386a796374 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -31,17 +31,13 @@ import PCancelable from 'p-cancelable'; import { join as pathJoin } from 'path'; import { fork } from 'child_process'; -import Bull from 'bull'; import config from '../config'; // eslint-disable-next-line import/no-cycle import { ActiveExecutions, CredentialsOverwrites, - CredentialTypes, Db, ExternalHooks, - IBullJobData, - IBullJobResponse, ICredentialsOverwrite, ICredentialsTypeData, IExecutionFlattedDb, @@ -67,7 +63,7 @@ export class WorkflowRunner { push: Push.Push; - jobQueue: Bull.Queue; + jobQueue: Queue.JobQueue; constructor() { this.push = Push.getInstance(); @@ -387,7 +383,7 @@ export class WorkflowRunner { this.activeExecutions.attachResponsePromise(executionId, responsePromise); } - const jobData: IBullJobData = { + const jobData: Queue.JobData = { executionId, loadStaticData: !!loadStaticData, }; @@ -404,7 +400,7 @@ export class WorkflowRunner { removeOnComplete: true, removeOnFail: true, }; - let job: Bull.Job; + let job: Queue.Job; let hooks: WorkflowHooks; try { job = await this.jobQueue.add(jobData, jobOptions); @@ -455,11 +451,11 @@ export class WorkflowRunner { reject(error); }); - const jobData: Promise = job.finished(); + const jobData: Promise = job.finished(); const queueRecoveryInterval = config.getEnv('queue.bull.queueRecoveryInterval'); - const racingPromises: Array> = [jobData]; + const racingPromises: Array> = [jobData]; let clearWatchdogInterval; if (queueRecoveryInterval > 0) {