Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions crates/cli/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ mod rb;
mod replicate;
mod rm;
mod share;
mod sql;
mod stat;
mod tag;
mod tree;
Expand Down Expand Up @@ -209,6 +210,9 @@ pub enum Commands {
/// Deprecated: use `rc object share`
Share(share::ShareArgs),

/// Run S3 Select SQL on an object
Sql(sql::SqlArgs),

// Phase 5: Optional commands (capability-dependent)
/// Deprecated: use `rc bucket version`
#[command(subcommand)]
Expand Down Expand Up @@ -239,8 +243,6 @@ pub enum Commands {
// Retention(retention::RetentionArgs),
// /// Watch for object events
// Watch(watch::WatchArgs),
// /// Run S3 Select queries
// Sql(sql::SqlArgs),
}

/// Execute the CLI command and return an exit code
Expand Down Expand Up @@ -325,6 +327,9 @@ pub async fn execute(cli: Cli) -> ExitCode {
Commands::Share(args) => {
share::execute(args, output_options.resolve(OutputBehavior::HumanDefault)).await
}
Commands::Sql(args) => {
sql::execute(args, output_options.resolve(OutputBehavior::HumanDefault)).await
}
Commands::Version(cmd) => {
version::execute(
version::VersionArgs { command: cmd },
Expand Down
182 changes: 182 additions & 0 deletions crates/cli/src/commands/sql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
//! sql command — S3 Select queries on objects.

use clap::{Args, ValueEnum};
use rc_core::{
AliasManager, ObjectStore, SelectCompression, SelectInputFormat, SelectOptions,
SelectOutputFormat, parse_object_path,
};
use rc_s3::S3Client;

use crate::exit_code::ExitCode;
use crate::output::{Formatter, OutputConfig};

// Command-line arguments for the `sql` subcommand.
//
// NOTE: the `///` comments on this struct and its fields are consumed by clap's derive
// macros as user-facing help text, so they are part of the CLI output. Maintainer notes
// therefore use plain `//` comments.
/// Run an S3 Select SQL query on an object and stream results to stdout.
#[derive(Args, Debug)]
pub struct SqlArgs {
// Positional argument; `execute` validates it with `parse_object_path`.
/// Object path (alias/bucket/key)
pub path: String,

// Required `--query` flag (no default); `execute` rejects a value that is blank after trimming.
/// SQL expression (S3 Select)
#[arg(long)]
pub query: String,

// `--input-format`, defaults to CSV.
/// Input object format
#[arg(long, value_enum, default_value_t = InputFormatArg::Csv)]
pub input_format: InputFormatArg,

// `--output-format`, defaults to CSV.
/// Select result format
#[arg(long, value_enum, default_value_t = OutputFormatArg::Csv)]
pub output_format: OutputFormatArg,

// `--compression`, defaults to no compression.
/// Compression of the stored object (input decompression)
#[arg(long, value_enum, default_value_t = CompressionArg::None)]
pub compression: CompressionArg,
}

// Accepted values for `--input-format`; converted into `SelectInputFormat` through `From`.
// Variant-level `///` comments are surfaced by clap as per-value help text, so notes for
// maintainers belong in `//` comments.
#[derive(Clone, Copy, Debug, ValueEnum)]
pub enum InputFormatArg {
Csv,
/// Newline-delimited JSON (S3 Select `Type=LINES`).
Json,
Parquet,
}

// Accepted values for `--output-format`; converted into `SelectOutputFormat` through `From`.
// Unlike the input side there is no Parquet variant here.
#[derive(Clone, Copy, Debug, ValueEnum)]
pub enum OutputFormatArg {
Csv,
Json,
}

// Accepted values for `--compression`; converted into `SelectCompression` through `From`.
// Describes how the stored object is compressed, not the query result.
#[derive(Clone, Copy, Debug, ValueEnum)]
pub enum CompressionArg {
None,
Gzip,
Bzip2,
}

/// Translates the CLI `--input-format` choice into the core S3 Select input format.
impl From<InputFormatArg> for SelectInputFormat {
    /// One-to-one variant mapping; the match is exhaustive so a new CLI variant
    /// fails to compile until it is mapped here.
    fn from(arg: InputFormatArg) -> Self {
        match arg {
            InputFormatArg::Csv => Self::Csv,
            InputFormatArg::Json => Self::Json,
            InputFormatArg::Parquet => Self::Parquet,
        }
    }
}

/// Translates the CLI `--output-format` choice into the core S3 Select output format.
impl From<OutputFormatArg> for SelectOutputFormat {
    /// One-to-one variant mapping (CSV or JSON).
    fn from(arg: OutputFormatArg) -> Self {
        match arg {
            OutputFormatArg::Csv => Self::Csv,
            OutputFormatArg::Json => Self::Json,
        }
    }
}

/// Translates the CLI `--compression` choice into the core S3 Select compression setting.
impl From<CompressionArg> for SelectCompression {
    /// One-to-one variant mapping (none, gzip or bzip2).
    fn from(arg: CompressionArg) -> Self {
        match arg {
            CompressionArg::None => Self::None,
            CompressionArg::Gzip => Self::Gzip,
            CompressionArg::Bzip2 => Self::Bzip2,
        }
    }
}

/// Execute the `sql` command.
///
/// Validates the query and object path, resolves the alias, creates an S3 client and
/// streams the S3 Select result to stdout.
///
/// Returned exit codes:
/// - `UsageError` when the query is blank or the path is rejected by `parse_object_path`
/// - `GeneralError` when aliases cannot be loaded or stdout cannot be flushed
/// - `NotFound` when the alias lookup fails
/// - `NetworkError` when the S3 client cannot be created
/// - the code derived from the select error when the query itself fails
/// - `Success` otherwise
pub async fn execute(args: SqlArgs, output_config: OutputConfig) -> ExitCode {
    let formatter = Formatter::new(output_config);

    // Reject blank queries up front so no configuration or network work is attempted.
    if args.query.trim().is_empty() {
        formatter.error("Query must not be empty (--query)");
        return ExitCode::UsageError;
    }

    let remote = match parse_object_path(&args.path) {
        Ok(p) => p,
        Err(e) => {
            formatter.error(&e.to_string());
            return ExitCode::UsageError;
        }
    };

    let alias_manager = match AliasManager::new() {
        Ok(am) => am,
        Err(e) => {
            formatter.error(&format!("Failed to load aliases: {e}"));
            return ExitCode::GeneralError;
        }
    };

    let alias = match alias_manager.get(&remote.alias) {
        Ok(a) => a,
        Err(_) => {
            formatter.error(&format!("Alias '{}' not found", remote.alias));
            return ExitCode::NotFound;
        }
    };

    let client = match S3Client::new(alias).await {
        Ok(c) => c,
        Err(e) => {
            formatter.error(&format!("Failed to create S3 client: {e}"));
            return ExitCode::NetworkError;
        }
    };

    let options = SelectOptions {
        expression: args.query,
        input_format: args.input_format.into(),
        output_format: args.output_format.into(),
        compression: args.compression.into(),
    };

    let mut stdout = tokio::io::stdout();

    let outcome = client
        .select_object_content(&remote, &options, &mut stdout)
        .await;

    // A completed write on tokio's stdout handle does not guarantee the bytes have reached
    // the real stdout yet. Flush explicitly so the tail of the result is not lost if the
    // caller exits the process right after receiving the exit code.
    // NOTE(review): harmless if the store implementation already flushes the writer.
    let flushed = tokio::io::AsyncWriteExt::flush(&mut stdout).await;

    match (outcome, flushed) {
        (Ok(()), Ok(())) => ExitCode::Success,
        // A query failure takes precedence over any flush problem.
        (Err(e), _) => {
            formatter.error(&e.to_string());
            exit_code_from_error(&e)
        }
        (Ok(()), Err(e)) => {
            formatter.error(&format!("Failed to flush output: {e}"));
            ExitCode::GeneralError
        }
    }
}

/// Converts a core error into the CLI exit code, falling back to `GeneralError`
/// when the error's numeric code has no `ExitCode` counterpart.
fn exit_code_from_error(error: &rc_core::Error) -> ExitCode {
    let raw_code = error.exit_code();
    ExitCode::from_i32(raw_code).unwrap_or(ExitCode::GeneralError)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::output::OutputConfig;

    /// Builds arguments with CSV input, CSV output and no compression.
    fn csv_args(path: &str, query: &str) -> SqlArgs {
        SqlArgs {
            path: path.to_string(),
            query: query.to_string(),
            input_format: InputFormatArg::Csv,
            output_format: OutputFormatArg::Csv,
            compression: CompressionArg::None,
        }
    }

    /// A whitespace-only query is rejected before the path is even parsed.
    #[tokio::test]
    async fn sql_empty_query_is_usage_error() {
        let exit = execute(csv_args("a/b/c", " "), OutputConfig::default()).await;
        assert_eq!(exit, ExitCode::UsageError);
    }

    /// A path without an object key (`alias/bucket`) is a usage error.
    #[tokio::test]
    async fn sql_invalid_object_path_is_usage_error() {
        let exit = execute(csv_args("a/b", "SELECT 1"), OutputConfig::default()).await;
        assert_eq!(exit, ExitCode::UsageError);
    }
}
11 changes: 11 additions & 0 deletions crates/cli/tests/help_contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ fn top_level_command_help_contract() {
"mirror",
"tree",
"share",
"sql",
"version",
"tag",
"cors",
Expand Down Expand Up @@ -323,6 +324,16 @@ fn top_level_command_help_contract() {
usage: "Usage: rc share [OPTIONS] <PATH>",
expected_tokens: &["--expire", "--upload", "--content-type"],
},
HelpCase {
args: &["sql"],
usage: "Usage: rc sql [OPTIONS] --query <QUERY> <PATH>",
expected_tokens: &[
"--query",
"--input-format",
"--output-format",
"--compression",
],
},
HelpCase {
args: &["version"],
usage: "Usage: rc version [OPTIONS] <COMMAND>",
Expand Down
4 changes: 3 additions & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod lifecycle;
pub mod path;
pub mod replication;
pub mod retry;
pub mod select;
pub mod traits;

pub use alias::{Alias, AliasManager};
Expand All @@ -28,12 +29,13 @@ pub use lifecycle::{
LifecycleConfiguration, LifecycleExpiration, LifecycleRule, LifecycleRuleStatus,
LifecycleTransition, NoncurrentVersionExpiration, NoncurrentVersionTransition,
};
pub use path::{ParsedPath, RemotePath, parse_path};
pub use path::{ParsedPath, RemotePath, parse_object_path, parse_path};
pub use replication::{
BucketTarget, BucketTargetCredentials, ReplicationConfiguration, ReplicationDestination,
ReplicationRule, ReplicationRuleStatus,
};
pub use retry::{RetryBuilder, is_retryable_error, retry_with_backoff};
pub use select::{SelectCompression, SelectInputFormat, SelectOptions, SelectOutputFormat};
pub use traits::{
BucketNotification, Capabilities, ListOptions, ListResult, NotificationTarget, ObjectInfo,
ObjectStore, ObjectVersion,
Expand Down
26 changes: 26 additions & 0 deletions crates/core/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,24 @@ pub fn parse_path(path: &str) -> Result<ParsedPath> {
}
}

/// Parse a remote path that must refer to a single object (`alias/bucket/key` with non-empty key).
pub fn parse_object_path(path: &str) -> Result<RemotePath> {
match parse_path(path)? {
ParsedPath::Remote(r) => {
if r.key.is_empty() || r.is_dir {
Err(Error::InvalidPath(
"Object path required: alias/bucket/key (key must not be empty)".into(),
))
} else {
Ok(r)
}
}
ParsedPath::Local(_) => Err(Error::InvalidPath(
"Expected remote path in the form alias/bucket/key".into(),
)),
}
}

/// Check if a string is a valid alias name
fn is_valid_alias_name(name: &str) -> bool {
!name.is_empty()
Expand Down Expand Up @@ -256,6 +274,14 @@ mod tests {
assert!(remote.is_dir);
}

#[test]
fn test_parse_object_path_requires_non_empty_key() {
    // Bucket-only and directory-style paths must both be rejected.
    for rejected in ["myalias/bucket", "myalias/bucket/dir/"] {
        assert!(parse_object_path(rejected).is_err());
    }

    // A full alias/bucket/key path yields the key unchanged.
    let parsed = parse_object_path("myalias/bucket/key.txt").unwrap();
    assert_eq!(parsed.key, "key.txt");
}

#[test]
fn test_parse_local_absolute_path() {
let path = parse_path("/home/user/file.txt").unwrap();
Expand Down
48 changes: 48 additions & 0 deletions crates/core/src/select.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//! S3 Select domain types (no AWS SDK types).

/// Object payload format for S3 Select input.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SelectInputFormat {
    /// CSV input (the default).
    #[default]
    Csv,
    /// JSON input.
    Json,
    /// Parquet input.
    Parquet,
}

/// Result row format for S3 Select output.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SelectOutputFormat {
    /// CSV output (the default).
    #[default]
    Csv,
    /// JSON output.
    Json,
}

/// Compression applied to the **stored object** (input decompression).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SelectCompression {
    /// The object is stored uncompressed (the default).
    #[default]
    None,
    /// The object is gzip-compressed.
    Gzip,
    /// The object is bzip2-compressed.
    Bzip2,
}

/// Options for running an S3 Select query on one object.
///
/// `Default` is derived, so it yields an empty expression plus whichever variant each
/// enum marks with `#[default]` (currently CSV input, CSV output, no compression).
/// Deriving keeps the struct default in step with the enum defaults instead of
/// repeating them by hand.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SelectOptions {
    /// SQL expression (S3 Select / `s3object`).
    pub expression: String,
    /// Format of the stored object being queried.
    pub input_format: SelectInputFormat,
    /// Format of the returned result rows.
    pub output_format: SelectOutputFormat,
    /// Compression of the stored object.
    pub compression: SelectCompression,
}
15 changes: 14 additions & 1 deletion crates/core/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ use std::collections::HashMap;
use async_trait::async_trait;
use jiff::Timestamp;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncWrite;

use crate::cors::CorsRule;
use crate::error::Result;
use crate::lifecycle::LifecycleRule;
use crate::path::RemotePath;
use crate::replication::ReplicationConfiguration;
use crate::select::SelectOptions;

/// Metadata for an object version
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -176,7 +178,10 @@ pub struct Capabilities {
/// Supports anonymous bucket access policies
pub anonymous: bool,

/// Supports S3 Select
/// S3 Select (`SelectObjectContent`).
///
/// This remains `false` in generic capability hints because support is determined by issuing
/// a real request against the target object.
pub select: bool,

/// Supports event notifications
Expand Down Expand Up @@ -387,6 +392,14 @@ pub trait ObjectStore: Send + Sync {

/// Delete bucket CORS configuration.
async fn delete_bucket_cors(&self, bucket: &str) -> Result<()>;

/// Run S3 Select on an object and stream result payloads to `writer`.
///
/// * `path` - remote object to query.
/// * `options` - SQL expression together with the input format, output format and
///   input compression.
/// * `writer` - asynchronous sink that receives the result bytes; taken as a trait
///   object so any `AsyncWrite + Send + Unpin` destination can be passed.
///
/// NOTE(review): this signature does not say whether implementations flush `writer`
/// before returning — confirm against the implementations, or flush at the call site.
async fn select_object_content(
&self,
path: &RemotePath,
options: &SelectOptions,
writer: &mut (dyn AsyncWrite + Send + Unpin),
) -> Result<()>;
// async fn get_versioning(&self, bucket: &str) -> Result<bool>;
// async fn set_versioning(&self, bucket: &str, enabled: bool) -> Result<()>;
// async fn get_tags(&self, path: &RemotePath) -> Result<HashMap<String, String>>;
Expand Down
Loading