Skip to content

Commit d8d7ef9

Browse files
committed
refactor: optimize user traffic reset jobs
- Simplified user traffic reset logic for daily, weekly, and monthly jobs - Introduced BatchResetLimitedUsersTrafficCommand for efficient user processing - Added handling for large user sets with node restart - Removed redundant status and history creation methods - Updated SQL query to set user status to ACTIVE during traffic reset
1 parent 423a9a1 commit d8d7ef9

File tree

6 files changed

+178
-155
lines changed

6 files changed

+178
-155
lines changed

src/app.module.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ import { PrismaModule } from '@common/database';
3030
imports: [PrismaModule],
3131
adapter: new TransactionalAdapterPrisma({
3232
prismaInjectionToken: PrismaService,
33+
defaultTxOptions: {
34+
timeout: 60_000,
35+
},
3336
}),
3437
}),
3538
],

src/jobs/tasks/reset-user-traffic-jobs/reset-user-traffic-day/reset-user-traffic-day.service.ts

Lines changed: 56 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,16 @@ import { EventEmitter2 } from '@nestjs/event-emitter';
44
import { Injectable, Logger } from '@nestjs/common';
55

66
import { UserEvent } from '@intergration-modules/telegram-bot/events/users/interfaces';
7-
import { EVENTS, RESET_PERIODS, USERS_STATUS } from '@libs/contracts/constants';
7+
import { EVENTS, RESET_PERIODS } from '@libs/contracts/constants';
88
import { formatExecutionTime, getTime } from '@common/utils/get-elapsed-time';
99
import { ICommandResponse } from '@common/types/command-response.type';
1010
import { JOBS_INTERVALS } from 'src/jobs/intervals';
1111
import { AddUserToNodeEvent } from '@modules/nodes/events/add-user-to-node';
12-
import { UserTrafficHistoryEntity } from '@modules/user-traffic-history/entities/user-traffic-history.entity';
1312
import { UserWithActiveInboundsEntity } from '@modules/users/entities/user-with-active-inbounds.entity';
14-
import { UpdateStatusAndTrafficAndResetAtCommand } from '@modules/users/commands/update-status-and-traffic-and-reset-at';
15-
import { CreateUserTrafficHistoryCommand } from '@modules/user-traffic-history/commands/create-user-traffic-history';
16-
import { GetUsersByTrafficStrategyAndStatusQuery } from '@modules/users/queries/get-users-by-traffic-strategy-and-status';
1713
import { BatchResetUserTrafficCommand } from '@modules/users/commands/batch-reset-user-traffic';
14+
import { BatchResetLimitedUsersTrafficCommand } from '@modules/users/commands/batch-reset-limited-users-traffic';
15+
import { StartAllNodesEvent } from '@modules/nodes/events/start-all-nodes';
16+
import { GetUserByUuidQuery } from '@modules/users/queries/get-user-by-uuid';
1817

1918
@Injectable()
2019
export class ResetUserTrafficCalendarDayService {
@@ -67,42 +66,53 @@ export class ResetUserTrafficCalendarDayService {
6766
);
6867
}
6968

