8 changes: 8 additions & 0 deletions lib/api-helper/build/src/ctx.rs
@@ -67,3 +67,11 @@ impl<A: auth::ApiAuth> Ctx<A> {
self.asn
}
}

impl<A: auth::ApiAuth> std::ops::Deref for Ctx<A> {
type Target = OperationContext<()>;

fn deref(&self) -> &Self::Target {
&self.op_ctx
}
}
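
The Deref impl above lets an API handler holding a `Ctx<A>` use `OperationContext` accessors without an explicit getter, and produce an `&OperationContext<()>` where one is required. A minimal sketch, assuming a hypothetical `ExampleAuth` and endpoint that are not part of this PR:

```rust
// Hypothetical endpoint; `ExampleAuth` and `example_endpoint` are assumptions.
async fn example_endpoint(ctx: Ctx<ExampleAuth>) -> GlobalResult<()> {
	// OperationContext accessors are reachable directly on the API ctx.
	let _ray_id = ctx.ray_id();

	// `&*ctx` yields the inner &OperationContext<()> for code that takes the
	// operation context explicitly (e.g. the compat helpers added below).
	let op_ctx: &rivet_operation::OperationContext<()> = &*ctx;
	let _req_ts = op_ctx.req_ts();

	Ok(())
}
```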
146 changes: 146 additions & 0 deletions lib/chirp-workflow/core/src/compat.rs
@@ -0,0 +1,146 @@
// Forwards compatibility from old operation ctx to new workflows

use std::{fmt::Debug, time::Duration};

use global_error::prelude::*;
use serde::Serialize;
use uuid::Uuid;

use crate::{
DatabaseHandle, DatabasePostgres, Operation, OperationCtx, OperationInput, Signal, Workflow,
WorkflowError, WorkflowInput,
};

pub async fn dispatch_workflow<I, B>(
ctx: &rivet_operation::OperationContext<B>,
input: I,
) -> GlobalResult<Uuid>
where
I: WorkflowInput,
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
B: Debug + Clone,
{
if ctx.from_workflow {
bail!("cannot dispatch a workflow from an operation within a workflow execution. trigger it from the workflow's body.");
}

let name = I::Workflow::name();

tracing::debug!(%name, ?input, "dispatching workflow");

let id = Uuid::new_v4();

// Serialize input
let input_val = serde_json::to_value(input)
.map_err(WorkflowError::SerializeWorkflowOutput)
.map_err(GlobalError::raw)?;

db(ctx)
.await?
.dispatch_workflow(ctx.ray_id(), id, &name, input_val)
.await
.map_err(GlobalError::raw)?;

tracing::info!(%name, ?id, "workflow dispatched");

Ok(id)
}

pub async fn wait_for_workflow<W: Workflow, B: Debug + Clone>(
ctx: &rivet_operation::OperationContext<B>,
workflow_id: Uuid,
) -> GlobalResult<W::Output> {
tracing::info!(name=W::name(), id=?workflow_id, "waiting for workflow");

let period = Duration::from_millis(50);
let mut interval = tokio::time::interval(period);
loop {
interval.tick().await;

// Check if state finished
let workflow = db(ctx)
.await?
.get_workflow(workflow_id)
.await
.map_err(GlobalError::raw)?
.ok_or(WorkflowError::WorkflowNotFound)
.map_err(GlobalError::raw)?;
if let Some(output) = workflow.parse_output::<W>().map_err(GlobalError::raw)? {
return Ok(output);
}
}
}

pub async fn workflow<I, B>(
ctx: &rivet_operation::OperationContext<B>,
input: I,
) -> GlobalResult<<<I as WorkflowInput>::Workflow as Workflow>::Output>
where
I: WorkflowInput,
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
B: Debug + Clone,
{
let workflow_id = dispatch_workflow(ctx, input).await?;
let output = wait_for_workflow::<I::Workflow, _>(ctx, workflow_id).await?;
Ok(output)
}

pub async fn signal<I: Signal + Serialize, B: Debug + Clone>(
ctx: &rivet_operation::OperationContext<B>,
workflow_id: Uuid,
input: I,
) -> GlobalResult<Uuid> {
if ctx.from_workflow {
bail!("cannot dispatch a signal from an operation within a workflow execution. trigger it from the workflow's body.");
}

tracing::debug!(name=%I::name(), %workflow_id, "dispatching signal");

let signal_id = Uuid::new_v4();

// Serialize input
let input_val = serde_json::to_value(input)
.map_err(WorkflowError::SerializeSignalBody)
.map_err(GlobalError::raw)?;

db(ctx)
.await?
.publish_signal(ctx.ray_id(), workflow_id, signal_id, I::name(), input_val)
.await
.map_err(GlobalError::raw)?;

Ok(signal_id)
}

pub async fn op<I, B>(
ctx: &rivet_operation::OperationContext<B>,
input: I,
) -> GlobalResult<<<I as OperationInput>::Operation as Operation>::Output>
where
I: OperationInput,
<I as OperationInput>::Operation: Operation<Input = I>,
B: Debug + Clone,
{
let mut ctx = OperationCtx::new(
db(ctx).await?,
ctx.conn(),
ctx.ray_id(),
ctx.req_ts(),
ctx.from_workflow(),
I::Operation::name(),
);

I::Operation::run(&mut ctx, &input)
.await
.map_err(WorkflowError::OperationFailure)
.map_err(GlobalError::raw)
}

// Get crdb pool as a trait object
async fn db<B: Debug + Clone>(
ctx: &rivet_operation::OperationContext<B>,
) -> GlobalResult<DatabaseHandle> {
let crdb = ctx.crdb().await?;

Ok(DatabasePostgres::from_pool(crdb))
}
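
Taken together, these helpers give services still written against `rivet_operation` a bridge into the new workflow engine. A caller sketch under stated assumptions — `my_workflow::Input`, `MySignal`, and the `chirp_workflow::compat` import path are illustrative, not part of this PR:

```rust
use chirp_workflow::compat; // assumed public path for the module above

// Hypothetical body of an old-style operation or API handler.
async fn example<B: std::fmt::Debug + Clone>(
	ctx: &rivet_operation::OperationContext<B>,
) -> GlobalResult<()> {
	// Dispatch a workflow and keep its id for later signaling.
	let workflow_id = compat::dispatch_workflow(ctx, my_workflow::Input { foo: 42 }).await?;

	// Publish a signal to the running workflow.
	compat::signal(ctx, workflow_id, MySignal { bar: true }).await?;

	// Or dispatch and block until the workflow finishes (polls every 50 ms).
	let _output = compat::workflow(ctx, my_workflow::Input { foo: 42 }).await?;

	Ok(())
}
```

Note that `dispatch_workflow` and `signal` bail when `ctx.from_workflow` is set, so an operation running inside a workflow cannot spawn or signal workflows through this layer; that flag is what the `OperationCtx` and `ActivityCtx` changes below thread through `wrap_conn`.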
9 changes: 3 additions & 6 deletions lib/chirp-workflow/core/src/ctx/activity.rs
@@ -5,7 +5,6 @@ use uuid::Uuid;
use crate::{ctx::OperationCtx, util, DatabaseHandle, Operation, OperationInput, WorkflowError};

pub struct ActivityCtx {
workflow_id: Uuid,
ray_id: Uuid,
name: &'static str,
ts: i64,
@@ -22,16 +21,14 @@ impl ActivityCtx {
pub fn new(
db: DatabaseHandle,
conn: &rivet_connection::Connection,
workflow_id: Uuid,
workflow_create_ts: i64,
ray_id: Uuid,
name: &'static str,
) -> Self {
let ts = rivet_util::timestamp::now();
let (conn, op_ctx) = util::wrap_conn(conn, ray_id, workflow_create_ts, name, ts);
let (conn, op_ctx) = util::wrap_conn(conn, ray_id, workflow_create_ts, true, name, ts);

ActivityCtx {
workflow_id,
ray_id,
name,
ts,
@@ -44,7 +41,7 @@

impl ActivityCtx {
pub async fn op<I>(
&mut self,
&self,
input: I,
) -> GlobalResult<<<I as OperationInput>::Operation as Operation>::Output>
where
@@ -54,9 +51,9 @@
let mut ctx = OperationCtx::new(
self.db.clone(),
&self.conn,
self.workflow_id,
self.ray_id,
self.op_ctx.req_ts(),
true,
I::Operation::name(),
);

2 changes: 2 additions & 0 deletions lib/chirp-workflow/core/src/ctx/mod.rs
@@ -6,3 +6,5 @@ pub use activity::ActivityCtx;
pub use operation::OperationCtx;
pub use test::TestCtx;
pub use workflow::WorkflowCtx;

// TODO: StandaloneCtx, ApiCtx
8 changes: 3 additions & 5 deletions lib/chirp-workflow/core/src/ctx/operation.rs
@@ -5,7 +5,6 @@ use uuid::Uuid;
use crate::{util, DatabaseHandle, Operation, OperationInput, WorkflowError};

pub struct OperationCtx {
workflow_id: Uuid,
ray_id: Uuid,
name: &'static str,
ts: i64,
@@ -22,16 +21,15 @@ impl OperationCtx {
pub fn new(
db: DatabaseHandle,
conn: &rivet_connection::Connection,
workflow_id: Uuid,
ray_id: Uuid,
req_ts: i64,
from_workflow: bool,
name: &'static str,
) -> Self {
let ts = rivet_util::timestamp::now();
let (conn, op_ctx) = util::wrap_conn(conn, ray_id, req_ts, name, ts);
let (conn, op_ctx) = util::wrap_conn(conn, ray_id, req_ts, from_workflow, name, ts);

OperationCtx {
workflow_id,
ray_id,
name,
ts,
@@ -54,9 +52,9 @@
let mut ctx = OperationCtx::new(
self.db.clone(),
&self.conn,
self.workflow_id,
self.ray_id,
self.op_ctx.req_ts(),
self.op_ctx.from_workflow(),
I::Operation::name(),
);

62 changes: 51 additions & 11 deletions lib/chirp-workflow/core/src/ctx/test.rs
@@ -5,41 +5,56 @@ use serde::Serialize;
use tokio::time::Duration;
use uuid::Uuid;

use crate::{DatabaseHandle, DatabasePostgres, Signal, Workflow, WorkflowError, WorkflowInput};
use crate::{
util, DatabaseHandle, DatabasePostgres, Operation, OperationCtx, OperationInput, Signal,
Workflow, WorkflowError, WorkflowInput,
};

pub type TestCtxHandle = Arc<TestCtx>;

pub struct TestCtx {
name: String,
ray_id: Uuid,
ts: i64,

pub db: DatabaseHandle,
db: DatabaseHandle,

conn: Option<rivet_connection::Connection>,
}

impl TestCtx {
pub fn new(db: DatabaseHandle) -> TestCtxHandle {
Arc::new(TestCtx {
name: "internal-test".to_string(),
ray_id: Uuid::new_v4(),
db,
})
}

pub async fn from_env(test_name: &str) -> TestCtx {
let service_name = format!(
"{}-test--{}",
std::env::var("CHIRP_SERVICE_NAME").unwrap(),
test_name
);

let ray_id = Uuid::new_v4();
let pools = rivet_pools::from_env(service_name.clone())
.await
.expect("failed to create pools");
let shared_client = chirp_client::SharedClient::from_env(pools.clone())
.expect("failed to create chirp client");
let cache =
rivet_cache::CacheInner::from_env(pools.clone()).expect("failed to create cache");
let conn = util::new_conn(
&shared_client,
&pools,
&cache,
ray_id,
Uuid::new_v4(),
&service_name,
);

let db = DatabasePostgres::from_pool(pools.crdb().unwrap());

TestCtx {
name: service_name,
ray_id: Uuid::new_v4(),
ray_id,
ts: rivet_util::timestamp::now(),
db,
conn: Some(conn),
}
}
}
@@ -136,4 +151,29 @@ impl TestCtx {

Ok(signal_id)
}

pub async fn op<I>(
&mut self,
input: I,
) -> GlobalResult<<<I as OperationInput>::Operation as Operation>::Output>
where
I: OperationInput,
<I as OperationInput>::Operation: Operation<Input = I>,
{
let mut ctx = OperationCtx::new(
self.db.clone(),
self.conn
.as_ref()
.expect("ops cannot be triggered from an internal test"),
self.ray_id,
self.ts,
false,
I::Operation::name(),
);

I::Operation::run(&mut ctx, &input)
.await
.map_err(WorkflowError::OperationFailure)
.map_err(GlobalError::raw)
}
}
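
With `from_env` now building a full `Connection` and the new `op` method, integration tests can call operations straight from a `TestCtx`. A rough sketch — the `#[tokio::test]` harness, the crate-root `TestCtx` path, and `my_op` are assumptions:

```rust
// Hypothetical integration test; `my_op::Input`/`Output` are assumed to be
// defined with the new chirp-workflow operation macro.
#[tokio::test]
async fn my_op_works() {
	let mut ctx = chirp_workflow::TestCtx::from_env("my_op_works").await;

	let output = ctx.op(my_op::Input { value: 42 }).await.unwrap();
	assert_eq!(output.value, 42);
}
```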
1 change: 0 additions & 1 deletion lib/chirp-workflow/core/src/ctx/workflow.rs
@@ -245,7 +245,6 @@ impl WorkflowCtx {
let mut ctx = ActivityCtx::new(
self.db.clone(),
&self.conn,
self.workflow_id,
self.create_ts,
self.ray_id,
A::name(),
2 changes: 1 addition & 1 deletion lib/chirp-workflow/core/src/db/postgres.rs
@@ -45,7 +45,7 @@ impl DatabasePostgres {
Arc::new(DatabasePostgres { pool })
}

pub async fn conn(&self) -> WorkflowResult<PoolConnection<Postgres>> {
async fn conn(&self) -> WorkflowResult<PoolConnection<Postgres>> {
// Attempt to use an existing connection
if let Some(conn) = self.pool.try_acquire() {
Ok(conn)
1 change: 1 addition & 0 deletions lib/chirp-workflow/core/src/lib.rs
@@ -1,4 +1,5 @@
pub mod activity;
pub mod compat;
mod ctx;
pub mod db;
mod error;