diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index faa9e9a05049..4e9ad24fc00d 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -20,6 +20,7 @@ use axum::{ Extension, Form, RequestExt, Router, }; use base64::Engine; +use bytes::Bytes; use hmac::Mac; use hyper::{header::CONTENT_TYPE, http, HeaderMap, Request, StatusCode}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -1216,11 +1217,13 @@ struct PreviewFlow { args: Option>, } -pub struct JsonOrForm(T); +pub struct JsonOrForm( + Option>, + Option, +); #[axum::async_trait] -impl FromRequest - for JsonOrForm>> +impl FromRequest for JsonOrForm where S: Send + Sync, { @@ -1232,25 +1235,51 @@ where ) -> Result { let content_type_header = req.headers().get(CONTENT_TYPE); let content_type = content_type_header.and_then(|value| value.to_str().ok()); - if let Some(content_type) = content_type { if content_type.starts_with("application/json") { - let Json(payload): Json> = - req.extract().await.map_err(IntoResponse::into_response)?; - return match payload { - Some(serde_json::Value::Object(map)) => Ok(Self(Some(map))), - None => Ok(Self(None)), - Some(x) => { - let mut map = serde_json::Map::new(); - map.insert("body".to_string(), x); - Ok(Self(Some(map))) - } - }; + if req + .uri() + .query() + .map(|x| x.contains("raw=true")) + .unwrap_or(false) + { + let bytes = Bytes::from_request(req, _state) + .await + .map_err(IntoResponse::into_response)?; + let str = String::from_utf8(bytes.to_vec()).map_err(|e| { + Error::BadRequest(format!("invalid utf8: {}", e)).into_response() + })?; + let payload = + serde_json::from_str::>(&str).map_err(|e| { + Error::BadRequest(format!("invalid json: {}", e)).into_response() + })?; + return match payload { + Some(serde_json::Value::Object(map)) => Ok(Self(Some(map), Some(str))), + None => Ok(Self(None, Some(str))), + Some(x) => { + let mut map = serde_json::Map::new(); + map.insert("body".to_string(), x); + Ok(Self(Some(map), Some(str))) + } + }; + } else { + let Json(payload): Json> = + req.extract().await.map_err(IntoResponse::into_response)?; + return match payload { + Some(serde_json::Value::Object(map)) => Ok(Self(Some(map), None)), + None => Ok(Self(None, None)), + Some(x) => { + let mut map = serde_json::Map::new(); + map.insert("body".to_string(), x); + Ok(Self(Some(map), None)) + } + }; + } } if content_type.starts_with("application/x-www-form-urlencoded") { let Form(payload) = req.extract().await.map_err(IntoResponse::into_response)?; - return Ok(Self(Some(payload))); + return Ok(Self(Some(payload), None)); } } @@ -1303,6 +1332,19 @@ fn decode_payload(t: String) -> anyhow::Result { .context("invalid base64")?; serde_json::from_slice(vec.as_slice()).context("invalid json") } + +fn add_raw_string( + raw_string: Option, + mut args: serde_json::Map, +) -> serde_json::Map { + if let Some(raw_string) = raw_string { + args.insert( + "raw_string".to_string(), + serde_json::Value::String(raw_string), + ); + } + return args; +} pub async fn run_flow_by_path( authed: Authed, Extension(user_db): Extension, @@ -1310,12 +1352,13 @@ pub async fn run_flow_by_path( Path((w_id, flow_path)): Path<(String, StripPath)>, Query(run_query): Query, headers: HeaderMap, - JsonOrForm(args): JsonOrForm>>, + JsonOrForm(args, raw_string): JsonOrForm, ) -> error::Result<(StatusCode, String)> { let flow_path = flow_path.to_path(); let mut tx: QueueTransaction<'_, _> = (rsmq, user_db.begin(&authed).await?).into(); let scheduled_for = run_query.get_scheduled_for(tx.transaction_mut()).await?; let args = run_query.add_include_headers(headers, args.unwrap_or_default()); + let args = add_raw_string(raw_string, args); let (uuid, tx) = push( tx, &w_id, @@ -1347,7 +1390,7 @@ pub async fn run_job_by_path( Path((w_id, script_path)): Path<(String, StripPath)>, Query(run_query): Query, headers: HeaderMap, - JsonOrForm(args): JsonOrForm>>, + JsonOrForm(args, raw_string): JsonOrForm, ) -> error::Result<(StatusCode, String)> { let script_path = script_path.to_path(); let mut tx: QueueTransaction<'_, _> = (rsmq, user_db.begin(&authed).await?).into(); @@ -1355,6 +1398,7 @@ pub async fn run_job_by_path( script_path_to_payload(script_path, tx.transaction_mut(), &w_id).await?; let scheduled_for = run_query.get_scheduled_for(tx.transaction_mut()).await?; let args = run_query.add_include_headers(headers, args.unwrap_or_default()); + let args = add_raw_string(raw_string, args); let (uuid, tx) = push( tx, @@ -1594,7 +1638,7 @@ pub async fn run_wait_result_job_by_path( Path((w_id, script_path)): Path<(String, StripPath)>, Query(run_query): Query, headers: HeaderMap, - JsonOrForm(args): JsonOrForm>>, + JsonOrForm(args, raw_string): JsonOrForm, ) -> error::JsonResult { check_queue_too_long(db, QUEUE_LIMIT_WAIT_RESULT.or(run_query.queue_limit)).await?; let script_path = script_path.to_path(); @@ -1603,6 +1647,7 @@ pub async fn run_wait_result_job_by_path( script_path_to_payload(script_path, tx.transaction_mut(), &w_id).await?; let args = run_query.add_include_headers(headers, args.unwrap_or_default()); + let args = add_raw_string(raw_string, args); let (uuid, tx) = push( tx, @@ -1644,7 +1689,7 @@ pub async fn run_wait_result_job_by_hash( Path((w_id, script_hash)): Path<(String, ScriptHash)>, Query(run_query): Query, headers: HeaderMap, - JsonOrForm(args): JsonOrForm>>, + JsonOrForm(args, raw_string): JsonOrForm, ) -> error::JsonResult { check_queue_too_long(db, run_query.queue_limit).await?; @@ -1653,6 +1698,7 @@ pub async fn run_wait_result_job_by_hash( let (path, tag) = get_path_and_tag_for_hash(tx.transaction_mut(), &w_id, hash).await?; let args = run_query.add_include_headers(headers, args.unwrap_or_default()); + let args = add_raw_string(raw_string, args); let (uuid, tx) = push( tx, @@ -1694,7 +1740,7 @@ pub async fn run_wait_result_flow_by_path( Path((w_id, flow_path)): Path<(String, StripPath)>, Query(run_query): Query, headers: HeaderMap, - JsonOrForm(args): JsonOrForm>>, + JsonOrForm(args, raw_string): JsonOrForm, ) -> error::JsonResult { check_queue_too_long(db, run_query.queue_limit).await?; @@ -1702,6 +1748,7 @@ pub async fn run_wait_result_flow_by_path( let mut tx: QueueTransaction<'_, _> = (rsmq, user_db.clone().begin(&authed).await?).into(); let scheduled_for = run_query.get_scheduled_for(tx.transaction_mut()).await?; let args = run_query.add_include_headers(headers, args.unwrap_or_default()); + let args = add_raw_string(raw_string, args); let (uuid, tx) = push( tx, @@ -1823,13 +1870,14 @@ pub async fn run_job_by_hash( Path((w_id, script_hash)): Path<(String, ScriptHash)>, Query(run_query): Query, headers: HeaderMap, - JsonOrForm(args): JsonOrForm>>, + JsonOrForm(args, raw_string): JsonOrForm, ) -> error::Result<(StatusCode, String)> { let hash = script_hash.0; let mut tx: QueueTransaction<'_, _> = (rsmq, user_db.begin(&authed).await?).into(); let (path, tag) = get_path_and_tag_for_hash(tx.transaction_mut(), &w_id, hash).await?; let scheduled_for = run_query.get_scheduled_for(tx.transaction_mut()).await?; let args = run_query.add_include_headers(headers, args.unwrap_or_default()); + let args = add_raw_string(raw_string, args); let (uuid, tx) = push( tx,