Skip to content

Commit 6815835

Browse files
committed
refactor: remove BatchResetLimitedUsersUsageBuilder and update related references
1 parent a808498 commit 6815835

File tree

8 files changed

+116
-68
lines changed

8 files changed

+116
-68
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import { Queue } from 'bullmq';
2+
3+
import { Logger } from '@nestjs/common';
4+
5+
import { sleep } from './sleep';
6+
7+
/**
8+
* Throttle a BullMQ queue until there are no active workers
9+
* or the maximum number of attempts is reached. While throttling,
10+
* the queue is repeatedly paused, and the active worker count checked.
11+
*
12+
* @param {Queue} queue - The BullMQ queue to throttle.
13+
* @param {Logger} logger - Logger instance to log progress and status.
14+
* @returns {Promise<() => Promise<void>>} - Returns an async function that, when called, resumes the queue.
15+
* @throws {Error} If the queue still has active workers after 10 attempts.
16+
*/
17+
export async function throttleQueue(
18+
queue: Queue,
19+
logger: Logger,
20+
maxAttempts = 10,
21+
): Promise<() => Promise<void>> {
22+
await queue.pause();
23+
24+
let haveActiveWorkers = await queue.getActiveCount();
25+
let attempts = 0;
26+
27+
logger.log(`Paused ${queue.name} queue. ${haveActiveWorkers} active workers.`);
28+
29+
while (haveActiveWorkers > 0 && attempts < maxAttempts) {
30+
await sleep(1_400);
31+
haveActiveWorkers = await queue.getActiveCount();
32+
attempts++;
33+
}
34+
35+
if (haveActiveWorkers > 0) {
36+
logger.warn(`${queue.name} queue is not empty after ${maxAttempts} attempts.`);
37+
}
38+
39+
return async (): Promise<void> => {
40+
logger.log(`Resuming ${queue.name} queue after throttling.`);
41+
await queue.resume();
42+
};
43+
}

src/modules/users/builders/batch-reset-limited-users-usage/batch-reset-limited-users-usage.builder.ts

Lines changed: 0 additions & 38 deletions
This file was deleted.

src/modules/users/builders/batch-reset-limited-users-usage/index.ts

Lines changed: 0 additions & 1 deletion
This file was deleted.
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
1-
export * from './batch-reset-limited-users-usage';
21
export * from './bulk-delete-by-status';
32
export * from './bulk-update-user-used-traffic';

src/modules/users/commands/batch-reset-limited-users-traffic/batch-reset-limited-users-traffic.handler.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ export class BatchResetLimitedUsersTrafficHandler implements ICommandHandler<Bat
1616

