Skip to content

Commit

Permalink
Merge d53db56 into 385071b
Browse files Browse the repository at this point in the history
  • Loading branch information
anton-alation committed Nov 15, 2018
2 parents 385071b + d53db56 commit c0d3877
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 9 deletions.
2 changes: 1 addition & 1 deletion packages/microservices/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@nestjs/microservices",
"version": "5.4.0",
"version": "5.4.1",
"description": "Nest - modern, fast, powerful node.js web framework (@microservices)",
"author": "Kamil Mysliwiec",
"license": "MIT",
Expand Down
31 changes: 25 additions & 6 deletions packages/microservices/server/server-grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,31 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {

public createStreamServiceMethod(methodHandler): Function {
return async (call, callback) => {
const handler = methodHandler(call.request, call.metadata);
const result$ = this.transformToObservable(await handler);
await result$
.pipe(takeUntil(fromEvent(call, CANCEL_EVENT)))
.forEach(data => call.write(data));
call.end();
// Define handler container which will be defined by exact flow on return
let handler = null;
// Check if call request is socket stream, for socket stream we should pass
// only socket stream for next execution
if (typeof call.request === 'undefined') {
handler = methodHandler(call, call.metadata);
}
// Otherwise call would go in the way of Unary->Stream output
else {
handler = methodHandler(call.request, call.metadata);
}
// Receive control from handler
const control = await handler;
// If control defined as null-reference or some type of non-observable
// function, we should not continue on transforming it to the stream.
// If control is not an Observable — current flow is considered as
// custom flow for GRPC socket connection, and would be maintained accordingly
if (control !== null) {
// Turns Promise or Observable into Subscribable object
const result$ = this.transformToObservable(control);
await result$
.pipe(takeUntil(fromEvent(call, CANCEL_EVENT)))
.forEach(data => call.write(data));
call.end();
}
};
}

Expand Down
9 changes: 7 additions & 2 deletions packages/microservices/test/server/server-grpc.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import * as chai from 'chai';
import { expect } from 'chai';
import * as chaiAsPromised from 'chai-as-promised';
import { join } from 'path';
import { of } from 'rxjs';
import * as sinon from 'sinon';
import { InvalidGrpcPackageException } from '../../exceptions/errors/invalid-grpc-package.exception';
import { ServerGrpc } from '../../server/server-grpc';

// For Eventually to be available for Chai expectations
chai.use(chaiAsPromised);

describe('ServerGrpc', () => {
let server: ServerGrpc;
beforeEach(() => {
Expand Down Expand Up @@ -44,8 +49,8 @@ describe('ServerGrpc', () => {
describe('when package does not exist', () => {
it('should throw "InvalidGrpcPackageException"', () => {
sinon.stub(server, 'lookupPackage').callsFake(() => null);
expect(server.bindEvents()).to.eventually.throws(
InvalidGrpcPackageException,
expect(server.bindEvents()).to.eventually.be.rejectedWith(
InvalidGrpcPackageException
);
});
});
Expand Down

0 comments on commit c0d3877

Please sign in to comment.