Skip to content

Commit 23829e7

Browse files
committed
feat: optimize user config streaming with keyset pagination
- Introducing tId for efficient cursor-based pagination - Adding strategic indexes on Users table for status and createdAt - Adding composite index on ConfigProfileInbounds for faster lookups - Simplifying join operations in getUsersForConfigStream
1 parent 6d4d1ea commit 23829e7

File tree

6 files changed

+144
-37
lines changed

6 files changed

+144
-37
lines changed
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
-- AlterTable
2+
ALTER TABLE "admin" ALTER COLUMN "created_at" SET DEFAULT now(),
3+
ALTER COLUMN "updated_at" SET DEFAULT now();
4+
5+
-- AlterTable
6+
ALTER TABLE "api_tokens" ALTER COLUMN "created_at" SET DEFAULT now(),
7+
ALTER COLUMN "updated_at" SET DEFAULT now();
8+
9+
-- AlterTable
10+
ALTER TABLE "config_profiles" ALTER COLUMN "created_at" SET DEFAULT now(),
11+
ALTER COLUMN "updated_at" SET DEFAULT now();
12+
13+
-- AlterTable
14+
ALTER TABLE "hwid_user_devices" ALTER COLUMN "created_at" SET DEFAULT now(),
15+
ALTER COLUMN "updated_at" SET DEFAULT now();
16+
17+
-- AlterTable
18+
ALTER TABLE "infra_billing_nodes" ALTER COLUMN "created_at" SET DEFAULT now(),
19+
ALTER COLUMN "updated_at" SET DEFAULT now();
20+
21+
-- AlterTable
22+
ALTER TABLE "infra_providers" ALTER COLUMN "created_at" SET DEFAULT now(),
23+
ALTER COLUMN "updated_at" SET DEFAULT now();
24+
25+
-- AlterTable
26+
ALTER TABLE "internal_squads" ALTER COLUMN "created_at" SET DEFAULT now(),
27+
ALTER COLUMN "updated_at" SET DEFAULT now();
28+
29+
-- AlterTable
30+
ALTER TABLE "keygen" ALTER COLUMN "created_at" SET DEFAULT now(),
31+
ALTER COLUMN "updated_at" SET DEFAULT now();
32+
33+
-- AlterTable
34+
ALTER TABLE "nodes" ALTER COLUMN "created_at" SET DEFAULT now(),
35+
ALTER COLUMN "updated_at" SET DEFAULT now();
36+
37+
-- AlterTable
38+
ALTER TABLE "nodes_traffic_usage_history" ALTER COLUMN "reset_at" SET DEFAULT now();
39+
40+
-- AlterTable
41+
ALTER TABLE "nodes_usage_history" ALTER COLUMN "created_at" SET DEFAULT date_trunc('hour', now()),
42+
ALTER COLUMN "updated_at" SET DEFAULT now();
43+
44+
-- AlterTable
45+
ALTER TABLE "nodes_user_usage_history" ALTER COLUMN "updated_at" SET DEFAULT now();
46+
47+
-- AlterTable
48+
ALTER TABLE "subscription_settings" ALTER COLUMN "created_at" SET DEFAULT now(),
49+
ALTER COLUMN "updated_at" SET DEFAULT now();
50+
51+
-- AlterTable
52+
ALTER TABLE "subscription_templates" ALTER COLUMN "created_at" SET DEFAULT now(),
53+
ALTER COLUMN "updated_at" SET DEFAULT now();
54+
55+
-- AlterTable
56+
ALTER TABLE "user_traffic_history" ALTER COLUMN "reset_at" SET DEFAULT now();
57+
58+
-- AlterTable
59+
ALTER TABLE "users" ALTER COLUMN "created_at" SET DEFAULT now(),
60+
ALTER COLUMN "updated_at" SET DEFAULT now();
61+
62+
-- CreateIndex
63+
CREATE INDEX "config_profile_inbounds_profile_uuid_uuid_idx" ON "config_profile_inbounds"("profile_uuid", "uuid");
64+
65+
-- CreateIndex
66+
CREATE INDEX "users_status_idx" ON "users" USING HASH ("status");
67+
68+
-- CreateIndex
69+
CREATE INDEX "users_created_at_idx" ON "users"("created_at" ASC);
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
Warnings:
3+
4+
- A unique constraint covering the columns `[t_id]` on the table `users` will be added. If there are existing duplicate values, this will fail.
5+
6+
*/
7+
8+
-- 1. Add t_id column
9+
ALTER TABLE "users" ADD COLUMN "t_id" BIGINT;
10+
11+
-- 2. Fill t_id in correct order
12+
WITH numbered AS (
13+
SELECT uuid, ROW_NUMBER() OVER (ORDER BY created_at ASC, uuid ASC) as row_num
14+
FROM users
15+
)
16+
UPDATE users
17+
SET t_id = numbered.row_num
18+
FROM numbered
19+
WHERE users.uuid = numbered.uuid;
20+
21+
-- 3. Create sequence and set current value
22+
CREATE SEQUENCE IF NOT EXISTS users_t_id_seq;
23+
SELECT setval('users_t_id_seq', COALESCE((SELECT MAX(t_id) FROM users), 0));
24+
25+
-- 4. Make field NOT NULL with default
26+
ALTER TABLE "users"
27+
ALTER COLUMN "t_id" SET NOT NULL,
28+
ALTER COLUMN "t_id" SET DEFAULT nextval('users_t_id_seq');
29+
30+
-- 5. Set sequence ownership
31+
ALTER SEQUENCE users_t_id_seq OWNED BY users.t_id;
32+
33+
-- 6. Create unique index
34+
CREATE UNIQUE INDEX "users_t_id_key" ON "users"("t_id");

