-
Notifications
You must be signed in to change notification settings - Fork 0
/
Pipeline.ts
105 lines (99 loc) · 3.38 KB
/
Pipeline.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import assertState from "./assertState";
export type PipelineComponent<I, O> = {
costBuffer: number;
concurrency: number;
handler: (i: I) => Promise<O | undefined>; // Returning undefined bails out of the pipeline early.
inputCost: (i: I) => number;
};
export default class Pipeline<LastOutput = unknown> {
private readonly components = Array<PipelineComponent<any, any>>();
private constructor(
private readonly producer: () => Promise<any | undefined>
) {}
// If the producer returns undefined, it marks the end of any more inputs.
static from<O>(producer: () => Promise<O | undefined>): Pipeline<O> {
return new Pipeline(producer);
}
then<O>(component: PipelineComponent<LastOutput, O>): Pipeline<O> {
this.components.push(component);
return this as any;
}
finally(final: (o: LastOutput) => Promise<void>) {
return new Promise<void>((resolve) => {
// If true, the producer will no longer produce any more elements.
let ended = false;
// How many elements produced by the producer are still in the pipeline.
let pendingFinal = 0;
const state = this.components.map(() => ({
buffer: Array<any>(),
costUsage: 0,
driving: 0,
}));
const isFull = (idx: number) =>
state[idx].costUsage >= this.components[idx].costBuffer;
const push = (idx: number, val: any) => {
state[idx].buffer.push(val);
state[idx].costUsage += this.components[idx].inputCost(val);
ensureDrivingComponent(idx);
};
const ensureDrivingComponent = async (idx: number) => {
const isFirst = idx === 0;
const isLast = idx === state.length - 1;
if (state[idx].driving >= this.components[idx].concurrency) {
return;
}
state[idx].driving++;
while ((isLast || !isFull(idx + 1)) && state[idx].buffer.length) {
const input = state[idx].buffer.shift()!;
state[idx].costUsage -= this.components[idx].inputCost(input);
// If shifting has now made this component's buffer not full, drive the previous component.
// NOTE: Shifting may not always make it non-full.
if (!isFull(idx)) {
if (isFirst) {
ensureDrivingProducer();
} else {
ensureDrivingComponent(idx - 1);
}
}
const output = await this.components[idx].handler(input);
const handleExit = () => {
pendingFinal--;
assertState(pendingFinal >= 0);
if (!pendingFinal && ended) {
resolve();
}
};
if (output === undefined) {
// Early exit.
handleExit();
} else {
if (isLast) {
final(output).then(handleExit);
} else {
push(idx + 1, output);
}
}
}
state[idx].driving--;
};
let drivingProducer = false;
const ensureDrivingProducer = async () => {
if (drivingProducer || ended) {
return;
}
drivingProducer = true;
while (!isFull(0)) {
const output = await this.producer();
if (output == undefined) {
ended = true;
break;
}
push(0, output);
pendingFinal++;
}
drivingProducer = false;
};
ensureDrivingProducer();
});
}
}