/
MergeIterator.ts
92 lines (71 loc) · 2.32 KB
/
MergeIterator.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import { AsyncIterator } from "asynciterator";
/**
 * Picks which buffered item should be emitted next.
 *
 * Receives the buffer of candidate items (a `null` or `undefined` entry is an
 * empty slot) and returns the index of the entry to emit.
 */
type Selector<T> = (values: (T | null | undefined)[]) => number;
/**
 * Merges several source iterators into a single iterator.
 *
 * One item per source is buffered in `values`; on every `read()` the
 * `selector` decides which buffered item is emitted next. The merged iterator
 * closes once every source has ended and the buffer has been drained.
 */
export default class MergeIterator<T> extends AsyncIterator<T> {
  /** Buffered item per source, index-aligned with the sources; `null` marks an empty slot. */
  public values: Array<T | null>;
  private readonly sourceIterators: Array<AsyncIterator<T>>;
  private readonly selector: Selector<T>;

  /**
   * @param sourceIterators iterators to merge; the array is kept by reference
   *                        and grows when `appendIterator` is called
   * @param selector        returns the index in `values` of the item to emit next
   */
  constructor(sourceIterators: Array<AsyncIterator<T>>, selector: Selector<T>) {
    super();
    this.sourceIterators = sourceIterators;
    this.selector = selector;
    this.setMaxListeners(1000);
    this.values = Array(this.sourceIterators.length).fill(null);
    this.readable = true;
    for (const iterator of this.sourceIterators) {
      this.addListeners(iterator);
    }
  }

  /**
   * Adds another source to the merge, with an empty buffer slot of its own.
   *
   * @param iterator the additional source iterator
   */
  public appendIterator(iterator: AsyncIterator<T>) {
    this.sourceIterators.push(iterator);
    this.values.push(null);
    this.addListeners(iterator);
  }

  /**
   * Emits the next merged item.
   *
   * @returns the item chosen by the selector, or `null` when no item can be
   *          produced right now (a source that has not ended has nothing to
   *          read yet, or nothing is buffered)
   */
  public read(): T | null {
    // Refill every empty slot whose source has not ended yet.
    for (let i = 0; i < this.sourceIterators.length; i++) {
      const buffered = this.values[i];
      if (buffered === null || buffered === undefined) {
        const iterator = this.sourceIterators[i];
        if (!iterator.ended) {
          const value = iterator.read();
          if (value === null) {
            // A source that has not ended has nothing to offer yet, so the
            // selector cannot make a complete choice: wait for that source.
            this.readable = false;
            return null;
          }
          this.values[i] = value;
        }
      }
    }

    if (!this.hasBufferedValue()) {
      // Every slot is empty, which after the loop above means every source
      // has ended (or there are no sources). Do not bother the selector.
      if (this.sourceIterators.length > 0) {
        this.close();
      } else {
        // No sources yet: go unreadable so that a source appended later can
        // flip the flag back and thereby notify the consumer.
        this.readable = false;
      }
      return null;
    }

    const selectedIndex = this.selector(this.values);
    const item = this.values[selectedIndex];
    if (item === null || item === undefined) {
      // The selector pointed at an empty slot or outside the buffer; report
      // "no item" without writing to that index.
      return null;
    }
    this.values[selectedIndex] = null;

    // Last buffered item handed out and nothing more can arrive: finish here,
    // because the "end" handler deliberately did not close while items remained.
    if (!this.hasBufferedValue() && this.sourceIterators.every((iter) => iter.ended)) {
      this.close();
    }
    return item;
  }

  /** Closes every source iterator and then this iterator. */
  public close() {
    for (const iterator of this.sourceIterators) {
      iterator.close();
    }
    super.close();
  }

  /** Tells whether at least one slot of `values` currently holds an item. */
  private hasBufferedValue(): boolean {
    return this.values.some((value) => value !== null && value !== undefined);
  }

  /**
   * Subscribes to the "end" and "readable" events of a source so that this
   * iterator's own `readable` flag and closing follow the state of its sources.
   *
   * @param iterator the source iterator to observe
   */
  private addListeners(iterator: AsyncIterator<T>) {
    iterator.on("end", () => {
      const allEnded = this.sourceIterators.every((iter) => iter.ended);
      if (allEnded) {
        if (this.hasBufferedValue()) {
          // Items read ahead from the sources are still waiting in the
          // buffer; closing now would drop them. Let the consumer drain
          // them first, read() closes afterwards.
          this.readable = true;
        } else {
          this.close();
        }
      } else if (!this.readable) {
        // everything that's still open is readable
        // (this handler tests `closed`, the "readable" handler tests `ended`)
        const allReadable = this.sourceIterators.every((iter) => iter.closed || iter.readable);
        if (allReadable) {
          this.readable = true;
        }
      }
    });

    iterator.on("readable", () => {
      if (!this.readable) {
        // everything that's still open is readable
        const allReadable = this.sourceIterators.every((iter) => iter.ended || iter.readable);
        if (allReadable) {
          this.readable = true;
        }
      }
    });
  }
}