Skip to content

Commit 0dd800d

Browse files
committed
refactor: optimize expired users processing
1 parent 64434d0 commit 0dd800d

File tree

10 files changed

+171
-18
lines changed

10 files changed

+171
-18
lines changed

src/jobs/tasks/users-jobs/find-expired-users/find-expired-users.service.ts

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ import { Injectable, Logger } from '@nestjs/common';
66
import { UserEvent } from '@intergration-modules/telegram-bot/events/users/interfaces';
77
import { formatExecutionTime, getTime } from '@common/utils/get-elapsed-time';
88
import { ICommandResponse } from '@common/types/command-response.type';
9-
import { EVENTS, USERS_STATUS } from '@libs/contracts/constants';
9+
import { EVENTS } from '@libs/contracts/constants';
1010
import { UserWithActiveInboundsEntity } from '@modules/users/entities/user-with-active-inbounds.entity';
11-
import { ChangeUserStatusCommand } from '@modules/users/commands/change-user-status/change-user-status.command';
1211
import { RemoveUserFromNodeEvent } from '@modules/nodes/events/remove-user-from-node';
1312
import { JOBS_INTERVALS } from 'src/jobs/intervals';
14-
import { GetExpiredUsersQuery } from '@modules/users/queries/get-expired-users';
13+
import { UpdateExpiredUsersCommand } from '@modules/users/commands/update-expired-users';
14+
import { GetUserByUuidQuery } from '@modules/users/queries/get-user-by-uuid';
15+
import { StartAllNodesEvent } from '@modules/nodes/events/start-all-nodes';
1516

