Skip to content

Commit bccce31

Browse files
committed
fix: loops and cache (#1010)
<!-- Please make sure there is an issue that this PR is correlated to. --> ## Changes <!-- If there are frontend changes, please include screenshots. -->
1 parent 9c2e884 commit bccce31

File tree

12 files changed

+95
-77
lines changed

12 files changed

+95
-77
lines changed

lib/chirp-workflow/core/src/compat.rs

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ where
3434
bail!("cannot dispatch a workflow from an operation within a workflow execution. trigger it from the workflow's body.");
3535
}
3636

37-
let name = I::Workflow::NAME;
38-
let id = Uuid::new_v4();
37+
let workflow_name = I::Workflow::NAME;
38+
let workflow_id = Uuid::new_v4();
3939

40-
tracing::info!(%name, %id, ?input, "dispatching workflow");
40+
tracing::info!(%workflow_name, %workflow_id, ?input, "dispatching workflow");
4141

4242
// Serialize input
4343
let input_val = serde_json::to_value(input)
@@ -46,13 +46,13 @@ where
4646

4747
db_from_ctx(ctx)
4848
.await?
49-
.dispatch_workflow(ctx.ray_id(), id, &name, None, input_val)
49+
.dispatch_workflow(ctx.ray_id(), workflow_id, &workflow_name, None, input_val)
5050
.await
5151
.map_err(GlobalError::raw)?;
5252

53-
tracing::info!(%name, ?id, "workflow dispatched");
53+
tracing::info!(%workflow_name, ?workflow_id, "workflow dispatched");
5454

55-
Ok(id)
55+
Ok(workflow_id)
5656
}
5757

5858
pub async fn dispatch_tagged_workflow<I, B>(
@@ -69,10 +69,10 @@ where
6969
bail!("cannot dispatch a workflow from an operation within a workflow execution. trigger it from the workflow's body.");
7070
}
7171

72-
let name = I::Workflow::NAME;
73-
let id = Uuid::new_v4();
72+
let workflow_name = I::Workflow::NAME;
73+
let workflow_id = Uuid::new_v4();
7474

75-
tracing::info!(%name, %id, ?input, "dispatching workflow");
75+
tracing::info!(%workflow_name, %workflow_id, ?input, "dispatching tagged workflow");
7676

7777
// Serialize input
7878
let input_val = serde_json::to_value(input)
@@ -81,13 +81,19 @@ where
8181

8282
db_from_ctx(ctx)
8383
.await?
84-
.dispatch_workflow(ctx.ray_id(), id, &name, Some(tags), input_val)
84+
.dispatch_workflow(
85+
ctx.ray_id(),
86+
workflow_id,
87+
&workflow_name,
88+
Some(tags),
89+
input_val,
90+
)
8591
.await
8692
.map_err(GlobalError::raw)?;
8793

88-
tracing::info!(%name, ?id, "workflow dispatched");
94+
tracing::info!(%workflow_name, ?workflow_id, "workflow tagged dispatched");
8995

90-
Ok(id)
96+
Ok(workflow_id)
9197
}
9298

