Skip to content

Commit fb17fea

Browse files
committed
fix(workflows): fix gc, event history graph, internal naming
1 parent 07aa4ae commit fb17fea

File tree

15 files changed

+179
-120
lines changed

15 files changed

+179
-120
lines changed

lib/bolt/core/src/dep/cargo/cli.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -551,15 +551,24 @@ pub async fn build_tests<'a, T: AsRef<str>>(
551551
if let Some(executable) = v["filenames"][0].as_str() {
552552
// Parsing the cargo package name (foo-bar) from
553553
// path+file:///foo/bar#foo-bar@0.0.1
554-
let package = v["package_id"]
555-
.as_str()
556-
.context("missing package_id")?
557-
.split_once('#')
558-
.context("split_once failed")?
559-
.1
560-
.split_once('@')
561-
.context("split_once failed")?
562-
.0;
554+
let package_id = v["package_id"].as_str().context("missing package_id")?;
555+
let package = if package_id.contains('@') {
556+
package_id
557+
.split_once('#')
558+
.context("split_once failed")?
559+
.1
560+
.split_once('@')
561+
.context("split_once failed")?
562+
.0
563+
} else {
564+
package_id
565+
.split_once('#')
566+
.context("split_once failed")?
567+
.0
568+
.rsplit_once('/')
569+
.context("split_once failed")?
570+
.1
571+
};
563572

564573
let target = v["target"]["name"]
565574
.as_str()

lib/chirp-workflow/core/src/activity.rs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1-
use std::{fmt::Debug, hash::Hash};
1+
use std::{
2+
collections::hash_map::DefaultHasher,
3+
fmt::Debug,
4+
hash::{Hash, Hasher},
5+
};
26

37
use async_trait::async_trait;
48
use global_error::GlobalResult;
5-
use serde::{de::DeserializeOwned, Serialize};
9+
use serde::{de::DeserializeOwned, Deserialize, Serialize};
610

7-
use crate::ActivityCtx;
11+
use crate::{ActivityCtx, WorkflowError, WorkflowResult};
812

913
#[async_trait]
1014
pub trait Activity {
@@ -21,3 +25,37 @@ pub trait Activity {
2125
pub trait ActivityInput: Serialize + DeserializeOwned + Debug + Hash + Send {
2226
type Activity: Activity;
2327
}
28+
29+
/// Unique identifier for a specific run of an activity. Used to check for equivalence of activity
30+
/// runs performantly.
31+
///
32+
/// Based on the name of the activity and the hash of the inputs to the activity.
33+
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Debug)]
34+
pub struct ActivityId {
35+
pub name: String,
36+
pub input_hash: u64,
37+
}
38+
39+
impl ActivityId {
40+
pub fn new<A: Activity>(input: &A::Input) -> Self {
41+
let mut hasher = DefaultHasher::new();
42+
input.hash(&mut hasher);
43+
let input_hash = hasher.finish();
44+
45+
Self {
46+
name: A::NAME.to_string(),
47+
input_hash,
48+
}
49+
}
50+
51+
pub fn from_bytes(name: String, input_hash: Vec<u8>) -> WorkflowResult<Self> {
52+
Ok(ActivityId {
53+
name,
54+
input_hash: u64::from_le_bytes(
55+
input_hash
56+
.try_into()
57+
.map_err(|_| WorkflowError::IntegerConversion)?,
58+
),
59+
})
60+
}
61+
}

lib/chirp-workflow/core/src/compat.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ where
2626

2727
let name = I::Workflow::NAME;
2828

29-
tracing::debug!(%name, ?input, "dispatching workflow");
29+
tracing::info!(%name, ?input, "dispatching workflow");
3030

3131
let id = Uuid::new_v4();
3232

@@ -62,7 +62,7 @@ where
6262

6363
let name = I::Workflow::NAME;
6464

65-
tracing::debug!(%name, ?input, "dispatching workflow");
65+
tracing::info!(%name, ?input, "dispatching workflow");
6666

6767
let id = Uuid::new_v4();
6868

@@ -157,9 +157,9 @@ pub async fn signal<I: Signal + Serialize, B: Debug + Clone>(
157157
bail!("cannot dispatch a signal from an operation within a workflow execution. trigger it from the workflow's body.");
158158
}
159159

160-
tracing::debug!(name=%I::NAME, %workflow_id, "dispatching signal");
161-
162160
let signal_id = Uuid::new_v4();
161+
162+
tracing::info!(name=%I::NAME, %workflow_id, %signal_id, "dispatching signal");
163163

164164
// Serialize input
165165
let input_val = serde_json::to_value(input)
@@ -184,9 +184,9 @@ pub async fn tagged_signal<I: Signal + Serialize, B: Debug + Clone>(
184184
bail!("cannot dispatch a signal from an operation within a workflow execution. trigger it from the workflow's body.");
185185
}
186186

187-
tracing::debug!(name=%I::NAME, ?tags, "dispatching tagged signal");
188-
189187
let signal_id = Uuid::new_v4();
188+
189+
tracing::info!(name=%I::NAME, ?tags, %signal_id, "dispatching tagged signal");
190190

191191
// Serialize input
192192
let input_val = serde_json::to_value(input)

lib/chirp-workflow/core/src/ctx/activity.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,9 @@ impl ActivityCtx {
8282

8383
pub async fn update_workflow_tags(&self, tags: &serde_json::Value) -> GlobalResult<()> {
8484
self.db
85-
.update_workflow_tags(
86-
self.workflow_id,
87-
tags,
88-
)
89-
.await.map_err(GlobalError::raw)
85+
.update_workflow_tags(self.workflow_id, tags)
86+
.await
87+
.map_err(GlobalError::raw)
9088
}
9189
}
9290

lib/chirp-workflow/core/src/ctx/api.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ impl ApiCtx {
6464
{
6565
let name = I::Workflow::NAME;
6666

67-
tracing::debug!(%name, ?input, "dispatching workflow");
67+
tracing::info!(%name, ?input, "dispatching workflow");
6868

6969
let id = Uuid::new_v4();
7070

@@ -83,14 +83,18 @@ impl ApiCtx {
8383
Ok(id)
8484
}
8585

86-
pub async fn dispatch_tagged_workflow<I>(&self, tags: &serde_json::Value, input: I) -> GlobalResult<Uuid>
86+
pub async fn dispatch_tagged_workflow<I>(
87+
&self,
88+
tags: &serde_json::Value,
89+
input: I,
90+
) -> GlobalResult<Uuid>
8791
where
8892
I: WorkflowInput,
8993
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
9094
{
9195
let name = I::Workflow::NAME;
9296

93-
tracing::debug!(%name, ?tags, ?input, "dispatching tagged workflow");
97+
tracing::info!(%name, ?tags, ?input, "dispatching tagged workflow");
9498

9599
let id = Uuid::new_v4();
96100

@@ -178,9 +182,9 @@ impl ApiCtx {
178182
workflow_id: Uuid,
179183
input: T,
180184
) -> GlobalResult<Uuid> {
181-
tracing::debug!(name=%T::NAME, %workflow_id, "dispatching signal");
182-
183185
let signal_id = Uuid::new_v4();
186+
187+
tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
184188

185189
// Serialize input
186190
let input_val = serde_json::to_value(input)
@@ -200,9 +204,9 @@ impl ApiCtx {
200204
tags: &serde_json::Value,
201205
input: T,
202206
) -> GlobalResult<Uuid> {
203-
tracing::debug!(name=%T::NAME, ?tags, "dispatching tagged signal");
204-
205207
let signal_id = Uuid::new_v4();
208+
209+
tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
206210

207211
// Serialize input
208212
let input_val = serde_json::to_value(input)

lib/chirp-workflow/core/src/ctx/test.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ impl TestCtx {
8080
{
8181
let name = I::Workflow::NAME;
8282

83-
tracing::debug!(%name, ?input, "dispatching workflow");
83+
tracing::info!(%name, ?input, "dispatching workflow");
8484

8585
let id = Uuid::new_v4();
8686

@@ -99,14 +99,18 @@ impl TestCtx {
9999
Ok(id)
100100
}
101101

102-
pub async fn dispatch_tagged_workflow<I>(&self, tags: &serde_json::Value, input: I) -> GlobalResult<Uuid>
102+
pub async fn dispatch_tagged_workflow<I>(
103+
&self,
104+
tags: &serde_json::Value,
105+
input: I,
106+
) -> GlobalResult<Uuid>
103107
where
104108
I: WorkflowInput,
105109
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
106110
{
107111
let name = I::Workflow::NAME;
108112

109-
tracing::debug!(%name, ?tags, ?input, "dispatching tagged workflow");
113+
tracing::info!(%name, ?tags, ?input, "dispatching tagged workflow");
110114

111115
let id = Uuid::new_v4();
112116

@@ -182,9 +186,9 @@ impl TestCtx {
182186
workflow_id: Uuid,
183187
input: T,
184188
) -> GlobalResult<Uuid> {
185-
tracing::debug!(name=%T::NAME, %workflow_id, "dispatching signal");
186-
187189
let signal_id = Uuid::new_v4();
190+
191+
tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
188192

189193
// Serialize input
190194
let input_val = serde_json::to_value(input)
@@ -204,9 +208,9 @@ impl TestCtx {
204208
tags: &serde_json::Value,
205209
input: T,
206210
) -> GlobalResult<Uuid> {
207-
tracing::debug!(name=%T::NAME, ?tags, "dispatching tagged signal");
208-
209211
let signal_id = Uuid::new_v4();
212+
213+
tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
210214

211215
// Serialize input
212216
let input_val = serde_json::to_value(input)

lib/chirp-workflow/core/src/ctx/workflow.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ use tokio::time::Duration;
66
use uuid::Uuid;
77

88
use crate::{
9-
schema::{ActivityId, Event},
9+
activity::ActivityId,
10+
event::Event,
1011
util::{self, Location},
1112
Activity, ActivityCtx, ActivityInput, DatabaseHandle, Executable, Listen, PulledWorkflow,
1213
RegistryHandle, Signal, SignalRow, Workflow, WorkflowError, WorkflowInput, WorkflowResult,
@@ -365,15 +366,23 @@ impl WorkflowCtx {
365366
}
366367

367368
/// Dispatch another workflow with tags.
368-
pub async fn dispatch_tagged_workflow<I>(&mut self, tags: &serde_json::Value, input: I) -> GlobalResult<Uuid>
369+
pub async fn dispatch_tagged_workflow<I>(
370+
&mut self,
371+
tags: &serde_json::Value,
372+
input: I,
373+
) -> GlobalResult<Uuid>
369374
where
370375
I: WorkflowInput,
371376
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
372377
{
373378
self.dispatch_workflow_inner(Some(tags), input).await
374379
}
375380

376-
async fn dispatch_workflow_inner<I>(&mut self, tags: Option<&serde_json::Value>, input: I) -> GlobalResult<Uuid>
381+
async fn dispatch_workflow_inner<I>(
382+
&mut self,
383+
tags: Option<&serde_json::Value>,
384+
input: I,
385+
) -> GlobalResult<Uuid>
377386
where
378387
I: WorkflowInput,
379388
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
@@ -404,7 +413,7 @@ impl WorkflowCtx {
404413
else {
405414
let name = I::Workflow::NAME;
406415

407-
tracing::debug!(%name, ?tags, ?input, "dispatching workflow");
416+
tracing::info!(%name, ?tags, ?input, "dispatching workflow");
408417

409418
let sub_workflow_id = Uuid::new_v4();
410419

@@ -659,9 +668,9 @@ impl WorkflowCtx {
659668
workflow_id: Uuid,
660669
body: T,
661670
) -> GlobalResult<Uuid> {
662-
tracing::debug!(name=%T::NAME, %workflow_id, "dispatching signal");
663-
664671
let signal_id = Uuid::new_v4();
672+
673+
tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
665674

666675
// Serialize input
667676
let input_val = serde_json::to_value(&body)
@@ -682,9 +691,9 @@ impl WorkflowCtx {
682691
tags: &serde_json::Value,
683692
body: T,
684693
) -> GlobalResult<Uuid> {
685-
tracing::debug!(name=%T::NAME, ?tags, "dispatching tagged signal");
686-
687694
let signal_id = Uuid::new_v4();
695+
696+
tracing::debug!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
688697

689698
// Serialize input
690699
let input_val = serde_json::to_value(&body)
@@ -717,7 +726,7 @@ impl WorkflowCtx {
717726
}
718727
// Listen for new messages
719728
else {
720-
tracing::debug!(name=%self.name, id=%self.workflow_id, "listening for signal");
729+
tracing::info!(name=%self.name, id=%self.workflow_id, "listening for signal");
721730

722731
let mut retries = 0;
723732
let mut interval = tokio::time::interval(SIGNAL_RETRY);

lib/chirp-workflow/core/src/db/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::sync::Arc;
22

33
use uuid::Uuid;
44

5-
use crate::{schema::ActivityId, Workflow, WorkflowError, WorkflowResult};
5+
use crate::{activity::ActivityId, Workflow, WorkflowError, WorkflowResult};
66

77
mod postgres;
88
pub use postgres::DatabasePostgres;

lib/chirp-workflow/core/src/db/postgres.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use super::{
88
ActivityEventRow, Database, PulledWorkflow, PulledWorkflowRow, SignalEventRow, SignalRow,
99
SubWorkflowEventRow, WorkflowRow,
1010
};
11-
use crate::{schema::ActivityId, WorkflowError, WorkflowResult};
11+
use crate::{activity::ActivityId, WorkflowError, WorkflowResult};
1212

1313
pub struct DatabasePostgres {
1414
pool: PgPool,
@@ -76,7 +76,7 @@ impl Database for DatabasePostgres {
7676
.bind(workflow_name)
7777
.bind(rivet_util::timestamp::now())
7878
.bind(ray_id)
79-
.bind(tags)
79+
.bind(&tags)
8080
.bind(input)
8181
.execute(&mut *self.conn().await?)
8282
.await
@@ -130,14 +130,18 @@ impl Database for DatabasePostgres {
130130
(
131131
SELECT true
132132
FROM db_workflow.signals AS s
133-
WHERE s.signal_name = ANY(wake_signals)
133+
WHERE
134+
s.workflow_id = w.workflow_id AND
135+
s.signal_name = ANY(w.wake_signals)
134136
LIMIT 1
135137
) OR
136138
-- Tagged signal exists
137139
(
138140
SELECT true
139141
FROM db_workflow.tagged_signals AS s
140-
WHERE w.tags @> s.tags
142+
WHERE
143+
s.signal_name = ANY(w.wake_signals) AND
144+
s.tags <@ w.tags
141145
LIMIT 1
142146
) OR
143147
-- Sub workflow completed
@@ -413,7 +417,7 @@ impl Database for DatabasePostgres {
413417
INSERT INTO db_workflow.workflow_activity_errors (
414418
workflow_id, location, activity_name, error, ts
415419
)
416-
VALUES ($1, $2, $3, $6, $7)
420+
VALUES ($1, $2, $3, $6, $8)
417421
RETURNING 1
418422
)
419423
SELECT 1
@@ -426,6 +430,7 @@ impl Database for DatabasePostgres {
426430
.bind(input)
427431
.bind(err)
428432
.bind(create_ts)
433+
.bind(rivet_util::timestamp::now())
429434
.execute(&mut *self.conn().await?)
430435
.await
431436
.map_err(WorkflowError::Sqlx)?;

0 commit comments

Comments
 (0)