-
Notifications
You must be signed in to change notification settings - Fork 41
/
index.ts
141 lines (119 loc) · 4.17 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import * as grpc from 'grpc';
import { Observable } from 'rxjs/Observable';
import { lookupPackage } from './utils';
/**
 * Loosely typed, string-keyed bag. Used throughout this file for objects whose
 * member names are only known at runtime (service implementations, generated
 * builders and generated clients).
 */
type DynamicMethods = { [name: string]: any; };
/**
 * Lifecycle methods that are always part of the object returned by
 * `serverBuilder`, in addition to the dynamically generated `add…` methods.
 * Note: the type parameter `T` is not referenced by any member.
 */
export interface GenericServerBuilder<T> {
/** Starts serving on `address`; `credentials` may be omitted. */
start(address: string, credentials?: any): void;
/** Shuts the server down immediately. */
forceShutdown(): void;
}
/**
 * Builds a gRPC server facade for the services of one proto package.
 *
 * The proto definition is loaded with `grpc.load(protoPath)` and the package
 * is resolved through `lookupPackage`. The returned object exposes `start`,
 * `forceShutdown` and one `add<ServiceName>(rxImpl)` method per service found
 * in the package. Each `add…` method registers the Observable-based
 * implementation on the underlying server and returns the builder, so calls
 * can be chained.
 *
 * @param protoPath Value handed to `grpc.load`.
 * @param packageName Name handed to `lookupPackage` to locate the package.
 * @returns The builder object, typed as `T & GenericServerBuilder<T>`.
 * @throws Error (from `start`) when the address could not be bound.
 */
export function serverBuilder<T>(protoPath: string, packageName: string): T & GenericServerBuilder<T> {
  const server = new grpc.Server();
  const builder: DynamicMethods = {
    start(address: string, credentials?: any) {
      const port = server.bind(address, credentials || grpc.ServerCredentials.createInsecure());
      // grpc's Server#bind returns the bound port and reports failure through
      // a non-positive value instead of throwing. Starting anyway would leave
      // a server that silently listens nowhere.
      if (port <= 0) {
        throw new Error(`Failed to bind gRPC server to ${address}`);
      }
      server.start();
    },
    forceShutdown() {
      server.forceShutdown();
    }
  };
  const pkg = lookupPackage(grpc.load(protoPath), packageName);
  for (const name of getServiceNames(pkg)) {
    builder[`add${name}`] = function(rxImpl: DynamicMethods) {
      server.addProtoService(pkg[name].service, createService(pkg[name], rxImpl));
      // Return the builder itself rather than `this`, so chaining keeps
      // working even when the method is called detached from the builder.
      return builder;
    };
  }
  return builder as any;
}
/**
 * Builds the handler map for one service.
 *
 * Walks the members of `Service.prototype` and, for every member name for
 * which `rxImpl` provides a function, stores the handler produced by
 * `createMethod` under that name. Names without a function in `rxImpl` are
 * left out of the result.
 *
 * @param Service Object whose `prototype` lists the service's methods.
 * @param rxImpl Implementation object keyed by method name.
 * @returns Map from method name to generated handler.
 */
function createService(Service: any, rxImpl: DynamicMethods) {
  const handlers: DynamicMethods = {};
  // for...in (not Object.keys) on purpose: inherited enumerable members are visited too.
  for (const methodName in Service.prototype) {
    const implemented = typeof rxImpl[methodName] === 'function';
    if (!implemented) {
      continue;
    }
    handlers[methodName] = createMethod(rxImpl, methodName, Service.prototype);
  }
  return handlers;
}
/**
 * Chooses the handler flavour for one service method.
 *
 * Only the `responseStream` flag of `serviceMethods[name]` is inspected: a
 * truthy flag yields a streaming handler, anything else a unary handler.
 *
 * @param rxImpl Implementation object keyed by method name.
 * @param name Name of the method to wrap.
 * @param serviceMethods Method descriptors keyed by method name.
 * @returns The handler created for `name`.
 */
