Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazify ap #109

Open
leonoel opened this issue Apr 21, 2024 · 2 comments
Open

Lazify ap #109

leonoel opened this issue Apr 21, 2024 · 2 comments

Comments

@leonoel
Copy link
Owner

leonoel commented Apr 21, 2024

ap should evaluate synchronous continuations on transfer.

Desired semantics

When an ap evaluation context resumes, i.e. a ? task terminates or a ?>/?< flow is ready to transfer, the
continuation is evaluated eagerly if and only if it has a branch containing an asynchronous operator (? or ?>).
Otherwise, the process becomes ready to transfer and the continuation is run lazily on transfer.

Motivations

ap's current evaluation rules make some patterns inherently unsafe due to possible data losses.

An example of such a use case is the problem discussed in this slack thread
https://clojurians.slack.com/archives/CL85MBPEF/p1713457348286889 - partition an input flow in batches constrained by
both a maximum batch size and a maximum delay between the first and the last element of the batch.

The proposed solution, while functional and elegant, has a subtle issue.

(defn batch [max-size max-delay input]
  (m/ap (let [[_ input] (m/?> (m/group-by {} input))]
          (m/? (m/reduce conj
                 (m/eduction (take-while (complement #{::timeout})) (take max-size)
                   (m/ap (m/amb= (m/?> input) (m/? (m/sleep max-delay ::timeout))))))))))

An input value may be skipped when the scheduler thread races with the thread producing this value. The data loss can
be observed consistently with this test summing the successive batch sizes :

((m/reduce (fn [n b] (+ n (count b))) 0
   (batch 4 15
     (m/ap
       (let [x (m/?> (m/seed (range 10000)))]
         (m/? (m/via m/blk (Thread/sleep (rand 10)) x))))))
 prn prn)
;; eventually prints a number inferior to 10000

The data loss occurs in the inner ap, under the following scenario :

  1. T1 (scheduler thread) resolves the sleep, ap is ready to transfer ::timeout, the eduction stage is notified,
    ap is transferred immediately and the transducer pipeline terminates due to take-while.
  2. T2 (producer thread) makes a new value available on the group consumer and wins the race against T1 trying to cancel
    ap. The internal output buffer is empty, so the group consumer is transferred immediately, ap is now ready to
    transfer the value and the eduction stage is notified again.
  3. T1 cancels ap and flushes remaining values. The group consumer is now cancelled, but the first value of the next
    batch has already been transferred to the internal output buffer and will therefore be discarded.

What should have happened instead :

  • In step 2, ap should have simply notified the eduction stage without transferring the value. This is the correct
    behavior in this case because the result can be computed synchronously.
  • In step 3, the group consumer is cancelled while the transfer is pending, allowing group-by to reinject the value
    on the next group.

Accidental benefits

If ap evaluates ?< lazily then it's strictly more powerful than cp. Therefore, cp can be deprecated.

Chesterton's fence

The current evaluation semantics are mainly a consequence of cloroutine's design. Cloroutine doesn't expose any
information about the continuation, so the only possible way to figure out if the final result can be computed
synchronously is to actually try to compute it.

Implementation strategy

Unknown.

@awwx
Copy link

awwx commented May 2, 2024

I was curious if this might be related (or not)...

(defn take-none [f]
  (m/eduction (take 0) f))

(defn echo [f]
  (m/ap
   (let [v (m/?> f)]
     (prn 'produce v)
     v)))

(m/?
 (m/reduce (fn [_ _]) nil
   (take-none (echo (m/seed (range))))))
produce 0
produce 1

The echo/seed combination gets two values ahead of the consumer before being cancelled.

Now let's add in another layer:

(defn copy [f]
  (m/ap (m/?> f)))

... (take-none (copy (echo (m/seed (range)))))
produce 0
produce 1
produce 2
... (take-none (copy (copy (echo (m/seed (range))))))
produce 0
produce 1
produce 2
produce 3

Each added copy buffers an additional value, so each copy we add causes the producer to get an additional value ahead of the consumer.

Suppose this issue was implemented. In copy, the branch doesn't contain an asynchronous operator (it doesn't do anything but return), so the continuation would be evaluated lazily... and would that mean that copy would no longer buffer an additional value in a pipeline like it does now?

@leonoel
Copy link
Owner Author

leonoel commented May 2, 2024

yes, when this issue is solved copy will be semantically equivalent to identity.

@leonoel leonoel mentioned this issue Jul 10, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants