diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index 8d8d75a7c6c5f..2682e928edd46 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -9,7 +9,7 @@ import type express from 'express'; import { Container } from 'typedi'; import get from 'lodash/get'; -import { pipeline } from 'stream/promises'; +import { finished } from 'stream/promises'; import formidable from 'formidable'; import { BinaryDataService, NodeExecuteFunctions } from 'n8n-core'; @@ -30,6 +30,7 @@ import type { IWebhookResponseData, IWorkflowDataProxyAdditionalKeys, IWorkflowExecuteAdditionalData, + WebhookResponseMode, Workflow, WorkflowExecuteMode, } from 'n8n-workflow'; @@ -272,7 +273,7 @@ export async function executeWebhook( additionalKeys, undefined, 'onReceived', - ); + ) as WebhookResponseMode; const responseCode = workflow.expression.getSimpleParameterValue( workflowStartNode, webhookData.webhookDescription.responseCode as string, @@ -291,7 +292,7 @@ export async function executeWebhook( 'firstEntryJson', ); - if (!['onReceived', 'lastNode', 'responseNode'].includes(responseMode as string)) { + if (!['onReceived', 'lastNode', 'responseNode'].includes(responseMode)) { // If the mode is not known we error. Is probably best like that instead of using // the default that people know as early as possible (probably already testing phase) // that something does not resolve properly. @@ -562,7 +563,8 @@ export async function executeWebhook( if (binaryData?.id) { res.header(response.headers); const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id); - await pipeline(stream, res); + stream.pipe(res, { end: false }); + await finished(stream); responseCallback(null, { noWebhookResponse: true }); } else if (Buffer.isBuffer(response.body)) { res.header(response.headers); @@ -595,6 +597,7 @@ export async function executeWebhook( }); } + process.nextTick(() => res.end()); didSendResponse = true; }) .catch(async (error) => { @@ -659,17 +662,9 @@ export async function executeWebhook( return data; } - if (responseMode === 'responseNode') { - if (!didSendResponse) { - // Return an error if no Webhook-Response node did send any data - responseCallback(null, { - data: { - message: 'Workflow executed successfully', - }, - responseCode, - }); - didSendResponse = true; - } + // in `responseNode` mode `responseCallback` is called by `responsePromise` + if (responseMode === 'responseNode' && responsePromise) { + await Promise.allSettled([responsePromise.promise()]); return undefined; } @@ -795,14 +790,16 @@ export async function executeWebhook( res.setHeader('Content-Type', binaryData.mimeType); if (binaryData.id) { const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id); - await pipeline(stream, res); + stream.pipe(res, { end: false }); + await finished(stream); } else { - res.end(Buffer.from(binaryData.data, BINARY_ENCODING)); + res.write(Buffer.from(binaryData.data, BINARY_ENCODING)); } responseCallback(null, { noWebhookResponse: true, }); + process.nextTick(() => res.end()); } } else if (responseData === 'noData') { // Return without data diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 49fab3b5d316c..4becafcb07318 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -1834,7 +1834,7 @@ export interface IWebhookResponseData { } export type WebhookResponseData = 'allEntries' | 'firstEntryJson' | 'firstEntryBinary' | 'noData'; -export type WebhookResponseMode = 'onReceived' | 'lastNode'; +export type WebhookResponseMode = 'onReceived' | 'lastNode' | 'responseNode'; export interface INodeTypes { getByName(nodeType: string): INodeType | IVersionedNodeType;