-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
302 lines (274 loc) · 7.72 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
// @flow
const Queue = require('double-ended-queue');
/**
* Value a task's promise resolves to. Deliberately unconstrained (`any`).
*/
type TaskRet = any;
/**
* A unit of work. It is called with an optional map from dependency task key
* to that dependency's resolved value, and must return a promise.
*/
type Task = (depRets?:{ [taskKey:string]: any }) => Promise<TaskRet>;
// Node of the dependency graph.
type TaskRec = {
// Task name; also used as the property name in its neighbours' deps/nexts maps.
key: string,
// The function that is executed for this node.
task: Task,
// Promise for the task's value; assigned once the task has been scheduled.
result?: Promise<TaskRet>,
// Map of key -> TaskRec for the nodes this node waits on (filled by addDep).
deps?: Object,
// Map of key -> TaskRec for the nodes that wait on this node (filled by addDep).
nexts?: Object,
};
// No-op task used for placeholder nodes that stand in for a dependency key
// that has been referenced but not added yet.
const DUMMY_TASK = () => Promise.resolve();
type TraverseVisitor = (node:TaskRec) => bool;
// TraverseVisitor return true to stop
function traverse(root:TaskRec, callback:TraverseVisitor): bool {
if (!root) {
return true;
}
if (callback(root)) {
return false;
}
if (!root.nexts) {
return true;
}
const nextKeys = Object.keys(root.nexts);
for (let ii = 0; ii < nextKeys.length; ii++) {
const key = nextKeys[ii];
const next = root.nexts[key];
if (!traverse(next, callback)) {
return false;
}
}
return true;
}
function insert(root:TaskRec, rec:TaskRec, depKeys?:Array<string>) {
if (!root || !rec) {
throw new Error('wtf 2');
}
// find dummy self, merge into self, use self afterwards, if exists
// no dep just install and return
// has dep:
// find dep
// found: dep on it
// no: dep on root
// make dummy node dep on root
// dep on that dummy node
// if depending on root, rm that
let recRef = rec;
let useExisting = false;
if (root.nexts) {
root.nexts && Object.keys(root.nexts).findIndex((kk) => {
const node = root.nexts && root.nexts[kk]; // to make flow happy
if (node && node.key === rec.key) {
if (node !== rec) {
Object.assign(node, rec);
}
recRef = node;
useExisting = true;
return true;
}
return false;
});
}
if (!depKeys || !depKeys.length) {
if (!useExisting) {
addDep(root, recRef);
}
return;
}
let depi = 0;
do {
const key = depKeys[depi];
if (!key) {
throw new Error('invalid args');
}
let dep: TaskRec | null = null;
traverse(root, (node) => {
if (node.key === key) {
dep = node;
return true;
}
return false;
});
if (dep) {
addDep(dep, recRef);
} else {
const dummyDep = { key, task: DUMMY_TASK };
addDep(root, dummyDep);
addDep(dummyDep, recRef);
}
depi += 1;
} while(depi < depKeys.length);
if (root.nexts && root.nexts[recRef.key]) {
delete root.nexts[recRef.key];
}
if (recRef.deps && recRef.deps[root.key]) {
delete recRef.deps[root.key];
}
}
function addDep(dependant:TaskRec, depender:TaskRec) {
depender.deps = { ...depender.deps || {}, [dependant.key]:dependant };
dependant.nexts = { ...dependant.nexts || {}, [depender.key]: depender };
}
function traverseDepGraph(root:TaskRec, visitor:(node:TaskRec) => void) {
if (!root) return;
const q = new Queue();
q.push(root);
while(!q.isEmpty()) {
const task = q.shift();
visitor(task);
if (task.nexts) {
Object.keys(task.nexts).forEach(
(key) => {
const n = task.nexts[key];
if (n.deps && Object.keys(n.deps).every(kk => !!n.deps[kk].result)) {
q.push(n);
}
}
);
}
}
}
function buildGtask(root:TaskRec) {
const ret = [];
root.result = root.task();
traverseDepGraph(root, (rec) => {
if (rec === root) return;
const { task, deps, result } = rec;
// todo: this is to make flow happy, because all node have deps except root
if (!deps) return;
if (!!result) {
throw new Error('should not happen: dependency graph traverse error');
}
const depKeys = Object.keys(deps);
rec.result = Promise.all(depKeys.map(k => deps[k].result))
.then((depRes) => {
return depKeys.reduce(
(res, it, ii) => ({
...res,
[it]: depRes[ii],
}),
{}
)
})
.then(task);
ret.push(rec);
});
return Promise.all(ret.map(rt => rt.result))
.then((resultsArr) => {
return ret.reduce(
(res, it, ii) => {
return {
...res,
[it.key]: resultsArr[ii],
};
},
{}
);
});
}
function addTask(name:string, task:Task, deps?:Array<string>, beforeRun:bool = true): TaskRec {
if (beforeRun && this.hasRun) {
throw new Error('cannot add task after started');
}
if (!name || typeof task !== 'function') {
throw new Error('invalid args');
}
const t = {
key: name,
task,
};
insert(this.root, t, deps);
return t;
}
// Key of the scheduler's internal root node. User tasks must not use it.
const ROOT_KEY = '__ROOT_TASK_KEY_SHOULD_NOT_BE_USED_BY_USER__';
/**
*
*
* @export
* @class TaskScheduler
*/
export default class TaskScheduler {
hasRun: bool
hasFinished: bool
warnedFinish: bool
root:TaskRec
results: Promise<{ [key:string]: TaskRet }>
/**
* Creates an instance of TaskScheduler.
* @memberof TaskScheduler
*/
constructor() {
this.root = {
key: ROOT_KEY,
task: () => Promise.resolve(),
};
}
/**
* Add a task. Must be called before start() is called. Should not be called more than once for each task or all except the last one is effective. There is no requirements in the order in which tasks are added.
* A task must return a promise, if a task is synchronous, just wrap the return value in a promise, like return Promise.resolve(whatEver);
* @param {string} name
* @param {Task} task
* @param {Array<string>} [deps]
* @param {bool} [beforeRun=true]
* @returns
* @memberof TaskScheduler
*/
addTask(name:string, task:Task, deps?:Array<string>) {
addTask.call(this, name, task, deps, true);
}
/**
* Add a task after start() is called. Name should not already be added previously otherwise the behaviour is not defined.
* @param {string} name
* @param {Task} task
* @param {Array<string>} [deps]
* @returns
* @memberof TaskScheduler
*/
appendTask(name:string, task:Task, deps?:Array<string>) {
if (!this.hasRun) {
throw new Error('this is used to add a task after scheduler has already run!');
}
const rec = addTask.call(this, name, task, deps, false);
if (!deps || !deps.length) return;
const depTasks = [];
traverse(this.root, (node) => {
if (deps.indexOf(node.key) >= 0) {
depTasks.push(node);
}
});
rec.result = Promise.all(depTasks.map(it => it.result))
.then((resultsArr) => {
const results = resultsArr.reduce(
(res, it, ii) => ({ ...res, [depTasks[ii].key]: it }),
{}
);
return rec.task(results);
});
const originalResults = this.results;
this.results = rec.result
.then((res) => {
return originalResults
.then((others) => {
return { ...others || {}, [rec.key]: res };
});
});
}
/**
* Begin executing all tasks.
* Each task is executed only after its dependent tasks have already finished and receive their returned values as a 'taskName -> value' map in a single argument.
* Tasks that have no dependent tasks or all its depended tasks have finished will be executed concurrently.
*
* @returns {Promise<{[taskName:string]: TaskRet}>}, the returned promise resolves to an object containing all the added tasks and is keeped as results property in a Scheduler instance. The property only changes if you call appendTask to include the return value of the new task.
* @memberof TaskScheduler
*/
start(): Promise<{[taskName:string]: TaskRet}> {
if (this.hasRun) {
throw new Error('cannot start more than once');
}
this.hasRun = true;
this.results = buildGtask(this.root)
.then((results) => {
this.hasRun = true;
return results;
});
return this.results;
}
}