Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

observe must be guarded against subject/cleanup throwing #4

Closed
lgrapenthin opened this issue Jul 19, 2019 · 4 comments
Closed

observe must be guarded against subject/cleanup throwing #4

lgrapenthin opened this issue Jul 19, 2019 · 4 comments
Assignees
Labels
bug Something isn't working

Comments

@lgrapenthin
Copy link

In the following experiment,

  • the aggregate is never produced ("res" is not printed)
  • even though the canceller of observe is called after 3 numbers are produced ("cancelled" is printed), apparently (.stop producer) has no effect, and it also prints "Producing 3", 4 5..
  • without the transform expression, calling (inst) canceller at any time results in immediate aggregate printing and cancellation being effective
(defn printer [t]
  (t (fn [res] (println "res" res))
     (fn [err] (println "err" err))))

(def inst
  (printer (->> (m/observe
                 (fn [event-fn]
                   (let [producer
                         (Thread.
                          (fn []
                            (dotimes [n 6]
                              (println "Producing" n)
                              (Thread/sleep 1000)
                              (event-fn n))))]
                     (.start producer)
                     (fn []
                       (println "cancelled")
                       (.stop producer)))))
                (m/transform (take 3))
                (m/aggregate conj))))
@leonoel
Copy link
Owner

leonoel commented Jul 19, 2019

The problem is the unsubscribe function throws, and observe doesn't expect this. Replace (.stop producer) with (.interrupt producer) and it should do what you expect (this the recommended way to kill a thread, Thread.stop is deprecated).

Intended or not : good question. The current behavior is obviously confusing, but at the time the unsubscribe function is called the process is already terminated (successfully). At first glance I'd say the whole process should fail, because proper cleanup of resources is part of the job.

@leonoel
Copy link
Owner

leonoel commented Jul 21, 2019

Post-hammock thoughts : no ambiguity here. Both subject and cleanup functions must be guarded against throwing, because they're user-provided. In both cases, observe must propagate error and terminate. In this example, cancellation is triggered by the early termination of transform so the error will be swallowed and this is intended behavior.

@leonoel leonoel changed the title Unclear whether intended behavior with observe/transform observe must be guarded against suject/cleanup throwing Jul 21, 2019
@leonoel leonoel self-assigned this Jul 21, 2019
@leonoel leonoel changed the title observe must be guarded against suject/cleanup throwing observe must be guarded against subject/cleanup throwing Jul 21, 2019
@lgrapenthin
Copy link
Author

lgrapenthin commented Jul 21, 2019

Interesting, I now understand that the problem is cancelled because a Thread Death is thrown by unsubscribe.

Wouldn't this example still have to print "err" ...
Otherwise, how would I as a user ever understand that my unsubscribe function throwing is the reason?

@leonoel
Copy link
Owner

leonoel commented Jul 23, 2019

I acknowledge that swallowed errors is a source of frustration for users. The purpose of observe is to provide an easy way to interop with classic observer pattern where you register a callback on a event emitter, without backpressure concerns. The cleanup function must be provided to deregister the callback (forgetting it is a common source of memory leaks), and it is expected to do it synchronously in a non-blocking way. In practice, both subject and cleanup functions should be simple wrappers for a third-party observer framework. The example you provided is arguably contrived from this point of view.

Now if you need to be sure your subject/cleanup functions are correct, you can always test it in isolation. Another possibility could be to catch errors in an ap block :

(defn show-error [flow]
  (m/ap (try (m/?? flow)
             (catch Throwable e
                (prn e)
                (throw e)))))

Then you could insert this just before transform in your pipeline and the cleanup error would be guaranteed to be printed.

I considered two alternatives :

  • make the whole pipeline fail. This goes against general design, where cancelling a task/flow often results in a failure (e.g enumerate, sleep), consistently with how java threads work (interrupting a thread waiting for a blocking operation generally makes it throw an InterruptedException). We don't want early termination in transform to propagate these errors. The whole purpose of early termination is that we don't care anymore about what happens to the input next.
  • spit the error at a user-specified location (e.g via a callback that could default to printing on stderr). This complicates the API and introduces imperative concerns, not a very good tradeoff IMO.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants