
Beyond structured concurrency #70

Closed · leonoel opened this issue Aug 24, 2022 · 5 comments

leonoel (Owner) commented Aug 24, 2022

The problem

Purely functional composition of effects is an effective solution to the supervision problem. Constraining effects to be represented as pure values provides information about the structure of the program: each concurrent process has a well-defined parent, which naturally enforces a strict supervision hierarchy at runtime. However, this model is unable to express the idea of sharing, i.e. a single process dispatching values to multiple consumers.

The problem of sharing is currently solved by reactor and its companions signal! and stream!, for continuous and discrete flows respectively. The current implementation successfully proved that functional effect systems can be used as a foundation for transactional propagation of events, even with arbitrarily dynamic graph topologies. However, it falls short on supervision: all publishers are supervised by their reactor, no matter which processes subscribe to them. There is no hierarchy of publishers, so if any publisher crashes, the whole reactor crashes, and if the reactor is cancelled, all of its publishers are cancelled. Any lifecycle management more complex than that must be implemented manually, defensively, and in an imperative style.
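For context, sharing under the current API looks roughly like the following sketch (an illustrative use of m/reactor, m/signal! and m/stream! as described above, not code taken from this issue): all publishers live inside the reactor block and are supervised by it.

(def !input (atom 1))

(def main
  (m/reactor
    (let [<x (m/signal! (m/watch !input))
          <y (m/signal! (m/latest + <x <x))]
      (m/stream! (m/ap (println (m/?< <y)))))))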

The goal of this report is to discuss an alternative model reconciling the benefits of both worlds: the ability to express sharing and correct supervision by default. The new approach should be general enough to match the current semantics for discrete and continuous flows (streams and signals, respectively), as well as potential new ones. The essence of the problem is the runtime supervision structure: if an effect has an identity that can be shared, then its process may have more than one supervisor, which means the supervision tree becomes a supervision DAG.

A concrete example

We want to implement a simple static DAG. Each node computes a single value from some other nodes, with an additional delay.

{:a {:delay 1   :deps []}
 :b {:delay 1   :deps []}
 :c {:delay 100 :deps []}
 :d {:delay 100 :deps [:a :b]}
 :e {:delay 1   :deps [:b :c]}
 :f {:delay 1   :deps [:d :e]}}

An acceptable solution must:

  1. match the essential concurrency of the graph - each node must start as soon as possible, so that the total computation delay is minimal (102 ms: :d and :e each complete at 101 ms, and :f adds 1 ms).
  2. run computation :b only once, even though its result is used twice.
  3. be fully supervised - cancelling the main process must propagate to the entire graph, and node processes are not allowed to outlive the main one.

First, we can represent a single node with a task built as the sequential composition of the parallel join of its dependencies, followed by a sleep.

(require '[missionary.core :as m])

(defn node [delay & deps]
  (m/sp (m/? (m/sleep delay (m/? (apply m/join vector deps))))))
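
As a quick sanity check (a REPL sketch, not part of the original example), a node is an ordinary task, so it can be run by invoking it with success and failure continuations:

((node 10) prn prn)            ; prints [] after ~10 ms (no dependencies)
((node 1 (node 10)) prn prn)   ; prints [[]] after ~11 ms (one dependency)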

We can then implement a naive, purely functional solution:

(m/sp
  (let [a (node 1)
        b (node 1)
        c (node 100)
        d (node 100 a b)
        e (node 1   b c)
        f (node 1   d e)]
    (m/? f)))

This task satisfies requirements 1 and 3, but not 2. The node :b is a pure value with no identity; it is therefore run twice instead of being shared and reused. To represent sharing, we have to break referential transparency and implement memoized publishing for node :b.

Futures are the commonly accepted solution to this problem nowadays. In modern functional effect systems they're generally called fibers, but they share the same basic properties: an identifier for a running process that can be cancelled, and whose result can be awaited.

(defn future! "
Runs a task immediately and returns a task completing with a memoized view of this task.
Returned object can also be called as a function with no argument to cancel the process.
" [task]
  (let [result (m/dfv)
        cancel (task #(result (fn [] %))
                     #(result (fn [] (throw %))))]
    (fn
      ([] (cancel))
      ([s f] ((m/absolve result) s f)))))
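
To make the dual nature of the returned object concrete, here is a small usage sketch under the definition above (fut is just an illustrative name): it is a task whose result is the memoized outcome, and calling it with no arguments cancels the underlying process.

(def fut (future! (m/sleep 1000 :done)))   ; the sleep starts running right away
(fut prn prn)                              ; prints :done once the sleep completes
(fut prn prn)                              ; also prints :done; the task is not re-run
(fut)                                      ; cancelling after termination is a no-op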

We can now tweak our first attempt:

(m/sp
  (let [a (node 1)
        b (future! (node 1))
        c (node 100)
        d (node 100 a b)
        e (node 1   b c)
        f (node 1   d e)]
    (m/? f)))

The task now satisfies requirements 1 and 2, but requirement 3 no longer holds. When the main process is cancelled, the cancellation propagates to the await on node :b, but not to its underlying process. The parent process can terminate while a child is still running, which is a waste of resources at best and a memory leak at worst. In other words, futures break supervision.

A correct solution could be something like this:

(m/sp
  (let [a (node 1)
        b (future! (node 1))
        c (node 100)
        d (node 100 a b)
        e (node 1   b c)
        f (node 1   d e)]
    (try (m/? f)
         (finally (b) (m/? b)))))

This is arguably more verbose and error-prone.

Lazy publishers

A lazy publisher is a stateful object associated with an effect (task or flow) and exposing a memoized view of this effect. Running a lazy publisher as an effect registers a new subscription to this view; cancelling a subscription deregisters it.

Unlike eager publishers (e.g. futures, but also signal! and stream! in their current implementation), instantiating a lazy publisher is not an effectful operation. The lifecycle of a lazy publisher is driven by its subscribers, not by its creator. Therefore, creating a lazy publisher doesn't require any special context, and disposal can be safely delegated to the GC.

The effect of a lazy publisher runs as long as there is at least one active subscription. Its values are memoized and dispatched to all subscriptions. The effect process is started when a subscription is registered and none was registered before. When an active subscription is deregistered and no other subscription is active, the current effect process is cancelled and the subscription's termination is bound to it; otherwise, the subscription process fails immediately. When the effect process terminates spontaneously (i.e. without cancellation), all active subscriptions and all subsequent ones terminate immediately.

Assuming the existence of memo, a lazy publisher caching the result of a task, we get an elegant solution to the previous problem:

(defn memo "
Returns a lazy publisher task memoizing the result of the given task.
" [task]
  ;; TODO implementation
  )

(m/sp
  (let [a (node 1)
        b (memo (node 1))
        c (node 100)
        d (node 100 a b)
        e (node 1   b c)
        f (node 1   d e)]
    (m/? f)))
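
For illustration only, here is a minimal sketch of how such a memo could be built from missionary primitives, following the lazy publisher semantics described above. The name memo-sketch is hypothetical; it is deliberately simplified (no restart after the last subscriber cancels, no attention to races between concurrent registrations and deregistrations) and is not the operator that eventually shipped.

(defn memo-sketch [task]
  (let [result  (m/dfv)                                ; memoized outcome, stored as a thunk
        process (delay (task #(result (fn [] %))       ; started at most once, on first subscription
                             #(result (fn [] (throw %)))))
        subs    (atom 0)]                              ; number of active subscriptions
    (fn [success failure]
      (swap! subs inc)
      @process                                         ; start (or join) the shared process
      (let [cancel-await ((m/absolve result) success failure)]
        (fn []
          (when (zero? (swap! subs dec))
            (@process))                                ; last subscriber gone: cancel the process
          (cancel-await))))))

Compared with future!, the underlying process is driven entirely by its subscribers: creating the publisher does nothing, and the last unsubscription propagates cancellation, which is what restores requirement 3.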

Prior art

  • clojure.core/delay and core lazy sequences have lazy publisher semantics, but they're not a good fit for effects due to their inconsistent behavior in the face of failure and cancellation; moreover, their fundamentally synchronous nature mandates host support for thread blocking, which is not an option in ClojureScript.
  • ReactiveX Single::cache and Flowable::cache have lazy publisher semantics matching clojure.core/delay and lazy sequences respectively, but do not allow cancelling the underlying process when the result is no longer required.
  • ReactiveX Flowable::share has lazy publisher semantics matching stream!, and cancels the underlying process on the last unsubscription. However, the dispatch mechanism is not transactional, so values may be lost in case of consecutive subscriptions.

Proposed changes

Purely functional operators work just fine; no major change is required.

The transactional propagation engine currently implemented in reactor must be unified with a lazy publisher approach, where the lifecycle of effects is driven by subscriptions.

  • a new operator taking a task and returning a lazy publisher memoizing the result of this task. Possible name: memo.
  • a new operator taking a flow and returning a lazy publisher memoizing the successive values produced by this flow. Possible name: replay.
  • stream! and signal! are renamed to stream and signal respectively; they now return lazy publishers.
  • reactor is removed, as instantiation of signals and streams no longer requires a context.

dustingetz (Collaborator) commented

  1. How is m/replay different from m/stream and m/signal?
  2. Can you provide a continuous time example under the proposed syntax?

leonoel (Owner) commented Aug 24, 2022

How is m/replay different from m/stream and m/signal?

signal is continuous, memoizes the latest value and discards previous ones.
stream is discrete, memoizes each event for a single propagation frame and discards the past.
replay is discrete, memoizes all events from the start and replays the entire past to new subscribers.

Can you provide a continuous time example under the proposed syntax?

The example from the readme could look like this:

(def !input (atom 1))
(def main
  (let [<x (m/signal (m/watch !input))
        <y (m/signal (m/latest + <x <x))]
    (m/reduce (fn [_ y] (println y)) nil <y)))
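
For completeness, a sketch of how this might be run at the REPL (the printed values assume the proposed semantics; :crash reflects that cancelling the reduce task makes it fail with a cancellation error):

(def dispose (main #(prn :done %) #(prn :crash %)))   ; prints 2 (initial propagation)
(swap! !input inc)                                    ; prints 4, glitch-free
(dispose)                                             ; cancels the whole graph; prints :crash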

mjmeintjes commented

Thanks, this will be a very useful addition. I've been using a function similar to memo in my application, and it is very useful to ensure a task only gets run once.

One question:

(def res (memo
            (m/sp
             (m/? (m/sleep 1000))
             (throw (ex-info "ERROR" {})))))
(m/? res) ;; wait 1s, throw exception
(m/? res) ;; (1) ??

What happens at (1) - does it wait for 1s again and then throw the exception, or throw the exception instantly?

leonoel (Owner) commented Aug 25, 2022

What happens at (1) - does it wait for 1s again and then throw the exception, or throw the exception instantly?

At first glance, it would make sense to me that any crashed publisher memoizes the error and rethrows it immediately to all current and future subscriptions.

leonoel (Owner) commented Jun 16, 2023

Released in b.31

leonoel closed this as completed Jun 16, 2023