diff --git a/packages/sequent-core/src/services/connection.rs b/packages/sequent-core/src/services/connection.rs index 7c5f2cfdf48..560c4c779a3 100644 --- a/packages/sequent-core/src/services/connection.rs +++ b/packages/sequent-core/src/services/connection.rs @@ -202,8 +202,7 @@ struct TokenResponseExtended { /// Last access token can be reused if it´s not expired, this is to avoid /// Keycloak having to hold one token per Api request which could lead quickly -/// to many thousands of tokens. -/// +/// to many thousands of tokens.
/// Keycloak can hold multiple tokens for the same client, so we do not care /// about using the previous token if one thread read it and while didn´t send /// it yet other thread wrote it. As long as it is not expired, we can reuse it. diff --git a/packages/sequent-core/src/services/s3.rs b/packages/sequent-core/src/services/s3.rs index bb1375de63a..664a92d7b1a 100644 --- a/packages/sequent-core/src/services/s3.rs +++ b/packages/sequent-core/src/services/s3.rs @@ -4,7 +4,9 @@ // SPDX-License-Identifier: AGPL-3.0-only use crate::util::aws::{ - get_fetch_expiration_secs, get_s3_aws_config, get_upload_expiration_secs, + build_s3_aws_config_for_endpoint, get_fetch_expiration_secs, + get_from_env_aws_config, get_s3_aws_config, get_upload_expiration_secs, + AWS_S3_PRIVATE_URI_ENV, AWS_S3_PUBLIC_URI_ENV, }; use crate::util::temp_path::{ generate_temp_file, get_public_assets_path_env_var, @@ -16,6 +18,7 @@ use aws_sdk_s3::types::{ CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier, }; use aws_smithy_types::byte_stream::{ByteStream, Length}; +use aws_smithy_types::error::metadata::ProvideErrorMetadata; use core::time::Duration; use s3::presigning::PresigningConfig; use std::fs::File; @@ -24,10 +27,194 @@ use std::path::{Path, PathBuf}; use std::{env, error::Error}; use tempfile::{NamedTempFile, TempPath}; use tokio::io::{self, AsyncReadExt}; -use tracing::{info, instrument}; +use tracing::{info, instrument, warn}; const MAX_CHUNK_SIZE: u64 = 16 * 1024 * 1024; +const AWS_HOSTED_S3_HOST_DELIMITER: &str = ".s3."; +const AWS_HOSTED_S3_DOMAIN_SUFFIX: &str = "amazonaws.com"; +const AWS_S3_SERVICE_HOST_PREFIX: &str = "s3"; +const S3_LIST_MAX_KEYS: i32 = 1000; +const S3_ERR_NO_SUCH_BUCKET: &str = "NoSuchBucket"; +const S3_ERR_NO_SUCH_KEY: &str = "NoSuchKey"; +const S3_ERR_ACCESS_DENIED: &str = "AccessDenied"; +const S3_ERR_NO_DETAILS: &str = "no additional details available"; + +#[derive(Debug, PartialEq, Eq)] +struct ResolvedS3ListTargetParts { + service_endpoint: 
Option, + bucket: String, + prefix_root: Option, +} + +/// Carries the resolved S3 client, real bucket name, and optional logical +/// prefix root for list-style operations that must work on both MinIO and AWS. +struct ResolvedS3ListTarget { + client: s3::Client, + bucket: String, + prefix_root: Option, +} + +impl ResolvedS3ListTarget { + /// Adds the resolved logical prefix root so callers can request the same + /// effective key space regardless of the underlying endpoint shape.
+ /// I.e. AWS prefix_root is "public/" or "election-event-documents/" (both + /// within the same bucket) while MinIO prefix_root is None and the + /// bucket name encodes the scope instead. + #[instrument(skip_all)] + fn qualify_prefix(&self, prefix: &str) -> String { + match &self.prefix_root { + Some(prefix_root) => join_s3_path(prefix_root, prefix), + None => prefix.to_string(), + } + } +} +/// Joins S3 path fragments while normalizing slashes so generated prefixes stay +/// stable across callers. +#[instrument(skip_all)] +fn join_s3_path(prefix: &str, suffix: &str) -> String { + let prefix = prefix.trim_matches('/'); + let suffix = suffix.trim_matches('/'); + + match (prefix.is_empty(), suffix.is_empty()) { + (true, true) => String::new(), + (true, false) => suffix.to_string(), + (false, true) => prefix.to_string(), + (false, false) => format!("{prefix}/{suffix}"), + } +} + +/// Detects bucket-hosted AWS endpoints and extracts the real service endpoint +/// plus bucket name so list operations can address AWS correctly. +#[instrument(err, skip_all)] +fn parse_aws_bucket_endpoint( + endpoint_uri: &str, + aws_region: Option<&str>, +) -> Result> { + // Parse once so we can reason about the hostname shape without doing any + // string slicing against the raw env var value. + let url = reqwest::Url::parse(endpoint_uri) + .with_context(|| format!("Invalid S3 endpoint URL `{endpoint_uri}`"))?; + let host = match url.host_str() { + Some(host) => host, + None => return Ok(None), + }; + + // AWS bucket-hosted endpoints look like `<bucket>.s3.amazonaws.com` or + // `<bucket>.s3.<region>.amazonaws.com`. MinIO and other custom endpoints do + // not match this shape, so they must be left untouched. + let (bucket_name, service_host) = match host + .split_once(AWS_HOSTED_S3_HOST_DELIMITER) + { + Some((bucket_name, suffix)) if !bucket_name.is_empty() => { + // Only rewrite real AWS S3 hosts. This avoids accidentally + // treating custom domains as bucket-hosted AWS endpoints.
+ if !suffix.ends_with(AWS_HOSTED_S3_DOMAIN_SUFFIX) { + return Ok(None); + } + + let service_host = if suffix == AWS_HOSTED_S3_DOMAIN_SUFFIX { + // The global host form does not encode the bucket region. + // Prefer the resolved SDK region so SigV4 targets the + // correct regional S3 endpoint outside us-east-1. + match aws_region { + Some(region) if !region.is_empty() => format!( + "{AWS_S3_SERVICE_HOST_PREFIX}.{region}.{AWS_HOSTED_S3_DOMAIN_SUFFIX}" + ), + _ => format!( + "{AWS_S3_SERVICE_HOST_PREFIX}.{AWS_HOSTED_S3_DOMAIN_SUFFIX}" + ), + } + } else { + // Regional bucket-hosted endpoints already tell us which + // service host to talk to, so we preserve that region. + format!("{AWS_S3_SERVICE_HOST_PREFIX}.{suffix}") + }; + + (bucket_name, service_host) + } + _ => return Ok(None), + }; + + // Rebuild the endpoint URL without the bucket in the hostname. The caller + // will use the returned bucket name plus this service endpoint for list and + // delete operations that require bucket + prefix semantics on AWS. + let mut service_endpoint = format!("{}://{}", url.scheme(), service_host); + if let Some(port) = url.port() { + service_endpoint.push_str(&format!(":{port}")); + } + + Ok(Some((service_endpoint, bucket_name.to_string()))) +} + +/// Resolves the bucket and prefix semantics for a list-style S3 call without +/// constructing a client so both runtime code and tests share the same +/// rules.
When the endpoint is MinIO (development/codespaces) the bucket +/// name is the logical bucket, so `service_endpoint` is set to None (the caller +/// falls back to the raw env var value) and `prefix_root` is None (the bucket +/// name already encodes the scope). +#[instrument(err, skip_all)] +fn resolve_s3_list_target_parts( + endpoint_uri: &str, + logical_bucket: &str, + aws_region: Option<&str>, +) -> Result { + if let Some((service_endpoint, bucket_name)) = + parse_aws_bucket_endpoint(endpoint_uri, aws_region)? + { + return Ok(ResolvedS3ListTargetParts { + service_endpoint: Some(service_endpoint), + bucket: bucket_name, + prefix_root: Some(logical_bucket.trim_matches('/').to_string()), + }); + } + + Ok(ResolvedS3ListTargetParts { + service_endpoint: None, + bucket: logical_bucket.to_string(), + prefix_root: None, + }) +} + +/// Resolves the client, bucket, and optional logical prefix root for a +/// server-side list operation.
+/// When `use_server_endpoint` is `false`, the helper uses the client endpoint +/// instead of the server endpoint. +#[instrument(err)] +async fn get_s3_list_target( + logical_bucket: &str, + use_server_endpoint: bool, +) -> Result { + let env_var_name = if use_server_endpoint { + AWS_S3_PRIVATE_URI_ENV + } else { + AWS_S3_PUBLIC_URI_ENV + }; + let endpoint_uri = env::var(env_var_name) + .with_context(|| format!("{env_var_name} must be set"))?; + let sdk_config = get_from_env_aws_config().await?; + let aws_region = sdk_config.region().map(|region| region.as_ref()); + let target_parts = resolve_s3_list_target_parts( + &endpoint_uri, + logical_bucket, + aws_region, + )?; + let resolved_endpoint = target_parts + .service_endpoint + .as_deref() + .unwrap_or(&endpoint_uri); + let config = + build_s3_aws_config_for_endpoint(&sdk_config, resolved_endpoint); + + Ok(ResolvedS3ListTarget { + client: get_s3_client(config).await?, + bucket: target_parts.bucket, + prefix_root: target_parts.prefix_root, + }) +} + +/// Returns the logical private bucket or root prefix so callers can separate +/// storage scope from endpoint selection. #[instrument(err, skip_all)] pub fn get_private_bucket() -> Result { let s3_bucket = env::var("AWS_S3_BUCKET") @@ -35,6 +222,8 @@ pub fn get_private_bucket() -> Result { Ok(s3_bucket) } +/// Returns the logical public bucket or root prefix used for public assets and +/// plugin storage. #[instrument(err, skip_all)] pub fn get_public_bucket() -> Result { let s3_bucket = env::var("AWS_S3_PUBLIC_BUCKET") @@ -42,6 +231,8 @@ pub fn get_public_bucket() -> Result { Ok(s3_bucket) } +/// Creates a bucket when running against environments that manage buckets +/// directly instead of pre-provisioning them. 
#[instrument(skip(client, config))] async fn create_bucket_if_not_exists( client: &s3::Client, @@ -83,11 +274,15 @@ async fn create_bucket_if_not_exists( Ok(()) } +/// Wraps S3 client construction so callers rely on one place for config to +/// client conversion. pub async fn get_s3_client(config: s3::Config) -> Result { let client = s3::Client::from_conf(config); Ok(client) } +/// Builds the private document key layout so uploads and downloads use a +/// stable tenant and event-specific hierarchy. #[instrument] pub fn get_document_key( tenant_id: &str, @@ -105,6 +300,8 @@ pub fn get_document_key( } } +/// Builds the public document key layout so public assets share the same naming +/// convention as private documents. #[instrument(skip_all)] pub fn get_public_document_key( tenant_id: &str, @@ -114,12 +311,14 @@ pub fn get_public_document_key( format!("tenant-{}/document-{}/{}", tenant_id, document_id, name) } +/// Creates a presigned download URL for a document so clients can fetch files +/// without proxying the bytes through the backend. #[instrument(err)] pub async fn get_document_url( key: String, s3_bucket: String, ) -> Result { - let config = get_s3_aws_config(/* private = */ false).await?; + let config = get_s3_aws_config(/* use_server_endpoint = */ false).await?; let client = get_s3_client(config).await?; let presigning_config = PresigningConfig::expires_in(Duration::from_secs( @@ -136,6 +335,8 @@ pub async fn get_document_url( Ok(presigned_request.uri().to_string()) } +/// Creates a presigned upload URL and selects the endpoint that the caller can +/// actually reach. #[instrument(err, ret)] pub async fn get_upload_url( key: String, @@ -146,9 +347,11 @@ pub async fn get_upload_url( true => get_public_bucket()?, false => get_private_bucket()?, }; - // We always use the public aws config since we are generating a client-side - // upload url. 
is_public is only used to define the upload bucket - let config = get_s3_aws_config(/* private = */ is_local).await?; + // Select the AWS endpoint that the caller can reach: when `is_local` is + // true we use the server-only endpoint; `is_public` only determines the + // upload bucket. + let config = + get_s3_aws_config(/* use_server_endpoint = */ is_local).await?; let client = get_s3_client(config.clone()).await?; let presigning_config = PresigningConfig::expires_in(Duration::from_secs( @@ -164,6 +367,8 @@ pub async fn get_upload_url( Ok(presigned_request.uri().to_string()) } +/// Downloads one object into a temporary file so downstream code can work with +/// a filesystem path instead of holding the full payload in memory. #[instrument(err, skip_all)] pub async fn get_object_into_temp_file( s3_bucket: &str, @@ -171,7 +376,7 @@ pub async fn get_object_into_temp_file( prefix: &str, suffix: &str, ) -> anyhow::Result { - let config = get_s3_aws_config(/* private = */ true) + let config = get_s3_aws_config(/* use_server_endpoint = */ true) .await .with_context(|| "Error obtaining aws config")?; let client = get_s3_client(config.clone()).await?; @@ -205,6 +410,8 @@ pub async fn get_object_into_temp_file( Ok(temp_file) } +/// Uploads a file path to S3 and switches to multipart uploads only when the +/// payload is large enough to need chunking. #[instrument(err, skip_all)] pub async fn upload_file_to_s3( key: String, @@ -252,6 +459,8 @@ pub async fn upload_file_to_s3( } } +/// Streams a large file through S3 multipart upload so oversized reports and +/// exports do not need to be buffered at once. #[instrument(err, skip_all)] pub async fn upload_multipart_data_to_s3( path: &Path, @@ -357,6 +566,8 @@ pub async fn upload_multipart_data_to_s3( Ok(()) } +/// Uploads a single in-memory body to S3 for smaller files where multipart +/// upload would add unnecessary overhead. 
#[instrument(err, skip_all)] pub async fn upload_data_to_s3( data: ByteStream, @@ -398,22 +609,28 @@ pub async fn upload_data_to_s3( Ok(()) } +/// Returns the server-side MinIO URL used by backend services when they need a +/// direct path to the public bucket. pub fn get_minio_url() -> Result { - let minio_private_uri = env::var("AWS_S3_PRIVATE_URI") - .map_err(|err| anyhow!("AWS_S3_PRIVATE_URI must be set"))?; + let minio_private_uri = env::var(AWS_S3_PRIVATE_URI_ENV) + .map_err(|_err| anyhow!("AWS_S3_PRIVATE_URI must be set"))?; let bucket = get_public_bucket()?; Ok(format!("{}/{}", minio_private_uri, bucket)) } +/// Returns the client-facing MinIO URL used when generated links must be +/// reachable from outside the backend network. pub fn get_minio_public_url() -> Result { - let minio_public_uri = env::var("AWS_S3_PUBLIC_URI") - .map_err(|err| anyhow!("AWS_S3_PUBLIC_URI must be set"))?; + let minio_public_uri = env::var(AWS_S3_PUBLIC_URI_ENV) + .map_err(|_err| anyhow!("AWS_S3_PUBLIC_URI must be set"))?; let bucket = get_public_bucket()?; Ok(format!("{}/{}", minio_public_uri, bucket)) } +/// Builds the URL for a public asset stored in S3 or MinIO so templates can +/// reference it directly. pub fn get_public_asset_file_path(filename: &str) -> Result { let minio_endpoint_base = get_minio_url().with_context(|| "Error fetching get_minio_url")?; @@ -425,6 +642,8 @@ pub fn get_public_asset_file_path(filename: &str) -> Result { )) } +/// Downloads a file via HTTP into a string for flows that consume public text +/// assets rather than raw S3 SDK responses. #[instrument(err)] pub async fn download_s3_file_to_string(file_url: &str) -> Result { let client = reqwest::Client::new(); @@ -444,20 +663,21 @@ pub async fn download_s3_file_to_string(file_url: &str) -> Result { Ok(String::from_utf8(bytes.to_vec())?) } +/// Deletes every object under a prefix and resolves AWS bucket-hosted endpoints +/// into the real bucket plus key prefix before listing. 
#[instrument(err, ret)] pub async fn delete_files_from_s3( s3_bucket: String, prefix: String, is_public: bool, ) -> Result<()> { - let config = get_s3_aws_config(!is_public) - .await - .with_context(|| "Error getting s3 aws config")?; - info!("Config acquired"); - let client = get_s3_client(config.clone()) + let resolved_target = get_s3_list_target(&s3_bucket, !is_public) .await - .with_context(|| "Error getting s3 client")?; - info!("S3 client acquired"); + .with_context(|| "Error getting s3 list target")?; + info!("S3 list target acquired"); + let list_prefix = resolved_target.qualify_prefix(&prefix); + let client = resolved_target.client; + let bucket_name = resolved_target.bucket; // First, collect all keys to delete let mut all_keys: Vec = Vec::new(); @@ -467,26 +687,44 @@ pub async fn delete_files_from_s3( info!("Listing objects"); let list_output = match client .list_objects_v2() - .bucket(s3_bucket.clone()) - .prefix(prefix.clone()) - .max_keys(1000) + .bucket(bucket_name.clone()) + .prefix(list_prefix.clone()) + .max_keys(S3_LIST_MAX_KEYS) .set_continuation_token(token.clone()) .send() .await { - Ok(list) => { - list - // Successfully deleted - } + Ok(list) => list, Err(err) => { - // Check if it's a NoSuchKey error - let err_str = format!("{:?}", err); - if err_str.contains("NoSuchKey") { - info!("Key already absent in S3; continuing. {:?}", err); - return Ok(()); - } else { - // For other errors, fail the operation - return Err(anyhow!("{:?}", err)); + let code = err.code(); + if let Some(c) = code { + warn!(code = c, "S3 list_objects_v2 returned error code"); + } + // AWS can return NoSuchKey from list_objects_v2 in + // addition to the expected NoSuchBucket. 
+ match code { + Some(c) + if c == S3_ERR_NO_SUCH_BUCKET + || c == S3_ERR_NO_SUCH_KEY => + { + return Ok(()); + } + Some(c) if c == S3_ERR_ACCESS_DENIED => { + return Err(anyhow!( + "Access denied when listing objects in bucket '{}' with prefix '{}': {}", + bucket_name, + list_prefix, + err.message().unwrap_or(S3_ERR_NO_DETAILS) + )); + } + _ => { + return Err(anyhow!( + "Failed to list objects in bucket '{}' with prefix '{}': {}", + bucket_name, + list_prefix, + err + )); + } } } }; @@ -508,15 +746,15 @@ pub async fn delete_files_from_s3( info!( "Collected {} objects to delete from S3 bucket '{}' with prefix '{}'", all_keys.len(), - s3_bucket, - prefix + bucket_name, + list_prefix ); // Now delete each key individually, tolerating NoSuchKey errors for key in &all_keys { match client .delete_object() - .bucket(s3_bucket.clone()) + .bucket(bucket_name.clone()) .key(key.clone()) .send() .await @@ -525,19 +763,20 @@ pub async fn delete_files_from_s3( // Successfully deleted } Err(err) => { - // Check if it's a NoSuchKey error - let err_str = format!("{:?}", err); - if err_str.contains("NoSuchKey") { - tracing::warn!( - key = %key, - "Key already absent in S3; continuing" - ); - } else { - // For other errors, fail the operation - return Err(anyhow::Error::from(err).context(format!( - "Failed to delete S3 object: {}", - key - ))); + let code = err.code(); + if let Some(c) = code { + warn!(code = c, "S3 delete_object returned error code"); + } + match code { + Some(c) if c == S3_ERR_NO_SUCH_KEY => { + warn!(key = %key, "Key already absent in S3; continuing"); + } + _ => { + return Err(anyhow::Error::from(err).context(format!( + "Failed to delete S3 object: {}", + key + ))); + } } } } @@ -551,6 +790,7 @@ pub async fn delete_files_from_s3( Ok(()) } +/// Downloads one object into memory when callers need its bytes immediately. 
#[instrument(err)] pub async fn get_file_from_s3( s3_bucket: String, @@ -580,24 +820,26 @@ pub async fn get_file_from_s3( Ok(result) } +/// Lists a prefix and streams each matching file into a temporary path so +/// export code can package files without buffering them all in memory. #[instrument(err)] pub async fn get_files_from_s3( s3_bucket: String, prefix: String, ) -> Result> { - let config = get_s3_aws_config(true) + let resolved_target = get_s3_list_target(&s3_bucket, true) .await - .with_context(|| "Error getting s3 aws config")?; - let client = get_s3_client(config.clone()) - .await - .with_context(|| "Error getting s3 client")?; + .with_context(|| "Error getting s3 list target")?; + let list_prefix = resolved_target.qualify_prefix(&prefix); + let client = resolved_target.client; + let bucket_name = resolved_target.bucket; let mut file_paths = Vec::new(); let result = client .list_objects_v2() - .bucket(&s3_bucket) - .prefix(&prefix) + .bucket(&bucket_name) + .prefix(&list_prefix) .send() .await?; @@ -621,7 +863,7 @@ pub async fn get_files_from_s3( // Get object from S3 let s3_object = client .get_object() - .bucket(&s3_bucket) + .bucket(&bucket_name) .key(key) .send() .await?; @@ -653,3 +895,170 @@ pub async fn get_files_from_s3( Ok(file_paths) } + +#[cfg(test)] +mod tests { + use super::{ + join_s3_path, parse_aws_bucket_endpoint, resolve_s3_list_target_parts, + ResolvedS3ListTargetParts, + }; + + #[test] + fn join_s3_path_handles_empty_segments() { + assert_eq!(join_s3_path("public", "plugins/"), "public/plugins"); + assert_eq!(join_s3_path("public/", "/plugins/"), "public/plugins"); + assert_eq!(join_s3_path("", "plugins/"), "plugins"); + assert_eq!(join_s3_path("public", ""), "public"); + } + + #[test] + fn parse_region_aware_aws_bucket_endpoint() { + let parsed = parse_aws_bucket_endpoint( + "https://sequent-dev-bucket-eu-west-1-123.s3.eu-west-1.amazonaws.com", + Some("eu-west-1"), + ) + .unwrap(); + + assert_eq!( + parsed, + Some(( + 
"https://s3.eu-west-1.amazonaws.com".to_string(), + "sequent-dev-bucket-eu-west-1-123".to_string(), + )) + ); + } + + #[test] + fn parse_global_aws_bucket_endpoint() { + let parsed = parse_aws_bucket_endpoint( + "https://sequent-dev-bucket-eu-west-1-123.s3.amazonaws.com", + Some("eu-west-1"), + ) + .unwrap(); + + assert_eq!( + parsed, + Some(( + "https://s3.eu-west-1.amazonaws.com".to_string(), + "sequent-dev-bucket-eu-west-1-123".to_string(), + )) + ); + } + + #[test] + fn parse_global_aws_bucket_endpoint_without_region_keeps_global_host() { + let parsed = parse_aws_bucket_endpoint( + "https://sequent-dev-bucket-eu-west-1-123.s3.amazonaws.com", + None, + ) + .unwrap(); + + assert_eq!( + parsed, + Some(( + "https://s3.amazonaws.com".to_string(), + "sequent-dev-bucket-eu-west-1-123".to_string(), + )) + ); + } + + #[test] + fn ignores_non_aws_endpoints() { + let parsed = + parse_aws_bucket_endpoint("http://minio:9000", Some("eu-west-1")) + .unwrap(); + + assert_eq!(parsed, None); + } + + #[test] + fn ignores_localhost_non_aws_endpoints() { + let parsed = parse_aws_bucket_endpoint( + "http://127.0.0.1:9000", + Some("eu-west-1"), + ) + .unwrap(); + + assert_eq!(parsed, None); + } + + #[test] + fn resolves_local_private_bucket_without_rewriting() { + let resolved = resolve_s3_list_target_parts( + "http://minio:9000", + "election-event-documents", + Some("us-east-1"), + ) + .unwrap(); + + assert_eq!( + resolved, + ResolvedS3ListTargetParts { + service_endpoint: None, + bucket: "election-event-documents".to_string(), + prefix_root: None, + } + ); + } + + #[test] + fn resolves_local_public_bucket_without_rewriting() { + let resolved = resolve_s3_list_target_parts( + "http://127.0.0.1:9000", + "public", + Some("us-east-1"), + ) + .unwrap(); + + assert_eq!( + resolved, + ResolvedS3ListTargetParts { + service_endpoint: None, + bucket: "public".to_string(), + prefix_root: None, + } + ); + } + + #[test] + fn resolves_production_public_bucket_to_real_bucket_and_prefix() { + let 
resolved = resolve_s3_list_target_parts( + "https://sequent-dev-bucket-eu-west-1-133529410358.s3.amazonaws.com", + "public", + Some("eu-west-1"), + ) + .unwrap(); + + assert_eq!( + resolved, + ResolvedS3ListTargetParts { + service_endpoint: Some( + "https://s3.eu-west-1.amazonaws.com".to_string(), + ), + bucket: "sequent-dev-bucket-eu-west-1-133529410358".to_string(), + prefix_root: Some("public".to_string()), + } + ); + } + + #[test] + fn resolves_production_private_bucket_to_real_bucket_and_prefix() { + let resolved = resolve_s3_list_target_parts( + "https://sequent-dev-bucket-eu-west-1-133529410358.s3.amazonaws.com", + "election-event-documents", + Some("eu-west-1"), + ) + .unwrap(); + + assert_eq!( + resolved, + ResolvedS3ListTargetParts { + service_endpoint: Some( + "https://s3.eu-west-1.amazonaws.com".to_string(), + ), + bucket: "sequent-dev-bucket-eu-west-1-133529410358".to_string(), + prefix_root: Some("election-event-documents".to_string(),), + } + ); + } +} diff --git a/packages/sequent-core/src/services/uuid_validation.rs b/packages/sequent-core/src/services/uuid_validation.rs index 57275d86091..442e5a24be8 100644 --- a/packages/sequent-core/src/services/uuid_validation.rs +++ b/packages/sequent-core/src/services/uuid_validation.rs @@ -5,8 +5,7 @@ use anyhow::anyhow; use uuid::Uuid; -/// Parses and validates that the given string is a valid v4 UUID. -/// +/// Parses and validates that the given string is a valid v4 UUID.
/// Returns the parsed `Uuid` on success, or an error if the string is not /// a valid UUID or is not version 4 (random). pub fn parse_uuid_v4(value: &str) -> anyhow::Result { diff --git a/packages/sequent-core/src/util/aws.rs b/packages/sequent-core/src/util/aws.rs index 7f28e2e467d..9f3f0b47892 100644 --- a/packages/sequent-core/src/util/aws.rs +++ b/packages/sequent-core/src/util/aws.rs @@ -5,6 +5,12 @@ use anyhow::{anyhow, Result}; use aws_config::{meta::region::RegionProviderChain, Region, SdkConfig}; use tracing::{info, instrument}; +pub const AWS_S3_PRIVATE_URI_ENV: &str = "AWS_S3_PRIVATE_URI"; +pub const AWS_S3_PUBLIC_URI_ENV: &str = "AWS_S3_PUBLIC_URI"; + +/// Resolves the AWS region from the environment and keeps the default chain +/// as a fallback so local and deployed runtimes share the same lookup flow. +#[instrument(err, skip_all)] pub fn get_region() -> Result { let region = RegionProviderChain::first_try(Region::new( std::env::var("AWS_REGION") @@ -15,6 +21,8 @@ pub fn get_region() -> Result { Ok(region) } +/// Loads the shared AWS SDK configuration from the process environment so S3, +/// SES, SNS, and STS all use the same credentials and region resolution. #[instrument(err, skip_all)] pub async fn get_from_env_aws_config() -> Result { let region = Region::new( @@ -24,24 +32,37 @@ pub async fn get_from_env_aws_config() -> Result { Ok(aws_config::from_env().region(region).load().await) } -#[instrument(err)] -pub async fn get_s3_aws_config(is_private: bool) -> Result { - let sdk_config = get_from_env_aws_config().await?; - let env_var_name = if is_private { - "AWS_S3_PRIVATE_URI" - } else { - "AWS_S3_PUBLIC_URI" - }; +/// Builds an S3 client configuration for an explicit endpoint URL while +/// preserving this module's credential-loading rules. +/// +/// Use this helper when the caller already resolved the final endpoint URI. +/// Use [`get_s3_aws_config`] when endpoint selection should be derived from +/// `AWS_S3_PRIVATE_URI`/`AWS_S3_PUBLIC_URI`. 
+/// +/// - `sdk_config`: shared AWS SDK config loaded from environment/default chain. +/// - `endpoint_uri`: absolute S3-compatible endpoint URL to target. +/// +/// Returns the final S3 client config with endpoint, path-style behavior, and +/// optional explicit credentials from +/// `AWS_S3_ACCESS_KEY`/`AWS_S3_ACCESS_SECRET`. +#[instrument(skip_all)] +pub(crate) fn build_s3_aws_config_for_endpoint( + sdk_config: &SdkConfig, + endpoint_uri: &str, +) -> aws_sdk_s3::Config { let access_key_result = std::env::var("AWS_S3_ACCESS_KEY"); let access_secret_result = std::env::var("AWS_S3_ACCESS_SECRET"); - let endpoint_uri = std::env::var(env_var_name)?; - info!("env_var_name={env_var_name}, endpoint_uri = {endpoint_uri:?}"); + let mut builder = aws_sdk_s3::config::Builder::from(sdk_config) + .endpoint_url(endpoint_uri) + .force_path_style(true); // apply bucketname as path param instead of pre-domain + let mut using_custom_credentials = false; if let (Ok(access_key), Ok(access_secret)) = (access_key_result, access_secret_result) { - if (!access_key.is_empty() && !access_secret.is_empty()) { + if !access_key.is_empty() && !access_secret.is_empty() { info!("using provided aws access key and secret credentials"); + using_custom_credentials = true; let credentials_provider = aws_sdk_s3::config::Credentials::new( access_key, @@ -50,25 +71,42 @@ pub async fn get_s3_aws_config(is_private: bool) -> Result { None, "loaded-from-custom-env", ); - - return Ok(aws_sdk_s3::config::Builder::from(&sdk_config) - .endpoint_url(endpoint_uri) - .credentials_provider(credentials_provider) - .force_path_style(true) // apply bucketname as path param instead of pre-domain - .build()); + builder = builder.credentials_provider(credentials_provider); } // Very important: fall-through to auto detecting credentials // from the execution environment if the environment variables // were present, but empty. 
} - info!("using default aws sdk config credentials"); - Ok(aws_sdk_s3::config::Builder::from(&sdk_config) - .endpoint_url(endpoint_uri) - .force_path_style(true) // apply bucketname as path param instead of pre-domain - .build()) + if !using_custom_credentials { + info!("using default aws sdk config credentials"); + } + + builder.build() } +/// Builds an S3 client configuration for the selected endpoint.
+/// When `use_server_endpoint` is `false`, the client-facing endpoint is used +/// instead of the server-side endpoint. +#[instrument(err, skip_all)] +pub async fn get_s3_aws_config( + use_server_endpoint: bool, +) -> Result { + let sdk_config = get_from_env_aws_config().await?; + let env_var_name = if use_server_endpoint { + AWS_S3_PRIVATE_URI_ENV + } else { + AWS_S3_PUBLIC_URI_ENV + }; + let endpoint_uri = std::env::var(env_var_name)?; + info!("env_var_name={env_var_name}, endpoint_uri = {endpoint_uri:?}"); + + Ok(build_s3_aws_config_for_endpoint(&sdk_config, &endpoint_uri)) +} + +/// Returns the maximum upload size so callers can reject oversized payloads +/// before opening a long-running upload flow. +#[instrument(err, skip_all)] pub fn get_max_upload_size() -> Result { Ok(std::env::var("AWS_S3_MAX_UPLOAD_BYTES") .map_err(|err| { @@ -77,6 +115,8 @@ pub fn get_max_upload_size() -> Result { .parse()?) } +/// Returns the upload URL lifetime so presigned uploads expire predictably. +#[instrument(err, skip_all)] pub fn get_upload_expiration_secs() -> Result { Ok(std::env::var("AWS_S3_UPLOAD_EXPIRATION_SECS") .map_err(|err| { @@ -85,6 +125,9 @@ pub fn get_upload_expiration_secs() -> Result { .parse()?) } +/// Returns the download URL lifetime so generated fetch URLs match the +/// deployment's cache and access expectations. +#[instrument(err, skip_all)] pub fn get_fetch_expiration_secs() -> Result { Ok(std::env::var("AWS_S3_FETCH_EXPIRATION_SECS") .map_err(|err| { diff --git a/packages/windmill/src/services/celery_app.rs b/packages/windmill/src/services/celery_app.rs index 5a4d60ac932..58b26b28128 100644 --- a/packages/windmill/src/services/celery_app.rs +++ b/packages/windmill/src/services/celery_app.rs @@ -161,7 +161,11 @@ lazy_static! { /// CELERY_APP holds the high-level Celery application. Note: The Celery app is /// built separately from the Broker because it handles task routing/scheduling. 
static ref CELERY_APP: AsyncOnce> = - AsyncOnce::new(async { generate_celery_app().await.unwrap() }); + AsyncOnce::new(async { generate_celery_app().await.unwrap_or_else(|err| { + tracing::error!("Error to generate celery app: {:#}", err); + panic!("{:#}", err); + }) + }); } /// Returns the global Celery app.