Skip to content

Commit de6518d

Browse files
committed
feat: add node health check queue and refactor health check service
- Introduce NodeHealthCheckQueueModule and NodeHealthCheckQueueService - Simplify NodeHealthCheckService by delegating node health checks to queue - Remove unused event and command bus dependencies - Add new queue name for node health check in QueueNames enum - Update queue module to include NodeHealthCheckQueueModule
1 parent bb8efa7 commit de6518d

File tree

14 files changed

+252
-109
lines changed

14 files changed

+252
-109
lines changed

.hygen/new/queue/queue.module.ejs.t

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ unless_exists: true
1616
import { BullBoardModule } from '@bull-board/nestjs';
1717

1818
import { BullModule } from '@nestjs/bullmq';
19+
import { CqrsModule } from '@nestjs/cqrs';
1920
import { Module } from '@nestjs/common';
2021

2122
import { useBullBoard, useQueueProcessor } from '@common/utils/startup-app';

.hygen/new/queue/queue.processor.ejs.t

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ unless_exists: true
1111
%>import { Job } from 'bullmq';
1212

1313
import { Processor, WorkerHost } from '@nestjs/bullmq';
14+
import { CommandBus, QueryBus } from '@nestjs/cqrs';
1415
import { Logger } from '@nestjs/common';
1516

