Skip to content

Commit

Permalink
feat(backend): use result_by_id in branchone (#857)
Browse files Browse the repository at this point in the history
* progress

* fix result by id
  • Loading branch information
rubenfiszel committed Nov 4, 2022
1 parent 41b7783 commit 0170188
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 24 deletions.
36 changes: 36 additions & 0 deletions backend/tests/worker.rs
Expand Up @@ -1843,6 +1843,42 @@ async fn test_branchone_simple(db: Pool<Postgres>) {
assert_eq!(result, serde_json::json!([1, 2]));
}

#[sqlx::test(fixtures("base"))]
async fn test_branchone_with_cond(db: Pool<Postgres>) {
    initialize_tracing().await;

    // Bring up an API server so the worker can call back to fetch results
    // (result_by_id goes through the HTTP client).
    let server = ApiServer::start(db.clone()).await;
    let port = server.addr.port();

    // Flow under test: step "a" returns [1]; the branch predicate consults
    // both `previous_result` and `result_by_id("a")`, so taking the branch
    // (appending 3) proves result_by_id works inside a branchone expression.
    let value: FlowValue = serde_json::from_value(json!({
        "modules": [
            {
                "id": "a",
                "value": {
                    "type": "rawscript",
                    "language": "deno",
                    "content": "export function main(){ return [1] }",
                }
            },
            {
                "value": {
                    "branches": [{"expr": "previous_result[0] == 1 && result_by_id(\"a\")[0] == 1", "modules": [module_add_item_to_list(3)]}],
                    "default": [module_add_item_to_list(2)],
                    "type": "branchone",
                }
            },
        ],
    }))
    .unwrap();

    let payload = JobPayload::RawFlow { value, path: None };
    let job = run_job_in_new_worker_until_complete(&db, payload, port).await;
    let result = job.result.unwrap();

    // Branch (not default) must have run: [1] from "a", then 3 appended.
    assert_eq!(result, serde_json::json!([1, 3]));
}

#[sqlx::test(fixtures("base"))]
async fn test_branchall_sequential(db: Pool<Postgres>) {
initialize_tracing().await;
Expand Down
3 changes: 1 addition & 2 deletions backend/windmill-api/openapi.yaml
Expand Up @@ -3709,8 +3709,7 @@ components:
type: string
args:
$ref: "#/components/schemas/ScriptArgs"
result:
type: object
result: {}
logs:
type: string
deleted:
Expand Down
17 changes: 9 additions & 8 deletions backend/windmill-worker/src/js_eval.rs
Expand Up @@ -49,7 +49,7 @@ pub async fn eval_timeout(
])
}

if !steps.is_empty() {
if !steps.is_empty() || by_id.is_some() {
ops.push(op_get_result::decl())
}

Expand Down Expand Up @@ -238,7 +238,7 @@ async function resource(path) {{
})
.join(""),
);
// tracing::debug!("{}", code);
tracing::debug!("{}", code);
let global = context.execute_script("<anon>", &code)?;
let global = context.resolve_value(global).await?;

Expand Down Expand Up @@ -272,17 +272,18 @@ async fn op_variable(args: Vec<String>) -> Result<String, anyhow::Error> {
}

