Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
48fc374
feat(plugin-sdk): add catch_unwind panic guards to all FFI trampolines
streamkit-devin Apr 19, 2026
a2a6cef
fix(plugin-sdk): address PR review — panic messages, drop guard, cstr…
streamkit-devin Apr 20, 2026
39b2769
fix(plugin-sdk): address second round review feedback
streamkit-devin Apr 20, 2026
1d9490c
fix(plugin-sdk): log cleanup() panic instead of silently swallowing
streamkit-devin Apr 20, 2026
3ed8844
refactor(plugin-sdk): consolidate guards, add shim tests, polish
streamkit-devin Apr 20, 2026
fe3cbd1
refactor(plugin-sdk): parameterize guard_ptr label, soften guard comm…
streamkit-devin Apr 20, 2026
d93accd
feat(plugin-native): RAII call guard, call timeout, pointer provenanc…
streamkit-devin Apr 20, 2026
13e2a81
fix(plugin-native): address review — AtomicPtr, packet loss, timeout …
streamkit-devin Apr 20, 2026
ab97acc
fix(plugin-native): close TOCTOU race and address should-fix findings
streamkit-devin Apr 21, 2026
0bb7459
fix(plugin-native): address should-fix review findings (round 3)
streamkit-devin Apr 21, 2026
a8326fb
fix(plugin-native): fix rollback-stranding-destroy, harden on_upstrea…
streamkit-devin Apr 21, 2026
f124c9b
fix(plugin-native): SeqCst everywhere, rate-limit timeout warn, cleanup
streamkit-devin Apr 21, 2026
8da9209
perf(plugin-native): replace spawn_blocking with dedicated worker thread
streamkit-devin Apr 21, 2026
a06de01
fix(plugin-native): detach worker thread instead of joining on drop
streamkit-devin Apr 21, 2026
cac944a
fix(plugin-native): address PR review feedback on worker thread
streamkit-devin Apr 21, 2026
b7b4754
fix(plugin-native): address second review round for worker thread PR
streamkit-devin Apr 22, 2026
849645c
fix(plugin-native): address review round 3 findings
streamkit-devin Apr 22, 2026
ad88a1e
fix(plugin-native): address review round 4 findings
streamkit-devin Apr 22, 2026
d6078e7
fix(plugin-native): address review round 5 validation findings
streamkit-devin Apr 22, 2026
5c92838
fix(plugin-native): bound send-side blocking, early tick check, cleanup
streamkit-devin Apr 22, 2026
d5c377d
Merge branch 'main' into devin/1776623122-plugin-sdk-hardening
streamer45 Apr 22, 2026
e255f79
docs(plugin-native): clarify set_call_timeout controls reply-side tim…
streamkit-devin Apr 22, 2026
a408460
feat(plugin-sdk): bump to v9 — zero-copy binary packets + logger over…
streamkit-devin Apr 22, 2026
1d2ade1
fix(plugin-sdk): address v9 review findings
streamkit-devin Apr 22, 2026
06f957c
fix(plugin-sdk): address v9 review round 2 — buffer leak + FFI safety
streamkit-devin Apr 22, 2026
02a6f31
fix(plugin-sdk): address v9 review round 3 — callsite safety + Stacke…
streamkit-devin Apr 22, 2026
c52d147
ci: re-trigger CI (Skit/Test non-GPU failure, GPU variant passed)
streamkit-devin Apr 22, 2026
d77b167
docs(plugin-sdk): v9 ABI/wire/logger release notes + ergonomic guidance
streamkit-devin Apr 22, 2026
73fb375
fix(plugin-sdk): guard buffer_handle/free_fn reads behind CPacket::le…
streamkit-devin Apr 22, 2026
136cf9a
style: cargo fmt
streamkit-devin Apr 22, 2026
40c6b64
refactor(plugin-sdk): modernize native_plugin_entry! — PluginMetadata…
streamkit-devin Apr 22, 2026
943701e
style: cargo fmt metadata_storage.rs
streamkit-devin Apr 22, 2026
3786430
fix(plugin-sdk): correct allow comment on callback_available
streamkit-devin Apr 23, 2026
bd68bc9
feat(plugin-native): add host observability — spans, metrics, plugin …
streamkit-devin Apr 23, 2026
2c65466
style(plugin-native): apply rustfmt formatting
streamkit-devin Apr 23, 2026
ccc68bb
fix(plugin-native): address review feedback on host observability
streamkit-devin Apr 23, 2026
cb5e9ce
fix(plugin-native): address second round of review feedback
streamkit-devin Apr 23, 2026
dce8493
fix(plugin-native): address third round of review feedback
streamkit-devin Apr 23, 2026
4a2ec61
fix(plugin-native): address fourth round of review feedback
streamkit-devin Apr 23, 2026
d0a321d
docs(plugin-native): fix stale await_reply docstring
streamkit-devin Apr 23, 2026
9fd18a3
style(plugin-native): align event field names, clarify raw kind
streamkit-devin Apr 23, 2026
8dc43e9
style(plugin-native): apply rustfmt formatting
streamkit-devin Apr 23, 2026
deb6800
feat(plugin-sdk): add ResourceCache and redesign ResourceSupport trait
streamkit-devin Apr 23, 2026
40bbe50
fix(plugin-sdk): address ResourceCache review feedback
streamkit-devin Apr 23, 2026
829f772
refactor(plugin-sdk): add CacheError, init_races, drop ResourceSuppor…
streamkit-devin Apr 23, 2026
e5005e6
refactor(plugin-sdk): add is_poisoned, init_races test, structured er…
streamkit-devin Apr 23, 2026
c8af171
refactor(plugin-sdk): harden cache API docs, drop ParakeetNode manual…
streamkit-devin Apr 23, 2026
09da696
fix(plugin-sdk): complete v9 native callback wiring
streamkit-devin Apr 23, 2026
d4a5b18
fix(plugin-sdk): address native review follow-ups
streamkit-devin Apr 23, 2026
52019c4
fix(plugin-sdk): validate native review findings
streamkit-devin Apr 24, 2026
4a2d18c
style(plugin-sdk): format parakeet cache call
streamkit-devin Apr 24, 2026
6732939
fix(plugin-sdk): address ffi cleanup review findings
streamkit-devin Apr 24, 2026
e7e4332
Merge branch 'main' into devin/1776623122-plugin-sdk-hardening
streamer45 Apr 24, 2026
00abb34
fix(plugin-native): hot-path filter, reply backstop, raw-pointer alig…
streamkit-devin Apr 24, 2026
da41415
fix(plugin-sdk): guard_unit for free_binary_buffer_handle, doc cleanups
streamkit-devin Apr 24, 2026
f69b044
fix(plugin-sdk): safety docs, lifetime invariants, panic-destroy test
streamkit-devin Apr 24, 2026
d3735ea
docs(plugin-native): fix set_call_timeout doc — None uses backstop, n…
streamkit-devin Apr 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 34 additions & 1 deletion apps/skit/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
//
// SPDX-License-Identifier: MPL-2.0

