-
Notifications
You must be signed in to change notification settings - Fork 0
/
influx.ts
65 lines (56 loc) · 1.77 KB
/
influx.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import { CloudEvent, ConsumerInterface } from "./named-consumer";
import { InfluxDB, FieldType, ISingleHostConfig } from "influx";
import { hostname } from "os";
export class InfluxMonitor implements ConsumerInterface {
private influx: InfluxDB;
private buffer = [];
constructor(
private readonly consumer: ConsumerInterface,
private readonly configuration: ISingleHostConfig,
private readonly consumerName: string
) {
this.influx = new InfluxDB({
...configuration,
schema: [
{
measurement: "fossil_consumer",
fields: {
stream: FieldType.STRING,
lag: FieldType.INTEGER,
duration: FieldType.INTEGER
},
tags: ["host", "consumer"]
}
]
});
}
consume(matcher: string, handler: (message: CloudEvent) => Promise<void>) {
const publishInterval = setInterval(async () => {
const toPublish = this.buffer;
this.buffer = [];
console.log(`Consuming at ${toPublish.length} eps`);
await this.influx.writePoints(toPublish);
}, 1000);
try {
return this.consumer.consume(matcher, async message => {
const {fossilstream: stream, time} = message;
const lag = time ? new Date().valueOf() - Date.parse(time) : 0;
const point = {
measurement: "fossil_consumer",
tags: {host: hostname(), consumer: this.consumerName},
fields: {stream, lag, duration: 0},
timestamp: new Date()
};
try {
return await handler(message);
} finally {
point.fields.duration =
new Date().valueOf() - point.timestamp.valueOf();
this.buffer.push(point);
}
});
} finally {
clearInterval(publishInterval);
}
}
}