Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions backend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ pub enum Error {
Anyhow(#[from] anyhow::Error),
}

impl Error {
/// https://docs.rs/anyhow/1/anyhow/struct.Error.html#display-representations
pub fn alt(&self) -> String {
format!("{:#}", self)
}
}

pub fn to_anyhow<T: 'static + std::error::Error + Send + Sync>(e: T) -> anyhow::Error {
From::from(e)
}
Expand Down
252 changes: 238 additions & 14 deletions backend/src/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

use std::collections::HashMap;
use std::time::Duration;

use reqwest::Client;
use sql_builder::prelude::*;
Expand All @@ -25,6 +26,7 @@ use crate::{
db::{UserDB, DB},
error::{self, to_anyhow, Error, JsonResult, Result},
jobs::RawCode,
more_serde::{default_true, is_default},
scripts::Schema,
users::Authed,
utils::{http_get_from_hub, list_elems_from_hub, Pagination, StripPath},
Expand Down Expand Up @@ -69,17 +71,86 @@ pub struct NewFlow {
pub schema: Option<Schema>,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
pub struct FlowValue {
pub modules: Vec<FlowModule>,
#[serde(default)]
#[serde(skip_serializing_if = "is_default")]
pub retry: Retry,
#[serde(default)]
pub failure_module: Option<FlowModule>,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct StopAfterIf {
pub expr: String,
pub skip_if_stopped: bool,
}

#[derive(Deserialize, Serialize, Debug, Clone, Default, PartialEq)]
#[serde(default)]
pub struct Retry {
constant: ConstantDelay,
exponential: ExponentialDelay,
}

impl Retry {
/// Takes the number of previous retries and returns the interval until the next retry if any.
///
/// May return [`Duration::ZERO`] to retry immediately.
pub fn interval(&self, previous_attempts: u16) -> Option<Duration> {
let Self { constant, exponential } = self;

if previous_attempts < constant.attempts {
Some(Duration::from_secs(constant.seconds as u64))
} else if previous_attempts - constant.attempts < exponential.attempts {
let exp = previous_attempts.saturating_add(1) as u32;
let secs = exponential.multiplier * exponential.seconds.saturating_pow(exp);
Some(Duration::from_secs(secs as u64))
} else {
None
}
}

pub fn has_attempts(&self) -> bool {
self.constant.attempts != 0 || self.exponential.attempts != 0
}

pub fn max_attempts(&self) -> u16 {
self.constant
.attempts
.saturating_add(self.exponential.attempts)
}

pub fn max_interval(&self) -> Option<Duration> {
self.max_attempts()
.checked_sub(1)
.and_then(|p| self.interval(p))
}
}

#[derive(Deserialize, Serialize, Debug, Clone, Default, PartialEq)]
#[serde(default)]
pub struct ConstantDelay {
pub attempts: u16,
pub seconds: u16,
}

/// multiplier * seconds ^ failures
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
#[serde(default)]
pub struct ExponentialDelay {
pub attempts: u16,
pub multiplier: u16,
pub seconds: u16,
}

impl Default for ExponentialDelay {
fn default() -> Self {
Self { attempts: 0, multiplier: 1, seconds: 0 }
}
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct FlowModule {
#[serde(default)]
Expand Down Expand Up @@ -121,10 +192,6 @@ pub enum FlowModuleValue {
RawScript(RawCode),
}

fn default_true() -> bool {
true
}

#[derive(Deserialize)]
pub struct ListFlowQuery {
pub path_start: Option<String>,
Expand Down Expand Up @@ -413,17 +480,18 @@ mod tests {
// Note this useful idiom: importing names from outer (for mod tests) scope.
use super::*;

const SECOND: Duration = Duration::from_secs(1);

#[test]
fn test_serialize() -> anyhow::Result<()> {
let mut hm = HashMap::new();
hm.insert(
"test".to_owned(),
InputTransform::Static { value: serde_json::json!("test2") },
);
fn flowmodule_serde() {
let fv = FlowValue {
modules: vec![
FlowModule {
input_transforms: hm,
input_transforms: [(
"test".to_string(),
InputTransform::Static { value: serde_json::json!("test2") },
)]
.into(),
value: FlowModuleValue::Script { path: "test".to_string() },
stop_after_if: None,
summary: None,
Expand Down Expand Up @@ -468,9 +536,83 @@ mod tests {
}),
summary: None,
}),
retry: Default::default(),
};
println!("{}", serde_json::json!(fv).to_string());
Ok(())
let expect = serde_json::json!({
"modules": [
{
"input_transforms": {
"test": {
"type": "static",
"value": "test2"
}
},
"value": {
"type": "script",
"path": "test"
},
"stop_after_if": null,
"summary": null
},
{
"input_transforms": {},
"value": {
"type": "rawscript",
"content": "test",
"path": null,
"language": "deno"
},
"stop_after_if": {
"expr": "foo = 'bar'",
"skip_if_stopped": false
},
"summary": null
},
{
"input_transforms": {
"iterand": {
"type": "static",
"value": [
1,
2,
3
]
}
},
"value": {
"type": "forloopflow",
"iterator": {
"type": "static",
"value": [
1,
2,
3
]
},
"skip_failures": true,
"modules": []
},
"stop_after_if": {
"expr": "previous.isEmpty()",
"skip_if_stopped": false,
},
"summary": null
}
],
"failure_module": {
"input_transforms": {},
"value": {
"type": "flow",
"path": "test"
},
"stop_after_if": {
"expr": "previous.isEmpty()",
"skip_if_stopped": false
},
"summary": null
}
});
assert_eq!(dbg!(serde_json::json!(fv)), dbg!(expect));
}

#[test]
Expand All @@ -497,4 +639,86 @@ mod tests {
InputTransform::Javascript { expr: "flow_input.iter.value".to_string() }
);
}

#[test]
fn retry_serde() {
assert_eq!(Retry::default(), serde_json::from_str(r#"{}"#).unwrap());

assert_eq!(
Retry::default(),
serde_json::from_str(
r#"
{
"constant": {
"seconds": 0
},
"exponential": {
"multiplier": 1,
"seconds": 0
}
}
"#
)
.unwrap()
);

assert_eq!(
Retry {
constant: Default::default(),
exponential: ExponentialDelay { attempts: 0, multiplier: 1, seconds: 123 }
},
serde_json::from_str(
r#"
{
"constant": {},
"exponential": { "seconds": 123 }
}
"#
)
.unwrap()
);
}

#[test]
fn retry_exponential() {
let retry = Retry {
constant: ConstantDelay::default(),
exponential: ExponentialDelay { attempts: 3, multiplier: 4, seconds: 3 },
};
assert_eq!(
vec![
Some(12 * SECOND),
Some(36 * SECOND),
Some(108 * SECOND),
None
],
(0..4)
.map(|previous_attempts| retry.interval(previous_attempts))
.collect::<Vec<_>>()
);

assert_eq!(Some(108 * SECOND), retry.max_interval());
}

#[test]
fn retry_both() {
let retry = Retry {
constant: ConstantDelay { attempts: 2, seconds: 4 },
exponential: ExponentialDelay { attempts: 2, multiplier: 1, seconds: 3 },
};
assert_eq!(
vec![
Some(4 * SECOND),
Some(4 * SECOND),
Some(27 * SECOND),
Some(81 * SECOND),
None,
],
(0..5)
.map(|previous_attempts| retry.interval(previous_attempts))
.collect::<Vec<_>>()
);

assert_eq!(Some(81 * SECOND), retry.max_interval());
}
}
14 changes: 13 additions & 1 deletion backend/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
users::{owner_to_token_owner, Authed},
utils::{require_admin, Pagination, StripPath, now_from_db},
worker,
worker_flow::init_flow_status,
worker_flow::{init_flow_status, FlowStatus},
};
use axum::{
extract::{Extension, Path, Query},
Expand Down Expand Up @@ -99,6 +99,18 @@ impl QueuedJob {
.map(String::as_str)
.unwrap_or("NO_FLOW_PATH")
}

pub fn parse_raw_flow(&self) -> Option<FlowValue> {
self.raw_flow
.as_ref()
.and_then(|v| serde_json::from_value::<FlowValue>(v.clone()).ok())
}

pub fn parse_flow_status(&self) -> Option<FlowStatus> {
self.flow_status
.as_ref()
.and_then(|v| serde_json::from_value::<FlowStatus>(v.clone()).ok())
}
}

#[derive(Debug, sqlx::FromRow, Serialize)]
Expand Down
1 change: 1 addition & 0 deletions backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mod granular_acls;
mod groups;
mod jobs;
mod js_eval;
mod more_serde;
mod oauth2;
mod parser;
mod parser_py;
Expand Down
9 changes: 9 additions & 0 deletions backend/src/more_serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//! helpers for serde + serde derive attributes

pub fn default_true() -> bool {
true
}

pub fn is_default<T: Default + std::cmp::PartialEq>(t: &T) -> bool {
&T::default() == t
}
Loading