fix(nodes): object store writer improvements — passthrough, bucket check, review fixes #278
Conversation
…ctions

- Add object_store to default features so the node is available without extra build flags.
- Remove invalid 'output: type: none' from tts_to_s3.yml and transcode_to_s3.yml — the output section is optional, so omitting it is the correct way to indicate no client output.

Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
🚩 Other sample pipelines still contain invalid output: type: none
This PR correctly removes output: type: none from the two S3 sample pipelines because none is not a valid OutputType variant (crates/api/src/yaml.rs:293-302 defines only Transcription, Json, Audio, Video). However, 9 other sample pipelines still contain output: type: none — all video-related (e.g. openh264-encode-test.yml, video_slint_watermark.yml, mp4_mux_video.yml, video_compositor_demo.yml, video_colorbars.yml, etc.). These would also fail deserialization if loaded through the standard YAML parser. The PR title says "fix sample pipeline client sections" but the fix is incomplete.
False positive — the other sample pipelines use input: type: none (valid InputType variant for pipelines with no user input, e.g. colorbars/compositor demos) paired with output: type: video or output: type: audio (both valid OutputType variants). None of them have output: type: none. Only the two S3 sink pipelines in this PR had that issue.
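To make the distinction concrete, here is a sketch of the two shapes being discussed. The top-level layout is assumed from the sample pipelines; only the variant names come from crates/api/src/yaml.rs:

```yaml
# Valid: a generator pipeline with no user input and a video client output.
# InputType has a `none` variant; OutputType does not — its variants are
# transcription, json, audio, video.
input:
  type: none
output:
  type: video

# Invalid: `none` is not an OutputType variant. To indicate no client
# output, omit the `output:` section entirely:
# output:
#   type: none
```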
…pipelines

The oneshot engine requires linear pipelines (no fan-out). To support S3 upload alongside http_output in oneshot mode, the object store writer now accepts a 'passthrough' config option. When enabled, it forwards every incoming Binary packet to its 'out' pin after buffering for S3, allowing it to sit inline: muxer → s3_writer(passthrough) → http_output.

Changes:
- Add 'passthrough' bool config field (default: false)
- Conditionally expose 'out' output pin in passthrough mode
- Forward packets downstream after S3 buffering
- Update oneshot pipeline YAMLs to use linear passthrough topology
- Add Playwright E2E test (convert-s3.spec.ts) verifying both HTTP response and S3 upload via RustFS
- Add unit tests for passthrough pin definitions

Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
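A minimal sketch of the linear topology this enables. The node ids, type names, and link syntax here are illustrative, not copied from the repo's exact YAML schema:

```yaml
# Linear oneshot pipeline: every node has at most one downstream link.
nodes:
  - id: muxer
    type: mp4_mux
  - id: s3_writer
    type: object_store_write
    config:
      bucket: streamkit-output
      passthrough: true   # buffer for S3, then forward Binary packets on 'out'
  - id: http_output
    type: http_output
links:
  - from: muxer.out
    to: s3_writer.in
  - from: s3_writer.out   # 'out' pin only exposed when passthrough is enabled
    to: http_output.in
```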
The passthrough mode was reconstructing Binary packets with None for content_type and metadata, discarding the original values set by upstream muxers. Capture all fields in the destructure and forward them as-is. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
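The fix can be sketched as follows. The `BinaryPacket` struct here is a stand-in for illustration, not StreamKit's actual packet type; the point is destructuring every field so none are silently replaced with `None`:

```rust
// Stand-in for the real Binary packet type (illustrative fields only).
#[derive(Debug, Clone, PartialEq)]
struct BinaryPacket {
    data: Vec<u8>,
    content_type: Option<String>,
    metadata: Option<String>,
}

// Buffer the payload for S3, then forward the packet downstream with the
// original content_type and metadata intact (the bug rebuilt it with None).
fn forward(packet: BinaryPacket, s3_buffer: &mut Vec<u8>) -> BinaryPacket {
    // Capture ALL fields in the destructure so nothing is discarded.
    let BinaryPacket { data, content_type, metadata } = packet;
    s3_buffer.extend_from_slice(&data);
    BinaryPacket { data, content_type, metadata }
}

fn main() {
    let mut s3_buffer = Vec::new();
    let packet = BinaryPacket {
        data: vec![0x00, 0x01],
        content_type: Some("video/mp4".to_string()),
        metadata: Some("duration=5s".to_string()),
    };
    let forwarded = forward(packet, &mut s3_buffer);
    // Upstream muxer's values survive the passthrough unchanged.
    assert_eq!(forwarded.content_type.as_deref(), Some("video/mp4"));
    assert_eq!(forwarded.metadata.as_deref(), Some("duration=5s"));
    assert_eq!(s3_buffer, vec![0x00, 0x01]);
    println!("fields preserved");
}
```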
Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
```toml
    "mp4",
    "openh264",
    "rtmp",
    "object_store",
```
🚩 object_store feature added to default features
Adding "object_store" to the default feature set at line 131 means opendal (with the S3 service) will now be compiled into every default build. This increases compile time and binary size for all users, even those who don't need S3 support. This is a deliberate product decision (matching how opus, ogg, webm, mp4, rtmp etc. are all in defaults), but worth noting as it pulls in the full OpenDAL S3 dependency chain for every build.
Acknowledged — this is intentional per the user's request. The object_store feature was initially opt-in, but was moved to defaults because "given it's core, I'd say we keep it on by default for now, easier to test, otherwise we should have made it a plugin." Same pattern as opus, ogg, webm, mp4, rtmp, etc.
The E2E project uses the UI's prettierrc (single quotes), not the default prettier config. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Add a bucket existence check (HEAD request via operator.stat) during node initialization, before opening the multipart writer. This catches 'NoSuchBucket' and permission errors immediately rather than after streaming data for minutes. Also unify all S3 sample pipelines to use 'streamkit-output' as the bucket name for simpler local validation. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
```rust
// ── Verify bucket exists and is accessible ────────────────────────
// Stat the root path — this issues a lightweight HEAD request to the
// bucket, catching "NoSuchBucket" or permission errors at init time
// rather than after streaming data for minutes.
operator.stat("/").await.map_err(|e| {
    let msg = format!(
        "S3 bucket '{}' is not accessible at '{}': {e}",
        self.config.bucket, self.config.endpoint
    );
    state_helpers::emit_failed(&context.state_tx, &node_name, &msg);
    StreamKitError::Runtime(msg)
})?;
```
🚩 stat("/") bucket verification may behave differently across S3-compatible backends
The new bucket verification at crates/nodes/src/core/object_store_write.rs:396-403 calls operator.stat("/").await to check bucket accessibility. The behavior of stat on the root path depends on how OpenDAL's S3 service translates this — some backends may treat / as a virtual directory and return metadata, while others may return a 404 for a non-existent object key. This works for the tested backends (RustFS/MinIO as shown in the E2E test), but could produce false-negative failures on other S3-compatible services that don't support stat on root. If this becomes an issue, an alternative would be a list with limit(0) which is universally supported.
Fair point. stat("/") is the most portable way to probe bucket existence via OpenDAL — it maps to HEAD / on S3, which is the standard bucket-exists check. If an exotic backend returns a false negative here, the error message is clear enough to debug. We could fall back to operator.list("/").next().await as an alternative probe, but for the standard S3-compatible targets (AWS, MinIO, RustFS, GCS via S3 compat, R2) stat("/") works reliably.
Good point. stat("/") works for S3, MinIO, and RustFS (verified in E2E), but could produce false negatives on exotic backends that don't support HEAD on root. If that surfaces, switching to operator.list("/").next().await (or a zero-limit list) would be a more universal probe. For now, the current approach covers all backends we compile against (services-s3 only), and the error message is clear enough to diagnose if it trips on an unusual backend.
Critical:
- resolve_credential now falls through to literal when env var is missing or empty, instead of hard-erroring (defense-in-depth works)

Suggestions:
- Warn at config time when chunk_size < 5 MiB (S3 minimum part size)
- Continue S3 write when downstream output channel closes in passthrough mode (archive completeness over early exit)
- Add #[serde(skip_serializing)] on access_key_id and secret_access_key to prevent credential leakage in API responses / debug dumps
- Switch E2E test from execSync to execFileSync (no shell injection)
- Add inline comment explaining stats_tracker.sent() in sink mode

Nits:
- Use let _ = guard.disarm() to acknowledge unused return value
- Fix module doc: only services-s3 is compiled, not GCS/Azure
- Fix sample pipeline accept type: audio/ogg (not audio/opus)

Tests:
- Split env fallback tests into with-literal and without-literal variants (15 tests total, all passing)

Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
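The critical fix's fallback behavior can be sketched with a minimal stdlib-only model. This assumes `resolve_credential` takes an optional env-var name and an optional literal; the real node's signature and error handling differ:

```rust
use std::env;

// Resolve a credential: prefer a non-empty env var, else fall back to the
// configured literal instead of hard-erroring (defense-in-depth).
fn resolve_credential(env_var: Option<&str>, literal: Option<&str>) -> Option<String> {
    if let Some(name) = env_var {
        if let Ok(value) = env::var(name) {
            if !value.is_empty() {
                return Some(value);
            }
        }
        // Env var missing or empty: fall through to the literal.
    }
    literal.map(str::to_owned)
}

fn main() {
    // Env var doesn't exist: previously a hard error, now the literal wins.
    let resolved = resolve_credential(Some("SK_NO_SUCH_VAR_FOR_DEMO"), Some("literal-key"));
    assert_eq!(resolved.as_deref(), Some("literal-key"));
    // No env var configured: the literal is used directly.
    assert_eq!(resolve_credential(None, Some("lit")).as_deref(), Some("lit"));
    // Neither configured: no credential.
    assert_eq!(resolve_credential(None, None), None);
    println!("fallback ok");
}
```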
Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Fixes clippy::items_after_statements lint by moving the constant out of the factory closure to the module-level constants section. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
- Pin docker-compose RustFS image to 1.0.0-alpha.90 (was :latest)
- Harden chunk_size validation: reject values below 5 MiB at config time instead of warning (prevents runtime EntityTooSmall errors)
- Update schemars annotation to reflect 5 MiB minimum
- Add test for sub-5MiB chunk_size rejection (16 tests total)

Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
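The hardened check can be sketched as follows; the function and error type are illustrative, not the node's actual config code:

```rust
// 5 MiB: the S3 multipart minimum part size (applies to all parts except
// the final one).
const MIN_CHUNK_SIZE: u64 = 5 * 1024 * 1024;

// Reject undersized chunk_size at config time rather than warning, so the
// failure surfaces before upload instead of as a runtime EntityTooSmall
// error from S3.
fn validate_chunk_size(chunk_size: u64) -> Result<(), String> {
    if chunk_size < MIN_CHUNK_SIZE {
        return Err(format!(
            "chunk_size {chunk_size} is below the S3 minimum part size ({MIN_CHUNK_SIZE} bytes / 5 MiB)"
        ));
    }
    Ok(())
}

fn main() {
    assert!(validate_chunk_size(MIN_CHUNK_SIZE).is_ok());
    assert!(validate_chunk_size(8 * 1024 * 1024).is_ok());
    assert!(validate_chunk_size(1024 * 1024).is_err());
    println!("validation ok");
}
```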
Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Fixes clippy::unreadable_literal on the 5242880 constant in the schemars attribute. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Summary
Follow-up to #273 (merged). Addresses pipeline loading errors, feature flag defaults, and all review findings.
Pipeline & feature fixes:
- `object_store` in default features (no longer requires `--features object_store` to build)
- Removed `output: type: none` from oneshot sample pipelines (was causing parse failures)
- Passthrough mode on `ObjectStoreWriteNode` — allows it to sit inline in linear oneshot pipelines (muxer → s3_writer(passthrough) → http_output)
- Bucket existence check via `operator.stat("/")` — catches `NoSuchBucket` / permission errors immediately instead of after streaming data
- S3 sample pipelines unified to the `streamkit-output` bucket
- Playwright E2E test (`convert-s3.spec.ts`) verifying HTTP response + S3 upload via RustFS

Review findings (from Devin Review):
- `resolve_credential` now falls through to literal when env var is missing/empty (defense-in-depth works correctly)
- `chunk_size` < 5 MiB is rejected at config time (S3 minimum multipart part size)
- `#[serde(skip_serializing)]` on `access_key_id` and `secret_access_key` to prevent credential leakage
- E2E test uses `execFileSync` instead of `execSync` (no shell injection risk)
- Inline comment explaining `stats_tracker.sent()` in sink mode (matches `file_write.rs` convention)
- `let _ = guard.disarm()` to acknowledge unused return value
- Module doc fixed: only `services-s3` is compiled
- Sample pipeline `accept` fixed: `audio/ogg` (not `audio/opus`)

Review & Testing Checklist for Human
- Start RustFS locally (`docker compose -f docker/docker-compose.rustfs.yml up -d`), create the `streamkit-output` bucket, run the transcode-to-S3 oneshot pipeline from the UI — verify it completes and the object appears in RustFS
- Verify the `resolve_credential` fallback: set `access_key_id_env` to a nonexistent env var with `access_key_id` as literal — confirm the literal is used (previously would hard-error)

Notes
- The `stat("/")` bucket check works for S3, MinIO, and RustFS. If exotic backends handle root-path stat differently, a `list("/")` with `limit(0)` would be a more universal alternative (noted in review comment).
- `chunk_size` below 5 MiB is now rejected at config time (initially only a warning; hardened after review to prevent runtime `EntityTooSmall` errors).

Link to Devin session: https://staging.itsdev.in/sessions/a27079cc9abc4b1a9cb26b6045442ef1
Requested by: @streamer45