Skip to content

Commit

Permalink
feat: support client streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
smnbbrv committed Mar 1, 2022
1 parent e472c15 commit 4840401
Show file tree
Hide file tree
Showing 14 changed files with 399 additions and 54 deletions.
3 changes: 2 additions & 1 deletion LICENSE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

-----------

Copyright (c) 2020 Semen Bobrov (smnbbrv)
Copyright (c) 2019-2022 Semen Bobrov (smnbbrv)

Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation
files (the "Software"), to deal in the Software without
Expand Down
80 changes: 70 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ Angular gRPC framework.

## Features

- two-way-binding-friendly protobuf messages implementation (instead of Java-like setters / getters in original google-protobuf)
- two-way-binding-friendly protobuf messages implementation (instead of Java-like setters / getters / builder pattern in original google-protobuf)
- client services are wired to Angular's dependency injection
- typescript first-class support
- rxjs first-class support
- client & bidirectional streaming (only possible with @improbable-eng/grpc-web)
- interceptors
- logger
- support for well-known types, including `Any`
Expand Down Expand Up @@ -183,6 +184,54 @@ From now on this particular service is set.

### Service client methods

#### Concept

Every client call accepts a message to be sent and returns an `Observable` of response message(s). However, the request is not being executed until the `Observable` gets subscribed, so it can be safely used at any place.

To cancel the request / close the connection simply unsubscribe from that `Subscription`. Of course, if connection is closed by the server or the error happens, the `Observable` gets terminated.

```ts
class MyService {

constructor(private client: EchoClient) {}

sendOne() {
this.client.echo(new EchoRequest({ message: 'text' })).subscribe(res => console.log(res));

// or if you want to terminate it, e.g. it is a server stream or you navigate away and do not need to wait
const sub = this.client.echo(new EchoRequest({ message: 'text' })).subscribe(res => console.log(res));

setTimeout(() => sub.unsubscribe(), 1000); // this closes connection
}

}
```

The behaviour above is possible due to the `Observable`'s natural laziness and ability to be terminated.

**The server streaming** so it has the same signature as **the unary requests**, just the returned `Observable` can emit more than one message and the connection is potentially infinite.

**The client streaming** is differrent, because it accepts an `Observable` (and its derivatives, such as `Subject`, `BehaviourSubject`, etc.) of messages


```ts
class MyService {

constructor(private client: EchoClient) {}

sendMany() {
const stream = from(['message 1', 'message 2', 'message 3']);

this.client.echoMany(stream).subscribe(res => console.log(res));
}

}
```

**The bidirectional streaming** has the same signature as the client's one and is a combination of server and client streaming.

#### Implementation details

Each RPC has two corresponding methods.

- the first, that emits messages, is a direct method of the service client.
Expand Down Expand Up @@ -267,7 +316,10 @@ If the `embedWellKnownTypes` configuration is enabled, the `customWellKnownTypes

You can add global interceptors to all gRPC calls like Angular's built-in `HttpClient` interceptors.

The important difference is that unlike `HttpClient` interceptors `GrpcInterceptor`s need to work with event streams; there are no errors thrown. Instead you should listen to the `GrpcStatusEvent` and decide whether it is an error or not. Please keep this in mind.
The important differences

- unlike `HttpClient` interceptors `GrpcInterceptor`s need to work with event streams; there are no errors thrown. Instead you should listen to the `GrpcStatusEvent` and decide whether it is an error or not. Please keep this in mind
- the incoming data can be a message or a stream of messages (in case of client streaming)

As an example see `GrpcLoggerInterceptor` [in the core package](packages/core/src/lib/grpc-logger-interceptor.ts).

Expand Down Expand Up @@ -300,35 +352,45 @@ GrpcLoggerModule.forRoot({

The alternative grpc-web implementation from [Improbable Engineering](https://github.com/improbable-eng) provides way more features than standard grpc-web from Google. It supports [various transports](https://github.com/improbable-eng/grpc-web/blob/master/client/grpc-web/docs/transport.md) including WebSocket-based and even Node (can be useful e.g. for SSR).

**The only** client that supports client / bidirectional streaming. This however also requires the server to be able to handle websocket transport. For this purpose improbable-eng team introduced [grpc-web-proxy](https://github.com/improbable-eng/grpc-web/blob/master/go/grpcwebproxy/README.md) - a special facade for the normal grpc server that acts like envoy, but has also the ability to handle websocket transport.

Installation:

```sh
npm i -S @ngx-grpc/improbable-eng-grpc-web-client @improbable-eng/grpc-web
```

Then configuration is similar to the other clients:
Then configuration is similar to the other clients, however there is a transport to configure:

```ts
import { grpc } from '@improbable-eng/grpc-web';
import { GrpcCoreModule } from '@ngx-grpc/core';
import { ImprobableEngGrpcWebClientModule } from '@ngx-grpc/improbable-eng-grpc-web-client';

