Skip to content

Commit

Permalink
Merge branch 'feature/csp-v3' into develop
Browse files Browse the repository at this point in the history
* feature/csp-v3:
  docs(csp): update readme
  feat(csp): complete rewrite of Channel, Mult, PubSub, remove deps, add new operators
  feat(csp): add write queue, update tests
  feat(csp): update consume() to accept opt. limit
  refactor(csp): update types, Mult, PubSub for new channel impl
  refactor(examples): update zig-canvas for zig v0.12.0 (WIP)
  feat(wasm-api): update build script for zig v0.12.0 (incomplete)
  refactor(wasm-api-dom): minor zig v0.12.0 updates
  refactor(wasm-api): minor zig v0.12.0 updates
  refactor(csp): update type usage
  feat(csp): add initial new Channel impl & related ops/tests
  • Loading branch information
postspectacular committed Apr 25, 2024
2 parents 2a892cd + 3b6394b commit bab0913
Show file tree
Hide file tree
Showing 18 changed files with 721 additions and 1,753 deletions.
13 changes: 13 additions & 0 deletions examples/zig-canvas/build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,19 @@ pub fn build(b: *std.Build) void {
// build mode override
.optimize = .ReleaseSmall,
});
lib.root_module.export_symbol_names = &.{
"start",
"_wasm_allocate",
"_wasm_free",
"_dom_init",
"_dom_callListener",
"_dom_addListener",
"_dom_removeListener",
"_dom_callRAF",
"_dom_fullscreenChanged",
"_schedule_init",
"_schedule_callback",
};

b.installArtifact(lib);

