Skip to content

Commit

Permalink
Merge pull request #57 from thefrontside/fork-join-primitive-operations
Browse files Browse the repository at this point in the history
Convert `fork` into an operation
  • Loading branch information
cowboyd committed Jan 29, 2020
2 parents 6c3d0df + d866303 commit e356dab
Show file tree
Hide file tree
Showing 25 changed files with 991 additions and 708 deletions.
30 changes: 15 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,22 @@ scenario `d` is of particular importance. It means that if a child
throws an error and its parent doesn't catch it, then all of its
siblings are immediately halted.

## Execution
## Context

The process primitive is the `Execution`. To create (and start) an
`Execution`, use the `fork` function and pass it a generator. This
simplest example waits for 1 second, then prints out "hello world" to
Every operation takes place within an execution context. To create the
very first context, use the `spawn` function and pass it a generator. This
example waits for 1 second, then prints out "hello world" to
the console.

``` javascript
import { fork, timeout } from 'effection';
import { spawn, timeout } from 'effection';

let process = fork(function*() {
let context = spawn(function*() {
yield timeout(1000);
return 'hello world';
});

process.isRunning //=> true
context.isRunning //=> true
// 1000ms passes
// process.isRunning //=> false
// process.result //=> 'hello world'
Expand All @@ -87,7 +87,7 @@ Child processes can be composed freely. So instead of yielding for
1000 ms, we could instead, yield 10 times for 100ms.

``` javascript
fork(function*() {
spawn(function*() {
yield function*() {
for (let i = 0; i < 10; i++) {
yield timeout(100);
Expand All @@ -100,7 +100,7 @@ fork(function*() {
And in fact, processes can be easily and arbitrarly deeply nested:

``` javascript
let process = fork(function*() {
let process = spawn(function*() {
return yield function*() {
return yield function*() {
return yield function*() {
Expand All @@ -118,13 +118,13 @@ You can pass arguments to an operation by invoking it.

``` javascript

import { fork, timeout } from 'effection';
import { spawn, timeout } from 'effection';

function* waitForSeconds(durationSeconds) {
yield timeout(durationSeconds * 1000);
}

fork(waitforseconds(10));
spawn(waitforseconds(10));
```

### Asynchronous Execution
Expand All @@ -137,11 +137,11 @@ part of your main process. To do this, you would use the `fork` method
on the execution:

``` javascript
import { fork } from 'effection';
import { spawn, fork } from 'effection';

fork(function*() {
fork(createFileServer);
fork(createHttpServer);
spawn(function*() {
yield fork(createFileServer);
yield fork(createHttpServer);
});
```

Expand Down
23 changes: 5 additions & 18 deletions examples/async.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/* eslint no-console: 0 */
/* eslint require-yield: 0 */
import { fork, timeout } from '../src/index';
import { spawn, fork, timeout } from '../src/index';

import { interruptable } from './interruptable';

/**
* Fires up some random servers
*/
fork(interruptable(function*() {
fork(randoLogger('Bob'));
fork(randoLogger('Alice'));
spawn(interruptable(function*() {
yield fork(randoLogger('Bob'));
yield fork(randoLogger('Alice'));

console.log('Up and running with random number servers Bob and Alice....');
}));
Expand All @@ -32,16 +32,3 @@ function randoLogger(name) {
}
};
}


function interruptable(proc) {
return function*() {
let interrupt = () => console.log('') || this.halt();
process.on('SIGINT', interrupt);
try {
yield proc;
} finally {
process.off('SIGINT', interrupt);
}
};
}
18 changes: 3 additions & 15 deletions examples/countdown.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* eslint no-console: 0 */
import { fork, timeout } from '../src/index';
import { spawn, timeout } from '../src/index';

import { interruptable } from './interruptable';

/**
* A simple script that counts down from 5 to 1, pausing for one
Expand All @@ -24,23 +25,10 @@ import { fork, timeout } from '../src/index';
* handler is uninstalled. Once again, the node process is left with
* nothing left to do and no event handlers, so it exits.
*/
fork(interruptable(function*() {
spawn(interruptable(function*() {
for (let i = 5; i > 0; i--) {
console.log(`${i}...`);
yield timeout(1000);
}
console.log('liftoff!');
}));


function interruptable(proc) {
return function*() {
let interrupt = () => console.log('') || this.halt();
process.on('SIGINT', interrupt);
try {
yield proc;
} finally {
process.off('SIGINT', interrupt);
}
};
}
16 changes: 16 additions & 0 deletions examples/interruptable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/* eslint no-console: 0 */

import { fork, join } from '../src/index';

export function interruptable(operation) {
return function*() {
let child = yield fork(operation);
let interrupt = () => console.log('') || child.halt();
process.on('SIGINT', interrupt);
try {
yield join(child);
} finally {
process.off('SIGINT', interrupt);
}
};
}
28 changes: 21 additions & 7 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
// TypeScript Version: 3.6
declare module "effection" {
export type Operation = SequenceFn | Sequence | Promise<any> | Controller | Execution<any> | undefined;
export type SequenceFn = (this: Execution) => Sequence;
export type Controller = (execution: Execution) => void | (() => void);
export type Operation = SequenceFn | Sequence | Promise<any> | Controller | undefined;
export type SequenceFn = () => Sequence;

export type Controller = (controls: Controls) => void | (() => void);

export interface Sequence extends Generator<Operation, any, any> {}

export interface Execution<T = any> extends PromiseLike<any> {
export interface Context extends PromiseLike<any> {
id: number;
resume(result: T): void;
throw(error: Error): void;
parent?: Context;
result?: any;
halt(reason?: any): void;
catch<R>(fn: (error: Error) => R): Promise<R>;
finally(fn: () => void): Promise<any>;
}

export function fork<T>(operation: Operation): Execution<T>;
export interface Controls {
id: number;
resume(result: any): void;
fail(error: Error): void;
ensure(hook: (context?: Context) => void): () => void;
spawn(operation: Operation): Context;
context: Context;
}

export function main(operation: Operation): Context;

export function fork(operation: Operation): Operation;

export function join(context: Context): Operation;

export function timeout(durationMillis: number): Operation;
}
167 changes: 167 additions & 0 deletions src/context.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import { ControlFunction, HaltError } from './control';

export class ExecutionContext {
static ids = 1;
get isUnstarted() { return this.state === 'unstarted'; }
get isRunning() { return this.state === 'running'; }
get isWaiting() { return this.state === 'waiting'; }
get isCompleted() { return this.state === 'completed'; }
get isErrored() { return this.state === 'errored'; }
get isHalted() { return this.state === 'halted'; }

get isBlocking() { return this.isRunning || this.isWaiting || this.isUnstarted; }

get hasBlockingChildren() {
for (let child of this.children) {
if (child.isBlocking) {
return true;
}
}
return false;
}

constructor(parent = undefined) {
this.id = this.constructor.ids++;
this.parent = parent;
this.children = new Set();
this.exitHooks = new Set();
this.state = 'unstarted';
this.resume = this.resume.bind(this);
this.fail = this.fail.bind(this);
this.ensure = this.ensure.bind(this);
this.spawn = this.spawn.bind(this);
}

get promise() {
this._promise = new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
this.finalizePromise();
return this._promise;
}

finalizePromise() {
if(this.isCompleted && this.resolve) {
this.resolve(this.result);
} else if(this.isErrored && this.reject) {
this.reject(this.result);
} else if(this.isHalted && this.reject) {
this.reject(new HaltError(this.result));
}
}

then(...args) {
return this.promise.then(...args);
}

catch(...args) {
return this.promise.catch(...args);
}

finally(...args) {
return this.promise.finally(...args);
}

get root() {
if (!this.parent) {
return this;
} else {
return this.parent.root;
}
}

spawn(operation) {
let child = new ExecutionContext(this);
this.children.add(child);
child.ensure(() => {
this.children.delete(child);
if (this.isWaiting && !this.hasBlockingChildren) {
this.finalize('completed');
}
});
child.enter(operation);
return child;
}

ensure(hook) {
let run = hook.bind(null, this);
if (this.isBlocking) {
this.exitHooks.add(run);
return () => this.exitHooks.delete(run);
} else {
hook();
return x => x;
}
}

enter(operation) {
if (this.isUnstarted) {
let controller = this.createController(operation);
this.operation = operation;
this.state = 'running';

let { resume, fail, ensure, spawn } = this;
controller.call({ resume, fail, ensure, spawn, context: this });

} else {
throw new Error(`
Tried to call #enter() on a Context that has already been finalized. This
should never happen and so is almost assuredly a bug in effection. All of
its users would be in your eternal debt were you to please take the time to
report this issue here:
https://github.com/thefrontside/effection.js/issues/new
Thanks!`);
}
}

halt(reason) {
if (this.isBlocking) {
this.finalize('halted', reason);
}
}

resume(value) {
if (!this.isRunning) { return; }

this.result = value;
if (this.hasBlockingChildren) {
this.state = 'waiting';
} else {
this.finalize('completed', value);
}
}

fail(error) {
this.finalize('errored', error);
}

finalize(state, result) {
this.result = result || this.result;
this.state = state;
for (let hook of this.exitHooks) {
this.exitHooks.delete(hook);
hook();
}
for (let child of this.children) {
this.children.delete(child);
child.halt(result);
}
this.finalizePromise();
}

createController(operation) {
let controller = ControlFunction.for(operation);
if (!controller) {
throw new Error(`cannot find controller for ${operation}`);
}
return controller;
}

toString(indent = '') {
let name = this.operation ? this.operation.name || '' : '';
let children = [...this.children].map(child => `${child.toString(indent + ' ')}`);
return [`${indent}-> [${this.id}](${name}): ${this.state}`, ...children].join("\n");
}
}
Loading

0 comments on commit e356dab

Please sign in to comment.