Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions lib/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl ActivityCtx {
pub fn new(
db: DatabaseHandle,
conn: &rivet_connection::Connection,
workflow_create_ts: i64,
activity_create_ts: i64,
ray_id: Uuid,
name: &'static str,
) -> Self {
Expand All @@ -36,7 +36,7 @@ impl ActivityCtx {
req_id,
ray_id,
ts,
workflow_create_ts,
activity_create_ts,
(),
);
op_ctx.from_workflow = true;
Expand Down Expand Up @@ -106,10 +106,6 @@ impl ActivityCtx {
self.ts.saturating_sub(self.op_ctx.req_ts())
}

// pub fn perf(&self) -> &chirp_perf::PerfCtx {
// self.conn.perf()
// }

pub fn trace(&self) -> &[chirp_client::TraceEntry] {
self.conn.trace()
}
Expand Down
4 changes: 0 additions & 4 deletions lib/chirp-workflow/core/src/ctx/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,6 @@ impl OperationCtx {
self.ts.saturating_sub(self.op_ctx.req_ts())
}

// pub fn perf(&self) -> &chirp_perf::PerfCtx {
// self.conn.perf()
// }

pub fn trace(&self) -> &[chirp_client::TraceEntry] {
self.conn.trace()
}
Expand Down
32 changes: 30 additions & 2 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct WorkflowCtx {
/// Name of the workflow to run in the registry.
pub name: String,
create_ts: i64,
ts: i64,
ray_id: Uuid,

registry: RegistryHandle,
Expand Down Expand Up @@ -66,6 +67,7 @@ impl WorkflowCtx {
workflow_id: workflow.workflow_id,
name: workflow.workflow_name,
create_ts: workflow.create_ts,
ts: rivet_util::timestamp::now(),

ray_id: workflow.ray_id,

Expand Down Expand Up @@ -96,6 +98,7 @@ impl WorkflowCtx {
workflow_id: self.workflow_id,
name: self.name.clone(),
create_ts: self.create_ts,
ts: self.ts,
ray_id: self.ray_id,

registry: self.registry.clone(),
Expand Down Expand Up @@ -245,6 +248,7 @@ impl WorkflowCtx {
&mut self,
input: &A::Input,
activity_id: &ActivityId,
create_ts: i64,
) -> WorkflowResult<A::Output> {
let ctx = ActivityCtx::new(
self.db.clone(),
Expand Down Expand Up @@ -272,6 +276,7 @@ impl WorkflowCtx {
self.workflow_id,
self.full_location().as_ref(),
activity_id,
create_ts,
input_val,
Ok(output_val),
)
Expand All @@ -290,6 +295,7 @@ impl WorkflowCtx {
self.workflow_id,
self.full_location().as_ref(),
activity_id,
create_ts,
input_val,
Err(&err.to_string()),
)
Expand All @@ -307,6 +313,7 @@ impl WorkflowCtx {
self.workflow_id,
self.full_location().as_ref(),
activity_id,
create_ts,
input_val,
Err(&err.to_string()),
)
Expand Down Expand Up @@ -484,6 +491,7 @@ impl WorkflowCtx {
workflow_id: self.workflow_id,
name: I::Workflow::NAME.to_string(),
create_ts: rivet_util::timestamp::now(),
ts: rivet_util::timestamp::now(),
ray_id: self.ray_id,

registry: self.registry.clone(),
Expand Down Expand Up @@ -551,7 +559,10 @@ impl WorkflowCtx {
else {
let error_count = activity.error_count;

match self.run_activity::<I::Activity>(&input, &activity_id).await {
match self
.run_activity::<I::Activity>(&input, &activity_id, activity.create_ts)
.await
{
Err(err) => {
// Convert error in the case of max retries exceeded. This will only act on retryable
// errors
Expand Down Expand Up @@ -581,7 +592,7 @@ impl WorkflowCtx {
}
// This is a new activity
else {
self.run_activity::<I::Activity>(&input, &activity_id)
self.run_activity::<I::Activity>(&input, &activity_id, rivet_util::timestamp::now())
.await
.map_err(GlobalError::raw)?
};
Expand Down Expand Up @@ -698,3 +709,20 @@ impl WorkflowCtx {

// TODO: sleep_for, sleep_until
}

impl WorkflowCtx {
/// Timestamp at which this workflow run started.
pub fn ts(&self) -> i64 {
self.ts
}

/// Timestamp at which the workflow was created.
pub fn create_ts(&self) -> i64 {
self.create_ts
}

/// Time between when the timestamp was processed and when it was published.
pub fn req_dt(&self) -> i64 {
self.ts.saturating_sub(self.create_ts)
}
}
3 changes: 3 additions & 0 deletions lib/chirp-workflow/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub trait Database: Send {
workflow_id: Uuid,
location: &[usize],
activity_id: &ActivityId,
create_ts: i64,
input: serde_json::Value,
output: Result<serde_json::Value, &str>,
) -> WorkflowResult<()>;
Expand Down Expand Up @@ -76,6 +77,7 @@ pub trait Database: Send {
input: serde_json::Value,
) -> WorkflowResult<()>;

/// Fetches a workflow that has the given json as a subset of its input after the given ts.
async fn poll_workflow(
&self,
name: &str,
Expand Down Expand Up @@ -132,6 +134,7 @@ pub struct ActivityEventRow {
pub input_hash: Vec<u8>,
pub output: Option<serde_json::Value>,
pub error_count: i64,
pub create_ts: i64,
}

#[derive(sqlx::FromRow)]
Expand Down
35 changes: 23 additions & 12 deletions lib/chirp-workflow/core/src/db/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ impl Database for DatabasePostgres {
ev.activity_name,
ev.input_hash,
ev.output,
ev.create_ts,
COUNT(err.workflow_id) AS error_count
FROM db_workflow.workflow_activity_events AS ev
LEFT JOIN db_workflow.workflow_activity_errors AS err
Expand Down Expand Up @@ -226,7 +227,10 @@ impl Database for DatabasePostgres {
sqlx::query_as::<_, SubWorkflowEventRow>(indoc!(
"
SELECT
sw.workflow_id, sw.location, sw.sub_workflow_id, w.workflow_name AS sub_workflow_name
sw.workflow_id,
sw.location,
sw.sub_workflow_id,
w.workflow_name AS sub_workflow_name
FROM db_workflow.workflow_sub_workflow_events AS sw
JOIN db_workflow.workflows AS w
ON sw.sub_workflow_id = w.workflow_id
Expand Down Expand Up @@ -329,17 +333,20 @@ impl Database for DatabasePostgres {
workflow_id: Uuid,
location: &[usize],
activity_id: &ActivityId,
create_ts: i64,
input: serde_json::Value,
res: Result<serde_json::Value, &str>,
) -> WorkflowResult<()> {
match res {
Ok(output) => {
sqlx::query(indoc!(
"
UPSERT INTO db_workflow.workflow_activity_events (
workflow_id, location, activity_name, input_hash, input, output
INSERT INTO db_workflow.workflow_activity_events (
workflow_id, location, activity_name, input_hash, input, output, create_ts
)
VALUES ($1, $2, $3, $4, $5, $6)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (workflow_id, location) DO UPDATE
SET output = excluded.output
",
))
.bind(workflow_id)
Expand All @@ -348,6 +355,8 @@ impl Database for DatabasePostgres {
.bind(activity_id.input_hash.to_le_bytes())
.bind(input)
.bind(output)
.bind(rivet_util::timestamp::now())
.bind(create_ts)
.execute(&mut *self.conn().await?)
.await
.map_err(WorkflowError::Sqlx)?;
Expand All @@ -357,10 +366,11 @@ impl Database for DatabasePostgres {
"
WITH
event AS (
UPSERT INTO db_workflow.workflow_activity_events (
workflow_id, location, activity_name, input_hash, input
INSERT INTO db_workflow.workflow_activity_events (
workflow_id, location, activity_name, input_hash, input, create_ts
)
VALUES ($1, $2, $3, $4, $5)
VALUES ($1, $2, $3, $4, $5, $7)
ON CONFLICT (workflow_id, location) DO NOTHING
RETURNING 1
),
err AS (
Expand All @@ -379,7 +389,7 @@ impl Database for DatabasePostgres {
.bind(activity_id.input_hash.to_le_bytes())
.bind(input)
.bind(err)
.bind(rivet_util::timestamp::now())
.bind(create_ts)
.execute(&mut *self.conn().await?)
.await
.map_err(WorkflowError::Sqlx)?;
Expand Down Expand Up @@ -417,9 +427,9 @@ impl Database for DatabasePostgres {
),
insert_event AS (
INSERT INTO db_workflow.workflow_signal_events(
workflow_id, location, signal_id, signal_name, body
workflow_id, location, signal_id, signal_name, body, ack_ts
)
SELECT workflow_id, $3 AS location, signal_id, signal_name, body
SELECT workflow_id, $3 AS location, signal_id, signal_name, body, $4 AS ack_ts
FROM next_signal
RETURNING 1
)
Expand All @@ -429,6 +439,7 @@ impl Database for DatabasePostgres {
.bind(workflow_id)
.bind(filter)
.bind(location.iter().map(|x| *x as i64).collect::<Vec<_>>())
.bind(rivet_util::timestamp::now())
.fetch_optional(&mut *self.conn().await?)
.await
.map_err(WorkflowError::Sqlx)?;
Expand Down Expand Up @@ -484,9 +495,9 @@ impl Database for DatabasePostgres {
),
sub_workflow AS (
INSERT INTO db_workflow.workflow_sub_workflow_events(
workflow_id, location, sub_workflow_id
workflow_id, location, sub_workflow_id, create_ts
)
VALUES($1, $6, $7)
VALUES($1, $6, $7, $3)
RETURNING 1
)
SELECT 1
Expand Down
2 changes: 2 additions & 0 deletions lib/chirp-workflow/core/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub enum Event {
#[derive(Debug)]
pub struct ActivityEvent {
pub activity_id: ActivityId,
pub create_ts: i64,

/// If activity succeeds, this will be some.
pub(crate) output: Option<serde_json::Value>,
Expand All @@ -45,6 +46,7 @@ impl TryFrom<ActivityEventRow> for ActivityEvent {
fn try_from(value: ActivityEventRow) -> WorkflowResult<Self> {
Ok(ActivityEvent {
activity_id: ActivityId::from_bytes(value.activity_name, value.input_hash)?,
create_ts: value.create_ts,
output: value.output,
error_count: value
.error_count
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
ALTER TABLE workflow_activity_events
ADD COLUMN create_ts INT NOT NULL DEFAULT 1;

ALTER TABLE workflow_signal_events
ADD COLUMN ack_ts INT NOT NULL DEFAULT 1;

ALTER TABLE workflow_sub_workflow_events
ADD COLUMN create_ts INT NOT NULL DEFAULT 1;