Expand Down
233 changes: 50 additions & 183 deletions packages/csp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,47 +20,46 @@
- [Installation](#installation)
- [Dependencies](#dependencies)
- [API](#api)
- [File loading & word frequency analysis](#file-loading--word-frequency-analysis)
- [Channel merging](#channel-merging)
- [PubSub](#pubsub)
- [Authors](#authors)
- [License](#license)

## About

ES6 promise based CSP primitives & operations.

- `Channel` with/without buffering and/or
[transducers](https://github.com/thi-ng/umbrella/blob/develop/packages/transducers)
- optional channel IDs
- choice of buffer behaviors (fixed, sliding, dropping)
- channel selection
- channel merging (many-to-one, serial or parallel)
- channel piping (w/ transducers)
- timeouts / sleeping / throttling / delaying
- prepopulated channel ctors (iterators, ranges, promise, constants etc.)
- `Mult` for channel multiplexing (one-to-many splitting)
- individual transducers per tap
- dynamic add/removal of taps
- `PubSub` for topic subscriptions
- each topic implemented as `Mult`
- wildcard topic for processing fallthrough messages
Primitives & operators for Communicating Sequential Processes based on async/await and async iterables.

## Status
This package was temporarily deprecated (throughout most of 2023), but meanwhile
has been reanimated in the form of a **complete rewrite**, using a new, more
simple and more modern approach afforded by contemporary ES language features
(and widespread support for them).

**DEPRECATED** - superseded by other package(s)
**This new/current implementation is in most cases NOT compatible with earlier
versions**.

[Search or submit any issues for this package](https://github.com/thi-ng/umbrella/issues?q=%5Bcsp%5D+in%3Atitle)
Provided are:

This package is deprecated. Please see the following actively maintained
alternatives providing similar functionality:
- [CSP `Channel`
primitive](https://docs.thi.ng/umbrella/csp/classes/Channel.html) supporting a
choice of buffer behaviors (fifo, sliding, dropping, see
[thi.ng/buffers](https://github.com/thi-ng/umbrella/blob/develop/packages/buffers)
for options)
- Composable channel operators (see list below)
- [`Mult`](https://docs.thi.ng/umbrella/csp/classes/Mult.html) for channel
multiplexing (one-to-many splitting) and dynamic add/removal of subscribers
- [`PubSub`](https://docs.thi.ng/umbrella/csp/classes/PubSub.html) for
topic-based subscriptions, each topic implemented as `Mult`

- [@thi.ng/fibers](https://github.com/thi-ng/umbrella/tree/develop/packages/fibers)
- [@thi.ng/rstream](https://github.com/thi-ng/umbrella/tree/develop/packages/rstream)
## Status

**BETA** - possibly breaking changes forthcoming

[Search or submit any issues for this package](https://github.com/thi-ng/umbrella/issues?q=%5Bcsp%5D+in%3Atitle)

## Related packages

- [@thi.ng/fibers](https://github.com/thi-ng/umbrella/tree/develop/packages/fibers) - Process hierarchies & operators for cooperative multitasking
- [@thi.ng/rstream](https://github.com/thi-ng/umbrella/tree/develop/packages/rstream) - Reactive streams & subscription primitives for constructing dataflow graphs / pipelines
- [@thi.ng/transducers-async](https://github.com/thi-ng/umbrella/tree/develop/packages/transducers-async) - Async versions of various highly composable transducers, reducers and iterators

## Installation

Expand Down Expand Up @@ -88,183 +87,51 @@ For Node.js REPL:
const csp = await import("@thi.ng/csp");
```

Package sizes (brotli'd, pre-treeshake): ESM: 2.58 KB
Package sizes (brotli'd, pre-treeshake): ESM: 1.77 KB

## Dependencies

- [@thi.ng/api](https://github.com/thi-ng/umbrella/tree/develop/packages/api)
- [@thi.ng/arrays](https://github.com/thi-ng/umbrella/tree/develop/packages/arrays)
- [@thi.ng/buffers](https://github.com/thi-ng/umbrella/tree/develop/packages/buffers)
- [@thi.ng/checks](https://github.com/thi-ng/umbrella/tree/develop/packages/checks)
- [@thi.ng/dcons](https://github.com/thi-ng/umbrella/tree/develop/packages/dcons)
- [@thi.ng/errors](https://github.com/thi-ng/umbrella/tree/develop/packages/errors)
- [@thi.ng/transducers](https://github.com/thi-ng/umbrella/tree/develop/packages/transducers)

## API

[Generated API docs](https://docs.thi.ng/umbrella/csp/)

### File loading & word frequency analysis

```ts
import { Channel } from "@thi.ng/csp";
import * as tx from "@thi.ng/transducers";

// compose transducer to split source file into words
// and filter out short strings
const proc: tx.Transducer<string, string> = tx.comp(
tx.mapcat((src: string) => src.toLowerCase().split(/[^\w]+/g)),
tx.filter((w: string) => w.length > 1)
);

// define a channel which receives file paths
// and resolves them with their contents
const paths = new Channel<any>(
tx.map((path: string) =>
new Promise<string>(
resolve => fs.readFile(path, (_, data) => resolve(data.toString()))
)
)
);

// define multiplexed output channel
// items in this channel will have this form: `[word, count]`
const results = new Mult("results");

// tap result channel and sum word counts
const counter = results
.tap(tx.map(x => x[1]))
.reduce(tx.add());

// 2nd output channel with streaming sort transducer
// (using a sliding window size of 500 items) and dropping
// words with < 20 occurrences
const sorted = results.tap(
tx.comp(
tx.streamSort(500, x => x[1]),
tx.dropWhile(x => x[1] < 20)
)
);

// define workflow:
// pipe source files into a new channel and
// reduce this channel using `frequencies` reducer
// finally stream the result map (word frequencies)
// into the `sorted` channel
// (`freqs` is a JS Map and is iterable)
paths.pipe(proc)
.reduce(tx.frequencies())
.then(freqs => results.channel().into(freqs));

// start tracing sorted outputs and
// wait for all to finish
Promise
.all([sorted.consume(), counter])
.then(([_, num]) => console.log("total words:", num));

// no real work has been executed thus far (only scheduled via promises)
// now kick off entire process by writing file paths into the 1st channel
paths.into(["src/channel.ts", "src/mult.ts", "src/pubsub.ts"]);

// results-tap1 : [ 'let', 20 ]
// results-tap1 : [ 'topic', 20 ]
// results-tap1 : [ 'chan', 20 ]
// results-tap1 : [ 'number', 22 ]
// results-tap1 : [ 'buf', 23 ]
// results-tap1 : [ 'length', 23 ]
// results-tap1 : [ 'tx', 25 ]
// results-tap1 : [ 'state', 25 ]
// results-tap1 : [ 'from', 27 ]
// results-tap1 : [ 'close', 28 ]
// results-tap1 : [ 'new', 33 ]
// results-tap1 : [ 'args', 34 ]
// results-tap1 : [ 'id', 36 ]
// results-tap1 : [ 'any', 36 ]
// results-tap1 : [ 'src', 38 ]
// results-tap1 : [ 'if', 40 ]
// results-tap1 : [ 'return', 47 ]
// results-tap1 : [ 'channel', 57 ]
// results-tap1 : [ 'this', 120 ]
// results-tap1 done
// total words: 1607
```

### Channel merging

```ts
import { Channel } from "@thi.ng/csp";
import { push } from "@thi.ng/transducers";

Channel.merge([
Channel.range(0, 3),
Channel.range(10, 15),
Channel.range(100, 110)
]).reduce(push()).then(console.log);

// [ 0, 100, 101, 102, 103, 1, 2, 104, 105, 10, 11, 12, 13, 106, 14, 107, 108, 109 ]

// emit tuples of values read from all inputs
// preserves ordering of all inputs, but
// throughput controlled by slowest input
// by default stops & closes when any of the inputs closes
Channel.mergeTuples([
Channel.from([1, 2, 3]),
Channel.from([10, 20, 30, 40]),
Channel.from([100, 200, 300, 400, 500])
], null, false).consume();

// chan-3 : [ 1, 10, 100 ]
// chan-3 : [ 2, 20, 200 ]
// chan-3 : [ 3, 30, 300 ]
// chan-3 done

// same as above, however continues until all inputs are closed
Channel.mergeTuples([
Channel.from([1, 2, 3]),
Channel.from([10, 20, 30, 40]),
Channel.from([100, 200, 300, 400, 500])
], null, false).consume();

// chan-3 : [ 1, 10, 100 ]
// chan-3 : [ 2, 20, 200 ]
// chan-3 : [ 3, 30, 300 ]
// chan-3 : [ undefined, 40, 400 ]
// chan-3 : [ undefined, undefined, 500 ]
// chan-3 done
```

### PubSub

```ts
import { Channel, PubSub } from "@thi.ng/csp";
import { map } from "@thi.ng/transducers";
```ts tangle:export/readme-pubsub.ts
import { channel, consumeWith, pubsub } from "@thi.ng/csp";

// define a channel publisher with transducer and topic function applied to each item
// the input channel receives names and transforms them into indexable objects
const pub = new PubSub(
new Channel<any>("users", map((x: string) => ({ type: x.charAt(0), val: x }))),
(x) => x.type
);
// input channel (optional)
const src = channel<string>({ id: "users" });

// create subscriptions (channel + debug consumer)
// publisher with a topic function
// (topic here is the first character of each received string)
const pub = pubsub<string>(src, (x) => x[0]);

// create topic subscriptions (channel & debug consumer)
// under the hood each topic is a Mult (multiplexed channel)
// sub channels are automatically named:
// subscription channels are automatically named:
// `<src-id>-<topic>-tap<tapid>` (see below)
for (let i of "abc") {
pub.sub(i).consume();
consumeWith(pub.subscribeTopic(i), (x, ch) => console.log(ch.id, x));
}

// start processing, then close everything down
// (pubsubs & mults are closed recursively once the input channel is closed)
pub.channel().into(["alice", "bert", "bella", "charlie", "arthur"]);

// users-a-tap0 : { type: 'a', val: 'alice' }
// users-b-tap0 : { type: 'b', val: 'bert' }
// users-b-tap0 : { type: 'b', val: 'bella' }
// users-c-tap0 : { type: 'c', val: 'charlie' }
// users-a-tap0 : { type: 'a', val: 'arthur' }
// users-b-tap0 done
// users-c-tap0 done
// users-a-tap0 done
// start processing
for (let x of ["alice", "bert", "bella", "charlie", "arthur"]) {
await src.write(x);
}
// users-a-tap0 alice
// users-b-tap1 bert
// users-b-tap1 bella
// users-c-tap2 charlie
// users-a-tap0 arthur

// pubsubs & mults are closed recursively once we close the input channel
src.close();
```

## Authors
Expand Down
20 changes: 10 additions & 10 deletions packages/csp/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@thi.ng/csp",
"version": "2.1.116",
"description": "ES6 promise based CSP primitives & operations",
"description": "Primitives & operators for Communicating Sequential Processes based on async/await and async iterables",
"type": "module",
"module": "./index.js",
"typings": "./index.d.ts",
Expand Down Expand Up @@ -41,11 +41,9 @@
},
"dependencies": {
"@thi.ng/api": "^8.11.1",
"@thi.ng/arrays": "^2.9.5",
"@thi.ng/buffers": "^0.1.2",
"@thi.ng/checks": "^3.6.3",
"@thi.ng/dcons": "^3.2.111",
"@thi.ng/errors": "^2.5.6",
"@thi.ng/transducers": "^9.0.3"
"@thi.ng/errors": "^2.5.6"
},
"devDependencies": {
"@microsoft/api-extractor": "^7.43.0",
Expand Down Expand Up @@ -83,24 +81,26 @@
"./api": {
"default": "./api.js"
},
"./buffer": {
"default": "./buffer.js"
},
"./channel": {
"default": "./channel.js"
},
"./mult": {
"default": "./mult.js"
},
"./ops": {
"default": "./ops.js"
},
"./pubsub": {
"default": "./pubsub.js"
}
},
"thi.ng": {
"related": [
"rstream"
"fibers",
"rstream",
"transducers-async"
],
"status": "deprecated",
"status": "beta",
"year": 2016
}
}
Loading

0 comments on commit bab0913

Please sign in to comment.