1617
@Injectable()
1718
export class FindExpiredUsersService {
@@ -50,21 +51,39 @@ export class FindExpiredUsersService {
5051
const ct = getTime();
5152
this.isJobRunning = true;
5253

53-
const usersResponse = await this.getAllExpiredUsers();
54+
const usersResponse = await this.updateExpiredUsers();
5455
if (!usersResponse.isOk || !usersResponse.response) {
5556
this.logger.error('No expired users found');
5657
return;
5758
}
5859

60+
const updatedUsers = usersResponse.response;
61+
62+
if (updatedUsers.length === 0) {
63+
this.logger.debug('No expired users found');
64+
return;
65+
}
66+
5967
const users = usersResponse.response;
6068

61-
for (const user of users) {
62-
await this.changeUserStatus({
63-
userUuid: user.uuid,
64-
status: USERS_STATUS.EXPIRED,
65-
});
69+
if (users.length >= 10_000) {
70+
this.logger.log(
71+
'More than 10,000 expired users found, skipping webhook/telegram events.',
72+
);
73+
74+
this.eventBus.publish(new StartAllNodesEvent());
75+
76+
return;
77+
}
78+
79+
for (const userUuid of users) {
80+
const userResponse = await this.getUserByUuid(userUuid.uuid);
81+
if (!userResponse.isOk || !userResponse.response) {
82+
this.logger.debug('User not found');
83+
continue;
84+
}
6685

67-
user.status = USERS_STATUS.EXPIRED;
86+
const user = userResponse.response;
6887

6988
this.eventEmitter.emit(
7089
EVENTS.USER.EXPIRED,
@@ -82,16 +101,19 @@ export class FindExpiredUsersService {
82101
}
83102
}
84103

85-
private async getAllExpiredUsers(): Promise<ICommandResponse<UserWithActiveInboundsEntity[]>> {
104+
private async getUserByUuid(
105+
uuid: string,
106+
): Promise<ICommandResponse<UserWithActiveInboundsEntity>> {
86107
return this.queryBus.execute<
87-
GetExpiredUsersQuery,
88-
ICommandResponse<UserWithActiveInboundsEntity[]>
89-
>(new GetExpiredUsersQuery());
108+
GetUserByUuidQuery,
109+
ICommandResponse<UserWithActiveInboundsEntity>
110+
>(new GetUserByUuidQuery(uuid));
90111
}
91112

92-
private async changeUserStatus(dto: ChangeUserStatusCommand): Promise<ICommandResponse<void>> {
93-
return this.commandBus.execute<ChangeUserStatusCommand, ICommandResponse<void>>(
94-
new ChangeUserStatusCommand(dto.userUuid, dto.status),
95-
);
113+
private async updateExpiredUsers(): Promise<ICommandResponse<{ uuid: string }[]>> {
114+
return this.commandBus.execute<
115+
UpdateExpiredUsersCommand,
116+
ICommandResponse<{ uuid: string }[]>
117+
>(new UpdateExpiredUsersCommand());
96118
}
97119
}

src/modules/users/commands/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ import { UpdateSubLastOpenedAndUserAgentHandler } from './update-sub-last-opened
33
import { IncrementUsedTrafficHandler } from './increment-used-traffic';
44
import { ChangeUserStatusHandler } from './change-user-status';
55
import { BatchResetUserTrafficHandler } from './batch-reset-user-traffic';
6+
import { UpdateExpiredUsersHandler } from './update-expired-users';
67

78
export const COMMANDS = [
89
IncrementUsedTrafficHandler,
910
ChangeUserStatusHandler,
1011
UpdateStatusAndTrafficAndResetAtHandler,
1112
UpdateSubLastOpenedAndUserAgentHandler,
1213
BatchResetUserTrafficHandler,
14+
UpdateExpiredUsersHandler,
1315
];
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export * from './update-expired-users.command';
2+
export * from './update-expired-users.handler';
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export class UpdateExpiredUsersCommand {
2+
constructor() {}
3+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
2+
import { Transactional } from '@nestjs-cls/transactional';
3+
import { Logger } from '@nestjs/common';
4+
5+
import { ICommandResponse } from '@common/types/command-response.type';
6+
import { ERRORS } from '@contract/constants';
7+
8+
import { UpdateExpiredUsersCommand } from './update-expired-users.command';
9+
import { UsersRepository } from '../../repositories/users.repository';
10+
11+
@CommandHandler(UpdateExpiredUsersCommand)
12+
export class UpdateExpiredUsersHandler
13+
implements ICommandHandler<UpdateExpiredUsersCommand, ICommandResponse<{ uuid: string }[]>>
14+
{
15+
public readonly logger = new Logger(UpdateExpiredUsersHandler.name);
16+
17+
constructor(private readonly usersRepository: UsersRepository) {}
18+
19+
@Transactional()
20+
async execute(): Promise<ICommandResponse<{ uuid: string }[]>> {
21+
try {
22+
const result = await this.usersRepository.updateExpiredUsers();
23+
24+
return {
25+
isOk: true,
26+
response: result,
27+
};
28+
} catch (error: unknown) {
29+
this.logger.error(error);
30+
return {
31+
isOk: false,
32+
...ERRORS.UPDATE_USER_ERROR,
33+
};
34+
}
35+
}
36+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
2+
import { Logger } from '@nestjs/common';
3+
4+
import { ICommandResponse } from '@common/types/command-response.type';
5+
import { ERRORS } from '@libs/contracts/constants';
6+
7+
import { UserWithActiveInboundsEntity } from '../../entities/user-with-active-inbounds.entity';
8+
import { GetUserByUuidQuery } from './get-user-by-uuid.query';
9+
import { UsersRepository } from '../../repositories/users.repository';
10+
11+
@QueryHandler(GetUserByUuidQuery)
12+
export class GetUserByUuidHandler
13+
implements IQueryHandler<GetUserByUuidQuery, ICommandResponse<UserWithActiveInboundsEntity>>
14+
{
15+
private readonly logger = new Logger(GetUserByUuidHandler.name);
16+
constructor(private readonly usersRepository: UsersRepository) {}
17+
18+
async execute(
19+
query: GetUserByUuidQuery,
20+
): Promise<ICommandResponse<UserWithActiveInboundsEntity>> {
21+
try {
22+
const user = await this.usersRepository.findUserByUuid(query.uuid);
23+
24+
if (!user) {
25+
return {
26+
isOk: false,
27+
...ERRORS.USER_NOT_FOUND,
28+
};
29+
}
30+
31+
return {
32+
isOk: true,
33+
response: user,
34+
};
35+
} catch (error) {
36+
this.logger.error(error);
37+
return {
38+
isOk: false,
39+
...ERRORS.INTERNAL_SERVER_ERROR,
40+
};
41+
}
42+
}
43+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export class GetUserByUuidQuery {
2+
constructor(public readonly uuid: string) {}
3+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export * from './get-user-by-uuid.handler';
2+
export * from './get-user-by-uuid.query';

src/modules/users/queries/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { GetAllUsersHandler } from './get-all-users';
77
import { GetExceededTrafficUsageUsersHandler } from './get-exceeded-traffic-usage-users';
88
import { GetExpiredUsersHandler } from './get-expired-users';
99
import { GetUsersByTrafficStrategyAndStatusHandler } from './get-users-by-traffic-strategy-and-status';
10+
import { GetUserByUuidHandler } from './get-user-by-uuid';
1011

1112
export const QUERIES = [
1213
GetUsersForConfigHandler,
@@ -18,4 +19,5 @@ export const QUERIES = [
1819
GetExceededTrafficUsageUsersHandler,
1920
GetExpiredUsersHandler,
2021
GetUsersByTrafficStrategyAndStatusHandler,
22+
GetUserByUuidHandler,
2123
];

src/modules/users/repositories/users.repository.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,19 @@ export class UsersRepository implements ICrud<UserEntity> {
9898
return new UserWithActiveInboundsEntity(result);
9999
}
100100

101+
public async findUserByUuid(uuid: string): Promise<null | UserWithActiveInboundsEntity> {
102+
const result = await this.prisma.tx.users.findUnique({
103+
where: { uuid },
104+
include: USER_INCLUDE_INBOUNDS,
105+
});
106+
107+
if (!result) {
108+
return null;
109+
}
110+
111+
return new UserWithActiveInboundsEntity(result);
112+
}
113+
101114
public async getUserWithActiveInbounds(
102115
uuid: string,
103116
): Promise<null | UserWithActiveInboundsEntity> {
@@ -166,6 +179,31 @@ export class UsersRepository implements ICrud<UserEntity> {
166179
return result.map((value) => new UserWithActiveInboundsEntity(value));
167180
}
168181

182+
public async updateExpiredUsers(): Promise<{ uuid: string }[]> {
183+
const result = await this.prisma.tx.users.updateManyAndReturn({
184+
select: {
185+
uuid: true,
186+
},
187+
where: {
188+
AND: [
189+
{
190+
status: USERS_STATUS.ACTIVE,
191+
},
192+
{
193+
expireAt: {
194+
lt: new Date(),
195+
},
196+
},
197+
],
198+
},
199+
data: {
200+
status: USERS_STATUS.EXPIRED,
201+
},
202+
});
203+
204+
return result;
205+
}
206+
169207
public async getAllUsersByTrafficStrategyAndStatus(
170208
strategy: TResetPeriods,
171209
status: TUsersStatus,

0 commit comments

Comments
 (0)