Skip to content

Commit 7a1fc5b

Browse files
committed
refactor: consolidate and simplify scheduler tasks
- Remove multiple granular scheduler services - Streamline task management in scheduler module - Introduce lightweight tasks for core scheduling functions - Clean up unused and redundant scheduler-related files - Update scheduler module to focus on essential periodic jobs
1 parent 409f02f commit 7a1fc5b

File tree

77 files changed

+1323
-1100
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+1323
-1100
lines changed

src/modules/nodes-user-usage-history/builders/bulk-upsert-history-entry/bulk-upsert-history-entry.builder.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ export class BulkUpsertHistoryEntryBuilder {
2222
"total_bytes",
2323
"created_at",
2424
"updated_at"
25-
) VALUES ${usageHistoryList.map((usageHistory) => `('${usageHistory.nodeUuid}', '${usageHistory.userUuid}', ${usageHistory.downloadBytes}', '${usageHistory.uploadBytes}', '${usageHistory.totalBytes}', '${date.toISOString()}', NOW())`).join(',')}
25+
) VALUES ${usageHistoryList.map((usageHistory) => `('${usageHistory.nodeUuid}', '${usageHistory.userUuid}', ${usageHistory.downloadBytes}, ${usageHistory.uploadBytes}, ${usageHistory.totalBytes}, '${date.toISOString()}', NOW())`).join(',')}
2626
ON CONFLICT ("node_uuid","user_uuid","created_at")
2727
DO UPDATE
2828
SET
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
2+
import { Logger } from '@nestjs/common';
3+
4+
import { ICommandResponse } from '@common/types/command-response.type';
5+
import { ERRORS } from '@libs/contracts/constants';
6+
7+
import { NodesRepository } from '../../repositories/nodes.repository';
8+
import { NodesEntity } from '../../entities/nodes.entity';
9+
import { GetAllNodesQuery } from './get-all-nodes.query';
10+
11+
@QueryHandler(GetAllNodesQuery)
12+
export class GetAllNodesHandler
13+
implements IQueryHandler<GetAllNodesQuery, ICommandResponse<NodesEntity[]>>
14+
{
15+
private readonly logger = new Logger(GetAllNodesHandler.name);
16+
constructor(private readonly nodesRepository: NodesRepository) {}
17+
18+
async execute(): Promise<ICommandResponse<NodesEntity[]>> {
19+
try {
20+
const nodes = await this.nodesRepository.findAllNodes();
21+
22+
return {
23+
isOk: true,
24+
response: nodes,
25+
};
26+
} catch (error) {
27+
this.logger.error(error);
28+
return {
29+
isOk: false,
30+
...ERRORS.INTERNAL_SERVER_ERROR,
31+
};
32+
}
33+
}
34+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export class GetAllNodesQuery {
2+
constructor() {}
3+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export * from './get-all-nodes.handler';
2+
export * from './get-all-nodes.query';

src/modules/nodes/queries/index.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
import { GetNodesByCriteriaHandler } from './get-nodes-by-criteria';
22
import { GetEnabledNodesHandler } from './get-enabled-nodes';
33
import { GetOnlineNodesHandler } from './get-online-nodes';
4+
import { GetAllNodesHandler } from './get-all-nodes';
45

5-
export const QUERIES = [GetEnabledNodesHandler, GetOnlineNodesHandler, GetNodesByCriteriaHandler];
6+
export const QUERIES = [
7+
GetEnabledNodesHandler,
8+
GetOnlineNodesHandler,
9+
GetNodesByCriteriaHandler,
10+
GetAllNodesHandler,
11+
];

src/modules/nodes/repositories/nodes.repository.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ export class NodesRepository implements ICrud<NodesEntity> {
5353
return nodesList.map((value) => new NodesEntity(value));
5454
}
5555

56+
public async findAllNodes(): Promise<NodesEntity[]> {
57+
const nodesList = await this.prisma.tx.nodes.findMany({
58+
include: ADD_EXCLUSIONS_SELECT,
59+
});
60+
61+
return nodesList.map((value) => new NodesEntity(value));
62+
}
63+
5664
public async incrementUsedTraffic(nodeUuid: string, bytes: bigint): Promise<void> {
5765
await this.prisma.tx.nodes.update({
5866
where: { uuid: nodeUuid },

src/queue/node-users/node-users.processor.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { Job } from 'bullmq';
22

33
import { Processor, WorkerHost } from '@nestjs/bullmq';
4-
import { CommandBus, QueryBus } from '@nestjs/cqrs';
54
import { Logger } from '@nestjs/common';
65

76
import { AxiosService } from '@common/axios';
@@ -17,11 +16,7 @@ import { QueueNames } from '../queue.enum';
1716
export class NodeUsersQueueProcessor extends WorkerHost {
1817
private readonly logger = new Logger(NodeUsersQueueProcessor.name);
1918

20-
constructor(
21-
private readonly queryBus: QueryBus,
22-
private readonly commandBus: CommandBus,
23-
private readonly axios: AxiosService,
24-
) {
19+
constructor(private readonly axios: AxiosService) {
2520
super();
2621
}
2722

src/queue/queue.enum.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
export enum QueueNames {
22
nodeHealthCheck = 'NODE_HEALTH_CHECK_QUEUE',
33
nodeUsers = 'NODE_USERS_QUEUE',
4+
recordNodeUsage = 'RECORD_NODE_USAGE_QUEUE',
5+
recordUserUsage = 'RECORD_USER_USAGE_QUEUE',
6+
resetUserTraffic = 'RESET_USER_TRAFFIC_QUEUE',
47
startAllNodes = 'START_ALL_NODES_QUEUE',
58
startNode = 'START_NODE_QUEUE',
69
stopNode = 'STOP_NODE_QUEUE',
10+
userJobs = 'USER_JOBS_QUEUE',
711
}

src/queue/queue.module.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,26 @@ import { BasicAuthMiddleware } from '@common/middlewares';
1010
import { useBullBoard } from '@common/utils/startup-app';
1111
import { BULLBOARD_ROOT } from '@libs/contracts/api';
1212

13+
import { ResetUserTrafficQueueModule } from './reset-user-traffic/reset-user-traffic.module';
1314
import { NodeHealthCheckQueueModule } from './node-health-check/node-health-check.module';
15+
import { RecordNodeUsageQueueModule } from './record-node-usage/record-node-usage.module';
16+
import { RecordUserUsageQueueModule } from './record-user-usage/record-user-usage.module';
1417
import { StartAllNodesQueueModule } from './start-all-nodes/start-all-nodes.module';
1518
import { StartNodeQueueModule } from './start-node/start-node.module';
1619
import { NodeUsersQueueModule } from './node-users/node-users.module';
1720
import { StopNodeQueueModule } from './stop-node/stop-node.module';
21+
import { UserJobsQueueModule } from './user-jobs/user-jobs.module';
1822

1923
const queueModules = [
2024
StartAllNodesQueueModule,
2125
StartNodeQueueModule,
2226
StopNodeQueueModule,
2327
NodeHealthCheckQueueModule,
2428
NodeUsersQueueModule,
29+
RecordNodeUsageQueueModule,
30+
RecordUserUsageQueueModule,
31+
ResetUserTrafficQueueModule,
32+
UserJobsQueueModule,
2533
];
2634

2735
const bullBoard = [
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './record-node-usage-job-names.enum';

0 commit comments

Comments
 (0)