-
Notifications
You must be signed in to change notification settings - Fork 127
/
server-streaming-call.ts
170 lines (141 loc) · 4.8 KB
/
server-streaming-call.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import {RpcCallShared, RpcCancelFn} from "./rpc-call-shared";
import {RpcOutputStream} from "./rpc-output-stream";
import {RpcStatus} from "./rpc-status";
import {MethodInfo} from "./reflection-info";
import {RpcMetadata} from "./rpc-metadata";
/**
 * A server streaming RPC call: the client sends exactly one input message,
 * and the server may answer with zero, one, or many messages.
 *
 * The call object is itself awaitable. Awaiting it yields a
 * `FinishedServerStreamingCall` once the response headers, status and
 * trailers have all resolved.
 */
export class ServerStreamingCall<I extends object = object, O extends object = object> implements RpcCallShared<I, O>, PromiseLike<FinishedServerStreamingCall<I, O>> {

    /**
     * Reflection information about this call.
     */
    readonly method: MethodInfo<I, O>;

    /**
     * The headers sent along with the request.
     *
     * Request headers are provided in the `meta` property of the
     * `RpcOptions` passed to a call.
     */
    readonly requestHeaders: Readonly<RpcMetadata>;

    /**
     * The single request message sent to the server.
     */
    readonly request: Readonly<I>;

    /**
     * The response headers sent by the server.
     *
     * This promise will reject with a `RpcError` when the server sends an
     * error status code.
     */
    readonly headers: Promise<RpcMetadata>;

    /**
     * The stream of response messages from the server.
     * This is an AsyncIterable that can be iterated with `for await .. of`.
     */
    readonly response: RpcOutputStream<O>;

    /**
     * The response status the server replied with.
     *
     * Resolves when the server has finished the request successfully.
     * If the server replies with an error status, this promise will
     * reject with a `RpcError`.
     */
    readonly status: Promise<RpcStatus>;

    /**
     * The trailers the server attached to the response.
     *
     * Resolves when the server has finished the request successfully.
     * If the server replies with an error status, this promise will
     * reject with a `RpcError`.
     */
    readonly trailers: Promise<RpcMetadata>;

    /**
     * Callback invoked by `cancel()`.
     */
    private readonly cancelFn: RpcCancelFn;

    /**
     * Stores the given parts of the call on the instance, unchanged.
     */
    constructor(
        method: MethodInfo<I, O>,
        requestHeaders: Readonly<RpcMetadata>,
        request: Readonly<I>,
        headers: Promise<RpcMetadata>,
        response: RpcOutputStream<O>,
        status: Promise<RpcStatus>,
        trailers: Promise<RpcMetadata>,
        cancelFn: RpcCancelFn,
    ) {
        this.method = method;
        this.requestHeaders = requestHeaders;
        this.request = request;
        this.headers = headers;
        this.response = response;
        this.status = status;
        this.trailers = trailers;
        this.cancelFn = cancelFn;
    }

    /**
     * Cancel this call by invoking the cancel callback given to the
     * constructor.
     */
    cancel(): void {
        this.cancelFn();
    }

    /**
     * Makes the call awaitable: rather than awaiting `headers`, `status`
     * and `trailers` one by one, you can await the call itself to receive
     * the final outcome.
     *
     * The outcome does not contain the response messages. Set up your
     * consumer on `response` first to see the messages the server
     * replied with.
     */
    then<TResult1 = FinishedServerStreamingCall<I, O>, TResult2 = never>(
        onfulfilled?: ((value: FinishedServerStreamingCall<I, O>) => (PromiseLike<TResult1> | TResult1)) | undefined | null,
        onrejected?: ((reason: any) => (PromiseLike<TResult2> | TResult2)) | undefined | null
    ): PromiseLike<TResult1 | TResult2> {
        const finished = this.promiseFinished();
        return finished.then(
            value => {
                // Without a fulfillment handler the outcome passes through as-is.
                if (!onfulfilled) {
                    return value as unknown as TResult1;
                }
                return Promise.resolve(onfulfilled(value));
            },
            reason => {
                // Without a rejection handler the failure is propagated.
                if (!onrejected) {
                    return Promise.reject(reason);
                }
                return Promise.resolve(onrejected(reason));
            },
        );
    }

    /**
     * Waits for headers, status and trailers together and bundles them with
     * the request data. Rejects as soon as any of the three rejects.
     */
    private async promiseFinished(): Promise<FinishedServerStreamingCall<I, O>> {
        const [responseHeaders, finalStatus, responseTrailers] =
            await Promise.all([this.headers, this.status, this.trailers]);
        return {
            method: this.method,
            requestHeaders: this.requestHeaders,
            request: this.request,
            headers: responseHeaders,
            status: finalStatus,
            trailers: responseTrailers,
        };
    }

}
/**
 * The final outcome of a completed server streaming RPC call. The server
 * will not send any more messages.
 *
 * This is the value obtained by awaiting a `ServerStreamingCall`: the
 * request data of the call together with the resolved response headers,
 * status and trailers. The response messages themselves are not part of
 * this object.
 */
export interface FinishedServerStreamingCall<I extends object, O extends object> {
/**
 * Reflection information about this call.
 */
readonly method: MethodInfo<I, O>;
/**
 * The request headers that were sent with the request.
 */
readonly requestHeaders: Readonly<RpcMetadata>;
/**
 * The request message that was sent.
 */
readonly request: Readonly<I>;
/**
 * The response headers that the server sent, as a resolved value
 * rather than a promise.
 */
readonly headers: RpcMetadata;
/**
 * The response status the server replied with, as a resolved value
 * rather than a promise.
 * The status code will always be OK.
 */
readonly status: RpcStatus;
/**
 * The trailers the server attached to the response, as a resolved value
 * rather than a promise.
 */
readonly trailers: RpcMetadata;
}