forked from GoogleCloudPlatform/functions-framework-nodejs
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathruntime.ts
418 lines (371 loc) · 11.8 KB
/
runtime.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
import {env} from 'process';
import {
chain,
get,
has,
extend,
isPlainObject,
isEmpty,
size,
values,
} from 'lodash';
import {Request, Response} from 'express';
import {DaprClient, CommunicationProtocolEnum} from '@dapr/dapr';
import {KeyValuePairType} from '@dapr/dapr/types/KeyValuePair.type';
import {KeyValueType} from '@dapr/dapr/types/KeyValue.type';
import {OperationType} from '@dapr/dapr/types/Operation.type';
import {IRequestMetadata} from '@dapr/dapr/types/RequestMetadata.type';
import {StateQueryType} from '@dapr/dapr/types/state/StateQuery.type';
import {StateQueryResponseType} from '@dapr/dapr/types/state/StateQueryResponse.type';
import {OpenFunction} from '../functions';
import {
OpenFunctionComponent,
OpenFunctionContext,
ContextUtils,
} from './context';
import {Plugin, PluginStore} from './plugin';
/**
 * Defines the shape of an HTTP trigger: an Express request/response pair.
 * Both members are optional.
 * @public
 */
export interface HttpTrigger {
/** The Express request object, if one has been set. */
req?: Request;
/** The Express response object, if one has been set. */
res?: Response;
}
/**
 * Union of all OpenFunction trigger types.
 * HttpTrigger is currently its only member.
 * @public
 */
export type OpenFunctionTrigger = HttpTrigger;
/**
 * The OpenFunction's serving runtime abstract class.
 *
 * It holds the function context, the optional trigger of the current
 * invocation and the error raised by the user function, and declares the
 * `send` / `state` operations that concrete runtimes must implement.
 * @public
 */
export abstract class OpenFunctionRuntime {
  /**
   * The context of the OpenFunction.
   */
  protected readonly context: OpenFunctionContext;

  /**
   * The optional trigger of OpenFunction.
   */
  protected trigger?: OpenFunctionTrigger;

  /**
   * An object to hold local data.
   * TODO: Clarify the usage of this property
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  readonly locals: Record<string, any>;

  /**
   * The optional error object to hold exception data.
   * It is set by the wrapper built in `WrapUserFunction` when the user
   * function throws, so that post hooks can inspect it.
   */
  error?: Error;

  /**
   * Constructor of the OpenFunctionRuntime.
   * @param context - The OpenFunction context served by this runtime.
   */
  constructor(context: OpenFunctionContext) {
    this.context = context;
    this.locals = {};
  }

  /**
   * Static method to parse the context and get runtime.
   * @param context - The OpenFunction context.
   * @returns A new runtime for the context (currently always a DaprRuntime).
   */
  static Parse(context: OpenFunctionContext): OpenFunctionRuntime {
    return new DaprRuntime(context);
  }

  /**
   * Creates a proxy around a freshly parsed runtime. Properties that exist on
   * the context are served from the context; everything else is served from
   * the runtime object itself.
   * @param context - The context object to be proxied.
   * @returns The proxy object.
   */
  static ProxyContext(context: OpenFunctionContext): OpenFunctionRuntime {
    // Get a proper runtime for the context
    const runtime = OpenFunctionRuntime.Parse(context);

    // Create a proxy for the context
    return new Proxy(runtime, {
      get: (target, prop) => {
        // Provide delegated property access of the context object
        if (has(target.context, prop)) {
          return get(target.context, prop);
        }
        // Otherwise, return the property of the runtime object. No receiver is
        // passed on purpose: getters then run against the real runtime instead
        // of the proxy.
        return Reflect.get(target, prop);
      },
    });
  }

  /**
   * Wraps a user function so that every call runs the pre hooks, the user
   * function and then the post hooks against the same runtime.
   *
   * An error thrown by the user function is stored on `ctx.error` and only
   * rethrown after the post hooks have run. The stored error is cleared at the
   * start of every call, so a failed invocation does not make later, successful
   * invocations of the same wrapper fail.
   *
   * @param userFunction - The function that you want to wrap.
   * @param context - A plain context object (which gets proxied) or an existing runtime.
   * @returns A function that takes in data and returns a promise.
   */
  static WrapUserFunction(
    userFunction: OpenFunction,
    context: OpenFunctionContext | OpenFunctionRuntime
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
  ): (data: any) => Promise<void> {
    // A plain object is a raw context and needs a runtime proxy; anything else
    // is treated as an already built runtime.
    const ctx: OpenFunctionRuntime = isPlainObject(context)
      ? OpenFunctionRuntime.ProxyContext(context as OpenFunctionContext)
      : (context as OpenFunctionRuntime);

    // Load plugin stores
    const userPlugins = PluginStore.Instance();
    const sysPlugins = PluginStore.Instance(PluginStore.Type.BUILTIN);

    return async data => {
      // The runtime is shared by all calls of this wrapper: drop the error of
      // a previous invocation before starting a new one.
      ctx.error = undefined;

      // Execute pre hooks, user plugins go first
      await userPlugins.execPreHooks(ctx);
      await sysPlugins.execPreHooks(ctx);

      // Execute user function and save error for lazy reporting
      try {
        await userFunction(ctx, data);
      } catch (ex) {
        ctx.error = <Error>ex;
      }

      // Execute post hooks, user plugins go last
      await sysPlugins.execPostHooks(ctx);
      await userPlugins.execPostHooks(ctx);

      // Report error if exists at the very last
      if (ctx.error) throw ctx.error;
    };
  }

