Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions lib/bolt/core/src/dep/cargo/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,15 +551,24 @@ pub async fn build_tests<'a, T: AsRef<str>>(
if let Some(executable) = v["filenames"][0].as_str() {
// Parsing the cargo package name (foo-bar) from
// path+file:///foo/bar#foo-bar@0.0.1
let package = v["package_id"]
.as_str()
.context("missing package_id")?
.split_once('#')
.context("split_once failed")?
.1
.split_once('@')
.context("split_once failed")?
.0;
let package_id = v["package_id"].as_str().context("missing package_id")?;
let package = if package_id.contains('@') {
package_id
.split_once('#')
.context("split_once failed")?
.1
.split_once('@')
.context("split_once failed")?
.0
} else {
package_id
.split_once('#')
.context("split_once failed")?
.0
.rsplit_once('/')
.context("split_once failed")?
.1
};

let target = v["target"]["name"]
.as_str()
Expand Down
44 changes: 41 additions & 3 deletions lib/chirp-workflow/core/src/activity.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use std::{fmt::Debug, hash::Hash};
use std::{
collections::hash_map::DefaultHasher,
fmt::Debug,
hash::{Hash, Hasher},
};

use async_trait::async_trait;
use global_error::GlobalResult;
use serde::{de::DeserializeOwned, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};

use crate::ActivityCtx;
use crate::{ActivityCtx, WorkflowError, WorkflowResult};

#[async_trait]
pub trait Activity {
Expand All @@ -21,3 +25,37 @@ pub trait Activity {
pub trait ActivityInput: Serialize + DeserializeOwned + Debug + Hash + Send {
type Activity: Activity;
}

/// Unique identifier for a specific run of an activity. Used to check for equivalence of activity
/// runs performantly.
///
/// Based on the name of the activity and the hash of the inputs to the activity.
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Debug)]
pub struct ActivityId {
pub name: String,
pub input_hash: u64,
}

impl ActivityId {
pub fn new<A: Activity>(input: &A::Input) -> Self {
let mut hasher = DefaultHasher::new();
input.hash(&mut hasher);
let input_hash = hasher.finish();

Self {
name: A::NAME.to_string(),
input_hash,
}
}

pub fn from_bytes(name: String, input_hash: Vec<u8>) -> WorkflowResult<Self> {
Ok(ActivityId {
name,
input_hash: u64::from_le_bytes(
input_hash
.try_into()
.map_err(|_| WorkflowError::IntegerConversion)?,
),
})
}
}
12 changes: 6 additions & 6 deletions lib/chirp-workflow/core/src/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ where

let name = I::Workflow::NAME;

tracing::debug!(%name, ?input, "dispatching workflow");
tracing::info!(%name, ?input, "dispatching workflow");

let id = Uuid::new_v4();

Expand Down Expand Up @@ -62,7 +62,7 @@ where

let name = I::Workflow::NAME;

tracing::debug!(%name, ?input, "dispatching workflow");
tracing::info!(%name, ?input, "dispatching workflow");

let id = Uuid::new_v4();

Expand Down Expand Up @@ -157,9 +157,9 @@ pub async fn signal<I: Signal + Serialize, B: Debug + Clone>(
bail!("cannot dispatch a signal from an operation within a workflow execution. trigger it from the workflow's body.");
}

tracing::debug!(name=%I::NAME, %workflow_id, "dispatching signal");

let signal_id = Uuid::new_v4();

tracing::info!(name=%I::NAME, %workflow_id, %signal_id, "dispatching signal");

// Serialize input
let input_val = serde_json::to_value(input)
Expand All @@ -184,9 +184,9 @@ pub async fn tagged_signal<I: Signal + Serialize, B: Debug + Clone>(
bail!("cannot dispatch a signal from an operation within a workflow execution. trigger it from the workflow's body.");
}

tracing::debug!(name=%I::NAME, ?tags, "dispatching tagged signal");

let signal_id = Uuid::new_v4();

tracing::info!(name=%I::NAME, ?tags, %signal_id, "dispatching tagged signal");

// Serialize input
let input_val = serde_json::to_value(input)
Expand Down
8 changes: 3 additions & 5 deletions lib/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,9 @@ impl ActivityCtx {

pub async fn update_workflow_tags(&self, tags: &serde_json::Value) -> GlobalResult<()> {
self.db
.update_workflow_tags(
self.workflow_id,
tags,
)
.await.map_err(GlobalError::raw)
.update_workflow_tags(self.workflow_id, tags)
.await
.map_err(GlobalError::raw)
}
}

Expand Down
18 changes: 11 additions & 7 deletions lib/chirp-workflow/core/src/ctx/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl ApiCtx {
{
let name = I::Workflow::NAME;

tracing::debug!(%name, ?input, "dispatching workflow");
tracing::info!(%name, ?input, "dispatching workflow");

let id = Uuid::new_v4();

Expand All @@ -83,14 +83,18 @@ impl ApiCtx {
Ok(id)
}

pub async fn dispatch_tagged_workflow<I>(&self, tags: &serde_json::Value, input: I) -> GlobalResult<Uuid>
pub async fn dispatch_tagged_workflow<I>(
&self,
tags: &serde_json::Value,
input: I,
) -> GlobalResult<Uuid>
where
I: WorkflowInput,
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
{
let name = I::Workflow::NAME;

tracing::debug!(%name, ?tags, ?input, "dispatching tagged workflow");
tracing::info!(%name, ?tags, ?input, "dispatching tagged workflow");

let id = Uuid::new_v4();

Expand Down Expand Up @@ -178,9 +182,9 @@ impl ApiCtx {
workflow_id: Uuid,
input: T,
) -> GlobalResult<Uuid> {
tracing::debug!(name=%T::NAME, %workflow_id, "dispatching signal");

let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");

// Serialize input
let input_val = serde_json::to_value(input)
Expand All @@ -200,9 +204,9 @@ impl ApiCtx {
tags: &serde_json::Value,
input: T,
) -> GlobalResult<Uuid> {
tracing::debug!(name=%T::NAME, ?tags, "dispatching tagged signal");

let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");

// Serialize input
let input_val = serde_json::to_value(input)
Expand Down
18 changes: 11 additions & 7 deletions lib/chirp-workflow/core/src/ctx/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl TestCtx {
{
let name = I::Workflow::NAME;

tracing::debug!(%name, ?input, "dispatching workflow");
tracing::info!(%name, ?input, "dispatching workflow");

let id = Uuid::new_v4();

Expand All @@ -99,14 +99,18 @@ impl TestCtx {
Ok(id)
}

pub async fn dispatch_tagged_workflow<I>(&self, tags: &serde_json::Value, input: I) -> GlobalResult<Uuid>
pub async fn dispatch_tagged_workflow<I>(
&self,
tags: &serde_json::Value,
input: I,
) -> GlobalResult<Uuid>
where
I: WorkflowInput,
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
{
let name = I::Workflow::NAME;

tracing::debug!(%name, ?tags, ?input, "dispatching tagged workflow");
tracing::info!(%name, ?tags, ?input, "dispatching tagged workflow");

let id = Uuid::new_v4();

Expand Down Expand Up @@ -182,9 +186,9 @@ impl TestCtx {
workflow_id: Uuid,
input: T,
) -> GlobalResult<Uuid> {
tracing::debug!(name=%T::NAME, %workflow_id, "dispatching signal");

let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");

// Serialize input
let input_val = serde_json::to_value(input)
Expand All @@ -204,9 +208,9 @@ impl TestCtx {
tags: &serde_json::Value,
input: T,
) -> GlobalResult<Uuid> {
tracing::debug!(name=%T::NAME, ?tags, "dispatching tagged signal");

let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");

// Serialize input
let input_val = serde_json::to_value(input)
Expand Down
27 changes: 18 additions & 9 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use tokio::time::Duration;
use uuid::Uuid;

use crate::{
schema::{ActivityId, Event},
activity::ActivityId,
event::Event,
util::{self, Location},
Activity, ActivityCtx, ActivityInput, DatabaseHandle, Executable, Listen, PulledWorkflow,
RegistryHandle, Signal, SignalRow, Workflow, WorkflowError, WorkflowInput, WorkflowResult,
Expand Down Expand Up @@ -365,15 +366,23 @@ impl WorkflowCtx {
}

/// Dispatch another workflow with tags.
pub async fn dispatch_tagged_workflow<I>(&mut self, tags: &serde_json::Value, input: I) -> GlobalResult<Uuid>
pub async fn dispatch_tagged_workflow<I>(
&mut self,
tags: &serde_json::Value,
input: I,
) -> GlobalResult<Uuid>
where
I: WorkflowInput,
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
{
self.dispatch_workflow_inner(Some(tags), input).await
}

async fn dispatch_workflow_inner<I>(&mut self, tags: Option<&serde_json::Value>, input: I) -> GlobalResult<Uuid>
async fn dispatch_workflow_inner<I>(
&mut self,
tags: Option<&serde_json::Value>,
input: I,
) -> GlobalResult<Uuid>
where
I: WorkflowInput,
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
Expand Down Expand Up @@ -404,7 +413,7 @@ impl WorkflowCtx {
else {
let name = I::Workflow::NAME;

tracing::debug!(%name, ?tags, ?input, "dispatching workflow");
tracing::info!(%name, ?tags, ?input, "dispatching workflow");

let sub_workflow_id = Uuid::new_v4();

Expand Down Expand Up @@ -659,9 +668,9 @@ impl WorkflowCtx {
workflow_id: Uuid,
body: T,
) -> GlobalResult<Uuid> {
tracing::debug!(name=%T::NAME, %workflow_id, "dispatching signal");

let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");

// Serialize input
let input_val = serde_json::to_value(&body)
Expand All @@ -682,9 +691,9 @@ impl WorkflowCtx {
tags: &serde_json::Value,
body: T,
) -> GlobalResult<Uuid> {
tracing::debug!(name=%T::NAME, ?tags, "dispatching tagged signal");

let signal_id = Uuid::new_v4();

tracing::debug!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");

// Serialize input
let input_val = serde_json::to_value(&body)
Expand Down Expand Up @@ -717,7 +726,7 @@ impl WorkflowCtx {
}
// Listen for new messages
else {
tracing::debug!(name=%self.name, id=%self.workflow_id, "listening for signal");
tracing::info!(name=%self.name, id=%self.workflow_id, "listening for signal");

let mut retries = 0;
let mut interval = tokio::time::interval(SIGNAL_RETRY);
Expand Down
2 changes: 1 addition & 1 deletion lib/chirp-workflow/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use uuid::Uuid;

use crate::{schema::ActivityId, Workflow, WorkflowError, WorkflowResult};
use crate::{activity::ActivityId, Workflow, WorkflowError, WorkflowResult};

mod postgres;
pub use postgres::DatabasePostgres;
Expand Down
15 changes: 10 additions & 5 deletions lib/chirp-workflow/core/src/db/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::{
ActivityEventRow, Database, PulledWorkflow, PulledWorkflowRow, SignalEventRow, SignalRow,
SubWorkflowEventRow, WorkflowRow,
};
use crate::{schema::ActivityId, WorkflowError, WorkflowResult};
use crate::{activity::ActivityId, WorkflowError, WorkflowResult};

pub struct DatabasePostgres {
pool: PgPool,
Expand Down Expand Up @@ -76,7 +76,7 @@ impl Database for DatabasePostgres {
.bind(workflow_name)
.bind(rivet_util::timestamp::now())
.bind(ray_id)
.bind(tags)
.bind(&tags)
.bind(input)
.execute(&mut *self.conn().await?)
.await
Expand Down Expand Up @@ -130,14 +130,18 @@ impl Database for DatabasePostgres {
(
SELECT true
FROM db_workflow.signals AS s
WHERE s.signal_name = ANY(wake_signals)
WHERE
s.workflow_id = w.workflow_id AND
s.signal_name = ANY(w.wake_signals)
LIMIT 1
) OR
-- Tagged signal exists
(
SELECT true
FROM db_workflow.tagged_signals AS s
WHERE w.tags @> s.tags
WHERE
s.signal_name = ANY(w.wake_signals) AND
s.tags <@ w.tags
LIMIT 1
) OR
-- Sub workflow completed
Expand Down Expand Up @@ -413,7 +417,7 @@ impl Database for DatabasePostgres {
INSERT INTO db_workflow.workflow_activity_errors (
workflow_id, location, activity_name, error, ts
)
VALUES ($1, $2, $3, $6, $7)
VALUES ($1, $2, $3, $6, $8)
RETURNING 1
)
SELECT 1
Expand All @@ -426,6 +430,7 @@ impl Database for DatabasePostgres {
.bind(input)
.bind(err)
.bind(create_ts)
.bind(rivet_util::timestamp::now())
.execute(&mut *self.conn().await?)
.await
.map_err(WorkflowError::Sqlx)?;
Expand Down
Loading