/
RegisterHealthCheck.js
76 lines (68 loc) · 2.72 KB
/
RegisterHealthCheck.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* Copyright 2021 Olympe S.A.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Brick, ActionBrick, registerBrick, Process, ErrorFlow } from 'olympe';
import {tap} from "rxjs/operators";
import {take, merge, timer} from "rxjs";
import {getLogger} from "logging";
export default class RegisterHealthCheck extends ActionBrick {
    /**
     * Registers `check` through `Process.onHealthCheck` and forwards the control flow.
     *
     * When `check` is not a `Brick`, a warning is logged, nothing is registered and the
     * control flow is still forwarded.
     *
     * @override
     * @protected
     * @param {!BrickContext} $
     * @param {!Brick} check brick executed each time the registered callback is invoked
     * @param {?number} timeout maximum duration of one run, in milliseconds. Falls back to 1000
     *     when it is not a number (or is NaN); otherwise clamped to [0, 2147483647].
     * @param {?string} name label used in the reported messages. Falls back to
     *     'Custom health check' when it is not a string.
     * @param {function()} forwardEvent
     */
    update($, [check, timeout, name], [forwardEvent]) {
        if (!(check instanceof Brick)) {
            getLogger('RegisterHealthCheck').warn('No health check implementation provided.');
            forwardEvent();
            return;
        }

        // JavaScript timers store their delay as a signed 32-bit integer: anything larger
        // (Infinity included) overflows and fires almost immediately, which would turn a
        // "very long" timeout into an instant failure.
        const maxTimerDelay = 2147483647;

        const finalName = typeof name === 'string' ? name : 'Custom health check';
        // NaN is of type 'number' but cannot be used as a delay: treat it like a missing value.
        const hasTimeout = typeof timeout === 'number' && !Number.isNaN(timeout);
        const finalTimeout = hasTimeout ? Math.min(Math.max(timeout, 0), maxTimerDelay) : 1000;

        const off = Process.onHealthCheck(() => this.process($, check, finalTimeout, finalName));
        // NOTE(review): `off` is presumably the unregister function of the health check - confirm.
        $.onClear(off);
        forwardEvent();
    }

    /**
     * Runs `check` once in a dedicated runner context and reports its outcome.
     *
     * The first input of `check` is triggered; its first output is observed as the "healthy"
     * signal and its second output as the error signal. Whichever of healthy / error / timeout
     * happens first settles the returned promise, and the runner context is destroyed afterwards.
     *
     * @private
     * @param {!BrickContext} $
     * @param {!Brick} check
     * @param {number} timeout delay in milliseconds after which the run is reported as failed
     * @param {string} name label prepended to the reported messages
     * @return {!Promise<string>} resolved with `${name} OK`, or rejected with a message string
     *     describing the error or the timeout
     */
    process($, check, timeout, name) {
        const [startInput] = check.getInputs();
        const [healthyOutput, errorOutput] = check.getOutputs();
        const $check = $.runner(check).trigger(startInput);

        // Same message format for errors emitted by the brick and errors raised by the streams.
        const describe = (error) => `${name} ${error instanceof ErrorFlow ? error.getMessage() : error}`;

        return new Promise((resolve, reject) => {
            const ctrlFlow = $check.observe(healthyOutput).pipe(tap(() => {
                resolve(`${name} OK`);
            }));
            const errorFlow = $check.observe(errorOutput).pipe(tap((error) => {
                reject(describe(error));
            }));
            const timeoutFlow = timer(timeout).pipe(tap(() => {
                reject(`Timeout ${name}`);
            }));

            // Take one value from the control flow, the error flow or the timer, whichever comes first.
            // The error handler matters: a failing stream terminates the merge (and with it the timer),
            // so without it the promise would never settle and the runner context would never be destroyed.
            merge(ctrlFlow, errorFlow, timeoutFlow).pipe(take(1)).subscribe({
                error: (error) => reject(describe(error)),
            });
        }).finally(() => $check.destroy());
    }
}
// Associate the RegisterHealthCheck class with its brick identifier.
// NOTE(review): the string is presumably the tag of the brick definition in the data model - confirm.
registerBrick('0185792773abd926fe83', RegisterHealthCheck);