use figment::{
providers::{Env, Format, Toml},
Figment,
Expand All @@ -17,6 +16,15 @@ const fn default_engine_batch_size() -> usize {
32
}

/// Deserialize `Option<u64>` with a minimum clamp of 1 for timeout values.
fn deserialize_clamp_timeout<'de, D>(deserializer: D) -> Result<Option<u64>, D::Error>
where
D: serde::Deserializer<'de>,
{
let val: Option<u64> = Option::deserialize(deserializer)?;
Ok(val.map(|v| v.max(1)))
}

/// Preset tuning profiles for the engine.
#[derive(Deserialize, Serialize, Debug, Clone, Copy, JsonSchema)]
#[serde(rename_all = "kebab-case")]
Expand Down Expand Up @@ -187,6 +195,10 @@ const fn default_max_body_size() -> usize {
100 * 1024 * 1024
}

const fn default_native_call_timeout_value() -> u64 {
300
}

fn default_cors_allowed_origins() -> Vec<String> {
vec![
// Portless localhost (e.g., reverse proxy on 80/443)
Expand Down Expand Up @@ -354,6 +366,17 @@ impl Default for ServerConfig {
#[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)]
pub struct PluginConfig {
pub directory: String,
/// Native plugin FFI call timeout in seconds (default: 300, minimum: 1).
///
/// Set to `null` to use only the default backstop timeout on the reply
/// side; the send-side backpressure guard remains bounded regardless.
///
/// Values below 1 are clamped to 1 to prevent instant timeouts.
#[serde(
default = "PluginConfig::default_native_call_timeout_secs",
deserialize_with = "deserialize_clamp_timeout"
)]
pub native_call_timeout_secs: Option<u64>,
#[serde(flatten, default)]
pub http_management: PluginHttpConfig,
#[serde(flatten, default)]
Expand Down Expand Up @@ -450,6 +473,7 @@ impl Default for PluginConfig {
fn default() -> Self {
Self {
directory: ".plugins".to_string(),
native_call_timeout_secs: Some(default_native_call_timeout_value()),
http_management: PluginHttpConfig::default(),
marketplace: PluginMarketplaceConfig::default(),
trusted_pubkeys: Vec::new(),
Expand All @@ -460,6 +484,15 @@ impl Default for PluginConfig {
}
}

impl PluginConfig {
// Serde default hooks must return the exact field type; the wrapped value
// distinguishes missing config from explicit null.
#[allow(clippy::unnecessary_wraps)]
const fn default_native_call_timeout_secs() -> Option<u64> {
Some(default_native_call_timeout_value())
}
}

const fn default_require_registry_origin() -> bool {
false
}
Expand Down
4 changes: 4 additions & 0 deletions apps/skit/src/marketplace_installer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2108,6 +2108,7 @@ mod tests {
plugin_dir.to_path_buf(),
wasm_dir,
native_dir,
Some(std::time::Duration::from_secs(300)),
)?;
Ok(Arc::new(tokio::sync::Mutex::new(manager)))
}
Expand Down Expand Up @@ -2201,6 +2202,7 @@ mod tests {

let config = PluginConfig {
directory: plugin_dir.to_string_lossy().to_string(),
native_call_timeout_secs: Some(300),
http_management: crate::config::PluginHttpConfig { allow_http_management: false },
marketplace: crate::config::PluginMarketplaceConfig {
marketplace_enabled: true,
Expand Down Expand Up @@ -2302,6 +2304,7 @@ mod tests {

let config = PluginConfig {
directory: plugin_dir.to_string_lossy().to_string(),
native_call_timeout_secs: Some(300),
http_management: crate::config::PluginHttpConfig { allow_http_management: false },
marketplace: crate::config::PluginMarketplaceConfig {
marketplace_enabled: true,
Expand Down Expand Up @@ -2369,6 +2372,7 @@ mod tests {

let config = PluginConfig {
directory: plugin_dir.to_string_lossy().to_string(),
native_call_timeout_secs: Some(300),
http_management: crate::config::PluginHttpConfig { allow_http_management: false },
marketplace: crate::config::PluginMarketplaceConfig {
marketplace_enabled: true,
Expand Down
6 changes: 5 additions & 1 deletion apps/skit/src/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ pub struct UnifiedPluginManager {
plugin_base_dir: PathBuf,
wasm_directory: PathBuf,
native_directory: PathBuf,
native_call_timeout: Option<std::time::Duration>,
engine: Arc<Engine>,
#[allow(dead_code)] // Will be used when plugins are migrated to new resource system
resource_manager: Arc<streamkit_core::ResourceManager>,
Expand All @@ -165,6 +166,7 @@ impl UnifiedPluginManager {
plugin_base_dir: PathBuf,
wasm_directory: PathBuf,
native_directory: PathBuf,
native_call_timeout: Option<std::time::Duration>,
) -> Result<Self> {
if !wasm_directory.exists() {
std::fs::create_dir_all(&wasm_directory).with_context(|| {
Expand All @@ -188,6 +190,7 @@ impl UnifiedPluginManager {
plugin_base_dir,
wasm_directory,
native_directory,
native_call_timeout,
engine,
resource_manager,
plugins_loaded_gauge: meter
Expand Down Expand Up @@ -776,12 +779,13 @@ impl UnifiedPluginManager {
return Err(anyhow!("Native plugin file {} does not exist", path.to_string_lossy()));
}

let plugin = LoadedNativePlugin::load(path)
let mut plugin = LoadedNativePlugin::load(path)
.map_err(|e| {
tracing::error!(error = %e, path = ?path, "Detailed native plugin load error");
e
})
.with_context(|| format!("failed to load native plugin {}", path.to_string_lossy()))?;
plugin.set_call_timeout(self.native_call_timeout);

let metadata = plugin.metadata();
let original_kind = metadata.kind.clone();
Expand Down
1 change: 1 addition & 0 deletions apps/skit/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4061,6 +4061,7 @@ pub fn create_app(
plugin_base_dir,
wasm_plugin_dir,
native_plugin_dir,
config.plugins.native_call_timeout_secs.map(std::time::Duration::from_secs),
)
.expect("Failed to initialize unified plugin manager");
let plugin_manager = Arc::new(tokio::sync::Mutex::new(plugin_manager));
Expand Down
1 change: 1 addition & 0 deletions crates/plugin-native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ streamkit-plugin-sdk-native = { workspace = true }

libloading = "0.9"
anyhow = "1.0"
opentelemetry = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true, features = ["sync", "macros", "rt", "time"] }
async-trait = { workspace = true }
Expand Down
68 changes: 60 additions & 8 deletions crates/plugin-native/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@
//! This crate provides the host-side runtime for loading and executing native plugins
//! that use the C ABI interface.

pub mod metrics;
pub mod wrapper;

use anyhow::{anyhow, Context, Result};
use libloading::{Library, Symbol};
use std::path::Path;
use std::sync::Arc;
use streamkit_core::{NodeRegistry, PinCardinality};
use streamkit_plugin_sdk_native::types::{CNativePluginAPI, NATIVE_PLUGIN_API_VERSION};
use streamkit_plugin_sdk_native::types::PLUGIN_SET_LOG_ENABLED_SYMBOL;
use streamkit_plugin_sdk_native::types::{
CNativePluginAPI, CSetLogEnabledCallback, NATIVE_PLUGIN_API_VERSION,
};
use streamkit_plugin_sdk_native::{conversions, types::PLUGIN_API_SYMBOL};
use tracing::{info, warn};

Expand All @@ -35,6 +39,8 @@ pub struct LoadedNativePlugin {
library: Arc<Library>,
api: &'static CNativePluginAPI,
metadata: PluginMetadata,
call_timeout: Option<std::time::Duration>,
set_log_enabled_callback: Option<CSetLogEnabledCallback>,
}

/// Metadata extracted from a plugin
Expand Down Expand Up @@ -106,12 +112,16 @@ impl LoadedNativePlugin {
// the lifetime of the loaded library, which we keep alive via Arc<Library>.
let api = unsafe { &*api_ptr };

// Check API version compatibility — accept v6 (pre-BinaryWithMeta),
// v7 (added BinaryWithMeta), and v8 (added EncodedAudio metadata).
// All are wire-compatible; v6 plugins never emit BinaryWithMeta
// packets, v7 plugins don't declare EncodedAudio pin types (they
// fall back to Binary), but runtime packet transport is unaffected
// since EncodedAudio is a metadata-only discriminant.
// Check API version compatibility — accept v6 through v9.
// v6: pre-BinaryWithMeta.
// v7: added BinaryWithMeta.
// v8: added EncodedAudio metadata.
// v9: zero-copy binary packets (buffer_handle), logger overhaul
// (set_log_enabled_callback, target = plugin kind).
// v6–v8 are wire-compatible; v9 adds a Binary→BinaryWithMeta wire
// upgrade (see v9 notes in types.rs). Version-gated features use
// runtime api_version checks (e.g. buffer_handle for v9, downgrade
// for v6).
if api.version < MIN_SUPPORTED_API_VERSION || api.version > NATIVE_PLUGIN_API_VERSION {
let plugin_version = api.version;
return Err(anyhow!(
Expand All @@ -123,6 +133,25 @@ impl LoadedNativePlugin {
// Extract metadata
let mut metadata = Self::extract_metadata(api)?;

let set_log_enabled_callback = if api.version >= 9 {
// SAFETY: Optional extension symbol. When present, the SDK macro
// exports it with the exact `CSetLogEnabledCallback` signature.
match unsafe { library.get::<CSetLogEnabledCallback>(PLUGIN_SET_LOG_ENABLED_SYMBOL) } {
Ok(symbol) => Some(*symbol),
Err(e) => {
warn!(
kind = %metadata.kind,
api_version = api.version,
error = %e,
"v9 plugin did not export log-enabled callback symbol"
);
None
},
}
} else {
None
};

// Detect source plugin capability from the v3 API fields.
// If the plugin provides `get_source_config`, we probe it with a temporary
// instance to read tick parameters. If instance creation fails we fall back
Expand Down Expand Up @@ -160,7 +189,13 @@ impl LoadedNativePlugin {

info!(kind = %metadata.kind, "Successfully loaded native plugin");

Ok(Self { library: Arc::new(library), api, metadata })
Ok(Self {
library: Arc::new(library),
api,
metadata,
call_timeout: Some(wrapper::DEFAULT_CALL_TIMEOUT),
set_log_enabled_callback,
})
}

/// Extract metadata from the plugin
Expand Down Expand Up @@ -300,6 +335,21 @@ impl LoadedNativePlugin {
&self.library
}

/// Override the reply-side timeout for FFI calls (process_packet, flush,
/// tick). This controls how long the async side waits for the worker
/// thread's oneshot reply.
///
/// Pass `None` to fall back to the default backstop timeout
/// ([`DEFAULT_CALL_TIMEOUT`](crate::wrapper::DEFAULT_CALL_TIMEOUT),
/// 300 s) instead of a caller-chosen duration. The reply side is
/// **never** truly unbounded — the backstop always applies.
///
/// The channel-send timeout (backpressure guard) also uses
/// `DEFAULT_CALL_TIMEOUT` when this is `None`.
pub const fn set_call_timeout(&mut self, timeout: Option<std::time::Duration>) {
self.call_timeout = timeout;
}
Comment thread
staging-devin-ai-integration[bot] marked this conversation as resolved.

/// Create a new node instance from this plugin
///
/// # Errors
Expand All @@ -316,6 +366,8 @@ impl LoadedNativePlugin {
self.api,
self.metadata.clone(),
params,
self.call_timeout,
self.set_log_enabled_callback,
)?;

Ok(Box::new(wrapper))
Expand Down
Loading
Loading