From 5cffec827fc69a336cff0db95bca877e555902e9 Mon Sep 17 00:00:00 2001 From: jonaslagoni Date: Sat, 26 Apr 2025 17:02:11 +0200 Subject: [PATCH] add valid --- .../channels/protocols/amqp/subscribeQueue.ts | 42 ++++++++++++++--- .../__snapshots__/channels.spec.ts.snap | 45 ++++++++++++------- .../typescript/test/channels/amqp.spec.ts | 32 +++++++++---- .../typescript/test/channels/nats.spec.ts | 2 +- 4 files changed, 91 insertions(+), 30 deletions(-) diff --git a/src/codegen/generators/typescript/channels/protocols/amqp/subscribeQueue.ts b/src/codegen/generators/typescript/channels/protocols/amqp/subscribeQueue.ts index f5e29510..a5d080d5 100644 --- a/src/codegen/generators/typescript/channels/protocols/amqp/subscribeQueue.ts +++ b/src/codegen/generators/typescript/channels/protocols/amqp/subscribeQueue.ts @@ -2,6 +2,7 @@ import {ChannelFunctionTypes} from '../..'; import {SingleFunctionRenderType} from '../../../../../types'; import {pascalCase} from '../../../utils'; import {RenderRegularParameters} from '../../types'; +import { getValidationFunctions } from '../../utils'; export function renderSubscribeQueue({ topic, @@ -9,28 +10,54 @@ export function renderSubscribeQueue({ messageModule, channelParameters, subName = pascalCase(topic), - functionName = `subscribeTo${subName}Queue` + functionName = `subscribeTo${subName}Queue`, + payloadGenerator }: RenderRegularParameters): SingleFunctionRenderType { + const includeValidation = payloadGenerator.generator.includeValidation; const addressToUse = channelParameters ? `parameters.getChannelWithParameters('${topic}')` : `'${topic}'`; - const messageUnmarshalling = `${messageModule ?? messageType}.unmarshal(msg.content.toString())`; + const messageUnmarshalling = `${messageModule ?? messageType}.unmarshal(receivedData)`; messageType = messageModule ? `${messageModule}.${messageType}` : messageType; + const {potentialValidatorCreation, potentialValidationFunction} = + getValidationFunctions({ + includeValidation, + messageModule, + messageType, + onValidationFail: `onDataCallback(new Error('Invalid message payload received', {cause: errors}), undefined, msg); return;` + }); const subscribeOperation = `const channel = await amqp.createChannel(); const queue = ${addressToUse}; await channel.assertQueue(queue, { durable: true }); +${potentialValidatorCreation} channel.consume(queue, (msg) => { if (msg !== null) { + const receivedData = msg.content.toString() + ${potentialValidationFunction} const message = ${messageUnmarshalling}; - onMessage({message, amqpMsg: msg}); + onDataCallback(undefined, message, msg); } }, options);`; + const callbackFunctionParameters = [ + { + parameter: 'err?: Error', + jsDoc: ' * @param err if any error occurred this will be sat' + }, + { + parameter: `msg?: ${messageType}`, + jsDoc: ' * @param msg that was received' + }, + { + parameter: `amqpMsg?: Amqp.ConsumeMessage`, + jsDoc: ' * @param amqpMsg' + } + ]; const functionParameters = [ { - parameter: `onMessage: (callback: {message: ${messageType}, amqpMsg: Amqp.ConsumeMessage}) => void`, - jsDoc: ' * @param onMessage callback to handle received messages' + parameter: `onDataCallback: (${callbackFunctionParameters.map((param) => param.parameter).join(', ')}) => void`, + jsDoc: ` * @param {${functionName}Callback} onDataCallback to call when messages are received` }, ...(channelParameters ? [ @@ -46,6 +73,11 @@ channel.consume(queue, (msg) => { }, { parameter: `options?: Amqp.Options.Consume` + }, + { + parameter: 'skipMessageValidation: boolean = false', + jsDoc: + ' * @param skipMessageValidation turn off runtime validation of incoming messages' } ]; diff --git a/test/codegen/generators/typescript/__snapshots__/channels.spec.ts.snap b/test/codegen/generators/typescript/__snapshots__/channels.spec.ts.snap index 3542c81c..a8fefc6b 100644 --- a/test/codegen/generators/typescript/__snapshots__/channels.spec.ts.snap +++ b/test/codegen/generators/typescript/__snapshots__/channels.spec.ts.snap @@ -240,24 +240,29 @@ channel.sendToQueue(queue, Buffer.from(dataToSend), options); /** * AMQP subscribe operation for queue \`user/signedup\` * - * @param onMessage callback to handle received messages + * @param {subscribeToUserSignedupQueueCallback} onDataCallback to call when messages are received * @param amqp the AMQP connection to receive from + * @param skipMessageValidation turn off runtime validation of incoming messages */ subscribeToUserSignedupQueue: ( - onMessage: (callback: {message: MessageTypeModule.MessageType, amqpMsg: Amqp.ConsumeMessage}) => void, + onDataCallback: (err?: Error, msg?: MessageTypeModule.MessageType, amqpMsg?: Amqp.ConsumeMessage) => void, amqp: Amqp.Connection, - options?: Amqp.Options.Consume + options?: Amqp.Options.Consume, + skipMessageValidation: boolean = false ): Promise => { return new Promise(async (resolve, reject) => { try { const channel = await amqp.createChannel(); const queue = 'user/signedup'; await channel.assertQueue(queue, { durable: true }); + channel.consume(queue, (msg) => { if (msg !== null) { - const message = MessageTypeModule.unmarshal(msg.content.toString()); - onMessage({message, amqpMsg: msg}); + const receivedData = msg.content.toString() + + const message = MessageTypeModule.unmarshal(receivedData); + onDataCallback(undefined, message, msg); } }, options); resolve(channel); @@ -670,24 +675,29 @@ channel.sendToQueue(queue, Buffer.from(dataToSend), options); /** * AMQP subscribe operation for queue \`user/signedup\` * - * @param onMessage callback to handle received messages + * @param {subscribeToUserSignedupQueueCallback} onDataCallback to call when messages are received * @param amqp the AMQP connection to receive from + * @param skipMessageValidation turn off runtime validation of incoming messages */ subscribeToUserSignedupQueue: ( - onMessage: (callback: {message: MessageTypeModule.MessageType, amqpMsg: Amqp.ConsumeMessage}) => void, + onDataCallback: (err?: Error, msg?: MessageTypeModule.MessageType, amqpMsg?: Amqp.ConsumeMessage) => void, amqp: Amqp.Connection, - options?: Amqp.Options.Consume + options?: Amqp.Options.Consume, + skipMessageValidation: boolean = false ): Promise => { return new Promise(async (resolve, reject) => { try { const channel = await amqp.createChannel(); const queue = 'user/signedup'; await channel.assertQueue(queue, { durable: true }); + channel.consume(queue, (msg) => { if (msg !== null) { - const message = MessageTypeModule.unmarshal(msg.content.toString()); - onMessage({message, amqpMsg: msg}); + const receivedData = msg.content.toString() + + const message = MessageTypeModule.unmarshal(receivedData); + onDataCallback(undefined, message, msg); } }, options); resolve(channel); @@ -1133,24 +1143,29 @@ channel.sendToQueue(queue, Buffer.from(dataToSend), options); /** * AMQP subscribe operation for queue \`/ping\` * - * @param onMessage callback to handle received messages + * @param {subscribeToPingQueueCallback} onDataCallback to call when messages are received * @param amqp the AMQP connection to receive from + * @param skipMessageValidation turn off runtime validation of incoming messages */ subscribeToPingQueue: ( - onMessage: (callback: {message: MessageTypeModule.MessageType, amqpMsg: Amqp.ConsumeMessage}) => void, + onDataCallback: (err?: Error, msg?: MessageTypeModule.MessageType, amqpMsg?: Amqp.ConsumeMessage) => void, amqp: Amqp.Connection, - options?: Amqp.Options.Consume + options?: Amqp.Options.Consume, + skipMessageValidation: boolean = false ): Promise => { return new Promise(async (resolve, reject) => { try { const channel = await amqp.createChannel(); const queue = '/ping'; await channel.assertQueue(queue, { durable: true }); + channel.consume(queue, (msg) => { if (msg !== null) { - const message = MessageTypeModule.unmarshal(msg.content.toString()); - onMessage({message, amqpMsg: msg}); + const receivedData = msg.content.toString() + + const message = MessageTypeModule.unmarshal(receivedData); + onDataCallback(undefined, message, msg); } }, options); resolve(channel); diff --git a/test/runtime/typescript/test/channels/amqp.spec.ts b/test/runtime/typescript/test/channels/amqp.spec.ts index e07b5bc6..fb6c03ad 100644 --- a/test/runtime/typescript/test/channels/amqp.spec.ts +++ b/test/runtime/typescript/test/channels/amqp.spec.ts @@ -8,6 +8,7 @@ import { UserSignedupParameters } from '../../src/parameters/UserSignedupParamet describe('amqp', () => { const testMessage = new UserSignedUp({displayName: 'test', email: 'test@test.dk'}); + const invalidMessage = new UserSignedUp({displayName: 'test', email: '123'}); const testParameters = new UserSignedupParameters({myParameter: 'test', enumParameter: 'asyncapi'}); let connection; beforeAll(async () => { @@ -22,14 +23,10 @@ describe('amqp', () => { it('should be able to publish to queue', () => { // eslint-disable-next-line no-async-promise-executor return new Promise(async (resolve, reject) => { - const channel = await subscribeToReceiveUserSignedupQueue(({message, amqpMsg}) => { - if (message !== null) { - expect(message.marshal()).toEqual(testMessage.marshal()); - channel.ack(amqpMsg); - resolve(); - } else { - reject(); - } + const channel = await subscribeToReceiveUserSignedupQueue((err, message, amqpMsg) => { + expect(message.marshal()).toEqual(testMessage.marshal()); + channel.ack(amqpMsg); + resolve(); }, testParameters, connection, {noAck: true}); channel.on('error', (err) => { reject(err); @@ -39,6 +36,23 @@ describe('amqp', () => { await publishToSendUserSignedupQueue(testMessage, testParameters, connection); }); }); + it('should be able to catch invalid message', () => { + // eslint-disable-next-line no-async-promise-executor + return new Promise(async (resolve, reject) => { + const channel = await subscribeToReceiveUserSignedupQueue((err, message, amqpMsg) => { + expect(err).toBeDefined(); + expect(err?.message).toEqual('Invalid message payload received'); + expect(err?.cause).toBeDefined(); + resolve() + }, testParameters, connection, {noAck: true}); + channel.on('error', (err) => { + reject(err); + }); + await channel.prefetch(1); + + await publishToSendUserSignedupQueue(invalidMessage, testParameters, connection); + }); + }); // TODO: cannot create exchange // it('should be able to publish to exchange', () => { // // eslint-disable-next-line no-async-promise-executor @@ -70,7 +84,7 @@ describe('amqp', () => { describe('without parameters', () => { it('should be able to publish to queue', () => { return new Promise(async (resolve, reject) => { - const channel = await subscribeToNoParameterQueue(({message, amqpMsg}) => { + const channel = await subscribeToNoParameterQueue((err, message, amqpMsg) => { if (message !== null) { expect(message.marshal()).toEqual(testMessage.marshal()); channel.ack(amqpMsg); diff --git a/test/runtime/typescript/test/channels/nats.spec.ts b/test/runtime/typescript/test/channels/nats.spec.ts index f8b657fe..7565ae9b 100644 --- a/test/runtime/typescript/test/channels/nats.spec.ts +++ b/test/runtime/typescript/test/channels/nats.spec.ts @@ -100,7 +100,7 @@ describe('nats', () => { await subscriber.drain(); resolve(); } catch (error) { - reject(error); config + reject(error); } }, new UserSignedupParameters({ myParameter: '*', enumParameter: 'asyncapi' }), js, config); js.publish(`user.signedup.${testParameters.myParameter}.${testParameters.enumParameter}`, incorrectPayload);