function createMethod(rxImpl: DynamicMethods, name: string, serviceMethods: DynamicMethods) {
  const descriptor = serviceMethods[name];
  if (descriptor.responseStream) {
    return createStreamingMethod(rxImpl, name);
  }
  return createUnaryMethod(rxImpl, name);
}
/**
 * Wraps an Observable-returning implementation as a unary gRPC handler.
 *
 * The handler calls `rxImpl[name]` with `call.request`, subscribes to the
 * returned Observable and answers the RPC through `callback`. A unary RPC
 * must be answered exactly once, so:
 *  - the first emitted value is sent as the response, later values are ignored;
 *  - an Observable error is sent as the RPC error;
 *  - an exception thrown synchronously by the implementation is sent as the
 *    RPC error instead of escaping the handler;
 *  - completion without any emission is answered with an error, because the
 *    caller would otherwise wait forever.
 *
 * @param rxImpl Implementation object keyed by method name.
 * @param name Name of the implementation method to invoke.
 * @returns Handler with the `(call, callback)` signature.
 */
function createUnaryMethod(rxImpl: DynamicMethods, name: string) {
  return function(call: any, callback: any) {
    let answered = false;
    const succeed = (data: any) => {
      if (!answered) {
        answered = true;
        callback(null, data);
      }
    };
    const fail = (error: any) => {
      if (!answered) {
        answered = true;
        callback(error);
      }
    };

    let response: Observable<any>;
    try {
      response = rxImpl[name](call.request);
    } catch (error) {
      fail(error);
      return;
    }
    response.subscribe(
      (data: any) => succeed(data),
      (error: any) => fail(error),
      () => fail(new Error(`${name} completed without emitting a response`))
    );
  };
}
/**
 * Wraps an Observable-returning implementation as a server-streaming handler.
 *
 * The handler calls `rxImpl[name]` with `call.request`, writes every emitted
 * value to `call` and ends the call once the Observable completes. If the
 * Observable errors, or the implementation throws, the failure is emitted as
 * an `'error'` event on `call` (the way a grpc server-streaming handler
 * reports a failure status) instead of becoming an unhandled promise
 * rejection that leaves the stream open. `call.end()` is not invoked in that
 * case.
 *
 * @param rxImpl Implementation object keyed by method name.
 * @param name Name of the implementation method to invoke.
 * @returns Async handler; its `callback` parameter is accepted but unused.
 */
function createStreamingMethod(rxImpl: DynamicMethods, name: string) {
  return async function(call: any, callback: any) {
    try {
      const response: Observable<any> = rxImpl[name](call.request);
      await response.forEach((data: any) => call.write(data));
    } catch (error) {
      call.emit('error', error);
      return;
    }
    call.end();
  };
}
/**
 * Constructor signature of the class returned by `clientFactory`: takes the
 * target address plus optional credentials and options, and yields a `T`.
 */
export type ClientFactoryConstructor<T> = new(address: string, credentials?: any, options?: any) => T;
/**
 * Creates a client-factory class for the services of one proto package.
 *
 * Instances of the returned class remember the connection arguments
 * (address, credentials and options) in `__args`; credentials fall back to
 * `grpc.credentials.createInsecure()` when falsy. For every service in the
 * package the class prototype receives a `get<ServiceName>()` method that
 * builds a new Observable-based client from those arguments on each call.
 *
 * @param protoPath Value handed to `grpc.load`.
 * @param packageName Name handed to `lookupPackage` to locate the package.
 * @returns The generated class, typed as `ClientFactoryConstructor<T>`.
 */
export function clientFactory<T>(protoPath: string, packageName: string) {
  class Constructor {
    readonly __args: any[];
    constructor(address: string, credentials?: any, options: any = undefined) {
      const effectiveCredentials = credentials || grpc.credentials.createInsecure();
      this.__args = [address, effectiveCredentials, options];
    }
  }
  const accessors: DynamicMethods = Constructor.prototype;
  const pkg = lookupPackage(grpc.load(protoPath), packageName);
  getServiceNames(pkg).forEach((serviceName: string) => {
    accessors['get' + serviceName] = function(this: Constructor) {
      return createServiceClient(pkg[serviceName], this.__args);
    };
  });
  return <any> Constructor as ClientFactoryConstructor<T>;
}
/**
 * Lists the keys of `pkg` whose value has a truthy `service` property, in
 * `Object.keys` order.
 *
 * @param pkg Loaded package object keyed by member name.
 * @returns Names of the members treated as services.
 */
