From 93afdccdd752210f2bb9d36386bbdf64f3f0588d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beltr=C3=A1n=20Rodr=C3=ADguez?= Date: Mon, 18 May 2026 10:01:09 +0000 Subject: [PATCH 1/7] =?UTF-8?q?=F0=9F=90=9E=20Windmill=20fails=20to=20dele?= =?UTF-8?q?te=20election=20event=20files=20on=20production=20AWS=20S3=20(#?= =?UTF-8?q?2522)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backport of fc2dab10d4 from release/10.0. --- packages/sequent-core/src/services/s3.rs | 461 +++++++++++++++++-- packages/sequent-core/src/util/aws.rs | 26 +- packages/windmill/src/services/celery_app.rs | 6 +- 3 files changed, 458 insertions(+), 35 deletions(-) diff --git a/packages/sequent-core/src/services/s3.rs b/packages/sequent-core/src/services/s3.rs index bb1375de63a..9c6c192fb3a 100644 --- a/packages/sequent-core/src/services/s3.rs +++ b/packages/sequent-core/src/services/s3.rs @@ -4,7 +4,8 @@ // SPDX-License-Identifier: AGPL-3.0-only use crate::util::aws::{ - get_fetch_expiration_secs, get_s3_aws_config, get_upload_expiration_secs, + get_fetch_expiration_secs, get_from_env_aws_config, get_s3_aws_config, + get_upload_expiration_secs, AWS_S3_PRIVATE_URI_ENV, AWS_S3_PUBLIC_URI_ENV, }; use crate::util::temp_path::{ generate_temp_file, get_public_assets_path_env_var, @@ -27,8 +28,206 @@ use tokio::io::{self, AsyncReadExt}; use tracing::{info, instrument}; const MAX_CHUNK_SIZE: u64 = 16 * 1024 * 1024; +const AWS_HOSTED_S3_HOST_DELIMITER: &str = ".s3."; +const AWS_HOSTED_S3_DOMAIN_SUFFIX: &str = "amazonaws.com"; +const AWS_S3_SERVICE_HOST_PREFIX: &str = "s3"; + +#[derive(Debug, PartialEq, Eq)] +struct ResolvedS3ListTargetParts { + service_endpoint: Option, + bucket: String, + prefix_root: Option, +} + +/// Carries the resolved S3 client, real bucket name, and optional logical +/// prefix root for list-style operations that must work on both MinIO and AWS. 
+struct ResolvedS3ListTarget { + client: s3::Client, + bucket: String, + prefix_root: Option, +} + +impl ResolvedS3ListTarget { + /// Adds the resolved logical prefix root so callers can request the same + /// effective key space regardless of the underlying endpoint shape. + fn qualify_prefix(&self, prefix: &str) -> String { + match &self.prefix_root { + Some(prefix_root) => join_s3_path(prefix_root, prefix), + None => prefix.to_string(), + } + } +} + +/// Joins S3 path fragments while normalizing slashes so generated prefixes stay +/// stable across callers. +fn join_s3_path(prefix: &str, suffix: &str) -> String { + let prefix = prefix.trim_matches('/'); + let suffix = suffix.trim_matches('/'); + + match (prefix.is_empty(), suffix.is_empty()) { + (true, true) => String::new(), + (true, false) => suffix.to_string(), + (false, true) => prefix.to_string(), + (false, false) => format!("{prefix}/{suffix}"), + } +} + +/// Detects bucket-hosted AWS endpoints and extracts the real service endpoint +/// plus bucket name so list operations can address AWS correctly. +fn parse_aws_bucket_endpoint( + endpoint_uri: &str, + aws_region: Option<&str>, +) -> Result> { + // Parse once so we can reason about the hostname shape without doing any + // string slicing against the raw env var value. + let url = reqwest::Url::parse(endpoint_uri) + .with_context(|| format!("Invalid S3 endpoint URL `{endpoint_uri}`"))?; + let host = match url.host_str() { + Some(host) => host, + None => return Ok(None), + }; + + // AWS bucket-hosted endpoints look like `.s3.amazonaws.com` or + // `.s3..amazonaws.com`. MinIO and other custom endpoints do + // not match this shape, so they must be left untouched. + let (bucket_name, service_host) = match host + .split_once(AWS_HOSTED_S3_HOST_DELIMITER) + { + Some((bucket_name, suffix)) if !bucket_name.is_empty() => { + // Only rewrite real AWS S3 hosts. This avoids accidentally + // treating custom domains as bucket-hosted AWS endpoints. 
+ if !suffix.ends_with(AWS_HOSTED_S3_DOMAIN_SUFFIX) { + return Ok(None); + } + + let service_host = if suffix == AWS_HOSTED_S3_DOMAIN_SUFFIX { + // The global host form does not encode the bucket region. + // Prefer the resolved SDK region so SigV4 targets the + // correct regional S3 endpoint outside us-east-1. + match aws_region { + Some(region) if !region.is_empty() => format!( + "{AWS_S3_SERVICE_HOST_PREFIX}.{region}.{AWS_HOSTED_S3_DOMAIN_SUFFIX}" + ), + _ => format!( + "{AWS_S3_SERVICE_HOST_PREFIX}.{AWS_HOSTED_S3_DOMAIN_SUFFIX}" + ), + } + } else { + // Regional bucket-hosted endpoints already tell us which + // service host to talk to, so we preserve that region. + format!("{AWS_S3_SERVICE_HOST_PREFIX}.{suffix}") + }; + + (bucket_name, service_host) + } + _ => return Ok(None), + }; + + // Rebuild the endpoint URL without the bucket in the hostname. The caller + // will use the returned bucket name plus this service endpoint for list and + // delete operations that require bucket + prefix semantics on AWS. + let mut service_endpoint = format!("{}://{}", url.scheme(), service_host); + if let Some(port) = url.port() { + service_endpoint.push_str(&format!(":{port}")); + } + + Ok(Some((service_endpoint, bucket_name.to_string()))) +} + +/// Resolves the bucket and prefix semantics for a list-style S3 call without +/// constructing a client so both runtime code and tests share the same rules. +fn resolve_s3_list_target_parts( + endpoint_uri: &str, + logical_bucket: &str, + aws_region: Option<&str>, +) -> Result { + if let Some((service_endpoint, bucket_name)) = + parse_aws_bucket_endpoint(endpoint_uri, aws_region)? 
+ { + return Ok(ResolvedS3ListTargetParts { + service_endpoint: Some(service_endpoint), + bucket: bucket_name, + prefix_root: Some(logical_bucket.trim_matches('/').to_string()), + }); + } + + Ok(ResolvedS3ListTargetParts { + service_endpoint: None, + bucket: logical_bucket.to_string(), + prefix_root: None, + }) +} + +/// Builds an S3 config for an already-resolved endpoint while keeping the same +/// credential-loading rules used by the shared AWS helpers. +fn build_s3_config_for_endpoint( + sdk_config: &aws_config::SdkConfig, + endpoint_uri: &str, +) -> s3::Config { + let access_key_result = env::var("AWS_S3_ACCESS_KEY"); + let access_secret_result = env::var("AWS_S3_ACCESS_SECRET"); + let mut builder = aws_sdk_s3::config::Builder::from(sdk_config) + .endpoint_url(endpoint_uri) + .force_path_style(true); + + if let (Ok(access_key), Ok(access_secret)) = + (access_key_result, access_secret_result) + { + if !access_key.is_empty() && !access_secret.is_empty() { + let credentials_provider = aws_sdk_s3::config::Credentials::new( + access_key, + access_secret, + None, + None, + "loaded-from-custom-env", + ); + builder = builder.credentials_provider(credentials_provider); + } + } + + builder.build() +} + +#[instrument(err)] +/// Resolves the client, bucket, and optional logical prefix root for a +/// server-side list operation. +/// +/// When `use_server_endpoint` is `false`, the helper uses the client endpoint +/// instead of the server endpoint. 
+async fn get_s3_list_target( + logical_bucket: &str, + use_server_endpoint: bool, +) -> Result { + let env_var_name = if use_server_endpoint { + AWS_S3_PRIVATE_URI_ENV + } else { + AWS_S3_PUBLIC_URI_ENV + }; + let endpoint_uri = env::var(env_var_name) + .with_context(|| format!("{env_var_name} must be set"))?; + let sdk_config = get_from_env_aws_config().await?; + let aws_region = sdk_config.region().map(|region| region.as_ref()); + let target_parts = resolve_s3_list_target_parts( + &endpoint_uri, + logical_bucket, + aws_region, + )?; + let resolved_endpoint = target_parts + .service_endpoint + .as_deref() + .unwrap_or(&endpoint_uri); + let config = build_s3_config_for_endpoint(&sdk_config, resolved_endpoint); + + Ok(ResolvedS3ListTarget { + client: get_s3_client(config).await?, + bucket: target_parts.bucket, + prefix_root: target_parts.prefix_root, + }) +} #[instrument(err, skip_all)] +/// Returns the logical private bucket or root prefix so callers can separate +/// storage scope from endpoint selection. pub fn get_private_bucket() -> Result { let s3_bucket = env::var("AWS_S3_BUCKET") .map_err(|err| anyhow!("AWS_S3_BUCKET must be set: {err}"))?; @@ -36,6 +235,8 @@ pub fn get_private_bucket() -> Result { } #[instrument(err, skip_all)] +/// Returns the logical public bucket or root prefix used for public assets and +/// plugin storage. pub fn get_public_bucket() -> Result { let s3_bucket = env::var("AWS_S3_PUBLIC_BUCKET") .map_err(|err| anyhow!("AWS_S3_PUBLIC_BUCKET must be set: {err}"))?; @@ -43,6 +244,8 @@ pub fn get_public_bucket() -> Result { } #[instrument(skip(client, config))] +/// Creates a bucket when running against environments that manage buckets +/// directly instead of pre-provisioning them. async fn create_bucket_if_not_exists( client: &s3::Client, config: &s3::Config, @@ -83,12 +286,16 @@ async fn create_bucket_if_not_exists( Ok(()) } +/// Wraps S3 client construction so callers rely on one place for config to +/// client conversion. 
pub async fn get_s3_client(config: s3::Config) -> Result { let client = s3::Client::from_conf(config); Ok(client) } #[instrument] +/// Builds the private document key layout so uploads and downloads use a +/// stable tenant and event-specific hierarchy. pub fn get_document_key( tenant_id: &str, election_event_id: Option<&str>, @@ -106,6 +313,8 @@ pub fn get_document_key( } #[instrument(skip_all)] +/// Builds the public document key layout so public assets share the same naming +/// convention as private documents. pub fn get_public_document_key( tenant_id: &str, document_id: &str, @@ -115,11 +324,13 @@ pub fn get_public_document_key( } #[instrument(err)] +/// Creates a presigned download URL for a document so clients can fetch files +/// without proxying the bytes through the backend. pub async fn get_document_url( key: String, s3_bucket: String, ) -> Result { - let config = get_s3_aws_config(/* private = */ false).await?; + let config = get_s3_aws_config(/* use_server_endpoint = */ false).await?; let client = get_s3_client(config).await?; let presigning_config = PresigningConfig::expires_in(Duration::from_secs( @@ -137,6 +348,8 @@ pub async fn get_document_url( } #[instrument(err, ret)] +/// Creates a presigned upload URL and selects the endpoint that the caller can +/// actually reach. pub async fn get_upload_url( key: String, is_public: bool, @@ -146,9 +359,10 @@ pub async fn get_upload_url( true => get_public_bucket()?, false => get_private_bucket()?, }; - // We always use the public aws config since we are generating a client-side - // upload url. is_public is only used to define the upload bucket - let config = get_s3_aws_config(/* private = */ is_local).await?; + // Select the AWS endpoint that the caller can reach: when `is_local` is true + // we use the server-only endpoint; `is_public` only determines the upload bucket. 
+ let config = + get_s3_aws_config(/* use_server_endpoint = */ is_local).await?; let client = get_s3_client(config.clone()).await?; let presigning_config = PresigningConfig::expires_in(Duration::from_secs( @@ -165,13 +379,15 @@ pub async fn get_upload_url( } #[instrument(err, skip_all)] +/// Downloads one object into a temporary file so downstream code can work with +/// a filesystem path instead of holding the full payload in memory. pub async fn get_object_into_temp_file( s3_bucket: &str, key: &str, prefix: &str, suffix: &str, ) -> anyhow::Result { - let config = get_s3_aws_config(/* private = */ true) + let config = get_s3_aws_config(/* use_server_endpoint = */ true) .await .with_context(|| "Error obtaining aws config")?; let client = get_s3_client(config.clone()).await?; @@ -206,6 +422,8 @@ pub async fn get_object_into_temp_file( } #[instrument(err, skip_all)] +/// Uploads a file path to S3 and switches to multipart uploads only when the +/// payload is large enough to need chunking. pub async fn upload_file_to_s3( key: String, is_public: bool, @@ -253,6 +471,8 @@ pub async fn upload_file_to_s3( } #[instrument(err, skip_all)] +/// Streams a large file through S3 multipart upload so oversized reports and +/// exports do not need to be buffered at once. pub async fn upload_multipart_data_to_s3( path: &Path, key: String, @@ -358,6 +578,8 @@ pub async fn upload_multipart_data_to_s3( } #[instrument(err, skip_all)] +/// Uploads a single in-memory body to S3 for smaller files where multipart +/// upload would add unnecessary overhead. pub async fn upload_data_to_s3( data: ByteStream, key: String, @@ -398,22 +620,28 @@ pub async fn upload_data_to_s3( Ok(()) } +/// Returns the server-side MinIO URL used by backend services when they need a +/// direct path to the public bucket. 
pub fn get_minio_url() -> Result { - let minio_private_uri = env::var("AWS_S3_PRIVATE_URI") - .map_err(|err| anyhow!("AWS_S3_PRIVATE_URI must be set"))?; + let minio_private_uri = env::var(AWS_S3_PRIVATE_URI_ENV) + .map_err(|_err| anyhow!("AWS_S3_PRIVATE_URI must be set"))?; let bucket = get_public_bucket()?; Ok(format!("{}/{}", minio_private_uri, bucket)) } +/// Returns the client-facing MinIO URL used when generated links must be +/// reachable from outside the backend network. pub fn get_minio_public_url() -> Result { - let minio_public_uri = env::var("AWS_S3_PUBLIC_URI") - .map_err(|err| anyhow!("AWS_S3_PUBLIC_URI must be set"))?; + let minio_public_uri = env::var(AWS_S3_PUBLIC_URI_ENV) + .map_err(|_err| anyhow!("AWS_S3_PUBLIC_URI must be set"))?; let bucket = get_public_bucket()?; Ok(format!("{}/{}", minio_public_uri, bucket)) } +/// Builds the URL for a public asset stored in S3 or MinIO so templates can +/// reference it directly. pub fn get_public_asset_file_path(filename: &str) -> Result { let minio_endpoint_base = get_minio_url().with_context(|| "Error fetching get_minio_url")?; @@ -426,6 +654,8 @@ pub fn get_public_asset_file_path(filename: &str) -> Result { } #[instrument(err)] +/// Downloads a file via HTTP into a string for flows that consume public text +/// assets rather than raw S3 SDK responses. pub async fn download_s3_file_to_string(file_url: &str) -> Result { let client = reqwest::Client::new(); @@ -445,19 +675,20 @@ pub async fn download_s3_file_to_string(file_url: &str) -> Result { } #[instrument(err, ret)] +/// Deletes every object under a prefix and resolves AWS bucket-hosted endpoints +/// into the real bucket plus key prefix before listing. 
pub async fn delete_files_from_s3( s3_bucket: String, prefix: String, is_public: bool, ) -> Result<()> { - let config = get_s3_aws_config(!is_public) - .await - .with_context(|| "Error getting s3 aws config")?; - info!("Config acquired"); - let client = get_s3_client(config.clone()) + let resolved_target = get_s3_list_target(&s3_bucket, !is_public) .await - .with_context(|| "Error getting s3 client")?; - info!("S3 client acquired"); + .with_context(|| "Error getting s3 list target")?; + info!("S3 list target acquired"); + let list_prefix = resolved_target.qualify_prefix(&prefix); + let client = resolved_target.client; + let bucket_name = resolved_target.bucket; // First, collect all keys to delete let mut all_keys: Vec = Vec::new(); @@ -467,8 +698,8 @@ pub async fn delete_files_from_s3( info!("Listing objects"); let list_output = match client .list_objects_v2() - .bucket(s3_bucket.clone()) - .prefix(prefix.clone()) + .bucket(bucket_name.clone()) + .prefix(list_prefix.clone()) .max_keys(1000) .set_continuation_token(token.clone()) .send() @@ -508,15 +739,15 @@ pub async fn delete_files_from_s3( info!( "Collected {} objects to delete from S3 bucket '{}' with prefix '{}'", all_keys.len(), - s3_bucket, - prefix + bucket_name, + list_prefix ); // Now delete each key individually, tolerating NoSuchKey errors for key in &all_keys { match client .delete_object() - .bucket(s3_bucket.clone()) + .bucket(bucket_name.clone()) .key(key.clone()) .send() .await @@ -552,6 +783,7 @@ pub async fn delete_files_from_s3( } #[instrument(err)] +/// Downloads one object into memory when callers need its bytes immediately. pub async fn get_file_from_s3( s3_bucket: String, path: String, @@ -581,23 +813,25 @@ pub async fn get_file_from_s3( } #[instrument(err)] +/// Lists a prefix and streams each matching file into a temporary path so export +/// code can package files without buffering them all in memory. 
pub async fn get_files_from_s3( s3_bucket: String, prefix: String, ) -> Result> { - let config = get_s3_aws_config(true) + let resolved_target = get_s3_list_target(&s3_bucket, true) .await - .with_context(|| "Error getting s3 aws config")?; - let client = get_s3_client(config.clone()) - .await - .with_context(|| "Error getting s3 client")?; + .with_context(|| "Error getting s3 list target")?; + let list_prefix = resolved_target.qualify_prefix(&prefix); + let client = resolved_target.client; + let bucket_name = resolved_target.bucket; let mut file_paths = Vec::new(); let result = client .list_objects_v2() - .bucket(&s3_bucket) - .prefix(&prefix) + .bucket(&bucket_name) + .prefix(&list_prefix) .send() .await?; @@ -621,7 +855,7 @@ pub async fn get_files_from_s3( // Get object from S3 let s3_object = client .get_object() - .bucket(&s3_bucket) + .bucket(&bucket_name) .key(key) .send() .await?; @@ -653,3 +887,170 @@ pub async fn get_files_from_s3( Ok(file_paths) } + +#[cfg(test)] +mod tests { + use super::{ + join_s3_path, parse_aws_bucket_endpoint, resolve_s3_list_target_parts, + ResolvedS3ListTargetParts, + }; + + #[test] + fn join_s3_path_handles_empty_segments() { + assert_eq!(join_s3_path("public", "plugins/"), "public/plugins"); + assert_eq!(join_s3_path("public/", "/plugins/"), "public/plugins"); + assert_eq!(join_s3_path("", "plugins/"), "plugins"); + assert_eq!(join_s3_path("public", ""), "public"); + } + + #[test] + fn parse_region_aware_aws_bucket_endpoint() { + let parsed = parse_aws_bucket_endpoint( + "https://sequent-dev-bucket-eu-west-1-123.s3.eu-west-1.amazonaws.com", + Some("eu-west-1"), + ) + .unwrap(); + + assert_eq!( + parsed, + Some(( + "https://s3.eu-west-1.amazonaws.com".to_string(), + "sequent-dev-bucket-eu-west-1-123".to_string(), + )) + ); + } + + #[test] + fn parse_global_aws_bucket_endpoint() { + let parsed = parse_aws_bucket_endpoint( + "https://sequent-dev-bucket-eu-west-1-123.s3.amazonaws.com", + Some("eu-west-1"), + ) + .unwrap(); + + 
assert_eq!( + parsed, + Some(( + "https://s3.eu-west-1.amazonaws.com".to_string(), + "sequent-dev-bucket-eu-west-1-123".to_string(), + )) + ); + } + + #[test] + fn parse_global_aws_bucket_endpoint_without_region_keeps_global_host() { + let parsed = parse_aws_bucket_endpoint( + "https://sequent-dev-bucket-eu-west-1-123.s3.amazonaws.com", + None, + ) + .unwrap(); + + assert_eq!( + parsed, + Some(( + "https://s3.amazonaws.com".to_string(), + "sequent-dev-bucket-eu-west-1-123".to_string(), + )) + ); + } + + #[test] + fn ignores_non_aws_endpoints() { + let parsed = + parse_aws_bucket_endpoint("http://minio:9000", Some("eu-west-1")) + .unwrap(); + + assert_eq!(parsed, None); + } + + #[test] + fn ignores_localhost_non_aws_endpoints() { + let parsed = parse_aws_bucket_endpoint( + "http://127.0.0.1:9000", + Some("eu-west-1"), + ) + .unwrap(); + + assert_eq!(parsed, None); + } + + #[test] + fn resolves_local_private_bucket_without_rewriting() { + let resolved = resolve_s3_list_target_parts( + "http://minio:9000", + "election-event-documents", + Some("us-east-1"), + ) + .unwrap(); + + assert_eq!( + resolved, + ResolvedS3ListTargetParts { + service_endpoint: None, + bucket: "election-event-documents".to_string(), + prefix_root: None, + } + ); + } + + #[test] + fn resolves_local_public_bucket_without_rewriting() { + let resolved = resolve_s3_list_target_parts( + "http://127.0.0.1:9000", + "public", + Some("us-east-1"), + ) + .unwrap(); + + assert_eq!( + resolved, + ResolvedS3ListTargetParts { + service_endpoint: None, + bucket: "public".to_string(), + prefix_root: None, + } + ); + } + + #[test] + fn resolves_production_public_bucket_to_real_bucket_and_prefix() { + let resolved = resolve_s3_list_target_parts( + "https://sequent-dev-bucket-eu-west-1-133529410358.s3.amazonaws.com", + "public", + Some("eu-west-1"), + ) + .unwrap(); + + assert_eq!( + resolved, + ResolvedS3ListTargetParts { + service_endpoint: Some( + "https://s3.eu-west-1.amazonaws.com".to_string(), + ), + bucket: 
"sequent-dev-bucket-eu-west-1-133529410358".to_string(), + prefix_root: Some("public".to_string()), + } + ); + } + + #[test] + fn resolves_production_private_bucket_to_real_bucket_and_prefix() { + let resolved = resolve_s3_list_target_parts( + "https://sequent-dev-bucket-eu-west-1-133529410358.s3.amazonaws.com", + "election-event-documents", + Some("eu-west-1"), + ) + .unwrap(); + + assert_eq!( + resolved, + ResolvedS3ListTargetParts { + service_endpoint: Some( + "https://s3.eu-west-1.amazonaws.com".to_string(), + ), + bucket: "sequent-dev-bucket-eu-west-1-133529410358".to_string(), + prefix_root: Some("election-event-documents".to_string(),), + } + ); + } +} diff --git a/packages/sequent-core/src/util/aws.rs b/packages/sequent-core/src/util/aws.rs index 7f28e2e467d..b79ff91ffca 100644 --- a/packages/sequent-core/src/util/aws.rs +++ b/packages/sequent-core/src/util/aws.rs @@ -5,6 +5,11 @@ use anyhow::{anyhow, Result}; use aws_config::{meta::region::RegionProviderChain, Region, SdkConfig}; use tracing::{info, instrument}; +pub const AWS_S3_PRIVATE_URI_ENV: &str = "AWS_S3_PRIVATE_URI"; +pub const AWS_S3_PUBLIC_URI_ENV: &str = "AWS_S3_PUBLIC_URI"; + +/// Resolves the AWS region from the environment and keeps the default chain +/// as a fallback so local and deployed runtimes share the same lookup flow. pub fn get_region() -> Result { let region = RegionProviderChain::first_try(Region::new( std::env::var("AWS_REGION") @@ -16,6 +21,8 @@ pub fn get_region() -> Result { } #[instrument(err, skip_all)] +/// Loads the shared AWS SDK configuration from the process environment so S3, +/// SES, SNS, and STS all use the same credentials and region resolution. pub async fn get_from_env_aws_config() -> Result { let region = Region::new( std::env::var("AWS_REGION") @@ -25,12 +32,18 @@ pub async fn get_from_env_aws_config() -> Result { } #[instrument(err)] -pub async fn get_s3_aws_config(is_private: bool) -> Result { +/// Builds an S3 client configuration for the selected endpoint. 
+/// +/// When `use_server_endpoint` is `false`, the client-facing endpoint is used +/// instead of the server-side endpoint. +pub async fn get_s3_aws_config( + use_server_endpoint: bool, +) -> Result { let sdk_config = get_from_env_aws_config().await?; - let env_var_name = if is_private { - "AWS_S3_PRIVATE_URI" + let env_var_name = if use_server_endpoint { + AWS_S3_PRIVATE_URI_ENV } else { - "AWS_S3_PUBLIC_URI" + AWS_S3_PUBLIC_URI_ENV }; let access_key_result = std::env::var("AWS_S3_ACCESS_KEY"); let access_secret_result = std::env::var("AWS_S3_ACCESS_SECRET"); @@ -69,6 +82,8 @@ pub async fn get_s3_aws_config(is_private: bool) -> Result { .build()) } +/// Returns the maximum upload size so callers can reject oversized payloads +/// before opening a long-running upload flow. pub fn get_max_upload_size() -> Result { Ok(std::env::var("AWS_S3_MAX_UPLOAD_BYTES") .map_err(|err| { @@ -77,6 +92,7 @@ pub fn get_max_upload_size() -> Result { .parse()?) } +/// Returns the upload URL lifetime so presigned uploads expire predictably. pub fn get_upload_expiration_secs() -> Result { Ok(std::env::var("AWS_S3_UPLOAD_EXPIRATION_SECS") .map_err(|err| { @@ -85,6 +101,8 @@ pub fn get_upload_expiration_secs() -> Result { .parse()?) } +/// Returns the download URL lifetime so generated fetch URLs match the +/// deployment's cache and access expectations. pub fn get_fetch_expiration_secs() -> Result { Ok(std::env::var("AWS_S3_FETCH_EXPIRATION_SECS") .map_err(|err| { diff --git a/packages/windmill/src/services/celery_app.rs b/packages/windmill/src/services/celery_app.rs index 5a4d60ac932..d7794068c14 100644 --- a/packages/windmill/src/services/celery_app.rs +++ b/packages/windmill/src/services/celery_app.rs @@ -161,7 +161,11 @@ lazy_static! { /// CELERY_APP holds the high-level Celery application. Note: The Celery app is /// built separately from the Broker because it handles task routing/scheduling. 
static ref CELERY_APP: AsyncOnce> = - AsyncOnce::new(async { generate_celery_app().await.unwrap() }); + AsyncOnce::new(async { generate_celery_app().await.unwrap_or_else(|err| { + tracing::error!("{:#}", err); + panic!("{:#}", err); + }) + }); } /// Returns the global Celery app. From ae6e371d1b3d6c06581fd137acf3973b457d94e4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 18 May 2026 15:14:50 +0000 Subject: [PATCH 2/7] refactor: reuse shared s3 endpoint config builder Agent-Logs-Url: https://github.com/sequentech/step/sessions/75612038-8c39-4295-a386-86175cac5f1c Co-authored-by: BelSequent <178146277+BelSequent@users.noreply.github.com> --- packages/sequent-core/src/services/s3.rs | 38 ++------------- packages/sequent-core/src/util/aws.rs | 62 +++++++++++++----------- 2 files changed, 39 insertions(+), 61 deletions(-) diff --git a/packages/sequent-core/src/services/s3.rs b/packages/sequent-core/src/services/s3.rs index 9c6c192fb3a..22607b313ab 100644 --- a/packages/sequent-core/src/services/s3.rs +++ b/packages/sequent-core/src/services/s3.rs @@ -4,8 +4,9 @@ // SPDX-License-Identifier: AGPL-3.0-only use crate::util::aws::{ - get_fetch_expiration_secs, get_from_env_aws_config, get_s3_aws_config, - get_upload_expiration_secs, AWS_S3_PRIVATE_URI_ENV, AWS_S3_PUBLIC_URI_ENV, + build_s3_aws_config_for_endpoint, get_fetch_expiration_secs, + get_from_env_aws_config, get_s3_aws_config, get_upload_expiration_secs, + AWS_S3_PRIVATE_URI_ENV, AWS_S3_PUBLIC_URI_ENV, }; use crate::util::temp_path::{ generate_temp_file, get_public_assets_path_env_var, @@ -158,36 +159,6 @@ fn resolve_s3_list_target_parts( }) } -/// Builds an S3 config for an already-resolved endpoint while keeping the same -/// credential-loading rules used by the shared AWS helpers. 
-fn build_s3_config_for_endpoint( - sdk_config: &aws_config::SdkConfig, - endpoint_uri: &str, -) -> s3::Config { - let access_key_result = env::var("AWS_S3_ACCESS_KEY"); - let access_secret_result = env::var("AWS_S3_ACCESS_SECRET"); - let mut builder = aws_sdk_s3::config::Builder::from(sdk_config) - .endpoint_url(endpoint_uri) - .force_path_style(true); - - if let (Ok(access_key), Ok(access_secret)) = - (access_key_result, access_secret_result) - { - if !access_key.is_empty() && !access_secret.is_empty() { - let credentials_provider = aws_sdk_s3::config::Credentials::new( - access_key, - access_secret, - None, - None, - "loaded-from-custom-env", - ); - builder = builder.credentials_provider(credentials_provider); - } - } - - builder.build() -} - #[instrument(err)] /// Resolves the client, bucket, and optional logical prefix root for a /// server-side list operation. @@ -216,7 +187,8 @@ async fn get_s3_list_target( .service_endpoint .as_deref() .unwrap_or(&endpoint_uri); - let config = build_s3_config_for_endpoint(&sdk_config, resolved_endpoint); + let config = + build_s3_aws_config_for_endpoint(&sdk_config, resolved_endpoint); Ok(ResolvedS3ListTarget { client: get_s3_client(config).await?, diff --git a/packages/sequent-core/src/util/aws.rs b/packages/sequent-core/src/util/aws.rs index b79ff91ffca..6078916bb02 100644 --- a/packages/sequent-core/src/util/aws.rs +++ b/packages/sequent-core/src/util/aws.rs @@ -31,30 +31,25 @@ pub async fn get_from_env_aws_config() -> Result { Ok(aws_config::from_env().region(region).load().await) } -#[instrument(err)] -/// Builds an S3 client configuration for the selected endpoint. -/// -/// When `use_server_endpoint` is `false`, the client-facing endpoint is used -/// instead of the server-side endpoint. 
-pub async fn get_s3_aws_config( - use_server_endpoint: bool, -) -> Result { - let sdk_config = get_from_env_aws_config().await?; - let env_var_name = if use_server_endpoint { - AWS_S3_PRIVATE_URI_ENV - } else { - AWS_S3_PUBLIC_URI_ENV - }; +/// Builds an S3 client configuration for an explicit endpoint URL while +/// preserving this module's credential-loading rules. +pub(crate) fn build_s3_aws_config_for_endpoint( + sdk_config: &SdkConfig, + endpoint_uri: &str, +) -> aws_sdk_s3::Config { let access_key_result = std::env::var("AWS_S3_ACCESS_KEY"); let access_secret_result = std::env::var("AWS_S3_ACCESS_SECRET"); - let endpoint_uri = std::env::var(env_var_name)?; - info!("env_var_name={env_var_name}, endpoint_uri = {endpoint_uri:?}"); + let mut builder = aws_sdk_s3::config::Builder::from(sdk_config) + .endpoint_url(endpoint_uri) + .force_path_style(true); // apply bucketname as path param instead of pre-domain + let mut using_custom_credentials = false; if let (Ok(access_key), Ok(access_secret)) = (access_key_result, access_secret_result) { - if (!access_key.is_empty() && !access_secret.is_empty()) { + if !access_key.is_empty() && !access_secret.is_empty() { info!("using provided aws access key and secret credentials"); + using_custom_credentials = true; let credentials_provider = aws_sdk_s3::config::Credentials::new( access_key, @@ -63,23 +58,34 @@ pub async fn get_s3_aws_config( None, "loaded-from-custom-env", ); - - return Ok(aws_sdk_s3::config::Builder::from(&sdk_config) - .endpoint_url(endpoint_uri) - .credentials_provider(credentials_provider) - .force_path_style(true) // apply bucketname as path param instead of pre-domain - .build()); + builder = builder.credentials_provider(credentials_provider); } // Very important: fall-through to auto detecting credentials // from the execution environment if the environment variables // were present, but empty. 
} - info!("using default aws sdk config credentials"); - Ok(aws_sdk_s3::config::Builder::from(&sdk_config) - .endpoint_url(endpoint_uri) - .force_path_style(true) // apply bucketname as path param instead of pre-domain - .build()) + if !using_custom_credentials { + info!("using default aws sdk config credentials"); + } + + builder.build() +} + +#[instrument(err)] +pub async fn get_s3_aws_config( + use_server_endpoint: bool, +) -> Result { + let sdk_config = get_from_env_aws_config().await?; + let env_var_name = if use_server_endpoint { + AWS_S3_PRIVATE_URI_ENV + } else { + AWS_S3_PUBLIC_URI_ENV + }; + let endpoint_uri = std::env::var(env_var_name)?; + info!("env_var_name={env_var_name}, endpoint_uri = {endpoint_uri:?}"); + + Ok(build_s3_aws_config_for_endpoint(&sdk_config, &endpoint_uri)) } /// Returns the maximum upload size so callers can reject oversized payloads From 576c5e0f7715d6be4f42a7a5d16ea32ff0e336ab Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 18 May 2026 15:15:32 +0000 Subject: [PATCH 3/7] docs: clarify shared s3 config helper usage Agent-Logs-Url: https://github.com/sequentech/step/sessions/75612038-8c39-4295-a386-86175cac5f1c Co-authored-by: BelSequent <178146277+BelSequent@users.noreply.github.com> --- packages/sequent-core/src/util/aws.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/packages/sequent-core/src/util/aws.rs b/packages/sequent-core/src/util/aws.rs index 6078916bb02..8dcf5ef2bdf 100644 --- a/packages/sequent-core/src/util/aws.rs +++ b/packages/sequent-core/src/util/aws.rs @@ -33,6 +33,16 @@ pub async fn get_from_env_aws_config() -> Result { /// Builds an S3 client configuration for an explicit endpoint URL while /// preserving this module's credential-loading rules. +/// +/// Use this helper when the caller already resolved the final endpoint URI. 
+/// Use [`get_s3_aws_config`] when endpoint selection should be derived from +/// `AWS_S3_PRIVATE_URI`/`AWS_S3_PUBLIC_URI`. +/// +/// - `sdk_config`: shared AWS SDK config loaded from environment/default chain. +/// - `endpoint_uri`: absolute S3-compatible endpoint URL to target. +/// +/// Returns the final S3 client config with endpoint, path-style behavior, and +/// optional explicit credentials from `AWS_S3_ACCESS_KEY`/`AWS_S3_ACCESS_SECRET`. pub(crate) fn build_s3_aws_config_for_endpoint( sdk_config: &SdkConfig, endpoint_uri: &str, From 7e4ef37dc8c2c42717c0d716f8b9fef358adcd65 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 18 May 2026 15:16:02 +0000 Subject: [PATCH 4/7] docs: restore get_s3_aws_config function docs Agent-Logs-Url: https://github.com/sequentech/step/sessions/75612038-8c39-4295-a386-86175cac5f1c Co-authored-by: BelSequent <178146277+BelSequent@users.noreply.github.com> --- packages/sequent-core/src/util/aws.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/sequent-core/src/util/aws.rs b/packages/sequent-core/src/util/aws.rs index 8dcf5ef2bdf..9b69cb0b2e8 100644 --- a/packages/sequent-core/src/util/aws.rs +++ b/packages/sequent-core/src/util/aws.rs @@ -83,6 +83,10 @@ pub(crate) fn build_s3_aws_config_for_endpoint( } #[instrument(err)] +/// Builds an S3 client configuration for the selected endpoint. +/// +/// When `use_server_endpoint` is `false`, the client-facing endpoint is used +/// instead of the server-side endpoint. 
pub async fn get_s3_aws_config( use_server_endpoint: bool, ) -> Result { From fdf146241c74ad60e8e1dcf6ac70933683da002e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 18 May 2026 15:20:48 +0000 Subject: [PATCH 5/7] fix: use AWS SDK error metadata for robust S3 error handling Agent-Logs-Url: https://github.com/sequentech/step/sessions/5c790503-b81c-4eb3-b202-44f39542db4d Co-authored-by: BelSequent <178146277+BelSequent@users.noreply.github.com> --- packages/sequent-core/src/services/s3.rs | 67 +++++++++++++++--------- 1 file changed, 42 insertions(+), 25 deletions(-) diff --git a/packages/sequent-core/src/services/s3.rs b/packages/sequent-core/src/services/s3.rs index 22607b313ab..8afcdb1284e 100644 --- a/packages/sequent-core/src/services/s3.rs +++ b/packages/sequent-core/src/services/s3.rs @@ -18,6 +18,7 @@ use aws_sdk_s3::types::{ CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier, }; use aws_smithy_types::byte_stream::{ByteStream, Length}; +use aws_smithy_types::error::metadata::ProvideErrorMetadata; use core::time::Duration; use s3::presigning::PresigningConfig; use std::fs::File; @@ -677,19 +678,33 @@ pub async fn delete_files_from_s3( .send() .await { - Ok(list) => { - list - // Successfully deleted - } + Ok(list) => list, Err(err) => { - // Check if it's a NoSuchKey error - let err_str = format!("{:?}", err); - if err_str.contains("NoSuchKey") { - info!("Key already absent in S3; continuing. 
{:?}", err); - return Ok(()); - } else { - // For other errors, fail the operation - return Err(anyhow!("{:?}", err)); + // Handle specific S3 error codes using AWS SDK error metadata + match err.code() { + Some("NoSuchBucket") => { + info!( + bucket = %bucket_name, + "Bucket does not exist; nothing to delete" + ); + return Ok(()); + } + Some("AccessDenied") => { + return Err(anyhow!( + "Access denied when listing objects in bucket '{}' with prefix '{}': {}", + bucket_name, + list_prefix, + err.message().unwrap_or("no error message") + )); + } + _ => { + return Err(anyhow!( + "Failed to list objects in bucket '{}' with prefix '{}': {}", + bucket_name, + list_prefix, + err + )); + } } } }; @@ -728,19 +743,21 @@ pub async fn delete_files_from_s3( // Successfully deleted } Err(err) => { - // Check if it's a NoSuchKey error - let err_str = format!("{:?}", err); - if err_str.contains("NoSuchKey") { - tracing::warn!( - key = %key, - "Key already absent in S3; continuing" - ); - } else { - // For other errors, fail the operation - return Err(anyhow::Error::from(err).context(format!( - "Failed to delete S3 object: {}", - key - ))); + // Handle specific S3 error codes using AWS SDK error metadata + match err.code() { + Some("NoSuchKey") => { + tracing::warn!( + key = %key, + "Key already absent in S3; continuing" + ); + } + _ => { + // For other errors, fail the operation + return Err(anyhow::Error::from(err).context(format!( + "Failed to delete S3 object: {}", + key + ))); + } } } } From 7566573af0693067175e9b28fa5f3dcbf241dce8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 18 May 2026 15:24:09 +0000 Subject: [PATCH 6/7] refine: improve error message clarity Agent-Logs-Url: https://github.com/sequentech/step/sessions/5c790503-b81c-4eb3-b202-44f39542db4d Co-authored-by: BelSequent <178146277+BelSequent@users.noreply.github.com> --- packages/sequent-core/src/services/s3.rs | 2 +- 1 file changed, 1 insertion(+), 
1 deletion(-) diff --git a/packages/sequent-core/src/services/s3.rs b/packages/sequent-core/src/services/s3.rs index 8afcdb1284e..b669dc08639 100644 --- a/packages/sequent-core/src/services/s3.rs +++ b/packages/sequent-core/src/services/s3.rs @@ -694,7 +694,7 @@ pub async fn delete_files_from_s3( "Access denied when listing objects in bucket '{}' with prefix '{}': {}", bucket_name, list_prefix, - err.message().unwrap_or("no error message") + err.message().unwrap_or("no additional details available") )); } _ => { From 79f997f5b50fbdbe412e778f4f974d63daa0ce1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beltr=C3=A1n=20Rodr=C3=ADguez?= Date: Tue, 19 May 2026 09:07:12 +0000 Subject: [PATCH 7/7] Review error handling and ai slop --- .../sequent-core/src/services/connection.rs | 3 +- packages/sequent-core/src/services/s3.rs | 103 +++++++++++------- .../src/services/uuid_validation.rs | 3 +- packages/sequent-core/src/util/aws.rs | 15 ++- packages/windmill/src/services/celery_app.rs | 2 +- 5 files changed, 74 insertions(+), 52 deletions(-) diff --git a/packages/sequent-core/src/services/connection.rs b/packages/sequent-core/src/services/connection.rs index 7c5f2cfdf48..560c4c779a3 100644 --- a/packages/sequent-core/src/services/connection.rs +++ b/packages/sequent-core/src/services/connection.rs @@ -202,8 +202,7 @@ struct TokenResponseExtended { /// Last access token can be reused if it´s not expired, this is to avoid /// Keycloak having to hold one token per Api request which could lead quickly -/// to many thousands of tokens. -/// +/// to many thousands of tokens.
/// Keycloak can hold multiple tokens for the same client, so we do not care /// about using the previous token if one thread read it and while didn´t send /// it yet other thread wrote it. As long as it is not expired, we can reuse it. diff --git a/packages/sequent-core/src/services/s3.rs b/packages/sequent-core/src/services/s3.rs index b669dc08639..664a92d7b1a 100644 --- a/packages/sequent-core/src/services/s3.rs +++ b/packages/sequent-core/src/services/s3.rs @@ -27,12 +27,17 @@ use std::path::{Path, PathBuf}; use std::{env, error::Error}; use tempfile::{NamedTempFile, TempPath}; use tokio::io::{self, AsyncReadExt}; -use tracing::{info, instrument}; +use tracing::{info, instrument, warn}; const MAX_CHUNK_SIZE: u64 = 16 * 1024 * 1024; const AWS_HOSTED_S3_HOST_DELIMITER: &str = ".s3."; const AWS_HOSTED_S3_DOMAIN_SUFFIX: &str = "amazonaws.com"; const AWS_S3_SERVICE_HOST_PREFIX: &str = "s3"; +const S3_LIST_MAX_KEYS: i32 = 1000; +const S3_ERR_NO_SUCH_BUCKET: &str = "NoSuchBucket"; +const S3_ERR_NO_SUCH_KEY: &str = "NoSuchKey"; +const S3_ERR_ACCESS_DENIED: &str = "AccessDenied"; +const S3_ERR_NO_DETAILS: &str = "no additional details available"; #[derive(Debug, PartialEq, Eq)] struct ResolvedS3ListTargetParts { @@ -51,7 +56,11 @@ struct ResolvedS3ListTarget { impl ResolvedS3ListTarget { /// Adds the resolved logical prefix root so callers can request the same - /// effective key space regardless of the underlying endpoint shape. + /// effective key space regardless of the underlying endpoint shape.
+ /// I.e. AWS prefix_root is "public/" or "election-event-documents/" (both + /// within the same bucket) while MinIO prefix_root is None and the + /// bucket name encodes the scope instead. + #[instrument(skip_all)] fn qualify_prefix(&self, prefix: &str) -> String { match &self.prefix_root { Some(prefix_root) => join_s3_path(prefix_root, prefix), @@ -62,6 +71,7 @@ impl ResolvedS3ListTarget { /// Joins S3 path fragments while normalizing slashes so generated prefixes stay /// stable across callers. +#[instrument(skip_all)] fn join_s3_path(prefix: &str, suffix: &str) -> String { let prefix = prefix.trim_matches('/'); let suffix = suffix.trim_matches('/'); @@ -76,6 +86,7 @@ fn join_s3_path(prefix: &str, suffix: &str) -> String { /// Detects bucket-hosted AWS endpoints and extracts the real service endpoint /// plus bucket name so list operations can address AWS correctly. +#[instrument(err, skip_all)] fn parse_aws_bucket_endpoint( endpoint_uri: &str, aws_region: Option<&str>, @@ -137,7 +148,12 @@ fn parse_aws_bucket_endpoint( } /// Resolves the bucket and prefix semantics for a list-style S3 call without -/// constructing a client so both runtime code and tests share the same rules. +/// constructing a client so both runtime code and tests share the same +/// rules.
When the endpoint is MinIO (development/codespaces) the bucket +/// name is the logical bucket, then service_endpoint is set to None (the raw env +/// var must be set by the caller) and prefix_root is empty (is already the +/// bucket name). +#[instrument(err, skip_all)] fn resolve_s3_list_target_parts( endpoint_uri: &str, logical_bucket: &str, @@ -160,12 +176,11 @@ fn resolve_s3_list_target_parts( }) } -#[instrument(err)] /// Resolves the client, bucket, and optional logical prefix root for a -/// server-side list operation. -/// +/// server-side list operation.
/// When `use_server_endpoint` is `false`, the helper uses the client endpoint /// instead of the server endpoint. +#[instrument(err)] async fn get_s3_list_target( logical_bucket: &str, use_server_endpoint: bool, @@ -198,27 +213,27 @@ async fn get_s3_list_target( }) } -#[instrument(err, skip_all)] /// Returns the logical private bucket or root prefix so callers can separate /// storage scope from endpoint selection. +#[instrument(err, skip_all)] pub fn get_private_bucket() -> Result { let s3_bucket = env::var("AWS_S3_BUCKET") .map_err(|err| anyhow!("AWS_S3_BUCKET must be set: {err}"))?; Ok(s3_bucket) } -#[instrument(err, skip_all)] /// Returns the logical public bucket or root prefix used for public assets and /// plugin storage. +#[instrument(err, skip_all)] pub fn get_public_bucket() -> Result { let s3_bucket = env::var("AWS_S3_PUBLIC_BUCKET") .map_err(|err| anyhow!("AWS_S3_PUBLIC_BUCKET must be set: {err}"))?; Ok(s3_bucket) } -#[instrument(skip(client, config))] /// Creates a bucket when running against environments that manage buckets /// directly instead of pre-provisioning them. +#[instrument(skip(client, config))] async fn create_bucket_if_not_exists( client: &s3::Client, config: &s3::Config, @@ -266,9 +281,9 @@ pub async fn get_s3_client(config: s3::Config) -> Result { Ok(client) } -#[instrument] /// Builds the private document key layout so uploads and downloads use a /// stable tenant and event-specific hierarchy. +#[instrument] pub fn get_document_key( tenant_id: &str, election_event_id: Option<&str>, @@ -285,9 +300,9 @@ pub fn get_document_key( } } -#[instrument(skip_all)] /// Builds the public document key layout so public assets share the same naming /// convention as private documents. 
+#[instrument(skip_all)] pub fn get_public_document_key( tenant_id: &str, document_id: &str, @@ -296,9 +311,9 @@ pub fn get_public_document_key( format!("tenant-{}/document-{}/{}", tenant_id, document_id, name) } -#[instrument(err)] /// Creates a presigned download URL for a document so clients can fetch files /// without proxying the bytes through the backend. +#[instrument(err)] pub async fn get_document_url( key: String, s3_bucket: String, @@ -320,9 +335,9 @@ pub async fn get_document_url( Ok(presigned_request.uri().to_string()) } -#[instrument(err, ret)] /// Creates a presigned upload URL and selects the endpoint that the caller can /// actually reach. +#[instrument(err, ret)] pub async fn get_upload_url( key: String, is_public: bool, @@ -332,8 +347,9 @@ pub async fn get_upload_url( true => get_public_bucket()?, false => get_private_bucket()?, }; - // Select the AWS endpoint that the caller can reach: when `is_local` is true - // we use the server-only endpoint; `is_public` only determines the upload bucket. + // Select the AWS endpoint that the caller can reach: when `is_local` is + // true we use the server-only endpoint; `is_public` only determines the + // upload bucket. let config = get_s3_aws_config(/* use_server_endpoint = */ is_local).await?; let client = get_s3_client(config.clone()).await?; @@ -351,9 +367,9 @@ pub async fn get_upload_url( Ok(presigned_request.uri().to_string()) } -#[instrument(err, skip_all)] /// Downloads one object into a temporary file so downstream code can work with /// a filesystem path instead of holding the full payload in memory. +#[instrument(err, skip_all)] pub async fn get_object_into_temp_file( s3_bucket: &str, key: &str, @@ -394,9 +410,9 @@ pub async fn get_object_into_temp_file( Ok(temp_file) } -#[instrument(err, skip_all)] /// Uploads a file path to S3 and switches to multipart uploads only when the /// payload is large enough to need chunking. 
+#[instrument(err, skip_all)] pub async fn upload_file_to_s3( key: String, is_public: bool, @@ -443,9 +459,9 @@ pub async fn upload_file_to_s3( } } -#[instrument(err, skip_all)] /// Streams a large file through S3 multipart upload so oversized reports and /// exports do not need to be buffered at once. +#[instrument(err, skip_all)] pub async fn upload_multipart_data_to_s3( path: &Path, key: String, @@ -550,9 +566,9 @@ pub async fn upload_multipart_data_to_s3( Ok(()) } -#[instrument(err, skip_all)] /// Uploads a single in-memory body to S3 for smaller files where multipart /// upload would add unnecessary overhead. +#[instrument(err, skip_all)] pub async fn upload_data_to_s3( data: ByteStream, key: String, @@ -626,9 +642,9 @@ pub fn get_public_asset_file_path(filename: &str) -> Result { )) } -#[instrument(err)] /// Downloads a file via HTTP into a string for flows that consume public text /// assets rather than raw S3 SDK responses. +#[instrument(err)] pub async fn download_s3_file_to_string(file_url: &str) -> Result { let client = reqwest::Client::new(); @@ -647,9 +663,9 @@ pub async fn download_s3_file_to_string(file_url: &str) -> Result { Ok(String::from_utf8(bytes.to_vec())?) } -#[instrument(err, ret)] /// Deletes every object under a prefix and resolves AWS bucket-hosted endpoints /// into the real bucket plus key prefix before listing. 
+#[instrument(err, ret)] pub async fn delete_files_from_s3( s3_bucket: String, prefix: String, @@ -673,28 +689,32 @@ pub async fn delete_files_from_s3( .list_objects_v2() .bucket(bucket_name.clone()) .prefix(list_prefix.clone()) - .max_keys(1000) + .max_keys(S3_LIST_MAX_KEYS) .set_continuation_token(token.clone()) .send() .await { Ok(list) => list, Err(err) => { - // Handle specific S3 error codes using AWS SDK error metadata - match err.code() { - Some("NoSuchBucket") => { - info!( - bucket = %bucket_name, - "Bucket does not exist; nothing to delete" - ); + let code = err.code(); + if let Some(c) = code { + warn!(code = c, "S3 list_objects_v2 returned error code"); + } + // AWS can return NoSuchKey from list_objects_v2 in + // addition to the expected NoSuchBucket. + match code { + Some(c) + if c == S3_ERR_NO_SUCH_BUCKET + || c == S3_ERR_NO_SUCH_KEY => + { return Ok(()); } - Some("AccessDenied") => { + Some(c) if c == S3_ERR_ACCESS_DENIED => { return Err(anyhow!( "Access denied when listing objects in bucket '{}' with prefix '{}': {}", bucket_name, list_prefix, - err.message().unwrap_or("no additional details available") + err.message().unwrap_or(S3_ERR_NO_DETAILS) )); } _ => { @@ -743,16 +763,15 @@ pub async fn delete_files_from_s3( // Successfully deleted } Err(err) => { - // Handle specific S3 error codes using AWS SDK error metadata - match err.code() { - Some("NoSuchKey") => { - tracing::warn!( - key = %key, - "Key already absent in S3; continuing" - ); + let code = err.code(); + if let Some(c) = code { + warn!(code = c, "S3 delete_object returned error code"); + } + match code { + Some(c) if c == S3_ERR_NO_SUCH_KEY => { + warn!(key = %key, "Key already absent in S3; continuing"); } _ => { - // For other errors, fail the operation return Err(anyhow::Error::from(err).context(format!( "Failed to delete S3 object: {}", key @@ -771,8 +790,8 @@ pub async fn delete_files_from_s3( Ok(()) } -#[instrument(err)] /// Downloads one object into memory when callers need 
its bytes immediately. +#[instrument(err)] pub async fn get_file_from_s3( s3_bucket: String, path: String, @@ -801,9 +820,9 @@ pub async fn get_file_from_s3( Ok(result) } +/// Lists a prefix and streams each matching file into a temporary path so +/// export code can package files without buffering them all in memory. #[instrument(err)] -/// Lists a prefix and streams each matching file into a temporary path so export -/// code can package files without buffering them all in memory. pub async fn get_files_from_s3( s3_bucket: String, prefix: String, diff --git a/packages/sequent-core/src/services/uuid_validation.rs b/packages/sequent-core/src/services/uuid_validation.rs index 57275d86091..442e5a24be8 100644 --- a/packages/sequent-core/src/services/uuid_validation.rs +++ b/packages/sequent-core/src/services/uuid_validation.rs @@ -5,8 +5,7 @@ use anyhow::anyhow; use uuid::Uuid; -/// Parses and validates that the given string is a valid v4 UUID. -/// +/// Parses and validates that the given string is a valid v4 UUID.
/// Returns the parsed `Uuid` on success, or an error if the string is not /// a valid UUID or is not version 4 (random). pub fn parse_uuid_v4(value: &str) -> anyhow::Result { diff --git a/packages/sequent-core/src/util/aws.rs b/packages/sequent-core/src/util/aws.rs index 9b69cb0b2e8..9f3f0b47892 100644 --- a/packages/sequent-core/src/util/aws.rs +++ b/packages/sequent-core/src/util/aws.rs @@ -10,6 +10,7 @@ pub const AWS_S3_PUBLIC_URI_ENV: &str = "AWS_S3_PUBLIC_URI"; /// Resolves the AWS region from the environment and keeps the default chain /// as a fallback so local and deployed runtimes share the same lookup flow. +#[instrument(err, skip_all)] pub fn get_region() -> Result { let region = RegionProviderChain::first_try(Region::new( std::env::var("AWS_REGION") @@ -20,9 +21,9 @@ pub fn get_region() -> Result { Ok(region) } -#[instrument(err, skip_all)] /// Loads the shared AWS SDK configuration from the process environment so S3, /// SES, SNS, and STS all use the same credentials and region resolution. +#[instrument(err, skip_all)] pub async fn get_from_env_aws_config() -> Result { let region = Region::new( std::env::var("AWS_REGION") @@ -42,7 +43,9 @@ pub async fn get_from_env_aws_config() -> Result { /// - `endpoint_uri`: absolute S3-compatible endpoint URL to target. /// /// Returns the final S3 client config with endpoint, path-style behavior, and -/// optional explicit credentials from `AWS_S3_ACCESS_KEY`/`AWS_S3_ACCESS_SECRET`. +/// optional explicit credentials from +/// `AWS_S3_ACCESS_KEY`/`AWS_S3_ACCESS_SECRET`. +#[instrument(skip_all)] pub(crate) fn build_s3_aws_config_for_endpoint( sdk_config: &SdkConfig, endpoint_uri: &str, @@ -82,11 +85,10 @@ pub(crate) fn build_s3_aws_config_for_endpoint( builder.build() } -#[instrument(err)] -/// Builds an S3 client configuration for the selected endpoint. -/// +/// Builds an S3 client configuration for the selected endpoint.
/// When `use_server_endpoint` is `false`, the client-facing endpoint is used /// instead of the server-side endpoint. +#[instrument(err, skip_all)] pub async fn get_s3_aws_config( use_server_endpoint: bool, ) -> Result { @@ -104,6 +106,7 @@ pub async fn get_s3_aws_config( /// Returns the maximum upload size so callers can reject oversized payloads /// before opening a long-running upload flow. +#[instrument(err, skip_all)] pub fn get_max_upload_size() -> Result { Ok(std::env::var("AWS_S3_MAX_UPLOAD_BYTES") .map_err(|err| { @@ -113,6 +116,7 @@ pub fn get_max_upload_size() -> Result { } /// Returns the upload URL lifetime so presigned uploads expire predictably. +#[instrument(err, skip_all)] pub fn get_upload_expiration_secs() -> Result { Ok(std::env::var("AWS_S3_UPLOAD_EXPIRATION_SECS") .map_err(|err| { @@ -123,6 +127,7 @@ pub fn get_upload_expiration_secs() -> Result { /// Returns the download URL lifetime so generated fetch URLs match the /// deployment's cache and access expectations. +#[instrument(err, skip_all)] pub fn get_fetch_expiration_secs() -> Result { Ok(std::env::var("AWS_S3_FETCH_EXPIRATION_SECS") .map_err(|err| { diff --git a/packages/windmill/src/services/celery_app.rs b/packages/windmill/src/services/celery_app.rs index d7794068c14..58b26b28128 100644 --- a/packages/windmill/src/services/celery_app.rs +++ b/packages/windmill/src/services/celery_app.rs @@ -162,7 +162,7 @@ lazy_static! { /// built separately from the Broker because it handles task routing/scheduling. static ref CELERY_APP: AsyncOnce> = AsyncOnce::new(async { generate_celery_app().await.unwrap_or_else(|err| { - tracing::error!("{:#}", err); + tracing::error!("Error to generate celery app: {:#}", err); panic!("{:#}", err); }) });