Skip to content

Commit 2eb7b1c

Browse files
committed
feat(workflows): add metrics
1 parent 2b1ed92 commit 2eb7b1c

File tree

19 files changed

+1603
-121
lines changed

19 files changed

+1603
-121
lines changed

infra/tf/grafana/grafana_dashboards/chirp-workflow.json

Lines changed: 1121 additions & 0 deletions
Large diffs are not rendered by default.

lib/chirp-workflow/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ formatted-error = { path = "../../formatted-error" }
1414
futures-util = "0.3"
1515
global-error = { path = "../../global-error" }
1616
indoc = "2.0.5"
17+
lazy_static = "1.4"
1718
prost = "0.12.4"
1819
prost-types = "0.12.4"
1920
rand = "0.8.5"

lib/chirp-workflow/core/src/ctx/listen.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ impl<'a> ListenCtx<'a> {
3232
return Err(WorkflowError::NoSignalFound(Box::from(signal_names)));
3333
};
3434

35+
let recv_lag = (rivet_util::timestamp::now() as f64 - signal.create_ts as f64) / 1000.;
36+
crate::metrics::SIGNAL_RECV_LAG
37+
.with_label_values(&[&self.ctx.name(), &signal.signal_name])
38+
.observe(recv_lag);
39+
3540
tracing::info!(
3641
workflow_name=%self.ctx.name(),
3742
workflow_id=%self.ctx.workflow_id(),

lib/chirp-workflow/core/src/ctx/message.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,12 @@ impl MessageCtx {
322322
let message = if let Some(message_buf) = message_buf {
323323
let message = ReceivedMessage::<M>::deserialize(message_buf.as_slice())?;
324324
tracing::info!(?message, "immediate read tail message");
325+
326+
let recv_lag = (rivet_util::timestamp::now() as f64 - message.ts as f64) / 1000.;
327+
crate::metrics::MESSAGE_RECV_LAG
328+
.with_label_values(&[M::NAME])
329+
.observe(recv_lag);
330+
325331
Some(message)
326332
} else {
327333
tracing::info!("no tail message to read");
@@ -520,6 +526,11 @@ where
520526
let message = ReceivedMessage::<M>::deserialize(&nats_message.payload[..])?;
521527
tracing::info!(?message, "received message");
522528

529+
let recv_lag = (rivet_util::timestamp::now() as f64 - message.ts as f64) / 1000.;
530+
crate::metrics::MESSAGE_RECV_LAG
531+
.with_label_values(&[M::NAME])
532+
.observe(recv_lag);
533+
523534
return Ok(message);
524535
}
525536

lib/chirp-workflow/core/src/ctx/workflow.rs

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::HashMap, sync::Arc};
1+
use std::{collections::HashMap, sync::Arc, time::Instant};
22

33
use global_error::{GlobalError, GlobalResult};
44
use serde::{de::DeserializeOwned, Serialize};
@@ -15,9 +15,10 @@ use crate::{
1515
executable::{closure, AsyncResult, Executable},
1616
listen::{CustomListener, Listen},
1717
message::Message,
18+
metrics,
1819
registry::RegistryHandle,
1920
signal::Signal,
20-
util::Location,
21+
util::{GlobalErrorExt, Location},
2122
workflow::{Workflow, WorkflowInput},
2223
};
2324

@@ -211,7 +212,7 @@ impl WorkflowCtx {
211212
// Retry the workflow if it's recoverable
212213
let deadline_ts = if let Some(deadline_ts) = err.backoff() {
213214
Some(deadline_ts)
214-
} else if err.is_recoverable() {
215+
} else if err.is_retryable() {
215216
Some(rivet_util::timestamp::now() + RETRY_TIMEOUT_MS as i64)
216217
} else {
217218
None
@@ -225,11 +226,14 @@ impl WorkflowCtx {
225226
// finish. This workflow will be retried when the sub workflow completes
226227
let wake_sub_workflow = err.sub_workflow();
227228

228-
if deadline_ts.is_some() || !wake_signals.is_empty() || wake_sub_workflow.is_some()
229-
{
229+
if err.is_recoverable() && !err.is_retryable() {
230230
tracing::info!(name=%self.name, id=%self.workflow_id, ?err, "workflow sleeping");
231231
} else {
232232
tracing::error!(name=%self.name, id=%self.workflow_id, ?err, "workflow error");
233+
234+
metrics::WORKFLOW_ERRORS
235+
.with_label_values(&[&self.name, err.to_string().as_str()])
236+
.inc();
233237
}
234238

235239
let err_str = err.to_string();
@@ -288,10 +292,14 @@ impl WorkflowCtx {
288292
A::NAME,
289293
);
290294

295+
let start_instant = Instant::now();
296+
291297
let res = tokio::time::timeout(A::TIMEOUT, A::run(&ctx, input))
292298
.await
293299
.map_err(|_| WorkflowError::ActivityTimeout);
294300

301+
let dt = start_instant.elapsed().as_secs_f64();
302+
295303
match res {
296304
Ok(Ok(output)) => {
297305
tracing::debug!("activity success");
@@ -313,45 +321,69 @@ impl WorkflowCtx {
313321
)
314322
.await?;
315323

324+
metrics::ACTIVITY_DURATION
325+
.with_label_values(&[&self.name, A::NAME, ""])
326+
.observe(dt);
327+
316328
Ok(output)
317329
}
318330
Ok(Err(err)) => {
319331
tracing::debug!(?err, "activity error");
320332

321-
// Write error (failed state)
333+
let err_str = err.to_string();
322334
let input_val =
323335
serde_json::to_value(input).map_err(WorkflowError::SerializeActivityInput)?;
336+
337+
// Write error (failed state)
324338
self.db
325339
.commit_workflow_activity_event(
326340
self.workflow_id,
327341
self.full_location().as_ref(),
328342
activity_id,
329343
create_ts,
330344
input_val,
331-
Err(&err.to_string()),
345+
Err(&err_str),
332346
self.loop_location(),
333347
)
334348
.await?;
335349

350+
if !err.is_workflow_recoverable() {
351+
metrics::ACTIVITY_ERRORS
352+
.with_label_values(&[&self.name, A::NAME, &err_str])
353+
.inc();
354+
}
355+
metrics::ACTIVITY_DURATION
356+
.with_label_values(&[&self.name, A::NAME, &err_str])
357+
.observe(dt);
358+
336359
Err(WorkflowError::ActivityFailure(err, 0))
337360
}
338361
Err(err) => {
339362
tracing::debug!("activity timeout");
340363

364+
let err_str = err.to_string();
341365
let input_val =
342366
serde_json::to_value(input).map_err(WorkflowError::SerializeActivityInput)?;
367+
343368
self.db
344369
.commit_workflow_activity_event(
345370
self.workflow_id,
346371
self.full_location().as_ref(),
347372
activity_id,
348373
create_ts,
349374
input_val,
350-
Err(&err.to_string()),
375+
Err(&err_str),
351376
self.loop_location(),
352377
)
353378
.await?;
354379

380+
metrics::ACTIVITY_ERRORS
381+
.with_label_values(&[&self.name, A::NAME, &err_str])
382+
.inc();
383+
metrics::ACTIVITY_DURATION
384+
.with_label_values(&[&self.name, A::NAME, &err_str])
385+
.observe(dt);
386+
355387
Err(err)
356388
}
357389
}

lib/chirp-workflow/core/src/db/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ pub struct SignalRow {
236236
pub signal_id: Uuid,
237237
pub signal_name: String,
238238
pub body: serde_json::Value,
239+
pub create_ts: i64,
239240
}
240241

241242
#[derive(sqlx::FromRow)]

0 commit comments

Comments
 (0)