From b10bb45607d182f659e56ce75df62624e868a5e1 Mon Sep 17 00:00:00 2001 From: jhw Date: Sat, 21 Mar 2026 19:39:56 +0800 Subject: [PATCH 1/3] feat(phase-2): add sql command with S3 Select and ObjectStore hooks Made-with: Cursor --- crates/cli/src/commands/mod.rs | 9 +- crates/cli/src/commands/sql.rs | 208 +++++++++++++++++++++ crates/cli/tests/help_contract.rs | 12 ++ crates/core/src/lib.rs | 4 +- crates/core/src/path.rs | 26 +++ crates/core/src/select.rs | 48 +++++ crates/core/src/traits.rs | 18 +- crates/s3/src/capability.rs | 121 ------------ crates/s3/src/client.rs | 21 ++- crates/s3/src/lib.rs | 2 +- crates/s3/src/select.rs | 299 ++++++++++++++++++++++++++++++ 11 files changed, 638 insertions(+), 130 deletions(-) create mode 100644 crates/cli/src/commands/sql.rs create mode 100644 crates/core/src/select.rs delete mode 100644 crates/s3/src/capability.rs create mode 100644 crates/s3/src/select.rs diff --git a/crates/cli/src/commands/mod.rs b/crates/cli/src/commands/mod.rs index 5d0ba6d..d5b9901 100644 --- a/crates/cli/src/commands/mod.rs +++ b/crates/cli/src/commands/mod.rs @@ -35,6 +35,7 @@ mod rb; mod replicate; mod rm; mod share; +mod sql; mod stat; mod tag; mod tree; @@ -209,6 +210,9 @@ pub enum Commands { /// Deprecated: use `rc object share` Share(share::ShareArgs), + /// Run S3 Select SQL on an object + Sql(sql::SqlArgs), + // Phase 5: Optional commands (capability-dependent) /// Deprecated: use `rc bucket version` #[command(subcommand)] @@ -239,8 +243,6 @@ pub enum Commands { // Retention(retention::RetentionArgs), // /// Watch for object events // Watch(watch::WatchArgs), - // /// Run S3 Select queries - // Sql(sql::SqlArgs), } /// Execute the CLI command and return an exit code @@ -325,6 +327,9 @@ pub async fn execute(cli: Cli) -> ExitCode { Commands::Share(args) => { share::execute(args, output_options.resolve(OutputBehavior::HumanDefault)).await } + Commands::Sql(args) => { + sql::execute(args, 
output_options.resolve(OutputBehavior::HumanDefault)).await + } Commands::Version(cmd) => { version::execute( version::VersionArgs { command: cmd }, diff --git a/crates/cli/src/commands/sql.rs b/crates/cli/src/commands/sql.rs new file mode 100644 index 0000000..f09dbfb --- /dev/null +++ b/crates/cli/src/commands/sql.rs @@ -0,0 +1,208 @@ +//! sql command — S3 Select queries on objects. +//! +//! Unless `--force` is set, this command issues a lightweight `SelectObjectContent` probe against +//! a non-existent key in the target bucket before running your query, so support is detected +//! reliably (extra request; may incur cost on some backends). + +use clap::{Args, ValueEnum}; +use rc_core::{ + AliasManager, ObjectStore, SelectCompression, SelectInputFormat, SelectOptions, + SelectOutputFormat, parse_object_path, +}; +use rc_s3::S3Client; + +use crate::exit_code::ExitCode; +use crate::output::{Formatter, OutputConfig}; + +/// Run an S3 Select SQL query on an object and stream results to stdout. +#[derive(Args, Debug)] +pub struct SqlArgs { + /// Object path (alias/bucket/key) + pub path: String, + + /// SQL expression (S3 Select) + #[arg(long)] + pub query: String, + + /// Input object format + #[arg(long, value_enum, default_value_t = InputFormatArg::Csv)] + pub input_format: InputFormatArg, + + /// Select result format + #[arg(long, value_enum, default_value_t = OutputFormatArg::Csv)] + pub output_format: OutputFormatArg, + + /// Compression of the stored object (input decompression) + #[arg(long, value_enum, default_value_t = CompressionArg::None)] + pub compression: CompressionArg, + + /// Skip the pre-flight Select probe (one fewer `SelectObjectContent` call); use only if you + /// trust the backend supports S3 Select or probe fails incorrectly + #[arg(long)] + pub force: bool, +} + +#[derive(Clone, Copy, Debug, ValueEnum)] +pub enum InputFormatArg { + Csv, + /// Newline-delimited JSON (S3 Select `Type=LINES`). 
+ Json, + Parquet, +} + +#[derive(Clone, Copy, Debug, ValueEnum)] +pub enum OutputFormatArg { + Csv, + Json, +} + +#[derive(Clone, Copy, Debug, ValueEnum)] +pub enum CompressionArg { + None, + Gzip, + Bzip2, +} + +impl From for SelectInputFormat { + fn from(value: InputFormatArg) -> Self { + match value { + InputFormatArg::Csv => SelectInputFormat::Csv, + InputFormatArg::Json => SelectInputFormat::Json, + InputFormatArg::Parquet => SelectInputFormat::Parquet, + } + } +} + +impl From for SelectOutputFormat { + fn from(value: OutputFormatArg) -> Self { + match value { + OutputFormatArg::Csv => SelectOutputFormat::Csv, + OutputFormatArg::Json => SelectOutputFormat::Json, + } + } +} + +impl From for SelectCompression { + fn from(value: CompressionArg) -> Self { + match value { + CompressionArg::None => SelectCompression::None, + CompressionArg::Gzip => SelectCompression::Gzip, + CompressionArg::Bzip2 => SelectCompression::Bzip2, + } + } +} + +/// Execute the `sql` command. +pub async fn execute(args: SqlArgs, output_config: OutputConfig) -> ExitCode { + let formatter = Formatter::new(output_config); + + if args.query.trim().is_empty() { + formatter.error("Query must not be empty (--query)"); + return ExitCode::UsageError; + } + + let remote = match parse_object_path(&args.path) { + Ok(p) => p, + Err(e) => { + formatter.error(&e.to_string()); + return ExitCode::UsageError; + } + }; + + let alias_manager = match AliasManager::new() { + Ok(am) => am, + Err(e) => { + formatter.error(&format!("Failed to load aliases: {e}")); + return ExitCode::GeneralError; + } + }; + + let alias = match alias_manager.get(&remote.alias) { + Ok(a) => a, + Err(_) => { + formatter.error(&format!("Alias '{}' not found", remote.alias)); + return ExitCode::NotFound; + } + }; + + let client = match S3Client::new(alias).await { + Ok(c) => c, + Err(e) => { + formatter.error(&format!("Failed to create S3 client: {e}")); + return ExitCode::NetworkError; + } + }; + + if !args.force { + match 
client.probe_select_support(&remote.bucket).await { + Ok(true) => {} + Ok(false) => { + formatter + .error("Backend does not support S3 Select. Use --force to attempt anyway."); + return ExitCode::UnsupportedFeature; + } + Err(e) => { + formatter.error(&format!("Failed to probe S3 Select support: {e}")); + return exit_code_from_error(&e); + } + } + } + + let options = SelectOptions { + expression: args.query, + input_format: args.input_format.into(), + output_format: args.output_format.into(), + compression: args.compression.into(), + }; + + let mut stdout = tokio::io::stdout(); + + match client + .select_object_content(&remote, &options, &mut stdout) + .await + { + Ok(()) => ExitCode::Success, + Err(e) => { + formatter.error(&e.to_string()); + exit_code_from_error(&e) + } + } +} + +fn exit_code_from_error(error: &rc_core::Error) -> ExitCode { + ExitCode::from_i32(error.exit_code()).unwrap_or(ExitCode::GeneralError) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::output::OutputConfig; + + #[tokio::test] + async fn sql_empty_query_is_usage_error() { + let args = SqlArgs { + path: "a/b/c".to_string(), + query: " ".to_string(), + input_format: InputFormatArg::Csv, + output_format: OutputFormatArg::Csv, + compression: CompressionArg::None, + force: false, + }; + let code = execute(args, OutputConfig::default()).await; + assert_eq!(code, ExitCode::UsageError); + } + + #[tokio::test] + async fn sql_invalid_object_path_is_usage_error() { + let args = SqlArgs { + path: "a/b".to_string(), + query: "SELECT 1".to_string(), + input_format: InputFormatArg::Csv, + output_format: OutputFormatArg::Csv, + compression: CompressionArg::None, + force: false, + }; + let code = execute(args, OutputConfig::default()).await; + assert_eq!(code, ExitCode::UsageError); + } +} diff --git a/crates/cli/tests/help_contract.rs b/crates/cli/tests/help_contract.rs index 45da61d..6d6e94c 100644 --- a/crates/cli/tests/help_contract.rs +++ b/crates/cli/tests/help_contract.rs @@ -135,6 
+135,7 @@ fn top_level_command_help_contract() { "mirror", "tree", "share", + "sql", "version", "tag", "cors", @@ -323,6 +324,17 @@ fn top_level_command_help_contract() { usage: "Usage: rc share [OPTIONS] ", expected_tokens: &["--expire", "--upload", "--content-type"], }, + HelpCase { + args: &["sql"], + usage: "Usage: rc sql [OPTIONS] --query ", + expected_tokens: &[ + "--query", + "--input-format", + "--output-format", + "--compression", + "--force", + ], + }, HelpCase { args: &["version"], usage: "Usage: rc version [OPTIONS] ", diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 7aeebe2..28d249f 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -18,6 +18,7 @@ pub mod lifecycle; pub mod path; pub mod replication; pub mod retry; +pub mod select; pub mod traits; pub use alias::{Alias, AliasManager}; @@ -28,12 +29,13 @@ pub use lifecycle::{ LifecycleConfiguration, LifecycleExpiration, LifecycleRule, LifecycleRuleStatus, LifecycleTransition, NoncurrentVersionExpiration, NoncurrentVersionTransition, }; -pub use path::{ParsedPath, RemotePath, parse_path}; +pub use path::{ParsedPath, RemotePath, parse_object_path, parse_path}; pub use replication::{ BucketTarget, BucketTargetCredentials, ReplicationConfiguration, ReplicationDestination, ReplicationRule, ReplicationRuleStatus, }; pub use retry::{RetryBuilder, is_retryable_error, retry_with_backoff}; +pub use select::{SelectCompression, SelectInputFormat, SelectOptions, SelectOutputFormat}; pub use traits::{ BucketNotification, Capabilities, ListOptions, ListResult, NotificationTarget, ObjectInfo, ObjectStore, ObjectVersion, diff --git a/crates/core/src/path.rs b/crates/core/src/path.rs index b80766b..825d4b7 100644 --- a/crates/core/src/path.rs +++ b/crates/core/src/path.rs @@ -214,6 +214,24 @@ pub fn parse_path(path: &str) -> Result { } } +/// Parse a remote path that must refer to a single object (`alias/bucket/key` with non-empty key). 
+pub fn parse_object_path(path: &str) -> Result { + match parse_path(path)? { + ParsedPath::Remote(r) => { + if r.key.is_empty() || r.is_dir { + Err(Error::InvalidPath( + "Object path required: alias/bucket/key (key must not be empty)".into(), + )) + } else { + Ok(r) + } + } + ParsedPath::Local(_) => Err(Error::InvalidPath( + "Expected remote path in the form alias/bucket/key".into(), + )), + } +} + /// Check if a string is a valid alias name fn is_valid_alias_name(name: &str) -> bool { !name.is_empty() @@ -256,6 +274,14 @@ mod tests { assert!(remote.is_dir); } + #[test] + fn test_parse_object_path_requires_non_empty_key() { + assert!(parse_object_path("myalias/bucket").is_err()); + assert!(parse_object_path("myalias/bucket/dir/").is_err()); + let r = parse_object_path("myalias/bucket/key.txt").unwrap(); + assert_eq!(r.key, "key.txt"); + } + #[test] fn test_parse_local_absolute_path() { let path = parse_path("/home/user/file.txt").unwrap(); diff --git a/crates/core/src/select.rs b/crates/core/src/select.rs new file mode 100644 index 0000000..adaede3 --- /dev/null +++ b/crates/core/src/select.rs @@ -0,0 +1,48 @@ +//! S3 Select domain types (no AWS SDK types). + +/// Object payload format for S3 Select input. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum SelectInputFormat { + #[default] + Csv, + Json, + Parquet, +} + +/// Result row format for S3 Select output. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum SelectOutputFormat { + #[default] + Csv, + Json, +} + +/// Compression applied to the **stored object** (input decompression). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum SelectCompression { + #[default] + None, + Gzip, + Bzip2, +} + +/// Options for running an S3 Select query on one object. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SelectOptions { + /// SQL expression (S3 Select / `s3object`). 
+ pub expression: String, + pub input_format: SelectInputFormat, + pub output_format: SelectOutputFormat, + pub compression: SelectCompression, +} + +impl Default for SelectOptions { + fn default() -> Self { + Self { + expression: String::new(), + input_format: SelectInputFormat::Csv, + output_format: SelectOutputFormat::Csv, + compression: SelectCompression::None, + } + } +} diff --git a/crates/core/src/traits.rs b/crates/core/src/traits.rs index 33f9761..926d6f1 100644 --- a/crates/core/src/traits.rs +++ b/crates/core/src/traits.rs @@ -8,12 +8,14 @@ use std::collections::HashMap; use async_trait::async_trait; use jiff::Timestamp; use serde::{Deserialize, Serialize}; +use tokio::io::AsyncWrite; use crate::cors::CorsRule; use crate::error::Result; use crate::lifecycle::LifecycleRule; use crate::path::RemotePath; use crate::replication::ReplicationConfiguration; +use crate::select::SelectOptions; /// Metadata for an object version #[derive(Debug, Clone, Serialize, Deserialize)] @@ -176,7 +178,8 @@ pub struct Capabilities { /// Supports anonymous bucket access policies pub anonymous: bool, - /// Supports S3 Select + /// S3 Select (`SelectObjectContent`). **Not set by [`ObjectStore::capabilities`]** (it stays + /// `false` there); use [`ObjectStore::probe_select_support`] to detect support per bucket. pub select: bool, /// Supports event notifications @@ -387,6 +390,19 @@ pub trait ObjectStore: Send + Sync { /// Delete bucket CORS configuration. async fn delete_bucket_cors(&self, bucket: &str) -> Result<()>; + + /// Whether this store supports S3 Select for `bucket` (`SelectObjectContent`). + /// + /// Implementations that do not support Select should return `Ok(false)`. + async fn probe_select_support(&self, bucket: &str) -> Result; + + /// Run S3 Select on an object and stream result payloads to `writer`. 
+ async fn select_object_content( + &self, + path: &RemotePath, + options: &SelectOptions, + writer: &mut (dyn AsyncWrite + Send + Unpin), + ) -> Result<()>; // async fn get_versioning(&self, bucket: &str) -> Result; // async fn set_versioning(&self, bucket: &str, enabled: bool) -> Result<()>; // async fn get_tags(&self, path: &RemotePath) -> Result>; diff --git a/crates/s3/src/capability.rs b/crates/s3/src/capability.rs deleted file mode 100644 index 6163137..0000000 --- a/crates/s3/src/capability.rs +++ /dev/null @@ -1,121 +0,0 @@ -//! Capability detection for S3 backends -//! -//! Different S3-compatible backends support different features. -//! This module provides capability detection to gracefully handle -//! unsupported features. - -use rc_core::{Capabilities, Error, Result}; - -/// Detect capabilities of an S3 backend -/// -/// This function probes the backend to determine which features are supported. -/// For optional commands, we use this to determine whether to proceed or -/// return EXIT_UNSUPPORTED_FEATURE. -pub async fn detect_capabilities( - client: &aws_sdk_s3::Client, - bucket: &str, -) -> Result { - // Note: Object Lock and S3 Select detection would require additional probes - // that might have side effects. For now, we default to false and let users - // use --force if they know their backend supports these features. 
- let caps = Capabilities { - versioning: check_versioning(client, bucket).await, - tagging: check_tagging(client, bucket).await, - ..Default::default() - }; - - Ok(caps) -} - -/// Check if bucket versioning is supported -async fn check_versioning(client: &aws_sdk_s3::Client, bucket: &str) -> bool { - // Try to get versioning configuration - // If we get a successful response (even if versioning is not enabled), - // the backend supports versioning - client - .get_bucket_versioning() - .bucket(bucket) - .send() - .await - .is_ok() -} - -/// Check if object tagging is supported -async fn check_tagging(client: &aws_sdk_s3::Client, bucket: &str) -> bool { - // Try to get bucket tagging - // Even if no tags are set, a supported backend will return a valid response - // or a specific "no tags" error, not an unsupported operation error - match client.get_bucket_tagging().bucket(bucket).send().await { - Ok(_) => true, - Err(e) => { - let err = e.into_service_error(); - // NoSuchTagSet means tagging is supported, just no tags set - // AccessDenied might mean we don't have permission but feature exists - !err.to_string().contains("NotImplemented") - } - } -} - -/// Check if a specific operation is supported, returning appropriate error -pub fn require_capability(caps: &Capabilities, feature: &str) -> Result<()> { - let supported = match feature { - "versioning" => caps.versioning, - "object_lock" | "retention" => caps.object_lock, - "tagging" => caps.tagging, - "select" | "sql" => caps.select, - "notifications" | "watch" => caps.notifications, - "lifecycle" => caps.lifecycle, - "replication" => caps.replication, - "cors" => caps.cors, - _ => false, - }; - - if supported { - Ok(()) - } else { - Err(Error::UnsupportedFeature(format!( - "The backend does not support '{feature}'. Use --force to attempt anyway." 
- ))) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_require_capability_versioning() { - let caps = Capabilities { - versioning: true, - ..Default::default() - }; - assert!(require_capability(&caps, "versioning").is_ok()); - - let caps = Capabilities { - versioning: false, - ..Default::default() - }; - assert!(require_capability(&caps, "versioning").is_err()); - } - - #[test] - fn test_require_capability_cors() { - let caps = Capabilities { - cors: true, - ..Default::default() - }; - assert!(require_capability(&caps, "cors").is_ok()); - - let caps = Capabilities { - cors: false, - ..Default::default() - }; - assert!(require_capability(&caps, "cors").is_err()); - } - - #[test] - fn test_require_capability_unknown() { - let caps = Capabilities::default(); - assert!(require_capability(&caps, "unknown_feature").is_err()); - } -} diff --git a/crates/s3/src/client.rs b/crates/s3/src/client.rs index e0759b5..7306249 100644 --- a/crates/s3/src/client.rs +++ b/crates/s3/src/client.rs @@ -23,7 +23,7 @@ use quick_xml::de::from_str as from_xml_str; use rc_core::{ Alias, BucketNotification, Capabilities, CorsRule, Error, LifecycleRule, ListOptions, ListResult, NotificationTarget, ObjectInfo, ObjectStore, ObjectVersion, RemotePath, - ReplicationConfiguration, Result, + ReplicationConfiguration, Result, SelectOptions, }; use reqwest::Method; use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue}; @@ -31,6 +31,7 @@ use serde::Deserialize; use sha2::{Digest, Sha256}; use std::collections::HashMap; use tokio::io::AsyncReadExt; +use tokio::io::AsyncWrite; /// Keep single-part uploads small to avoid backend incompatibilities with /// streaming aws-chunked payloads. 
@@ -648,7 +649,6 @@ impl HttpClient for ReqwestConnector { pub struct S3Client { inner: aws_sdk_s3::Client, xml_http_client: reqwest::Client, - #[allow(dead_code)] alias: Alias, } @@ -1725,8 +1725,8 @@ impl ObjectStore for S3Client { } async fn capabilities(&self) -> Result { - // Hardcoded optimistic defaults for common S3-compatible backends. - // In a future phase we can replace these with active detection. + // Best-effort hints for common S3-compatible backends. `select` is not inferred here; + // use [`ObjectStore::probe_select_support`] (e.g. from `rc sql`). Ok(Capabilities { versioning: true, object_lock: false, @@ -2701,6 +2701,19 @@ impl ObjectStore for S3Client { })?; Ok(()) } + + async fn probe_select_support(&self, bucket: &str) -> Result { + crate::select::probe_select_support(&self.inner, bucket).await + } + + async fn select_object_content( + &self, + path: &RemotePath, + options: &SelectOptions, + writer: &mut (dyn AsyncWrite + Send + Unpin), + ) -> Result<()> { + crate::select::select_object_content(&self.inner, path, options, writer).await + } } #[cfg(test)] diff --git a/crates/s3/src/lib.rs b/crates/s3/src/lib.rs index 95554fc..67ab319 100644 --- a/crates/s3/src/lib.rs +++ b/crates/s3/src/lib.rs @@ -5,9 +5,9 @@ //! depends on the AWS SDK. pub mod admin; -pub mod capability; pub mod client; pub mod multipart; +mod select; pub use admin::AdminClient; pub use client::{DeleteRequestOptions, S3Client}; diff --git a/crates/s3/src/select.rs b/crates/s3/src/select.rs new file mode 100644 index 0000000..af7e206 --- /dev/null +++ b/crates/s3/src/select.rs @@ -0,0 +1,299 @@ +//! S3 Select (`SelectObjectContent`) — AWS SDK mapping and streaming. 
+ +use aws_sdk_s3::types::{ + CompressionType, CsvInput, CsvOutput, ExpressionType, FileHeaderInfo, InputSerialization, + JsonInput, JsonOutput, JsonType, OutputSerialization, ParquetInput, QuoteFields, + SelectObjectContentEventStream, +}; +use aws_smithy_runtime_api::client::orchestrator::HttpResponse; +use aws_smithy_runtime_api::client::result::SdkError; +use aws_smithy_types::error::metadata::ProvideErrorMetadata; +use aws_smithy_types::event_stream::RawMessage; +use rc_core::{ + Error, RemotePath, Result, SelectCompression, SelectInputFormat, SelectOptions, + SelectOutputFormat, +}; +use tokio::io::{AsyncWrite, AsyncWriteExt}; + +/// Run S3 Select and write record payloads to `writer` incrementally. +pub async fn select_object_content( + client: &aws_sdk_s3::Client, + path: &RemotePath, + options: &SelectOptions, + writer: &mut (dyn AsyncWrite + Send + Unpin), +) -> Result<()> { + let input = build_input_serialization(options); + let output = build_output_serialization(options); + + // aws-sdk-s3 `SelectObjectContent` does not expose object `VersionId`; the current object is used. + let resp = client + .select_object_content() + .bucket(&path.bucket) + .key(&path.key) + .expression(&options.expression) + .expression_type(ExpressionType::Sql) + .input_serialization(input) + .output_serialization(output) + .send() + .await + .map_err(map_select_initial_error)?; + + let mut events = resp.payload; + while let Some(ev) = events.recv().await.map_err(map_select_stream_error)? { + match ev { + SelectObjectContentEventStream::Records(rec) => { + if let Some(blob) = rec.payload { + writer.write_all(blob.as_ref()).await.map_err(Error::Io)?; + } + } + SelectObjectContentEventStream::End(_) => break, + _ => {} + } + } + writer.flush().await.map_err(Error::Io)?; + Ok(()) +} + +/// Probe whether the bucket supports `SelectObjectContent` (lightweight; uses a non-existent key). 
+pub async fn probe_select_support(client: &aws_sdk_s3::Client, bucket: &str) -> Result { + let probe_path = RemotePath::new("_", bucket, "__rc_select_probe__/object-does-not-exist"); + let opts = SelectOptions { + expression: "SELECT s._1 FROM S3Object s LIMIT 0".to_string(), + input_format: SelectInputFormat::Csv, + output_format: SelectOutputFormat::Csv, + compression: SelectCompression::None, + }; + let mut sink = tokio::io::sink(); + match select_object_content(client, &probe_path, &opts, &mut sink).await { + Ok(()) => Ok(true), + Err(e) => classify_probe_or_pass_through(e), + } +} + +fn classify_probe_or_pass_through(err: Error) -> Result { + match err { + Error::UnsupportedFeature(_) => Ok(false), + Error::NotFound(msg) => { + if msg.to_ascii_lowercase().contains("bucket") { + Err(Error::NotFound(msg)) + } else { + // NoSuchKey: Select was accepted; object is missing. + Ok(true) + } + } + Error::Network(ref msg) if probe_network_implies_unsupported(msg) => Ok(false), + other => Err(other), + } +} + +/// Narrow fallback when [`classify_aws_code`] could not classify (missing `x-amz-error-code`). +fn probe_network_implies_unsupported(msg: &str) -> bool { + msg.contains("(code: NotImplemented)") || msg.contains("code: NotImplemented") +} + +fn compression_type(c: SelectCompression) -> CompressionType { + match c { + SelectCompression::None => CompressionType::None, + SelectCompression::Gzip => CompressionType::Gzip, + SelectCompression::Bzip2 => CompressionType::Bzip2, + } +} + +fn build_input_serialization(options: &SelectOptions) -> InputSerialization { + let compression = compression_type(options.compression); + let mut b = InputSerialization::builder().compression_type(compression); + match options.input_format { + SelectInputFormat::Csv => { + let csv = CsvInput::builder() + .file_header_info(FileHeaderInfo::None) + .build(); + b = b.csv(csv); + } + SelectInputFormat::Json => { + // JSONL: one JSON object per line (S3 Select `Type=LINES`). 
+ let json = JsonInput::builder().r#type(JsonType::Lines).build(); + b = b.json(json); + } + SelectInputFormat::Parquet => { + let pq = ParquetInput::builder().build(); + b = b.parquet(pq); + } + } + b.build() +} + +fn build_output_serialization(options: &SelectOptions) -> OutputSerialization { + let mut b = OutputSerialization::builder(); + match options.output_format { + SelectOutputFormat::Csv => { + let csv = CsvOutput::builder() + .quote_fields(QuoteFields::Asneeded) + .build(); + b = b.csv(csv); + } + SelectOutputFormat::Json => { + let json = JsonOutput::builder().build(); + b = b.json(json); + } + } + b.build() +} + +fn resolve_http_service_error_code<'a, E: ProvideErrorMetadata + ?Sized>( + op_err: &'a E, + raw: &'a HttpResponse, +) -> Option<&'a str> { + op_err + .code() + .or_else(|| op_err.meta().code()) + .or_else(|| header_amz_error_code(raw)) +} + +fn header_amz_error_code(raw: &HttpResponse) -> Option<&str> { + raw.headers().get("x-amz-error-code") +} + +fn resolve_event_stream_error_code<'a, E: ProvideErrorMetadata + ?Sized>( + op_err: &'a E, + _raw: &'a RawMessage, +) -> Option<&'a str> { + op_err.code().or_else(|| op_err.meta().code()) +} + +fn map_select_initial_error( + err: SdkError< + aws_sdk_s3::operation::select_object_content::SelectObjectContentError, + HttpResponse, + >, +) -> Error { + use aws_sdk_s3::error::SdkError; + match &err { + SdkError::ServiceError(se) => { + let code = resolve_http_service_error_code(se.err(), se.raw()); + classify_aws_code(code, &err.to_string()) + } + SdkError::TimeoutError(_) => Error::Network("Request timeout".to_string()), + SdkError::DispatchFailure(e) => Error::Network(format!("Network dispatch error: {e:?}")), + SdkError::ResponseError(e) => Error::Network(format!("Response error: {e:?}")), + SdkError::ConstructionFailure(e) => Error::General(format!("Request construction: {e:?}")), + _ => Error::Network(err.to_string()), + } +} + +fn map_select_stream_error( + err: SdkError, +) -> Error { + use 
aws_sdk_s3::error::SdkError; + match &err { + SdkError::ServiceError(se) => { + let code = resolve_event_stream_error_code(se.err(), se.raw()); + classify_aws_code(code, &err.to_string()) + } + SdkError::TimeoutError(_) => Error::Network("Request timeout".to_string()), + SdkError::DispatchFailure(e) => Error::Network(format!("Network dispatch error: {e:?}")), + SdkError::ResponseError(e) => Error::Network(format!("Response error: {e:?}")), + SdkError::ConstructionFailure(e) => Error::General(format!("Stream construction: {e:?}")), + _ => Error::Network(err.to_string()), + } +} + +fn classify_aws_code(code: Option<&str>, text: &str) -> Error { + let c = code.filter(|s| !s.is_empty()); + match c { + Some("NoSuchKey") => Error::NotFound("Object not found".to_string()), + Some("NoSuchBucket") => Error::NotFound("Bucket not found".to_string()), + Some("AccessDenied") => Error::Auth("Access denied".to_string()), + Some("NotImplemented") => Error::UnsupportedFeature( + "The backend does not support S3 Select. Use --force to attempt anyway.".to_string(), + ), + Some("InvalidArgument") => Error::General(format!("Invalid S3 Select request: {text}")), + Some(_) if text.contains("NotImplemented") => Error::UnsupportedFeature( + "The backend does not support S3 Select. Use --force to attempt anyway.".to_string(), + ), + Some(_) => Error::Network(text.to_string()), + None => classify_aws_code_missing_metadata(text), + } +} + +/// When the SDK did not surface `x-amz-error-code` / metadata, use minimal substring checks. +fn classify_aws_code_missing_metadata(text: &str) -> Error { + if text.contains("NotImplemented") { + return Error::UnsupportedFeature( + "The backend does not support S3 Select. 
Use --force to attempt anyway.".to_string(), + ); + } + if text.contains("NoSuchKey") { + return Error::NotFound("Object not found".to_string()); + } + if text.contains("NoSuchBucket") { + return Error::NotFound("Bucket not found".to_string()); + } + Error::Network(text.to_string()) +} + +#[cfg(test)] +mod tests { + use super::classify_aws_code; + use rc_core::Error; + + #[test] + fn classify_maps_no_such_key() { + let e = classify_aws_code(Some("NoSuchKey"), ""); + assert!(matches!(e, Error::NotFound(_))); + } + + #[test] + fn classify_maps_not_implemented() { + let e = classify_aws_code(Some("NotImplemented"), ""); + assert!(matches!(e, Error::UnsupportedFeature(_))); + } + + #[test] + fn classify_fallback_network() { + let e = classify_aws_code(Some("SlowDown"), "rate limited"); + assert!(matches!(e, Error::Network(_))); + } + + #[test] + fn classify_missing_code_maps_no_such_bucket_substring() { + let e = classify_aws_code(None, "Service error: ... NoSuchBucket ..."); + assert!(matches!(e, Error::NotFound(msg) if msg.contains("Bucket"))); + } + + #[test] + fn classify_maps_invalid_argument() { + let e = classify_aws_code(Some("InvalidArgument"), "bad expr"); + assert!(matches!(e, Error::General(_))); + } +} + +#[cfg(test)] +mod probe_tests { + use super::classify_probe_or_pass_through; + use rc_core::Error; + + #[test] + fn probe_pass_through_bucket_missing() { + let err = Error::NotFound("Bucket not found".to_string()); + let out = classify_probe_or_pass_through(err); + assert!(matches!(out, Err(Error::NotFound(_)))); + } + + #[test] + fn probe_object_missing_means_select_accepted() { + let err = Error::NotFound("Object not found".to_string()); + assert!(matches!(classify_probe_or_pass_through(err), Ok(true))); + } + + #[test] + fn probe_unsupported_feature() { + let err = Error::UnsupportedFeature("no".to_string()); + assert!(matches!(classify_probe_or_pass_through(err), Ok(false))); + } + + #[test] + fn probe_network_not_implemented_code() { + let err = 
Error::Network("Service error: ... (code: NotImplemented)".to_string()); + assert!(matches!(classify_probe_or_pass_through(err), Ok(false))); + } +} From 0d3914e9787a4807e41f4e17bb92548699b4e94b Mon Sep 17 00:00:00 2001 From: jhw Date: Mon, 20 Apr 2026 23:20:45 +0800 Subject: [PATCH 2/3] feat(phase-2): run sql select without preflight probe --- crates/cli/src/commands/sql.rs | 26 ---------- crates/cli/tests/help_contract.rs | 1 - crates/core/src/traits.rs | 11 ++-- crates/s3/src/client.rs | 8 +-- crates/s3/src/select.rs | 84 +++---------------------------- 5 files changed, 13 insertions(+), 117 deletions(-) diff --git a/crates/cli/src/commands/sql.rs b/crates/cli/src/commands/sql.rs index f09dbfb..111bae1 100644 --- a/crates/cli/src/commands/sql.rs +++ b/crates/cli/src/commands/sql.rs @@ -1,8 +1,4 @@ //! sql command — S3 Select queries on objects. -//! -//! Unless `--force` is set, this command issues a lightweight `SelectObjectContent` probe against -//! a non-existent key in the target bucket before running your query, so support is detected -//! reliably (extra request; may incur cost on some backends). use clap::{Args, ValueEnum}; use rc_core::{ @@ -35,11 +31,6 @@ pub struct SqlArgs { /// Compression of the stored object (input decompression) #[arg(long, value_enum, default_value_t = CompressionArg::None)] pub compression: CompressionArg, - - /// Skip the pre-flight Select probe (one fewer `SelectObjectContent` call); use only if you - /// trust the backend supports S3 Select or probe fails incorrectly - #[arg(long)] - pub force: bool, } #[derive(Clone, Copy, Debug, ValueEnum)] @@ -133,21 +124,6 @@ pub async fn execute(args: SqlArgs, output_config: OutputConfig) -> ExitCode { } }; - if !args.force { - match client.probe_select_support(&remote.bucket).await { - Ok(true) => {} - Ok(false) => { - formatter - .error("Backend does not support S3 Select. 
Use --force to attempt anyway."); - return ExitCode::UnsupportedFeature; - } - Err(e) => { - formatter.error(&format!("Failed to probe S3 Select support: {e}")); - return exit_code_from_error(&e); - } - } - } - let options = SelectOptions { expression: args.query, input_format: args.input_format.into(), @@ -186,7 +162,6 @@ mod tests { input_format: InputFormatArg::Csv, output_format: OutputFormatArg::Csv, compression: CompressionArg::None, - force: false, }; let code = execute(args, OutputConfig::default()).await; assert_eq!(code, ExitCode::UsageError); @@ -200,7 +175,6 @@ mod tests { input_format: InputFormatArg::Csv, output_format: OutputFormatArg::Csv, compression: CompressionArg::None, - force: false, }; let code = execute(args, OutputConfig::default()).await; assert_eq!(code, ExitCode::UsageError); diff --git a/crates/cli/tests/help_contract.rs b/crates/cli/tests/help_contract.rs index 6d6e94c..e32002f 100644 --- a/crates/cli/tests/help_contract.rs +++ b/crates/cli/tests/help_contract.rs @@ -332,7 +332,6 @@ fn top_level_command_help_contract() { "--input-format", "--output-format", "--compression", - "--force", ], }, HelpCase { diff --git a/crates/core/src/traits.rs b/crates/core/src/traits.rs index 926d6f1..52d9670 100644 --- a/crates/core/src/traits.rs +++ b/crates/core/src/traits.rs @@ -178,8 +178,10 @@ pub struct Capabilities { /// Supports anonymous bucket access policies pub anonymous: bool, - /// S3 Select (`SelectObjectContent`). **Not set by [`ObjectStore::capabilities`]** (it stays - /// `false` there); use [`ObjectStore::probe_select_support`] to detect support per bucket. + /// S3 Select (`SelectObjectContent`). + /// + /// This remains `false` in generic capability hints because support is determined by issuing + /// a real request against the target object. pub select: bool, /// Supports event notifications @@ -391,11 +393,6 @@ pub trait ObjectStore: Send + Sync { /// Delete bucket CORS configuration. 
async fn delete_bucket_cors(&self, bucket: &str) -> Result<()>; - /// Whether this store supports S3 Select for `bucket` (`SelectObjectContent`). - /// - /// Implementations that do not support Select should return `Ok(false)`. - async fn probe_select_support(&self, bucket: &str) -> Result<bool>; - /// Run S3 Select on an object and stream result payloads to `writer`. async fn select_object_content( &self, diff --git a/crates/s3/src/client.rs b/crates/s3/src/client.rs index 7306249..e7ff81c 100644 --- a/crates/s3/src/client.rs +++ b/crates/s3/src/client.rs @@ -1725,8 +1725,8 @@ impl ObjectStore for S3Client { } async fn capabilities(&self) -> Result<Capabilities> { - // Best-effort hints for common S3-compatible backends. `select` is not inferred here; - // use [`ObjectStore::probe_select_support`] (e.g. from `rc sql`). + // Best-effort hints for common S3-compatible backends. `select` is not inferred here + // because `rc sql` determines support from the real request result. Ok(Capabilities { versioning: true, object_lock: false, @@ -2702,10 +2702,6 @@ impl ObjectStore for S3Client { Ok(()) } - async fn probe_select_support(&self, bucket: &str) -> Result<bool> { - crate::select::probe_select_support(&self.inner, bucket).await - } - async fn select_object_content( &self, path: &RemotePath, diff --git a/crates/s3/src/select.rs b/crates/s3/src/select.rs index af7e206..16bbdf2 100644 --- a/crates/s3/src/select.rs +++ b/crates/s3/src/select.rs @@ -54,43 +54,6 @@ pub async fn select_object_content( Ok(()) } -/// Probe whether the bucket supports `SelectObjectContent` (lightweight; uses a non-existent key). 
-pub async fn probe_select_support(client: &aws_sdk_s3::Client, bucket: &str) -> Result<bool> { - let probe_path = RemotePath::new("_", bucket, "__rc_select_probe__/object-does-not-exist"); - let opts = SelectOptions { - expression: "SELECT s._1 FROM S3Object s LIMIT 0".to_string(), - input_format: SelectInputFormat::Csv, - output_format: SelectOutputFormat::Csv, - compression: SelectCompression::None, - }; - let mut sink = tokio::io::sink(); - match select_object_content(client, &probe_path, &opts, &mut sink).await { - Ok(()) => Ok(true), - Err(e) => classify_probe_or_pass_through(e), - } -} - -fn classify_probe_or_pass_through(err: Error) -> Result<bool> { - match err { - Error::UnsupportedFeature(_) => Ok(false), - Error::NotFound(msg) => { - if msg.to_ascii_lowercase().contains("bucket") { - Err(Error::NotFound(msg)) - } else { - // NoSuchKey: Select was accepted; object is missing. - Ok(true) - } - } - Error::Network(ref msg) if probe_network_implies_unsupported(msg) => Ok(false), - other => Err(other), - } -} - -/// Narrow fallback when [`classify_aws_code`] could not classify (missing `x-amz-error-code`). -fn probe_network_implies_unsupported(msg: &str) -> bool { - msg.contains("(code: NotImplemented)") || msg.contains("code: NotImplemented") -} - fn compression_type(c: SelectCompression) -> CompressionType { match c { SelectCompression::None => CompressionType::None, @@ -203,13 +166,13 @@ fn classify_aws_code(code: Option<&str>, text: &str) -> Error { Some("NoSuchKey") => Error::NotFound("Object not found".to_string()), Some("NoSuchBucket") => Error::NotFound("Bucket not found".to_string()), Some("AccessDenied") => Error::Auth("Access denied".to_string()), - Some("NotImplemented") => Error::UnsupportedFeature( - "The backend does not support S3 Select. 
Use --force to attempt anyway.".to_string(), - ), + Some("NotImplemented") => { + Error::UnsupportedFeature("The backend does not support S3 Select.".to_string()) + } Some("InvalidArgument") => Error::General(format!("Invalid S3 Select request: {text}")), - Some(_) if text.contains("NotImplemented") => Error::UnsupportedFeature( - "The backend does not support S3 Select. Use --force to attempt anyway.".to_string(), - ), + Some(_) if text.contains("NotImplemented") => { + Error::UnsupportedFeature("The backend does not support S3 Select.".to_string()) + } Some(_) => Error::Network(text.to_string()), None => classify_aws_code_missing_metadata(text), } @@ -218,9 +181,7 @@ fn classify_aws_code(code: Option<&str>, text: &str) -> Error { /// When the SDK did not surface `x-amz-error-code` / metadata, use minimal substring checks. fn classify_aws_code_missing_metadata(text: &str) -> Error { if text.contains("NotImplemented") { - return Error::UnsupportedFeature( - "The backend does not support S3 Select. 
Use --force to attempt anyway.".to_string(), - ); + return Error::UnsupportedFeature("The backend does not support S3 Select.".to_string()); } if text.contains("NoSuchKey") { return Error::NotFound("Object not found".to_string()); @@ -266,34 +227,3 @@ mod tests { assert!(matches!(e, Error::General(_))); } } - -#[cfg(test)] -mod probe_tests { - use super::classify_probe_or_pass_through; - use rc_core::Error; - - #[test] - fn probe_pass_through_bucket_missing() { - let err = Error::NotFound("Bucket not found".to_string()); - let out = classify_probe_or_pass_through(err); - assert!(matches!(out, Err(Error::NotFound(_)))); - } - - #[test] - fn probe_object_missing_means_select_accepted() { - let err = Error::NotFound("Object not found".to_string()); - assert!(matches!(classify_probe_or_pass_through(err), Ok(true))); - } - - #[test] - fn probe_unsupported_feature() { - let err = Error::UnsupportedFeature("no".to_string()); - assert!(matches!(classify_probe_or_pass_through(err), Ok(false))); - } - - #[test] - fn probe_network_not_implemented_code() { - let err = Error::Network("Service error: ... 
(code: NotImplemented)".to_string()); - assert!(matches!(classify_probe_or_pass_through(err), Ok(false))); - } -} From ca40b38e60265ebf65dceffb21a1b10d56df35d3 Mon Sep 17 00:00:00 2001 From: jhw Date: Mon, 20 Apr 2026 23:30:51 +0800 Subject: [PATCH 3/3] fix(phase-2): tighten select validation and errors --- crates/s3/src/select.rs | 55 +++++++++++++++++++++++++++++++++++------ 1 file changed, 48 insertions(+), 7 deletions(-) diff --git a/crates/s3/src/select.rs b/crates/s3/src/select.rs index 16bbdf2..d9f79c8 100644 --- a/crates/s3/src/select.rs +++ b/crates/s3/src/select.rs @@ -22,7 +22,7 @@ pub async fn select_object_content( options: &SelectOptions, writer: &mut (dyn AsyncWrite + Send + Unpin), ) -> Result<()> { - let input = build_input_serialization(options); + let input = build_input_serialization(options)?; let output = build_output_serialization(options); // aws-sdk-s3 `SelectObjectContent` does not expose object `VersionId`; the current object is used. @@ -62,7 +62,15 @@ fn compression_type(c: SelectCompression) -> CompressionType { } } -fn build_input_serialization(options: &SelectOptions) -> InputSerialization { +fn build_input_serialization(options: &SelectOptions) -> Result<InputSerialization> { + if matches!(options.input_format, SelectInputFormat::Parquet) + && !matches!(options.compression, SelectCompression::None) + { + return Err(Error::General( + "Parquet input does not support whole-object GZIP or BZIP2 compression.".to_string(), + )); + } + let compression = compression_type(options.compression); let mut b = InputSerialization::builder().compression_type(compression); match options.input_format { @@ -82,7 +90,7 @@ fn build_input_serialization(options: &SelectOptions) -> InputSerialization { b = b.parquet(pq); } } - b.build() + Ok(b.build()) } fn build_output_serialization(options: &SelectOptions) -> OutputSerialization { @@ -173,7 +181,7 @@ fn classify_aws_code(code: Option<&str>, text: &str) -> Error { Some(_) if text.contains("NotImplemented") => { 
Error::UnsupportedFeature("The backend does not support S3 Select.".to_string()) } - Some(_) => Error::Network(text.to_string()), + Some(_) => Error::General(text.to_string()), None => classify_aws_code_missing_metadata(text), } } @@ -189,13 +197,14 @@ fn classify_aws_code_missing_metadata(text: &str) -> Error { if text.contains("NoSuchBucket") { return Error::NotFound("Bucket not found".to_string()); } - Error::Network(text.to_string()) + Error::General(text.to_string()) } #[cfg(test)] mod tests { - use super::classify_aws_code; + use super::{build_input_serialization, classify_aws_code}; use rc_core::Error; + use rc_core::{SelectCompression, SelectInputFormat, SelectOptions, SelectOutputFormat}; #[test] fn classify_maps_no_such_key() { @@ -212,7 +221,7 @@ mod tests { #[test] fn classify_fallback_network() { let e = classify_aws_code(Some("SlowDown"), "rate limited"); - assert!(matches!(e, Error::Network(_))); + assert!(matches!(e, Error::General(_))); } #[test] @@ -226,4 +235,36 @@ mod tests { let e = classify_aws_code(Some("InvalidArgument"), "bad expr"); assert!(matches!(e, Error::General(_))); } + + #[test] + fn classify_missing_code_unknown_maps_general() { + let e = classify_aws_code(None, "Service error: query parsing failed"); + assert!(matches!(e, Error::General(_))); + } + + #[test] + fn parquet_rejects_whole_object_compression() { + let options = SelectOptions { + expression: "SELECT * FROM S3Object".to_string(), + input_format: SelectInputFormat::Parquet, + output_format: SelectOutputFormat::Csv, + compression: SelectCompression::Gzip, + }; + + let error = build_input_serialization(&options) + .expect_err("parquet should reject whole-object compression"); + assert!(matches!(error, Error::General(_))); + } + + #[test] + fn parquet_allows_no_compression() { + let options = SelectOptions { + expression: "SELECT * FROM S3Object".to_string(), + input_format: SelectInputFormat::Parquet, + output_format: SelectOutputFormat::Csv, + compression: 
SelectCompression::None, + }; + + build_input_serialization(&options).expect("parquet without whole-object compression"); + } }