diff --git a/lib/chirp-workflow/core/src/ctx/activity.rs b/lib/chirp-workflow/core/src/ctx/activity.rs index cc3c2ce88d..2df0b39bbc 100644 --- a/lib/chirp-workflow/core/src/ctx/activity.rs +++ b/lib/chirp-workflow/core/src/ctx/activity.rs @@ -2,7 +2,7 @@ use global_error::{GlobalError, GlobalResult}; use rivet_pools::prelude::*; use uuid::Uuid; -use crate::{ctx::OperationCtx, util, DatabaseHandle, Operation, OperationInput, WorkflowError}; +use crate::{ctx::OperationCtx, DatabaseHandle, Operation, OperationInput, WorkflowError}; pub struct ActivityCtx { ray_id: Uuid, @@ -26,7 +26,19 @@ impl ActivityCtx { name: &'static str, ) -> Self { let ts = rivet_util::timestamp::now(); - let (conn, op_ctx) = util::wrap_conn(conn, ray_id, workflow_create_ts, true, name, ts); + let req_id = Uuid::new_v4(); + let conn = conn.wrap(req_id, ray_id, name); + let mut op_ctx = rivet_operation::OperationContext::new( + name.to_string(), + std::time::Duration::from_secs(60), + conn.clone(), + req_id, + ray_id, + ts, + workflow_create_ts, + (), + ); + op_ctx.from_workflow = true; ActivityCtx { ray_id, diff --git a/lib/chirp-workflow/core/src/ctx/operation.rs b/lib/chirp-workflow/core/src/ctx/operation.rs index 2139a444f5..74490ebda3 100644 --- a/lib/chirp-workflow/core/src/ctx/operation.rs +++ b/lib/chirp-workflow/core/src/ctx/operation.rs @@ -2,7 +2,7 @@ use global_error::{GlobalError, GlobalResult}; use rivet_pools::prelude::*; use uuid::Uuid; -use crate::{util, DatabaseHandle, Operation, OperationInput, WorkflowError}; +use crate::{DatabaseHandle, Operation, OperationInput, WorkflowError}; pub struct OperationCtx { ray_id: Uuid, @@ -27,7 +27,19 @@ impl OperationCtx { name: &'static str, ) -> Self { let ts = rivet_util::timestamp::now(); - let (conn, op_ctx) = util::wrap_conn(conn, ray_id, req_ts, from_workflow, name, ts); + let req_id = Uuid::new_v4(); + let conn = conn.wrap(req_id, ray_id, name); + let mut op_ctx = rivet_operation::OperationContext::new( + name.to_string(), + std::time::Duration::from_secs(60), + conn.clone(), + req_id, + ray_id, + ts, + req_ts, + (), + ); + op_ctx.from_workflow = from_workflow; OperationCtx { ray_id, diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs index acc9bd8ffd..6eb6ea93f9 100644 --- a/lib/chirp-workflow/core/src/ctx/workflow.rs +++ b/lib/chirp-workflow/core/src/ctx/workflow.rs @@ -420,8 +420,7 @@ impl WorkflowCtx { } } - // TODO(RVTEE-103): Run sub workflow inline as a branch of the parent workflow - /// Trigger another workflow and wait for its response. + /// Runs a sub workflow in the same process as the current workflow and returns its response. pub async fn workflow( &mut self, input: I, @@ -430,11 +429,66 @@ impl WorkflowCtx { I: WorkflowInput, ::Workflow: Workflow, { - let sub_workflow_id = self.dispatch_workflow(input).await?; - let output = self - .wait_for_workflow::(sub_workflow_id) - .await?; - Ok(output) + // Lookup workflow + let Ok(workflow) = self.registry.get_workflow(I::Workflow::name()) else { + tracing::warn!( + id=%self.workflow_id, + name=%I::Workflow::name(), + "sub workflow not found in current registry", + ); + + // TODO(RVT-3755): If a sub workflow is dispatched, then the worker is updated to include the sub + // worker in the registry, this will diverge in history because it will try to run the sub worker + // in process during the replay + // If the workflow isn't in the current registry, dispatch the workflow instead + let sub_workflow_id = self.dispatch_workflow(input).await?; + let output = self + .wait_for_workflow::(sub_workflow_id) + .await?; + + return Ok(output); + }; + + tracing::info!(id=%self.workflow_id, name=%I::Workflow::name(), "running sub workflow"); + + // Create a new branched workflow context for the sub workflow + let mut ctx = WorkflowCtx { + workflow_id: self.workflow_id, + name: I::Workflow::name().to_string(), + create_ts: rivet_util::timestamp::now(), + ray_id: self.ray_id, + + registry: self.registry.clone(), + db: self.db.clone(), + + conn: self + .conn + .wrap(Uuid::new_v4(), self.ray_id, I::Workflow::name()), + + event_history: self.event_history.clone(), + + // TODO(RVT-3756): This is redundant with the deserialization in `workflow.run` in the registry + input: Arc::new(serde_json::to_value(input)?), + + root_location: self + .root_location + .iter() + .cloned() + .chain(std::iter::once(self.location_idx)) + .collect(), + location_idx: 0, + }; + + self.location_idx += 1; + + // Run workflow + let output = (workflow.run)(&mut ctx).await?; + + // TODO: RVT-3756 + // Deserialize output + serde_json::from_value(output) + .map_err(WorkflowError::DeserializeWorkflowOutput) + .map_err(GlobalError::raw) } /// Run activity. Will replay on failure. diff --git a/lib/chirp-workflow/core/src/db/postgres.rs b/lib/chirp-workflow/core/src/db/postgres.rs index 7e26f94a1a..f9d8c47196 100644 --- a/lib/chirp-workflow/core/src/db/postgres.rs +++ b/lib/chirp-workflow/core/src/db/postgres.rs @@ -413,7 +413,7 @@ impl Database for DatabasePostgres { INSERT INTO db_workflow.workflows ( workflow_id, workflow_name, create_ts, ray_id, input, wake_immediate ) - VALUES ($5, $2, $3, $4, $5, true) + VALUES ($7, $2, $3, $4, $5, true) RETURNING 1 ), sub_workflow AS ( diff --git a/lib/chirp-workflow/core/src/util.rs b/lib/chirp-workflow/core/src/util.rs index 19302e2041..f5c66da979 100644 --- a/lib/chirp-workflow/core/src/util.rs +++ b/lib/chirp-workflow/core/src/util.rs @@ -136,40 +136,3 @@ pub(crate) fn new_conn( rivet_connection::Connection::new(client, pools.clone(), cache.clone()) } - -pub fn wrap_conn( - conn: &rivet_connection::Connection, - ray_id: Uuid, - req_ts: i64, - from_workflow: bool, - name: &str, - ts: i64, -) -> ( - rivet_connection::Connection, - rivet_operation::OperationContext<()>, -) { - let req_id = Uuid::new_v4(); - let trace_entry = chirp_client::TraceEntry { - context_name: name.to_string(), - req_id: Some(req_id.into()), - ts, - run_context: match rivet_util::env::run_context() { - rivet_util::env::RunContext::Service => chirp_client::RunContext::Service, - rivet_util::env::RunContext::Test => chirp_client::RunContext::Test, - } as i32, - }; - let conn = conn.wrap(req_id, ray_id, trace_entry); - let mut op_ctx = rivet_operation::OperationContext::new( - name.to_string(), - std::time::Duration::from_secs(60), - conn.clone(), - req_id, - ray_id, - ts, - req_ts, - (), - ); - op_ctx.from_workflow = from_workflow; - - (conn, op_ctx) -} diff --git a/lib/chirp-workflow/core/src/worker.rs b/lib/chirp-workflow/core/src/worker.rs index 0b5d603119..f327bcc9e8 100644 --- a/lib/chirp-workflow/core/src/worker.rs +++ b/lib/chirp-workflow/core/src/worker.rs @@ -21,6 +21,7 @@ impl Worker { pub async fn start(mut self, pools: rivet_pools::Pools) -> GlobalResult<()> { let mut interval = tokio::time::interval(TICK_INTERVAL); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let shared_client = chirp_client::SharedClient::from_env(pools.clone())?; let cache = rivet_cache::CacheInner::from_env(pools.clone())?; @@ -31,7 +32,7 @@ impl Worker { } } - // Query the database for new workflows and run them. + /// Query the database for new workflows and run them. async fn tick( &mut self, shared_client: &chirp_client::SharedClientHandle, diff --git a/lib/chirp-workflow/core/tests/basic.rs b/lib/chirp-workflow/core/tests/basic.rs deleted file mode 100644 index 4c6fef7228..0000000000 --- a/lib/chirp-workflow/core/tests/basic.rs +++ /dev/null @@ -1,67 +0,0 @@ -use anyhow::*; -use futures_util::{StreamExt, TryStreamExt}; -use serde::{Deserialize, Serialize}; -use wf::*; - -mod common; - -use crate::common::MyActivityInput; - -#[tokio::test] -async fn basic() -> Result<()> { - common::setup(); - - let db = - DatabasePostgres::new("postgres://root@127.0.0.1:26257/postgres?sslmode=disable").await?; - - let mut registry = Registry::new(); - registry.register_workflow::(); - let registry = registry.handle(); - - let worker = Worker::new(registry.clone(), db.clone()); - tokio::spawn(async move { - if let Err(err) = worker.start().await { - tracing::error!(?err, "worker failed"); - } - }); - - let ctx = common::TestCtx::new(db); - - // Run 20 workflows at once - futures_util::stream::iter(0..20) - .map(|_| async { - let output = ctx.workflow(MyWorkflowInput { x: 5 }).await?; - assert_eq!(20, output.y); - - Ok(()) - }) - .buffer_unordered(100) - .try_collect::>() - .await?; - - Ok(()) -} - -#[derive(Debug, Serialize, Deserialize, Hash)] -pub struct MyWorkflowInput { - x: i64, -} - -#[derive(Debug, Serialize, Deserialize, Hash)] -pub struct MyWorkflowOutput { - y: i64, -} - -#[macros::workflow(MyWorkflow)] -async fn my_workflow(ctx: &mut WorkflowCtx, input: &MyWorkflowInput) -> Result { - let a = ctx.activity(MyActivityInput { x: input.x }).await?; - let b = ctx.activity(MyActivityInput { x: a.y }).await?; - - let my_num = if b.y > 7 { - ctx.activity(MyActivityInput { x: 10 }).await?.y - } else { - ctx.activity(MyActivityInput { x: 20 }).await?.y - }; - - Ok(MyWorkflowOutput { y: my_num }) -} diff --git a/lib/chirp-workflow/core/tests/common.rs b/lib/chirp-workflow/core/tests/common.rs deleted file mode 100644 index 638eadc128..0000000000 --- a/lib/chirp-workflow/core/tests/common.rs +++ /dev/null @@ -1,33 +0,0 @@ -use std::sync::Arc; - -use anyhow::*; -use serde::{Deserialize, Serialize}; -use tokio::time::Duration; -use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter}; -use uuid::Uuid; -use wf::*; - -pub fn setup() { - tracing_subscriber::fmt() - .pretty() - .with_env_filter(EnvFilter::new("trace")) - .with_span_events(FmtSpan::CLOSE) - .init(); -} - -// MARK: Activity -#[derive(Debug, Serialize, Deserialize, Hash)] -pub struct MyActivityInput { - pub x: i64, -} - -#[derive(Debug, Serialize, Deserialize, Hash)] -pub struct MyActivityOutput { - pub y: i64, -} - -#[macros::activity(MyActivity)] -pub fn my_activity(my_ctx: &mut ActivityCtx, input: &MyActivityInput) -> Result { - util::inject_fault()?; - Ok(MyActivityOutput { y: input.x * 2 }) -} diff --git a/lib/chirp-workflow/core/tests/joins.rs b/lib/chirp-workflow/core/tests/joins.rs deleted file mode 100644 index 3ced7b6a30..0000000000 --- a/lib/chirp-workflow/core/tests/joins.rs +++ /dev/null @@ -1,90 +0,0 @@ -use anyhow::*; -use futures_util::{FutureExt, StreamExt, TryStreamExt}; -use serde::{Deserialize, Serialize}; -use wf::*; - -mod common; - -use crate::common::MyActivityInput; - -#[tokio::test] -async fn joins() -> Result<()> { - common::setup(); - - let db = - DatabasePostgres::new("postgres://root@127.0.0.1:26257/postgres?sslmode=disable").await?; - - let mut registry = Registry::new(); - registry.register_workflow::(); - let registry = registry.handle(); - - let worker = Worker::new(registry.clone(), db.clone()); - tokio::spawn(async move { - if let Err(err) = worker.start().await { - tracing::error!(?err, "worker failed"); - } - }); - - let ctx = common::TestCtx::new(db); - - // Run 20 workflows at once - futures_util::stream::iter(0..20) - .map(|_| async { - let output = ctx.workflow(MyParallelWorkflowInput { x: 5 }).await?; - assert_eq!(138, output.y); - - Ok(()) - }) - .buffer_unordered(100) - .try_collect::>() - .await?; - - Ok(()) -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct MyParallelWorkflowInput { - x: i64, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct MyParallelWorkflowOutput { - y: i64, -} - -// GOTCHA: "returning this value requires that `'life1` must outlive `'static`" error comes from trying to use -// the `input` variable of a workflow in a closure without cloning it -#[macros::workflow(MyParallelWorkflow)] -async fn my_parallel_workflow( - ctx: &mut WorkflowCtx, - input: &MyParallelWorkflowInput, -) -> Result { - let a = ctx.activity(MyActivityInput { x: input.x }).await?; - - let (b, c, d) = ctx - .join(( - MyActivityInput { x: input.x }, - MyActivityInput { x: 12 }, - closure(|ctx: &mut WorkflowCtx| { - async move { - let mut sum = 0; - - for i in 0..5 { - sum += ctx.activity(MyActivityInput { x: i }).await?.y; - } - - let (e, f) = ctx - .join((MyActivityInput { x: 3 }, MyActivityInput { x: 34 })) - .await?; - - WorkflowResult::Ok(sum + e.y + f.y) - } - .boxed() - }), - )) - .await?; - - Ok(MyParallelWorkflowOutput { - y: a.y + b.y + c.y + d, - }) -} diff --git a/lib/chirp-workflow/core/tests/provisioning.rs b/lib/chirp-workflow/core/tests/provisioning.rs deleted file mode 100644 index 6d2ac971a5..0000000000 --- a/lib/chirp-workflow/core/tests/provisioning.rs +++ /dev/null @@ -1,394 +0,0 @@ -// TODO: This test doesnt work and should be removed - -use std::{ - net::{IpAddr, Ipv4Addr}, - sync::Arc, - time::Duration, -}; - -use anyhow::*; -// use futures_util::{FutureExt, StreamExt, TryStreamExt}; -use indoc::indoc; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; -use wf::*; - -mod common; - -#[tokio::test] -async fn provisioning() -> Result<()> { - common::setup(); - - let db = - DatabasePostgres::new("postgres://root@127.0.0.1:26257/postgres?sslmode=disable").await?; - - let mut registry = Registry::new(); - registry.register_workflow::(); - let registry = registry.handle(); - - let worker = Worker::new(registry.clone(), db.clone()); - tokio::spawn(async move { - if let Err(err) = worker.start().await { - tracing::error!(?err, "worker failed"); - } - }); - - let ctx = common::TestCtx::new(db.clone()); - dc_scale(ctx, db).await?; - - Ok(()) -} - -async fn dc_scale(ctx: common::TestCtxHandle, db: Arc) -> Result<()> { - let server_id = Uuid::new_v4(); - let datacenter_id = Uuid::new_v4(); - - let workflow_id = Uuid::new_v4(); - - sqlx::query(indoc!( - " - INSERT INTO servers ( - server_id, - datacenter_id, - pool_type, - create_ts - ) - VALUES($1, $2, $3, $4) - ", - )) - .bind(server_id) - .bind(workflow_id) - .bind(datacenter_id) - .bind(serde_json::to_string(&PoolType::Job)?) - .bind(util::now()) - .execute(&mut *db.conn().await?) - .await - .map_err(WorkflowError::Sqlx)?; - - // Provision server - ctx.dispatch_workflow_with_id( - workflow_id, - ServerInput { - provider: Provider::Linode, - }, - ) - .await?; - - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - ctx.signal(workflow_id, DestroyServer {}).await?; - - let output = ctx.wait_for_workflow::(workflow_id).await?; - - Ok(()) -} - -#[derive(sqlx::FromRow)] -struct ServerRow { - server_id: Uuid, - datacenter_id: Uuid, - pool_type: sqlx::types::Json, - create_ts: i64, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct ServerInput { - server_id: Uuid, - provider: Provider, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct ServerOutput {} - -#[macros::workflow(ServerWorkflow)] -async fn server_workflow(ctx: &mut WorkflowCtx, input: &ServerInput) -> Result { - let db = - DatabasePostgres::new("postgres://root@127.0.0.1:26257/postgres?sslmode=disable").await?; - - ctx.activity(SetServerWorkflowInput { - server_id: input.server_id, - }) - .await?; - - // TODO: Replace with server_get operation - let server = sqlx::query_as::<_, ServerRow>(indoc!( - " - SELECT * - FROM servers - WHERE server_id = $1 - ", - )) - .bind(input.server_id) - .fetch_one(&mut *db.conn().await?) - .await - .map_err(WorkflowError::Sqlx)?; - - let dc_get_res = ctx - .op(DatacenterGetInput { - datacenter_ids: vec![server.datacenter_id], - }) - .await?; - let dc = dc_get_res.datacenters.first().context("dc not found")?; - let pool = dc - .pools - .iter() - .find(|p| p.pool_type == *server.pool_type) - .context("datacenter does not have this type of pool configured")?; - - ctx.activity(AssignVlanIpInput { - server_id: input.server_id, - }) - .await?; - - let provision_res = ctx.workflow(ProvisionServerInput {}).await?; - - // Destroy early - if let Some(destroy_signal) = ctx.query_signal::().await? { - ctx.workflow(DestroyServerInput {}).await?; - - return Ok(ServerOutput {}); - } - - // Install components - if !provision_res.already_installed { - ctx.workflow(ServerInstallInput {}).await?; - } - - // Create DNS record - if let PoolType::Gg = input.pool_type { - ctx.workflow(ServerCreateDnsInput {}).await?; - } - - match ctx.listen::().await? { - LiveJoin::DrainServer(sig) => { - ctx.workflow(DrainServerInput {}).await?; - - match ctx.listen::().await? { - UndrainJoin::UndrainServer(sig) => {} - UndrainJoin::DestroyServer(sig) => ctx.workflow(DestroyServerInput {}).await?, - } - } - LiveJoin::TaintServer(sig) => { - ctx.workflow(TaintServerInput {}).await?; - - let destroy_sig = ctx.listen::().await?; - } - LiveJoin::DestroyServer(sig) => ctx.workflow(DestroyServerInput {}).await?, - } - - Ok(ServerOutput {}) -} - -join_signal!(LiveJoin, [DrainServer, TaintServer, DestroyServer]); -join_signal!(UndrainJoin, [DrainServer, DestroyServer]); - -fn provision_server() { - // Iterate through list of hardware and attempt to schedule a server. Goes to the next - // hardware if an error happens during provisioning - let mut hardware_list = pool.hardware.iter(); - let provision_res = loop { - // List exhausted - let Some(hardware) = hardware_list.next() else { - break None; - }; - - tracing::info!( - "attempting to provision hardware: {}", - hardware.provider_hardware - ); - - match input.provider { - Provider::Linode => { - // TODO: Workflow - // let res = op!([ctx] linode_server_provision { - // datacenter_id: ctx.datacenter_id, - // server_id: ctx.server_id, - // provider_datacenter_id: datacenter.provider_datacenter_id.clone(), - // hardware: Some(hardware.clone()), - // pool_type: server.pool_type, - // vlan_ip: vlan_ip.to_string(), - // tags: ctx.tags.clone(), - // }) - // .await; - - match res { - Ok(res) => { - break Some(ProvisionResponse { - provider_server_id: res.provider_server_id.clone(), - provider_hardware: hardware.provider_hardware.clone(), - public_ip: res.public_ip.clone(), - already_installed: res.already_installed, - }) - } - Err(err) => { - tracing::error!( - ?err, - ?server_id, - "failed to provision server, cleaning up" - ); - - // TODO: Workflow - // cleanup(&ctx, server_id).await?; - } - } - } - } - }; - - if let Some(provision_res) = provision_res { - let provision_complete_ts = util::timestamp::now(); - - let (create_ts,) = sql_fetch_one!( - [ctx, (i64,)] - " - UPDATE db_cluster.servers - SET - provider_server_id = $2, - provider_hardware = $3, - public_ip = $4, - provision_complete_ts = $5, - install_complete_ts = $6 - WHERE server_id = $1 - RETURNING create_ts - ", - server_id, - &provision_res.provider_server_id, - &provision_res.provider_hardware, - &provision_res.public_ip, - provision_complete_ts, - if provision_res.already_installed { - Some(provision_complete_ts) - } else { - None - }, - ) - .await?; - } else { - tracing::error!(?server_id, hardware_options=?pool.hardware.len(), "failed all attempts to provision server"); - bail!("failed all attempts to provision server"); - } -} - -#[derive(Debug, Serialize, Deserialize, Hash)] -pub struct SetServerWorkflowInput { - server_id: Uuid, -} - -#[derive(Debug, Serialize, Deserialize, Hash)] -pub struct SetServerWorkflowOutput {} - -#[macros::activity(SetServerWorkflow)] -pub fn set_server_workflow( - ctx: &mut ActivityCtx, - input: &SetServerWorkflowInput, -) -> Result { - let db = - DatabasePostgres::new("postgres://root@127.0.0.1:26257/postgres?sslmode=disable").await?; - - sqlx::query(indoc!( - " - UPDATE servers - SET workflow_id = $2 - WHERE server_id = $1 - ", - )) - .bind(input.server_id) - .bind(ctx.workflow_id) - .execute(&mut *db.conn().await?) - .await - .map_err(WorkflowError::Sqlx)?; - - Ok(SetServerWorkflowOutput {}) -} - -#[derive(Debug, Serialize, Deserialize, Hash)] -pub struct AssignVlanIpInput { - server_id: Uuid, -} - -#[derive(Debug, Serialize, Deserialize, Hash)] -pub struct AssignVlanIpOutput { - vlan_ip: Ipv4Addr, -} - -#[macros::activity(AssignVlanIp)] -pub fn assign_vlan_ip( - ctx: &mut ActivityCtx, - input: &AssignVlanIpInput, -) -> Result { - let db = - DatabasePostgres::new("postgres://root@127.0.0.1:26257/postgres?sslmode=disable").await?; - - // Get a new vlan ip - let vlan_ip = Ipv4Addr::new(1, 2, 3, 4); - - sqlx::query(indoc!( - " - UPDATE servers - SET vlan_ip = $2 - WHERE server_id = $1 - ", - )) - .bind(input.server_id) - .bind(IpAddr::V4(vlan_ip)) - .execute(&mut *db.conn().await?) - .await - .map_err(WorkflowError::Sqlx)?; - - Ok(AssignVlanIpOutput { vlan_ip }) -} - -mod ops { - use anyhow::*; - use serde::{Deserialize, Serialize}; - use uuid::Uuid; - use wf::*; - - pub struct DatacenterGetInput { - pub datacenter_ids: Vec, - } - - pub struct DatacenterGetOutput { - pub datacenters: Vec, - } - - #[macros::operation(DatacenterGet)] - pub fn datacenter_get( - ctx: &mut OperationCtx, - input: &DatacenterGetInput, - ) -> Result { - Ok(DatacenterGetOutput { - datacenters: input - .datacenter_ids - .iter() - .map(|id| Datacenter { - datacenter_id: *id, - pools: vec![Pool { - pool_type: PoolType::Job, - }], - }) - .collect(), - }) - } - - pub struct Datacenter { - pub datacenter_id: Uuid, - pub pools: Vec, - } - - pub struct Pool { - pub pool_type: PoolType, - } - - #[derive(Serialize, Deserialize, PartialEq)] - pub enum PoolType { - Job, - Gg, - Ats, - } - - #[derive(Serialize, Deserialize, PartialEq)] - pub enum Provider { - Linode, - } -} -use ops::*; diff --git a/lib/chirp-workflow/core/tests/signals.rs b/lib/chirp-workflow/core/tests/signals.rs deleted file mode 100644 index b7ad73eee1..0000000000 --- a/lib/chirp-workflow/core/tests/signals.rs +++ /dev/null @@ -1,93 +0,0 @@ -use anyhow::*; -use futures_util::{StreamExt, TryStreamExt}; -use serde::{Deserialize, Serialize}; -use wf::*; - -mod common; - -use crate::common::MyActivityInput; - -#[tokio::test] -async fn signals() -> Result<()> { - common::setup(); - - let db = - DatabasePostgres::new("postgres://root@127.0.0.1:26257/postgres?sslmode=disable").await?; - - let mut registry = Registry::new(); - registry.register_workflow::(); - let registry = registry.handle(); - - let worker = Worker::new(registry.clone(), db.clone()); - tokio::spawn(async move { - if let Err(err) = worker.start().await { - tracing::error!(?err, "worker failed"); - } - }); - - let ctx = common::TestCtx::new(db.clone()); - - // Run 20 workflows at once - futures_util::stream::iter(0..20) - .map(|_| async { - let workflow_id = ctx - .dispatch_workflow(MySignalWorkflowInput { x: 5 }) - .await?; - - ctx.signal(workflow_id, MySignal { x: 12 }).await?; - - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - ctx.signal(workflow_id, MySignal2 { y: 3 }).await?; - - let output = ctx - .wait_for_workflow::(workflow_id) - .await?; - assert_eq!(25, output.y); - - Ok(()) - }) - .buffer_unordered(100) - .try_collect::>() - .await?; - - Ok(()) -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct MySignalWorkflowInput { - x: i64, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct MySignalWorkflowOutput { - y: i64, -} - -#[macros::workflow(MySignalWorkflow)] -async fn my_signal_workflow( - ctx: &mut WorkflowCtx, - input: &MySignalWorkflowInput, -) -> Result { - let a = ctx.activity(MyActivityInput { x: input.x }).await?; - - let b = ctx.listen::().await?; - - let c = match ctx.listen::().await? { - Join::MySignal(sig) => sig.x, - Join::MySignal2(sig) => sig.y, - }; - - Ok(MySignalWorkflowOutput { y: a.y + b.x + c }) -} - -#[macros::signal("my-signal")] -struct MySignal { - x: i64, -} - -#[macros::signal("my-signal2")] -struct MySignal2 { - y: i64, -} - -join_signal!(Join, [MySignal, MySignal2]); diff --git a/lib/chirp-workflow/core/tests/sub_workflow.rs b/lib/chirp-workflow/core/tests/sub_workflow.rs deleted file mode 100644 index 7089bde52e..0000000000 --- a/lib/chirp-workflow/core/tests/sub_workflow.rs +++ /dev/null @@ -1,90 +0,0 @@ -use anyhow::*; -use futures_util::{StreamExt, TryStreamExt}; -use serde::{Deserialize, Serialize}; -use wf::*; - -mod common; - -use crate::common::MyActivityInput; - -#[tokio::test] -async fn sub_workflow() -> Result<()> { - common::setup(); - - let db = - DatabasePostgres::new("postgres://root@127.0.0.1:26257/postgres?sslmode=disable").await?; - - let mut registry = Registry::new(); - registry.register_workflow::(); - registry.register_workflow::(); - let registry = registry.handle(); - - let worker = Worker::new(registry.clone(), db.clone()); - tokio::spawn(async move { - if let Err(err) = worker.start().await { - tracing::error!(?err, "worker failed"); - } - }); - - let ctx = common::TestCtx::new(db); - - // Run 20 workflows at once - futures_util::stream::iter(0..20) - .map(|_| async { - let output = ctx.workflow(MyWorkflowInput { x: 5 }).await?; - assert_eq!(60, output.y); - - Ok(()) - }) - .buffer_unordered(100) - .try_collect::>() - .await?; - - Ok(()) -} - -// MARK: Parallelized workflow -#[derive(Debug, Serialize, Deserialize)] -pub struct MyWorkflowInput { - x: i64, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct MyWorkflowOutput { - y: i64, -} - -// GOTCHA: "returning this value requires that `'life1` must outlive `'static`" error comes from trying to use -// the `input` variable of a workflow in a closure without cloning it -#[macros::workflow(MyWorkflow)] -async fn my_workflow(ctx: &mut WorkflowCtx, input: &MyWorkflowInput) -> Result { - let a = ctx.activity(MyActivityInput { x: input.x }).await?; - - let b = ctx.workflow(MySubWorkflowInput { x: input.x }).await?; - - let c = ctx.activity(MyActivityInput { x: input.x }).await?; - - Ok(MyWorkflowOutput { y: a.y + b.y + c.y }) -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct MySubWorkflowInput { - x: i64, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct MySubWorkflowOutput { - y: i64, -} - -// GOTCHA: "returning this value requires that `'life1` must outlive `'static`" error comes from trying to use -// the `input` variable of a workflow in a closure without cloning it -#[macros::workflow(MySubWorkflow)] -async fn my_sub_workflow( - ctx: &mut WorkflowCtx, - input: &MySubWorkflowInput, -) -> Result { - let a = ctx.activity(MyActivityInput { x: input.x }).await?; - - Ok(MySubWorkflowOutput { y: a.y * 4 }) -} diff --git a/lib/connection/src/lib.rs b/lib/connection/src/lib.rs index 4cded717f5..9cfa32a5ad 100644 --- a/lib/connection/src/lib.rs +++ b/lib/connection/src/lib.rs @@ -30,10 +30,19 @@ impl Connection { &self, parent_req_id: Uuid, ray_id: Uuid, - trace_entry: chirp_client::TraceEntry, + name: &str, ) -> Connection { // Not the same as the operation ctx's ts because this cannot be overridden by debug start ts let ts = rivet_util::timestamp::now(); + let trace_entry = chirp_client::TraceEntry { + context_name: name.to_string(), + req_id: Some(parent_req_id.into()), + ts, + run_context: match rivet_util::env::run_context() { + rivet_util::env::RunContext::Service => chirp_client::RunContext::Service, + rivet_util::env::RunContext::Test => chirp_client::RunContext::Test, + } as i32, + }; Connection::new( (*self.client).clone().wrap( diff --git a/lib/operation/core/src/lib.rs b/lib/operation/core/src/lib.rs index b82e1ee976..ca7c5b30bf 100644 --- a/lib/operation/core/src/lib.rs +++ b/lib/operation/core/src/lib.rs @@ -137,20 +137,11 @@ where fn wrap(&self, body: O::Request) -> OperationContext { let now = util::timestamp::now(); let req_id = Uuid::new_v4(); - let trace_entry = chirp_client::TraceEntry { - context_name: O::NAME.to_string(), - req_id: Some(req_id.into()), - ts: now, - run_context: match rivet_util::env::run_context() { - rivet_util::env::RunContext::Service => chirp_client::RunContext::Service, - rivet_util::env::RunContext::Test => chirp_client::RunContext::Test, - } as i32, - }; OperationContext { name: O::NAME.to_string(), timeout: O::TIMEOUT, - conn: self.conn.wrap(req_id, self.ray_id, trace_entry), + conn: self.conn.wrap(req_id, self.ray_id, O::NAME), req_id, ray_id: self.ray_id, ts: now,