Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export function subscribe<T, R>(iter: AsyncIterator<T, R>): Subscription<T, R> {

export function stream<T, R>(iterable: AsyncIterable<T, R>): Stream<T, R> {
return {
*[Symbol.iterator]() {
*subscribe() {
return subscribe(iterable[Symbol.asyncIterator]());
},
};
Expand Down
37 changes: 24 additions & 13 deletions lib/channel.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
import type { Channel, Operation } from "./types.ts";
import type { Operation, Stream } from "./types.ts";
import { createSignal } from "./signal.ts";
import { lift } from "./lift.ts";

/**
* A broadcast channel that multiple consumers can subscribe to the
* via the same {@link Stream}, and messages sent to the channel are
* received by all consumers. The channel is not buffered, so if there
* are no consumers, the message is dropped.
*/
export interface Channel<T, TClose> extends Stream<T, TClose> {
/**
* Send a message to all subscribers of this {@link Channel}
*/
send(message: T): Operation<void>;

/**
* End every subscription to this {@link Channel}
*/
close(value: TClose): Operation<void>;
}

/**
* Create a new {@link Channel}. Use channels to communicate between operations.
Expand Down Expand Up @@ -32,21 +51,13 @@ import { createSignal } from "./signal.ts";
* console.log(yield* subscription2.next()); //=> { done: false, value: 'world' }
* });
* ```
*
* @typeParam T - the type of each value sent to the channel
* @typeParam TClose - the type of the channel's final value.
*/
export function createChannel<T, TClose = void>(): Channel<T, TClose> {
let signal = createSignal<T, TClose>();

let input = {
*send(value: T): Operation<void> {
signal.send(value);
},
*close(value: TClose) {
signal.close(value);
},
return {
send: lift(signal.send),
close: lift(signal.close),
subscribe: signal.subscribe,
};

return { input, output: signal.stream };
}
2 changes: 1 addition & 1 deletion lib/each.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const EachStack = createContext<EachLoop<unknown>[]>("each");
function iterate<T>(stream: Stream<T, unknown>): Operation<Iterable<T>> {
return {
*[Symbol.iterator]() {
let subscription = yield* stream;
let subscription = yield* stream.subscribe();
let current = yield* subscription.next();
let stack = yield* EachStack.get();
if (!stack) {
Expand Down
30 changes: 18 additions & 12 deletions lib/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import { createSignal } from "./signal.ts";
import { resource } from "./instructions.ts";
import { first } from "./mod.ts";
import type { Operation, Stream } from "./types.ts";
import type { Operation, Stream, Subscription } from "./types.ts";

type FN = (...any: any[]) => any;

Expand Down Expand Up @@ -50,17 +50,23 @@ export function on<
T extends EventTarget,
K extends EventList<T> | (string & {}),
>(target: T, name: K): Stream<EventTypeFromEventTarget<T, K>, never> {
return resource(function* (provide) {
let { send, stream } = createSignal<Event>();
return {
subscribe() {
return resource(function* (provide) {
let { send, subscribe } = createSignal<Event>();

target.addEventListener(name, send);
target.addEventListener(name, send);

try {
yield* provide(
yield* stream as Stream<EventTypeFromEventTarget<T, K>, never>,
);
} finally {
target.removeEventListener(name, send);
}
});
try {
yield* provide(
yield* subscribe() as Operation<
Subscription<EventTypeFromEventTarget<T, K>, never>
>,
);
} finally {
target.removeEventListener(name, send);
}
});
},
};
}
28 changes: 16 additions & 12 deletions lib/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,24 @@ export interface Predicate<A> {
export function filter<A>(predicate: Predicate<A>) {
return function <TClose>(stream: Stream<A, TClose>): Stream<A, TClose> {
return {
*[Symbol.iterator]() {
let subscription = yield* stream;

subscribe() {
return {
*next() {
while (true) {
let next = yield* subscription.next();
*[Symbol.iterator]() {
let subscription = yield* stream.subscribe();

return {
*next() {
while (true) {
let next = yield* subscription.next();

if (next.done) {
return next;
} else if (yield* predicate(next.value)) {
return next;
}
}
if (next.done) {
return next;
} else if (yield* predicate(next.value)) {
return next;
}
}
},
};
},
};
},
Expand Down
2 changes: 1 addition & 1 deletion lib/first.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ export function first<T>(stream: Stream<T, never>): Operation<T>;
export function* first<T>(
stream: Stream<T, unknown>,
): Operation<T | undefined> {
let subscription = yield* stream;
let subscription = yield* stream.subscribe();
let result = yield* subscription.next();

if (!result.done) {
Expand Down
35 changes: 35 additions & 0 deletions lib/lift.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { type Operation } from "./types.ts";

/**
* Convert a simple function into an {@link Operation}
*
* @example
* ```js
* let log = lift((message) => console.log(message));
*
* export function* run() {
* yield* log("hello world");
* yield* log("done");
* }
* ```
*
* @returns a function returning an operation that invokes `fn` when evaluated
*/
export function lift<TArgs extends unknown[], TReturn>(
fn: Fn<TArgs, TReturn>,
): LiftedFn<TArgs, TReturn> {
return (...args: TArgs) => {
return ({
[Symbol.iterator]() {
let value = fn(...args);
return { next: () => ({ done: true, value }) };
},
});
};
}

type Fn<TArgs extends unknown[], TReturn> = (...args: TArgs) => TReturn;

type LiftedFn<TArgs extends unknown[], TReturn> = (
...args: TArgs
) => Operation<TReturn>;
24 changes: 14 additions & 10 deletions lib/map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@ import type { Operation, Stream } from "./types.ts";
export function map<A, B>(op: (a: A) => Operation<B>) {
return function <TClose>(stream: Stream<A, TClose>): Stream<B, TClose> {
return {
*[Symbol.iterator]() {
let subscription = yield* stream;

subscribe() {
return {
*next() {
let next = yield* subscription.next();
*[Symbol.iterator]() {
let subscription = yield* stream.subscribe();

return {
*next() {
let next = yield* subscription.next();

if (next.done) {
return next;
} else {
return { ...next, value: yield* op(next.value) };
}
if (next.done) {
return next;
} else {
return { ...next, value: yield* op(next.value) };
}
},
};
},
};
},
Expand Down
2 changes: 1 addition & 1 deletion lib/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export * from "./result.ts";
export * from "./map.ts";
export * from "./filter.ts";
export * from "./pipe.ts";
export * from "./op.ts";
export * from "./lift.ts";
export * from "./events.ts";
export * from "./main.ts";
export * from "./all.ts";
Expand Down
19 changes: 0 additions & 19 deletions lib/op.ts

This file was deleted.

21 changes: 8 additions & 13 deletions lib/signal.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Stream } from "./types.ts";
import type { Stream, Subscription } from "./types.ts";

import { createQueue, type Queue } from "./queue.ts";
import { resource } from "./instructions.ts";
Expand All @@ -15,25 +15,25 @@ import { resource } from "./instructions.ts";
* import { createSignal, each } from "effection";
*
* export function* logClicks(function*(button) {
* let { send, stream } = createSignal<MouseEvent>();
* let clicks = createSignal<MouseEvent>();
*
* button.addEventListener("click", send);
* button.addEventListener("click", clicks.send);
*
* try {
* for (let click of yield* each(stream)) {
* for (let click of yield* each(clicks)) {
* console.log(`click:`, click);
* yield* each.next();
* }
* } finally {
* button.removeEventListener("click", send);
* button.removeEventListener("click", clicks.send);
* }
* })
* ````
*
* @typeParam T - type of each event sent by this signal
* @typeParam TClose - type of the final event sent by this signal
*/
export interface Signal<T, TClose> {
export interface Signal<T, TClose> extends Stream<T, TClose> {
/**
* Send a value to all the consumers of this signal.
*
Expand All @@ -46,11 +46,6 @@ export interface Signal<T, TClose> {
* @param value - the final value.
*/
close(value: TClose): void;

/**
* The {@link Stream} that receives events that are sent to this signal.
*/
stream: Stream<T, TClose>;
}

/**
Expand Down Expand Up @@ -83,7 +78,7 @@ export interface Signal<T, TClose> {
export function createSignal<T, TClose = never>(): Signal<T, TClose> {
let subscribers = new Set<Queue<T, TClose>>();

let stream: Stream<T, TClose> = resource(function* Subscription(provide) {
let useSubscription = resource<Subscription<T, TClose>>(function* (provide) {
let queue = createQueue<T, TClose>();
subscribers.add(queue);

Expand All @@ -106,5 +101,5 @@ export function createSignal<T, TClose = never>(): Signal<T, TClose> {
}
}

return { send, close, stream };
return { send, close, subscribe: () => useSubscription };
}
28 changes: 4 additions & 24 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ export interface Scope {
*
* @see https://frontside.com/effection/docs/collections#stream
*/
export type Stream<T, TReturn> = Operation<Subscription<T, TReturn>>;
//export type Stream<T, TReturn> = Operation<Subscription<T, TReturn>>;
export interface Stream<T, TReturn> {
subscribe(): Operation<Subscription<T, TReturn>>;
}

/**
* The Effection equivalent of an [`AsyncIterator`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/AsyncIterator)
Expand All @@ -233,29 +236,6 @@ export interface Subscription<T, R> {
next(): Operation<IteratorResult<T, R>>;
}

export interface Port<T, R> {
send(message: T): Operation<void>;
close(value: R): Operation<void>;
}

/**
* A broadcast channel that multiple consumers can subscribe to the
* via the same {@link Stream}, and messages sent to the channel are
* received by all consumers. The channel is not buffered, so if there
* are no consumers, the message is dropped.
*/
export interface Channel<T, TClose> {
/**
* The port through which messages to the channel are sent.
*/
input: Port<T, TClose>;

/**
* The stream through which all messages to the channel are read.
*/
output: Stream<T, TClose>;
}

/**
* `Context`` defines a value which is in effect for a given scope which is an
* (action, resource, call, or spawn).
Expand Down
Loading