function getServiceNames(pkg: any) {
  const serviceNames: string[] = [];
  for (const key of Object.keys(pkg)) {
    if (pkg[key].service) {
      serviceNames.push(key);
    }
  }
  return serviceNames;
}
/**
 * Instantiates a gRPC client and mirrors its methods as Observable-returning
 * functions.
 *
 * One `GrpcClient` instance is constructed from `args`; every own enumerable
 * key of `GrpcClient.prototype` is then mapped to the wrapper produced by
 * `createClientMethod` for that instance.
 *
 * @param GrpcClient Client constructor for the service.
 * @param args Constructor arguments spread into `new GrpcClient(...)`.
 * @returns Map from method name to wrapped client method.
 */
function createServiceClient(GrpcClient: any, args: any[]) {
  const client = new GrpcClient(...args);
  return Object.keys(GrpcClient.prototype).reduce<DynamicMethods>((wrapped, methodName) => {
    wrapped[methodName] = createClientMethod(client, methodName);
    return wrapped;
  }, {});
}
/**
 * Chooses the wrapper flavour for one client method.
 *
 * Only the `responseStream` flag found on `grpcClient[name]` is inspected: a
 * truthy flag yields the streaming wrapper, anything else the unary wrapper.
 *
 * @param grpcClient Client instance that owns the method.
 * @param name Name of the client method to wrap.
 * @returns Function returning an Observable for each invocation.
 */
function createClientMethod(grpcClient: DynamicMethods, name: string) {
  const method = grpcClient[name];
  if (method.responseStream) {
    return createStreamingClientMethod(grpcClient, name);
  }
  return createUnaryClientMethod(grpcClient, name);
}
/**
 * Wraps a callback-style unary client method as an Observable factory.
 *
 * Each subscription invokes `grpcClient[name]` with the given arguments plus
 * a completion callback. A successful response is emitted and followed by
 * completion; a failure is delivered as an Observable error only (no
 * `complete` afterwards). Unsubscribing while the call is still pending
 * cancels it through the call object's `cancel()` method, when the client
 * method returned one.
 *
 * @param grpcClient Client instance that owns the method.
 * @param name Name of the client method to invoke.
 * @returns Function that forwards its arguments and returns a cold Observable.
 */
function createUnaryClientMethod(grpcClient: DynamicMethods, name: string) {
  return function(...args: any[]) {
    return new Observable(observer => {
      let settled = false;
      const call = grpcClient[name](...args, (error: any, data: any) => {
        settled = true;
        if (error) {
          observer.error(error);
          return;
        }
        observer.next(data);
        observer.complete();
      });
      // Teardown: only a call that has not answered yet needs cancelling.
      return () => {
        if (!settled && call && typeof call.cancel === 'function') {
          call.cancel();
        }
      };
    });
  };
}
/**
 * Wraps a server-streaming client method as an Observable factory.
 *
 * Each subscription invokes `grpcClient[name]` with the given arguments and
 * bridges the returned call's events: `'data'` becomes `next`, `'error'`
 * becomes `error` and `'end'` becomes `complete`. Unsubscribing before the
 * stream has ended or failed cancels the call through its `cancel()` method
 * (when present), so an abandoned stream does not stay open.
 *
 * @param grpcClient Client instance that owns the method.
 * @param name Name of the client method to invoke.
 * @returns Function that forwards its arguments and returns a cold Observable.
 */
function createStreamingClientMethod(grpcClient: DynamicMethods, name: string) {
  return function(...args: any[]) {
    return new Observable(observer => {
      let finished = false;
      const call = grpcClient[name](...args);
      call.on('data', (data: any) => observer.next(data));
      call.on('error', (error: any) => {
        finished = true;
        observer.error(error);
      });
      call.on('end', () => {
        finished = true;
        observer.complete();
      });
      // Teardown: a stream that already ended or failed needs no cancellation.
      return () => {
        if (!finished && typeof call.cancel === 'function') {
          call.cancel();
        }
      };
    });
  };
}