Skip to content

Commit

Permalink
feat: add include_header to pass request headers to script
Browse files Browse the repository at this point in the history
  • Loading branch information
rubenfiszel committed Dec 5, 2022
1 parent cfe8011 commit 31c317b
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 20 deletions.
2 changes: 1 addition & 1 deletion backend/tests/worker.rs
Expand Up @@ -821,7 +821,7 @@ impl RunJob {
tx,
"test-workspace",
payload,
Some(args),
args,
/* user */ "test-user",
/* permissioned_as */ "u/admin".to_string(),
/* scheduled_for_o */ None,
Expand Down
2 changes: 1 addition & 1 deletion backend/windmill-api/src/apps.rs
Expand Up @@ -533,7 +533,7 @@ async fn execute_component(
tx,
&w_id,
job_payload,
Some(args),
args,
&username,
permissioned_as,
None,
Expand Down
4 changes: 2 additions & 2 deletions backend/windmill-api/src/flows.rs
Expand Up @@ -192,7 +192,7 @@ async fn create_flow(
tx,
&w_id,
JobPayload::FlowDependencies { path: nf.path.clone() },
None,
serde_json::Map::new(),
&authed.username,
windmill_common::users::owner_to_token_owner(&authed.username, false),
None,
Expand Down Expand Up @@ -296,7 +296,7 @@ async fn update_flow(
tx,
&w_id,
JobPayload::FlowDependencies { path: nf.path.clone() },
None,
serde_json::Map::new(),
&authed.username,
windmill_common::users::owner_to_token_owner(&authed.username, false),
None,
Expand Down
57 changes: 48 additions & 9 deletions backend/windmill-api/src/jobs.rs
Expand Up @@ -16,7 +16,7 @@ use axum::{
Extension, Json, Router,
};
use hmac::Mac;
use hyper::StatusCode;
use hyper::{HeaderMap, StatusCode};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sql_builder::{prelude::*, quote, SqlBuilder};
use sqlx::{query_scalar, types::Uuid, Postgres, Transaction};
Expand Down Expand Up @@ -240,16 +240,17 @@ pub struct CompletedJob {
pub is_skipped: bool,
}

#[derive(Deserialize, Clone, Copy)]
#[derive(Deserialize, Clone)]
pub struct RunJobQuery {
scheduled_for: Option<chrono::DateTime<chrono::Utc>>,
scheduled_in_secs: Option<i64>,
parent_job: Option<Uuid>,
include_header: Option<String>,
}

impl RunJobQuery {
async fn get_scheduled_for<'c>(
self,
&self,
db: &mut Transaction<'c, Postgres>,
) -> error::Result<Option<chrono::DateTime<chrono::Utc>>> {
if let Some(scheduled_for) = self.scheduled_for {
Expand All @@ -261,6 +262,27 @@ impl RunJobQuery {
Ok(None)
}
}

fn add_include_headers(
&self,
headers: HeaderMap,
mut args: serde_json::Map<String, serde_json::Value>,
) -> serde_json::Map<String, serde_json::Value> {
self.include_header
.as_ref()
.map(|s| s.split(",").map(|s| s.to_string()).collect::<Vec<_>>())
.unwrap_or_default()
.iter()
.for_each(|h| {
if let Some(v) = headers.get(h) {
args.insert(
h.to_string().to_lowercase().replace('-', "_"),
serde_json::Value::String(v.to_str().unwrap().to_string()),
);
}
});
args
}
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -921,10 +943,13 @@ pub async fn run_flow_by_path(
Path((w_id, flow_path)): Path<(String, StripPath)>,
axum::Json(args): axum::Json<Option<serde_json::Map<String, serde_json::Value>>>,
Query(run_query): Query<RunJobQuery>,
headers: HeaderMap,
) -> error::Result<(StatusCode, String)> {
let flow_path = flow_path.to_path();
let mut tx = user_db.begin(&authed).await?;
let scheduled_for = run_query.get_scheduled_for(&mut tx).await?;
let args = run_query.add_include_headers(headers, args.unwrap_or_default());

let (uuid, tx) = push(
tx,
&w_id,
Expand All @@ -949,11 +974,13 @@ pub async fn run_job_by_path(
Path((w_id, script_path)): Path<(String, StripPath)>,
axum::Json(args): axum::Json<Option<serde_json::Map<String, serde_json::Value>>>,
Query(run_query): Query<RunJobQuery>,
headers: HeaderMap,
) -> error::Result<(StatusCode, String)> {
let script_path = script_path.to_path();
let mut tx = user_db.begin(&authed).await?;
let job_payload = script_path_to_payload(script_path, &mut tx, &w_id).await?;
let scheduled_for = run_query.get_scheduled_for(&mut tx).await?;
let args = run_query.add_include_headers(headers, args.unwrap_or_default());

let (uuid, tx) = push(
tx,
Expand Down Expand Up @@ -1011,12 +1038,15 @@ pub async fn run_wait_result_job_by_path(
Path((w_id, script_path)): Path<(String, StripPath)>,
axum::Json(args): axum::Json<Option<serde_json::Map<String, serde_json::Value>>>,
Query(run_query): Query<RunJobQuery>,
headers: HeaderMap,
) -> error::JsonResult<serde_json::Value> {
let script_path = script_path.to_path();
let mut tx = user_db.clone().begin(&authed).await?;
let job_payload = script_path_to_payload(script_path, &mut tx, &w_id).await?;
let scheduled_for = run_query.get_scheduled_for(&mut tx).await?;

let args = run_query.add_include_headers(headers, args.unwrap_or_default());

let (uuid, tx) = push(
tx,
&w_id,
Expand All @@ -1042,11 +1072,13 @@ pub async fn run_wait_result_job_by_hash(
Path((w_id, script_hash)): Path<(String, ScriptHash)>,
axum::Json(args): axum::Json<Option<serde_json::Map<String, serde_json::Value>>>,
Query(run_query): Query<RunJobQuery>,
headers: HeaderMap,
) -> error::JsonResult<serde_json::Value> {
let hash = script_hash.0;
let mut tx = user_db.clone().begin(&authed).await?;
let path = get_path_for_hash(&mut tx, &w_id, hash).await?;
let scheduled_for = run_query.get_scheduled_for(&mut tx).await?;
let args = run_query.add_include_headers(headers, args.unwrap_or_default());

let (uuid, tx) = push(
tx,
Expand Down Expand Up @@ -1087,10 +1119,12 @@ async fn run_preview_job(
Extension(user_db): Extension<UserDB>,
Path(w_id): Path<String>,
Json(preview): Json<Preview>,
Query(sch_query): Query<RunJobQuery>,
Query(run_query): Query<RunJobQuery>,
headers: HeaderMap,
) -> error::Result<(StatusCode, String)> {
let mut tx = user_db.begin(&authed).await?;
let scheduled_for = sch_query.get_scheduled_for(&mut tx).await?;
let scheduled_for = run_query.get_scheduled_for(&mut tx).await?;
let args = run_query.add_include_headers(headers, preview.args.unwrap_or_default());

let (uuid, tx) = push(
tx,
Expand All @@ -1101,7 +1135,7 @@ async fn run_preview_job(
language: preview.language,
lock: None,
}),
preview.args,
args,
&authed.username,
owner_to_token_owner(&authed.username, false),
scheduled_for,
Expand All @@ -1120,15 +1154,18 @@ async fn run_preview_flow_job(
Extension(user_db): Extension<UserDB>,
Path(w_id): Path<String>,
Json(raw_flow): Json<PreviewFlow>,
Query(sch_query): Query<RunJobQuery>,
Query(run_query): Query<RunJobQuery>,
headers: HeaderMap,
) -> error::Result<(StatusCode, String)> {
let mut tx = user_db.begin(&authed).await?;
let scheduled_for = sch_query.get_scheduled_for(&mut tx).await?;
let scheduled_for = run_query.get_scheduled_for(&mut tx).await?;
let args = run_query.add_include_headers(headers, raw_flow.args.unwrap_or_default());

let (uuid, tx) = push(
tx,
&w_id,
JobPayload::RawFlow { value: raw_flow.value, path: raw_flow.path },
raw_flow.args,
args,
&authed.username,
owner_to_token_owner(&authed.username, false),
scheduled_for,
Expand All @@ -1148,11 +1185,13 @@ pub async fn run_job_by_hash(
Path((w_id, script_hash)): Path<(String, ScriptHash)>,
axum::Json(args): axum::Json<Option<serde_json::Map<String, serde_json::Value>>>,
Query(run_query): Query<RunJobQuery>,
headers: HeaderMap,
) -> error::Result<(StatusCode, String)> {
let hash = script_hash.0;
let mut tx = user_db.begin(&authed).await?;
let path = get_path_for_hash(&mut tx, &w_id, hash).await?;
let scheduled_for = run_query.get_scheduled_for(&mut tx).await?;
let args = run_query.add_include_headers(headers, args.unwrap_or_default());

let (uuid, tx) = push(
tx,
Expand Down
2 changes: 1 addition & 1 deletion backend/windmill-api/src/oauth2.rs
Expand Up @@ -783,7 +783,7 @@ async fn slack_command(
tx,
&settings.workspace_id,
payload,
Some(map),
map,
&form.user_name,
"g/slack".to_string(),
None,
Expand Down
2 changes: 1 addition & 1 deletion backend/windmill-api/src/scripts.rs
Expand Up @@ -337,7 +337,7 @@ async fn create_script(
tx,
&w_id,
windmill_queue::JobPayload::Dependencies { hash, dependencies, language: ns.language },
None,
serde_json::Map::new(),
&authed.username,
owner_to_token_owner(&authed.username, false),
None,
Expand Down
4 changes: 2 additions & 2 deletions backend/windmill-queue/src/jobs.rs
Expand Up @@ -243,7 +243,7 @@ pub async fn push<'c>(
mut tx: Transaction<'c, Postgres>,
workspace_id: &str,
job_payload: JobPayload,
args: Option<serde_json::Map<String, serde_json::Value>>,
args: serde_json::Map<String, serde_json::Value>,
user: &str,
permissioned_as: String,
scheduled_for_o: Option<chrono::DateTime<chrono::Utc>>,
Expand All @@ -253,7 +253,7 @@ pub async fn push<'c>(
mut same_worker: bool,
) -> Result<(Uuid, Transaction<'c, Postgres>), Error> {
let scheduled_for = scheduled_for_o.unwrap_or_else(chrono::Utc::now);
let args_json = args.map(serde_json::Value::Object);
let args_json = serde_json::Value::Object(args);
let job_id: Uuid = Ulid::new().into();

let premium_workspace =
Expand Down
4 changes: 2 additions & 2 deletions backend/windmill-queue/src/schedule.rs
Expand Up @@ -74,11 +74,11 @@ pub async fn push_scheduled_job<'c>(
return Ok(tx);
}

let mut args: Option<serde_json::Map<String, serde_json::Value>> = None;
let mut args: serde_json::Map<String, serde_json::Value> = serde_json::Map::new();

if let Some(args_v) = schedule.args {
if let serde_json::Value::Object(args_m) = args_v {
args = Some(args_m)
args = args_m
} else {
return Err(error::Error::ExecutionErr(
"args of scripts needs to be dict".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion backend/windmill-worker/src/worker_flow.rs
Expand Up @@ -1194,7 +1194,7 @@ async fn push_next_flow_job(
tx,
&flow_job.workspace_id,
payload,
Some(args),
args,
&flow_job.created_by,
flow_job.permissioned_as.to_owned(),
scheduled_for_o,
Expand Down

0 comments on commit 31c317b

Please sign in to comment.