Skip to content

Commit

Permalink
Convert fork into an operation
Browse files Browse the repository at this point in the history
Originally, this change came out of problems with sequencing a
fork. Specifically, if there was an error inside of fork, and so it
tried to throw() inside of its parent, then the error was hidden by
the fact that [its parent generator was already running]. This was
symptomatic of a deeper problem of the generator syntax being too
tightly coupled to the underlying concept of the execution of a
sequence of steps, and also of the concept of synchronous and
asynchronous execution being too coupled.

As a purely practical consideration, we wondered "what if the parent
generator was yielded at the time we tried to create the fork?" That
way, the fork would be either be at its first yield point, or have
failed when we tried to resume the parent, but this turned out to be a
profoundly positive re-orientation of the effection architecture to
separate out the concepts of context and executions from the generator
synax entirely which enables all sorts of nice things.

Fundamentally, this change makes effection a function-oriented effects
library rather than a generator-oriented effects library. That is, you
don't _need_ generators to declare a tree of processes, it's just
nicer to do it that way. For example:

```js

enter(join(fork(({ resume, ensure })=> resume(5))))
```

The nice effect that this has is that the generator syntax is
implemented as just a plugin to the basic runtime, which means that in
addition to solving the problem at hand, it also [allows you to fork a
control function directly][2], something that had always felt like it
should work, but annoyingly didn't.

This feels like the right direction to take it, because it is the
solution to several seemingly unrelated problems, and it even feels as
though it could provide a pathway to an [asynchronous halt
operation][3] by making not just `fork` but also `halt` an operation.

Finally, because structural concurrency is implemented ultimately as
operations that do nothing more than manipulate the execution context,
other structured effects (state, messaging, etc...) can be implemented
in user space. Using this library, I was able to implement messaging
operations based on the [send-and-receive][4] branch completely in
user-space, and in which both `send()` and `receive()` were
operations.

There are still some kinks to be worked out. For example, it's
awkward to jump into generator syntax from function syntaxt (nothing
blocking though, and certainly, nothing worse than what came before
before).

From the perspective of the JavaScript runtime, execution is still
completely synchronous internally, and the code is much, much simpler
and (in my opinion) easier to follow.

[1]: #26
[2]: #33
[3]: #35
[4]: #49
  • Loading branch information
cowboyd committed Dec 15, 2019
1 parent 953f38b commit bf7944f
Show file tree
Hide file tree
Showing 23 changed files with 861 additions and 703 deletions.
24 changes: 12 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,20 @@ siblings are immediately halted.

## Execution

The process primitive is the `Execution`. To create (and start) an
The process primitive is the `Execution`. To create (and enter) an
`Execution`, use the `fork` function and pass it a generator. This
simplest example waits for 1 second, then prints out "hello world" to
the console.