9399
/// Wait for a given workflow to complete.
@@ -96,7 +102,7 @@ pub async fn wait_for_workflow<W: Workflow, B: Debug + Clone>(
96102
ctx: &rivet_operation::OperationContext<B>,
97103
workflow_id: Uuid,
98104
) -> GlobalResult<W::Output> {
99-
tracing::info!(name=W::NAME, id=?workflow_id, "waiting for workflow");
105+
tracing::info!(sub_workflow_name=W::NAME, sub_workflow_id=?workflow_id, "waiting for workflow");
100106

101107
tokio::time::timeout(WORKFLOW_TIMEOUT, async move {
102108
let mut interval = tokio::time::interval(SUB_WORKFLOW_RETRY);
@@ -161,7 +167,7 @@ pub async fn signal<I: Signal + Serialize, B: Debug + Clone>(
161167

162168
let signal_id = Uuid::new_v4();
163169

164-
tracing::info!(name=%I::NAME, %workflow_id, %signal_id, "dispatching signal");
170+
tracing::info!(signal_name=%I::NAME, to_workflow_id=%workflow_id, %signal_id, "dispatching signal");
165171

166172
// Serialize input
167173
let input_val = serde_json::to_value(input)
@@ -188,7 +194,7 @@ pub async fn tagged_signal<I: Signal + Serialize, B: Debug + Clone>(
188194

189195
let signal_id = Uuid::new_v4();
190196

191-
tracing::info!(name=%I::NAME, ?tags, %signal_id, "dispatching tagged signal");
197+
tracing::info!(signal_name=%I::NAME, ?tags, %signal_id, "dispatching tagged signal");
192198

193199
// Serialize input
194200
let input_val = serde_json::to_value(input)

lib/chirp-workflow/core/src/ctx/activity.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl ActivityCtx {
7070
I: OperationInput,
7171
<I as OperationInput>::Operation: Operation<Input = I>,
7272
{
73-
tracing::info!(?input, "operation call");
73+
tracing::info!(activity_name=%self.name, ?input, "operation call");
7474

7575
let ctx = OperationCtx::new(
7676
self.db.clone(),
@@ -87,7 +87,7 @@ impl ActivityCtx {
8787
.map_err(WorkflowError::OperationFailure)
8888
.map_err(GlobalError::raw);
8989

90-
tracing::info!(?res, "operation response");
90+
tracing::info!(activity_name=%self.name, ?res, "operation response");
9191

9292
res
9393
}

lib/chirp-workflow/core/src/ctx/api.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ impl ApiCtx {
7979
let name = I::Workflow::NAME;
8080
let id = Uuid::new_v4();
8181

82-
tracing::info!(%name, %id, ?input, "dispatching workflow");
82+
tracing::info!(workflow_name=%name, workflow_id=%id, ?input, "dispatching workflow");
8383

8484
// Serialize input
8585
let input_val = serde_json::to_value(input)
@@ -91,7 +91,7 @@ impl ApiCtx {
9191
.await
9292
.map_err(GlobalError::raw)?;
9393

94-
tracing::info!(%name, ?id, "workflow dispatched");
94+
tracing::info!(workflow_name=%name, workflow_id=%id, "workflow dispatched");
9595

9696
Ok(id)
9797
}
@@ -108,7 +108,7 @@ impl ApiCtx {
108108
let name = I::Workflow::NAME;
109109
let id = Uuid::new_v4();
110110

111-
tracing::info!(%name, %id, ?tags, ?input, "dispatching tagged workflow");
111+
tracing::info!(workflow_name=%name, workflow_id=%id, ?tags, ?input, "dispatching tagged workflow");
112112

113113
// Serialize input
114114
let input_val = serde_json::to_value(input)
@@ -120,7 +120,7 @@ impl ApiCtx {
120120
.await
121121
.map_err(GlobalError::raw)?;
122122

123-
tracing::info!(%name, ?id, "workflow dispatched");
123+
tracing::info!(workflow_name=%name, workflow_id=%id, "tagged workflow dispatched");
124124

125125
Ok(id)
126126
}
@@ -131,7 +131,7 @@ impl ApiCtx {
131131
&self,
132132
workflow_id: Uuid,
133133
) -> GlobalResult<W::Output> {
134-
tracing::info!(name=W::NAME, id=?workflow_id, "waiting for workflow");
134+
tracing::info!(workflow_name=%W::NAME, %workflow_id, "waiting for workflow");
135135

136136
tokio::time::timeout(WORKFLOW_TIMEOUT, async move {
137137
let mut interval = tokio::time::interval(SUB_WORKFLOW_RETRY);
@@ -188,7 +188,7 @@ impl ApiCtx {
188188
) -> GlobalResult<Uuid> {
189189
let signal_id = Uuid::new_v4();
190190

191-
tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
191+
tracing::info!(signal_name=%T::NAME, to_workflow_id=%workflow_id, %signal_id, "dispatching signal");
192192

193193
// Serialize input
194194
let input_val = serde_json::to_value(input)
@@ -210,7 +210,7 @@ impl ApiCtx {
210210
) -> GlobalResult<Uuid> {
211211
let signal_id = Uuid::new_v4();
212212

213-
tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
213+
tracing::info!(signal_name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
214214

215215
// Serialize input
216216
let input_val = serde_json::to_value(input)

lib/chirp-workflow/core/src/ctx/operation.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ impl OperationCtx {
9797
) -> GlobalResult<Uuid> {
9898
let signal_id = Uuid::new_v4();
9999

100-
tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
100+
tracing::info!(signal_name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
101101

102102
// Serialize input
103103
let input_val = serde_json::to_value(input)
@@ -119,7 +119,7 @@ impl OperationCtx {
119119
) -> GlobalResult<Uuid> {
120120
let signal_id = Uuid::new_v4();
121121

122-
tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
122+
tracing::info!(signal_name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
123123

124124
// Serialize input
125125
let input_val = serde_json::to_value(input)

lib/chirp-workflow/core/src/ctx/standalone.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ impl StandaloneCtx {
7878
let name = I::Workflow::NAME;
7979
let id = Uuid::new_v4();
8080

81-
tracing::info!(%name, %id, ?input, "dispatching workflow");
81+
tracing::info!(workflow_name=%name, workflow_id=%id, ?input, "dispatching workflow");
8282

8383
// Serialize input
8484
let input_val = serde_json::to_value(input)
@@ -90,7 +90,7 @@ impl StandaloneCtx {
9090
.await
9191
.map_err(GlobalError::raw)?;
9292

93-
tracing::info!(%name, ?id, "workflow dispatched");
93+
tracing::info!(workflow_name=%name, workflow_id=%id, "workflow dispatched");
9494

9595
Ok(id)
9696
}
@@ -107,7 +107,7 @@ impl StandaloneCtx {
107107
let name = I::Workflow::NAME;
108108
let id = Uuid::new_v4();
109109

110-
tracing::info!(%name, %id, ?tags, ?input, "dispatching tagged workflow");
110+
tracing::info!(workflow_name=%name, workflow_id=%id, ?tags, ?input, "dispatching tagged workflow");
111111

112112
// Serialize input
113113
let input_val = serde_json::to_value(input)
@@ -119,7 +119,7 @@ impl StandaloneCtx {
119119
.await
120120
.map_err(GlobalError::raw)?;
121121

122-
tracing::info!(%name, ?id, "workflow dispatched");
122+
tracing::info!(workflow_name=%name, workflow_id=%id, "workflow dispatched");
123123

124124
Ok(id)
125125
}
@@ -130,7 +130,7 @@ impl StandaloneCtx {
130130
&self,
131131
workflow_id: Uuid,
132132
) -> GlobalResult<W::Output> {
133-
tracing::info!(name=W::NAME, id=?workflow_id, "waiting for workflow");
133+
tracing::info!(workflow_name=%W::NAME, id=?workflow_id, "waiting for workflow");
134134

135135
tokio::time::timeout(WORKFLOW_TIMEOUT, async move {
136136
let mut interval = tokio::time::interval(SUB_WORKFLOW_RETRY);
@@ -187,7 +187,7 @@ impl StandaloneCtx {
187187
) -> GlobalResult<Uuid> {
188188
let signal_id = Uuid::new_v4();
189189

190-
tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
190+
tracing::info!(signal_name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
191191

192192
// Serialize input
193193
let input_val = serde_json::to_value(input)
@@ -209,7 +209,7 @@ impl StandaloneCtx {
209209
) -> GlobalResult<Uuid> {
210210
let signal_id = Uuid::new_v4();
211211

212-
tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
212+
tracing::info!(signal_name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
213213

214214
// Serialize input
215215
let input_val = serde_json::to_value(input)

lib/chirp-workflow/core/src/ctx/test.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ impl TestCtx {
9494
let name = I::Workflow::NAME;
9595
let id = Uuid::new_v4();
9696

97-
tracing::info!(%name, %id, ?input, "dispatching workflow");
97+
tracing::info!(workflow_name=%name, workflow_id=%id, ?input, "dispatching workflow");
9898

9999
// Serialize input
100100
let input_val = serde_json::to_value(input)
@@ -106,7 +106,7 @@ impl TestCtx {
106106
.await
107107
.map_err(GlobalError::raw)?;
108108

109-
tracing::info!(%name, ?id, "workflow dispatched");
109+
tracing::info!(workflow_name=%name, workflow_id=%id, "workflow dispatched");
110110

111111
Ok(id)
112112
}
@@ -123,7 +123,7 @@ impl TestCtx {
123123
let name = I::Workflow::NAME;
124124
let id = Uuid::new_v4();
125125

126-
tracing::info!(%name, %id, ?tags, ?input, "dispatching tagged workflow");
126+
tracing::info!(workflow_name=%name, workflow_id=%id, ?tags, ?input, "dispatching tagged workflow");
127127

128128
// Serialize input
129129
let input_val = serde_json::to_value(input)
@@ -135,7 +135,7 @@ impl TestCtx {
135135
.await
136136
.map_err(GlobalError::raw)?;
137137

138-
tracing::info!(%name, ?id, "workflow dispatched");
138+
tracing::info!(workflow_name=%name, workflow_id=%id, "workflow dispatched");
139139

140140
Ok(id)
141141
}
@@ -144,7 +144,7 @@ impl TestCtx {
144144
&self,
145145
workflow_id: Uuid,
146146
) -> GlobalResult<W::Output> {
147-
tracing::info!(name=W::NAME, id=?workflow_id, "waiting for workflow");
147+
tracing::info!(workflow_name=%W::NAME, %workflow_id, "waiting for workflow");
148148

149149
let mut interval = tokio::time::interval(SUB_WORKFLOW_RETRY);
150150
loop {
@@ -198,7 +198,7 @@ impl TestCtx {
198198
) -> GlobalResult<Uuid> {
199199
let signal_id = Uuid::new_v4();
200200

201-
tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
201+
tracing::info!(signal_name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
202202

203203
// Serialize input
204204
let input_val = serde_json::to_value(input)
@@ -220,7 +220,7 @@ impl TestCtx {
220220
) -> GlobalResult<Uuid> {
221221
let signal_id = Uuid::new_v4();
222222

223-
tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
223+
tracing::info!(signal_name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
224224

225225
// Serialize input
226226
let input_val = serde_json::to_value(input)

0 commit comments

Comments
 (0)