const xhr = grpc.CrossBrowserHttpTransport({});
const ws = grpc.WebsocketTransport();

@NgModule({
imports: [
GrpcCoreModule.forRoot(),
ImprobableEngGrpcWebClientModule.forRoot({
ImprobableEngGrpcWebClientModule.forChild({
settings: {
host: 'http://localhost:8080',
transport: grpc.CrossBrowserHttpTransport({}),
transport: {
unary: xhr,
serverStream: xhr,
clientStream: ws,
bidiStream: ws,
},
// or simply e.g.
// transport: ws, // to configure all methods to use websockets
},
}),
],
})
export class AppModule {}
```

Choose your transport and provide it as a part of the settings. Now you are set.

## Web worker

Web worker allows to run gRPC clients, messages serialization and deserialization in a separate thread. It might give some performance benefits on large data sets; however the main reason of the worker is to avoid blocking the main thread. That means that rendering engine has more resources to work on rendering while the messages processing is done in parallel.
Expand Down Expand Up @@ -392,13 +454,11 @@ That's it. All your requests are served by worker.
## Not implemented (yet)

- [Proto 2 Extensions](https://developers.google.com/protocol-buffers/docs/proto#extensions)
- Client streaming
- Bidirectional streaming

## Contributing

- to run tests on Apple m1 chips use `npm ci --target_arch=x64 --no-optional` and `brew install protoc-gen-grpc-web`

## License

MIT
[MIT](LICENSE)
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import { ExamplePageModule } from '../example-page/example-page.module';
import { ExamplePageComponent } from '../example-page/example-page/example-page.component';
import { ImprobableEngGrpcWebClientExampleComponent } from './improbable-eng-grpc-web-client-example.component';

const xhr = grpc.CrossBrowserHttpTransport({});
const ws = grpc.WebsocketTransport();

@NgModule({
declarations: [ImprobableEngGrpcWebClientExampleComponent],
imports: [
Expand All @@ -16,7 +19,12 @@ import { ImprobableEngGrpcWebClientExampleComponent } from './improbable-eng-grp
ImprobableEngGrpcWebClientModule.forChild({
settings: {
host: 'http://localhost:8080',
transport: grpc.CrossBrowserHttpTransport({}),
transport: {
unary: xhr,
serverStream: xhr,
clientStream: ws,
bidiStream: ws,
},
},
}),
],
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"scripts": {
"ng": "ng",
"start": "run-s start:lib:protoc-gen-ng",
"start:lib:protoc-gen-ng": "nodemon --watch packages/protoc-gen-ng/src --watch packages/protoc-gen-ng/test/proto -e ts,proto --exec 'npm run build:test && npm run test:lib:protoc-gen-ng:generate && npm run test:lib:protoc-gen-ng:generate-grpc-web && echo \"Done\"'",
"start:lib:protoc-gen-ng": "nodemon --watch packages/protoc-gen-ng/src --watch packages/protoc-gen-ng/test/proto -e ts,proto --exec 'npm run build:test && npm run build:lib:protoc-gen-ng && npm run test:lib:protoc-gen-ng:generate && npm run test:lib:protoc-gen-ng:generate-grpc-web && echo \"Done\"'",
"build": "run-s build:lib build:test",
"build:lib": "run-s build:lib:common build:lib:core build:lib:grpc-web-client build:lib:improbable-eng-grpc-web-client build:lib:worker build:lib:worker-client build:lib:protoc-gen-ng build:lib:well-known-types",
"build:lib:core": "ng build core --configuration production && cpy LICENSE.md dist/core/",
Expand Down
38 changes: 37 additions & 1 deletion packages/common/src/lib/grpc-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,40 @@ export interface GrpcClient<ST> {
resclss: GrpcMessageClass<S>,
): Observable<GrpcEvent<S>>;

/**
* Handle client stream RPC
*
* @param path gRPC method path (rpc path)
* @param inputStream streamed request data
* @param metadata request metadata
* @param reqclss request message class
* @param resclss response message class
*/
clientStream<Q extends GrpcMessage, S extends GrpcMessage>(
path: string,
inputStream: Observable<Q>,
metadata: GrpcMetadata,
reqclss: GrpcMessageClass<Q>,
resclss: GrpcMessageClass<S>,
): Observable<GrpcEvent<S>>;

/**
* Handle bidirectional stream RPC
*
* @param path gRPC method path (rpc path)
* @param inputStream streamed request data
* @param metadata request metadata
* @param reqclss request message class
* @param resclss response message class
*/
bidiStream<Q extends GrpcMessage, S extends GrpcMessage>(
path: string,
inputStream: Observable<Q>,
metadata: GrpcMetadata,
reqclss: GrpcMessageClass<Q>,
resclss: GrpcMessageClass<S>,
): Observable<GrpcEvent<S>>;

}

/**
Expand All @@ -75,6 +109,8 @@ export interface GrpcClient<ST> {
export enum GrpcCallType {
unary,
serverStream,
clientStream,
bidiStream,
}

/**
Expand All @@ -84,7 +120,7 @@ export interface GrpcRequest<Q extends GrpcMessage, S extends GrpcMessage> {
path: string;
client: GrpcClient<any>;
type: GrpcCallType;
requestData: Q;
requestData: Q | Observable<Q>;
requestMetadata: GrpcMetadata;
requestClass: GrpcMessageClass<Q>;
responseClass: GrpcMessageClass<S>;
Expand Down
51 changes: 40 additions & 11 deletions packages/core/src/lib/grpc-handler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Inject, Injectable, Optional } from '@angular/core';
import { GrpcCallType, GrpcEvent, GrpcMessage, GrpcRequest } from '@ngx-grpc/common';
import { Observable } from 'rxjs';
import { isObservable, Observable } from 'rxjs';
import { GrpcInterceptor } from './grpc-interceptor';
import { GRPC_INTERCEPTORS } from './injection-tokens';

Expand Down Expand Up @@ -35,23 +35,52 @@ export class GrpcHandler {
return interceptor.intercept(request, new GrpcHandler(interceptors));
}

if (request.type === GrpcCallType.unary) {
return request.client.unary(
switch (request.type) {
case GrpcCallType.unary: return request.client.unary(
request.path,
request.requestData,
this.message(request.requestData),
request.requestMetadata,
request.requestClass,
request.responseClass,
);
case GrpcCallType.serverStream: return request.client.serverStream(
request.path,
this.message(request.requestData),
request.requestMetadata,
request.requestClass,
request.responseClass,
);
case GrpcCallType.clientStream: return request.client.clientStream(
request.path,
this.stream(request.requestData),
request.requestMetadata,
request.requestClass,
request.responseClass,
);
case GrpcCallType.clientStream: return request.client.bidiStream(
request.path,
this.stream(request.requestData),
request.requestMetadata,
request.requestClass,
request.responseClass,
);
}
}

private message<Q extends GrpcMessage>(p: Q | Observable<Q>): Q {
if (!isObservable(p)) {
return p;
}

throw new Error('Expected Message, got Observable');
}

private stream<Q extends GrpcMessage>(p: Q | Observable<Q>): Observable<Q> {
if (isObservable(p)) {
return p;
}

return request.client.serverStream(
request.path,
request.requestData,
request.requestMetadata,
request.requestClass,
request.responseClass,
);
throw new Error('Expected Observable, got message');
}

}
31 changes: 26 additions & 5 deletions packages/core/src/lib/grpc-logger-interceptor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Inject, Injectable, InjectionToken, Optional } from '@angular/core';
import { GrpcDataEvent, GrpcEvent, GrpcMessage, GrpcRequest } from '@ngx-grpc/common';
import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';
import { isObservable, Observable, of } from 'rxjs';
import { share, tap } from 'rxjs/operators';
import { GrpcHandler } from './grpc-handler';
import { GrpcInterceptor } from './grpc-interceptor';

Expand Down Expand Up @@ -69,8 +69,11 @@ export interface GrpcLoggerSettings {
@Injectable()
export class GrpcLoggerInterceptor implements GrpcInterceptor {

private static requestId = 0;

private clientDataStyle = 'color: #eb0edc;';
private dataStyle = 'color: #5c7ced;';
private errorStyle = 'color: red;';
private errorStyle = 'color: #f00505;';
private statusOkStyle = 'color: #0ffcf5;';

private settings: GrpcLoggerSettings;
Expand All @@ -88,23 +91,41 @@ export class GrpcLoggerInterceptor implements GrpcInterceptor {

intercept<Q extends GrpcMessage, S extends GrpcMessage>(request: GrpcRequest<Q, S>, next: GrpcHandler): Observable<GrpcEvent<S>> {
if (this.settings.enabled) {
const id = ++GrpcLoggerInterceptor.requestId;
const start = Date.now();

// check if client streaming, then push each value separately
if (isObservable(request.requestData)) {
request.requestData = request.requestData.pipe(
tap(msg => {
console.groupCollapsed(`%c#${id}: ${Date.now() - start}ms -> ${request.path}`, this.clientDataStyle);
console.log('%c>>', this.clientDataStyle, this.settings.requestMapper(msg));
console.groupEnd();
}),
);
}

