Skip to content

Commit 4a38051

Browse files
committed
feat: implement node disabling and stopping logic in queue processors
- Added logic to disable nodes without active inbounds in StartAllNodesByProfileQueueProcessor. - Integrated StopNodeQueueService to handle stopping of nodes in both StartAllNodesByProfileQueueProcessor and StartNodeQueueProcessor.
1 parent 26de7af commit 4a38051

File tree

2 files changed

+53
-1
lines changed

2 files changed

+53
-1
lines changed

src/queue/start-all-nodes-by-profile/start-all-nodes-by-profile.processor.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { NodesEntity, NodesRepository } from '@modules/nodes';
1313

1414
import { StartAllNodesQueueService } from '@queue/start-all-nodes';
1515
import { StartNodeQueueService } from '@queue/start-node';
16+
import { StopNodeQueueService } from '@queue/stop-node';
1617

1718
import { StartAllNodesByProfileJobNames } from './enums';
1819
import { QueueNames } from '../queue.enum';
@@ -34,6 +35,7 @@ export class StartAllNodesByProfileQueueProcessor extends WorkerHost {
3435
private readonly nodesRepository: NodesRepository,
3536
private readonly axios: AxiosService,
3637
private readonly startNodeQueueService: StartNodeQueueService,
38+
private readonly stopNodeQueueService: StopNodeQueueService,
3739
private readonly startAllNodesQueueService: StartAllNodesQueueService,
3840
private readonly queryBus: QueryBus,
3941
) {
@@ -69,6 +71,32 @@ export class StartAllNodesByProfileQueueProcessor extends WorkerHost {
6971
const activeNodeTags = new Map<string, string[]>();
7072

7173
for (const node of nodes) {
74+
if (node.activeInbounds.length === 0) {
75+
this.logger.warn(
76+
`No active inbounds found for node ${node.uuid} with profile ${payload.profileUuid}, disabling and clearing profile from node...`,
77+
);
78+
79+
await this.nodesRepository.update({
80+
uuid: node.uuid,
81+
isDisabled: true,
82+
activeConfigProfileUuid: null,
83+
isConnecting: false,
84+
isXrayRunning: false,
85+
isNodeOnline: false,
86+
isConnected: false,
87+
lastStatusMessage: null,
88+
lastStatusChange: new Date(),
89+
usersOnline: 0,
90+
});
91+
92+
await this.stopNodeQueueService.stopNode({
93+
nodeUuid: node.uuid,
94+
isNeedToBeDeleted: false,
95+
});
96+
97+
continue;
98+
}
99+
72100
await this.nodesRepository.update({
73101
uuid: node.uuid,
74102
isConnecting: true,

src/queue/start-node/start-node.processor.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import { NodeEvent } from '@integration-modules/notifications/interfaces';
1313
import { GetPreparedConfigWithUsersQuery } from '@modules/users/queries/get-prepared-config-with-users';
1414
import { NodesRepository } from '@modules/nodes';
1515

16+
import { StopNodeQueueService } from '@queue/stop-node';
17+
1618
import { StartNodeJobNames } from './enums';
1719
import { QueueNames } from '../queue.enum';
1820

@@ -25,6 +27,7 @@ export class StartNodeQueueProcessor extends WorkerHost {
2527
constructor(
2628
private readonly axios: AxiosService,
2729
private readonly nodesRepository: NodesRepository,
30+
private readonly stopNodeQueueService: StopNodeQueueService,
2831
private readonly queryBus: QueryBus,
2932
private readonly eventEmitter: EventEmitter2,
3033
) {
@@ -47,7 +50,28 @@ export class StartNodeQueueProcessor extends WorkerHost {
4750
}
4851

4952
if (!nodeEntity.activeConfigProfileUuid || !nodeEntity.activeInbounds) {
50-
this.logger.error(`Node ${nodeUuid} has no active config profile or inbounds`);
53+
this.logger.warn(
54+
`Node ${nodeUuid} has no active config profile or inbounds, disabling and clearing profile from node...`,
55+
);
56+
57+
await this.nodesRepository.update({
58+
uuid: nodeEntity.uuid,
59+
isDisabled: true,
60+
activeConfigProfileUuid: null,
61+
isConnecting: false,
62+
isXrayRunning: false,
63+
isNodeOnline: false,
64+
isConnected: false,
65+
lastStatusMessage: null,
66+
lastStatusChange: new Date(),
67+
usersOnline: 0,
68+
});
69+
70+
await this.stopNodeQueueService.stopNode({
71+
nodeUuid: nodeEntity.uuid,
72+
isNeedToBeDeleted: false,
73+
});
74+
5175
return;
5276
}
5377

0 commit comments

Comments
 (0)