-
Notifications
You must be signed in to change notification settings - Fork 24
/
control.js
171 lines (147 loc) · 4.73 KB
/
control.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import { isGeneratorFunction, isGenerator } from './generator-function';
/**
 * Thin wrapper around a `call` function. Every supported kind of operation
 * is normalized into one of these by `ControlFunction.for`.
 */
export class ControlFunction {
  /**
   * @param {Function} call - stored as-is on the instance as `call`.
   */
  constructor(call) {
    this.call = call;
  }

  /**
   * Factory equivalent of `new`. Instantiates whichever class it is invoked
   * on, so subclasses get instances of themselves.
   */
  static of(call) {
    return new this(call);
  }

  /**
   * Pick the builtin control for an operation. The checks run in this order:
   *
   *   - `null` / `undefined`      -> a control whose `call` is the identity
   *   - a ControlFunction         -> returned unchanged
   *   - a generator function      -> GeneratorFunctionControl
   *   - a generator               -> GeneratorControl
   *   - anything with a `then()`  -> PromiseControl
   *   - a plain function          -> wrapped with `ControlFunction.of`
   *
   * Anything else yields `undefined`.
   */
  static for(operation) {
    if (operation == null) {
      return ControlFunction.of(x => x);
    }
    if (operation instanceof ControlFunction) {
      return operation;
    }
    if (isGeneratorFunction(operation)) {
      return GeneratorFunctionControl(operation);
    }
    if (isGenerator(operation)) {
      return GeneratorControl(operation);
    }
    // thenables are checked before plain functions, so a function that also
    // carries a `then` method is treated as a promise.
    if (typeof operation.then === 'function') {
      return PromiseControl(operation);
    }
    if (typeof operation === 'function') {
      return ControlFunction.of(operation);
    }
    return undefined;
  }
}
/**
 * Control for a promise (anything with a `then` method): resumes the
 * execution with the fulfilled value, or fails it with the rejection.
 */
export function PromiseControl(promise) {
  return ControlFunction.of(function control({ resume, fail, ensure }) {
    const ignore = x => x;
    let onFulfilled = resume;
    let onRejected = fail;

    // the return values of `resume` and `fail` are deliberately dropped
    // instead of being handed back to the promise chain.
    // see https://github.com/thefrontside/effection.js/pull/44
    promise.then(
      value => {
        onFulfilled(value);
      },
      error => {
        onRejected(error);
      }
    );

    // once this execution has passed out of scope the outcome of the
    // promise no longer matters. A promise cannot be unsubscribed from, so
    // both handlers are swapped for no-ops instead.
    ensure(() => {
      onRejected = ignore;
      onFulfilled = ignore;
      return ignore;
    });
  });
}
/**
 * Control for generator functions. The generator function is invoked
 * (with no arguments) each time the control is called, and the generator
 * it produces is handed to GeneratorControl, which does the actual work.
 *
 *   spawn(function*() { yield timeout(10); return 5; });
 */
export const GeneratorFunctionControl = sequence => ControlFunction.of((...args) => {
  const generator = sequence();
  const delegate = GeneratorControl(generator);
  return delegate.call(...args);
});
/**
 * Control a sequence of operations expressed as a generator.
 *
 * Each `yield` expression of the generator should pass an operation, which
 * is spawned as a child of this control's context. When that child
 * finishes, the generator is advanced to its next step: with the child's
 * result if it completed, or by throwing into the generator if the child
 * errored or was halted (a halt is wrapped in a HaltError). Once the
 * generator is finished or throws an exception, control is passed back to
 * the calling parent through `self.resume` / `self.fail`.
 */
