generated from wechaty/puppet-mock
/
rate-manager.ts
118 lines (101 loc) · 3.46 KB
/
rate-manager.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import { EventEmitter } from 'events'
import { log } from '../config.js'
import { WA_ERROR_TYPE } from '../exception/error-type.js'
import WAError from '../exception/whatsapp-error.js'
import { sleep } from '../helper/miscellaneous.js'
interface FunctionObj {
func: () => any,
resolve: (data: any) => void,
reject: (e: any) => void,
delayBefore?: number,
delayAfter?: number,
uniqueKey?: string,
}
export interface RateOptions {
queueId?: string,
delayBefore?: number,
delayAfter?: number,
uniqueKey?: string,
}
type RateManagerEvents = 'error'
const MAX_QUEUE_SIZE = 5000
export class RateManager extends EventEmitter {
private counter = 0
public override emit(event: 'error', error: string): boolean
public override emit(event: never, ...args: never[]): never
public override emit (event: RateManagerEvents, ...args: any[]): boolean {
return super.emit(event, ...args)
}
public override on(event: 'error', listener: (error: string) => void): this
public override on(event: never, listener: never): never
public override on (event: RateManagerEvents, listener: (...args: any[]) => void): this {
super.on(event, listener)
return this
}
private functionQueueMap: { [id: string]: FunctionObj[] } = {}
private runningMap: { [id: string]: boolean } = {}
public getQueueLength (queueId: string) {
if (!this.functionQueueMap[queueId]) {
return 0
}
return this.functionQueueMap[queueId]!.length
}
public async exec<T> (func: () => T, options: RateOptions = {}) {
const queueId = options.queueId || 'default'
const { delayAfter, delayBefore, uniqueKey } = options
if (!this.functionQueueMap[queueId]) {
this.functionQueueMap[queueId] = []
}
if (this.functionQueueMap[queueId]!.length > MAX_QUEUE_SIZE) {
if (this.counter % MAX_QUEUE_SIZE === 0) {
log.error(`EXCEED_QUEUE_SIZE: Max queue size for id: ${queueId} reached: ${this.functionQueueMap[queueId]!.length} > ${MAX_QUEUE_SIZE}(max queue size). Drop these tasks.`)
this.counter = 0
}
this.counter++
}
return new Promise<T>((resolve, reject) => {
this.functionQueueMap[queueId]!.push({ delayAfter, delayBefore, func, reject, resolve, uniqueKey })
if (!this.runningMap[queueId]) {
this.runningMap[queueId] = true
void this.execNext(queueId)
}
})
}
private async execNext (queueId: string) {
const queue = this.functionQueueMap[queueId]
if (!queue) {
return
}
const funcObj = queue.shift()
if (!funcObj) {
throw WAError(WA_ERROR_TYPE.ERR_RATE_FUNCTION_NOT_FOUND, `can not get funcObj from queue with id: ${queueId}.`)
}
const { delayAfter, delayBefore, func, resolve, reject, uniqueKey } = funcObj
await sleep(delayBefore)
try {
const result = await func()
resolve(result)
/**
* If uniqueKey is given, will resolve functions with same key in the queue
*/
if (uniqueKey) {
const sameFuncIndexes = queue.map((f, index) => ({ func: f, index }))
.filter(o => o.func.uniqueKey === uniqueKey)
.map(o => o.index)
.sort((a, b) => b - a)
for (const index of sameFuncIndexes) {
const [sameFunc] = queue.splice(index, 1)
sameFunc!.resolve(result)
}
}
} catch (e) {
reject(e)
}
await sleep(delayAfter)
if (queue.length > 0) {
await this.execNext(queueId)
} else {
delete this.runningMap[queueId]
}
}
}