prisma/schema.prisma

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ datasource db {
2525

2626
model Users {
2727
uuid String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
28+
tId BigInt @unique @default(autoincrement()) @map("t_id") @ignore
2829
shortUuid String @unique @map("short_uuid")
2930
username String @unique @map("username")
3031
@@ -75,6 +76,8 @@ model Users {
7576
activeInternalSquads InternalSquadMembers[]
7677
7778
@@index([tag])
79+
@@index([status], type: Hash)
80+
@@index([createdAt(sort: Asc)])
7881
@@map("users")
7982
}
8083

@@ -386,6 +389,7 @@ model ConfigProfileInbounds {
386389
internalSquadInbounds InternalSquadInbounds[]
387390
388391
@@unique([tag])
392+
@@index([profileUuid, uuid])
389393
@@map("config_profile_inbounds")
390394
}
391395

src/modules/users/entities/users-for-config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@ export class UserForConfigEntity {
44
public vlessUuid: string;
55
public ssPassword: string;
66
public tags: string[];
7+
public tId: bigint;
78

89
constructor(data: UserForConfigEntity) {
910
this.username = data.username;
1011
this.trojanPassword = data.trojanPassword;
1112
this.vlessUuid = data.vlessUuid;
1213
this.ssPassword = data.ssPassword;
1314
this.tags = data.tags;
15+
this.tId = data.tId;
1416
}
1517
}

src/modules/users/queries/get-prepared-config-with-users/get-prepared-config-with-users.handler.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,7 @@ export class GetPreparedConfigWithUsersHandler
5858

5959
config.leaveInbounds(activeInboundsTags);
6060

61-
const usersStream = this.usersRepository.getUsersForConfigStream(
62-
configProfileUuid,
63-
activeInbounds,
64-
);
61+
const usersStream = this.usersRepository.getUsersForConfigStream(activeInbounds);
6562

6663
for await (const userBatch of usersStream) {
6764
config.includeUserBatch(userBatch, inboundsUserSets);

src/modules/users/repositories/users.repository.ts

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { DB } from 'prisma/generated/types';
88

99
import { TransactionalAdapterPrisma } from '@nestjs-cls/transactional-adapter-prisma';
1010
import { TransactionHost } from '@nestjs-cls/transactional';
11-
import { Injectable } from '@nestjs/common';
11+
import { Injectable, Logger } from '@nestjs/common';
1212

1313
import { TxKyselyService } from '@common/database/tx-kysely.service';
1414
import { getKyselyUuid } from '@common/helpers/kysely';
@@ -41,6 +41,8 @@ dayjs.extend(utc);
4141

4242
@Injectable()
4343
export class UsersRepository implements ICrud<BaseUserEntity> {
44+
private readonly logger = new Logger(UsersRepository.name);
45+
4446
constructor(
4547
private readonly prisma: TransactionHost<TransactionalAdapterPrisma>,
4648
private readonly qb: TxKyselyService,
@@ -664,47 +666,35 @@ export class UsersRepository implements ICrud<BaseUserEntity> {
664666
}
665667

666668
public async *getUsersForConfigStream(
667-
configProfileUuid: string,
668669
activeInbounds: ConfigProfileInboundEntity[],
669670
): AsyncGenerator<UserForConfigEntity[]> {
670-
// TODO: configure batch size
671671
const BATCH_SIZE = 100_000;
672-
let offset = 0;
672+
let lastTId: bigint | null = null;
673673
let hasMoreData = true;
674674

675675
while (hasMoreData) {
676676
const builder = this.qb.kysely
677-
.selectFrom('internalSquadMembers')
678-
.innerJoin('users', (join) =>
679-
join
680-
.onRef('internalSquadMembers.userUuid', '=', 'users.uuid')
681-
.on('users.status', '=', USERS_STATUS.ACTIVE),
682-
)
677+
.selectFrom('users')
678+
.where('users.status', '=', USERS_STATUS.ACTIVE)
679+
.innerJoin('internalSquadMembers', 'internalSquadMembers.userUuid', 'users.uuid')
683680
.innerJoin(
684681
'internalSquadInbounds',
685-
'internalSquadMembers.internalSquadUuid',
686682
'internalSquadInbounds.internalSquadUuid',
683+
'internalSquadMembers.internalSquadUuid',
687684
)
688-
.innerJoin('configProfileInbounds', (join) =>
689-
join
690-
.onRef(
691-
'internalSquadInbounds.inboundUuid',
692-
'=',
693-
'configProfileInbounds.uuid',
694-
)
695-
.on(
696-
'configProfileInbounds.profileUuid',
697-
'=',
698-
getKyselyUuid(configProfileUuid),
699-
)
700-
.on(
701-
'configProfileInbounds.uuid',
702-
'in',
703-
activeInbounds.map((inbound) => getKyselyUuid(inbound.uuid)),
704-
),
685+
.innerJoin(
686+
'configProfileInbounds',
687+
'configProfileInbounds.uuid',
688+
'internalSquadInbounds.inboundUuid',
689+
)
690+
.$if(lastTId !== null, (qb) => qb.where(sql.ref('users.t_id'), '>', lastTId!))
691+
.where(
692+
'internalSquadInbounds.inboundUuid',
693+
'in',
694+
activeInbounds.map((inbound) => getKyselyUuid(inbound.uuid)),
705695
)
706-
707696
.select((eb) => [
697+
sql.ref<bigint>('users.t_id').as('tId'),
708698
'users.username',
709699
'users.trojanPassword',
710700
'users.vlessUuid',
@@ -715,18 +705,29 @@ export class UsersRepository implements ICrud<BaseUserEntity> {
715705
'tags',
716706
),
717707
])
718-
.groupBy('users.uuid')
719-
.orderBy('users.createdAt', 'asc');
708+
.groupBy([
709+
sql.ref<bigint>('users.t_id'),
710+
'users.username',
711+
'users.trojanPassword',
712+
'users.vlessUuid',
713+
'users.ssPassword',
714+
])
715+
.orderBy(sql<string>`users.t_id asc`)
716+
.limit(BATCH_SIZE);
720717

721-
const result = await builder.limit(BATCH_SIZE).offset(offset).execute();
718+
const start = performance.now();
719+
const result = await builder.execute();
720+
this.logger.log(
721+
`[getUsersForConfigStream] ${performance.now() - start}ms, length: ${result.length}`,
722+
);
722723

723724
if (result.length < BATCH_SIZE) {
724725
hasMoreData = false;
725726
}
726727

727728
if (result.length > 0) {
729+
lastTId = result[result.length - 1].tId;
728730
yield result;
729-
offset += result.length;
730731
} else {
731732
break;
732733
}

0 commit comments

Comments
 (0)