diff --git a/crates/cli/src/commands/mod.rs b/crates/cli/src/commands/mod.rs index 5d0ba6d..d5b9901 100644 --- a/crates/cli/src/commands/mod.rs +++ b/crates/cli/src/commands/mod.rs @@ -35,6 +35,7 @@ mod rb; mod replicate; mod rm; mod share; +mod sql; mod stat; mod tag; mod tree; @@ -209,6 +210,9 @@ pub enum Commands { /// Deprecated: use `rc object share` Share(share::ShareArgs), + /// Run S3 Select SQL on an object + Sql(sql::SqlArgs), + // Phase 5: Optional commands (capability-dependent) /// Deprecated: use `rc bucket version` #[command(subcommand)] @@ -239,8 +243,6 @@ pub enum Commands { // Retention(retention::RetentionArgs), // /// Watch for object events // Watch(watch::WatchArgs), - // /// Run S3 Select queries - // Sql(sql::SqlArgs), } /// Execute the CLI command and return an exit code @@ -325,6 +327,9 @@ pub async fn execute(cli: Cli) -> ExitCode { Commands::Share(args) => { share::execute(args, output_options.resolve(OutputBehavior::HumanDefault)).await } + Commands::Sql(args) => { + sql::execute(args, output_options.resolve(OutputBehavior::HumanDefault)).await + } Commands::Version(cmd) => { version::execute( version::VersionArgs { command: cmd }, diff --git a/crates/cli/src/commands/sql.rs b/crates/cli/src/commands/sql.rs new file mode 100644 index 0000000..111bae1 --- /dev/null +++ b/crates/cli/src/commands/sql.rs @@ -0,0 +1,182 @@ +//! sql command — S3 Select queries on objects. + +use clap::{Args, ValueEnum}; +use rc_core::{ + AliasManager, ObjectStore, SelectCompression, SelectInputFormat, SelectOptions, + SelectOutputFormat, parse_object_path, +}; +use rc_s3::S3Client; + +use crate::exit_code::ExitCode; +use crate::output::{Formatter, OutputConfig}; + +/// Run an S3 Select SQL query on an object and stream results to stdout. 
+#[derive(Args, Debug)]
+pub struct SqlArgs {
+    /// Object path (alias/bucket/key)
+    pub path: String,
+
+    /// SQL expression (S3 Select)
+    #[arg(long)]
+    pub query: String,
+
+    /// Input object format
+    #[arg(long, value_enum, default_value_t = InputFormatArg::Csv)]
+    pub input_format: InputFormatArg,
+
+    /// Select result format
+    #[arg(long, value_enum, default_value_t = OutputFormatArg::Csv)]
+    pub output_format: OutputFormatArg,
+
+    /// Compression of the stored object (input decompression)
+    #[arg(long, value_enum, default_value_t = CompressionArg::None)]
+    pub compression: CompressionArg,
+}
+
+#[derive(Clone, Copy, Debug, ValueEnum)]
+pub enum InputFormatArg {
+    Csv,
+    /// Newline-delimited JSON (S3 Select `Type=LINES`).
+    Json,
+    Parquet,
+}
+
+#[derive(Clone, Copy, Debug, ValueEnum)]
+pub enum OutputFormatArg {
+    Csv,
+    Json,
+}
+
+#[derive(Clone, Copy, Debug, ValueEnum)]
+pub enum CompressionArg {
+    None,
+    Gzip,
+    Bzip2,
+}
+
+impl From<InputFormatArg> for SelectInputFormat {
+    fn from(value: InputFormatArg) -> Self {
+        match value {
+            InputFormatArg::Csv => SelectInputFormat::Csv,
+            InputFormatArg::Json => SelectInputFormat::Json,
+            InputFormatArg::Parquet => SelectInputFormat::Parquet,
+        }
+    }
+}
+
+impl From<OutputFormatArg> for SelectOutputFormat {
+    fn from(value: OutputFormatArg) -> Self {
+        match value {
+            OutputFormatArg::Csv => SelectOutputFormat::Csv,
+            OutputFormatArg::Json => SelectOutputFormat::Json,
+        }
+    }
+}
+
+impl From<CompressionArg> for SelectCompression {
+    fn from(value: CompressionArg) -> Self {
+        match value {
+            CompressionArg::None => SelectCompression::None,
+            CompressionArg::Gzip => SelectCompression::Gzip,
+            CompressionArg::Bzip2 => SelectCompression::Bzip2,
+        }
+    }
+}
+
+/// Execute the `sql` command.
+pub async fn execute(args: SqlArgs, output_config: OutputConfig) -> ExitCode { + let formatter = Formatter::new(output_config); + + if args.query.trim().is_empty() { + formatter.error("Query must not be empty (--query)"); + return ExitCode::UsageError; + } + + let remote = match parse_object_path(&args.path) { + Ok(p) => p, + Err(e) => { + formatter.error(&e.to_string()); + return ExitCode::UsageError; + } + }; + + let alias_manager = match AliasManager::new() { + Ok(am) => am, + Err(e) => { + formatter.error(&format!("Failed to load aliases: {e}")); + return ExitCode::GeneralError; + } + }; + + let alias = match alias_manager.get(&remote.alias) { + Ok(a) => a, + Err(_) => { + formatter.error(&format!("Alias '{}' not found", remote.alias)); + return ExitCode::NotFound; + } + }; + + let client = match S3Client::new(alias).await { + Ok(c) => c, + Err(e) => { + formatter.error(&format!("Failed to create S3 client: {e}")); + return ExitCode::NetworkError; + } + }; + + let options = SelectOptions { + expression: args.query, + input_format: args.input_format.into(), + output_format: args.output_format.into(), + compression: args.compression.into(), + }; + + let mut stdout = tokio::io::stdout(); + + match client + .select_object_content(&remote, &options, &mut stdout) + .await + { + Ok(()) => ExitCode::Success, + Err(e) => { + formatter.error(&e.to_string()); + exit_code_from_error(&e) + } + } +} + +fn exit_code_from_error(error: &rc_core::Error) -> ExitCode { + ExitCode::from_i32(error.exit_code()).unwrap_or(ExitCode::GeneralError) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::output::OutputConfig; + + #[tokio::test] + async fn sql_empty_query_is_usage_error() { + let args = SqlArgs { + path: "a/b/c".to_string(), + query: " ".to_string(), + input_format: InputFormatArg::Csv, + output_format: OutputFormatArg::Csv, + compression: CompressionArg::None, + }; + let code = execute(args, OutputConfig::default()).await; + assert_eq!(code, ExitCode::UsageError); + 
}
+
+    #[tokio::test]
+    async fn sql_invalid_object_path_is_usage_error() {
+        let args = SqlArgs {
+            path: "a/b".to_string(),
+            query: "SELECT 1".to_string(),
+            input_format: InputFormatArg::Csv,
+            output_format: OutputFormatArg::Csv,
+            compression: CompressionArg::None,
+        };
+        let code = execute(args, OutputConfig::default()).await;
+        assert_eq!(code, ExitCode::UsageError);
+    }
+}
diff --git a/crates/cli/tests/help_contract.rs b/crates/cli/tests/help_contract.rs
index 45da61d..e32002f 100644
--- a/crates/cli/tests/help_contract.rs
+++ b/crates/cli/tests/help_contract.rs
@@ -135,6 +135,7 @@ fn top_level_command_help_contract() {
         "mirror",
         "tree",
         "share",
+        "sql",
         "version",
         "tag",
         "cors",
@@ -323,6 +324,16 @@ fn top_level_command_help_contract() {
            usage: "Usage: rc share [OPTIONS] <PATH>",
            expected_tokens: &["--expire", "--upload", "--content-type"],
        },
+        HelpCase {
+            args: &["sql"],
+            usage: "Usage: rc sql [OPTIONS] --query <QUERY> <PATH>",
+            expected_tokens: &[
+                "--query",
+                "--input-format",
+                "--output-format",
+                "--compression",
+            ],
+        },
         HelpCase {
             args: &["version"],
             usage: "Usage: rc version [OPTIONS] <COMMAND>",
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 7aeebe2..28d249f 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -18,6 +18,7 @@ pub mod lifecycle;
 pub mod path;
 pub mod replication;
 pub mod retry;
+pub mod select;
 pub mod traits;
 
 pub use alias::{Alias, AliasManager};
@@ -28,12 +29,13 @@ pub use lifecycle::{
     LifecycleConfiguration, LifecycleExpiration, LifecycleRule, LifecycleRuleStatus,
     LifecycleTransition, NoncurrentVersionExpiration, NoncurrentVersionTransition,
 };
-pub use path::{ParsedPath, RemotePath, parse_path};
+pub use path::{ParsedPath, RemotePath, parse_object_path, parse_path};
 pub use replication::{
     BucketTarget, BucketTargetCredentials, ReplicationConfiguration, ReplicationDestination,
     ReplicationRule, ReplicationRuleStatus,
 };
 pub use retry::{RetryBuilder, is_retryable_error, retry_with_backoff};
+pub use select::{SelectCompression, SelectInputFormat, SelectOptions, SelectOutputFormat};
 pub use traits::{
     BucketNotification, Capabilities, ListOptions, ListResult, NotificationTarget, ObjectInfo,
     ObjectStore, ObjectVersion,
diff --git a/crates/core/src/path.rs b/crates/core/src/path.rs
index b80766b..825d4b7 100644
--- a/crates/core/src/path.rs
+++ b/crates/core/src/path.rs
@@ -214,6 +214,24 @@ pub fn parse_path(path: &str) -> Result<ParsedPath> {
     }
 }
 
+/// Parse a remote path that must refer to a single object (`alias/bucket/key` with non-empty key).
+pub fn parse_object_path(path: &str) -> Result<RemotePath> {
+    match parse_path(path)? {
+        ParsedPath::Remote(r) => {
+            if r.key.is_empty() || r.is_dir {
+                Err(Error::InvalidPath(
+                    "Object path required: alias/bucket/key (key must not be empty)".into(),
+                ))
+            } else {
+                Ok(r)
+            }
+        }
+        ParsedPath::Local(_) => Err(Error::InvalidPath(
+            "Expected remote path in the form alias/bucket/key".into(),
+        )),
+    }
+}
+
 /// Check if a string is a valid alias name
 fn is_valid_alias_name(name: &str) -> bool {
     !name.is_empty()
@@ -256,6 +274,14 @@ mod tests {
         assert!(remote.is_dir);
     }
 
+    #[test]
+    fn test_parse_object_path_requires_non_empty_key() {
+        assert!(parse_object_path("myalias/bucket").is_err());
+        assert!(parse_object_path("myalias/bucket/dir/").is_err());
+        let r = parse_object_path("myalias/bucket/key.txt").unwrap();
+        assert_eq!(r.key, "key.txt");
+    }
+
     #[test]
     fn test_parse_local_absolute_path() {
         let path = parse_path("/home/user/file.txt").unwrap();
diff --git a/crates/core/src/select.rs b/crates/core/src/select.rs
new file mode 100644
index 0000000..adaede3
--- /dev/null
+++ b/crates/core/src/select.rs
@@ -0,0 +1,48 @@
+//! S3 Select domain types (no AWS SDK types).
+
+/// Object payload format for S3 Select input.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum SelectInputFormat {
+    #[default]
+    Csv,
+    Json,
+    Parquet,
+}
+
+/// Result row format for S3 Select output.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum SelectOutputFormat { + #[default] + Csv, + Json, +} + +/// Compression applied to the **stored object** (input decompression). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum SelectCompression { + #[default] + None, + Gzip, + Bzip2, +} + +/// Options for running an S3 Select query on one object. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SelectOptions { + /// SQL expression (S3 Select / `s3object`). + pub expression: String, + pub input_format: SelectInputFormat, + pub output_format: SelectOutputFormat, + pub compression: SelectCompression, +} + +impl Default for SelectOptions { + fn default() -> Self { + Self { + expression: String::new(), + input_format: SelectInputFormat::Csv, + output_format: SelectOutputFormat::Csv, + compression: SelectCompression::None, + } + } +} diff --git a/crates/core/src/traits.rs b/crates/core/src/traits.rs index 33f9761..52d9670 100644 --- a/crates/core/src/traits.rs +++ b/crates/core/src/traits.rs @@ -8,12 +8,14 @@ use std::collections::HashMap; use async_trait::async_trait; use jiff::Timestamp; use serde::{Deserialize, Serialize}; +use tokio::io::AsyncWrite; use crate::cors::CorsRule; use crate::error::Result; use crate::lifecycle::LifecycleRule; use crate::path::RemotePath; use crate::replication::ReplicationConfiguration; +use crate::select::SelectOptions; /// Metadata for an object version #[derive(Debug, Clone, Serialize, Deserialize)] @@ -176,7 +178,10 @@ pub struct Capabilities { /// Supports anonymous bucket access policies pub anonymous: bool, - /// Supports S3 Select + /// S3 Select (`SelectObjectContent`). + /// + /// This remains `false` in generic capability hints because support is determined by issuing + /// a real request against the target object. pub select: bool, /// Supports event notifications @@ -387,6 +392,14 @@ pub trait ObjectStore: Send + Sync { /// Delete bucket CORS configuration. 
     async fn delete_bucket_cors(&self, bucket: &str) -> Result<()>;
+
+    /// Run S3 Select on an object and stream result payloads to `writer`.
+    async fn select_object_content(
+        &self,
+        path: &RemotePath,
+        options: &SelectOptions,
+        writer: &mut (dyn AsyncWrite + Send + Unpin),
+    ) -> Result<()>;
     // async fn get_versioning(&self, bucket: &str) -> Result<bool>;
     // async fn set_versioning(&self, bucket: &str, enabled: bool) -> Result<()>;
     // async fn get_tags(&self, path: &RemotePath) -> Result<HashMap<String, String>>;
diff --git a/crates/s3/src/capability.rs b/crates/s3/src/capability.rs
deleted file mode 100644
index 6163137..0000000
--- a/crates/s3/src/capability.rs
+++ /dev/null
@@ -1,121 +0,0 @@
-//! Capability detection for S3 backends
-//!
-//! Different S3-compatible backends support different features.
-//! This module provides capability detection to gracefully handle
-//! unsupported features.
-
-use rc_core::{Capabilities, Error, Result};
-
-/// Detect capabilities of an S3 backend
-///
-/// This function probes the backend to determine which features are supported.
-/// For optional commands, we use this to determine whether to proceed or
-/// return EXIT_UNSUPPORTED_FEATURE.
-pub async fn detect_capabilities(
-    client: &aws_sdk_s3::Client,
-    bucket: &str,
-) -> Result<Capabilities> {
-    // Note: Object Lock and S3 Select detection would require additional probes
-    // that might have side effects. For now, we default to false and let users
-    // use --force if they know their backend supports these features.
- let caps = Capabilities { - versioning: check_versioning(client, bucket).await, - tagging: check_tagging(client, bucket).await, - ..Default::default() - }; - - Ok(caps) -} - -/// Check if bucket versioning is supported -async fn check_versioning(client: &aws_sdk_s3::Client, bucket: &str) -> bool { - // Try to get versioning configuration - // If we get a successful response (even if versioning is not enabled), - // the backend supports versioning - client - .get_bucket_versioning() - .bucket(bucket) - .send() - .await - .is_ok() -} - -/// Check if object tagging is supported -async fn check_tagging(client: &aws_sdk_s3::Client, bucket: &str) -> bool { - // Try to get bucket tagging - // Even if no tags are set, a supported backend will return a valid response - // or a specific "no tags" error, not an unsupported operation error - match client.get_bucket_tagging().bucket(bucket).send().await { - Ok(_) => true, - Err(e) => { - let err = e.into_service_error(); - // NoSuchTagSet means tagging is supported, just no tags set - // AccessDenied might mean we don't have permission but feature exists - !err.to_string().contains("NotImplemented") - } - } -} - -/// Check if a specific operation is supported, returning appropriate error -pub fn require_capability(caps: &Capabilities, feature: &str) -> Result<()> { - let supported = match feature { - "versioning" => caps.versioning, - "object_lock" | "retention" => caps.object_lock, - "tagging" => caps.tagging, - "select" | "sql" => caps.select, - "notifications" | "watch" => caps.notifications, - "lifecycle" => caps.lifecycle, - "replication" => caps.replication, - "cors" => caps.cors, - _ => false, - }; - - if supported { - Ok(()) - } else { - Err(Error::UnsupportedFeature(format!( - "The backend does not support '{feature}'. Use --force to attempt anyway." 
- ))) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_require_capability_versioning() { - let caps = Capabilities { - versioning: true, - ..Default::default() - }; - assert!(require_capability(&caps, "versioning").is_ok()); - - let caps = Capabilities { - versioning: false, - ..Default::default() - }; - assert!(require_capability(&caps, "versioning").is_err()); - } - - #[test] - fn test_require_capability_cors() { - let caps = Capabilities { - cors: true, - ..Default::default() - }; - assert!(require_capability(&caps, "cors").is_ok()); - - let caps = Capabilities { - cors: false, - ..Default::default() - }; - assert!(require_capability(&caps, "cors").is_err()); - } - - #[test] - fn test_require_capability_unknown() { - let caps = Capabilities::default(); - assert!(require_capability(&caps, "unknown_feature").is_err()); - } -} diff --git a/crates/s3/src/client.rs b/crates/s3/src/client.rs index e0759b5..e7ff81c 100644 --- a/crates/s3/src/client.rs +++ b/crates/s3/src/client.rs @@ -23,7 +23,7 @@ use quick_xml::de::from_str as from_xml_str; use rc_core::{ Alias, BucketNotification, Capabilities, CorsRule, Error, LifecycleRule, ListOptions, ListResult, NotificationTarget, ObjectInfo, ObjectStore, ObjectVersion, RemotePath, - ReplicationConfiguration, Result, + ReplicationConfiguration, Result, SelectOptions, }; use reqwest::Method; use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue}; @@ -31,6 +31,7 @@ use serde::Deserialize; use sha2::{Digest, Sha256}; use std::collections::HashMap; use tokio::io::AsyncReadExt; +use tokio::io::AsyncWrite; /// Keep single-part uploads small to avoid backend incompatibilities with /// streaming aws-chunked payloads. 
@@ -648,7 +649,6 @@ impl HttpClient for ReqwestConnector { pub struct S3Client { inner: aws_sdk_s3::Client, xml_http_client: reqwest::Client, - #[allow(dead_code)] alias: Alias, } @@ -1725,8 +1725,8 @@ impl ObjectStore for S3Client { } async fn capabilities(&self) -> Result { - // Hardcoded optimistic defaults for common S3-compatible backends. - // In a future phase we can replace these with active detection. + // Best-effort hints for common S3-compatible backends. `select` is not inferred here + // because `rc sql` determines support from the real request result. Ok(Capabilities { versioning: true, object_lock: false, @@ -2701,6 +2701,15 @@ impl ObjectStore for S3Client { })?; Ok(()) } + + async fn select_object_content( + &self, + path: &RemotePath, + options: &SelectOptions, + writer: &mut (dyn AsyncWrite + Send + Unpin), + ) -> Result<()> { + crate::select::select_object_content(&self.inner, path, options, writer).await + } } #[cfg(test)] diff --git a/crates/s3/src/lib.rs b/crates/s3/src/lib.rs index 95554fc..67ab319 100644 --- a/crates/s3/src/lib.rs +++ b/crates/s3/src/lib.rs @@ -5,9 +5,9 @@ //! depends on the AWS SDK. pub mod admin; -pub mod capability; pub mod client; pub mod multipart; +mod select; pub use admin::AdminClient; pub use client::{DeleteRequestOptions, S3Client}; diff --git a/crates/s3/src/select.rs b/crates/s3/src/select.rs new file mode 100644 index 0000000..d9f79c8 --- /dev/null +++ b/crates/s3/src/select.rs @@ -0,0 +1,270 @@ +//! S3 Select (`SelectObjectContent`) — AWS SDK mapping and streaming. 
+ +use aws_sdk_s3::types::{ + CompressionType, CsvInput, CsvOutput, ExpressionType, FileHeaderInfo, InputSerialization, + JsonInput, JsonOutput, JsonType, OutputSerialization, ParquetInput, QuoteFields, + SelectObjectContentEventStream, +}; +use aws_smithy_runtime_api::client::orchestrator::HttpResponse; +use aws_smithy_runtime_api::client::result::SdkError; +use aws_smithy_types::error::metadata::ProvideErrorMetadata; +use aws_smithy_types::event_stream::RawMessage; +use rc_core::{ + Error, RemotePath, Result, SelectCompression, SelectInputFormat, SelectOptions, + SelectOutputFormat, +}; +use tokio::io::{AsyncWrite, AsyncWriteExt}; + +/// Run S3 Select and write record payloads to `writer` incrementally. +pub async fn select_object_content( + client: &aws_sdk_s3::Client, + path: &RemotePath, + options: &SelectOptions, + writer: &mut (dyn AsyncWrite + Send + Unpin), +) -> Result<()> { + let input = build_input_serialization(options)?; + let output = build_output_serialization(options); + + // aws-sdk-s3 `SelectObjectContent` does not expose object `VersionId`; the current object is used. + let resp = client + .select_object_content() + .bucket(&path.bucket) + .key(&path.key) + .expression(&options.expression) + .expression_type(ExpressionType::Sql) + .input_serialization(input) + .output_serialization(output) + .send() + .await + .map_err(map_select_initial_error)?; + + let mut events = resp.payload; + while let Some(ev) = events.recv().await.map_err(map_select_stream_error)? 
{
+        match ev {
+            SelectObjectContentEventStream::Records(rec) => {
+                if let Some(blob) = rec.payload {
+                    writer.write_all(blob.as_ref()).await.map_err(Error::Io)?;
+                }
+            }
+            SelectObjectContentEventStream::End(_) => break,
+            _ => {}
+        }
+    }
+    writer.flush().await.map_err(Error::Io)?;
+    Ok(())
+}
+
+fn compression_type(c: SelectCompression) -> CompressionType {
+    match c {
+        SelectCompression::None => CompressionType::None,
+        SelectCompression::Gzip => CompressionType::Gzip,
+        SelectCompression::Bzip2 => CompressionType::Bzip2,
+    }
+}
+
+fn build_input_serialization(options: &SelectOptions) -> Result<InputSerialization> {
+    if matches!(options.input_format, SelectInputFormat::Parquet)
+        && !matches!(options.compression, SelectCompression::None)
+    {
+        return Err(Error::General(
+            "Parquet input does not support whole-object GZIP or BZIP2 compression.".to_string(),
+        ));
+    }
+
+    let compression = compression_type(options.compression);
+    let mut b = InputSerialization::builder().compression_type(compression);
+    match options.input_format {
+        SelectInputFormat::Csv => {
+            let csv = CsvInput::builder()
+                .file_header_info(FileHeaderInfo::None)
+                .build();
+            b = b.csv(csv);
+        }
+        SelectInputFormat::Json => {
+            // JSONL: one JSON object per line (S3 Select `Type=LINES`).
+            let json = JsonInput::builder().r#type(JsonType::Lines).build();
+            b = b.json(json);
+        }
+        SelectInputFormat::Parquet => {
+            let pq = ParquetInput::builder().build();
+            b = b.parquet(pq);
+        }
+    }
+    Ok(b.build())
+}
+
+fn build_output_serialization(options: &SelectOptions) -> OutputSerialization {
+    let mut b = OutputSerialization::builder();
+    match options.output_format {
+        SelectOutputFormat::Csv => {
+            let csv = CsvOutput::builder()
+                .quote_fields(QuoteFields::Asneeded)
+                .build();
+            b = b.csv(csv);
+        }
+        SelectOutputFormat::Json => {
+            let json = JsonOutput::builder().build();
+            b = b.json(json);
+        }
+    }
+    b.build()
+}
+
+fn resolve_http_service_error_code<'a, E: ProvideErrorMetadata + ?Sized>(
+    op_err: &'a E,
+    raw: &'a HttpResponse,
+) -> Option<&'a str> {
+    op_err
+        .code()
+        .or_else(|| op_err.meta().code())
+        .or_else(|| header_amz_error_code(raw))
+}
+
+fn header_amz_error_code(raw: &HttpResponse) -> Option<&str> {
+    raw.headers().get("x-amz-error-code")
+}
+
+fn resolve_event_stream_error_code<'a, E: ProvideErrorMetadata + ?Sized>(
+    op_err: &'a E,
+    _raw: &'a RawMessage,
+) -> Option<&'a str> {
+    op_err.code().or_else(|| op_err.meta().code())
+}
+
+fn map_select_initial_error(
+    err: SdkError<
+        aws_sdk_s3::operation::select_object_content::SelectObjectContentError,
+        HttpResponse,
+    >,
+) -> Error {
+    use aws_sdk_s3::error::SdkError;
+    match &err {
+        SdkError::ServiceError(se) => {
+            let code = resolve_http_service_error_code(se.err(), se.raw());
+            classify_aws_code(code, &err.to_string())
+        }
+        SdkError::TimeoutError(_) => Error::Network("Request timeout".to_string()),
+        SdkError::DispatchFailure(e) => Error::Network(format!("Network dispatch error: {e:?}")),
+        SdkError::ResponseError(e) => Error::Network(format!("Response error: {e:?}")),
+        SdkError::ConstructionFailure(e) => Error::General(format!("Request construction: {e:?}")),
+        _ => Error::Network(err.to_string()),
+    }
+}
+
+fn map_select_stream_error(
+    err: SdkError<aws_sdk_s3::types::error::SelectObjectContentEventStreamError, RawMessage>,
+) -> Error {
+    use
aws_sdk_s3::error::SdkError; + match &err { + SdkError::ServiceError(se) => { + let code = resolve_event_stream_error_code(se.err(), se.raw()); + classify_aws_code(code, &err.to_string()) + } + SdkError::TimeoutError(_) => Error::Network("Request timeout".to_string()), + SdkError::DispatchFailure(e) => Error::Network(format!("Network dispatch error: {e:?}")), + SdkError::ResponseError(e) => Error::Network(format!("Response error: {e:?}")), + SdkError::ConstructionFailure(e) => Error::General(format!("Stream construction: {e:?}")), + _ => Error::Network(err.to_string()), + } +} + +fn classify_aws_code(code: Option<&str>, text: &str) -> Error { + let c = code.filter(|s| !s.is_empty()); + match c { + Some("NoSuchKey") => Error::NotFound("Object not found".to_string()), + Some("NoSuchBucket") => Error::NotFound("Bucket not found".to_string()), + Some("AccessDenied") => Error::Auth("Access denied".to_string()), + Some("NotImplemented") => { + Error::UnsupportedFeature("The backend does not support S3 Select.".to_string()) + } + Some("InvalidArgument") => Error::General(format!("Invalid S3 Select request: {text}")), + Some(_) if text.contains("NotImplemented") => { + Error::UnsupportedFeature("The backend does not support S3 Select.".to_string()) + } + Some(_) => Error::General(text.to_string()), + None => classify_aws_code_missing_metadata(text), + } +} + +/// When the SDK did not surface `x-amz-error-code` / metadata, use minimal substring checks. 
+fn classify_aws_code_missing_metadata(text: &str) -> Error { + if text.contains("NotImplemented") { + return Error::UnsupportedFeature("The backend does not support S3 Select.".to_string()); + } + if text.contains("NoSuchKey") { + return Error::NotFound("Object not found".to_string()); + } + if text.contains("NoSuchBucket") { + return Error::NotFound("Bucket not found".to_string()); + } + Error::General(text.to_string()) +} + +#[cfg(test)] +mod tests { + use super::{build_input_serialization, classify_aws_code}; + use rc_core::Error; + use rc_core::{SelectCompression, SelectInputFormat, SelectOptions, SelectOutputFormat}; + + #[test] + fn classify_maps_no_such_key() { + let e = classify_aws_code(Some("NoSuchKey"), ""); + assert!(matches!(e, Error::NotFound(_))); + } + + #[test] + fn classify_maps_not_implemented() { + let e = classify_aws_code(Some("NotImplemented"), ""); + assert!(matches!(e, Error::UnsupportedFeature(_))); + } + + #[test] + fn classify_fallback_network() { + let e = classify_aws_code(Some("SlowDown"), "rate limited"); + assert!(matches!(e, Error::General(_))); + } + + #[test] + fn classify_missing_code_maps_no_such_bucket_substring() { + let e = classify_aws_code(None, "Service error: ... 
NoSuchBucket ..."); + assert!(matches!(e, Error::NotFound(msg) if msg.contains("Bucket"))); + } + + #[test] + fn classify_maps_invalid_argument() { + let e = classify_aws_code(Some("InvalidArgument"), "bad expr"); + assert!(matches!(e, Error::General(_))); + } + + #[test] + fn classify_missing_code_unknown_maps_general() { + let e = classify_aws_code(None, "Service error: query parsing failed"); + assert!(matches!(e, Error::General(_))); + } + + #[test] + fn parquet_rejects_whole_object_compression() { + let options = SelectOptions { + expression: "SELECT * FROM S3Object".to_string(), + input_format: SelectInputFormat::Parquet, + output_format: SelectOutputFormat::Csv, + compression: SelectCompression::Gzip, + }; + + let error = build_input_serialization(&options) + .expect_err("parquet should reject whole-object compression"); + assert!(matches!(error, Error::General(_))); + } + + #[test] + fn parquet_allows_no_compression() { + let options = SelectOptions { + expression: "SELECT * FROM S3Object".to_string(), + input_format: SelectInputFormat::Parquet, + output_format: SelectOutputFormat::Csv, + compression: SelectCompression::None, + }; + + build_input_serialization(&options).expect("parquet without whole-object compression"); + } +}