1717
async execute(command: BatchResetLimitedUsersTrafficCommand) {
1818
try {
19-
const result = await this.usersRepository.resetLimitedUsersTraffic(command.strategy);
19+
const result = await this.usersRepository.resetLimitedUserTraffic(command.strategy);
2020

2121
return ok(result);
2222
} catch (error: unknown) {

src/modules/users/repositories/users.repository.ts

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,6 @@ import { GetAllUsersCommand } from '@libs/contracts/commands';
1616

1717
import { ConfigProfileInboundEntity } from '@modules/config-profiles/entities';
1818

19-
import {
20-
BatchResetLimitedUsersUsageBuilder,
21-
BulkDeleteByStatusBuilder,
22-
BulkUpdateUserUsedTrafficBuilder,
23-
} from '../builders';
2419
import {
2520
IGetUserAccessibleNodes,
2621
IGetUserAccessibleNodesResponse,
@@ -34,6 +29,7 @@ import {
3429
UserWithResolvedInboundEntity,
3530
} from '../entities';
3631
import { TriggerThresholdNotificationsBuilder } from '../builders/trigger-threshold-notifications-builder';
32+
import { BulkDeleteByStatusBuilder, BulkUpdateUserUsedTrafficBuilder } from '../builders';
3733
import { UserTrafficEntity } from '../entities/user-traffic.entity';
3834
import { UserConverter } from '../users.converter';
3935

@@ -651,16 +647,25 @@ export class UsersRepository {
651647

652648
public async resetUserTraffic(strategy: TResetPeriods): Promise<void> {
653649
await this.qb.kysely
650+
.with('targetUsers', (db) =>
651+
db
652+
.selectFrom('users')
653+
.select('tId')
654+
.where('trafficLimitStrategy', '=', strategy)
655+
.where('status', '!=', USERS_STATUS.LIMITED)
656+
.orderBy('tId')
657+
.forUpdate(),
658+
)
654659
.with('updateUsers', (db) =>
655660
db
656661
.updateTable('users')
662+
.from('targetUsers')
663+
.whereRef('users.tId', '=', 'targetUsers.tId')
657664
.set({
658665
lastTrafficResetAt: new Date(),
659666
lastTriggeredThreshold: 0,
660667
})
661-
.where('trafficLimitStrategy', '=', strategy)
662-
.where('status', '!=', USERS_STATUS.LIMITED)
663-
.returning('tId'),
668+
.returning('users.tId'),
664669
)
665670
.updateTable('userTraffic')
666671
.from('updateUsers')
@@ -671,9 +676,37 @@ export class UsersRepository {
671676
.execute();
672677
}
673678

674-
public async resetLimitedUsersTraffic(strategy: TResetPeriods): Promise<{ tId: bigint }[]> {
675-
const { query } = new BatchResetLimitedUsersUsageBuilder(strategy);
676-
const result = await this.prisma.tx.$queryRaw<{ tId: bigint }[]>(query);
679+
public async resetLimitedUserTraffic(strategy: TResetPeriods): Promise<{ tId: bigint }[]> {
680+
const result = await this.qb.kysely
681+
.with('targetUsers', (db) =>
682+
db
683+
.selectFrom('users')
684+
.select('tId')
685+
.where('trafficLimitStrategy', '=', strategy)
686+
.where('status', '=', USERS_STATUS.LIMITED)
687+
.orderBy('tId')
688+
.forUpdate(),
689+
)
690+
.with('updateUsers', (db) =>
691+
db
692+
.updateTable('users')
693+
.from('targetUsers')
694+
.whereRef('users.tId', '=', 'targetUsers.tId')
695+
.set({
696+
lastTrafficResetAt: new Date(),
697+
lastTriggeredThreshold: 0,
698+
status: USERS_STATUS.ACTIVE,
699+
})
700+
.returning('users.tId'),
701+
)
702+
.updateTable('userTraffic')
703+
.from('updateUsers')
704+
.whereRef('userTraffic.tId', '=', 'updateUsers.tId')
705+
.set({
706+
usedTrafficBytes: 0n,
707+
})
708+
.returning('userTraffic.tId')
709+
.execute();
677710

678711
return result;
679712
}

src/queue/_users/processors/reset-user-traffic.processor.ts

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { Logger, Scope } from '@nestjs/common';
55
import { CommandBus } from '@nestjs/cqrs';
66

77
import { formatExecutionTime, getTime } from '@common/utils/get-elapsed-time';
8+
import { throttleQueue } from '@common/utils/throttle-queue.util';
89
import { RESET_PERIODS, TResetPeriods } from '@libs/contracts/constants';
910
import { EVENTS } from '@libs/contracts/constants/events/events';
1011

@@ -23,7 +24,7 @@ import { UsersQueuesService } from '../users-queues.service';
2324
scope: Scope.REQUEST,
2425
},
2526
{
26-
concurrency: 2,
27+
concurrency: 1,
2728
},
2829
)
2930
export class ResetUserTrafficQueueProcessor extends WorkerHost {
@@ -39,20 +40,31 @@ export class ResetUserTrafficQueueProcessor extends WorkerHost {
3940
}
4041

4142
async process(job: Job) {
42-
switch (job.name) {
43-
case USERS_JOB_NAMES.RESET_DAILY_USER_TRAFFIC:
44-
return this.handleResetUserTraffic(job, RESET_PERIODS.DAY);
45-
case USERS_JOB_NAMES.RESET_MONTHLY_USER_TRAFFIC:
46-
return this.handleResetUserTraffic(job, RESET_PERIODS.MONTH);
47-
case USERS_JOB_NAMES.RESET_WEEKLY_USER_TRAFFIC:
48-
return this.handleResetUserTraffic(job, RESET_PERIODS.WEEK);
49-
case USERS_JOB_NAMES.RESET_NO_RESET_USER_TRAFFIC:
50-
return this.handleResetUserTraffic(job, RESET_PERIODS.NO_RESET);
51-
case USERS_JOB_NAMES.RESET_ALL_USER_TRAFFIC:
52-
return this.handleResetAllUserTraffic(job);
53-
default:
54-
this.logger.warn(`Job "${job.name}" is not handled.`);
55-
break;
43+
const activateQueue = await throttleQueue(
44+
this.usersQueuesService.queues.updateUsersUsage,
45+
this.logger,
46+
);
47+
48+
try {
49+
switch (job.name) {
50+
case USERS_JOB_NAMES.RESET_DAILY_USER_TRAFFIC:
51+
return this.handleResetUserTraffic(job, RESET_PERIODS.DAY);
52+
case USERS_JOB_NAMES.RESET_MONTHLY_USER_TRAFFIC:
53+
return this.handleResetUserTraffic(job, RESET_PERIODS.MONTH);
54+
case USERS_JOB_NAMES.RESET_WEEKLY_USER_TRAFFIC:
55+
return this.handleResetUserTraffic(job, RESET_PERIODS.WEEK);
56+
case USERS_JOB_NAMES.RESET_NO_RESET_USER_TRAFFIC:
57+
return this.handleResetUserTraffic(job, RESET_PERIODS.NO_RESET);
58+
case USERS_JOB_NAMES.RESET_ALL_USER_TRAFFIC:
59+
return this.handleResetAllUserTraffic(job);
60+
default:
61+
this.logger.warn(`Job "${job.name}" is not handled.`);
62+
break;
63+
}
64+
} catch (error) {
65+
this.logger.error(`Error handling "${job.name}" job: ${error}`);
66+
} finally {
67+
await activateQueue();
5668
}
5769
}
5870

src/queue/_users/users-queues.service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ export class UsersQueuesService implements OnApplicationBootstrap {
6363
this.logger.log(`${Object.values(this.queues).length} queues are connected.`);
6464

6565
await this.serialUsersOperationsQueue.setGlobalConcurrency(1);
66-
await this.resetUserTrafficQueue.setGlobalConcurrency(2);
66+
await this.resetUserTrafficQueue.setGlobalConcurrency(1);
6767
await this.usersWatchdogQueue.setGlobalConcurrency(2);
6868

6969
await this.updateUsersUsageQueue.setGlobalConcurrency(5); // TODO: carefully

0 commit comments

Comments
 (0)