Skip to content

Commit be67080

Browse files
committed
fix(workflows): dont delete signal rows (#965)
<!-- Please make sure there is an issue that this PR is correlated to. --> ## Changes <!-- If there are frontend changes, please include screenshots. -->
1 parent 7165aed commit be67080

File tree

11 files changed

+72
-32
lines changed

11 files changed

+72
-32
lines changed

docs/libraries/workflow/GLOSSARY.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,32 @@ Examples include:
3939

4040
Operations are not required; all of their functionality can be put into an activity instead.
4141

42+
## Tags
43+
44+
Tags are JSON blobs associated with either workflows or signals. Tags are not meant to be very abstract; i.e.
45+
they should be unique.
46+
47+
## Signal
48+
49+
A payload sent to a specific workflow from anywhere else in the codebase. The workflow must be listening for
50+
this signal for it to be picked up, otherwise it will stay in the database indefinitely until consumed by a
51+
workflow. Signals do not have a response; another signal must be sent back from the workflow and listened to
52+
by the sender.
53+
54+
## Tagged Signal
55+
56+
Same as a signal except it is sent with a JSON blob as its "tags" instead of to a specific workflow. Any
57+
workflow with tags that are a superset of the signals tags will consume the signal. Note that tagged signals
58+
are consumed on a first-come-first-serve basis, meaning if there are two workflows that both have a superset
59+
of the signal's tags, only one will receive the signal.
60+
61+
See [the signals document](./SIGNALS.md).
62+
63+
## Join Signal
64+
65+
A "one of" for signal listening. Allows for listening to multiple signals at once and receiving the first one
66+
that gets sent.
67+
4268
## Workflow Event
4369

4470
An action that gets executed in a workflow. An event can be a:

docs/libraries/workflow/SIGNALS.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Signals
2+
3+
## Tagged signals
4+
5+
Tagged signals are consumed on a first-come-first-serve basis because a single signal being consumed by more
6+
than one workflow is not a supported design pattern. To work around this, consume the signal by a workflow
7+
then publish multiple signals from that workflow.

lib/chirp-workflow/core/src/compat.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ pub async fn signal<I: Signal + Serialize, B: Debug + Clone>(
158158
}
159159

160160
let signal_id = Uuid::new_v4();
161-
161+
162162
tracing::info!(name=%I::NAME, %workflow_id, %signal_id, "dispatching signal");
163163

164164
// Serialize input
@@ -185,7 +185,7 @@ pub async fn tagged_signal<I: Signal + Serialize, B: Debug + Clone>(
185185
}
186186

187187
let signal_id = Uuid::new_v4();
188-
188+
189189
tracing::info!(name=%I::NAME, ?tags, %signal_id, "dispatching tagged signal");
190190

191191
// Serialize input

lib/chirp-workflow/core/src/ctx/api.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ impl ApiCtx {
183183
input: T,
184184
) -> GlobalResult<Uuid> {
185185
let signal_id = Uuid::new_v4();
186-
186+
187187
tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
188188

189189
// Serialize input
@@ -205,7 +205,7 @@ impl ApiCtx {
205205
input: T,
206206
) -> GlobalResult<Uuid> {
207207
let signal_id = Uuid::new_v4();
208-
208+
209209
tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
210210

211211
// Serialize input

lib/chirp-workflow/core/src/ctx/test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ impl TestCtx {
187187
input: T,
188188
) -> GlobalResult<Uuid> {
189189
let signal_id = Uuid::new_v4();
190-
190+
191191
tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
192192

193193
// Serialize input
@@ -209,7 +209,7 @@ impl TestCtx {
209209
input: T,
210210
) -> GlobalResult<Uuid> {
211211
let signal_id = Uuid::new_v4();
212-
212+
213213
tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
214214

215215
// Serialize input

lib/chirp-workflow/core/src/ctx/workflow.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ impl WorkflowCtx {
669669
body: T,
670670
) -> GlobalResult<Uuid> {
671671
let signal_id = Uuid::new_v4();
672-
672+
673673
tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
674674

675675
// Serialize input
@@ -692,7 +692,7 @@ impl WorkflowCtx {
692692
body: T,
693693
) -> GlobalResult<Uuid> {
694694
let signal_id = Uuid::new_v4();
695-
695+
696696
tracing::debug!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
697697

698698
// Serialize input

lib/chirp-workflow/core/src/db/postgres.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ impl Database for DatabasePostgres {
132132
FROM db_workflow.signals AS s
133133
WHERE
134134
s.workflow_id = w.workflow_id AND
135-
s.signal_name = ANY(w.wake_signals)
135+
s.signal_name = ANY(w.wake_signals) AND
136+
s.ack_ts IS NULL
136137
LIMIT 1
137138
) OR
138139
-- Tagged signal exists
@@ -141,7 +142,8 @@ impl Database for DatabasePostgres {
141142
FROM db_workflow.tagged_signals AS s
142143
WHERE
143144
s.signal_name = ANY(w.wake_signals) AND
144-
s.tags <@ w.tags
145+
s.tags <@ w.tags AND
146+
s.ack_ts IS NULL
145147
LIMIT 1
146148
) OR
147149
-- Sub workflow completed
@@ -456,27 +458,31 @@ impl Database for DatabasePostgres {
456458
FROM db_workflow.signals
457459
WHERE
458460
workflow_id = $1 AND
459-
signal_name = ANY($2)
461+
signal_name = ANY($2) AND
462+
ack_ts IS NULL
460463
UNION ALL
461464
SELECT true AS tagged, signal_id, create_ts, signal_name, body
462465
FROM db_workflow.tagged_signals
463466
WHERE
464467
signal_name = ANY($2) AND
465-
tags <@ (SELECT tags FROM db_workflow.workflows WHERE workflow_id = $1)
468+
tags <@ (SELECT tags FROM db_workflow.workflows WHERE workflow_id = $1) AND
469+
ack_ts IS NULL
466470
ORDER BY create_ts ASC
467471
LIMIT 1
468472
),
469-
-- If the next signal is not tagged, delete it with this statement
470-
delete_signal AS (
471-
DELETE FROM db_workflow.signals
473+
-- If the next signal is not tagged, acknowledge it with this statement
474+
ack_signal AS (
475+
UPDATE db_workflow.signals
476+
SET ack_ts = $4
472477
WHERE signal_id = (
473478
SELECT signal_id FROM next_signal WHERE tagged = false
474479
)
475480
RETURNING 1
476481
),
477-
-- If the next signal is tagged, delete it with this statement
478-
delete_tagged_signal AS (
479-
DELETE FROM db_workflow.tagged_signals
482+
-- If the next signal is tagged, acknowledge it with this statement
483+
ack_tagged_signal AS (
484+
UPDATE db_workflow.tagged_signals
485+
SET ack_ts = $4
480486
WHERE signal_id = (
481487
SELECT signal_id FROM next_signal WHERE tagged = true
482488
)

svc/Cargo.lock

Lines changed: 0 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

svc/pkg/workflow/db/workflow/migrations/20240626202744_add_tags.up.sql

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,20 @@ CREATE TABLE tagged_signals (
1414
signal_name TEXT NOT NULL,
1515

1616
create_ts INT NOT NULL,
17+
ack_ts INT,
1718
ray_id UUID NOT NULL,
1819

19-
body JSONB NOT NULL,
20-
21-
INDEX (signal_name)
20+
body JSONB NOT NULL
2221
);
2322

2423
CREATE INDEX gin_tagged_signals_tags
2524
ON tagged_signals
26-
USING GIN (tags);
25+
USING GIN (tags)
26+
WHERE ack_ts IS NULL;
27+
28+
-- Fix signal indexes
29+
ALTER TABLE signals
30+
ADD COLUMN ack_ts INT;
31+
32+
DROP INDEX signals@signals_signal_name_idx;
33+
DROP INDEX signals@signals_workflow_id_idx;

svc/pkg/workflow/db/workflow/migrations/20240628222206_signal_ack.down.sql

Whitespace-only changes.

0 commit comments

Comments
 (0)