-
Notifications
You must be signed in to change notification settings - Fork 0
/
concurrent.ts
152 lines (137 loc) · 4.46 KB
/
concurrent.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import type { IteratorResolve, Reject } from '@/types'
import { isAsyncIterable } from '@/utils'
/**
 * Small value object that wraps a concurrency level.
 */
export class Concurrent {
  /**
   * @param length number of tasks allowed to run concurrently
   */
  constructor(public length: number) {}

  /**
   * Factory shorthand for `new Concurrent(length)`.
   *
   * @param length number of tasks allowed to run concurrently
   * @returns a fresh `Concurrent` holding `length`
   */
  static of(length: number): Concurrent {
    return new Concurrent(length)
  }
}
/**
 * Type guard that reports whether `concurrent` is an instance of `Concurrent`.
 *
 * @param concurrent any value
 * @returns `true` when `concurrent` was created from the `Concurrent` class
 */
export function isConcurrent(concurrent: unknown): concurrent is Concurrent {
  const isInstance = concurrent instanceof Concurrent
  return isInstance
}
/**
 * Concurrent is used to balance the load of multiple asynchronous requests.
 * It controls how many tasks of `iterable` are in flight at the same time.
 *
 * Items are pulled from the source in groups of `length`. A new group is only
 * started after the previous group has completely settled, and results are
 * handed to the consumer in the order the source was pulled.
 *
 * If a task rejects, the matching `next()` call rejects, the iteration is
 * marked as finished and every other (pending or later) `next()` call resolves
 * with `{ done: true }`.
 *
 * Can be called with both arguments, or with `length` only to get a function
 * that accepts the `iterable` later (arguments are validated once the
 * `iterable` is supplied).
 *
 * @param length number of tasks that are started together; must be a positive integer
 * @param iterable source whose `next` calls are grouped
 * @returns an async iterable iterator over the items of `iterable`
 * @throws RangeError if `length` is not a positive integer
 * @throws TypeError if `iterable` is not an `AsyncIterable`
 *
 * @example
 * ```typescript
 * await pipe(
 *   [1, 2, 3, 4, 5, 6],
 *   toAsync,
 *   map((a) => delay(1000, a)),
 *   concurrent(3),
 *   each(console.log), // log 1, 2, 3, 4, 5, 6
 * ); // takes 2 seconds
 * // Tasks 1, 2, 3 start together; tasks 4, 5, 6 wait until all three have finished, then start together.
 *
 * await pipe(
 *   [1, 2, 3, 4, 5, 6],
 *   toAsync,
 *   map((a) => delay(1000, a)),
 *   each(console.log), // log 1, 2, 3, 4, 5, 6
 * ); // takes 6 seconds
 * // Tasks start sequentially, each task starts after the previous one has finished.
 * ```
 *
 * {@link https://github.com/niuiic/fx-flow/blob/main/packages/test/src/pipe/lazy/concurrent.spec.ts | More examples}
 */
function concurrent<A>(length: number, iterable: AsyncIterable<A>): AsyncIterableIterator<A>
function concurrent<A>(length: number): (iterable: AsyncIterable<A>) => AsyncIterableIterator<A>
function concurrent<A>(
  length: number,
  iterable?: AsyncIterable<A>
): AsyncIterableIterator<A> | ((iterable: AsyncIterable<A>) => AsyncIterableIterator<A>) {
  // curried form: remember `length` and wait for the iterable
  if (iterable === undefined) {
    return (iterable: AsyncIterable<A>) => concurrent(length, iterable)
  }

  // A fractional value below 1 would create an empty task group
  // (`Array.from({ length: 0.5 })` is `[]`) and make `recur`/`performTasks`
  // spin forever, so only true positive integers are accepted.
  if (!Number.isInteger(length) || length <= 0) {
    throw new RangeError('"length" must be positive integer')
  }

  if (!isAsyncIterable(iterable)) {
    throw new TypeError('Type of "iterable" must be AsyncIterable')
  }

  const iterator = iterable[Symbol.asyncIterator]()
  // settled results of the source that have not been handed to the consumer yet
  const taskResults: PromiseSettledResult<IteratorResult<A>>[] = []
  // promise chain that serializes the task groups
  let prevGroup = Promise.resolve()
  // count of calling `next` method of the returned iterator
  let nextCallCount = 0
  // count of fulfilled (excluding rejected) tasks handed to the consumer
  let resolvedItemCount = 0
  // the source reported `done` or some task was rejected
  let iterFinished = false
  // whether the previous group of tasks has finished
  let groupTasksFinished = true
  // resolve/reject pairs of `next` calls that are still waiting for a result
  const settlementQueue: [IteratorResolve<A>, Reject][] = []

  // Once the iteration is finished no further result will ever be produced,
  // so every `next` call that is still waiting is completed with `done`.
  const settlePendingAsDone = () => {
    while (settlementQueue.length > 0) {
      const [resolve] = settlementQueue.shift()!
      resolve({ done: true, value: undefined })
    }
  }

  // Hand buffered results to waiting `next` calls, in order.
  const resolveTaskResults = () => {
    // nextCallCount > resolvedItemCount => settlementQueue.length > 0
    while (taskResults.length > 0 && nextCallCount > resolvedItemCount) {
      const taskResult = taskResults.shift()!
      const [resolve, reject] = settlementQueue.shift()!

      if (taskResult.status === 'fulfilled') {
        resolvedItemCount++
        resolve(taskResult.value)
        if (taskResult.value.done) {
          iterFinished = true
        }
      } else {
        // a rejected task ends the iteration immediately
        reject(taskResult.reason)
        iterFinished = true
        break
      }
    }

    if (iterFinished) {
      settlePendingAsDone()
    }
  }

  // Start the next group of `length` tasks, or queue a re-check behind the
  // group that is currently running.
  const performTasks = () => {
    if (groupTasksFinished) {
      const nextGroupTasks = Promise.allSettled(
        // pass `Concurrent` instance => tell previous step to control concurrency
        Array.from({ length }, () => iterator.next(Concurrent.of(length) as any))
      )
      groupTasksFinished = false
      prevGroup = prevGroup
        .then(() => nextGroupTasks)
        .then((results) => {
          taskResults.push(...results)
          groupTasksFinished = true
          recur()
        })
    } else {
      prevGroup = prevGroup.then(() => {
        if (!iterFinished && nextCallCount > resolvedItemCount) {
          performTasks()
        }
      })
    }
  }

  const recur = () => {
    // `nextCallCount` represents the number of times `next` is called for subsequent steps
    // normally `resolvedItemCount` should be less than `nextCallCount`
    // they may be the same if the subsequent step exits the iteration
    if (iterFinished || nextCallCount === resolvedItemCount) {
      return
    }

    if (taskResults.length > 0) {
      resolveTaskResults()
    } else {
      performTasks()
    }
  }

  const result: AsyncIterableIterator<A> = {
    [Symbol.asyncIterator]() {
      return this
    },
    next(): Promise<IteratorResult<A>> {
      nextCallCount++
      if (iterFinished) {
        // the async iterator protocol requires `next` to return a promise
        return Promise.resolve<IteratorResult<A>>({ done: true, value: undefined })
      }

      return new Promise<IteratorResult<A>>((resolve, reject) => {
        settlementQueue.push([resolve, reject])
        recur()
      })
    }
  }

  return result
}
export { concurrent }