Skip to content

Commit

Permalink
Merge pull request #3963 from nestjs/fix/grpc-client-streaming
Browse files Browse the repository at this point in the history
fix(microservices): fix client-side grpc streaming
  • Loading branch information
kamilmysliwiec committed Jan 31, 2020
2 parents 9bd1a43 + 907d07b commit 748d3aa
Show file tree
Hide file tree
Showing 10 changed files with 425 additions and 58 deletions.
63 changes: 63 additions & 0 deletions integration/microservices/e2e/orders-grpc.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,21 @@ describe('Advanced GRPC transport', () => {
});
});

it(`GRPC Streaming and Receiving HTTP POST`, () => {
return request(server)
.post('/client-streaming')
.send('1')
.expect(200, {
id: 1,
itemTypes: [1],
shipmentType: {
from: 'test',
to: 'test1',
carrier: 'test-carrier',
},
});
});

it('GRPC Sending and receiving message', async () => {
// Execute find in Promise
return new Promise(resolve => {
Expand Down Expand Up @@ -166,4 +181,52 @@ describe('Advanced GRPC transport', () => {
setTimeout(() => resolve(), 1000);
});
});

it('GRPC Sending Stream and receiving a single message from RX handler', async () => {
const callHandler = client.streamReq((err, res) => {
if (err) {
throw err;
}
expect(res).to.eql({
id: 1,
itemTypes: [1],
shipmentType: {
from: 'test',
to: 'test1',
carrier: 'test-carrier',
},
});
});

return new Promise((resolve, reject) => {
callHandler.write({
id: 1,
});
setTimeout(() => resolve(), 1000);
});
});

it('GRPC Sending Stream and receiving a single message from Call handler', async () => {
const callHandler = client.streamReqCall((err, res) => {
if (err) {
throw err;
}
expect(res).to.eql({
id: 1,
itemTypes: [1],
shipmentType: {
from: 'test',
to: 'test1',
carrier: 'test-carrier',
},
});
});

return new Promise((resolve, reject) => {
callHandler.write({
id: 1,
});
setTimeout(() => resolve(), 1000);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
Transport,
} from '@nestjs/microservices';
import { join } from 'path';
import { Observable, of, Subject } from 'rxjs';
import { Observable, of, ReplaySubject, Subject } from 'rxjs';

@Controller()
export class AdvancedGrpcController {
Expand Down Expand Up @@ -40,6 +40,28 @@ export class AdvancedGrpcController {
return svc.find({ id });
}

/**
* HTTP Proxy entry for support client-side stream find method
* @param id
*/
@Post('client-streaming')
@HttpCode(200)
stream(): Observable<number> {
const svc = this.client.getService<any>('OrderService');
const upstream = new ReplaySubject();
upstream.next({
id: 1,
itemTypes: [1],
shipmentType: {
from: 'test',
to: 'test1',
carrier: 'test-carrier',
},
});
upstream.complete();
return svc.streamReq(upstream);
}

/**
* GRPC stub for Find method
* @param id
Expand Down Expand Up @@ -97,4 +119,44 @@ export class AdvancedGrpcController {
});
});
}

@GrpcStreamMethod('orders.OrderService')
async streamReq(messages: Observable<any>): Promise<any> {
const s = new Subject();
const o = s.asObservable();
messages.subscribe(
msg => {
s.next({
id: 1,
itemTypes: [1],
shipmentType: {
from: 'test',
to: 'test1',
carrier: 'test-carrier',
},
});
},
null,
() => s.complete(),
);
return o;
}

@GrpcStreamCall('orders.OrderService')
async streamReqCall(stream: any, callback: Function) {
stream.on('data', (msg: any) => {
// process msg
});
stream.on('end', () => {
callback(null, {
id: 1,
itemTypes: [1],
shipmentType: {
from: 'test',
to: 'test1',
carrier: 'test-carrier',
},
});
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ service OrderService {
rpc Find(Order) returns (Order);
rpc Sync(stream Order) returns (stream Order);
rpc SyncCall(stream Order) returns (stream Order);
rpc StreamReq(stream Order) returns (Order);
rpc StreamReqCall(stream Order) returns (Order);
}
58 changes: 53 additions & 5 deletions packages/microservices/client/client-grpc.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Logger } from '@nestjs/common/services/logger.service';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { isObject } from '@nestjs/common/utils/shared.utils';
import { Observable } from 'rxjs';
import { isFunction, isObject } from '@nestjs/common/utils/shared.utils';
import { Observable, Subscription } from 'rxjs';
import {
GRPC_DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH,
GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH,
Expand Down Expand Up @@ -111,10 +111,27 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
methodName: string,
): (...args: any[]) => Observable<any> {
return (...args: any[]) => {
return new Observable(observer => {
const isRequestStream = client[methodName].requestStream;
const stream = new Observable(observer => {
let isClientCanceled = false;
const call = client[methodName](...args);
let upstreamSubscription: Subscription;

const upstreamSubjectOrData = args[0];
const isUpstreamSubject =
upstreamSubjectOrData && isFunction(upstreamSubjectOrData.subscribe);

const call =
isRequestStream && isUpstreamSubject
? client[methodName]()
: client[methodName](...args);

if (isRequestStream && isUpstreamSubject) {
upstreamSubscription = upstreamSubjectOrData.subscribe(
(val: unknown) => call.write(val),
(err: unknown) => call.emit('error', err),
() => call.end(),
);
}
call.on('data', (data: any) => observer.next(data));
call.on('error', (error: any) => {
if (error.details === GRPC_CANCELLED) {
Expand All @@ -126,17 +143,27 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
observer.error(error);
});
call.on('end', () => {
if (upstreamSubscription) {
upstreamSubscription.unsubscribe();
upstreamSubscription = null;
}
call.removeAllListeners();
observer.complete();
});
return (): any => {
return () => {
if (upstreamSubscription) {
upstreamSubscription.unsubscribe();
upstreamSubscription = null;
}

if (call.finished) {
return undefined;
}
isClientCanceled = true;
call.cancel();
};
});
return stream;
};
}

Expand All @@ -145,6 +172,27 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
methodName: string,
): (...args: any[]) => Observable<any> {
return (...args: any[]) => {
const isRequestStream = client[methodName].requestStream;
const upstreamSubjectOrData = args[0];
const isUpstreamSubject =
upstreamSubjectOrData && isFunction(upstreamSubjectOrData.subscribe);

if (isRequestStream && isUpstreamSubject) {
return new Observable(observer => {
const call = client[methodName]((error, data) => {
if (error) {
return observer.error(error);
}
observer.next(data);
observer.complete();
});
upstreamSubjectOrData.subscribe(
(val: unknown) => call.write(val),
(err: unknown) => call.emit('error', err),
() => call.end(),
);
});
}
return new Observable(observer => {
client[methodName](...args, (error: any, data: any) => {
if (error) {
Expand Down
90 changes: 65 additions & 25 deletions packages/microservices/server/server-grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,19 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
// streaming from the side of requester
if (protoNativeHandler.requestStream) {
// If any handlers were defined with GrpcStreamMethod annotation use RX
if (streamType === GrpcMethodStreamingType.RX_STREAMING)
return this.createStreamDuplexMethod(methodHandler);
if (streamType === GrpcMethodStreamingType.RX_STREAMING) {
return this.createRequestStreamMethod(
methodHandler,
protoNativeHandler.responseStream,
);
}
// If any handlers were defined with GrpcStreamCall annotation
else if (streamType === GrpcMethodStreamingType.PT_STREAMING)
return this.createStreamCallMethod(methodHandler);
else if (streamType === GrpcMethodStreamingType.PT_STREAMING) {
return this.createStreamCallMethod(
methodHandler,
protoNativeHandler.responseStream,
);
}
}
return protoNativeHandler.responseStream
? this.createStreamServiceMethod(methodHandler)
Expand Down Expand Up @@ -220,17 +228,23 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
};
}

public createStreamDuplexMethod(methodHandler: Function) {
return async (call: GrpcCall) => {
public createRequestStreamMethod(
methodHandler: Function,
isResponseStream: boolean,
) {
return async (
call: GrpcCall,
callback: (err: unknown, value: unknown) => void,
) => {
const req = new Subject<any>();
call.on('data', (m: any) => req.next(m));
call.on('error', (e: any) => {
// Check if error means that stream ended on other end
if (
String(e)
.toLowerCase()
.indexOf('cancelled') > -1
) {
const isCancelledError = String(e)
.toLowerCase()
.indexOf('cancelled');

if (isCancelledError) {
call.end();
return;
}
Expand All @@ -241,23 +255,49 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {

const handler = methodHandler(req.asObservable());
const res = this.transformToObservable(await handler);
await res
.pipe(
takeUntil(fromEvent(call as any, CANCEL_EVENT)),
catchError(err => {
call.emit('error', err);
return EMPTY;
}),
)
.forEach(m => call.write(m));

call.end();
if (isResponseStream) {
await res
.pipe(
takeUntil(fromEvent(call as any, CANCEL_EVENT)),
catchError(err => {
call.emit('error', err);
return EMPTY;
}),
)
.forEach(m => call.write(m));

call.end();
} else {
const response = await res
.pipe(
takeUntil(fromEvent(call as any, CANCEL_EVENT)),
catchError(err => {
callback(err, null);
return EMPTY;
}),
)
.toPromise();

if (typeof response !== 'undefined') {
callback(null, response);
}
}
};
}

public createStreamCallMethod(methodHandler: Function) {
return async (call: GrpcCall) => {
methodHandler(call);
public createStreamCallMethod(
methodHandler: Function,
isResponseStream: boolean,
) {
return async (
call: GrpcCall,
callback: (err: unknown, value: unknown) => void,
) => {
if (isResponseStream) {
methodHandler(call);
} else {
methodHandler(call, callback);
}
};
}

Expand Down

0 comments on commit 748d3aa

Please sign in to comment.