


Reactive is a Scala library for Functional Reactive Programming. Its API is modeled somewhat after the Scala collections framework, as explained below.
Functional reactive programming provides a means to build complex changing “state” without actually using mutable state primitives directly. A common application is a user interface, which involves a lot of components interacting with each other and changing.
Traditionally, user interfaces are programmed in an imperative manner, involving a lot of shared mutable state. This makes the code more difficult to reason about, and as a result more prone to error. In addition, mutable state often requires extra care to ensure that all the parts of the code that access the state are considerate of each other.
The functional programming paradigm encourages a more declarative approach, where functions don’t rely on outside factors and thus are more composable. However, it is often not easy to see how to use this approach in situations that involve a lot of change, such as a user interface.
Functional reactive programming provides a solution to this, by defining two complementary concepts: signals and event streams, the two kinds of reactive values. A signal represents a value that can change, but at any given time it has a value. It is said to be continuous. On the other hand, an event stream does not hold a value for any duration, but it sends out values at specific points in time. It is said to be discrete. A signal’s changes are events, and one can define a signal based on an event stream whose value is always the most recent event, so in some sense they are two halves of a whole, although they are distinct concepts.
These two types of objects wrap the actual imperativeness and mutable state in their implementation, and provide an API that is completely functional, thus allowing the programmer to program in a purely functional style.
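
To make the relationship between the two kinds of reactive values concrete, here is a minimal sketch in plain Scala. It is a toy model, not Reactive's implementation: ToyEventStream, ToySignal, HoldSketch, hold, fire, set, and now are hypothetical names chosen for illustration. It shows a signal that always has a value, an event stream that only fires, and a signal derived from an event stream whose value is always the most recent event.

```scala
import scala.collection.mutable.ListBuffer

// Toy model for illustration only; not the Reactive library's implementation.

// Discrete: holds no value, only notifies listeners when an event is fired.
class ToyEventStream[A] {
  private val listeners = ListBuffer.empty[A => Unit]
  def foreach(f: A => Unit): Unit = { listeners += f; () }
  def fire(a: A): Unit = listeners.toList.foreach(f => f(a))
}

// Continuous: always has a current value; its changes are exposed as events.
class ToySignal[A](initial: A) {
  private var current: A = initial
  val change = new ToyEventStream[A]
  def now: A = current
  def set(a: A): Unit = { current = a; change.fire(a) }
}

object HoldSketch {
  // A signal whose value is the most recent event (or init before any event).
  def hold[A](init: A, events: ToyEventStream[A]): ToySignal[A] = {
    val signal = new ToySignal[A](init)
    events.foreach(a => signal.set(a))
    signal
  }

  def run(): List[Int] = {
    val clicks = new ToyEventStream[Int]
    val latest = hold(0, clicks)
    val before = latest.now // no event yet, so this is the initial value
    clicks.fire(5)
    clicks.fire(7)
    List(before, latest.now)
  }
}
```

Note that the mutable state (the listener buffer and the current value) is confined to the two toy classes; code that uses them only composes values.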
In Reactive, the methods of both trait Signal and trait EventStream seek to mimic methods of collections in the Scala collections framework. The programmer is encouraged to think of signals and event streams as being, in some sense, infinite sequences of values, except that each value only exists at one period or point in time. However, the semantics of many methods are different; they share their names with collection methods for comparison purposes only.


As an example, if you want to perform some behavior, such as invoking println, whenever an event stream fires an event, you can write

eventStream.foreach{e => println(e)}

or, using Scala’s for comprehension syntactic sugar,

for(e <- eventStream) println(e)

Unlike in the collections framework, foreach returns immediately. Since event streams do not retain values, there is nothing to do at the time it is invoked. Instead, the function passed to foreach is saved for later, and whenever the event stream fires an event it will be called.
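
The deferred behavior of foreach can be illustrated with a small stand-alone model. This is only a sketch under the assumption that listeners are kept in a simple buffer; DeferredStream, ForeachSketch, and fire are hypothetical names and not part of Reactive. The log shows that foreach returns before any listener runs, and that the for comprehension form registers a listener in the same way.

```scala
import scala.collection.mutable.ListBuffer

// Toy model for illustration only; not the Reactive library's implementation.
class DeferredStream[A] {
  private val listeners = ListBuffer.empty[A => Unit]
  // Nothing is invoked here: the function is only saved for later.
  def foreach(f: A => Unit): Unit = { listeners += f; () }
  // Each fired event is passed to every saved function.
  def fire(a: A): Unit = listeners.toList.foreach(f => f(a))
}

object ForeachSketch {
  def run(): List[String] = {
    val log = ListBuffer.empty[String]
    val eventStream = new DeferredStream[Int]

    eventStream.foreach { e => log += s"foreach got $e" }
    for (e <- eventStream) log += s"for got $e" // sugar for foreach
    log += "both calls returned"

    eventStream.fire(1)
    log.toList
  }
}
```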
Note: The current implementation of Signal is a thin wrapper around an event stream. Many methods that should exist on Signal do not exist yet; invoke them on its change EventStream instead. For example, to do something every time its value changes, call foreach on its change member.


Similarly, you can generate transformed signals or event streams using many methods named after the collections equivalent. For example, map returns a new signal or event stream whose value is calculated relative to the value of the original signal or event stream using a specified function.

val mapped1 = eventStream map (_ + 10)
val mapped2 = eventStream map {case -1 => None case n => Some(n)}
val mapped3 = for(e <- eventStream) yield e + 10
val mapped4 = signal map (_ + 10)
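
One way to picture what map does on an event stream is the following stand-alone sketch. It assumes a simple listener buffer and is not how Reactive implements it; MappableStream, MapSketch, and fire are hypothetical names. The derived stream fires the transformed value each time the source fires.

```scala
import scala.collection.mutable.ListBuffer

// Toy model for illustration only; not the Reactive library's implementation.
class MappableStream[A] {
  private val listeners = ListBuffer.empty[A => Unit]
  def foreach(f: A => Unit): Unit = { listeners += f; () }
  def fire(a: A): Unit = listeners.toList.foreach(l => l(a))

  // The derived stream fires f(a) whenever this stream fires a.
  def map[B](f: A => B): MappableStream[B] = {
    val out = new MappableStream[B]
    foreach(a => out.fire(f(a)))
    out
  }
}

object MapSketch {
  def run(): (List[Option[Int]], List[Int]) = {
    val source = new MappableStream[Int]
    val wrapped = source map { case -1 => None case n => Some(n) }
    val plusTen = for (e <- source) yield e + 10 // sugar for map

    val seenWrapped = ListBuffer.empty[Option[Int]]
    val seenPlusTen = ListBuffer.empty[Int]
    wrapped.foreach(v => seenWrapped += v)
    plusTen.foreach(v => seenPlusTen += v)

    source.fire(1)
    source.fire(-1)
    (seenWrapped.toList, seenPlusTen.toList)
  }
}
```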


In the collections framework, flatMap is similar to map in that it takes a function that returns a new value based on an old value. However, unlike map which returns a collection containing all those new values, flatMap takes a function that, for each old value, returns a separate collection; and flatMap returns a new collection which is the concatenation of all those new collections. That’s not an exact specification of its behavior, but it will suffice for the comparison.
In Reactive, flatMap takes a function that goes from a value in the old signal or event stream to a new signal or event stream. For instance, EventStream’s flatMap method takes a function that must return an EventStream. The behavior of the new event stream is as follows. Whenever the original event stream fires a value, the new event stream acts like the event stream returned by the function when passed the value of the event fired.
For example, suppose an EventStream called ticks, which fires the number of seconds since the past midnight, every second. Suppose also an EventStream called mouseBtn1, which fires true when mouse button 1 is pressed, and false when it’s released. And suppose a third EventStream called empty, which never fires events.
The following code

mouseBtn1 flatMap {case true => ticks  case false => empty}

returns a new event stream that, while the mouse button is down, fires the number of seconds since midnight every second. While the mouse button is up, it fires no events.
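
The switching behavior described above can be modeled in a few lines of plain Scala. This is a sketch under simplifying assumptions, not Reactive's implementation: SwitchStream, FlatMapSketch, and fire are hypothetical names, and stale inner listeners are merely ignored via a generation counter rather than removed.

```scala
import scala.collection.mutable.ListBuffer

// Toy model for illustration only; not the Reactive library's implementation.
class SwitchStream[A] {
  private val listeners = ListBuffer.empty[A => Unit]
  def foreach(f: A => Unit): Unit = { listeners += f; () }
  def fire(a: A): Unit = listeners.toList.foreach(l => l(a))

  // Each outer event selects a new inner stream; only the most recently
  // selected inner stream is forwarded to the result.
  def flatMap[B](f: A => SwitchStream[B]): SwitchStream[B] = {
    val out = new SwitchStream[B]
    var generation = 0
    foreach { a =>
      generation += 1
      val mine = generation
      f(a).foreach(b => if (mine == generation) out.fire(b))
    }
    out
  }
}

object FlatMapSketch {
  def run(): List[Int] = {
    val ticks = new SwitchStream[Int]
    val empty = new SwitchStream[Int] // never fired
    val mouseBtn1 = new SwitchStream[Boolean]

    val whileDown = mouseBtn1 flatMap { case true => ticks case false => empty }
    val seen = ListBuffer.empty[Int]
    whileDown.foreach(t => seen += t)

    ticks.fire(0)         // button not pressed yet: ignored
    mouseBtn1.fire(true)
    ticks.fire(1)
    ticks.fire(2)
    mouseBtn1.fire(false)
    ticks.fire(3)         // button is up: ignored
    mouseBtn1.fire(true)
    ticks.fire(4)
    seen.toList
  }
}
```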

The same concept can be applied to signals. Given a Signal ticks that holds the number of seconds since midnight, and a Signal mouseBtn1 that holds the mouse button’s state, true for pressed and false for released, the following code

val empty = Val(None: Option[Int])
mouseBtn1 flatMap {case true => ticks.map(t => Some(t): Option[Int])  case false => empty}

returns a new signal that, while the mouse button is down, holds the number of seconds since midnight wrapped in a Some (a subclass of Option), and while it’s up, holds the value None.


Use filter on an EventStream to return a new EventStream that fires only a subset of events fired by the original EventStream. For instance, to fire the number of minutes since midnight:

val minutes = ticks.filter(_ % 60 == 0).map(_ / 60)


You can use takeWhile on an EventStream to return a new EventStream that stops firing events once a condition evaluates to false. For instance, to create an EventStream that fires every second, but only during the first hour after midnight, you can write:

ticks.takeWhile(_ < 60*60)

Note that once the condition evaluates to false the resulting EventStream will never fire again.
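
The difference between filter and takeWhile can be seen in a small stand-alone model. This is a sketch, not Reactive's implementation: FilterStream, FilterSketch, and fire are hypothetical names, and the toy takeWhile only closes a flag, whereas a real implementation would presumably also detach its listener.

```scala
import scala.collection.mutable.ListBuffer

// Toy model for illustration only; not the Reactive library's implementation.
class FilterStream[A] {
  private val listeners = ListBuffer.empty[A => Unit]
  def foreach(f: A => Unit): Unit = { listeners += f; () }
  def fire(a: A): Unit = listeners.toList.foreach(l => l(a))

  def map[B](f: A => B): FilterStream[B] = {
    val out = new FilterStream[B]
    foreach(a => out.fire(f(a)))
    out
  }

  // Forwards every event that satisfies p, for as long as the source fires.
  def filter(p: A => Boolean): FilterStream[A] = {
    val out = new FilterStream[A]
    foreach(a => if (p(a)) out.fire(a))
    out
  }

  // Forwards events until p first fails; after that it never fires again.
  def takeWhile(p: A => Boolean): FilterStream[A] = {
    val out = new FilterStream[A]
    var open = true
    foreach { a =>
      if (open) {
        if (p(a)) out.fire(a) else open = false
      }
    }
    out
  }
}

object FilterSketch {
  def run(): (List[Int], List[Int]) = {
    val ticks = new FilterStream[Int]
    val minutes = ticks.filter(_ % 60 == 0).map(_ / 60)
    val firstThree = ticks.takeWhile(_ < 3)

    val seenMinutes = ListBuffer.empty[Int]
    val seenFirst = ListBuffer.empty[Int]
    minutes.foreach(m => seenMinutes += m)
    firstThree.foreach(t => seenFirst += t)

    // The final 1 passes the takeWhile condition again but is not forwarded.
    List(1, 2, 3, 60, 61, 120, 1).foreach(ticks.fire)
    (seenMinutes.toList, seenFirst.toList)
  }
}
```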


There are more methods; see the source, tests, and scaladocs.


There is a subtype of Signal called SeqSignal, which can be used for a signal that needs to hold a scala.Seq. In addition to Signal’s change EventStream, it adds the ability to fire events indicating exactly what changed. Here’s an example:

val bufferSignal = BufferSignal(1,2,3)
val mapped = bufferSignal map (_ * 10)
mapped.deltas foreach println
bufferSignal.value += 4

The output should be a single delta indicating that the mapped signal, whose value was initially Seq(10,20,30), has had the value 40 inserted at position 3, the end.

Also, you can assign a new sequence directly to a BufferSignal and it will calculate the diff and fire corresponding deltas.

Memory management

There is a danger of a memory leak, which can happen as follows. Suppose a window pops up, providing a view on an event stream ticks (such as the one before). It does this by adding a listener to the event stream via the foreach method. Now, what happens when the window closes? The event stream exists outside of the window, but it still has the listener added with foreach. Moreover, there is no telling which objects the listener references and thereby prevents from being garbage collected. We don’t want to require an explicit method call to remove the event listener; that would require an imperative approach.
For this reason, event streams always keep listeners wrapped in a WeakReference. Thus event streams don’t prevent their listeners from being garbage collected.
However, this presents a new problem: The listener may be garbage collected too soon. For instance, if the listener function passed to foreach is defined inside a local method, as soon as the method exits, there are no strong references to it and it may be garbage collected.
The solution is a trait called Observing, whose task is to hold strong references. Whenever you call foreach, and currently any method that returns a signal or event stream based on a previous one, there must be an Observing object available in the implicit scope. The easiest way to do this is simply to mix this trait into the nearest enclosing class (it defines an implicit member that points to itself). foreach will add a reference to your listener function to that Observing object. Thus, the listener will not be garbage collected until there are no references to that Observing object. For instance, in the above scenario you might mix Observing into your window class. Then your listener can be garbage collected only after the window instance is disposed.
If need be you can pass the Observing object in explicitly:

eventStream.foreach{e => doSomething(e)}(observing)

The above solution is based on Ingo Maier’s scala.react.
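
The interplay of weak references and an Observing object can be sketched as follows. This is a simplified model under stated assumptions, not Reactive's or scala.react's implementation: ToyObserving, WeakStream, ToyWindow, ObservingSketch, keep, and fire are hypothetical names. The stream itself holds listeners only weakly, while the observing object holds them strongly, so a listener lives as long as its observer does.

```scala
import java.lang.ref.WeakReference
import scala.collection.mutable.ListBuffer

// Toy model for illustration only; not the Reactive library's implementation.

// Holds strong references to listeners and offers itself as an implicit.
trait ToyObserving {
  implicit val observing: ToyObserving = this
  private val refs = ListBuffer.empty[AnyRef]
  def keep(x: AnyRef): Unit = { refs += x; () }
}

class WeakStream[A] {
  // The stream keeps only weak references to its listeners.
  private var listeners = List.empty[WeakReference[A => Unit]]

  def foreach(f: A => Unit)(implicit observing: ToyObserving): Unit = {
    observing.keep(f) // the strong reference lives in the observer
    listeners = listeners :+ new WeakReference[A => Unit](f)
  }

  def fire(a: A): Unit = {
    // Forget listeners that have already been garbage collected.
    listeners = listeners.filter(_.get != null)
    listeners.foreach { ref =>
      val f = ref.get
      if (f != null) f(a)
    }
  }
}

// The window mixes in the observing trait, so listeners registered in its
// body stay alive for as long as the window itself is reachable.
class ToyWindow(ticks: WeakStream[Int]) extends ToyObserving {
  val shown = ListBuffer.empty[Int]
  ticks.foreach(t => shown += t) // uses the inherited implicit
}

object ObservingSketch {
  def run(): List[Int] = {
    val ticks = new WeakStream[Int]
    val window = new ToyWindow(ticks)
    ticks.fire(1)
    ticks.fire(2)
    window.shown.toList
  }
}
```

The sketch deliberately does not demonstrate collection of an unreachable window, since the timing of garbage collection is not deterministic.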

However, note that this only solves the memory management issue per se. For instance, in the above scenario although your listener is available for garbage collection as soon as the window is disposed, there is no guarantee when it actually will be, and as a result it may be invoked even after the window is closed. The solution to that problem is to use takeWhile. Derive a new event stream via takeWhile, specifying it should only last while the window is open. Then use the new, derived event stream. When an event is fired from the original event stream and the condition evaluates to false, the derived event stream will not fire, and will be removed.