// handle unary calls and server streaming in the same manner
return next.handle(request).pipe(
tap(event => {
const style = event instanceof GrpcDataEvent ? this.dataStyle : event.statusCode !== 0 ? this.errorStyle : this.statusOkStyle;
const openGroup = () => console.groupCollapsed(`%c${Date.now() - start}ms -> ${request.path}`, style);

const openGroup = () => console.groupCollapsed(`%c#${id}: ${Date.now() - start}ms -> ${request.path}`, style);

const printSettings = () => {
if (this.settings.logClientSettings) {
console.log('%csc', style, request.client.getSettings());
}
};

const printMetadata = () => {
if (this.settings.logMetadata) {
console.log('%c**', style, request.requestMetadata.toObject());
}
};
const printRequest = () => console.log('%c>>', style, this.settings.requestMapper(request.requestData));

const printRequest = () => console.log('%c>>', style, isObservable(request.requestData) ? '<see above>' : this.settings.requestMapper(request.requestData));

const closeGroup = () => console.groupEnd();

if (event instanceof GrpcDataEvent) {
Expand Down
8 changes: 8 additions & 0 deletions packages/grpc-web-client/src/lib/grpc-web-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,12 @@ export class GrpcWebClient implements GrpcClient<GrpcWebClientSettings> {
});
}

clientStream = () => {
throw new Error('Client streaming not supported');
};

bidiStream = () => {
throw new Error('Bidirectional streaming not supported');
};

}

0 comments on commit 4840401

Please sign in to comment.