Permalink
Browse files

Javelin Reactor, a Functional Reactive Programming library

Summary:
A Functional Reactive Programming library for Javelin,
inspired by Flapjax's library mode.
 * DOM manipulation, coding style, infrastructure and utilities provided by
Javelin
 * Reverse post-order traversal instead of priority queue for event propagation
to deal with cycles
 * Flapjax's "behaviors" are called dynamic values, or "DynVals", to avoid
conflicting with Javelin Behaviors
 * Flapjax's "event streams" are called reactor nodes, or "ReactorNodes", to
avoid conflicting with browser Events

Test Plan: Example page to be updated and released later.

Reviewers: epriestley

Reviewed By: epriestley

CC: aran, epriestley

Differential Revision: 848
  • Loading branch information...
1 parent 0cdaf85 commit 14d3ba10a7f885639cd679d3f7ffd59b68a472d1 adonohue committed Aug 22, 2011
@@ -0,0 +1,48 @@
+/**
+ * @provides javelin-dynval
+ * @requires javelin-install
+ * javelin-reactornode
+ * javelin-util
+ * javelin-reactor
+ * @javelin
+ */
+
+JX.install('DynVal', {
+ members : {
+ _lastPulseVal : null,
+ _reactorNode : null,
+ getValueNow : function() {
+ return this._lastPulseVal;
+ },
+ getChanges : function() {
+ return this._reactorNode;
+ },
+ forceValueNow : function(value) {
+ this.getChanges().forceSendValue(value);
+ },
+ transform : function(fn) {
+ return new JX.DynVal(
+ this.getChanges().transform(fn),
+ fn(this.getValueNow())
+ );
+ },
+ calm : function(min_interval) {
+ return new JX.DynVal(
+ this.getChanges().calm(min_interval),
+ this.getValueNow()
+ );
+ }
+ },
+ construct : function(stream, init) {
+ this._lastPulseVal = init;
+ this._reactorNode =
+ new JX.ReactorNode([stream], JX.bind(this, function(pulse) {
+ if (this._lastPulseVal == pulse) {
+ return JX.Reactor.DoNotPropagate;
+ }
+ this._lastPulseVal = pulse;
+ return pulse;
+ }));
+ }
+});
+
@@ -0,0 +1,92 @@
+/**
+ * @provides javelin-reactor
+ * @requires javelin-install
+ * javelin-dynval
+ * javelin-reactornode
+ * javelin-util
+ * @javelin
+ */
+
+JX.install('Reactor', {
+ statics : {
+ /**
+ * Return this value from a ReactorNode transformer to indicate that
+ * its listeners should not be activated.
+ */
+ DoNotPropagate : {},
+ /**
+ * For internal use by the Reactor system.
+ */
+ propagatePulse : function(start_pulse, start_node) {
+ var reverse_post_order =
+ JX.Reactor._postOrder(start_node).reverse();
+ start_node.primeValue(start_pulse);
+
+ for (var ix = 0; ix < reverse_post_order.length; ix++) {
+ var node = reverse_post_order[ix];
+ var pulse = node.getNextPulse();
+ if (pulse === JX.Reactor.DoNotPropagate) {
+ continue;
+ }
+
+ var next_pulse = node.getTransformer()(pulse);
+ var sends_to = node.getListeners();
+ for (var jx = 0; jx < sends_to.length; jx++) {
+ sends_to[jx].primeValue(next_pulse);
+ }
+ }
+ },
+ /**
+ * For internal use by the Reactor system.
+ */
+ _postOrder : function(node, result, pending) {
+ if (typeof result === "undefined") {
+ result = [];
+ pending = {};
+ }
+ pending[node.getGraphID()] = true;
+
+ var nexts = node.getListeners();
+ for (var ix = 0; ix < nexts.length; ix++) {
+ var next = nexts[ix];
+ if (pending[next.getGraphID()]) {
+ continue;
+ }
+ JX.Reactor._postOrder(next, result, pending);
+ }
+
+ result.push(node);
+ return result;
+ },
+
+ // Helper for lift.
+ _valueNow : function(fn, dynvals) {
+ var values = [];
+ for (var ix = 0; ix < dynvals.length; ix++) {
+ values.push(dynvals[ix].getValueNow());
+ }
+ return fn.apply(null, values);
+ },
+
+ /**
+ * Lift a function over normal values to be a function over dynvals.
+ * @param fn A function expecting normal values
+ * @param dynvals Array of DynVals whose instaneous values will be passed
+ * to fn.
+ * @return A DynVal representing the changing value of fn applies to dynvals
+ * over time.
+ */
+ lift : function(fn, dynvals) {
+ var valueNow = JX.bind(null, JX.Reactor._valueNow, fn, dynvals);
+
+ var streams = [];
+ for (var ix = 0; ix < dynvals.length; ix++) {
+ streams.push(dynvals[ix].getChanges());
+ }
+
+ var result = new JX.ReactorNode(streams, valueNow);
+ return new JX.DynVal(result, valueNow());
+ }
+ }
+});
+
@@ -0,0 +1,97 @@
+/**
+ * @provides javelin-reactornode
+ * @requires javelin-install
+ * javelin-reactor
+ * javelin-util
+ * javelin-reactor-node-calmer
+ * @javelin
+ */
+
+JX.install('ReactorNode', {
+ members : {
+ _transformer : null,
+ _sendsTo : null,
+ _nextPulse : null,
+ _graphID : null,
+
+ getGraphID : function() {
+ return this._graphID || this.__id__;
+ },
+
+ setGraphID : function(id) {
+ this._graphID = id;
+ return this;
+ },
+
+ setTransformer : function(fn) {
+ this._transformer = fn;
+ return this;
+ },
+
+ /**
+ * Set up dest as a listener to this.
+ */
+ listen : function(dest) {
+ this._sendsTo[dest.__id__] = dest;
+ return { remove : JX.bind(null, this._removeListener, dest) };
+ },
+ /**
+ * Helper for listen.
+ */
+ _removeListener : function(dest) {
+ delete this._sendsTo[dest.__id__];
+ },
+ /**
+ * For internal use by the Reactor system
+ */
+ primeValue : function(value) {
+ this._nextPulse = value;
+ },
+ getListeners : function() {
+ var result = [];
+ for (var k in this._sendsTo) {
+ result.push(this._sendsTo[k]);
+ }
+ return result;
+ },
+ /**
+ * For internal use by the Reactor system
+ */
+ getNextPulse : function(pulse) {
+ return this._nextPulse;
+ },
+ getTransformer : function() {
+ return this._transformer;
+ },
+ forceSendValue : function(pulse) {
+ JX.Reactor.propagatePulse(pulse, this);
+ },
+ // fn should return JX.Reactor.DoNotPropagate to indicate a value that
+ // should not be retransmitted.
+ transform : function(fn) {
+ return new JX.ReactorNode([this], fn);
+ },
+
+ /**
+ * Suppress events to happen at most once per min_interval.
+ * The last event that fires within an interval will fire at the end
+ * of the interval. Events that are sandwiched between other events
+ * within an interval are dropped.
+ */
+ calm : function(min_interval) {
+ var result = new JX.ReactorNode([this], JX.id);
+ var transformer = new JX.ReactorNodeCalmer(result, min_interval);
+ result.setTransformer(JX.bind(transformer, transformer.onPulse));
+ return result;
+ }
+ },
+ construct : function(source_streams, transformer) {
+ this._nextPulse = JX.Reactor.DoNotPropagate;
+ this._transformer = transformer;
+ this._sendsTo = {};
+ for (var ix = 0; ix < source_streams.length; ix++) {
+ source_streams[ix].listen(this);
+ }
+ }
+});
+
@@ -0,0 +1,48 @@
+/**
+ * @provides javelin-reactor-node-calmer
+ * @requires javelin-install
+ * javelin-reactor
+ * javelin-util
+ * @javelin
+ */
+
+JX.install('ReactorNodeCalmer', {
+ properties : {
+ lastTime : 0,
+ timeout : null,
+ minInterval : 0,
+ reactorNode : null,
+ isEnabled : true
+ },
+ construct : function(node, min_interval) {
+ this.setLastTime(-min_interval);
+ this.setMinInterval(min_interval);
+ this.setReactorNode(node);
+ },
+ members: {
+ onPulse : function(pulse) {
+ if (!this.getIsEnabled()) {
+ return pulse;
+ }
+ var current_time = new Date().getTime();
+ if (current_time - this.getLastTime() > this.getMinInterval()) {
+ this.setLastTime(current_time);
+ return pulse;
+ } else {
+ clearTimeout(this.getTimeout());
+ this.setTimeout(setTimeout(
+ JX.bind(this, this.send, pulse),
+ this.getLastTime() + this.getMinInterval() - current_time
+ ));
+ return JX.Reactor.DoNotPropagate;
+ }
+ },
+ send : function(pulse) {
+ this.setLastTime(new Date().getTime());
+ this.setIsEnabled(false);
+ this.getReactorNode().forceSendValue(pulse);
+ this.setIsEnabled(true);
+ }
+ }
+});
+
Oops, something went wrong.

0 comments on commit 14d3ba1

Please sign in to comment.