Skip to content

Commit 4f40579

Browse files
committed
feat: sync Prometheus metrics
- Introduced METRIC_SYNC_METRICS interval for scheduled metric synchronization. - Integrated SyncMetricsTask into JOBS_SERVICES for metric synchronization. - Updated ExportMetricsTask to utilize INodeBaseMetricLabels for node metrics consistency.
1 parent a161ebf commit 4f40579

File tree

7 files changed

+197
-4
lines changed

7 files changed

+197
-4
lines changed
Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1-
export class GetAllNodesQuery {
2-
constructor() {}
1+
import { Query } from '@nestjs/cqrs';
2+
3+
import { ICommandResponse } from '@common/types/command-response.type';
4+
5+
import { NodesEntity } from '@modules/nodes/entities/nodes.entity';
6+
7+
export class GetAllNodesQuery extends Query<ICommandResponse<NodesEntity[]>> {
8+
constructor() {
9+
super();
10+
}
311
}

src/modules/system/interfaces/metrics.interface.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ export interface MetricValue {
66
node_country_emoji: string;
77
tag: string;
88
provider_name: string;
9+
tags: string;
910
};
1011
}
1112

src/scheduler/intervals.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ const EVERY_WEEK_AT_MONDAY_00_45 = '45 0 * * 1';
99

1010
export const JOBS_INTERVALS = {
1111
METRIC_EXPORT_METRICS: EVERY_15_SECONDS,
12+
METRIC_SYNC_METRICS: CronExpression.EVERY_6_HOURS,
1213

1314
NODE_HEALTH_CHECK: CronExpression.EVERY_10_SECONDS,
1415
RECORD_NODE_USAGE: CronExpression.EVERY_30_SECONDS,

src/scheduler/metrics-providers.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,3 +137,15 @@ export const METRIC_PROVIDERS = [
137137
labelNames: ['instance_id', 'instance_name'],
138138
}),
139139
];
140+
141+
export interface INodeBaseMetricLabels {
142+
node_uuid: string;
143+
node_name: string;
144+
node_country_emoji: string;
145+
provider_name: string;
146+
tags: string;
147+
}
148+
149+
export interface INodeBandwidthMetricLabels extends INodeBaseMetricLabels {
150+
tag: string;
151+
}

src/scheduler/tasks/export-metrics/export-metrics.task.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { GetAllNodesQuery } from '@modules/nodes/queries/get-all-nodes/get-all-n
2323
import { ShortUserStats } from '@modules/users/interfaces/user-stats.interface';
2424
import { NodesEntity } from '@modules/nodes/entities/nodes.entity';
2525

26+
import { INodeBaseMetricLabels } from '@scheduler/metrics-providers';
2627
import { JOBS_INTERVALS } from '@scheduler/intervals';
2728

2829
interface AxmMonitorMetric {
@@ -174,7 +175,7 @@ export class ExportMetricsTask {
174175
node_country_emoji: resolveCountryEmoji(node.countryCode),
175176
provider_name: node.provider?.name || 'unknown',
176177
tags: node.tags.join(','),
177-
},
178+
} satisfies INodeBaseMetricLabels,
178179
node.usersOnline ?? 0,
179180
);
180181