  /**
   * Getter for the port of Dapr sidecar.
   * @returns The HTTP and GRPC ports, read from `DAPR_HTTP_PORT` and
   * `DAPR_GRPC_PORT` with '3500' and '50001' as fallbacks.
   */
  get sidecarPort() {
    return {
      HTTP: env.DAPR_HTTP_PORT || '3500',
      GRPC: env.DAPR_GRPC_PORT || '50001',
    };
  }

  /**
   * Getter returns the request object from the trigger.
   * @returns The request object, or undefined when no trigger is set.
   */
  get req() {
    return this.trigger?.req;
  }

  /**
   * Getter returns the response object from the trigger.
   * @returns The response object, or undefined when no trigger is set.
   */
  get res() {
    return this.trigger?.res;
  }

  /**
   * Stores the given request and response objects on the trigger.
   * @param req - The HTTP request object
   * @param res - The HTTP response object
   */
  setTrigger(req: Request, res?: Response) {
    this.trigger = extend(this.trigger, {req, res});
  }

  /**
   * Get a plugin from the plugin store, or if it doesn't exist, get it from the built-in plugin store.
   *
   * @param name - The name of the plugin to get.
   * @returns A plugin object
   */
  getPlugin(name: string): Plugin {
    const userPlugin = PluginStore.Instance().get(name);
    return userPlugin || PluginStore.Instance(PluginStore.Type.BUILTIN).get(name);
  }

  /**
   * The promise that sends data to a certain output.
   * @param data - The data to send.
   * @param output - The optional name of the output to send the data to.
   */
  abstract send(data: object, output?: string): Promise<object>;

  /**
   * The operations that handle data through a state store.
   */
  abstract get state(): StateOperations;
}
/**
 * The operations a runtime exposes on a state store.
 * Every operation takes its payload as an object plus an optional `db`
 * argument, and returns a promise.
 * @public
 */
export interface StateOperations {
/** Save operation; resolves with no value. */
save: (data: object, db?: string) => Promise<void>;
/** Get operation; resolves with a KeyValueType or a string. */
get: (data: object, db?: string) => Promise<KeyValueType | string>;
/** Bulk get operation; resolves with an array of KeyValueType. */
getBulk: (data: object, db?: string) => Promise<KeyValueType[]>;
/** Delete operation; resolves with no value. */
delete: (data: object, db?: string) => Promise<void>;
/** Transaction operation; resolves with no value. */
transaction: (data: object, db?: string) => Promise<void>;
/** Query operation; resolves with a StateQueryResponseType. */
query: (query: object, db?: string) => Promise<StateQueryResponseType>;
}
/**
 * Dapr runtime class derived from OpenFunctionRuntime.
 *
 * It implements `send` and the state operations by calling a DaprClient that
 * is created over the HTTP protocol.
 */
class DaprRuntime extends OpenFunctionRuntime {
  /**
   * The Dapr client instance.
   */
  private daprClient!: DaprClient;

  /**
   * Constructor of the DaprRuntime.
   * @param context - The OpenFunction context served by this runtime.
   */
  constructor(context: OpenFunctionContext) {
    super(context);

    /**
     * NOTE: GRPC is not well supported so far in Dapr Node.js SDK
     * TODO: Should determine whether to use GRPC channel
     */
    this.daprClient = new DaprClient(
      env.DAPR_HOST,
      this.sidecarPort.HTTP,
      CommunicationProtocolEnum.HTTP
    );
  }

  /**
   * Send data to the Dapr runtime (sidecar).
   *
   * Binding outputs are invoked through `binding.send`, pubsub outputs through
   * `pubsub.publish`; any other kind of output yields `undefined`. All actions
   * are awaited with `Promise.allSettled`, so a failing output shows up as a
   * rejected entry in the result instead of rejecting the returned promise.
   *
   * @param data - The data to send to the output.
   * @param output - The name of the output to send to; all outputs when omitted.
   * @returns The promise of the actions being executed.
   */
  send(data: object, output?: string): Promise<object> {
    const actions = chain(this.context.outputs)
      .filter((v, k) => !output || k === output)
      .map((component: OpenFunctionComponent) => {
        if (ContextUtils.IsBindingComponent(component)) {
          return this.daprClient.binding.send(
            component.componentName,
            component.operation || '',
            data,
            component.metadata
          );
        }
        if (ContextUtils.IsPubSubComponent(component)) {
          return this.daprClient.pubsub.publish(
            component.componentName,
            component.uri || '',
            data
          );
        }
        return Promise.resolve(undefined);
      })
      .value();

    return Promise.allSettled(actions);
  }

