tgriesser/es-observable

 
 

ECMAScript Observable

This proposal introduces an Observable type to the ECMAScript standard library. The Observable type can be used to model push-based data sources such as DOM events, timer intervals, and sockets. In addition, observables are:

  • Compositional: Observables can be composed with higher-order combinators.
  • Lazy: Observables do not start emitting data until an observer has subscribed.
  • Integrated with ES6: Data is sent to consumers using the ES6 generator interface.

The Observable concept comes from reactive programming. See http://reactivex.io/ for more information.

Example: Observing Keyboard Events

Using the Observable constructor, we can create a function which returns an observable stream of events for an arbitrary DOM element and event type.

function listen(element, eventName) {
    return new Observable(sink => {
        // Create an event handler which sends data to the sink
        let handler = event => sink.next(event);

        // Attach the event handler
        element.addEventListener(eventName, handler, true);

        // Return a function which will cancel the event stream
        return _ => {
            // Detach the event handler from the element
            element.removeEventListener(eventName, handler, true);

            // Terminate the stream
            sink.return();
        };
    });
}

We can then use standard combinators to filter and map the events in the stream, just like we would with an array.

// Return an observable of special key down commands
function commandKeys(element) {
    let keyCommands = { "38": "up", "40": "down" };

    return listen(element, "keydown")
        .filter(event => event.keyCode in keyCommands)
        .map(event => keyCommands[event.keyCode])
}
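
This section of the proposal does not define `filter` and `map` themselves. The following is a minimal sketch of how such combinators could be layered on the constructor, assuming a simplified stand-in class named `MiniObservable` (hypothetical) that calls its subscriber synchronously and skips the observer wrapping described later in the specification. The fake key event source is also an assumption made so that the example runs outside a browser.

```javascript
// Hypothetical stand-in for the proposed Observable. It calls the
// subscriber synchronously and does not wrap the observer.
class MiniObservable {
    constructor(subscriber) {
        this._subscriber = subscriber;
    }

    subscribe(observer) {
        let cancel = this._subscriber(observer);
        return {
            unsubscribe() {
                if (typeof cancel === "function") cancel();
            },
        };
    }

    // Forward only the values for which the predicate returns true
    filter(predicate) {
        return new MiniObservable(sink => {
            let inner = this.subscribe({
                next(value) { if (predicate(value)) return sink.next(value); },
                throw(error) { return sink.throw(error); },
                return(value) { return sink.return(value); },
            });
            return () => inner.unsubscribe();
        });
    }

    // Forward every value after transforming it with fn
    map(fn) {
        return new MiniObservable(sink => {
            let inner = this.subscribe({
                next(value) { return sink.next(fn(value)); },
                throw(error) { return sink.throw(error); },
                return(value) { return sink.return(value); },
            });
            return () => inner.unsubscribe();
        });
    }
}

// A fake event source standing in for listen(element, "keydown")
let fakeKeyEvents = new MiniObservable(sink => {
    [{ keyCode: 38 }, { keyCode: 65 }, { keyCode: 40 }].forEach(e => sink.next(e));
    sink.return();
});

let keyCommands = { "38": "up", "40": "down" };
let commands = [];

fakeKeyEvents
    .filter(event => event.keyCode in keyCommands)
    .map(event => keyCommands[event.keyCode])
    .subscribe({
        next(value) { commands.push(value); },
        return() { commands.push("done"); },
    });
```

Each combinator returns a new observable whose subscriber subscribes to the upstream one, so nothing runs until the final `subscribe` call.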

When we want to consume the event stream, we subscribe with an observer.

commandKeys(inputElement).subscribe({
    next(value) { console.log("Received key command: " + value) },
    throw(error) { console.log("Received an error: " + error) },
    return() { console.log("Stream complete") },
});

Because observers implement the ES6 generator interface, we can use a generator function to consume the events.

function consumer() {
    let generator = function*() {
        while (true) {
            console.log("Received key command: " + (yield));
        }
    }();

    // "Prime" the generator so that it can receive the first value from the producer
    generator.next();
    return generator;
}

commandKeys(inputElement).subscribe(consumer());
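
The priming step can be seen in isolation, without an observable, by driving a generator by hand. This is a small sketch; `makeConsumer` and the two log arrays are names introduced only for illustration.

```javascript
// Same shape as consumer() above, but records values instead of logging them
function makeConsumer(log) {
    let generator = function*() {
        while (true) {
            log.push("Received: " + (yield));
        }
    }();

    // Run the generator up to its first `yield` so it is ready for a value
    generator.next();
    return generator;
}

let primedLog = [];
let primed = makeConsumer(primedLog);
primed.next("up");
primed.next("down");
let finalResult = primed.return(); // finishes the generator

// Without priming, the first value only starts the generator and is discarded
let unprimedLog = [];
let unprimed = (function*() {
    while (true) unprimedLog.push(yield);
})();
unprimed.next("lost");
unprimed.next("kept");
```

After this runs, `primedLog` holds both values while `unprimedLog` holds only `"kept"`, which is why the consumer calls `next()` once before handing the generator to `subscribe`.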

API Specification

This specification is a work-in-progress. Please see the polyfill for a more complete implementation.

Observable(subscriber)

The Observable constructor initializes a new Observable object. It is not intended to be called as a function and will throw an exception when called in that manner.

The subscriber argument must be a function object. It is called each time the subscribe method of the Observable object is invoked. The subscriber function is called with a wrapped observer object and may optionally return a function which will cancel the subscription, or an object which has an "unsubscribe" method.

The Observable constructor performs the following steps:

  1. If NewTarget is undefined, throw a TypeError exception.
  2. If IsCallable(subscriber) is false, throw a TypeError exception.
  3. Let observable be OrdinaryCreateFromConstructor(NewTarget, "%ObservablePrototype%", «[[Subscriber]]»).
  4. ReturnIfAbrupt(observable).
  5. Set observable's [[Subscriber]] internal slot to subscriber.
  6. Return observable.
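
The constructor steps can be approximated in plain JavaScript. This is a rough sketch rather than the polyfill's code: `ObservableSketch` and `throwsTypeError` are hypothetical names, and the `_subscriber` property stands in for the [[Subscriber]] internal slot, which real code could not observe as a property.

```javascript
function ObservableSketch(subscriber) {
    // Step 1: calling without `new` leaves new.target undefined
    if (new.target === undefined)
        throw new TypeError("Observable cannot be called as a function");

    // Step 2: the subscriber must be callable
    if (typeof subscriber !== "function")
        throw new TypeError("Observable initializer must be a function");

    // Steps 3-6: `this` was already created from new.target's prototype,
    // so only the slot assignment remains
    this._subscriber = subscriber;
}

// Small helper used to exercise the error paths
function throwsTypeError(fn) {
    try {
        fn();
    } catch (error) {
        return error instanceof TypeError;
    }
    return false;
}

let subscriberFn = sink => {};
let instance = new ObservableSketch(subscriberFn);
let calledAsFunction = throwsTypeError(() => ObservableSketch(subscriberFn));
let badSubscriber = throwsTypeError(() => new ObservableSketch(42));
```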

Observable.prototype.subscribe(observer)

The subscribe function schedules a subscription job to begin sending values to the supplied observer object. It returns a subscription object which may be used to cancel the subscription.

The subscribe function performs the following steps:

  1. Let O be the this value.
  2. If Type(O) is not Object, throw a TypeError exception.
  3. If Type(observer) is not Object, throw a TypeError exception.
  4. Let unsubscribed be false.
  5. Let innerSubscription be undefined.
  6. Let startSubscription be a new built-in anonymous function which performs the following steps:
    1. If unsubscribed is false,
      1. Let subscribeResult be Invoke(O, @@observer, «‍observer»).
      2. ReturnIfAbrupt(subscribeResult).
      3. Let innerSubscription be subscribeResult.
    2. Return undefined.
  7. Perform EnqueueJob("SubscriptionJobs", ObservableSubscribeJobs, «‍startSubscription»).
  8. Let unsubscribe be a new built-in anonymous function which performs the following steps:
    1. If unsubscribed is true, return undefined.
    2. Let unsubscribed be true.
    3. If innerSubscription is not undefined,
      1. If Type(innerSubscription) is not Object, throw a TypeError exception.
      2. Let unsubscribeResult be Invoke(innerSubscription, "unsubscribe", «»).
      3. ReturnIfAbrupt(unsubscribeResult).
      4. Return undefined.
  9. Let subscription be ObjectCreate(%ObjectPrototype%).
  10. Perform CreateDataProperty(subscription, "unsubscribe", unsubscribe).
  11. Return subscription.
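
The deferral and cancellation logic above can be sketched with an explicit array standing in for the "SubscriptionJobs" queue. Everything named here (`subscriptionJobs`, `runSubscriptionJobs`, `deferredSubscribe`, `startNow`) is hypothetical; `startNow` plays the role of the @@observer method, and a real host would drain the queue itself.

```javascript
// Stand-in for the "SubscriptionJobs" job queue
let subscriptionJobs = [];

function runSubscriptionJobs() {
    while (subscriptionJobs.length > 0) subscriptionJobs.shift()();
}

function isObject(x) {
    return (typeof x === "object" && x !== null) || typeof x === "function";
}

function deferredSubscribe(startNow, observer) {
    if (!isObject(observer)) throw new TypeError("Observer must be an object");

    let unsubscribed = false;
    let innerSubscription;

    // Steps 6-7: the subscriber runs later, and only if not yet cancelled
    subscriptionJobs.push(() => {
        if (!unsubscribed) innerSubscription = startNow(observer);
    });

    // Steps 8-11: the returned subscription cancels at most once
    return {
        unsubscribe() {
            if (unsubscribed) return;
            unsubscribed = true;
            if (innerSubscription !== undefined) innerSubscription.unsubscribe();
        },
    };
}

let events = [];
let start = observer => {
    events.push("started");
    observer.next(1);
    return { unsubscribe() { events.push("cancelled"); } };
};

let first = deferredSubscribe(start, { next(v) { events.push("value " + v); } });
events.push("subscribe returned");
runSubscriptionJobs();
first.unsubscribe();
first.unsubscribe(); // second call is a no-op

let second = deferredSubscribe(start, { next() {} });
second.unsubscribe();  // cancelled before the job runs
runSubscriptionJobs(); // so the subscriber is never started
```

In this sketch `subscribe` returns before any value is delivered, and a subscription cancelled before its job runs never starts the subscriber.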

Observable.prototype[@@observer](observer)

The @@observer function begins sending values to the supplied observer object by executing the Observable object's subscriber function. It returns a subscription object which may be used to cancel the subscription.

The @@observer function is intended to be used by observable libraries that need to subscribe to an observable without deferring execution to the subscription job queue.

The @@observer function performs the following steps:

  1. Let O be the this value.
  2. If Type(O) is not Object, throw a TypeError exception.
  3. If O does not have a [[Subscriber]] internal slot, throw a TypeError exception.
  4. If Type(observer) is not Object, throw a TypeError exception.
  5. Let observer be CreateSubscriptionObserver(observer).
  6. ReturnIfAbrupt(observer).
  7. Let subscriber be the value of O's [[Subscriber]] internal slot.
  8. Assert: IsCallable(subscriber) is true.
  9. Let subscriberResult be ExecuteSubscriber(subscriber, observer).
  10. If subscriberResult is an abrupt completion,
    1. Let throwResult be Invoke(observer, "throw", «subscriberResult.[[value]]»).
    2. ReturnIfAbrupt(throwResult).
  11. Else, set the [[Subscription]] internal slot of observer to subscriberResult.[[value]].
  12. If the value of the [[Observer]] internal slot of observer is undefined,
    1. Let cancelResult be CancelSubscription(observer).
    2. ReturnIfAbrupt(cancelResult).
  13. Return subscription.

ExecuteSubscriber(subscriber, observer) Abstract Operation

The abstract operation ExecuteSubscriber with arguments subscriber and observer performs the following steps:

  1. Assert: IsCallable(subscriber) is true.
  2. Assert: Type(observer) is Object.
  3. Let subscriberResult be Call(subscriber, undefined, «observer»).
  4. ReturnIfAbrupt(subscriberResult).
  5. Let isSubscription be HasUnsubscribe(subscriberResult).
  6. ReturnIfAbrupt(isSubscription).
  7. If isSubscription is true, let subscription be subscriberResult.
  8. Else, if IsCallable(subscriberResult) is true, let subscription be CreateSubscription(subscriberResult).
  9. Else,
    1. Let unsubscribe be a new built-in anonymous function that performs the following steps:
      1. Let result be Invoke(observer, "return", «»).
      2. ReturnIfAbrupt(result).
      3. Return undefined.
    2. Let subscription be CreateSubscription(unsubscribe).
  10. Return subscription.

HasUnsubscribe(x) Abstract Operation

The abstract operation HasUnsubscribe with argument x performs the following steps:

  1. If Type(x) is not Object, return false.
  2. Let unsubscribe be Get(x, "unsubscribe").
  3. ReturnIfAbrupt(unsubscribe).
  4. Return IsCallable(unsubscribe).

CreateSubscription(unsubscribe) Abstract Operation

The abstract operation CreateSubscription with argument unsubscribe performs the following steps:

  1. Let subscription be ObjectCreate(%ObjectPrototype%).
  2. Perform CreateDataProperty(subscription, "unsubscribe", unsubscribe).
  3. Return subscription.
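
Taken together, ExecuteSubscriber, HasUnsubscribe, and CreateSubscription normalize whatever the subscriber returns into an object with an "unsubscribe" method. The sketch below mirrors those steps in plain JavaScript under lower-case names chosen for illustration; it is not taken from the polyfill.

```javascript
function hasUnsubscribe(x) {
    let isObject = (typeof x === "object" && x !== null) || typeof x === "function";
    if (!isObject) return false;
    return typeof x.unsubscribe === "function";
}

function createSubscription(unsubscribe) {
    return { unsubscribe };
}

function executeSubscriber(subscriber, observer) {
    let subscriberResult = subscriber.call(undefined, observer);

    // Case 1: already a subscription object, use it as is
    if (hasUnsubscribe(subscriberResult)) return subscriberResult;

    // Case 2: a bare cancellation function, wrap it
    if (typeof subscriberResult === "function")
        return createSubscription(subscriberResult);

    // Case 3: nothing usable was returned, so cancelling just
    // tells the observer that the stream is finished
    return createSubscription(() => {
        observer.return();
    });
}

let calls = [];
let plainObserver = { return() { calls.push("return"); } };

let fromObject = executeSubscriber(
    () => ({ unsubscribe() { calls.push("object"); } }), plainObserver);
let fromFunction = executeSubscriber(
    () => () => { calls.push("function"); }, plainObserver);
let fromNothing = executeSubscriber(() => undefined, plainObserver);

fromObject.unsubscribe();
fromFunction.unsubscribe();
fromNothing.unsubscribe();
```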

Observable.prototype.forEach(callbackfn [, thisArg])

The forEach function subscribes to the Observable object, calling callbackfn once for each value in the sequence. It returns a Promise object which is either fulfilled with undefined when the sequence terminates normally or rejected with the error value of the sequence.

The forEach function performs the following steps:

  1. Let O be ToObject(this value).
  2. ReturnIfAbrupt(O).
  3. If thisArg was supplied, let T be thisArg; else let T be undefined.
  4. Let promiseCapability be NewPromiseCapability(%Promise%).
  5. ReturnIfAbrupt(promiseCapability).
  6. If IsCallable(callbackfn) is false,
    1. Let rejectResult be Call(promiseCapability.[[Reject]], undefined, «a newly created TypeError object»).
    2. ReturnIfAbrupt(rejectResult).
    3. Return promiseCapability.[[Promise]].
  7. Let observerNext be a new built-in anonymous function which performs the following steps when called with argument value:
    1. Let nextResult be Call(callbackfn, T, «‍value»).
    2. ReturnIfAbrupt(nextResult).
    3. Return undefined.
  8. Let observerThrow be promiseCapability.[[Reject]].
  9. Let observerReturn be a new built-in anonymous function which performs the following steps when called with argument value:
    1. Let returnResult be Call(promiseCapability.[[Resolve]], undefined, «‍undefined»).
    2. ReturnIfAbrupt(returnResult).
    3. Return undefined.
  10. Let observer be ObjectCreate(%ObjectPrototype%).
  11. Perform CreateDataProperty(observer, "next", observerNext).
  12. Perform CreateDataProperty(observer, "throw", observerThrow).
  13. Perform CreateDataProperty(observer, "return", observerReturn).
  14. Let result be Invoke(O, "subscribe", «‍observer»).
  15. IfAbruptRejectPromise(result, promiseCapability).
  16. Return promiseCapability.[[Promise]].
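
As a rough illustration of these steps, forEach can be written as a free function over anything with a `subscribe` method. `forEachSketch` and the synchronous `source` object below are assumptions made for the example; an observable built with the real constructor would deliver values later, from the subscription job queue.

```javascript
function forEachSketch(observable, callbackfn, thisArg) {
    return new Promise((resolve, reject) => {
        // Step 6: a non-callable callback rejects the promise
        if (typeof callbackfn !== "function") {
            reject(new TypeError(callbackfn + " is not a function"));
            return;
        }

        // Steps 7-14: build the observer and subscribe. An exception thrown
        // by subscribe rejects the promise, as IfAbruptRejectPromise would.
        observable.subscribe({
            next(value) { callbackfn.call(thisArg, value); },
            throw: reject,
            return() { resolve(undefined); },
        });
    });
}

// Hypothetical source that emits two values and completes immediately
let source = {
    subscribe(observer) {
        observer.next("a");
        observer.next("b");
        observer.return();
        return { unsubscribe() {} };
    },
};

let seen = [];
let done = forEachSketch(source, value => { seen.push(value); });
done.then(() => console.log("sequence finished after " + seen.length + " values"));
```

The returned promise settles only when the sequence ends, so callers can chain work that should run after the last value.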

Subscription Observer Objects

A Subscription Observer is an object which wraps the observer argument supplied to the subscribe method of Observable objects. Subscription Observer objects are passed as the single parameter to an observable's subscriber function. They enforce the following guarantees:

  • If the observer's next method returns an iterator result object with a done property whose value is true, then the observer will not be invoked again and the observable's cancellation function will be called.
  • If the observer's throw method is called, then the observer will not be invoked again and the observable's cancellation function will be called.
  • If the observer's return method is called, then the observer will not be invoked again and the observable's cancellation function will be called.
  • The observable's cancellation function will be called at most one time.
  • After the cancellation function has returned, the observer will not be invoked again.

In addition, Subscription Observer objects provide default behaviors when the observer does not implement throw or return.

CreateSubscriptionObserver(observer) Abstract Operation

The abstract operation CreateSubscriptionObserver with argument observer is used to create a normalized observer which can be supplied to an observable's subscriber function. It performs the following steps:

  1. Assert: Type(observer) is Object.
  2. Let subscriptionObserver be ObjectCreate(%SubscriptionObserverPrototype%, «‍[[Observer]], [[Subscription]]»).
  3. Set subscriptionObserver's [[Observer]] internal slot to observer.
  4. Set subscriptionObserver's [[Subscription]] internal slot to undefined.
  5. Return subscriptionObserver.

CloseSubscription(subscriptionObserver) Abstract Operation

The abstract operation CloseSubscription with argument subscriptionObserver performs the following steps:

  1. Assert: The value of the [[Observer]] internal slot of subscriptionObserver is not undefined.
  2. Set the value of the [[Observer]] internal slot of subscriptionObserver to undefined.
  3. Return CancelSubscription(subscriptionObserver).

CancelSubscription(subscriptionObserver) Abstract Operation

The abstract operation CancelSubscription with argument subscriptionObserver performs the following steps:

  1. Let subscription be the value of the [[Subscription]] internal slot of subscriptionObserver.
  2. If subscription is undefined, return undefined.
  3. Assert: Type(subscription) is Object.
  4. Set the value of the [[Subscription]] internal slot of subscriptionObserver to undefined.
  5. Let result be Invoke(subscription, "unsubscribe", «‍»).
  6. Set the value of the [[Observer]] internal slot of subscriptionObserver to undefined.
  7. ReturnIfAbrupt(result).
  8. Return undefined.
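
The three abstract operations above amount to a small amount of bookkeeping. In this sketch, ordinary properties named `observer` and `subscription` stand in for the [[Observer]] and [[Subscription]] internal slots, and the function names are lower-cased for illustration.

```javascript
function createSubscriptionObserver(observer) {
    // The subscription is filled in once the subscriber has run
    return { observer, subscription: undefined };
}

function cancelSubscription(subscriptionObserver) {
    let subscription = subscriptionObserver.subscription;
    if (subscription === undefined) return undefined;

    // Clearing the slot first makes repeated or re-entrant calls a no-op
    subscriptionObserver.subscription = undefined;
    try {
        subscription.unsubscribe();
    } finally {
        // The observer is dropped even if unsubscribe throws
        subscriptionObserver.observer = undefined;
    }
    return undefined;
}

function closeSubscription(subscriptionObserver) {
    subscriptionObserver.observer = undefined;
    return cancelSubscription(subscriptionObserver);
}

let cancelCount = 0;
let wrapped = createSubscriptionObserver({ next() {} });
wrapped.subscription = { unsubscribe() { cancelCount++; } };

closeSubscription(wrapped);
cancelSubscription(wrapped); // already cancelled, nothing happens
```

Because the slot is cleared before `unsubscribe` is invoked, the cancellation function runs at most once no matter how many times cancellation is requested.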

The %SubscriptionObserverPrototype% Object

All Subscription Observer objects inherit properties from the %SubscriptionObserverPrototype% intrinsic object. The %SubscriptionObserverPrototype% object is an ordinary object and its [[Prototype]] internal slot is the %ObjectPrototype% intrinsic object. In addition, %SubscriptionObserverPrototype% has the following properties:

%SubscriptionObserverPrototype%.next(value)

  1. Let O be the this value.
  2. If Type(O) is not Object, throw a TypeError exception.
  3. If O does not have all of the internal slots of a Subscription Observer instance, throw a TypeError exception.
  4. Let subscription be the value of the [[Subscription]] internal slot of O.
  5. Let observer be the value of the [[Observer]] internal slot of O.
  6. If observer is undefined, return CreateIterResultObject(undefined, true).
  7. Let result be Invoke(observer, "next", «‍value»).
  8. Let closeSubscription be false.
  9. If result is an abrupt completion,
    1. Let closeSubscription be true.
  10. Else, if Type(result.[[value]]) is Object,
    1. Let closeSubscription be IteratorComplete(result.[[value]]).
  11. If closeSubscription is true,
    1. Let closeResult be CloseSubscription(O).
    2. ReturnIfAbrupt(closeResult).
  12. Return Completion(result).

%SubscriptionObserverPrototype%.throw(exception)

  1. Let O be the this value.
  2. If Type(O) is not Object, throw a TypeError exception.
  3. If O does not have all of the internal slots of a Subscription Observer instance, throw a TypeError exception.
  4. Let subscription be the value of the [[Subscription]] internal slot of O.
  5. Let observer be the value of the [[Observer]] internal slot of O.
  6. If observer is undefined, return Completion{[[type]]: throw, [[value]]: exception, [[target]]: empty}.
  7. Set the value of the [[Observer]] internal slot of O to undefined.
  8. Let result be Get(observer, "throw").
  9. If result.[[type]] is normal,
    1. Let throwAction be result.[[value]].
    2. If IsCallable(throwAction) is true,
      1. Let result be Call(throwAction, observer, «‍exception»).
    3. Else,
      1. Let result be Completion{[[type]]: throw, [[value]]: exception, [[target]]: empty}.
  10. Let cancelResult be CancelSubscription(O).
  11. ReturnIfAbrupt(cancelResult).
  12. Return Completion(result).

%SubscriptionObserverPrototype%.return(value)

  1. Let O be the this value.
  2. If Type(O) is not Object, throw a TypeError exception.
  3. If O does not have all of the internal slots of a Subscription Observer instance, throw a TypeError exception.
  4. Let subscription be the value of the [[Subscription]] internal slot of O.
  5. Let observer be the value of the [[Observer]] internal slot of O.
  6. If observer is undefined, return CreateIterResultObject(value, true).
  7. Set the value of the [[Observer]] internal slot of O to undefined.
  8. Let result be Get(observer, "return").
  9. If result.[[type]] is normal,
    1. Let returnAction be result.[[value]].
    2. If IsCallable(returnAction) is true,
      1. Let result be Call(returnAction, observer, «‍value»).
    3. Else,
      1. Let result be NormalCompletion(CreateIterResultObject(value, true)).
  10. Let cancelResult be CancelSubscription(O).
  11. ReturnIfAbrupt(cancelResult).
  12. Return Completion(result).
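
The three prototype methods can be approximated with a small class. This is a simplified sketch, not the polyfill: `SubscriptionObserverSketch` is a hypothetical name, the underscore properties stand in for internal slots, the receiver checks in steps 1-3 are omitted, and `_cancel` folds CloseSubscription and CancelSubscription into one helper.

```javascript
class SubscriptionObserverSketch {
    constructor(observer) {
        this._observer = observer;      // stands in for [[Observer]]
        this._subscription = undefined; // stands in for [[Subscription]]
    }

    _cancel() {
        let subscription = this._subscription;
        this._subscription = undefined;
        this._observer = undefined;
        if (subscription !== undefined) subscription.unsubscribe();
    }

    next(value) {
        let observer = this._observer;
        if (observer === undefined) return { value: undefined, done: true };
        let result;
        try {
            result = observer.next(value);
        } catch (error) {
            this._cancel(); // an exception from the observer closes the stream
            throw error;
        }
        // A result with done: true means the consumer wants no more values
        if (typeof result === "object" && result !== null && result.done)
            this._cancel();
        return result;
    }

    throw(exception) {
        let observer = this._observer;
        if (observer === undefined) throw exception;
        this._observer = undefined;
        try {
            // Default behavior: rethrow when the observer has no throw method
            if (typeof observer.throw !== "function") throw exception;
            return observer.throw(exception);
        } finally {
            this._cancel();
        }
    }

    return(value) {
        let observer = this._observer;
        if (observer === undefined) return { value, done: true };
        this._observer = undefined;
        try {
            // Default behavior: report completion with the supplied value
            if (typeof observer.return !== "function") return { value, done: true };
            return observer.return(value);
        } finally {
            this._cancel();
        }
    }
}

let seen = [];
let cancelCount = 0;
let sink = new SubscriptionObserverSketch({
    next(value) {
        seen.push(value);
        return { value: undefined, done: value === "stop" };
    },
});
sink._subscription = { unsubscribe() { cancelCount++; } };

sink.next("a");
sink.next("stop");    // observer reports done, so the subscription is cancelled
sink.next("ignored"); // the observer is no longer invoked
let finalResult = sink.return("x");
```

Once the observer signals completion, later calls fall through to the default results and the cancellation function is not run a second time.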