#[op]
async fn op_get_result(
args: Vec<String>,
) -> Result<windmill_api_client::types::CompletedJob, anyhow::Error> {
async fn op_get_result(args: Vec<String>) -> Result<serde_json::Value, anyhow::Error> {
let workspace = &args[0];
let id = &args[1];
let token = &args[2];
let base_url = &args[3];
let client = windmill_api_client::create_client(base_url, token.clone());
let result = client.get_completed_job(workspace, &id.parse()?).await?;
// TODO: verify this works. Previously this returned Option<serde_json::Value>, now it's statically typed.
Ok(result.into_inner())
let result = client
.get_completed_job(workspace, &id.parse()?)
.await?
.result
.clone();
Ok(serde_json::json!(result))
}

#[op]
Expand Down
41 changes: 27 additions & 14 deletions backend/windmill-worker/src/worker_flow.rs
Expand Up @@ -301,7 +301,8 @@ pub async fn update_flow_status_after_job_completion(

let stop_early = success
&& if let Some(expr) = r.stop_early_expr.clone() {
compute_bool_from_expr(expr, &r.args, result.clone(), base_internal_url).await?
compute_bool_from_expr(expr, &r.args, result.clone(), base_internal_url, None, None)
.await?
} else {
false
};
Expand Down Expand Up @@ -564,6 +565,8 @@ async fn compute_bool_from_expr(
flow_args: &Option<serde_json::Value>,
result: serde_json::Value,
base_internal_url: &str,
by_id: Option<IdContext>,
creds: Option<EvalCreds>,
) -> error::Result<bool> {
let flow_input = flow_args.clone().unwrap_or_else(|| json!({}));
match eval_timeout(
Expand All @@ -574,9 +577,9 @@ async fn compute_bool_from_expr(
("previous_result".to_string(), result),
]
.into(),
None,
creds,
vec![],
None,
by_id,
base_internal_url.to_string(),
)
.await?
Expand Down Expand Up @@ -1076,13 +1079,12 @@ async fn push_next_flow_job(
}

let mut transform_context: Option<TransformContext> = None;

let mut args = match &module.value {
FlowModuleValue::Script { input_transforms, .. }
| FlowModuleValue::RawScript { input_transforms, .. } => {
let tx = db.begin().await?;
let (tx, ctx) = get_transform_context(tx, &flow_job, &status, &flow.modules).await?;
let ctx = get_transform_context(db, &flow_job, &status, &flow.modules).await?;
transform_context = Some(ctx);
tx.commit().await?;
let (token, steps, by_id) = transform_context.as_ref().unwrap();
transform_input(
&flow_job.args,
Expand Down Expand Up @@ -1126,10 +1128,12 @@ async fn push_next_flow_job(
};

let tx = db.begin().await?;

let (tx, next_flow_transform) = compute_next_flow_transform(
flow_job,
&flow,
transform_context,
db,
tx,
&module,
&status,
Expand Down Expand Up @@ -1461,6 +1465,7 @@ async fn compute_next_flow_transform<'c>(
flow_job: &QueuedJob,
flow: &FlowValue,
transform_context: Option<TransformContext>,
db: &DB,
mut tx: sqlx::Transaction<'c, sqlx::Postgres>,
module: &FlowModule,
status: &FlowStatus,
Expand Down Expand Up @@ -1506,10 +1511,7 @@ async fn compute_next_flow_transform<'c>(
let (token, steps, by_id) = if let Some(x) = transform_context {
x
} else {
let (tx_new, res) =
get_transform_context(tx, &flow_job, &status, &flow.modules).await?;
tx = tx_new;
res
get_transform_context(db, &flow_job, &status, &flow.modules).await?
};
let flow_input = flow_job.args.clone().unwrap_or_else(|| json!({}));
/* Iterator is an InputTransform, evaluate it into an array. */
Expand Down Expand Up @@ -1627,12 +1629,19 @@ async fn compute_next_flow_transform<'c>(
let branch = match status_module {
FlowStatusModule::WaitingForPriorSteps { .. } => {
let mut branch_chosen = BranchChosen::Default;
let (token, _steps, idcontext) =
get_transform_context(db, &flow_job, &status, &flow.modules).await?;
for (i, b) in branches.iter().enumerate() {
let pred = compute_bool_from_expr(
b.expr.to_string(),
&flow_job.args,
last_result.clone(),
base_internal_url,
Some(idcontext.clone()),
Some(EvalCreds {
workspace: flow_job.workspace_id.clone(),
token: token.to_string(),
}),
)
.await?;

Expand Down Expand Up @@ -1775,12 +1784,13 @@ async fn compute_next_flow_transform<'c>(
}
}

async fn get_transform_context<'c>(
tx: sqlx::Transaction<'c, sqlx::Postgres>,
async fn get_transform_context(
db: &DB,
flow_job: &QueuedJob,
status: &FlowStatus,
modules: &Vec<FlowModule>,
) -> error::Result<(sqlx::Transaction<'c, sqlx::Postgres>, TransformContext)> {
) -> error::Result<TransformContext> {
let tx = db.begin().await?;
let (tx, new_token) = crate::create_token_for_owner(
tx,
&flow_job.workspace_id,
Expand All @@ -1790,6 +1800,9 @@ async fn get_transform_context<'c>(
&flow_job.created_by,
)
.await?;
//we need to commit asap otherwise the token won't be valid for auth to check outside of this transaction
//which will happen with client http calls
tx.commit().await?;
let new_steps: Vec<Uuid> = status
.modules
.iter()
Expand All @@ -1801,7 +1814,7 @@ async fn get_transform_context<'c>(
.zip(new_steps.clone())
.collect();

Ok((tx, (new_token, new_steps, IdContext(flow_job.id, id_map))))
Ok((new_token, new_steps, IdContext(flow_job.id, id_map)))
}

async fn evaluate_with<F>(
Expand Down

0 comments on commit 0170188

Please sign in to comment.