Skip to content

Commit

Permalink
fix(workflows): dont delete signal rows
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Jul 5, 2024
1 parent 4fd3ac5 commit 5cea2f5
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 32 deletions.
26 changes: 26 additions & 0 deletions docs/libraries/workflow/GLOSSARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,32 @@ Examples include:

Operations are not required; all of their functionality can be put into an activity instead.

## Tags

Tags are JSON blobs associated with either workflows or signals. Tags are not meant to be very abstract; i.e.
they should be unique.

## Signal

A payload sent to a specific workflow from anywhere else in the codebase. The workflow must be listening for
this signal for it to be picked up, otherwise it will stay in the database indefinitely until consumed by a
workflow. Signals do not have a response; another signal must be sent back from the workflow and listened to
by the sender.

## Tagged Signal

Same as a signal except it is sent with a JSON blob as its "tags" instead of to a specific workflow. Any
workflow with tags that are a superset of the signals tags will consume the signal. Note that tagged signals
are consumed on a first-come-first-serve basis, meaning if there are two workflows that both have a superset
of the signal's tags, only one will receive the signal.

See [the signals document](./SIGNALS.md).

## Join Signal

A "one of" for signal listening. Allows for listening to multiple signals at once and receiving the first one
that gets sent.

## Workflow Event

An action that gets executed in a workflow. An event can be a:
Expand Down
7 changes: 7 additions & 0 deletions docs/libraries/workflow/SIGNALS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Signals

## Tagged signals

Tagged signals are consumed on a first-come-first-serve basis because a single signal being consumed by more
than one workflow is not a supported design pattern. To work around this, consume the signal by a workflow
then publish multiple signals from that workflow.
4 changes: 2 additions & 2 deletions lib/chirp-workflow/core/src/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub async fn signal<I: Signal + Serialize, B: Debug + Clone>(
}

let signal_id = Uuid::new_v4();

tracing::info!(name=%I::NAME, %workflow_id, %signal_id, "dispatching signal");

// Serialize input
Expand All @@ -185,7 +185,7 @@ pub async fn tagged_signal<I: Signal + Serialize, B: Debug + Clone>(
}

let signal_id = Uuid::new_v4();

tracing::info!(name=%I::NAME, ?tags, %signal_id, "dispatching tagged signal");

// Serialize input
Expand Down
4 changes: 2 additions & 2 deletions lib/chirp-workflow/core/src/ctx/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl ApiCtx {
input: T,
) -> GlobalResult<Uuid> {
let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");

// Serialize input
Expand All @@ -205,7 +205,7 @@ impl ApiCtx {
input: T,
) -> GlobalResult<Uuid> {
let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");

// Serialize input
Expand Down
4 changes: 2 additions & 2 deletions lib/chirp-workflow/core/src/ctx/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl TestCtx {
input: T,
) -> GlobalResult<Uuid> {
let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");

// Serialize input
Expand All @@ -209,7 +209,7 @@ impl TestCtx {
input: T,
) -> GlobalResult<Uuid> {
let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");

// Serialize input
Expand Down
4 changes: 2 additions & 2 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ impl WorkflowCtx {
body: T,
) -> GlobalResult<Uuid> {
let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");

// Serialize input
Expand All @@ -692,7 +692,7 @@ impl WorkflowCtx {
body: T,
) -> GlobalResult<Uuid> {
let signal_id = Uuid::new_v4();

tracing::debug!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");

// Serialize input
Expand Down
26 changes: 16 additions & 10 deletions lib/chirp-workflow/core/src/db/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ impl Database for DatabasePostgres {
FROM db_workflow.signals AS s
WHERE
s.workflow_id = w.workflow_id AND
s.signal_name = ANY(w.wake_signals)
s.signal_name = ANY(w.wake_signals) AND
s.ack_ts IS NULL
LIMIT 1
) OR
-- Tagged signal exists
Expand All @@ -141,7 +142,8 @@ impl Database for DatabasePostgres {
FROM db_workflow.tagged_signals AS s
WHERE
s.signal_name = ANY(w.wake_signals) AND
s.tags <@ w.tags
s.tags <@ w.tags AND
s.ack_ts IS NULL
LIMIT 1
) OR
-- Sub workflow completed
Expand Down Expand Up @@ -456,27 +458,31 @@ impl Database for DatabasePostgres {
FROM db_workflow.signals
WHERE
workflow_id = $1 AND
signal_name = ANY($2)
signal_name = ANY($2) AND
ack_ts IS NULL
UNION ALL
SELECT true AS tagged, signal_id, create_ts, signal_name, body
FROM db_workflow.tagged_signals
WHERE
signal_name = ANY($2) AND
tags <@ (SELECT tags FROM db_workflow.workflows WHERE workflow_id = $1)
tags <@ (SELECT tags FROM db_workflow.workflows WHERE workflow_id = $1) AND
ack_ts IS NULL
ORDER BY create_ts ASC
LIMIT 1
),
-- If the next signal is not tagged, delete it with this statement
delete_signal AS (
DELETE FROM db_workflow.signals
-- If the next signal is not tagged, acknowledge it with this statement
ack_signal AS (
UPDATE db_workflow.signals
SET ack_ts = $4
WHERE signal_id = (
SELECT signal_id FROM next_signal WHERE tagged = false
)
RETURNING 1
),
-- If the next signal is tagged, delete it with this statement
delete_tagged_signal AS (
DELETE FROM db_workflow.tagged_signals
-- If the next signal is tagged, acknowledge it with this statement
ack_tagged_signal AS (
UPDATE db_workflow.tagged_signals
SET ack_ts = $4
WHERE signal_id = (
SELECT signal_id FROM next_signal WHERE tagged = true
)
Expand Down
10 changes: 0 additions & 10 deletions svc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,20 @@ CREATE TABLE tagged_signals (
signal_name TEXT NOT NULL,

create_ts INT NOT NULL,
ack_ts INT,
ray_id UUID NOT NULL,

body JSONB NOT NULL,

INDEX (signal_name)
body JSONB NOT NULL
);

CREATE INDEX gin_tagged_signals_tags
ON tagged_signals
USING GIN (tags);
USING GIN (tags)
WHERE ack_ts IS NULL;

-- Fix signal indexes
ALTER TABLE signals
ADD COLUMN ack_ts INT;

DROP INDEX signals@signals_signal_name_idx;
DROP INDEX signals@signals_workflow_id_idx;
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Conditional index for selecting before ack'd
CREATE UNIQUE INDEX idx_signals_workflow_id
ON signals (workflow_id)
WHERE ack_ts IS NULL;

0 comments on commit 5cea2f5

Please sign in to comment.