Skip to content

Commit db7c1e4

Browse files
committed
feat: global error raw variant
1 parent 0dbb713 commit db7c1e4

File tree

14 files changed

+54
-45
lines changed

14 files changed

+54
-45
lines changed

lib/chirp-workflow/core/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ edition = "2021"
66
license = "Apache-2.0"
77

88
[dependencies]
9-
anyhow = "1.0.82"
109
async-trait = "0.1.80"
1110
chirp-client = { path = "../../chirp/client" }
1211
chirp-workflow-macros = { path = "../macros" }
@@ -32,3 +31,6 @@ tokio = { version = "1.37.0", features = ["full"] }
3231
tracing = "0.1.40"
3332
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
3433
uuid = { version = "1.8.0", features = ["v4", "serde"] }
34+
35+
[dev-dependencies]
36+
anyhow = "1.0.82"

lib/chirp-workflow/core/src/activity.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{fmt::Debug, hash::Hash};
22

3-
use anyhow::*;
43
use async_trait::async_trait;
4+
use global_error::GlobalResult;
55
use serde::{de::DeserializeOwned, Serialize};
66

77
use crate::ActivityCtx;
@@ -13,7 +13,7 @@ pub trait Activity {
1313

1414
fn name() -> &'static str;
1515

16-
async fn run(ctx: &mut ActivityCtx, input: &Self::Input) -> Result<Self::Output>;
16+
async fn run(ctx: &mut ActivityCtx, input: &Self::Input) -> GlobalResult<Self::Output>;
1717
}
1818

