Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions lib/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use global_error::{GlobalError, GlobalResult};
use rivet_pools::prelude::*;
use uuid::Uuid;

use crate::{ctx::OperationCtx, util, DatabaseHandle, Operation, OperationInput, WorkflowError};
use crate::{ctx::OperationCtx, DatabaseHandle, Operation, OperationInput, WorkflowError};

pub struct ActivityCtx {
ray_id: Uuid,
Expand All @@ -26,7 +26,19 @@ impl ActivityCtx {
name: &'static str,
) -> Self {
let ts = rivet_util::timestamp::now();
let (conn, op_ctx) = util::wrap_conn(conn, ray_id, workflow_create_ts, true, name, ts);
let req_id = Uuid::new_v4();
let conn = conn.wrap(req_id, ray_id, name);
let mut op_ctx = rivet_operation::OperationContext::new(
name.to_string(),
std::time::Duration::from_secs(60),
conn.clone(),
req_id,
ray_id,
ts,
workflow_create_ts,
(),
);
op_ctx.from_workflow = true;

ActivityCtx {
ray_id,
Expand Down
16 changes: 14 additions & 2 deletions lib/chirp-workflow/core/src/ctx/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use global_error::{GlobalError, GlobalResult};
use rivet_pools::prelude::*;
use uuid::Uuid;

use crate::{util, DatabaseHandle, Operation, OperationInput, WorkflowError};
use crate::{DatabaseHandle, Operation, OperationInput, WorkflowError};

pub struct OperationCtx {
ray_id: Uuid,
Expand All @@ -27,7 +27,19 @@ impl OperationCtx {
name: &'static str,
) -> Self {
let ts = rivet_util::timestamp::now();
let (conn, op_ctx) = util::wrap_conn(conn, ray_id, req_ts, from_workflow, name, ts);
let req_id = Uuid::new_v4();
let conn = conn.wrap(req_id, ray_id, name);
let mut op_ctx = rivet_operation::OperationContext::new(
name.to_string(),
std::time::Duration::from_secs(60),
conn.clone(),
req_id,
ray_id,
ts,
req_ts,
(),
);
op_ctx.from_workflow = from_workflow;

OperationCtx {
ray_id,
Expand Down
68 changes: 61 additions & 7 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,7 @@ impl WorkflowCtx {
}
}

// TODO(RVTEE-103): Run sub workflow inline as a branch of the parent workflow
/// Trigger another workflow and wait for its response.
/// Runs a sub workflow in the same process as the current workflow and returns its response.
pub async fn workflow<I>(
&mut self,
input: I,
Expand All @@ -430,11 +429,66 @@ impl WorkflowCtx {
I: WorkflowInput,
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
{
let sub_workflow_id = self.dispatch_workflow(input).await?;
let output = self
.wait_for_workflow::<I::Workflow>(sub_workflow_id)
.await?;
Ok(output)
// Lookup workflow
let Ok(workflow) = self.registry.get_workflow(I::Workflow::name()) else {
tracing::warn!(
id=%self.workflow_id,
name=%I::Workflow::name(),
"sub workflow not found in current registry",
);

// TODO(RVT-3755): If a sub workflow is dispatched, then the worker is updated to include the sub
// worker in the registry, this will diverge in history because it will try to run the sub worker
// in process during the replay
// If the workflow isn't in the current registry, dispatch the workflow instead
let sub_workflow_id = self.dispatch_workflow(input).await?;
let output = self
.wait_for_workflow::<I::Workflow>(sub_workflow_id)
.await?;

return Ok(output);
};

tracing::info!(id=%self.workflow_id, name=%I::Workflow::name(), "running sub workflow");

// Create a new branched workflow context for the sub workflow
let mut ctx = WorkflowCtx {
workflow_id: self.workflow_id,
name: I::Workflow::name().to_string(),
create_ts: rivet_util::timestamp::now(),
ray_id: self.ray_id,

registry: self.registry.clone(),
db: self.db.clone(),

conn: self
.conn
.wrap(Uuid::new_v4(), self.ray_id, I::Workflow::name()),

event_history: self.event_history.clone(),

// TODO(RVT-3756): This is redundant with the deserialization in `workflow.run` in the registry
input: Arc::new(serde_json::to_value(input)?),

root_location: self
.root_location
.iter()
.cloned()
.chain(std::iter::once(self.location_idx))
.collect(),
location_idx: 0,
};

self.location_idx += 1;

// Run workflow
let output = (workflow.run)(&mut ctx).await?;

// TODO: RVT-3756
// Deserialize output
serde_json::from_value(output)
.map_err(WorkflowError::DeserializeWorkflowOutput)
.map_err(GlobalError::raw)
}

/// Run activity. Will replay on failure.
Expand Down
2 changes: 1 addition & 1 deletion lib/chirp-workflow/core/src/db/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ impl Database for DatabasePostgres {
INSERT INTO db_workflow.workflows (
workflow_id, workflow_name, create_ts, ray_id, input, wake_immediate
)
VALUES ($5, $2, $3, $4, $5, true)
VALUES ($7, $2, $3, $4, $5, true)
RETURNING 1
),
sub_workflow AS (
Expand Down
37 changes: 0 additions & 37 deletions lib/chirp-workflow/core/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,40 +136,3 @@ pub(crate) fn new_conn(

rivet_connection::Connection::new(client, pools.clone(), cache.clone())
}

pub fn wrap_conn(
conn: &rivet_connection::Connection,
ray_id: Uuid,
req_ts: i64,
from_workflow: bool,
name: &str,
ts: i64,
) -> (
rivet_connection::Connection,
rivet_operation::OperationContext<()>,
) {
let req_id = Uuid::new_v4();
let trace_entry = chirp_client::TraceEntry {
context_name: name.to_string(),
req_id: Some(req_id.into()),
ts,
run_context: match rivet_util::env::run_context() {
rivet_util::env::RunContext::Service => chirp_client::RunContext::Service,
rivet_util::env::RunContext::Test => chirp_client::RunContext::Test,
} as i32,
};
let conn = conn.wrap(req_id, ray_id, trace_entry);
let mut op_ctx = rivet_operation::OperationContext::new(
name.to_string(),
std::time::Duration::from_secs(60),
conn.clone(),
req_id,
ray_id,
ts,
req_ts,
(),
);
op_ctx.from_workflow = from_workflow;

(conn, op_ctx)
}
3 changes: 2 additions & 1 deletion lib/chirp-workflow/core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ impl Worker {

pub async fn start(mut self, pools: rivet_pools::Pools) -> GlobalResult<()> {
let mut interval = tokio::time::interval(TICK_INTERVAL);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let shared_client = chirp_client::SharedClient::from_env(pools.clone())?;
let cache = rivet_cache::CacheInner::from_env(pools.clone())?;
Expand All @@ -31,7 +32,7 @@ impl Worker {
}
}

// Query the database for new workflows and run them.
/// Query the database for new workflows and run them.
async fn tick(
&mut self,
shared_client: &chirp_client::SharedClientHandle,
Expand Down
67 changes: 0 additions & 67 deletions lib/chirp-workflow/core/tests/basic.rs

This file was deleted.

33 changes: 0 additions & 33 deletions lib/chirp-workflow/core/tests/common.rs

This file was deleted.

Loading