-
Notifications
You must be signed in to change notification settings - Fork 8
/
signals.clj
85 lines (72 loc) · 2.87 KB
/
signals.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
;; Copyright © Manetu, Inc. All rights reserved
(ns temporal.signals
"Methods for managing signals from within workflows"
(:require [taoensso.timbre :as log]
[temporal.workflow :as w]
[temporal.internal.utils :as u]
[temporal.internal.signals :as s])
(:import [io.temporal.workflow Workflow]))
(defn is-empty?
  "Reports whether there is nothing to receive for `signal-name` on `signal-chan`.

  Returns true when no channel is registered under the (namified) signal name,
  or when the registered channel's `.isEmpty` reports true; otherwise false.
  `signal-chan` is dereferenced, so it must be a deref-able reference (e.g. the
  value produced by `create-signal-chan`)."
  [signal-chan signal-name]
  (let [sname    (u/namify signal-name)
        queue    (s/get-ch @signal-chan sname)
        ;; A missing channel is treated the same as a channel with no messages.
        drained? (if (some? queue)
                   (.isEmpty queue)
                   true)]
    (log/trace "is-empty?:" @signal-chan sname drained?)
    drained?))
(defn- rx
  "Removes and returns the head message of the channel registered for
  `signal-name`, using the channel's `.poll`.

  Precondition: a channel for `signal-name` must exist; callers in this
  namespace establish that with `is-empty?` before calling."
  [signal-chan signal-name]
  (let [sname (u/namify signal-name)
        msg   (-> @signal-chan
                  (s/get-ch sname)
                  (.poll))]
    (log/trace "rx:" sname msg)
    msg))
(defn poll
  "Non-blocking receive on `signal-chan` for `signal-name`.

  If a message is pending it is consumed and returned; if the signal has no
  channel or no pending messages, returns nil immediately."
  [signal-chan signal-name]
  (if (is-empty? signal-chan signal-name)
    nil
    (rx signal-chan signal-name)))
(defn <!
  "Receives a single message from `signal-chan`, waiting via `w/await` until
  one is pending.

  Arities:
  - `[signal-chan]`: waits on the `::default` signal with no timeout.
  - `[signal-chan signal-name]`: waits on `signal-name` with no timeout.
  - `[signal-chan signal-name timeout]`: when `timeout` is non-nil, the wait is
    bounded by it and nil is returned if `w/await` reports that the condition
    was not met; when `timeout` is nil, waits without a bound.

  Returns the consumed message, or nil on timeout."
  ([signal-chan]
   (<! signal-chan ::default))
  ([signal-chan signal-name]
   (<! signal-chan signal-name nil))
  ([signal-chan signal-name timeout]
   (log/trace "waiting on:" signal-name "with timeout" timeout)
   (let [ready? (fn [] (not (is-empty? signal-chan signal-name)))]
     (cond
       ;; Unbounded wait: always followed by a receive.
       (nil? timeout)
       (do (w/await ready?)
           (rx signal-chan signal-name))

       ;; Bounded wait: only receive when the condition was satisfied in time.
       (w/await timeout ready?)
       (rx signal-chan signal-name)

       :else nil))))
(defn >!
  "Signals another workflow from within a workflow.

  Creates an untyped external workflow stub for `workflow-id` and sends it the
  signal named by `signal-name` (after `u/namify`) carrying `payload`, which is
  wrapped with `u/->objarray` to form the signal arguments."
  [^String workflow-id signal-name payload]
  (let [sig    (u/namify signal-name)
        target (Workflow/newUntypedExternalWorkflowStub workflow-id)
        args   (u/->objarray payload)]
    (.signal target sig args)))
(def register-signal-handler!
  "
Registers a DynamicSignalHandler listener that handles signals sent to the workflow such as with [[>!]].
Use inside a workflow definition with 'f' closing over any desired workflow state (e.g. atom) to mutate
the workflow state.

Arguments:

- `f`: a function of 2 arguments, invoked for each received signal.

`f` arguments:

- `signal-name`: string
- `args`: params value or data structure

```clojure
(defworkflow signalled-workflow
  [{:keys [init] :as args}]
  (let [state (atom init)]
    (register-signal-handler! (fn [signal-name args]
                                (when (= signal-name \"mysignal\")
                                  (swap! state conj args))))
    ;; workflow implementation
    ))
```"
  s/register-signal-handler!)
(def create-signal-chan
  "Registers the calling workflow to receive signals and returns a 'signal-channel' context for use with functions such as [[<!]] and [[poll]]"
  s/create-signal-chan)