Skip to content

Commit

Permalink
Merge pull request #8851 from micalevisk/fix-issue-8595
Browse files Browse the repository at this point in the history
Use `isObervable` from `rxjs` instead of our own implementation. Drop few deprecated APIs from 3rd-party libs. And other minor refactorings
  • Loading branch information
kamilmysliwiec committed Dec 27, 2021
2 parents 14a111f + de83c0c commit 28516d7
Show file tree
Hide file tree
Showing 30 changed files with 160 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ export class AdvancedGrpcController {
async streamReq(messages: Observable<any>): Promise<any> {
const s = new Subject();
const o = s.asObservable();
messages.subscribe(
msg => {
messages.subscribe({
next: () => {
s.next({
id: 1,
itemTypes: [1],
Expand All @@ -146,9 +146,8 @@ export class AdvancedGrpcController {
},
});
},
null,
() => s.complete(),
);
complete: () => s.complete(),
});
return o;
}

Expand Down
8 changes: 4 additions & 4 deletions integration/microservices/src/grpc/grpc.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ export class GrpcController {
@GrpcStreamMethod('Math')
async sumStream(messages: Observable<any>): Promise<any> {
return new Promise<any>((resolve, reject) => {
messages.subscribe(
msg => {
messages.subscribe({
next: msg => {
resolve({
result: msg.data.reduce((a, b) => a + b),
});
},
err => {
error: err => {
reject(err);
},
);
});
});
}

Expand Down
4 changes: 2 additions & 2 deletions packages/common/pipes/default-value.pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
ArgumentMetadata,
PipeTransform,
} from '../interfaces/features/pipe-transform.interface';
import { isNil } from '../utils/shared.utils';
import { isNil, isNumber } from '../utils/shared.utils';

/**
* Defines the built-in DefaultValue Pipe
Expand All @@ -21,7 +21,7 @@ export class DefaultValuePipe<T = any, R = any>
transform(value?: T, _metadata?: ArgumentMetadata): T | R {
if (
isNil(value) ||
(typeof value === 'number' && isNaN(value as unknown as number))
(isNumber(value) && isNaN(value as unknown as number))
) {
return this.defaultValue;
}
Expand Down
23 changes: 21 additions & 2 deletions packages/common/test/utils/shared.utils.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
isEmpty,
isFunction,
isNil,
isNumber,
isObject,
isPlainObject,
isString,
Expand Down Expand Up @@ -66,15 +67,33 @@ describe('Shared utils', () => {
});
});
describe('isString', () => {
it('should return true when obj is a string', () => {
it('should return true when val is a string', () => {
expect(isString('true')).to.be.true;
});
it('should return false when object is not a string', () => {
it('should return false when val is not a string', () => {
expect(isString(new String('fine'))).to.be.false;
expect(isString(false)).to.be.false;
expect(isString(null)).to.be.false;
expect(isString(undefined)).to.be.false;
});
});
describe('isNumber', () => {
it('should return true when val is a number or NaN', () => {
expect(isNumber(1)).to.be.true;
expect(isNumber(1.23)).to.be.true; // with decimals
expect(isNumber(123e-5)).to.be.true; // scientific (exponent) notation
expect(isNumber(0o1)).to.be.true; // octal notation
expect(isNumber(0b1)).to.be.true; // binary notation
expect(isNumber(0x1)).to.be.true; // hexadecimal notation
expect(isNumber(NaN)).to.be.true;
});
it('should return false when val is not a number', () => {
// expect(isNumber(1n)).to.be.false; // big int (available on ES2020)
expect(isNumber('1')).to.be.false; // string
expect(isNumber(undefined)).to.be.false; // nullish
expect(isNumber(null)).to.be.false; // nullish
});
});
describe('isConstructor', () => {
it('should return true when string is equal to constructor', () => {
expect(isConstructor('constructor')).to.be.true;
Expand Down
13 changes: 7 additions & 6 deletions packages/common/utils/shared.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ export const normalizePath = (path?: string): string =>
export const stripEndSlash = (path: string) =>
path[path.length - 1] === '/' ? path.slice(0, path.length - 1) : path;

export const isFunction = (fn: any): boolean => typeof fn === 'function';
export const isString = (fn: any): fn is string => typeof fn === 'string';
export const isConstructor = (fn: any): boolean => fn === 'constructor';
export const isNil = (obj: any): obj is null | undefined =>
isUndefined(obj) || obj === null;
export const isFunction = (val: any): boolean => typeof val === 'function';
export const isString = (val: any): val is string => typeof val === 'string';
export const isNumber = (val: any): val is number => typeof val === 'number';
export const isConstructor = (val: any): boolean => val === 'constructor';
export const isNil = (val: any): val is null | undefined =>
isUndefined(val) || val === null;
export const isEmpty = (array: any): boolean => !(array && array.length > 0);
export const isSymbol = (fn: any): fn is symbol => typeof fn === 'symbol';
export const isSymbol = (val: any): val is symbol => typeof val === 'symbol';
4 changes: 2 additions & 2 deletions packages/core/helpers/external-context-creator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
PipeTransform,
} from '@nestjs/common/interfaces';
import { isEmpty, isFunction } from '@nestjs/common/utils/shared.utils';
import { lastValueFrom } from 'rxjs';
import { lastValueFrom, isObservable } from 'rxjs';
import { ExternalExceptionFilterContext } from '../exceptions/external-exception-filter-context';
import { FORBIDDEN_MESSAGE } from '../guards/constants';
import { GuardsConsumer } from '../guards/guards-consumer';
Expand Down Expand Up @@ -329,7 +329,7 @@ export class ExternalContextCreator {
}

public async transformToResult(resultOrDeferred: any) {
if (resultOrDeferred && isFunction(resultOrDeferred.subscribe)) {
if (isObservable(resultOrDeferred)) {
return lastValueFrom(resultOrDeferred);
}
return resultOrDeferred;
Expand Down
9 changes: 2 additions & 7 deletions packages/microservices/context/rpc-proxy.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { isFunction } from '@nestjs/common/utils/shared.utils';
import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context-host';
import { Observable } from 'rxjs';
import { Observable, isObservable } from 'rxjs';
import { catchError } from 'rxjs/operators';
import { RpcExceptionsHandler } from '../exceptions/rpc-exceptions-handler';

Expand All @@ -12,7 +11,7 @@ export class RpcProxy {
return async (...args: unknown[]) => {
try {
const result = await targetCallback(...args);
return !this.isObservable(result)
return !isObservable(result)
? result
: result.pipe(
catchError(error =>
Expand All @@ -34,8 +33,4 @@ export class RpcProxy {
host.setType('rpc');
return exceptionsHandler.handle(error, host);
}

isObservable(result: any): boolean {
return result && isFunction(result.subscribe);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export class BaseRpcExceptionFilter<T = any, R = any>
}
const res = exception.getError();
const message = isObject(res) ? res : { status, message: res };
return _throw(message);
return _throw(() => message);
}

public handleUnknownError(exception: T, status: string) {
Expand All @@ -29,7 +29,7 @@ export class BaseRpcExceptionFilter<T = any, R = any>
const logger = BaseRpcExceptionFilter.logger;
logger.error.apply(logger, loggerArgs as any);

return _throw({ status, message: errorMessage });
return _throw(() => ({ status, message: errorMessage }));
}

public isError(exception: any): exception is Error {
Expand Down
5 changes: 2 additions & 3 deletions packages/microservices/serializers/nats-record.serializer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { isObject } from '@nestjs/common/utils/shared.utils';
import { NatsCodec } from '../external/nats-client.interface';
import { ReadPacket } from '../interfaces';
import { Serializer } from '../interfaces/serializer.interface';
Expand All @@ -20,9 +21,7 @@ export class NatsRecordSerializer

serialize(packet: ReadPacket | any): NatsRecord {
const natsMessage =
packet?.data &&
typeof packet.data === 'object' &&
packet.data instanceof NatsRecord
packet?.data && isObject(packet.data) && packet.data instanceof NatsRecord
? (packet.data as NatsRecord)
: new NatsRecordBuilder(packet?.data).build();

Expand Down
8 changes: 4 additions & 4 deletions packages/microservices/server/server-grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,10 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
public createUnaryServiceMethod(methodHandler: Function): Function {
return async (call: GrpcCall, callback: Function) => {
const handler = methodHandler(call.request, call.metadata, call);
this.transformToObservable(await handler).subscribe(
data => callback(null, data),
(err: any) => callback(err),
);
this.transformToObservable(await handler).subscribe({
next: data => callback(null, data),
error: (err: any) => callback(err),
});
};
}

Expand Down
2 changes: 1 addition & 1 deletion packages/microservices/server/server-kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {

const response$ = this.transformToObservable(
await handler(packet.data, kafkaContext),
) as Observable<any>;
);
response$ && this.send(response$, publish);
}

Expand Down
5 changes: 3 additions & 2 deletions packages/microservices/server/server-mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ export class ServerMqtt extends Server implements CustomTransportStrategy {
}
const response$ = this.transformToObservable(
await handler(packet.data, mqttContext),
) as Observable<any>;
);
response$ && this.send(response$, publish);
}

Expand Down Expand Up @@ -191,7 +191,8 @@ export class ServerMqtt extends Server implements CustomTransportStrategy {

for (const [key, value] of this.messageHandlers) {
if (
!key.includes(MQTT_WILDCARD_SINGLE) && !key.includes(MQTT_WILDCARD_ALL)
!key.includes(MQTT_WILDCARD_SINGLE) &&
!key.includes(MQTT_WILDCARD_ALL)
) {
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/microservices/server/server-nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ export class ServerNats extends Server implements CustomTransportStrategy {
}
const response$ = this.transformToObservable(
await handler(message.data, natsCtx),
) as Observable<any>;
);
response$ && this.send(response$, publish);
}

Expand Down
2 changes: 1 addition & 1 deletion packages/microservices/server/server-redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ export class ServerRedis extends Server implements CustomTransportStrategy {
}
const response$ = this.transformToObservable(
await handler(packet.data, redisCtx),
) as Observable<any>;
);
response$ && this.send(response$, publish);
}

Expand Down
2 changes: 1 addition & 1 deletion packages/microservices/server/server-rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
}
const response$ = this.transformToObservable(
await handler(packet.data, rmqContext),
) as Observable<any>;
);

const publish = <T>(data: T) =>
this.sendMessage(data, properties.replyTo, properties.correlationId);
Expand Down
2 changes: 1 addition & 1 deletion packages/microservices/server/server-tcp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export class ServerTCP extends Server implements CustomTransportStrategy {
}
const response$ = this.transformToObservable(
await handler(packet.data, tcpContext),
) as Observable<any>;
);

response$ &&
this.send(response$, data => {
Expand Down
32 changes: 20 additions & 12 deletions packages/microservices/server/server.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Logger, LoggerService } from '@nestjs/common/services/logger.service';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { isFunction } from '@nestjs/common/utils/shared.utils';
import {
connectable,
EMPTY as empty,
EMPTY,
from as fromPromise,
isObservable,
Observable,
ObservedValueOf,
of,
Subject,
Subscription,
Expand Down Expand Up @@ -94,7 +95,7 @@ export abstract class Server {
.pipe(
catchError((err: any) => {
scheduleOnNextTick({ err });
return empty;
return EMPTY;
}),
finalize(() => scheduleOnNextTick({ isDisposed: true })),
)
Expand All @@ -113,7 +114,7 @@ export abstract class Server {
);
}
const resultOrStream = await handler(packet.data, context);
if (this.isObservable(resultOrStream)) {
if (isObservable(resultOrStream)) {
const connectableSource = connectable(resultOrStream, {
connector: () => new Subject(),
resetOnDisconnect: false,
Expand All @@ -122,13 +123,24 @@ export abstract class Server {
}
}

public transformToObservable<T = any>(resultOrDeferred: any): Observable<T> {
public transformToObservable<T>(
resultOrDeferred: Observable<T> | Promise<T>,
): Observable<T>;
public transformToObservable<T>(
resultOrDeferred: T,
): never extends Observable<ObservedValueOf<T>>
? Observable<T>
: Observable<ObservedValueOf<T>>;
public transformToObservable(resultOrDeferred: any) {
if (resultOrDeferred instanceof Promise) {
return fromPromise(resultOrDeferred);
} else if (!this.isObservable(resultOrDeferred)) {
return of(resultOrDeferred);
}
return resultOrDeferred;

if (isObservable(resultOrDeferred)) {
return resultOrDeferred;
}

return of(resultOrDeferred);
}

public getOptionsProp<
Expand Down Expand Up @@ -180,10 +192,6 @@ export abstract class Server {
new IncomingRequestDeserializer();
}

private isObservable(input: unknown): input is Observable<any> {
return input && isFunction((input as Observable<any>).subscribe);
}

/**
* Transforms the server Pattern to valid type and returns a route for him.
*
Expand Down

0 comments on commit 28516d7

Please sign in to comment.