task succeeds on cancel when task body is wrapped in a try/catch #82

Closed
den1k opened this issue Mar 25, 2023 · 10 comments

@den1k

den1k commented Mar 25, 2023

(let [task   (m/via m/blk
                    (try
                      (Thread/sleep 1000)
                      (catch Throwable t
                        :cancelled-and-should-not-succeed-or-fail)))
      cancel (task #(println :success %)
                   #(println :fail %))]
  (println :cancelling)
  (cancel))

;; :cancelling
;; :success :cancelled-and-should-not-succeed-or-fail

Not sure if this is a bug. However, I'm looking to prevent callbacks of either success/fail after cancellation since they cause unwanted side effects in my application. Is there a way to achieve this?

@dustingetz
Collaborator

Can we see the business situation you have, that has the unwanted side effects?

@den1k
Author

den1k commented Mar 25, 2023

I can explain it simply: it's part of a spreadsheet app that evaluates code in cells. When a cell is still evaluating but the user has updated the underlying code, the current execution should be cancelled in favor of the next one.

Presently evaluation is cancelled but because the task calls success with the error, the error value becomes the cell value until the next evaluation is complete.

@xificurC
Collaborator

If you need the catch-all then maybe catch missionary.Cancelled before and rethrow it?
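A minimal sketch of this suggestion, not a definitive implementation. `guarded-task` is a hypothetical helper name. It assumes that cancelling an `m/via` task interrupts the worker thread, so a blocking call such as `Thread/sleep` surfaces the cancellation as `InterruptedException`, while missionary operators signal it with `missionary.Cancelled`. Both are rethrown before the catch-all so the task fails instead of succeeding with the fallback value.

```clojure
(require '[missionary.core :as m])
(import 'missionary.Cancelled)

;; Hypothetical helper: run `thunk` on the blocking executor with a
;; catch-all fallback, but let cancellation propagate as a failure.
(defn guarded-task [thunk]
  (m/via m/blk
    (try
      (thunk)
      ;; Assumption: cancelling `m/via` interrupts the worker thread, so a
      ;; blocking call such as Thread/sleep throws InterruptedException.
      (catch InterruptedException e (throw e))
      ;; Thrown by missionary operators (e.g. a nested `m/?`) on cancellation.
      (catch Cancelled e (throw e))
      ;; Any other error is turned into a regular result.
      (catch Throwable t {:error (ex-message t)}))))

(let [cancel ((guarded-task #(Thread/sleep 1000))
              #(println :success %)
              #(println :fail (type %)))]
  (cancel))
```

Under those assumptions the failure callback runs rather than the success callback. A callback is still invoked after cancellation, so the caller has to treat a cancellation failure as "no result".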

@den1k
Author

den1k commented Mar 25, 2023

Thanks. I have it working now with a version of

(let [task            (m/via m/blk
                             (try
                               (Thread/sleep 1000)
                               (catch Throwable t
                                 :cancelled-and-should-not-succeed-or-fail)))
      ignore-success? (volatile! false)
      cancel          (task #(when-not @ignore-success?
                               (println :success %))
                            #(println :fail %))]
  (println :cancelling)
  (vreset! ignore-success? true)
  (cancel))

@leonoel
Owner

leonoel commented Mar 25, 2023

> Not sure if this is a bug.

It is expected. The task protocol mandates calling either callback exactly once because the cancellation may not be immediate and the caller must be able to know when the process was completely shut down.

> Is there a way to achieve this?

Sorry for this answer, but you should not do that. Calling tasks and flows directly is low-level and dangerous. It's OK at the app entrypoint, but otherwise you should always prefer functional composition.

> When a cell is still evaluating but the user has updated the underlying code, the current execution should be cancelled in favor of the next one.

Depending on the cell state you expect between the user update and the resolution of the next task, ?< with ap or cp looks like a good fit. You would need to turn the task into a flow first (cf. Electric's offload).
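A small sketch of the `?<` idea, under stated assumptions: `slow-eval` is a hypothetical stand-in for a cell evaluation, and the inputs come from a fixed `m/seed` rather than real user edits. Each new input forks a new branch and cancels the branch started for the previous input, so only the most recent evaluation is expected to produce a value.

```clojure
(require '[missionary.core :as m])
(import 'missionary.Cancelled)

;; Hypothetical stand-in for a slow cell evaluation:
;; sleeps for `ms` milliseconds, then returns a result map.
(defn slow-eval [ms]
  (m/sleep ms {:evaluated ms}))

(def latest-only
  (m/ap
    ;; `?<` forks on every input and cancels the branch of the previous input.
    (let [ms (m/?< (m/seed [300 200 100]))]
      (try
        (m/? (slow-eval ms))
        ;; A superseded branch is cancelled; emit nothing for it.
        (catch Cancelled _ (m/amb))))))

;; Collect whatever the flow emits into a vector.
(m/? (m/reduce conj [] latest-only))
```

Because the cancellation is handled inside the flow, no success or failure callback has to be guarded by hand.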

@den1k
Author

den1k commented Mar 25, 2023

Interesting. This is starting to make sense. However, in my case evals are triggered on the backend as part of a transaction listener which means cells are always received as collections, e.g. similar to

(let [cells [{:cell/form-str ":foo"}
             {:cell/form-str ":bar"}]]
  (m/?
    (->>
      (m/ap
        ;; fork process for every cell
        ;; to exec in **parallel**
        (let [cell        (m/?> ##Inf (m/seed cells))
              evaled-cell (m/? (m/timeout
                                 (m/via m/blk
                                        (let [{:cell/keys [form-str]} cell]
                                          (assoc cell :cell/ret (eval (read-string form-str)))))
                                 1000
                                 ::timeout))]
          (if (= evaled-cell ::timeout)
            (-> cell
                (dissoc :cell/ret-pending?)
                (assoc :cell/exception? true
                       :cell/ret evaled-cell
                       :cell/ret-str (pr-str evaled-cell)))
            evaled-cell)))
      (m/reduce
        (fn [out v]
          (println v))
        []))))

; prints
{:cell/form-str :bar, :cell/ret :bar}
{:cell/form-str :foo, :cell/ret :foo}

But the code would need to be aware of each cell's identity (by :db/id) so that a new evaluation of a cell cancels only that cell's previous evaluation, if one exists. Is there any way to achieve this?

@dustingetz
Collaborator

Electric for-by is m/group-by under the hood. Have you considered modeling the spreadsheet with Electric for loops? Each cell is a signal. e/offload lets you call side effects (managing the task, the bridge to flow, the cancellation when something upstream changes).

@den1k
Author

den1k commented Mar 26, 2023 via email

@den1k
Author

den1k commented Mar 27, 2023

So what would be an approach using functional composition that allows cell evaluations while also cancelling a cell's pending evaluation when a new value for that same cell arrives?

@den1k
Author

den1k commented Mar 27, 2023

Got it!

(time
  (let [delays [{:id    1
                 :delay 1000}
                {:id    2
                 :delay 2000}
                {:id    1
                 :delay 500}
                {:id    1
                 :delay 200}]]
    (m/?
      (->>
        (m/ap
          (let [[k vs] (m/?> ##Inf (m/group-by :id (m/seed delays)))
                m (m/?< vs)]
            (try
              (m/? (m/sleep (:delay m) (assoc m :slept true)))
              (catch missionary.Cancelled c
                (println :cancelled m)
                (m/amb)))))
        (m/reduce
          (fn [out v]
            (println v)
            (conj out v))
          [])))))
:cancelled {:id 1, :delay 1000}
:cancelled {:id 1, :delay 500}
{:id 1, :delay 200, :slept true}
{:id 2, :delay 2000, :slept true}
"Elapsed time: 2004.552279 msecs"
=> [{:id 1, :delay 200, :slept true} {:id 2, :delay 2000, :slept true}]

@den1k den1k closed this as completed Mar 27, 2023