export const GeneratorControl = generator => ControlFunction.of(self => {
  // when this execution goes away, finish the generator as well.
  self.ensure(() => generator.return());

  const resume = value => advance(() => generator.next(value));
  const fail = error => advance(() => generator.throw(error));

  // Handed to `spawn` for every yielded operation. The outcome of the child
  // is only fed back into the generator while our own context is blocking.
  // The three states are checked independently, in this order.
  function onChildFinished(child) {
    if (!self.context.isBlocking) {
      return;
    }
    if (child.isErrored) {
      fail(child.result);
    }
    if (child.isCompleted) {
      resume(child.result);
    }
    if (child.isHalted) {
      fail(new HaltError(child.result));
    }
  }

  // Run one step of the generator. Anything thrown while stepping or
  // spawning (including an error the generator does not catch) fails this
  // execution.
  function advance(getNext) {
    try {
      const step = getNext();
      if (step.done) {
        self.resume(step.value);
        return;
      }
      const spawned = self.context.spawn(step.value, onChildFinished);
      if (spawned.isBlocking) {
        self.context.requiredChildren.add(spawned);
      }
    } catch (error) {
      self.fail(error);
    }
  }

  // kick off the first step.
  resume();
});
/**
 * Operation that spawns `operation` as a child of the parent of the calling
 * context (or of the calling context itself when it has no parent) and
 * resumes straight away with the spawned child.
 *
 * A child that reports `isBlocking` is added to the parent's
 * `requiredChildren`. When the child finishes with an error the parent is
 * failed with it; otherwise, if the parent is waiting and has no required
 * children left, the parent is resumed.
 */
export function fork(operation) {
  return ({ resume, context }) => {
    const parent = context.parent || context;

    const onFinish = forked => {
      if (forked.isErrored) {
        parent.fail(forked.result);
        return;
      }
      if (parent.isWaiting && parent.requiredChildren.size === 0) {
        parent.resume();
      }
    };

    const child = parent.spawn(operation, onFinish);
    if (child.isBlocking) {
      parent.requiredChildren.add(child);
    }
    resume(child);
  };
}
/**
 * Operation that spawns `operation` as a child of the parent of the calling
 * context (or of the calling context itself when it has no parent) and
 * resumes straight away with the spawned child.
 *
 * Unlike `fork`, the child is never added to the parent's
 * `requiredChildren`. When the child finishes with an error the parent is
 * failed with it; otherwise, if the parent is waiting and has no required
 * children left, the parent is resumed.
 *
 * @param operation - the operation to run as a monitored child.
 * @returns a control function taking `{ resume, context }`.
 */
export function monitor(operation) {
  return ({ resume, context }) => {
    const parent = context.parent || context;

    // Read the finished child from the callback argument, exactly like
    // `fork` does. Closing over the `child` binding below would throw a
    // ReferenceError if `spawn` invoked the callback synchronously, because
    // the binding is still uninitialized until `spawn` has returned.
    const child = parent.spawn(operation, finished => {
      if (finished.isErrored) {
        parent.fail(finished.result);
      } else if (parent.isWaiting && parent.requiredChildren.size === 0) {
        parent.resume();
      }
    });

    resume(child);
  };
}
/**
 * Operation that waits on another execution (`antecedent`) and adopts its
 * outcome: resumes with its result if it completed, fails with its result
 * if it errored, and fails with a HaltError if it was halted.
 *
 * The hook registered on the antecedent does nothing unless the joining
 * context is still running. Whatever `antecedent.ensure` returns is
 * registered with this execution's own `ensure`.
 */
export function join(antecedent) {
  return ({ resume, fail, ensure, context }) => {
    const disconnect = antecedent.ensure(function join() {
      if (!context.isRunning) {
        return;
      }

      const { result } = antecedent;
      if (antecedent.isCompleted) {
        resume(result);
        return;
      }
      if (antecedent.isErrored) {
        fail(result);
        return;
      }
      if (antecedent.isHalted) {
        fail(new HaltError(antecedent.result));
      }
    });

    ensure(disconnect);
  };
}
/**
 * Error describing an interruption. The message is `Interrupted: ` followed
 * by the stringified cause, and the cause itself is kept on `cause`.
 */
export class HaltError extends Error {
  /**
   * @param cause - whatever triggered the interruption; stored unchanged.
   */
  constructor(cause) {
    const message = `Interrupted: ${cause}`;
    super(message);
    this.cause = cause;
  }
}