70-
const usersResponse = await this.getAllUsers({
69+
const updatedUsersUuids = await this.batchResetLimitedUsersTraffic({
7170
strategy: RESET_PERIODS.DAY,
72-
status: USERS_STATUS.LIMITED,
7371
});
7472

75-
if (!usersResponse.isOk || !usersResponse.response) {
73+
if (!updatedUsersUuids.isOk || !updatedUsersUuids.response) {
7674
this.logger.debug('No users found');
7775
return;
7876
}
7977

80-
const users = usersResponse.response;
78+
const updatedUsers = updatedUsersUuids.response;
8179

82-
for (const user of users) {
83-
let status = undefined;
80+
if (updatedUsers.length === 0) {
81+
this.logger.debug('No expired users found');
82+
return;
83+
}
84+
85+
const users = updatedUsersUuids.response;
86+
87+
if (users.length >= 10_000) {
88+
this.logger.log(
89+
`Job ${ResetUserTrafficCalendarDayService.CRON_NAME} has found more than 10,000 users, skipping webhook/telegram events. Restarting all nodes.`,
90+
);
91+
92+
this.eventBus.publish(new StartAllNodesEvent());
93+
94+
return;
95+
}
96+
97+
this.logger.log(
98+
`Job ${ResetUserTrafficCalendarDayService.CRON_NAME} has found ${users.length} users.`,
99+
);
100+
101+
for (const userUuid of users) {
102+
const userResponse = await this.getUserByUuid(userUuid.uuid);
103+
if (!userResponse.isOk || !userResponse.response) {
104+
this.logger.debug('User not found');
105+
continue;
106+
}
107+
108+
const user = userResponse.response;
84109

85-
status = USERS_STATUS.ACTIVE;
86110
this.eventEmitter.emit(
87111
EVENTS.USER.ENABLED,
88112
new UserEvent(user, EVENTS.USER.ENABLED),
89113
);
90114

91115
this.eventBus.publish(new AddUserToNodeEvent(user));
92-
93-
await this.updateUserStatusAndTrafficAndResetAt({
94-
userUuid: user.uuid,
95-
lastResetAt: new Date(),
96-
status,
97-
});
98-
99-
await this.createUserUsageHistory({
100-
userTrafficHistory: new UserTrafficHistoryEntity({
101-
userUuid: user.uuid,
102-
resetAt: new Date(),
103-
usedBytes: BigInt(user.usedTrafficBytes),
104-
}),
105-
});
106116
}
107117

108118
this.logger.debug(`Reseted Daily Users Traffic. Time: ${formatExecutionTime(ct)}`);
@@ -114,24 +124,6 @@ export class ResetUserTrafficCalendarDayService {
114124
}
115125
}
116126

117-
private async getAllUsers(
118-
dto: GetUsersByTrafficStrategyAndStatusQuery,
119-
): Promise<ICommandResponse<UserWithActiveInboundsEntity[]>> {
120-
return this.queryBus.execute<
121-
GetUsersByTrafficStrategyAndStatusQuery,
122-
ICommandResponse<UserWithActiveInboundsEntity[]>
123-
>(new GetUsersByTrafficStrategyAndStatusQuery(dto.strategy, dto.status));
124-
}
125-
126-
private async updateUserStatusAndTrafficAndResetAt(
127-
dto: UpdateStatusAndTrafficAndResetAtCommand,
128-
): Promise<ICommandResponse<void>> {
129-
return this.commandBus.execute<
130-
UpdateStatusAndTrafficAndResetAtCommand,
131-
ICommandResponse<void>
132-
>(new UpdateStatusAndTrafficAndResetAtCommand(dto.userUuid, dto.lastResetAt, dto.status));
133-
}
134-
135127
private async batchResetUserTraffic(
136128
dto: BatchResetUserTrafficCommand,
137129
): Promise<ICommandResponse<{ affectedRows: number }>> {
@@ -143,11 +135,25 @@ export class ResetUserTrafficCalendarDayService {
143135
>(new BatchResetUserTrafficCommand(dto.strategy));
144136
}
145137

146-
private async createUserUsageHistory(
147-
dto: CreateUserTrafficHistoryCommand,
148-
): Promise<ICommandResponse<void>> {
149-
return this.commandBus.execute<CreateUserTrafficHistoryCommand, ICommandResponse<void>>(
150-
new CreateUserTrafficHistoryCommand(dto.userTrafficHistory),
151-
);
138+
private async getUserByUuid(
139+
uuid: string,
140+
): Promise<ICommandResponse<UserWithActiveInboundsEntity>> {
141+
return this.queryBus.execute<
142+
GetUserByUuidQuery,
143+
ICommandResponse<UserWithActiveInboundsEntity>
144+
>(new GetUserByUuidQuery(uuid));
145+
}
146+
147+
private async batchResetLimitedUsersTraffic(
148+
dto: BatchResetLimitedUsersTrafficCommand,
149+
): Promise<ICommandResponse<{ uuid: string }[]>> {
150+
return this.commandBus.execute<
151+
BatchResetLimitedUsersTrafficCommand,
152+
ICommandResponse<
153+
{
154+
uuid: string;
155+
}[]
156+
>
157+
>(new BatchResetLimitedUsersTrafficCommand(dto.strategy));
152158
}
153159
}

src/jobs/tasks/reset-user-traffic-jobs/reset-user-traffic-month/reset-user-traffic-month.service.ts

Lines changed: 55 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,16 @@ import { EventEmitter2 } from '@nestjs/event-emitter';
44
import { Injectable, Logger } from '@nestjs/common';
55

66
import { UserEvent } from '@intergration-modules/telegram-bot/events/users/interfaces';
7-
import { EVENTS, RESET_PERIODS, USERS_STATUS } from '@libs/contracts/constants';
7+
import { EVENTS, RESET_PERIODS } from '@libs/contracts/constants';
88
import { formatExecutionTime, getTime } from '@common/utils/get-elapsed-time';
99
import { ICommandResponse } from '@common/types/command-response.type';
1010
import { JOBS_INTERVALS } from 'src/jobs/intervals';
1111
import { AddUserToNodeEvent } from '@modules/nodes/events/add-user-to-node';
12-
import { UserTrafficHistoryEntity } from '@modules/user-traffic-history/entities/user-traffic-history.entity';
1312
import { UserWithActiveInboundsEntity } from '@modules/users/entities/user-with-active-inbounds.entity';
14-
import { UpdateStatusAndTrafficAndResetAtCommand } from '@modules/users/commands/update-status-and-traffic-and-reset-at';
15-
import { CreateUserTrafficHistoryCommand } from '@modules/user-traffic-history/commands/create-user-traffic-history';
16-
import { GetUsersByTrafficStrategyAndStatusQuery } from '@modules/users/queries/get-users-by-traffic-strategy-and-status';
1713
import { BatchResetUserTrafficCommand } from '@modules/users/commands/batch-reset-user-traffic';
14+
import { BatchResetLimitedUsersTrafficCommand } from '@modules/users/commands/batch-reset-limited-users-traffic';
15+
import { StartAllNodesEvent } from '@modules/nodes/events/start-all-nodes';
16+
import { GetUserByUuidQuery } from '@modules/users/queries/get-user-by-uuid';
1817

1918
@Injectable()
2019
export class ResetUserTrafficCalendarMonthService {
@@ -67,44 +66,53 @@ export class ResetUserTrafficCalendarMonthService {
6766
);
6867
}
6968

70-
const usersResponse = await this.getAllUsers({
69+
const updatedUsersUuids = await this.batchResetLimitedUsersTraffic({
7170
strategy: RESET_PERIODS.MONTH,
72-
status: USERS_STATUS.LIMITED,
7371
});
7472

75-
if (!usersResponse.isOk || !usersResponse.response) {
73+
if (!updatedUsersUuids.isOk || !updatedUsersUuids.response) {
7674
this.logger.debug('No users found');
7775
return;
7876
}
7977

80-
const users = usersResponse.response;
78+
const updatedUsers = updatedUsersUuids.response;
8179

82-
for (const user of users) {
83-
let status = undefined;
80+
if (updatedUsers.length === 0) {
81+
this.logger.debug('No expired users found');
82+
return;
83+
}
84+
85+
const users = updatedUsersUuids.response;
86+
87+
if (users.length >= 10_000) {
88+
this.logger.log(
89+
`Job ${ResetUserTrafficCalendarMonthService.CRON_NAME} has found more than 10,000 users, skipping webhook/telegram events. Restarting all nodes.`,
90+
);
91+
92+
this.eventBus.publish(new StartAllNodesEvent());
93+
94+
return;
95+
}
96+
97+
this.logger.log(
98+
`Job ${ResetUserTrafficCalendarMonthService.CRON_NAME} has found ${users.length} users.`,
99+
);
84100

85-
status = USERS_STATUS.ACTIVE;
86-
user.status = status;
101+
for (const userUuid of users) {
102+
const userResponse = await this.getUserByUuid(userUuid.uuid);
103+
if (!userResponse.isOk || !userResponse.response) {
104+
this.logger.debug('User not found');
105+
continue;
106+
}
107+
108+
const user = userResponse.response;
87109

88110
this.eventEmitter.emit(
89111
EVENTS.USER.ENABLED,
90112
new UserEvent(user, EVENTS.USER.ENABLED),
91113
);
92114

93115
this.eventBus.publish(new AddUserToNodeEvent(user));
94-
95-
await this.updateUserStatusAndTrafficAndResetAt({
96-
userUuid: user.uuid,
97-
lastResetAt: new Date(),
98-
status,
99-
});
100-
101-
await this.createUserUsageHistory({
102-
userTrafficHistory: new UserTrafficHistoryEntity({
103-
userUuid: user.uuid,
104-
resetAt: new Date(),
105-
usedBytes: BigInt(user.usedTrafficBytes),
106-
}),
107-
});
108116
}
109117

110118
this.logger.debug(`Reseted Monthly Users Traffic. Time: ${formatExecutionTime(ct)}`);
@@ -116,24 +124,6 @@ export class ResetUserTrafficCalendarMonthService {
116124
}
117125
}
118126

119-
private async getAllUsers(
120-
dto: GetUsersByTrafficStrategyAndStatusQuery,
121-
): Promise<ICommandResponse<UserWithActiveInboundsEntity[]>> {
122-
return this.queryBus.execute<
123-
GetUsersByTrafficStrategyAndStatusQuery,
124-
ICommandResponse<UserWithActiveInboundsEntity[]>
125-
>(new GetUsersByTrafficStrategyAndStatusQuery(dto.strategy, dto.status));
126-
}
127-
128-
private async updateUserStatusAndTrafficAndResetAt(
129-
dto: UpdateStatusAndTrafficAndResetAtCommand,
130-
): Promise<ICommandResponse<void>> {
131-
return this.commandBus.execute<
132-
UpdateStatusAndTrafficAndResetAtCommand,
133-
ICommandResponse<void>
134-
>(new UpdateStatusAndTrafficAndResetAtCommand(dto.userUuid, dto.lastResetAt, dto.status));
135-
}
136-
137127
private async batchResetUserTraffic(
138128
dto: BatchResetUserTrafficCommand,
139129
): Promise<ICommandResponse<{ affectedRows: number }>> {
@@ -145,11 +135,25 @@ export class ResetUserTrafficCalendarMonthService {
145135
>(new BatchResetUserTrafficCommand(dto.strategy));
146136
}
147137

148-
private async createUserUsageHistory(
149-
dto: CreateUserTrafficHistoryCommand,
150-
): Promise<ICommandResponse<void>> {
151-
return this.commandBus.execute<CreateUserTrafficHistoryCommand, ICommandResponse<void>>(
152-
new CreateUserTrafficHistoryCommand(dto.userTrafficHistory),
153-
);
138+
private async getUserByUuid(
139+
uuid: string,
140+
): Promise<ICommandResponse<UserWithActiveInboundsEntity>> {
141+
return this.queryBus.execute<
142+
GetUserByUuidQuery,
143+
ICommandResponse<UserWithActiveInboundsEntity>
144+
>(new GetUserByUuidQuery(uuid));
145+
}
146+
147+
private async batchResetLimitedUsersTraffic(
148+
dto: BatchResetLimitedUsersTrafficCommand,
149+
): Promise<ICommandResponse<{ uuid: string }[]>> {
150+
return this.commandBus.execute<
151+
BatchResetLimitedUsersTrafficCommand,
152+
ICommandResponse<
153+
{
154+
uuid: string;
155+
}[]
156+
>
157+
>(new BatchResetLimitedUsersTrafficCommand(dto.strategy));
154158
}
155159
}

0 commit comments

Comments
 (0)