Skip to content

Commit c456d1a

Browse files
committed
feat: add compat layer between old ctx and new workflows
1 parent 06ae309 commit c456d1a

File tree

14 files changed

+254
-54
lines changed

14 files changed

+254
-54
lines changed

lib/api-helper/build/src/ctx.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,11 @@ impl<A: auth::ApiAuth> Ctx<A> {
6767
self.asn
6868
}
6969
}
70+
71+
impl<A: auth::ApiAuth> std::ops::Deref for Ctx<A> {
72+
type Target = OperationContext<()>;
73+
74+
fn deref(&self) -> &Self::Target {
75+
&self.op_ctx
76+
}
77+
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
// Forwards compatibility from old operation ctx to new workflows
2+
3+
use std::{fmt::Debug, time::Duration};
4+
5+
use global_error::prelude::*;
6+
use serde::Serialize;
7+
use uuid::Uuid;
8+
9+
use crate::{
10+
DatabaseHandle, DatabasePostgres, Operation, OperationCtx, OperationInput, Signal, Workflow,
11+
WorkflowError, WorkflowInput,
12+
};
13+
14+
pub async fn dispatch_workflow<I, B>(
15+
ctx: &rivet_operation::OperationContext<B>,
16+
input: I,
17+
) -> GlobalResult<Uuid>
18+
where
19+
I: WorkflowInput,
20+
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
21+
B: Debug + Clone,
22+
{
23+
if ctx.from_workflow {
24+
bail!("cannot dispatch a workflow from an operation within a workflow execution. trigger it from the workflow's body.");
25+
}
26+
27+
let name = I::Workflow::name();
28+
29+
tracing::debug!(%name, ?input, "dispatching workflow");
30+
31+
let id = Uuid::new_v4();
32+
33+
// Serialize input
34+
let input_val = serde_json::to_value(input)
35+
.map_err(WorkflowError::SerializeWorkflowOutput)
36+
.map_err(GlobalError::raw)?;
37+
38+
db(ctx)
39+
.await?
40+
.dispatch_workflow(ctx.ray_id(), id, &name, input_val)
41+
.await
42+
.map_err(GlobalError::raw)?;
43+
44+
tracing::info!(%name, ?id, "workflow dispatched");
45+
46+
Ok(id)
47+
}
48+
49+
pub async fn wait_for_workflow<W: Workflow, B: Debug + Clone>(
50+
ctx: &rivet_operation::OperationContext<B>,
51+
workflow_id: Uuid,
52+
) -> GlobalResult<W::Output> {
53+
tracing::info!(name=W::name(), id=?workflow_id, "waiting for workflow");
54+
55+
let period = Duration::from_millis(50);
56+
let mut interval = tokio::time::interval(period);
57+
loop {
58+
interval.tick().await;
59+
60+
// Check if state finished
61+
let workflow = db(ctx)
62+
.await?
63+
.get_workflow(workflow_id)
64+
.await
65+
.map_err(GlobalError::raw)?
66+
.ok_or(WorkflowError::WorkflowNotFound)
67+
.map_err(GlobalError::raw)?;
68+
if let Some(output) = workflow.parse_output::<W>().map_err(GlobalError::raw)? {
69+
return Ok(output);
70+
}
71+
}
72+
}
73+
74+
pub async fn workflow<I, B>(
75+
ctx: &rivet_operation::OperationContext<B>,
76+
input: I,
77+
) -> GlobalResult<<<I as WorkflowInput>::Workflow as Workflow>::Output>
78+
where
79+
I: WorkflowInput,
80+
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
81+
B: Debug + Clone,
82+
{
83+
let workflow_id = dispatch_workflow(ctx, input).await?;
84+
let output = wait_for_workflow::<I::Workflow, _>(ctx, workflow_id).await?;
85+
Ok(output)
86+
}
87+
88+
pub async fn signal<I: Signal + Serialize, B: Debug + Clone>(
89+
ctx: &rivet_operation::OperationContext<B>,
90+
workflow_id: Uuid,
91+
input: I,
92+
) -> GlobalResult<Uuid> {
93+
if ctx.from_workflow {
94+
bail!("cannot dispatch a signal from an operation within a workflow execution. trigger it from the workflow's body.");
95+
}
96+
97+
tracing::debug!(name=%I::name(), %workflow_id, "dispatching signal");
98+
99+
let signal_id = Uuid::new_v4();
100+
101+
// Serialize input
102+
let input_val = serde_json::to_value(input)
103+
.map_err(WorkflowError::SerializeSignalBody)
104+
.map_err(GlobalError::raw)?;
105+
106+
db(ctx)
107+
.await?
108+
.publish_signal(ctx.ray_id(), workflow_id, signal_id, I::name(), input_val)
109+
.await
110+
.map_err(GlobalError::raw)?;
111+
112+
Ok(signal_id)
113+
}
114+
115+
pub async fn op<I, B>(
116+
ctx: &rivet_operation::OperationContext<B>,
117+
input: I,
118+
) -> GlobalResult<<<I as OperationInput>::Operation as Operation>::Output>
119+
where
120+
I: OperationInput,
121+
<I as OperationInput>::Operation: Operation<Input = I>,
122+
B: Debug + Clone,
123+
{
124+
let mut ctx = OperationCtx::new(
125+
db(ctx).await?,
126+
ctx.conn(),
127+
ctx.ray_id(),
128+
ctx.req_ts(),
129+
ctx.from_workflow(),
130+
I::Operation::name(),
131+
);
132+
133+
I::Operation::run(&mut ctx, &input)
134+
.await
135+
.map_err(WorkflowError::OperationFailure)
136+
.map_err(GlobalError::raw)
137+
}
138+
139+
// Get crdb pool as a trait object
140+
async fn db<B: Debug + Clone>(
141+
ctx: &rivet_operation::OperationContext<B>,
142+
) -> GlobalResult<DatabaseHandle> {
143+
let crdb = ctx.crdb().await?;
144+
145+
Ok(DatabasePostgres::from_pool(crdb))
146+
}

lib/chirp-workflow/core/src/ctx/activity.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use uuid::Uuid;
55
use crate::{ctx::OperationCtx, util, DatabaseHandle, Operation, OperationInput, WorkflowError};
66

77
pub struct ActivityCtx {
8-
workflow_id: Uuid,
98
ray_id: Uuid,
109
name: &'static str,
1110
ts: i64,
@@ -22,16 +21,14 @@ impl ActivityCtx {
2221
pub fn new(
2322
db: DatabaseHandle,
2423
conn: &rivet_connection::Connection,
25-
workflow_id: Uuid,
2624
workflow_create_ts: i64,
2725
ray_id: Uuid,
2826
name: &'static str,
2927
) -> Self {
3028
let ts = rivet_util::timestamp::now();
31-
let (conn, op_ctx) = util::wrap_conn(conn, ray_id, workflow_create_ts, name, ts);
29+
let (conn, op_ctx) = util::wrap_conn(conn, ray_id, workflow_create_ts, true, name, ts);
3230

3331
ActivityCtx {
34-
workflow_id,
3532
ray_id,
3633
name,
3734
ts,
@@ -44,7 +41,7 @@ impl ActivityCtx {
4441

4542
impl ActivityCtx {
4643
pub async fn op<I>(
47-
&mut self,
44+
&self,
4845
input: I,
4946
) -> GlobalResult<<<I as OperationInput>::Operation as Operation>::Output>
5047
where
@@ -54,9 +51,9 @@ impl ActivityCtx {
5451
let mut ctx = OperationCtx::new(
5552
self.db.clone(),
5653
&self.conn,
57-
self.workflow_id,
5854
self.ray_id,
5955
self.op_ctx.req_ts(),
56+
true,
6057
I::Operation::name(),
6158
);
6259

lib/chirp-workflow/core/src/ctx/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,5 @@ pub use activity::ActivityCtx;
66
pub use operation::OperationCtx;
77
pub use test::TestCtx;
88
pub use workflow::WorkflowCtx;
9+
10+
// TODO: StandaloneCtx, ApiCtx

lib/chirp-workflow/core/src/ctx/operation.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use uuid::Uuid;
55
use crate::{util, DatabaseHandle, Operation, OperationInput, WorkflowError};
66

77
pub struct OperationCtx {
8-
workflow_id: Uuid,
98
ray_id: Uuid,
109
name: &'static str,
1110
ts: i64,
@@ -22,16 +21,15 @@ impl OperationCtx {
2221
pub fn new(
2322
db: DatabaseHandle,
2423
conn: &rivet_connection::Connection,
25-
workflow_id: Uuid,
2624
ray_id: Uuid,
2725
req_ts: i64,
26+
from_workflow: bool,
2827
name: &'static str,
2928
) -> Self {
3029
let ts = rivet_util::timestamp::now();
31-
let (conn, op_ctx) = util::wrap_conn(conn, ray_id, req_ts, name, ts);
30+
let (conn, op_ctx) = util::wrap_conn(conn, ray_id, req_ts, from_workflow, name, ts);
3231

3332
OperationCtx {
34-
workflow_id,
3533
ray_id,
3634
name,
3735
ts,
@@ -54,9 +52,9 @@ impl OperationCtx {
5452
let mut ctx = OperationCtx::new(
5553
self.db.clone(),
5654
&self.conn,
57-
self.workflow_id,
5855
self.ray_id,
5956
self.op_ctx.req_ts(),
57+
self.op_ctx.from_workflow(),
6058
I::Operation::name(),
6159
);
6260

lib/chirp-workflow/core/src/ctx/test.rs

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,41 +5,56 @@ use serde::Serialize;
55
use tokio::time::Duration;
66
use uuid::Uuid;
77

8-
use crate::{DatabaseHandle, DatabasePostgres, Signal, Workflow, WorkflowError, WorkflowInput};
8+
use crate::{
9+
util, DatabaseHandle, DatabasePostgres, Operation, OperationCtx, OperationInput, Signal,
10+
Workflow, WorkflowError, WorkflowInput,
11+
};
912

1013
pub type TestCtxHandle = Arc<TestCtx>;
1114

1215
pub struct TestCtx {
1316
name: String,
1417
ray_id: Uuid,
18+
ts: i64,
1519

16-
pub db: DatabaseHandle,
20+
db: DatabaseHandle,
21+
22+
conn: Option<rivet_connection::Connection>,
1723
}
1824

1925
impl TestCtx {
20-
pub fn new(db: DatabaseHandle) -> TestCtxHandle {
21-
Arc::new(TestCtx {
22-
name: "internal-test".to_string(),
23-
ray_id: Uuid::new_v4(),
24-
db,
25-
})
26-
}
27-
2826
pub async fn from_env(test_name: &str) -> TestCtx {
2927
let service_name = format!(
3028
"{}-test--{}",
3129
std::env::var("CHIRP_SERVICE_NAME").unwrap(),
3230
test_name
3331
);
32+
33+
let ray_id = Uuid::new_v4();
3434
let pools = rivet_pools::from_env(service_name.clone())
3535
.await
3636
.expect("failed to create pools");
37+
let shared_client = chirp_client::SharedClient::from_env(pools.clone())
38+
.expect("failed to create chirp client");
39+
let cache =
40+
rivet_cache::CacheInner::from_env(pools.clone()).expect("failed to create cache");
41+
let conn = util::new_conn(
42+
&shared_client,
43+
&pools,
44+
&cache,
45+
ray_id,
46+
Uuid::new_v4(),
47+
&service_name,
48+
);
49+
3750
let db = DatabasePostgres::from_pool(pools.crdb().unwrap());
3851

3952
TestCtx {
4053
name: service_name,
41-
ray_id: Uuid::new_v4(),
54+
ray_id,
55+
ts: rivet_util::timestamp::now(),
4256
db,
57+
conn: Some(conn),
4358
}
4459
}
4560
}
@@ -136,4 +151,29 @@ impl TestCtx {
136151

137152
Ok(signal_id)
138153
}
154+
155+
pub async fn op<I>(
156+
&mut self,
157+
input: I,
158+
) -> GlobalResult<<<I as OperationInput>::Operation as Operation>::Output>
159+
where
160+
I: OperationInput,
161+
<I as OperationInput>::Operation: Operation<Input = I>,
162+
{
163+
let mut ctx = OperationCtx::new(
164+
self.db.clone(),
165+
self.conn
166+
.as_ref()
167+
.expect("ops cannot be triggered from an internal test"),
168+
self.ray_id,
169+
self.ts,
170+
false,
171+
I::Operation::name(),
172+
);
173+
174+
I::Operation::run(&mut ctx, &input)
175+
.await
176+
.map_err(WorkflowError::OperationFailure)
177+
.map_err(GlobalError::raw)
178+
}
139179
}

lib/chirp-workflow/core/src/ctx/workflow.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,6 @@ impl WorkflowCtx {
205205
let mut ctx = ActivityCtx::new(
206206
self.db.clone(),
207207
&self.conn,
208-
self.workflow_id,
209208
self.create_ts,
210209
self.ray_id,
211210
A::name(),

lib/chirp-workflow/core/src/db/postgres.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ impl DatabasePostgres {
4545
Arc::new(DatabasePostgres { pool })
4646
}
4747

48-
pub async fn conn(&self) -> WorkflowResult<PoolConnection<Postgres>> {
48+
async fn conn(&self) -> WorkflowResult<PoolConnection<Postgres>> {
4949
// Attempt to use an existing connection
5050
if let Some(conn) = self.pool.try_acquire() {
5151
Ok(conn)

lib/chirp-workflow/core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod activity;
2+
pub mod compat;
23
mod ctx;
34
pub mod db;
45
mod error;

0 commit comments

Comments
 (0)