Skip to content

Commit 0f68612

Browse files
committed
refactor: streamline subscription service and user subscription request history queue
- Replaced direct calls to update subscription last opened and user agent with a queue-based approach. - Consolidated HWID device upsert logic into the user subscription request history queue. - Introduced new job names for updating user subscriptions and checking/upserting HWID devices. - Enhanced job handling in the user subscription request history processor and service.
1 parent 51b3843 commit 0f68612

File tree

7 files changed

+179
-49
lines changed

7 files changed

+179
-49
lines changed

src/modules/subscription/subscription.service.ts

Lines changed: 23 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ import {
4646
SubscriptionRawResponse,
4747
SubscriptionWithConfigResponse,
4848
} from './models';
49-
import { UpdateSubLastOpenedAndUserAgentCommand } from '../users/commands/update-sub-last-opened-and-user-agent';
5049
import { getSubscriptionRefillDate, getSubscriptionUserInfo } from './utils/get-user-info.headers';
5150
import { HostWithRawInbound } from '../hosts/entities/host-with-inbound-tag.entity';
5251
import { GetHostsForUserQuery } from '../hosts/queries/get-hosts-for-user';
@@ -392,7 +391,7 @@ export class SubscriptionService {
392391
return new SubscriptionNotFoundResponse();
393392
}
394393