@@ -185,7 +186,7 @@ export class ExportMetricsTask {
185186
node_country_emoji: resolveCountryEmoji(node.countryCode),
186187
provider_name: node.provider?.name || 'unknown',
187188
tags: node.tags.join(','),
188-
},
189+
} satisfies INodeBaseMetricLabels,
189190
node.isConnected ? 1 : 0,
190191
);
191192
});
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
import { InjectMetric } from '@willsoto/nestjs-prometheus';
2+
import { Counter, Gauge } from 'prom-client';
3+
4+
import { Injectable, Logger } from '@nestjs/common';
5+
import { Cron } from '@nestjs/schedule';
6+
import { QueryBus } from '@nestjs/cqrs';
7+
8+
import { resolveCountryEmoji } from '@common/utils/resolve-country-emoji';
9+
import { METRIC_NAMES } from '@libs/contracts/constants';
10+
11+
import { GetAllNodesQuery } from '@modules/nodes/queries/get-all-nodes/get-all-nodes.query';
12+
13+
import { INodeBandwidthMetricLabels, INodeBaseMetricLabels } from '@scheduler/metrics-providers';
14+
import { JOBS_INTERVALS } from '@scheduler/intervals';
15+
16+
@Injectable()
17+
export class SyncMetricsTask {
18+
private static readonly CRON_NAME = 'syncMetrics';
19+
private readonly logger = new Logger(SyncMetricsTask.name);
20+
21+
constructor(
22+
@InjectMetric(METRIC_NAMES.NODE_ONLINE_USERS) public nodeOnlineUsers: Gauge<string>,
23+
@InjectMetric(METRIC_NAMES.NODE_STATUS) public nodeStatus: Gauge<string>,
24+
@InjectMetric(METRIC_NAMES.NODE_INBOUND_UPLOAD_BYTES)
25+
public nodeInboundUploadBytes: Counter<string>,
26+
@InjectMetric(METRIC_NAMES.NODE_INBOUND_DOWNLOAD_BYTES)
27+
public nodeInboundDownloadBytes: Counter<string>,
28+
@InjectMetric(METRIC_NAMES.NODE_OUTBOUND_UPLOAD_BYTES)
29+
public nodeOutboundUploadBytes: Counter<string>,
30+
@InjectMetric(METRIC_NAMES.NODE_OUTBOUND_DOWNLOAD_BYTES)
31+
public nodeOutboundDownloadBytes: Counter<string>,
32+
private readonly queryBus: QueryBus,
33+
) {}
34+
35+
@Cron(JOBS_INTERVALS.METRIC_SYNC_METRICS, {
36+
name: SyncMetricsTask.CRON_NAME,
37+
waitForCompletion: true,
38+
})
39+
async handleCron() {
40+
try {
41+
await this.syncNodeMetrics();
42+
} catch (error) {
43+
this.logger.error(`Error in SyncMetricsTask: ${error}`);
44+
}
45+
}
46+
47+
private async syncNodeMetrics(): Promise<void> {
48+
const nodesMap = new Map<string, INodeBaseMetricLabels>();
49+
50+
try {
51+
const nodesResponse = await this.queryBus.execute(new GetAllNodesQuery());
52+
if (
53+
!nodesResponse.isOk ||
54+
!nodesResponse.response ||
55+
nodesResponse.response.length === 0
56+
) {
57+
return;
58+
}
59+
60+
for (const node of nodesResponse.response) {
61+
nodesMap.set(node.uuid, {
62+
node_uuid: node.uuid,
63+
node_name: node.name,
64+
node_country_emoji: resolveCountryEmoji(node.countryCode),
65+
provider_name: node.provider?.name || 'unknown',
66+
tags: node.tags.join(','),
67+
});
68+
}
69+
70+
const [
71+
{ values: onlineUsersValues },
72+
{ values: statusValues },
73+
{ values: inboundUploadValues },
74+
{ values: inboundDownloadValues },
75+
{ values: outboundUploadValues },
76+
{ values: outboundDownloadValues },
77+
] = await Promise.all([
78+
this.nodeOnlineUsers.get(),
79+
this.nodeStatus.get(),
80+
this.nodeInboundUploadBytes.get(),
81+
this.nodeInboundDownloadBytes.get(),
82+
this.nodeOutboundUploadBytes.get(),
83+
this.nodeOutboundDownloadBytes.get(),
84+
]);
85+
86+
this.cleanupBaseMetrics(this.nodeOnlineUsers, onlineUsersValues, nodesMap);
87+
this.cleanupBaseMetrics(this.nodeStatus, statusValues, nodesMap);
88+
89+
this.cleanupBandwidthMetrics(
90+
this.nodeInboundUploadBytes,
91+
inboundUploadValues,
92+
nodesMap,
93+
);
94+
this.cleanupBandwidthMetrics(
95+
this.nodeInboundDownloadBytes,
96+
inboundDownloadValues,
97+
nodesMap,
98+
);
99+
this.cleanupBandwidthMetrics(
100+
this.nodeOutboundUploadBytes,
101+
outboundUploadValues,
102+
nodesMap,
103+
);
104+
this.cleanupBandwidthMetrics(
105+
this.nodeOutboundDownloadBytes,
106+
outboundDownloadValues,
107+
nodesMap,
108+
);
109+
} catch (error) {
110+
this.logger.error(`Error in syncNodeMetrics: ${error}`);
111+
} finally {
112+
nodesMap.clear();
113+
}
114+
}
115+
116+
private cleanupBaseMetrics(
117+
metric: Gauge<string>,
118+
values: any[],
119+
nodesMap: Map<string, INodeBaseMetricLabels>,
120+
) {
121+
for (const stat of values) {
122+
const labels = stat.labels as INodeBaseMetricLabels;
123+
const existingNode = nodesMap.get(labels.node_uuid);
124+
125+
if (!existingNode || !this.compareBaseLabels(existingNode, labels)) {
126+
metric.remove(stat.labels);
127+
}
128+
}
129+
}
130+
131+
private cleanupBandwidthMetrics(
132+
metric: Counter<string>,
133+
values: any[],
134+
nodesMap: Map<string, INodeBaseMetricLabels>,
135+
) {
136+
for (const stat of values) {
137+
const labels = stat.labels as INodeBandwidthMetricLabels;
138+
const existingNode = nodesMap.get(labels.node_uuid);
139+
140+
if (!existingNode || !this.compareNodeBandwidthLabels(existingNode, labels)) {
141+
metric.remove(stat.labels);
142+
}
143+
}
144+
}
145+
146+
private compareBaseLabels(nodeA: INodeBaseMetricLabels, nodeB: INodeBaseMetricLabels): boolean {
147+
return (
148+
nodeA.node_uuid === nodeB.node_uuid &&
149+
nodeA.node_name === nodeB.node_name &&
150+
nodeA.node_country_emoji === nodeB.node_country_emoji &&
151+
nodeA.provider_name === nodeB.provider_name &&
152+
nodeA.tags === nodeB.tags
153+
);
154+
}
155+
156+
private compareNodeBandwidthLabels(
157+
existingNode: INodeBaseMetricLabels,
158+
metricLabels: INodeBandwidthMetricLabels,
159+
): boolean {
160+
return (
161+
existingNode.node_uuid === metricLabels.node_uuid &&
162+
existingNode.node_name === metricLabels.node_name &&
163+
existingNode.node_country_emoji === metricLabels.node_country_emoji &&
164+
existingNode.provider_name === metricLabels.provider_name &&
165+
existingNode.tags === metricLabels.tags
166+
);
167+
}
168+
}

src/scheduler/tasks/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ import { InfraBillingNodesNotificationsTask } from './crm/infra-billing-nodes-no
22
import { NodeMetricsMessageHandler } from './export-metrics/node-metrics-message.handler';
33
import { ResetNodeTrafficTask } from './reset-node-traffic/reset-node-traffic.service';
44
import { ExportMetricsTask } from './export-metrics/export-metrics.task';
5+
import { SyncMetricsTask } from './export-metrics/sync-metrics.task';
56
import { ReviewNodesTask } from './review-nodes/review-nodes.task';
67

78
export const JOBS_SERVICES = [
89
ResetNodeTrafficTask,
910
ReviewNodesTask,
1011
ExportMetricsTask,
12+
SyncMetricsTask,
1113
InfraBillingNodesNotificationsTask,
1214
];
1315

0 commit comments

Comments
 (0)