Skip to content

Commit 289d2ee

Browse files
committed
refactor: node queries
1 parent 90ae927 commit 289d2ee

File tree

12 files changed

+88
-40
lines changed

12 files changed

+88
-40
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
2+
import { Logger } from '@nestjs/common';
3+
4+
import { fail, ok, TResult } from '@common/types';
5+
import { ERRORS } from '@libs/contracts/constants';
6+
7+
import {
8+
GetEnabledNodesPartialQuery,
9+
IGetEnabledNodesPartialResponse,
10+
} from './get-enabled-nodes-partial.query';
11+
import { NodesRepository } from '../../repositories/nodes.repository';
12+
13+
@QueryHandler(GetEnabledNodesPartialQuery)
14+
export class GetEnabledNodesPartialHandler implements IQueryHandler<GetEnabledNodesPartialQuery> {
15+
private readonly logger = new Logger(GetEnabledNodesPartialHandler.name);
16+
constructor(private readonly nodesRepository: NodesRepository) {}
17+
18+
async execute(): Promise<TResult<IGetEnabledNodesPartialResponse[]>> {
19+
try {
20+
const nodes = await this.nodesRepository.findEnabledNodesPartial();
21+
22+
return ok(nodes);
23+
} catch (error) {
24+
this.logger.error(error);
25+
return fail(ERRORS.INTERNAL_SERVER_ERROR);
26+
}
27+
}
28+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import { Query } from '@nestjs/cqrs';
2+
3+
import { TResult } from '@common/types';
4+
5+
export interface IGetEnabledNodesPartialResponse {
6+
uuid: string;
7+
address: string;
8+
port: number | null;
9+
isConnected: boolean;
10+
}
11+
12+
export class GetEnabledNodesPartialQuery extends Query<TResult<IGetEnabledNodesPartialResponse[]>> {
13+
constructor() {
14+
super();
15+
}
16+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export * from './get-enabled-nodes-partial.handler';
2+
export * from './get-enabled-nodes-partial.query';

src/modules/nodes/queries/get-enabled-nodes/get-enabled-nodes.handler.ts

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,18 @@
11
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
22
import { Logger } from '@nestjs/common';
33

4-
import { fail, ok, TResult } from '@common/types';
4+
import { fail, ok } from '@common/types';
55
import { ERRORS } from '@libs/contracts/constants';
66

77
import { NodesRepository } from '../../repositories/nodes.repository';
88
import { GetEnabledNodesQuery } from './get-enabled-nodes.query';
9-
import { NodesEntity } from '../../entities/nodes.entity';
109

1110
@QueryHandler(GetEnabledNodesQuery)
12-
export class GetEnabledNodesHandler implements IQueryHandler<
13-
GetEnabledNodesQuery,
14-
TResult<NodesEntity[]>
15-
> {
11+
export class GetEnabledNodesHandler implements IQueryHandler<GetEnabledNodesQuery> {
1612
private readonly logger = new Logger(GetEnabledNodesHandler.name);
1713
constructor(private readonly nodesRepository: NodesRepository) {}
1814

19-
async execute(): Promise<TResult<NodesEntity[]>> {
15+
async execute() {
2016
try {
2117
const nodes = await this.nodesRepository.findByCriteria({
2218
isDisabled: false,
Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1-
export class GetEnabledNodesQuery {
2-
constructor() {}
1+
import { Query } from '@nestjs/cqrs';
2+
3+
import { TResult } from '@common/types';
4+
5+
import { NodesEntity } from '../../entities/nodes.entity';
6+
7+
export class GetEnabledNodesQuery extends Query<TResult<NodesEntity[]>> {
8+
constructor() {
9+
super();
10+
}
311
}

src/modules/nodes/queries/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { GetEnabledNodesPartialHandler } from './get-enabled-nodes-partial/get-enabled-nodes-partial.handler';
12
import { FindNodesByCriteriaHandler } from './find-nodes-by-criteria';
23
import { GetNodesByCriteriaHandler } from './get-nodes-by-criteria';
34
import { CountOnlineUsersHandler } from './count-online-users';
@@ -14,4 +15,5 @@ export const QUERIES = [
1415
CountOnlineUsersHandler,
1516
GetNodeByUuidHandler,
1617
FindNodesByCriteriaHandler,
18+
GetEnabledNodesPartialHandler,
1719
];

src/modules/nodes/repositories/nodes.repository.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { getKyselyUuid } from '@common/helpers/kysely/get-kysely-uuid';
99
import { TxKyselyService } from '@common/database';
1010
import { ICrud } from '@common/types/crud-port';
1111

12+
import { IGetEnabledNodesPartialResponse } from '../queries/get-enabled-nodes-partial/get-enabled-nodes-partial.query';
1213
import { IGetOnlineNodesPartialResponse } from '../queries/get-online-nodes';
1314
import { NodesEntity } from '../entities/nodes.entity';
1415
import { NodesConverter } from '../nodes.converter';
@@ -81,6 +82,17 @@ export class NodesRepository implements ICrud<NodesEntity> {
8182
return nodesList;
8283
}
8384

85+
public async findEnabledNodesPartial(): Promise<IGetEnabledNodesPartialResponse[]> {
86+
const nodesList = await this.qb.kysely
87+
.selectFrom('nodes')
88+
.select(['uuid', 'address', 'port', 'isConnected'])
89+
.where('isDisabled', '=', false)
90+
.where('isConnecting', '=', false)
91+
.execute();
92+
93+
return nodesList;
94+
}
95+
8496
public async findConnectedNodesWithoutInbounds(): Promise<
8597
{
8698
uuid: string;

src/queue/_nodes/interfaces/node-health-check.interface.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,4 @@ export interface INodeHealthCheckPayload {
33
nodeAddress: string;
44
nodePort: number | null;
55
isConnected: boolean;
6-
isConnecting: boolean;
76
}

src/queue/_nodes/nodes-queues.service.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { Queue } from 'bullmq';
33
import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common';
44
import { InjectQueue } from '@nestjs/bullmq';
55

6-
import { NodesEntity } from '@modules/nodes';
6+
import { IGetEnabledNodesPartialResponse } from '@modules/nodes/queries/get-enabled-nodes-partial/get-enabled-nodes-partial.query';
77

88
import { QUEUES_NAMES } from '@queue/queue.enum';
99

@@ -77,7 +77,7 @@ export class NodesQueuesService implements OnApplicationBootstrap {
7777
});
7878
}
7979

80-
public async checkNodeHealthBulk(payload: NodesEntity[]) {
80+
public async checkNodeHealthBulk(payload: IGetEnabledNodesPartialResponse[]) {
8181
return this.nodeHealthCheckQueue.addBulk(
8282
payload.map((node) => {
8383
return {
@@ -87,7 +87,6 @@ export class NodesQueuesService implements OnApplicationBootstrap {
8787
nodeAddress: node.address,
8888
nodePort: node.port,
8989
isConnected: node.isConnected,
90-
isConnecting: node.isConnecting,
9190
} satisfies INodeHealthCheckPayload,
9291
opts: {
9392
jobId: `${NODES_JOB_NAMES.NODE_HEALTH_CHECK}-${node.uuid}`,

src/queue/_nodes/processors/node-health-check.processor.ts

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,7 @@ export class NodeHealthCheckQueueProcessor extends WorkerHost {
3636
}
3737
async process(job: Job<INodeHealthCheckPayload>) {
3838
try {
39-
const { nodeAddress, nodePort, nodeUuid, isConnected, isConnecting } = job.data;
40-
41-
if (isConnecting) {
42-
this.logger.warn(
43-
`Node ${nodeUuid}, ${nodeAddress}:${nodePort} – is connecting, skipping health check`,
44-
);
45-
return;
46-
}
39+
const { nodeAddress, nodePort, nodeUuid, isConnected } = job.data;
4740

4841
const attemptsLimit = 2;
4942
let attempts = 0;

0 commit comments

Comments
 (0)