  /**
   * Get the state store component to operate on.
   * @param db - The key of the state store in the context's `states`; the
   * first state store defined in the context is used when it is omitted or empty.
   * @returns Corresponding state store
   * @throws Error when the context defines no state store, or when the
   * selected entry is not a state component.
   */
  getState(db?: string): OpenFunctionComponent {
    const states = this.context.states;

    // check the states field
    if (!states || isEmpty(states)) {
      throw new Error('You must specify the state in the context');
    }

    // Use the store requested by the user, or fall back to the first one
    // defined in the context when none is requested.
    const component = db ? states[db] : values(states)[0];

    if (!ContextUtils.IsStateComponent(component)) {
      throw new Error('The state component type is wrong');
    }
    return component;
  }

  /**
   * Extract the only value of a single-entry payload.
   * @param data - The payload, expected to hold exactly one entry.
   * @param operation - The operation name used in the error message.
   * @returns The value of the only entry.
   * @throws Error when the payload is empty or holds more than one entry.
   */
  #singleKey(data: object, operation: string): string {
    if (isEmpty(data) || size(data) > 1) {
      throw new Error(`State ${operation} method: invalid key number`);
    }
    return values(data)[0] as string;
  }

  /**
   * Save the data to the state store
   * @param data - The data for save operation; its first value is passed to
   * the client as the key/value pairs to save.
   * @param db - The state store to save the data
   * @returns The promise of the save action being executed.
   */
  #stateSave(data: object, db?: string): Promise<void> {
    const pairs = values(data)[0] as KeyValuePairType[];
    return this.daprClient.state.save(this.getState(db).componentName, pairs);
  }

  /**
   * Get the data from the state store
   * @param data - The data for get operation; must hold exactly one entry,
   * whose value is the key to read.
   * @param db - The state store to get the data
   * @returns The promise of the get action being executed.
   * @throws Error when `data` does not hold exactly one entry.
   */
  #stateGet(data: object, db?: string): Promise<KeyValueType | string> {
    const key = this.#singleKey(data, 'get');
    return this.daprClient.state.get(this.getState(db).componentName, key);
  }

  /**
   * Get several entries from the state store
   * @param data - The data for getBulk operation; its values are read, in
   * order, as keys, parallelism and metadata.
   * @param db - The state store to getBulk the data
   * @returns The promise of the getBulk action being executed.
   */
  #stateGetBulk(data: object, db?: string): Promise<KeyValueType[]> {
    const [keys, parallelism, metadata] = values(data) as unknown as [
      string[],
      number,
      string
    ];
    return this.daprClient.state.getBulk(
      this.getState(db).componentName,
      keys,
      parallelism,
      metadata
    );
  }

  /**
   * Delete the data from the state store
   * @param data - The data for delete operation; must hold exactly one entry,
   * whose value is the key to delete.
   * @param db - The state store to delete the data
   * @returns The promise of the delete action being executed.
   * @throws Error when `data` does not hold exactly one entry.
   */
  #stateDelete(data: object, db?: string): Promise<void> {
    const key = this.#singleKey(data, 'delete');
    return this.daprClient.state.delete(this.getState(db).componentName, key);
  }

  /**
   * Run a transaction against the state store
   * @param data - The data for transaction operation; its values are read, in
   * order, as the operations and the optional request metadata.
   * @param db - The state store to run the transaction on
   * @returns The promise of the transaction action being executed.
   */
  #stateTransaction(data: object, db?: string): Promise<void> {
    const [operations, metadata] = values(data) as unknown as [
      OperationType[],
      IRequestMetadata
    ];
    return this.daprClient.state.transaction(
      this.getState(db).componentName,
      operations,
      // The client is given null when no metadata is provided.
      metadata ? metadata : null
    );
  }

  /**
   * Query the data from the state store
   * @param data - The data for query operation; its first value is passed to
   * the client as the query.
   * @param db - The state store to query the data
   * @returns The promise of the query action being executed.
   */
  #stateQuery(data: object, db?: string): Promise<StateQueryResponseType> {
    const query = values(data)[0] as StateQueryType;
    return this.daprClient.state.query(this.getState(db).componentName, query);
  }

  /**
   * The operations that handle data through the state store.
   * A new object is built on every access; each member is bound to this
   * runtime because the implementations are private methods.
   */
  get state(): StateOperations {
    return {
      save: this.#stateSave.bind(this),
      get: this.#stateGet.bind(this),
      getBulk: this.#stateGetBulk.bind(this),
      delete: this.#stateDelete.bind(this),
      transaction: this.#stateTransaction.bind(this),
      query: this.#stateQuery.bind(this),
    };
  }
}