1617
import { <%= QueueJobNamesEnumName %> } from './enums';
@@ -20,12 +21,19 @@ import { QueueNames } from '../queue.enum';
2021
export class <%= QueueProcessorName %> extends WorkerHost {
2122
private readonly logger = new Logger(<%= QueueProcessorName %>.name)
2223

24+
constructor(
25+
private readonly queryBus: QueryBus,
26+
private readonly commandBus: CommandBus,
27+
) {
28+
super();
29+
}
30+
2331
async process(job: Job) {
2432
switch (job.name) {
2533
case <%= QueueJobNamesEnumName %>.exampleJob:
2634
return this.handleExampleJob(job);
2735
default:
28-
this.logger.warn(`⚠️ Job "${job.name}" is not handled.`);
36+
this.logger.warn(`🚨 Job "${job.name}" is not handled.`);
2937
break;
3038
}
3139
}

src/modules/xray-config/xray-config.service.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { ERRORS } from '@contract/constants';
22

33
import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common';
4-
import { CommandBus, EventBus, QueryBus } from '@nestjs/cqrs';
4+
import { CommandBus, QueryBus } from '@nestjs/cqrs';
55

66
import { ICommandResponse } from '@common/types/command-response.type';
77
import { IXrayConfig } from '@common/helpers/xray-config/interfaces';
@@ -27,7 +27,6 @@ export class XrayConfigService implements OnApplicationBootstrap {
2727
constructor(
2828
private readonly commandBus: CommandBus,
2929
private readonly queryBus: QueryBus,
30-
private readonly eventBus: EventBus,
3130
private readonly xrayConfigRepository: XrayConfigRepository,
3231
private readonly startAllNodesQueue: StartAllNodesQueueService,
3332
) {}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './node-health-check-job-names.enum';
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export enum NodeHealthCheckJobNames {
2+
checkNodeHealth = 'checkNodeHealth',
3+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export * from './enums';
2+
export * from './node-health-check.service';
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './node-health-check.interface';
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export interface NodeHealthCheckPayload {
2+
nodeUuid: string;
3+
nodeAddress: string;
4+
nodePort: number | null;
5+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
2+
import { BullBoardModule } from '@bull-board/nestjs';
3+
4+
import { BullModule } from '@nestjs/bullmq';
5+
import { CqrsModule } from '@nestjs/cqrs';
6+
import { Module } from '@nestjs/common';
7+
8+
import { useBullBoard, useQueueProcessor } from '@common/utils/startup-app';
9+
10+
import { NodeHealthCheckQueueProcessor } from './node-health-check.processor';
11+
import { NodeHealthCheckQueueService } from './node-health-check.service';
12+
import { QueueNames } from '../queue.enum';
13+
14+
const requiredModules = [CqrsModule];
15+
16+
const processors = [NodeHealthCheckQueueProcessor];
17+
const services = [NodeHealthCheckQueueService];
18+
19+
const queues = [BullModule.registerQueue({ name: QueueNames.nodeHealthCheck })];
20+
21+
const bullBoard = [
22+
BullBoardModule.forFeature({ name: QueueNames.nodeHealthCheck, adapter: BullMQAdapter }),
23+
];
24+
25+
const providers = useQueueProcessor() ? processors : [];
26+
const imports = useBullBoard() ? bullBoard : [];
27+
28+
@Module({
29+
imports: [...queues, ...imports, ...requiredModules],
30+
providers: [...providers, ...services],
31+
exports: [...services],
32+
})
33+
export class NodeHealthCheckQueueModule {}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
import { Job } from 'bullmq';
2+
3+
import { Processor, WorkerHost } from '@nestjs/bullmq';
4+
import { EventEmitter2 } from '@nestjs/event-emitter';
5+
import { CommandBus, QueryBus } from '@nestjs/cqrs';
6+
import { Logger } from '@nestjs/common';
7+
8+
import { GetSystemStatsCommand } from '@remnawave/node-contract';
9+
10+
import { ICommandResponse } from '@common/types/command-response.type';
11+
import { AxiosService } from '@common/axios';
12+
import { EVENTS } from '@libs/contracts/constants';
13+
14+
import { NodeEvent } from '@intergration-modules/telegram-bot/events/nodes/interfaces';
15+
16+
import { GetEnabledNodesQuery } from '@modules/nodes/queries/get-enabled-nodes';
17+
import { UpdateNodeCommand } from '@modules/nodes/commands/update-node';
18+
import { NodesEntity } from '@modules/nodes/entities/nodes.entity';
19+
20+
import { NodeHealthCheckJobNames } from './enums';
21+
import { QueueNames } from '../queue.enum';
22+
23+
@Processor(QueueNames.nodeHealthCheck, {
24+
concurrency: 40,
25+
})
26+
export class NodeHealthCheckQueueProcessor extends WorkerHost {
27+
private readonly logger = new Logger(NodeHealthCheckQueueProcessor.name);
28+
29+
constructor(
30+
private readonly queryBus: QueryBus,
31+
private readonly commandBus: CommandBus,
32+
private readonly eventEmitter: EventEmitter2,
33+
private readonly axios: AxiosService,
34+
) {
35+
super();
36+
}
37+
async process(job: Job) {
38+
switch (job.name) {
39+
case NodeHealthCheckJobNames.checkNodeHealth:
40+
return this.handleCheckNodeHealthJob(job);
41+
default:
42+
this.logger.warn(`🚨 Job "${job.name}" is not handled.`);
43+
break;
44+
}
45+
}
46+
47+
private async handleCheckNodeHealthJob(job: Job<NodesEntity>) {
48+
this.logger.debug(
49+
`✅ Handling "${NodeHealthCheckJobNames.checkNodeHealth}" job with ID: ${job?.id || ''}, data: ${JSON.stringify(job?.data || '')}`,
50+
);
51+
52+
try {
53+
const response = await this.axios.getSystemStats(job.data.address, job.data.port);
54+
switch (response.isOk) {
55+
case true:
56+
return this.handleConnectedNode(job.data, response.response!);
57+
case false:
58+
return this.handleDisconnectedNode(job.data, response.message);
59+
}
60+
} catch (error) {
61+
this.logger.error(
62+
`❌ Error handling "${NodeHealthCheckJobNames.checkNodeHealth}" job: ${error}`,
63+
);
64+
}
65+
}
66+
67+
private async handleConnectedNode(node: NodesEntity, response: GetSystemStatsCommand.Response) {
68+
if (typeof response.response.uptime !== 'number') {
69+
this.logger.error(`Node ${node.uuid} uptime is not a number`);
70+
return;
71+
}
72+
73+
await this.updateNode({
74+
node: {
75+
uuid: node.uuid,
76+
isConnected: true,
77+
isNodeOnline: true,
78+
isXrayRunning: true,
79+
lastStatusChange: new Date(),
80+
lastStatusMessage: '',
81+
},
82+
});
83+
84+
if (!node.isConnected) {
85+
this.eventEmitter.emit(
86+
EVENTS.NODE.CONNECTION_RESTORED,
87+
new NodeEvent(node, EVENTS.NODE.CONNECTION_RESTORED),
88+
);
89+
}
90+
}
91+
92+
private async handleDisconnectedNode(node: NodesEntity, message: string | undefined) {
93+
this.logger.debug(`Node ${node.uuid} is disconnected: ${message}`);
94+
95+
const newNodeEntity = await this.updateNode({
96+
node: {
97+
uuid: node.uuid,
98+
isConnected: false,
99+
isNodeOnline: false,
100+
isXrayRunning: false,
101+
lastStatusChange: new Date(),
102+
lastStatusMessage: message,
103+
usersOnline: 0,
104+
},
105+
});
106+
107+
// this.eventBus.publish(new StartNodeEvent(newNodeEntity.response || node));
108+
109+
if (node.isConnected) {
110+
node.lastStatusMessage = message || null;
111+
this.eventEmitter.emit(
112+
EVENTS.NODE.CONNECTION_LOST,
113+
new NodeEvent(node, EVENTS.NODE.CONNECTION_LOST),
114+
);
115+
}
116+
}
117+
118+
private async getEnabledNodes(): Promise<ICommandResponse<NodesEntity[]>> {
119+
return this.queryBus.execute<GetEnabledNodesQuery, ICommandResponse<NodesEntity[]>>(
120+
new GetEnabledNodesQuery(),
121+
);
122+
}
123+
124+
private async updateNode(dto: UpdateNodeCommand): Promise<ICommandResponse<NodesEntity>> {
125+
return this.commandBus.execute<UpdateNodeCommand, ICommandResponse<NodesEntity>>(
126+
new UpdateNodeCommand(dto.node),
127+
);
128+
}
129+
}

0 commit comments

Comments
 (0)