Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revamp Subscription, Stream APIs to make combinators easy #693

Closed
cowboyd opened this issue Apr 6, 2023 · 4 comments
Closed

Revamp Subscription, Stream APIs to make combinators easy #693

cowboyd opened this issue Apr 6, 2023 · 4 comments

Comments

@cowboyd
Copy link
Member

cowboyd commented Apr 6, 2023

Right now a stream is a context-free operation that when run, yields a subscription that is coupled to the current context. And the subscription will yield the next item, and importantly, when that context passes away, the subscription will be automatically be released and do whatever cleanup is necessary.

It looks like this:

export interface Operation<T> {
  [Symbol.iterator](): Iterator<Instruction, T, any>;
}

export type Stream<T, TReturn> = Operation<Subscription<T, TReturn>>;

export type Subscription<T, TReturn> = Operation<IteratorResult<T, TReturn>>;

But this is annoying because streams and subscriptions are very similar in the way they behave. The only difference is that one is "concrete" and the other is abstract. However, we would expect the same kind of result for both yield* first(stream) and yield* first(subscription) and so we'd like for first not to care if it is operating on a Stream or a Subscription. The same applies to filter, map, forEach, etc....

I think the answer might lie in more closely aligning Stream and Subscription with Iterable and Iterator and AsyncIterable, and AsyncIterator which is what they really are: Stream === OperationIterable, Subscription === Operation Iterator.

  1. AsyncIterators are AsyncIterable by returning themselves for the [Symbol.asyncIterator]() method.
  2. Generators are Iterable by returning themselves for the [Symbol.iterator]() method.

I propose that we make Subscription implement Stream, by returning itself for the [Symbol.iterator] method. That way, all combinators can be implemented to work on Stream, and a live subscription is just a special type of stream to which you are already subscribed. The new types would look like:

export interface Operation<T> {
  [Symbol.iterator](): Iterator<Instruction, T, any>;
}

export type Stream<T, TReturn> = Operation<Subscription<T, TReturn>>;

export interface Subscription<T, TReturn> extends Stream<T,TReturn> {
  next(): Operation<IteratorResult<T, TReturn>>
}

Now, if we define forEach to work on a Stream:

declare function* forEach<T,R>(stream: Stream<T,R>, each?: (t: T) => Operation<void>): Operation<R>;

Then, it will work on both a stream, and a live subscription. The same applies for all our combinators you would care to write.

@taras
Copy link
Member

taras commented Apr 7, 2023

This sounds like a nice simplification.

@cowboyd
Copy link
Member Author

cowboyd commented Apr 7, 2023

The one difficulty is that it is annoying to have to save the back reference to Subscription when creating it by hand:

let subscription = {
  *next() {
     // implement next
  },
  *[Symbol.iterator]() { return subscription; },
};
return subscription;

Perhaps we can make a helper "createSubscription()" that helps you do it my just accepting the content of next:

createSubscription(function* next() {
  // implementation of next operation.
});

So for example, the implementation of Subscription in Channel would be something like:

subscription: createSubscription(function* next() {
  let message = items.pop();
  if (message) {
    return message;
  } else {
    return yield* action<Item>(function* (resolve) {
       consumers.unshift(resolve);
    });
  }
}),

Thus far, it has been extremely rare to create subscriptions by hand, since most of the time, I just end up using Channel for lower level ops.

@cowboyd cowboyd mentioned this issue Apr 19, 2023
@cowboyd
Copy link
Member Author

cowboyd commented Apr 24, 2023

I'm going to advocate that we first disambiguate streams from subscriptions (you should never need to deal with a subscription directly) and then we can add make subscriptions streamable individually which will be possible if they have separate API.

cowboyd added a commit that referenced this issue Apr 24, 2023
> fixes #693

In order to simplify working with streams and subscriptions, we need
to be able to disambiguate them easily. The relationship between
`Stream` and `Subscription` is the same as that between `Iterable`
-> `Iterator` and `AsyncIterable` -> `AsyncIterator`.

This converts `Subscription` into an API that is highly analogous to
`AsyncIterator`. The `next()` method returns an `Operation` which
yields an `IteratorResult` So in the same way `AsyncIterator` has:

```ts
next(): Promise<IteratorResult>;
```

The `Subscription` api will look like:

```ts
next(): Operation<IteratorResult>;
```

Once we have this in place, it is trivial to create a stream out of
any Subscription:

```ts
function streamable(subscription: Subscription): Stream {
  return ({
    *[Symbol.iterator]() {
      return subscription;
    }
  });
}
```
cowboyd added a commit that referenced this issue Apr 24, 2023
> fixes #693

In order to simplify working with streams and subscriptions, we need
to be able to disambiguate them easily. The relationship between
`Stream` and `Subscription` is the same as that between `Iterable`
-> `Iterator` and `AsyncIterable` -> `AsyncIterator`.

This converts `Subscription` into an API that is highly analogous to
`AsyncIterator`. The `next()` method returns an `Operation` which
yields an `IteratorResult` So in the same way `AsyncIterator` has:

```ts
next(): Promise<IteratorResult>;
```

The `Subscription` api will look like:

```ts
next(): Operation<IteratorResult>;
```

Once we have this in place, it is trivial to create a stream out of
any Subscription:

```ts
function streamable(subscription: Subscription): Stream {
  return ({
    *[Symbol.iterator]() {
      return subscription;
    }
  });
}
```
@cowboyd
Copy link
Member Author

cowboyd commented Apr 26, 2023

Closed by #696

@cowboyd cowboyd closed this as completed Apr 26, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants