Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/nodes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ default = [
"mp4",
"openh264",
"rtmp",
"object_store",
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 object_store feature added to default features

Adding "object_store" to the default feature set at line 131 means opendal (with the S3 service) will now be compiled into every default build. This increases compile time and binary size for all users, even those who don't need S3 support. This is a deliberate product decision (matching how opus, ogg, webm, mp4, rtmp etc. are all in defaults), but worth noting as it pulls in the full OpenDAL S3 dependency chain for every build.

Staging: Open in Devin

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged — this is intentional per the user's request. The object_store feature was initially opt-in, but was moved to defaults because "given it's core, I'd say we keep it on by default for now, easier to test, otherwise we should have made it a plugin." Same pattern as opus, ogg, webm, mp4, rtmp, etc.

]

# Individual features for each node.
Expand Down
3 changes: 2 additions & 1 deletion crates/nodes/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ pub fn register_core_nodes(registry: &mut NodeRegistry, constraints: &GlobalNode
false,
"Streams binary data to S3-compatible object storage (AWS S3, GCS, Azure, MinIO, RustFS, etc.). \
Uses multipart upload for bounded memory usage. \
Credentials can be provided via config or environment variables.",
Credentials can be provided via config or environment variables. \
Set passthrough: true to forward packets downstream (required for oneshot pipelines).",
);
}
}
175 changes: 157 additions & 18 deletions crates/nodes/src/core/object_store_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,20 @@

//! Object store write node — streams binary data to S3-compatible object storage.
//!
//! Uses [Apache OpenDAL](https://opendal.apache.org/) to support S3, GCS,
//! Azure Blob, MinIO, RustFS, and other compatible backends.
//! Uses [Apache OpenDAL](https://opendal.apache.org/) to support S3, MinIO,
//! RustFS, and other S3-compatible backends. (Only the `services-s3` feature
//! is compiled; GCS / Azure would require additional OpenDAL features.)
//!
//! Incoming [`Packet::Binary`] packets are buffered up to `chunk_size` and
//! written via OpenDAL's multipart [`Writer`](opendal::Writer), keeping memory
//! bounded regardless of the total upload size.
//!
//! ## Passthrough mode
//!
//! When `passthrough` is enabled (default: `false`), the node also forwards
//! every incoming packet to its `"out"` pin, allowing it to sit inline in a
//! linear pipeline (e.g. `muxer → s3_writer → http_output`). This is
//! required for oneshot pipelines which do not support fan-out.

use async_trait::async_trait;
use schemars::JsonSchema;
Expand All @@ -27,6 +35,11 @@ use streamkit_core::{
/// S3 minimum multipart part size (5 MiB). Intermediate parts smaller than
/// this will be rejected with `EntityTooSmall`; only the final part may be
/// smaller.
const S3_MIN_PART_SIZE: usize = 5 * 1024 * 1024;

/// Default buffer/chunk size: the S3 minimum multipart part size (5 MiB).
///
/// Defined in terms of [`S3_MIN_PART_SIZE`] rather than repeating the
/// literal, so the default can never drift below the validation floor the
/// factory enforces on `chunk_size`.
const DEFAULT_CHUNK_SIZE: usize = S3_MIN_PART_SIZE;

/// Serde default helper for the `chunk_size` config field.
const fn default_chunk_size() -> usize {
    DEFAULT_CHUNK_SIZE
}
Expand Down Expand Up @@ -67,7 +80,7 @@ pub struct ObjectStoreWriteConfig {
/// Access key ID.
///
/// If omitted, the node falls back to `access_key_id_env`.
#[serde(default)]
#[serde(default, skip_serializing)]
pub access_key_id: Option<String>,

/// Environment variable name containing the access key ID.
Expand All @@ -79,7 +92,7 @@ pub struct ObjectStoreWriteConfig {
/// Secret access key.
///
/// If omitted, the node falls back to `secret_key_env`.
#[serde(default)]
#[serde(default, skip_serializing)]
pub secret_access_key: Option<String>,

/// Environment variable name containing the secret access key.
Expand All @@ -93,13 +106,22 @@ pub struct ObjectStoreWriteConfig {
/// This controls the multipart upload part size. S3 requires a minimum
/// part size of 5 MiB (except the last part).
#[serde(default = "default_chunk_size")]
#[schemars(range(min = 1))]
#[schemars(range(min = 5_242_880))]
pub chunk_size: usize,

/// Optional MIME content type for the uploaded object
/// (e.g. `audio/ogg`, `video/mp4`).
#[serde(default)]
pub content_type: Option<String>,

/// When `true`, the node forwards every incoming packet to its `"out"`
/// pin in addition to writing it to object storage. This allows the
/// node to sit inline in a linear pipeline (required for oneshot mode
/// which does not support fan-out).
///
/// Default: `false` (pure sink — no output pin).
#[serde(default)]
pub passthrough: bool,
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -128,14 +150,12 @@ fn resolve_credential(
return Ok(val);
},
Ok(_) => {
return Err(StreamKitError::Configuration(format!(
"Environment variable '{env}' for {label} is empty"
)));
// Env var exists but is empty — fall through to literal.
tracing::debug!("Env var '{env}' for {label} is empty, trying literal fallback");
},
Err(_) => {
return Err(StreamKitError::Configuration(format!(
"Environment variable '{env}' for {label} is not set"
)));
// Env var not set — fall through to literal.
tracing::debug!("Env var '{env}' for {label} is not set, trying literal fallback");
},
}
}
Expand Down Expand Up @@ -240,6 +260,7 @@ impl ObjectStoreWriteNode {
secret_key_env: None,
chunk_size: default_chunk_size(),
content_type: None,
passthrough: false,
}
} else {
config_helpers::parse_config_required(params)?
Expand Down Expand Up @@ -268,6 +289,14 @@ impl ObjectStoreWriteNode {
));
}

if config.chunk_size < S3_MIN_PART_SIZE && params.is_some() {
return Err(StreamKitError::Configuration(format!(
"chunk_size ({}) is below the S3 minimum multipart part size ({} bytes / 5 MiB). \
Intermediate parts smaller than 5 MiB will be rejected by S3 with EntityTooSmall.",
config.chunk_size, S3_MIN_PART_SIZE,
)));
}

Ok(Box::new(Self { config }))
})
}
Expand All @@ -288,8 +317,16 @@ impl ProcessorNode for ObjectStoreWriteNode {
}

/// Output pins depend on the configured mode: passthrough mode exposes a
/// single broadcast `"out"` pin that forwards every incoming packet
/// unchanged; otherwise the node is a pure sink with no outputs.
fn output_pins(&self) -> Vec<OutputPin> {
    if self.config.passthrough {
        vec![OutputPin {
            name: "out".to_string(),
            produces_type: PacketType::Passthrough,
            cardinality: PinCardinality::Broadcast,
        }]
    } else {
        // Pure sink — no outputs.
        vec![]
    }
}

async fn run(self: Box<Self>, mut context: NodeContext) -> Result<(), StreamKitError> {
Expand Down Expand Up @@ -348,7 +385,22 @@ impl ProcessorNode for ObjectStoreWriteNode {
.finish()
};

tracing::info!(%node_name, "S3 operator created, opening writer");
tracing::info!(%node_name, "S3 operator created, verifying bucket access");

// ── Verify bucket exists and is accessible ────────────────────────
// Stat the root path — this issues a lightweight HEAD request to the
// bucket, catching "NoSuchBucket" or permission errors at init time
// rather than after streaming data for minutes.
operator.stat("/").await.map_err(|e| {
let msg = format!(
"S3 bucket '{}' is not accessible at '{}': {e}",
self.config.bucket, self.config.endpoint
);
state_helpers::emit_failed(&context.state_tx, &node_name, &msg);
StreamKitError::Runtime(msg)
})?;
Comment on lines +390 to +401
Copy link
Copy Markdown
Contributor Author

@staging-devin-ai-integration staging-devin-ai-integration bot Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 stat("/") bucket verification may behave differently across S3-compatible backends

The new bucket verification at crates/nodes/src/core/object_store_write.rs:396-403 calls operator.stat("/").await to check bucket accessibility. The behavior of stat on the root path depends on how OpenDAL's S3 service translates this — some backends may treat / as a virtual directory and return metadata, while others may return a 404 for a non-existent object key. This works for the tested backends (RustFS/MinIO as shown in the E2E test), but could produce false-negative failures on other S3-compatible services that don't support stat on root. If this becomes an issue, an alternative would be a list with limit(0) which is universally supported.

Staging: Open in Devin

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point. stat("/") is the most portable way to probe bucket existence via OpenDAL — it maps to HEAD / on S3, which is the standard bucket-exists check. If an exotic backend returns a false negative here, the error message is clear enough to debug. We could fall back to operator.list("/").next().await as an alternative probe, but for the standard S3-compatible targets (AWS, MinIO, RustFS, GCS via S3 compat, R2) stat("/") works reliably.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. stat("/") works for S3, MinIO, and RustFS (verified in E2E), but could produce false negatives on exotic backends that don't support HEAD on root. If that surfaces, switching to operator.list("/").next().await (or a zero-limit list) would be a more universal probe. For now, the current approach covers all backends we compile against (services-s3 only), and the error message is clear enough to diagnose if it trips on an unusual backend.


tracing::info!(%node_name, "Bucket verified, opening writer");

// ── Open writer (multipart upload) ───────────────────────────────
let writer_future = operator.writer_with(&self.config.key).chunk(self.config.chunk_size);
Expand Down Expand Up @@ -384,9 +436,12 @@ impl ProcessorNode for ObjectStoreWriteNode {
let mut total_bytes: u64 = 0;
let mut buffer = Vec::with_capacity(self.config.chunk_size);
let mut chunks_written: u64 = 0;
// Tracks whether the downstream output channel has closed (passthrough
// mode only). When true we stop forwarding but keep writing to S3.
let mut output_closed = false;

while let Some(packet) = context.recv_with_cancellation(&mut input_rx).await {
if let Packet::Binary { data, .. } = packet {
if let Packet::Binary { data, content_type, metadata } = packet {
stats_tracker.received();
packet_count += 1;
total_bytes += data.len() as u64;
Expand Down Expand Up @@ -414,6 +469,23 @@ impl ProcessorNode for ObjectStoreWriteNode {
);
}

// Forward the packet downstream when in passthrough mode.
if self.config.passthrough && !output_closed {
let forwarded = Packet::Binary { data, content_type, metadata };
if context.output_sender.send("out", forwarded).await.is_err() {
// Downstream closed but we keep writing to S3 so the
// archive is complete. Skip further send attempts.
tracing::info!(
%node_name,
"Output channel closed; continuing S3 write without forwarding"
);
output_closed = true;
}
}

// Consistent with file_write.rs: report a "sent" stat for
// every packet consumed, even in pure-sink mode where there
// is no downstream receiver.
stats_tracker.sent();
stats_tracker.maybe_send();
} else {
Expand Down Expand Up @@ -458,7 +530,7 @@ impl ProcessorNode for ObjectStoreWriteNode {
}

// Upload committed successfully — disarm the abort guard.
guard.disarm();
let _ = guard.disarm();

stats_tracker.force_send();
tracing::info!(
Expand Down Expand Up @@ -488,9 +560,9 @@ mod tests {
use streamkit_core::NodeStatsUpdate;
use tokio::sync::mpsc;

/// Verify pin definitions for the object store write node.
/// Verify pin definitions for the object store write node (sink mode).
#[test]
fn test_pin_definitions() {
fn test_pin_definitions_sink() {
let node = ObjectStoreWriteNode {
config: ObjectStoreWriteConfig {
endpoint: String::new(),
Expand All @@ -503,6 +575,7 @@ mod tests {
secret_key_env: None,
chunk_size: default_chunk_size(),
content_type: None,
passthrough: false,
},
};

Expand All @@ -515,6 +588,35 @@ mod tests {
assert!(outputs.is_empty(), "Sink node should have no output pins");
}

/// Verify pin definitions for passthrough mode.
#[test]
fn test_pin_definitions_passthrough() {
    let config = ObjectStoreWriteConfig {
        endpoint: String::new(),
        bucket: String::new(),
        key: String::new(),
        region: default_region(),
        access_key_id: None,
        access_key_id_env: None,
        secret_access_key: None,
        secret_key_env: None,
        chunk_size: default_chunk_size(),
        content_type: None,
        passthrough: true,
    };
    let node = ObjectStoreWriteNode { config };

    // Input side is unchanged by passthrough mode.
    let input_pins = node.input_pins();
    assert_eq!(input_pins.len(), 1);
    assert_eq!(input_pins[0].name, "in");

    // Passthrough mode adds exactly one broadcast output pin.
    let output_pins = node.output_pins();
    assert_eq!(output_pins.len(), 1, "Passthrough mode should have one output pin");
    assert_eq!(output_pins[0].name, "out");
    assert_eq!(output_pins[0].produces_type, PacketType::Passthrough);
}

/// Verify factory rejects zero chunk_size.
#[test]
fn test_factory_rejects_zero_chunk_size() {
Expand All @@ -534,6 +636,25 @@ mod tests {
assert!(err.contains("chunk_size"), "Error should mention chunk_size: {err}");
}

/// Verify factory rejects chunk_size below S3 minimum (5 MiB).
#[test]
fn test_factory_rejects_sub_5mib_chunk_size() {
    let params = serde_json::json!({
        "endpoint": "http://localhost:9000",
        "bucket": "test",
        "key": "test.bin",
        "chunk_size": 1024,
    });

    let factory = ObjectStoreWriteNode::factory();
    let Err(e) = factory(Some(&params)) else {
        panic!("Expected error for sub-5MiB chunk_size");
    };
    let msg = e.to_string();
    assert!(msg.contains("5 MiB"), "Error should mention 5 MiB minimum: {msg}");
}

/// Stub lookup that never finds any variable.
fn no_env(_name: &str) -> Result<String, std::env::VarError> {
Err(std::env::VarError::NotPresent)
Expand Down Expand Up @@ -568,13 +689,30 @@ mod tests {

#[test]
fn test_resolve_credential_env_empty() {
    // An env var that exists but is empty must fall through to the literal.
    let empty_env = |_: &str| Ok(String::new());
    let resolved = resolve_credential(Some("ANY_VAR"), Some("fallback"), "test", empty_env);
    assert_eq!(resolved.unwrap(), "fallback");
}

#[test]
fn test_resolve_credential_env_empty_no_literal() {
    // Empty env var with no literal fallback: resolution must fail.
    let empty_env = |_: &str| Ok(String::new());
    assert!(resolve_credential(Some("ANY_VAR"), None, "test", empty_env).is_err());
}

#[test]
fn test_resolve_credential_env_not_set() {
    // A missing env var must fall through to the literal.
    let resolved = resolve_credential(Some("MISSING"), Some("fallback"), "test", no_env);
    assert_eq!(resolved.unwrap(), "fallback");
}

#[test]
fn test_resolve_credential_env_not_set_no_literal() {
    // Missing env var and no literal fallback: resolution must fail.
    assert!(resolve_credential(Some("MISSING"), None, "test", no_env).is_err());
}
Expand Down Expand Up @@ -675,6 +813,7 @@ mod tests {
secret_key_env: None,
chunk_size: default_chunk_size(),
content_type: None,
passthrough: false,
};
let node = Box::new(ObjectStoreWriteNode { config });

Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.rustfs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

services:
rustfs:
image: rustfs/rustfs:latest
image: rustfs/rustfs:1.0.0-alpha.90
container_name: rustfs-server
ports:
- "9000:9000" # S3 API
Expand Down
Loading
Loading