1919
pub trait ActivityInput: Serialize + DeserializeOwned + Debug + Hash + Send {

lib/chirp-workflow/core/src/ctx/activity.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ impl ActivityCtx {
2727
name.to_string(),
2828
std::time::Duration::from_secs(60),
2929
conn.clone(),
30-
// TODO: req_id
31-
Uuid::new_v4(),
3230
workflow_id,
31+
// TODO: ray_id
32+
Uuid::new_v4(),
3333
rivet_util::timestamp::now(),
3434
// TODO: req_ts
3535
rivet_util::timestamp::now(),

lib/chirp-workflow/core/src/ctx/workflow.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use std::{collections::HashMap, sync::Arc};
22

3-
use anyhow::*;
43
use serde::Serialize;
54
use tokio::time::Duration;
65
use uuid::Uuid;
@@ -138,7 +137,7 @@ impl WorkflowCtx {
138137
}
139138
}
140139

141-
async fn run_workflow_inner(&mut self) -> Result<()> {
140+
async fn run_workflow_inner(&mut self) -> WorkflowResult<()> {
142141
tracing::info!(id=%self.workflow_id, "running workflow");
143142

144143
// Lookup workflow

lib/chirp-workflow/core/src/error.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use anyhow::*;
1+
use global_error::GlobalError;
22
use uuid::Uuid;
33

44
pub type WorkflowResult<T> = Result<T, WorkflowError>;
@@ -10,13 +10,13 @@ pub type WorkflowResult<T> = Result<T, WorkflowError>;
1010
#[derive(thiserror::Error, Debug)]
1111
pub enum WorkflowError {
1212
#[error("workflow failure: {0:?}")]
13-
WorkflowFailure(Error),
13+
WorkflowFailure(GlobalError),
1414

1515
#[error("activity failure: {0:?}")]
16-
ActivityFailure(Error),
16+
ActivityFailure(GlobalError),
1717

1818
#[error("operation failure: {0:?}")]
19-
OperationFailure(Error),
19+
OperationFailure(GlobalError),
2020

2121
#[error("workflow missing from registry: {0}")]
2222
WorkflowMissingFromRegistry(String),

lib/chirp-workflow/core/src/operation.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use anyhow::*;
21
use async_trait::async_trait;
2+
use global_error::GlobalResult;
33

44
use crate::OperationCtx;
55

@@ -10,7 +10,7 @@ pub trait Operation {
1010

1111
fn name() -> &'static str;
1212

13-
async fn run(ctx: &mut OperationCtx, input: &Self::Input) -> Result<Self::Output>;
13+
async fn run(ctx: &mut OperationCtx, input: &Self::Input) -> GlobalResult<Self::Output>;
1414
}
1515

1616
pub trait OperationInput: Send {

lib/chirp-workflow/core/src/prelude.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ pub use chirp_workflow_macros::*;
2828

2929
// External libraries
3030
#[doc(hidden)]
31-
pub use anyhow::{self, Result};
32-
#[doc(hidden)]
3331
pub use async_trait;
3432
#[doc(hidden)]
3533
pub use futures_util;

lib/chirp-workflow/core/src/registry.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};
22

33
use futures_util::FutureExt;
4+
use global_error::GlobalError;
45

56
use crate::{Workflow, WorkflowCtx, WorkflowError, WorkflowResult};
67

@@ -41,11 +42,20 @@ impl Registry {
4142
// Run workflow
4243
let output = match W::run(ctx, &input).await {
4344
Ok(x) => x,
45+
// Differentiate between WorkflowError and user error
4446
Err(err) => {
45-
// Differentiate between WorkflowError and user error
46-
match err.downcast::<WorkflowError>() {
47-
Ok(err) => return Err(err),
48-
Err(err) => return Err(WorkflowError::WorkflowFailure(err)),
47+
match err {
48+
GlobalError::Raw(inner_err) => {
49+
match inner_err.downcast::<WorkflowError>() {
50+
Ok(inner_err) => return Err(*inner_err),
51+
Err(err) => {
52+
return Err(WorkflowError::WorkflowFailure(
53+
GlobalError::Raw(err),
54+
))
55+
}
56+
}
57+
}
58+
_ => return Err(WorkflowError::WorkflowFailure(err)),
4959
}
5060
}
5161
};

lib/chirp-workflow/core/src/util.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{
33
time::{SystemTime, UNIX_EPOCH},
44
};
55

6-
use anyhow::*;
6+
use global_error::{macros::*, GlobalResult};
77
use rand::Rng;
88
use tokio::time::{self, Duration};
99

@@ -16,7 +16,7 @@ const FAULT_RATE: usize = 80;
1616

1717
pub async fn sleep_until_ts(ts: i64) {
1818
let target_time = UNIX_EPOCH + Duration::from_millis(ts as u64);
19-
if let std::result::Result::Ok(sleep_duration) = target_time.duration_since(SystemTime::now()) {
19+
if let Ok(sleep_duration) = target_time.duration_since(SystemTime::now()) {
2020
time::sleep(sleep_duration).await;
2121
}
2222
}
@@ -100,12 +100,13 @@ pub fn combine_events(
100100
.map(|(k, v)| (k, v.into_iter().map(|(_, v)| v).collect()))
101101
.collect();
102102

103-
WorkflowResult::Ok(event_history)
103+
Ok(event_history)
104104
}
105105

106-
pub fn inject_fault() -> Result<()> {
106+
pub fn inject_fault() -> GlobalResult<()> {
107107
if rand::thread_rng().gen_range(0..100) < FAULT_RATE {
108108
bail!("This is a random panic!");
109109
}
110+
110111
Ok(())
111112
}

lib/chirp-workflow/core/src/workflow.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use anyhow::*;
21
use async_trait::async_trait;
2+
use global_error::GlobalResult;
33
use serde::{de::DeserializeOwned, Serialize};
44
use std::fmt::Debug;
55

@@ -13,7 +13,7 @@ pub trait Workflow {
1313
fn name() -> &'static str;
1414

1515
// TODO: Is there any reason for input to be a reference?
16-
async fn run(ctx: &mut WorkflowCtx, input: &Self::Input) -> Result<Self::Output>;
16+
async fn run(ctx: &mut WorkflowCtx, input: &Self::Input) -> GlobalResult<Self::Output>;
1717
}
1818

1919
pub trait WorkflowInput: Serialize + DeserializeOwned + Debug + Send {

0 commit comments

Comments
 (0)