Skip to content

Commit 0cda179

Browse files
committed
refactor: update user usage handling and query structure
1 parent 0627275 commit 0cda179

File tree

7 files changed

+41
-35
lines changed

7 files changed

+41
-35
lines changed

src/modules/users/builders/bulk-update-user-used-traffic/bulk-update-user-used-traffic.builder.ts

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,26 +10,27 @@ export class BulkUpdateUserUsedTrafficBuilder {
1010

1111
public getQuery(userUsageList: { u: string; b: string; n: string }[]): Prisma.Sql {
1212
const query = `
13-
WITH updated_users AS ( UPDATE "users" AS u
14-
SET
15-
"used_traffic_bytes" = u."used_traffic_bytes" + data."inc_used",
16-
"lifetime_used_traffic_bytes" = u."lifetime_used_traffic_bytes" + data."inc_used",
17-
"online_at" = NOW(),
18-
"first_connected_at" = COALESCE(u."first_connected_at", NOW()),
19-
"updated_at" = NOW(),
20-
"last_connected_node_uuid" = data."last_connected_node_uuid"
21-
FROM (
22-
VALUES ${userUsageList.map((usageHistory) => `(${usageHistory.b}::bigint, '${usageHistory.u}'::uuid, '${usageHistory.n}'::uuid)`).join(',')}
23-
) AS data("inc_used", "uuid", "last_connected_node_uuid")
24-
WHERE data."uuid" = u."uuid"
25-
RETURNING
26-
u."uuid",
27-
(u."first_connected_at" = u."online_at") AS "isFirstConnection"
28-
)
29-
SELECT uuid
30-
FROM updated_users
31-
WHERE "isFirstConnection";
32-
`;
13+
WITH sorted_data AS (
14+
SELECT * FROM (
15+
VALUES ${userUsageList.map((h) => `(${h.b}, ${h.u}, '${h.n}'::uuid)`).join(',')}
16+
) AS data("inc_used", "t_id", "last_connected_node_uuid")
17+
ORDER BY "t_id"
18+
),
19+
updated_users AS (
20+
UPDATE "users" AS u
21+
SET
22+
"used_traffic_bytes" = u."used_traffic_bytes" + sorted_data."inc_used",
23+
"lifetime_used_traffic_bytes" = u."lifetime_used_traffic_bytes" + sorted_data."inc_used",
24+
"online_at" = NOW(),
25+
"first_connected_at" = COALESCE(u."first_connected_at", NOW()),
26+
"updated_at" = NOW(),
27+
"last_connected_node_uuid" = sorted_data."last_connected_node_uuid"
28+
FROM sorted_data
29+
WHERE sorted_data."t_id" = u."t_id"
30+
RETURNING u."uuid", (u."first_connected_at" = u."online_at") AS "isFirstConnection"
31+
)
32+
SELECT uuid FROM updated_users WHERE "isFirstConnection";
33+
`;
3334
return Prisma.raw(query);
3435
}
3536
}

src/modules/users/queries/get-uuid-by-username/get-uuid-by-username.handler.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,17 @@
11
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
22
import { Logger } from '@nestjs/common';
33

4-
import { ICommandResponse } from '@common/types/command-response.type';
54
import { ERRORS } from '@libs/contracts/constants';
65

76
import { GetUuidByUsernameQuery } from './get-uuid-by-username.query';
87
import { UsersRepository } from '../../repositories/users.repository';
98

109
@QueryHandler(GetUuidByUsernameQuery)
11-
export class GetUuidByUsernameHandler
12-
implements IQueryHandler<GetUuidByUsernameQuery, ICommandResponse<string | null>>
13-
{
10+
export class GetUuidByUsernameHandler implements IQueryHandler<GetUuidByUsernameQuery> {
1411
private readonly logger = new Logger(GetUuidByUsernameHandler.name);
1512
constructor(private readonly usersRepository: UsersRepository) {}
1613

17-
async execute(query: GetUuidByUsernameQuery): Promise<ICommandResponse<string | null>> {
14+
async execute(query: GetUuidByUsernameQuery) {
1815
try {
1916
const user = await this.usersRepository.getUserUuidByUsername(query.username);
2017

src/modules/users/queries/get-uuid-by-username/get-uuid-by-username.query.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@ import { Query } from '@nestjs/cqrs';
22

33
import { ICommandResponse } from '@common/types/command-response.type';
44

5-
export class GetUuidByUsernameQuery extends Query<ICommandResponse<string | null>> {
5+
export class GetUuidByUsernameQuery extends Query<
6+
ICommandResponse<{
7+
uuid: string;
8+
tId: bigint;
9+
} | null>
10+
> {
611
constructor(public readonly username: string) {
712
super();
813
}

src/modules/users/repositories/users.repository.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,18 +1214,21 @@ export class UsersRepository implements ICrud<BaseUserEntity> {
12141214
).as('activeInternalSquads');
12151215
}
12161216

1217-
public async getUserUuidByUsername(username: string): Promise<string | null> {
1217+
public async getUserUuidByUsername(
1218+
username: string,
1219+
): Promise<{ uuid: string; tId: bigint } | null> {
12181220
const result = await this.qb.kysely
12191221
.selectFrom('users')
1220-
.select('uuid')
1222+
.select(['uuid'])
1223+
.select(sql.ref<bigint>('t_id').as('tId'))
12211224
.where('username', '=', username)
12221225
.executeTakeFirst();
12231226

12241227
if (!result) {
12251228
return null;
12261229
}
12271230

1228-
return result.uuid;
1231+
return { uuid: result.uuid, tId: result.tId };
12291232
}
12301233

12311234
public async findNotConnectedUsers(startDate: Date, endDate: Date): Promise<UserEntity[]> {

src/queue/record-user-usage/record-user-usage.processor.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import { RecordUserUsageJobNames } from './enums';
2424
import { QueueNames } from '../queue.enum';
2525

2626
@Processor(QueueNames.recordUserUsage, {
27-
concurrency: 100,
27+
concurrency: 50,
2828
})
2929
export class RecordUserUsageQueueProcessor extends WorkerHost {
3030
private readonly logger = new Logger(RecordUserUsageQueueProcessor.name);
@@ -109,20 +109,20 @@ export class RecordUserUsageQueueProcessor extends WorkerHost {
109109

110110
const totalBytes = xrayUser.downlink + xrayUser.uplink;
111111

112-
const userUuid = userResponse.response;
112+
const { uuid, tId } = userResponse.response;
113113

114114
allUsageRecords.push(
115115
new NodesUserUsageHistoryEntity({
116116
nodeUuid,
117-
userUuid,
117+
userUuid: uuid,
118118
totalBytes: BigInt(totalBytes),
119119
uploadBytes: BigInt(xrayUser.uplink),
120120
downloadBytes: BigInt(xrayUser.downlink),
121121
}),
122122
);
123123

124124
userUsageList.push({
125-
u: userUuid,
125+
u: tId.toString(),
126126
b: this.multiplyConsumption(consumptionMultiplier, totalBytes).toString(),
127127
n: nodeUuid,
128128
});

src/queue/update-users-usage/update-users-usage.processor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { UpdateUsersUsageJobNames } from './enums';
1414
import { QueueNames } from '../queue.enum';
1515

1616
@Processor(QueueNames.updateUsersUsage, {
17-
concurrency: 1,
17+
concurrency: 10,
1818
})
1919
export class UpdateUsersUsageQueueProcessor extends WorkerHost {
2020
private readonly logger = new Logger(UpdateUsersUsageQueueProcessor.name);

src/queue/update-users-usage/update-users-usage.service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export class UpdateUsersUsageQueueService
3434
await this.checkConnection();
3535
}
3636

37-
public async updateUserUsage(payload: Array<{ u: string; b: string; n: string }>) {
37+
public async updateUserUsage(payload: { u: string; b: string; n: string }[]) {
3838
const chunks = this.chunks(payload, 1500);
3939
for await (const chunk of chunks) {
4040
await this.addJob(UpdateUsersUsageJobNames.UpdateUsersUsage, chunk, {

0 commit comments

Comments
 (0)