Skip to content

Commit 21fc035

Browse files
committed
feat: run sub workflows in the same process
1 parent 9f1ead6 commit 21fc035

File tree

12 files changed

+92
-817
lines changed

12 files changed

+92
-817
lines changed

lib/chirp-workflow/core/src/ctx/activity.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use global_error::{GlobalError, GlobalResult};
22
use rivet_pools::prelude::*;
33
use uuid::Uuid;
44

5-
use crate::{ctx::OperationCtx, util, DatabaseHandle, Operation, OperationInput, WorkflowError};
5+
use crate::{ctx::OperationCtx, DatabaseHandle, Operation, OperationInput, WorkflowError};
66

77
pub struct ActivityCtx {
88
ray_id: Uuid,
@@ -26,7 +26,19 @@ impl ActivityCtx {
2626
name: &'static str,
2727
) -> Self {
2828
let ts = rivet_util::timestamp::now();
29-
let (conn, op_ctx) = util::wrap_conn(conn, ray_id, workflow_create_ts, true, name, ts);
29+
let req_id = Uuid::new_v4();
30+
let conn = conn.wrap(req_id, ray_id, name);
31+
let mut op_ctx = rivet_operation::OperationContext::new(
32+
name.to_string(),
33+
std::time::Duration::from_secs(60),
34+
conn.clone(),
35+
req_id,
36+
ray_id,
37+
ts,
38+
workflow_create_ts,
39+
(),
40+
);
41+
op_ctx.from_workflow = true;
3042

3143
ActivityCtx {
3244
ray_id,

lib/chirp-workflow/core/src/ctx/operation.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use global_error::{GlobalError, GlobalResult};
22
use rivet_pools::prelude::*;
33
use uuid::Uuid;
44

5-
use crate::{util, DatabaseHandle, Operation, OperationInput, WorkflowError};
5+
use crate::{DatabaseHandle, Operation, OperationInput, WorkflowError};
66

77
pub struct OperationCtx {
88
ray_id: Uuid,
@@ -27,7 +27,19 @@ impl OperationCtx {
2727
name: &'static str,
2828
) -> Self {
2929
let ts = rivet_util::timestamp::now();
30-
let (conn, op_ctx) = util::wrap_conn(conn, ray_id, req_ts, from_workflow, name, ts);
30+
let req_id = Uuid::new_v4();
31+
let conn = conn.wrap(req_id, ray_id, name);
32+
let mut op_ctx = rivet_operation::OperationContext::new(
33+
name.to_string(),
34+
std::time::Duration::from_secs(60),
35+
conn.clone(),
36+
req_id,
37+
ray_id,
38+
ts,
39+
req_ts,
40+
(),
41+
);
42+
op_ctx.from_workflow = from_workflow;
3143

3244
OperationCtx {
3345
ray_id,

lib/chirp-workflow/core/src/ctx/workflow.rs

Lines changed: 61 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -420,8 +420,7 @@ impl WorkflowCtx {
420420
}
421421
}
422422

423-
// TODO(RVTEE-103): Run sub workflow inline as a branch of the parent workflow
424-
/// Trigger another workflow and wait for its response.
423+
/// Runs a sub workflow in the same process as the current workflow and returns its response.
425424
pub async fn workflow<I>(
426425
&mut self,
427426
input: I,
@@ -430,11 +429,66 @@ impl WorkflowCtx {
430429
I: WorkflowInput,
431430
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
432431
{
433-
let sub_workflow_id = self.dispatch_workflow(input).await?;
434-
let output = self
435-
.wait_for_workflow::<I::Workflow>(sub_workflow_id)
436-
.await?;
437-
Ok(output)
432+
// Lookup workflow
433+
let Ok(workflow) = self.registry.get_workflow(I::Workflow::name()) else {
434+
tracing::warn!(
435+
id=%self.workflow_id,
436+
name=%I::Workflow::name(),
437+
"sub workflow not found in current registry",
438+
);
439+
440+
// TODO(RVT-3755): If a sub workflow is dispatched, then the worker is updated to include the sub
441+
// worker in the registry, this will diverge in history because it will try to run the sub worker
442+
// in process during the replay
443+
// If the workflow isn't in the current registry, dispatch the workflow instead
444+
let sub_workflow_id = self.dispatch_workflow(input).await?;
445+
let output = self
446+
.wait_for_workflow::<I::Workflow>(sub_workflow_id)
447+
.await?;
448+
449+
return Ok(output);
450+
};
451+
452+
tracing::info!(id=%self.workflow_id, name=%I::Workflow::name(), "running sub workflow");
453+
454+
// Create a new branched workflow context for the sub workflow
455+
let mut ctx = WorkflowCtx {
456+
workflow_id: self.workflow_id,
457+
name: I::Workflow::name().to_string(),
458+
create_ts: rivet_util::timestamp::now(),
459+
ray_id: self.ray_id,
460+
461+
registry: self.registry.clone(),
462+
db: self.db.clone(),
463+
464+
conn: self
465+
.conn
466+
.wrap(Uuid::new_v4(), self.ray_id, I::Workflow::name()),
467+
468+
event_history: self.event_history.clone(),
469+
470+
// TODO(RVT-3756): This is redundant with the deserialization in `workflow.run` in the registry
471+
input: Arc::new(serde_json::to_value(input)?),
472+
473+
root_location: self
474+
.root_location
475+
.iter()
476+
.cloned()
477+
.chain(std::iter::once(self.location_idx))
478+
.collect(),
479+
location_idx: 0,
480+
};
481+
482+
self.location_idx += 1;
483+
484+
// Run workflow
485+
let output = (workflow.run)(&mut ctx).await?;
486+
487+
// TODO: RVT-3756
488+
// Deserialize output
489+
serde_json::from_value(output)
490+
.map_err(WorkflowError::DeserializeWorkflowOutput)
491+
.map_err(GlobalError::raw)
438492
}
439493

440494
/// Run activity. Will replay on failure.

lib/chirp-workflow/core/src/db/postgres.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ impl Database for DatabasePostgres {
413413
INSERT INTO db_workflow.workflows (
414414
workflow_id, workflow_name, create_ts, ray_id, input, wake_immediate
415415
)
416-
VALUES ($5, $2, $3, $4, $5, true)
416+
VALUES ($7, $2, $3, $4, $5, true)
417417
RETURNING 1
418418
),
419419
sub_workflow AS (

lib/chirp-workflow/core/src/util.rs

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -136,40 +136,3 @@ pub(crate) fn new_conn(
136136

137137
rivet_connection::Connection::new(client, pools.clone(), cache.clone())
138138
}
139-
140-
pub fn wrap_conn(
141-
conn: &rivet_connection::Connection,
142-
ray_id: Uuid,
143-
req_ts: i64,
144-
from_workflow: bool,
145-
name: &str,
146-
ts: i64,
147-
) -> (
148-
rivet_connection::Connection,
149-
rivet_operation::OperationContext<()>,
150-
) {
151-
let req_id = Uuid::new_v4();
152-
let trace_entry = chirp_client::TraceEntry {
153-
context_name: name.to_string(),
154-
req_id: Some(req_id.into()),
155-
ts,
156-
run_context: match rivet_util::env::run_context() {
157-
rivet_util::env::RunContext::Service => chirp_client::RunContext::Service,
158-
rivet_util::env::RunContext::Test => chirp_client::RunContext::Test,
159-
} as i32,
160-
};
161-
let conn = conn.wrap(req_id, ray_id, trace_entry);
162-
let mut op_ctx = rivet_operation::OperationContext::new(
163-
name.to_string(),
164-
std::time::Duration::from_secs(60),
165-
conn.clone(),
166-
req_id,
167-
ray_id,
168-
ts,
169-
req_ts,
170-
(),
171-
);
172-
op_ctx.from_workflow = from_workflow;
173-
174-
(conn, op_ctx)
175-
}

lib/chirp-workflow/core/src/worker.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ impl Worker {
2121

2222
pub async fn start(mut self, pools: rivet_pools::Pools) -> GlobalResult<()> {
2323
let mut interval = tokio::time::interval(TICK_INTERVAL);
24+
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2425

2526
let shared_client = chirp_client::SharedClient::from_env(pools.clone())?;
2627
let cache = rivet_cache::CacheInner::from_env(pools.clone())?;
@@ -31,7 +32,7 @@ impl Worker {
3132
}
3233
}
3334

34-
// Query the database for new workflows and run them.
35+
/// Query the database for new workflows and run them.
3536
async fn tick(
3637
&mut self,
3738
shared_client: &chirp_client::SharedClientHandle,

lib/chirp-workflow/core/tests/basic.rs

Lines changed: 0 additions & 67 deletions
This file was deleted.

lib/chirp-workflow/core/tests/common.rs

Lines changed: 0 additions & 33 deletions
This file was deleted.

0 commit comments

Comments
 (0)