
A Simple FRP Implementation


Read the note on Signals and Vars from the last lecture.

We now develop a simple implementation of Signals and Vars, which we used in the last lecture and which together form the basis of our approach to functional reactive programming.

The classes are assumed to be in a package frp.

Their user-facing APIs are summarized below.

Summary: The Signal API

class Signal[T](expr: => T) {
    def apply(): T = ???
}
object Signal {
    def apply[T](expr: => T) = new Signal(expr)
}
// Having the companion object Signal with the apply() method allows us to use the syntax Signal(expr)
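A short usage sketch (the names a, b, and sum are illustrative only): a derived signal is built from other signals by reading them with ().

val a = Signal(1)
val b = Signal(2)
val sum = Signal(a() + b()) // sum's expression reads a and b, so it depends on them
sum()                       // read the current value: 3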

Summary: The Var API

class Var[T](expr: => T) extends Signal[T](expr) {
    def update(expr: => T): Unit = ???
}
object Var {
    def apply[T](expr: => T) = new Var(expr)
}
// Having the companion object Var with the apply() method allows us to use the syntax Var(expr)
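A short usage sketch (threshold is an illustrative name): a Var can be read like a Signal and redefined through update; Scala expands the assignment syntax v() = expr to v.update(expr).

val threshold = Var(10)
threshold()        // 10
threshold() = 42   // sugar for threshold.update(42)
threshold()        // 42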

Implementation Idea

Each signal maintains three things:

  1. its current value,
  2. the expression that defines the signal's value at the current time, and
  3. a set of observers: the other signals that depend on its value (if the signal's value changes, all observers need to be re-evaluated).

Dependency Maintenance

How do we record dependencies in observers?

  • When evaluating a signal-valued expression, we need to know which signal caller gets defined or updated by the expression.
  • If we know that, then executing sig() means adding caller to the observers of sig.
  • When the signal sig's value changes, all previously observing signals are re-evaluated and the set sig.observers is cleared.
  • Re-evaluation will re-enter a calling signal caller in sig.observers, as long as caller's value still depends on sig.
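As a usage-level illustration of this bookkeeping (base and twice are made-up names): evaluating twice's expression calls base(), which registers twice as an observer of base; when base is updated, twice is re-evaluated and re-registers itself.

val base  = Var(1)
val twice = Signal(base() * 2) // evaluating base() here adds twice to base.observers
base() = 3                     // base changes: twice is re-evaluated and re-registered
twice()                        // 6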

Who’s Calling?

How do we find out on whose behalf a signal expression is evaluated?

One simple (simplistic?) way to do this is to maintain a global data structure referring to the current caller. (We will discuss and refine this later).

That data structure is accessed in a stack-like fashion because one evaluation of a signal might trigger others.

Stackable Variables

Here’s a class for stackable variables:

class StackableVariable[T](init: T) {
    private var values: List[T] = List(init)
    def value: T = values.head
    def withValue[R](newValue: T)(op: => R): R = {
        values = newValue :: values
        try op finally values = values.tail
    }
}

You use it like this:

val caller = new StackableVariable(initialSig)
caller.withValue(otherSig) { ... } // while the nested expression evaluates, caller.value is otherSig
... caller.value ...               // yields the signal on whose behalf the current evaluation runs
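A small standalone sketch (level is an illustrative variable) shows the stack-like behaviour: nested withValue calls shadow each other, and each value is popped again when its block returns.

val level = new StackableVariable(0)
level.withValue(1) {
    level.withValue(2) {
        level.value      // 2: the innermost withValue is on top of the stack
    }                    // leaving the block pops 2; level.value is 1 again
}
level.value              // back to the initial value 0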

Set Up in Object Signal

We also evaluate signal expressions at the top-level when there is no other signal that’s defined or updated. We use the "sentinel" object NoSignal as the caller for these expressions. Together:

object NoSignal extends Signal[Nothing](???) { ... } // sentinel caller for top-level evaluations; its expression is never evaluated
object Signal {
    val caller = new StackableVariable[Signal[_]](NoSignal) // Signal[_] means StackableVariable can take Signal of any value type
    def apply[T](expr: => T) = new Signal(expr)
}

The Signal Class

class Signal[T](expr: => T) {

    import Signal._
    private var myExpr: () => T = _
    private var myValue: T = _
    private var observers: Set[Signal[_]] = Set()
    update(expr)

    protected def update(expr: => T): Unit = {
        myExpr = ( () => expr ) // wrap the given expression in a thunk and assign it to myExpr
        computeValue()
    }

    protected def computeValue(): Unit = {
        myValue = caller.withValue(this)(myExpr())
    }

    def apply() = {
        observers += caller.value // add the current caller to the set of observers
        assert(!caller.value.observers.contains(this), "cyclic signal definition") // e.g. s() = s() + 1: a signal must not depend on itself
        myValue
    }
}
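As a small illustration of the assertion (s is a made-up name, using the Var API summarized earlier): a signal whose new expression reads the signal itself trips the "cyclic signal definition" check.

val s = Var(1)
// s() = s() + 1   // fails the assertion: evaluating s() registers s in its own observer set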

Question: The Signal class still lacks an essential part. Which is it?

  • Error handling
  • Reevaluating callers
  • Constructing observers

Answer: reevaluating callers.

Reevaluating Callers

A signal’s current value can change when

  • somebody calls an update operation on a Var, or
  • the value of a signal it depends on changes.

Propagating changes requires a more refined implementation of computeValue:

protected def computeValue(): Unit = {
    val newValue = caller.withValue(this)(myExpr())
    if (myValue != newValue) {
        myValue = newValue            // assign the newValue to myValue
        val obs = observers           // take the observers into a local value obs
        observers = Set()             // clear the set of observers
        obs.foreach(_.computeValue()) // do a compute value for each observer
    }
}
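With this refined computeValue, an update propagates through a chain of signals; the names a, b, and c below are illustrative.

val a = Var(1)
val b = Signal(a() + 1)  // b observes a
val c = Signal(b() * 10) // c observes b
c()                      // 20
a() = 4                  // recomputes b (5), which in turn recomputes c
c()                      // 50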

Handling NoSignal

computeValue needs to be disabled for NoSignal because we cannot evaluate an expression of type Nothing:

object NoSignal extends Signal[Nothing](???) {
    override def computeValue() = ()
}

Handling Vars

Recall that Var is a Signal which can be updated by the client program. In fact, all necessary functionality is already present in class Signal; we just need to expose it:

class Var[T](expr: => T) extends Signal[T](expr) {
    override def update(expr: => T): Unit = super.update(expr) // reuse the same implementation and expose it publicly
}
object Var {
    def apply[T](expr: => T) = new Var(expr)
}
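A usage sketch (hours, rate, and pay are illustrative names): update replaces the Var's defining expression, which may itself read other signals, so dependencies are re-established on each update.

val hours = Var(2)
val rate  = Var(10)
val pay   = Var(0)       // starts out as a constant
pay() = hours() * rate() // now pay observes hours and rate
pay()                    // 20
hours() = 3              // propagates: pay is recomputed
pay()                    // 30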

Discussion

Our implementation of FRP is quite stunning in its simplicity. But you might argue that it is too simplistic. In particular, it makes use of the worst kind of state: global state.

object Signal {
    val caller = new StackableVariable[Signal[_]](NoSignal) // accessed by all
    ...
}

One immediate problem is: what happens if we try to evaluate several signal expressions in parallel (e.g. on multiple threads)?

  • The caller signal will become “garbled” by concurrent updates and may cause race conditions and unpredictable results.

Thread-Local State

One way to get around the problem of concurrent accesses to global state is to use synchronization.

But this blocks threads, can be slow, and can lead to deadlocks.

Another solution is to replace global state by thread-local state.

  • Thread-local state means that each thread accesses a separate copy of a variable.
  • It is supported in Scala through class scala.util.DynamicVariable.

Using Thread-Local State

The API of DynamicVariable matches that of StackableVariable, so we can simply swap it into our Signal implementation:

object Signal {
    val caller = new DynamicVariable[Signal[_]](NoSignal)
    ...
}
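For reference, scala.util.DynamicVariable offers the same value / withValue interface as our StackableVariable, but keeps one binding per thread; a minimal standalone sketch (current is an illustrative name):

import scala.util.DynamicVariable

val current = new DynamicVariable[String]("top-level")
current.withValue("inside") {
    current.value  // "inside", visible only to the current thread
}
current.value      // back to "top-level"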

Disadvantages

Thread-local state still comes with a number of disadvantages:

  • Its imperative nature often produces hidden dependencies which are hard to manage.
  • Its implementation on the JDK involves a global hash table lookup, which can be a performance problem.
  • It does not play well in situations where threads are multiplexed between several tasks.

Another Solution: Implicit Parameters

A cleaner solution involves implicit parameters.

  • Instead of maintaining a thread-local variable, pass its current value into a signal expression as an implicit parameter.
  • This is purely functional.
  • In current Scala it requires more boilerplate than the thread-local solution.
  • Future versions of Scala might solve that problem.
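A hypothetical sketch of this idea (not the implementation used in this lecture): the caller is threaded through apply() as an implicit parameter, and each signal expression takes it as an implicit argument, which is where the extra boilerplate comes from.

class Signal[T](expr: Signal[_] => T) {
    def apply()(implicit caller: Signal[_]): T = ??? // record caller as an observer, then return the value
}
object Signal {
    def apply[T](expr: Signal[_] => T) = new Signal(expr)
}
// Client code would then write something like:
//   Signal { implicit caller => a() + b() }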

Summary

We have given a quick tour through functional reactive programming, with some usage examples and an implementation.

This is just a taster; there's much more to be discovered in the field.

In particular, we covered only one particular style of FRP: discrete signals changed by events. Some variants of FRP also treat continuous signals. Values in these systems are often computed by sampling instead of event propagation: a value is computed as needed, with sufficient density, rather than at every possible point in time.