``` javascript
import { fork, timeout } from 'effection';
import { enter, timeout } from 'effection';

let process = fork(function*() {
let execution = enter(function*() {
yield timeout(1000);
return 'hello world';
});

process.isRunning //=> true
execution.isRunning //=> true
// 1000ms passes
// process.isRunning //=> false
// process.result //=> 'hello world'
Expand All @@ -87,7 +87,7 @@ Child processes can be composed freely. So instead of yielding for
1000 ms, we could instead, yield 10 times for 100ms.

``` javascript
fork(function*() {
enter(function*() {
yield function*() {
for (let i = 0; i < 10; i++) {
yield timeout(100);
Expand All @@ -100,7 +100,7 @@ fork(function*() {
And in fact, processes can be easily and arbitrarly deeply nested:

``` javascript
let process = fork(function*() {
let process = enter(function*() {
return yield function*() {
return yield function*() {
return yield function*() {
Expand All @@ -118,13 +118,13 @@ You can pass arguments to an operation by invoking it.

``` javascript

import { fork, timeout } from 'effection';
import { enter, timeout } from 'effection';

function* waitForSeconds(durationSeconds) {
yield timeout(durationSeconds * 1000);
}

fork(waitforseconds(10));
enter(waitforseconds(10));
```

### Asynchronous Execution
Expand All @@ -137,11 +137,11 @@ part of your main process. To do this, you would use the `fork` method
on the execution:

``` javascript
import { fork } from 'effection';
import { enter, fork } from 'effection';

fork(function*() {
fork(createFileServer);
fork(createHttpServer);
enter(function*() {
yield fork(createFileServer);
yield fork(createHttpServer);
});
```

Expand Down
23 changes: 5 additions & 18 deletions examples/async.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/* eslint no-console: 0 */
/* eslint require-yield: 0 */
import { fork, timeout } from '../src/index';
import { enter, fork, timeout } from '../src/index';

import { interruptable } from './inerruptable';

/**
* Fires up some random servers
*/
fork(interruptable(function*() {
fork(randoLogger('Bob'));
fork(randoLogger('Alice'));
enter(interruptable(function*() {
yield fork(randoLogger('Bob'));
yield fork(randoLogger('Alice'));

console.log('Up and running with random number servers Bob and Alice....');
}));
Expand All @@ -32,16 +32,3 @@ function randoLogger(name) {
}
};
}


function interruptable(proc) {
return function*() {
let interrupt = () => console.log('') || this.halt();
process.on('SIGINT', interrupt);
try {
yield proc;
} finally {
process.off('SIGINT', interrupt);
}
};
}
18 changes: 3 additions & 15 deletions examples/countdown.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* eslint no-console: 0 */
import { fork, timeout } from '../src/index';
import { enter, timeout } from '../src/index';

import { interruptable } from './inerruptable';

/**
* A simple script that counts down from 5 to 1, pausing for one
Expand All @@ -24,23 +25,10 @@ import { fork, timeout } from '../src/index';
* handler is uninstalled. Once again, the node process is left with
* nothing left to do and no event handlers, so it exits.
*/
fork(interruptable(function*() {
enter(interruptable(function*() {
for (let i = 5; i > 0; i--) {
console.log(`${i}...`);
yield timeout(1000);
}
console.log('liftoff!');
}));


function interruptable(proc) {
return function*() {
let interrupt = () => console.log('') || this.halt();
process.on('SIGINT', interrupt);
try {
yield proc;
} finally {
process.off('SIGINT', interrupt);
}
};
}
16 changes: 16 additions & 0 deletions examples/inerruptable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/* eslint no-console: 0 */

import { fork, join } from '../src/index';

export function interruptable(operation) {
return function*() {
let child = yield fork(operation);
let interrupt = () => console.log('') || child.halt();
process.on('SIGINT', interrupt);
try {
yield join(child);
} finally {
process.off('SIGINT', interrupt);
}
};
}
167 changes: 167 additions & 0 deletions src/context.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import { ControlFunction, HaltError } from './control';

export class ExecutionContext {
static ids = 1;
get isUnstarted() { return this.state === 'unstarted'; }
get isRunning() { return this.state === 'running'; }
get isWaiting() { return this.state === 'waiting'; }
get isCompleted() { return this.state === 'completed'; }
get isErrored() { return this.state === 'errored'; }
get isHalted() { return this.state === 'halted'; }

get isBlocking() { return this.isRunning || this.isWaiting || this.isUnstarted; }

get hasBlockingChildren() {
for (let child of this.children) {
if (child.isBlocking) {
return true;
}
}
return false;
}

constructor(parent = undefined) {
this.id = this.constructor.ids++;
this.parent = parent;
this.children = new Set();
this.exitHooks = new Set();
this.state = 'unstarted';
this.resume = this.resume.bind(this);
this.fail = this.fail.bind(this);
this.ensure = this.ensure.bind(this);
}

get promise() {
this._promise = new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
this.finalizePromise();
return this._promise;
}

finalizePromise() {
if(this.isCompleted && this.resolve) {
this.resolve(this.result);
} else if(this.isErrored && this.reject) {
this.reject(this.result);
} else if(this.isHalted && this.reject) {
this.reject(new HaltError(this.result));
}
}

then(...args) {
return this.promise.then(...args);
}

catch(...args) {
return this.promise.catch(...args);
}

finally(...args) {
return this.promise.finally(...args);
}

get root() {
if (!this.parent) {
return this;
} else {
return this.parent.root;
}
}

createChild() {
let child = new ExecutionContext(this);
child.ensure(() => this.children.delete(child));
this.children.add(child);
return child;
}

ensure(hook) {
let { exitHooks } = this;
exitHooks.add(hook);
return () => exitHooks.delete(hook);
}

enter(operation) {
if (this.isUnstarted) {
let controller = this.createController(operation);
this.operation = operation;
this.state = 'running';
controller.call({
resume: this.resume,
fail: this.fail,
ensure: this.ensure,
context: this
});
} else {
throw new Error(`
Tried to call Fork#resume() on a Fork that has already been finalized. This
should never happen and so is almost assuredly a bug in effection. All of
its users would be in your eternal debt were you to please take the time to
report this issue here:
https://github.com/thefrontside/effection.js/issues/new
Thanks!`);
}
}

halt(reason) {
if (this.isBlocking) {
this.finalize('halted', reason);
}
}

resume(value) {
if (this.isRunning) {
this.result = value;
if (this.hasBlockingChildren) {
this.state = 'waiting';
} else {
this.finalize('completed', value);
}
} else {
throw new Error(`
Tried to call Fork#resume() on a Fork with state '${this.state}' This
should never happen and so is almost assuredly a bug in effection. All of
its users would be in your eternal debt were you to please take the time to
report this issue here:
https://github.com/thefrontside/effection.js/issues/new
Thanks!`);
}
}

fail(error) {
// if (!this.isBlocking) {error}
this.finalize('errored', error);
}

finalize(state, result) {
this.result = result || this.result;
this.state = state;
for (let hook of this.exitHooks) {
this.exitHooks.delete(hook);
hook(this);
}
for (let child of this.children) {
this.children.delete(child);
child.halt(result);
}
this.finalizePromise();
}

createController(operation) {
let controller = ControlFunction.for(operation);
if (!controller) {
throw new Error(`cannot find controller for ${operation}`);
}
return controller;
}

toString(indent = '') {
let name = this.operation ? this.operation.name || '' : '';
let children = [...this.children].map(child => `${child.toString(indent + ' ')}`);
return [`${indent}-> [${this.id}](${name}): ${this.state}`, ...children].join("\n");
}
}
Loading

0 comments on commit bf7944f

Please sign in to comment.