-
-
Notifications
You must be signed in to change notification settings - Fork 33
/
runner.ts
89 lines (73 loc) · 3.18 KB
/
runner.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import config from 'config';
import type { Server } from 'socket.io';
import createHttpError from 'http-errors';
import { getWsServer, PROBES_NAMESPACE } from '../lib/ws/server.js';
import { getProbeRouter, type ProbeRouter } from '../probe/router.js';
import type { Probe } from '../probe/types.js';
import { getMetricsAgent, type MetricsAgent } from '../lib/metrics.js';
import type { MeasurementStore } from './store.js';
import { getMeasurementStore } from './store.js';
import type { MeasurementRequest, MeasurementResultMessage, MeasurementProgressMessage, UserRequest } from './types.js';
import { rateLimit } from '../lib/rate-limiter/rate-limiter-post.js';
import type { ExtendedContext } from '../types.js';
/**
 * Drives a measurement from the incoming HTTP request to the probes and
 * records what the probes report back.
 *
 * All collaborators (socket server, store, probe router, rate limiter and
 * metrics agent) are injected through the constructor.
 */
export class MeasurementRunner {
	constructor (
		private readonly io: Server,
		private readonly store: MeasurementStore,
		private readonly router: ProbeRouter,
		private readonly checkRateLimit: typeof rateLimit,
		private readonly metrics: MetricsAgent,
	) {}

	/**
	 * Starts a measurement for the request found in `ctx.request.body`.
	 *
	 * Order of operations: resolve probes through the router, reject when the
	 * router returned none, invoke the rate limiter with the number of online
	 * probes, create the measurement in the store, then either dispatch it to
	 * the online probes or mark it finished right away, and finally record
	 * the measurement type in metrics.
	 *
	 * @param ctx Request context; its body is treated as a `UserRequest`.
	 * @returns The id produced by the store and the total number of matched
	 * probes (`allProbes.length`).
	 * @throws A 422 HTTP error with `type: 'no_probes_found'` when the router
	 * returns no probes. Rejections from the router, the rate limiter or the
	 * store propagate unchanged.
	 */
	async run (ctx: ExtendedContext): Promise<{measurementId: string; probesCount: number;}> {
		const body = ctx.request.body as UserRequest;
		const matched = await this.router.findMatchingProbes(body);
		const { onlineProbesMap, allProbes, request } = matched;

		// NOTE(review): because of the optional chaining this may be `undefined`,
		// in which case it is interpolated verbatim into the error message below.
		// Confirm that request validation always supplies a value.
		const ipVersion = body.measurementOptions?.ipVersion;

		if (allProbes.length === 0) {
			throw createHttpError(422, `No suitable probes supporting IPv${ipVersion} found.`, { type: 'no_probes_found' });
		}

		await this.checkRateLimit(ctx, onlineProbesMap.size);

		const measurementId = await this.store.createMeasurement(request, onlineProbesMap, allProbes);

		if (onlineProbesMap.size === 0) {
			// Every selected probe is offline, so nothing will ever report back:
			// close the measurement immediately.
			await this.store.markFinished(measurementId);
		} else {
			this.sendToProbes(measurementId, onlineProbesMap, request);
		}

		this.metrics.recordMeasurement(request.type);

		const summary = { measurementId, probesCount: allProbes.length };
		return summary;
	}

	/**
	 * Forwards a progress message to the store.
	 *
	 * @param data Progress message to persist.
	 */
	async recordProgress (data: MeasurementProgressMessage): Promise<void> {
		await this.store.storeMeasurementProgress(data);
	}

	/**
	 * Forwards a result message to the store and, when the store returns a
	 * record, reports the time elapsed since that record's `createdAt`.
	 *
	 * @param data Result message to persist.
	 */
	async recordResult (data: MeasurementResultMessage): Promise<void> {
		const stored = await this.store.storeMeasurementResult(data);

		if (!stored) {
			return;
		}

		const now = Date.now();
		const createdAtMs = new Date(stored.createdAt).getTime();
		this.metrics.recordMeasurementTime(stored.type, now - createdAtMs);
	}

	/**
	 * Emits a `probe:measurement:request` event to every probe in
	 * `onlineProbesMap`, addressed through `probe.client` inside
	 * `PROBES_NAMESPACE`. The map key is sent as the string `testId`.
	 *
	 * In-progress updates are switched on only when the request asks for them,
	 * and then only for the first `measurement.maxInProgressTests` probes in
	 * map iteration order; for every other probe the flag is `false` (or the
	 * request's own falsy value when the request did not ask for updates).
	 */
	private sendToProbes (measurementId: string, onlineProbesMap: Map<number, Probe>, request: MeasurementRequest) {
		const maxInProgressTests = config.get<number>('measurement.maxInProgressTests');
		let updatesGranted = 0;

		for (const [ testIndex, probe ] of onlineProbesMap) {
			let inProgressUpdates = request.inProgressUpdates;

			// The counter only advances for requests that want updates.
			if (inProgressUpdates) {
				inProgressUpdates = updatesGranted < maxInProgressTests;
				updatesGranted += 1;
			}

			const measurement = {
				...request.measurementOptions,
				type: request.type,
				target: request.target,
				inProgressUpdates,
			};

			const payload = {
				measurementId,
				testId: testIndex.toString(),
				measurement,
			};

			this.io.of(PROBES_NAMESPACE).to(probe.client).emit('probe:measurement:request', payload);
		}
	}
}
// Factory: module-level singleton, built on first use.
let runner: MeasurementRunner;

/**
 * Returns the shared `MeasurementRunner`, constructing it on the first call
 * from the WS server, measurement store, probe router, rate limiter and
 * metrics agent getters. Later calls return the same instance.
 */
export const getMeasurementRunner = (): MeasurementRunner => {
	if (runner) {
		return runner;
	}

	runner = new MeasurementRunner(getWsServer(), getMeasurementStore(), getProbeRouter(), rateLimit, getMetricsAgent());
	return runner;
};