Skip to content

Commit

Permalink
fix(workflows): add ts dt (#943)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Jun 26, 2024
1 parent 152c55b commit 1b362fd
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 24 deletions.
8 changes: 2 additions & 6 deletions lib/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl ActivityCtx {
pub fn new(
db: DatabaseHandle,
conn: &rivet_connection::Connection,
workflow_create_ts: i64,
activity_create_ts: i64,
ray_id: Uuid,
name: &'static str,
) -> Self {
Expand All @@ -36,7 +36,7 @@ impl ActivityCtx {
req_id,
ray_id,
ts,
workflow_create_ts,
activity_create_ts,
(),
);
op_ctx.from_workflow = true;
Expand Down Expand Up @@ -106,10 +106,6 @@ impl ActivityCtx {
self.ts.saturating_sub(self.op_ctx.req_ts())
}

// pub fn perf(&self) -> &chirp_perf::PerfCtx {
// self.conn.perf()
// }

pub fn trace(&self) -> &[chirp_client::TraceEntry] {
self.conn.trace()
}
Expand Down
4 changes: 0 additions & 4 deletions lib/chirp-workflow/core/src/ctx/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,6 @@ impl OperationCtx {
self.ts.saturating_sub(self.op_ctx.req_ts())
}

// pub fn perf(&self) -> &chirp_perf::PerfCtx {
// self.conn.perf()
// }

pub fn trace(&self) -> &[chirp_client::TraceEntry] {
self.conn.trace()
}
Expand Down
32 changes: 30 additions & 2 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct WorkflowCtx {
/// Name of the workflow to run in the registry.
pub name: String,
create_ts: i64,
ts: i64,
ray_id: Uuid,

registry: RegistryHandle,
Expand Down Expand Up @@ -66,6 +67,7 @@ impl WorkflowCtx {
workflow_id: workflow.workflow_id,
name: workflow.workflow_name,
create_ts: workflow.create_ts,
ts: rivet_util::timestamp::now(),

ray_id: workflow.ray_id,

Expand Down Expand Up @@ -96,6 +98,7 @@ impl WorkflowCtx {
workflow_id: self.workflow_id,
name: self.name.clone(),
create_ts: self.create_ts,
ts: self.ts,
ray_id: self.ray_id,

registry: self.registry.clone(),
Expand Down Expand Up @@ -245,6 +248,7 @@ impl WorkflowCtx {
&mut self,
input: &A::Input,
activity_id: &ActivityId,
create_ts: i64,
) -> WorkflowResult<A::Output> {
let ctx = ActivityCtx::new(
self.db.clone(),
Expand Down Expand Up @@ -272,6 +276,7 @@ impl WorkflowCtx {
self.workflow_id,
self.full_location().as_ref(),
activity_id,
create_ts,
input_val,
Ok(output_val),
)
Expand All @@ -290,6 +295,7 @@ impl WorkflowCtx {
self.workflow_id,
self.full_location().as_ref(),
activity_id,
create_ts,
input_val,
Err(&err.to_string()),
)
Expand All @@ -307,6 +313,7 @@ impl WorkflowCtx {
self.workflow_id,
self.full_location().as_ref(),
activity_id,
create_ts,
input_val,
Err(&err.to_string()),
)
Expand Down Expand Up @@ -484,6 +491,7 @@ impl WorkflowCtx {
workflow_id: self.workflow_id,
name: I::Workflow::NAME.to_string(),
create_ts: rivet_util::timestamp::now(),
ts: rivet_util::timestamp::now(),
ray_id: self.ray_id,

registry: self.registry.clone(),
Expand Down Expand Up @@ -551,7 +559,10 @@ impl WorkflowCtx {
else {
let error_count = activity.error_count;

match self.run_activity::<I::Activity>(&input, &activity_id).await {
match self
.run_activity::<I::Activity>(&input, &activity_id, activity.create_ts)
.await
{
Err(err) => {
// Convert error in the case of max retries exceeded. This will only act on retryable
// errors
Expand Down Expand Up @@ -581,7 +592,7 @@ impl WorkflowCtx {
}
// This is a new activity
else {
self.run_activity::<I::Activity>(&input, &activity_id)
self.run_activity::<I::Activity>(&input, &activity_id, rivet_util::timestamp::now())
.await
.map_err(GlobalError::raw)?
};
Expand Down Expand Up @@ -698,3 +709,20 @@ impl WorkflowCtx {

// TODO: sleep_for, sleep_until
}

impl WorkflowCtx {
/// Timestamp at which this workflow run started.
pub fn ts(&self) -> i64 {
self.ts
}

/// Timestamp at which the workflow was created.
pub fn create_ts(&self) -> i64 {
self.create_ts
}

/// Time between when the timestamp was processed and when it was published.
pub fn req_dt(&self) -> i64 {
self.ts.saturating_sub(self.create_ts)
}
}
3 changes: 3 additions & 0 deletions lib/chirp-workflow/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub trait Database: Send {
workflow_id: Uuid,
location: &[usize],
activity_id: &ActivityId,
create_ts: i64,
input: serde_json::Value,
output: Result<serde_json::Value, &str>,
) -> WorkflowResult<()>;
Expand Down Expand Up @@ -76,6 +77,7 @@ pub trait Database: Send {
input: serde_json::Value,
) -> WorkflowResult<()>;

/// Fetches a workflow that has the given json as a subset of its input after the given ts.
async fn poll_workflow(
&self,
name: &str,
Expand Down Expand Up @@ -132,6 +134,7 @@ pub struct ActivityEventRow {
pub input_hash: Vec<u8>,
pub output: Option<serde_json::Value>,
pub error_count: i64,
pub create_ts: i64,
}

#[derive(sqlx::FromRow)]
Expand Down
35 changes: 23 additions & 12 deletions lib/chirp-workflow/core/src/db/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ impl Database for DatabasePostgres {
ev.activity_name,
ev.input_hash,
ev.output,
ev.create_ts,
COUNT(err.workflow_id) AS error_count
FROM db_workflow.workflow_activity_events AS ev
LEFT JOIN db_workflow.workflow_activity_errors AS err
Expand Down Expand Up @@ -226,7 +227,10 @@ impl Database for DatabasePostgres {
sqlx::query_as::<_, SubWorkflowEventRow>(indoc!(
"
SELECT
sw.workflow_id, sw.location, sw.sub_workflow_id, w.workflow_name AS sub_workflow_name
sw.workflow_id,
sw.location,
sw.sub_workflow_id,
w.workflow_name AS sub_workflow_name
FROM db_workflow.workflow_sub_workflow_events AS sw
JOIN db_workflow.workflows AS w
ON sw.sub_workflow_id = w.workflow_id
Expand Down Expand Up @@ -329,17 +333,20 @@ impl Database for DatabasePostgres {
workflow_id: Uuid,
location: &[usize],
activity_id: &ActivityId,
create_ts: i64,
input: serde_json::Value,
res: Result<serde_json::Value, &str>,
) -> WorkflowResult<()> {
match res {
Ok(output) => {
sqlx::query(indoc!(
"
UPSERT INTO db_workflow.workflow_activity_events (
workflow_id, location, activity_name, input_hash, input, output
INSERT INTO db_workflow.workflow_activity_events (
workflow_id, location, activity_name, input_hash, input, output, create_ts
)
VALUES ($1, $2, $3, $4, $5, $6)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (workflow_id, location) DO UPDATE
SET output = excluded.output
",
))
.bind(workflow_id)
Expand All @@ -348,6 +355,8 @@ impl Database for DatabasePostgres {
.bind(activity_id.input_hash.to_le_bytes())
.bind(input)
.bind(output)
.bind(rivet_util::timestamp::now())
.bind(create_ts)
.execute(&mut *self.conn().await?)
.await
.map_err(WorkflowError::Sqlx)?;
Expand All @@ -357,10 +366,11 @@ impl Database for DatabasePostgres {
"
WITH
event AS (
UPSERT INTO db_workflow.workflow_activity_events (
workflow_id, location, activity_name, input_hash, input
INSERT INTO db_workflow.workflow_activity_events (
workflow_id, location, activity_name, input_hash, input, create_ts
)
VALUES ($1, $2, $3, $4, $5)
VALUES ($1, $2, $3, $4, $5, $7)
ON CONFLICT (workflow_id, location) DO NOTHING
RETURNING 1
),
err AS (
Expand All @@ -379,7 +389,7 @@ impl Database for DatabasePostgres {
.bind(activity_id.input_hash.to_le_bytes())
.bind(input)
.bind(err)
.bind(rivet_util::timestamp::now())
.bind(create_ts)
.execute(&mut *self.conn().await?)
.await
.map_err(WorkflowError::Sqlx)?;
Expand Down Expand Up @@ -417,9 +427,9 @@ impl Database for DatabasePostgres {
),
insert_event AS (
INSERT INTO db_workflow.workflow_signal_events(
workflow_id, location, signal_id, signal_name, body
workflow_id, location, signal_id, signal_name, body, ack_ts
)
SELECT workflow_id, $3 AS location, signal_id, signal_name, body
SELECT workflow_id, $3 AS location, signal_id, signal_name, body, $4 AS ack_ts
FROM next_signal
RETURNING 1
)
Expand All @@ -429,6 +439,7 @@ impl Database for DatabasePostgres {
.bind(workflow_id)
.bind(filter)
.bind(location.iter().map(|x| *x as i64).collect::<Vec<_>>())
.bind(rivet_util::timestamp::now())
.fetch_optional(&mut *self.conn().await?)
.await
.map_err(WorkflowError::Sqlx)?;
Expand Down Expand Up @@ -484,9 +495,9 @@ impl Database for DatabasePostgres {
),
sub_workflow AS (
INSERT INTO db_workflow.workflow_sub_workflow_events(
workflow_id, location, sub_workflow_id
workflow_id, location, sub_workflow_id, create_ts
)
VALUES($1, $6, $7)
VALUES($1, $6, $7, $3)
RETURNING 1
)
SELECT 1
Expand Down
2 changes: 2 additions & 0 deletions lib/chirp-workflow/core/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub enum Event {
#[derive(Debug)]
pub struct ActivityEvent {
pub activity_id: ActivityId,
pub create_ts: i64,

/// If activity succeeds, this will be some.
pub(crate) output: Option<serde_json::Value>,
Expand All @@ -45,6 +46,7 @@ impl TryFrom<ActivityEventRow> for ActivityEvent {
fn try_from(value: ActivityEventRow) -> WorkflowResult<Self> {
Ok(ActivityEvent {
activity_id: ActivityId::from_bytes(value.activity_name, value.input_hash)?,
create_ts: value.create_ts,
output: value.output,
error_count: value
.error_count
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
ALTER TABLE workflow_activity_events
ADD COLUMN create_ts INT NOT NULL DEFAULT 1;

ALTER TABLE workflow_signal_events
ADD COLUMN ack_ts INT NOT NULL DEFAULT 1;

ALTER TABLE workflow_sub_workflow_events
ADD COLUMN create_ts INT NOT NULL DEFAULT 1;

0 comments on commit 1b362fd

Please sign in to comment.