395-
await this.updateSubLastOpenedAndUserAgent({
394+
await this.userSubscriptionRequestHistoryQueue.updateUserSub({
396395
userUuid: user.response.uuid,
397396
subLastOpenedAt: new Date(),
398397
subLastUserAgent: userAgent,
@@ -780,21 +779,6 @@ export class SubscriptionService {
780779
);
781780
}
782781

783-
private async updateSubLastOpenedAndUserAgent(
784-
dto: UpdateSubLastOpenedAndUserAgentCommand,
785-
): Promise<ICommandResponse<void>> {
786-
return this.commandBus.execute<
787-
UpdateSubLastOpenedAndUserAgentCommand,
788-
ICommandResponse<void>
789-
>(
790-
new UpdateSubLastOpenedAndUserAgentCommand(
791-
dto.userUuid,
792-
dto.subLastOpenedAt,
793-
dto.subLastUserAgent,
794-
),
795-
);
796-
}
797-
798782
private async countHwidUserDevices(
799783
dto: CountUsersDevicesQuery,
800784
): Promise<ICommandResponse<number>> {
@@ -832,15 +816,13 @@ export class SubscriptionService {
832816
try {
833817
if (user.hwidDeviceLimit === 0) {
834818
if (hwidHeaders !== null) {
835-
await this.upsertHwidUserDevice({
836-
hwidUserDevice: new HwidUserDeviceEntity({
837-
hwid: hwidHeaders.hwid,
838-
userUuid: user.uuid,
839-
platform: hwidHeaders.platform,
840-
osVersion: hwidHeaders.osVersion,
841-
deviceModel: hwidHeaders.deviceModel,
842-
userAgent: hwidHeaders.userAgent,
843-
}),
819+
await this.userSubscriptionRequestHistoryQueue.checkAndUpsertHwidDevice({
820+
hwid: hwidHeaders.hwid,
821+
userUuid: user.uuid,
822+
platform: hwidHeaders.platform,
823+
osVersion: hwidHeaders.osVersion,
824+
deviceModel: hwidHeaders.deviceModel,
825+
userAgent: hwidHeaders.userAgent,
844826
});
845827
}
846828

@@ -858,15 +840,13 @@ export class SubscriptionService {
858840

859841
if (isDeviceExists.isOk && isDeviceExists.response) {
860842
if (isDeviceExists.response.exists) {
861-
await this.upsertHwidUserDevice({
862-
hwidUserDevice: new HwidUserDeviceEntity({
863-
hwid: hwidHeaders.hwid,
864-
userUuid: user.uuid,
865-
platform: hwidHeaders.platform,
866-
osVersion: hwidHeaders.osVersion,
867-
deviceModel: hwidHeaders.deviceModel,
868-
userAgent: hwidHeaders.userAgent,
869-
}),
843+
await this.userSubscriptionRequestHistoryQueue.checkAndUpsertHwidDevice({
844+
hwid: hwidHeaders.hwid,
845+
userUuid: user.uuid,
846+
platform: hwidHeaders.platform,
847+
osVersion: hwidHeaders.osVersion,
848+
deviceModel: hwidHeaders.deviceModel,
849+
userAgent: hwidHeaders.userAgent,
870850
});
871851

872852
return { isOk: true, response: { isSubscriptionAllowed: true } };
@@ -922,15 +902,13 @@ export class SubscriptionService {
922902
return;
923903
}
924904

925-
await this.upsertHwidUserDevice({
926-
hwidUserDevice: new HwidUserDeviceEntity({
927-
hwid: hwidHeaders.hwid,
928-
userUuid: user.uuid,
929-
platform: hwidHeaders.platform,
930-
osVersion: hwidHeaders.osVersion,
931-
deviceModel: hwidHeaders.deviceModel,
932-
userAgent: hwidHeaders.userAgent,
933-
}),
905+
await this.userSubscriptionRequestHistoryQueue.checkAndUpsertHwidDevice({
906+
hwid: hwidHeaders.hwid,
907+
userUuid: user.uuid,
908+
platform: hwidHeaders.platform,
909+
osVersion: hwidHeaders.osVersion,
910+
deviceModel: hwidHeaders.deviceModel,
911+
userAgent: hwidHeaders.userAgent,
934912
});
935913
} catch (error) {
936914
this.logger.error(`Error upserting hwid user device: ${error}`);
@@ -957,7 +935,7 @@ export class SubscriptionService {
957935
requestIp?: string,
958936
): Promise<void> {
959937
try {
960-
await this.updateSubLastOpenedAndUserAgent({
938+
await this.userSubscriptionRequestHistoryQueue.updateUserSub({
961939
userUuid,
962940
subLastOpenedAt: new Date(),
963941
subLastUserAgent: userAgent,
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
export enum UserSubscriptionRequestHistoryJobNames {
22
addRecord = 'addRecord',
3+
checkAndUpserHwidUserDevice = 'checkAndUpserHwidUserDevice',
4+
updateUserSub = 'updateUserSub',
35
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
export interface ICheckAndUpsertHwidDevicePayload {
2+
hwid: string;
3+
userUuid: string;
4+
platform: string | undefined;
5+
osVersion: string | undefined;
6+
deviceModel: string | undefined;
7+
userAgent: string | undefined;
8+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
export * from './add-user-subscription-request-history.interface';
2+
export * from './check-and-upsert-hwid-device.interface';
3+
export * from './update-user-sub.interface';
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export interface IUpdateUserSubPayload {
2+
userUuid: string;
3+
subLastOpenedAt: Date;
4+
subLastUserAgent: string;
5+
}

src/queue/user-subscription-request-history/user-subscription-request-history.processor.ts

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,20 @@ import { Processor, WorkerHost } from '@nestjs/bullmq';
44
import { CommandBus } from '@nestjs/cqrs';
55
import { Logger } from '@nestjs/common';
66

7+
import { ICommandResponse } from '@common/types/command-response.type';
8+
79
import { CountAndDeleteSubscriptionRequestHistoryCommand } from '@modules/user-subscription-request-history/commands/count-and-delete-subscription-request-history';
810
import { CreateSubscriptionRequestHistoryCommand } from '@modules/user-subscription-request-history/commands/create-subscription-request-history';
11+
import { UpdateSubLastOpenedAndUserAgentCommand } from '@modules/users/commands/update-sub-last-opened-and-user-agent';
12+
import { UpsertHwidUserDeviceCommand } from '@modules/hwid-user-devices/commands/upsert-hwid-user-device';
13+
import { HwidUserDeviceEntity } from '@modules/hwid-user-devices/entities/hwid-user-device.entity';
914
import { UserSubscriptionRequestHistoryEntity } from '@modules/user-subscription-request-history';
1015

11-
import { IAddUserSubscriptionRequestHistoryPayload } from './interfaces';
16+
import {
17+
IAddUserSubscriptionRequestHistoryPayload,
18+
ICheckAndUpsertHwidDevicePayload,
19+
IUpdateUserSubPayload,
20+
} from './interfaces';
1221
import { UserSubscriptionRequestHistoryJobNames } from './enums';
1322
import { QueueNames } from '../queue.enum';
1423

@@ -22,10 +31,14 @@ export class UserSubscriptionRequestHistoryQueueProcessor extends WorkerHost {
2231
super();
2332
}
2433

25-
async process(job: Job<IAddUserSubscriptionRequestHistoryPayload>) {
34+
async process(job: Job<any>) {
2635
switch (job.name) {
2736
case UserSubscriptionRequestHistoryJobNames.addRecord:
2837
return await this.handleAddRecordJob(job);
38+
case UserSubscriptionRequestHistoryJobNames.updateUserSub:
39+
return await this.handleUpdateUserSubJob(job);
40+
case UserSubscriptionRequestHistoryJobNames.checkAndUpserHwidUserDevice:
41+
return await this.handleCheckAndUpsertHwidDeviceJob(job);
2942
default:
3043
this.logger.warn(`Job "${job.name}" is not handled.`);
3144
break;
@@ -62,4 +75,77 @@ export class UserSubscriptionRequestHistoryQueueProcessor extends WorkerHost {
6275
};
6376
}
6477
}
78+
79+
private async handleUpdateUserSubJob(job: Job<IUpdateUserSubPayload>) {
80+
try {
81+
const { userUuid, subLastOpenedAt, subLastUserAgent } = job.data;
82+
83+
await this.updateSubLastOpenedAndUserAgent({
84+
userUuid,
85+
subLastOpenedAt: new Date(subLastOpenedAt),
86+
subLastUserAgent,
87+
});
88+
89+
return {
90+
isOk: true,
91+
};
92+
} catch (error) {
93+
this.logger.error(`Error updating user sub: ${error}`);
94+
95+
return {
96+
isOk: false,
97+
};
98+
}
99+
}
100+
101+
private async handleCheckAndUpsertHwidDeviceJob(job: Job<ICheckAndUpsertHwidDevicePayload>) {
102+
try {
103+
const { hwid, userUuid, platform, osVersion, deviceModel, userAgent } = job.data;
104+
105+
await this.upsertHwidUserDevice({
106+
hwidUserDevice: new HwidUserDeviceEntity({
107+
hwid,
108+
userUuid,
109+
platform,
110+
osVersion,
111+
deviceModel,
112+
userAgent,
113+
}),
114+
});
115+
116+
return {
117+
isOk: true,
118+
};
119+
} catch (error) {
120+
this.logger.error(`Error checking and upserting hwid device: ${error}`);
121+
122+
return {
123+
isOk: false,
124+
};
125+
}
126+
}
127+
128+
private async updateSubLastOpenedAndUserAgent(
129+
dto: UpdateSubLastOpenedAndUserAgentCommand,
130+
): Promise<ICommandResponse<void>> {
131+
return this.commandBus.execute<
132+
UpdateSubLastOpenedAndUserAgentCommand,
133+
ICommandResponse<void>
134+
>(
135+
new UpdateSubLastOpenedAndUserAgentCommand(
136+
dto.userUuid,
137+
dto.subLastOpenedAt,
138+
dto.subLastUserAgent,
139+
),
140+
);
141+
}
142+
143+
private async upsertHwidUserDevice(
144+
dto: UpsertHwidUserDeviceCommand,
145+
): Promise<ICommandResponse<HwidUserDeviceEntity>> {
146+
return this.commandBus.execute<
147+
UpsertHwidUserDeviceCommand,
148+
ICommandResponse<HwidUserDeviceEntity>
149+
>(new UpsertHwidUserDeviceCommand(dto.hwidUserDevice));
150+
}
65151
}

src/queue/user-subscription-request-history/user-subscription-request-history.service.ts

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ import _ from 'lodash';
44
import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common';
55
import { InjectQueue } from '@nestjs/bullmq';
66

7-
import { IAddUserSubscriptionRequestHistoryPayload } from './interfaces';
7+
import {
8+
IAddUserSubscriptionRequestHistoryPayload,
9+
ICheckAndUpsertHwidDevicePayload,
10+
IUpdateUserSubPayload,
11+
} from './interfaces';
812
import { UserSubscriptionRequestHistoryJobNames } from './enums';
913
import { AbstractQueueService } from '../queue.service';
1014
import { QueueNames } from '../queue.enum';
@@ -37,6 +41,51 @@ export class UserSubscriptionRequestHistoryQueueService
3741
}
3842

3943
public async addRecord(payload: IAddUserSubscriptionRequestHistoryPayload) {
40-
return this.addJob(UserSubscriptionRequestHistoryJobNames.addRecord, payload);
44+
return this.addJob(UserSubscriptionRequestHistoryJobNames.addRecord, payload, {
45+
removeOnComplete: {
46+
age: 3_600,
47+
count: 500,
48+
},
49+
removeOnFail: {
50+
age: 24 * 3_600,
51+
},
52+
deduplication: {
53+
id: `${payload.userUuid}_AR`,
54+
},
55+
});
56+
}
57+
58+
public async updateUserSub(payload: IUpdateUserSubPayload) {
59+
return this.addJob(UserSubscriptionRequestHistoryJobNames.updateUserSub, payload, {
60+
removeOnComplete: {
61+
age: 3_600,
62+
count: 500,
63+
},
64+
removeOnFail: {
65+
age: 24 * 3_600,
66+
},
67+
deduplication: {
68+
id: `${payload.userUuid}_USS`,
69+
},
70+
});
71+
}
72+
73+
public async checkAndUpsertHwidDevice(payload: ICheckAndUpsertHwidDevicePayload) {
74+
return this.addJob(
75+
UserSubscriptionRequestHistoryJobNames.checkAndUpserHwidUserDevice,
76+
payload,
77+
{
78+
removeOnComplete: {
79+
age: 3_600,
80+
count: 100,
81+
},
82+
removeOnFail: {
83+
age: 24 * 3_600,
84+
},
85+
deduplication: {
86+
id: `${payload.userUuid}-${payload.hwid}_CAUHD`,
87+
},
88+
},
89+
);
4190
}
4291
}

0 commit comments

Comments
 (0)