Skip to content

Commit

Permalink
Merge pull request #10022 from bangbang93/pass-options-to-rmq-deseria…
Browse files Browse the repository at this point in the history
…lize

fix(microservices): pass options to rmq deserialize
  • Loading branch information
kamilmysliwiec committed Jul 28, 2022
2 parents a1aeba6 + 0ec9cb7 commit 7c40213
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
27 changes: 25 additions & 2 deletions packages/microservices/client/client-rmq.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Logger } from '@nestjs/common/services/logger.service';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util';
import { isFunction } from '@nestjs/common/utils/shared.utils';
import { EventEmitter } from 'events';
import { EmptyError, fromEvent, lastValueFrom, merge, Observable } from 'rxjs';
import { first, map, retryWhen, scan, share, switchMap } from 'rxjs/operators';
Expand Down Expand Up @@ -187,9 +188,25 @@ export class ClientRMQ extends ClientProxy {
public async handleMessage(
packet: unknown,
callback: (packet: WritePacket) => any,
);
public async handleMessage(
packet: unknown,
options: Record<string, unknown>,
callback: (packet: WritePacket) => any,
);
public async handleMessage(
packet: unknown,
options: Record<string, unknown> | ((packet: WritePacket) => any),
callback?: (packet: WritePacket) => any,
) {
if (isFunction(options)) {
callback = options as (packet: WritePacket) => any;
options = undefined;
}

const { err, response, isDisposed } = await this.deserializer.deserialize(
packet,
options,
);
if (isDisposed || err) {
callback({
Expand All @@ -210,8 +227,14 @@ export class ClientRMQ extends ClientProxy {
): () => void {
try {
const correlationId = randomStringGenerator();
const listener = ({ content }: { content: any }) =>
this.handleMessage(JSON.parse(content.toString()), callback);
const listener = ({
content,
options,
}: {
content: any;
options: Record<string, unknown>;
}) =>
this.handleMessage(JSON.parse(content.toString()), options, callback);

Object.assign(message, { id: correlationId });
const serializedPacket: ReadPacket & Partial<RmqRecord> =
Expand Down
4 changes: 2 additions & 2 deletions packages/microservices/server/server-rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import {
DISCONNECTED_RMQ_MESSAGE,
NO_MESSAGE_HANDLER,
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
RQM_DEFAULT_NO_ASSERT,
RQM_DEFAULT_NOACK,
RQM_DEFAULT_PREFETCH_COUNT,
RQM_DEFAULT_QUEUE,
RQM_DEFAULT_QUEUE_OPTIONS,
RQM_DEFAULT_URL,
RQM_DEFAULT_NO_ASSERT,
} from '../constants';
import { RmqContext } from '../ctx-host';
import { Transport } from '../enums';
Expand Down Expand Up @@ -142,7 +142,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
}
const { content, properties } = message;
const rawMessage = JSON.parse(content.toString());
const packet = await this.deserializer.deserialize(rawMessage);
const packet = await this.deserializer.deserialize(rawMessage, properties);
const pattern = isString(packet.pattern)
? packet.pattern
: JSON.stringify(packet.pattern);
Expand Down

0 comments on commit 7c40213

Please sign in to comment.