diff --git a/Cargo.lock b/Cargo.lock index f5d422ebb..e34680f1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9106,6 +9106,7 @@ dependencies = [ "toml 0.8.23", "tracing", "tracing-subscriber", + "ulid", "uuid", ] diff --git a/crates/terraphim_orchestrator/Cargo.toml b/crates/terraphim_orchestrator/Cargo.toml index 915f138f8..adf4eafc7 100644 --- a/crates/terraphim_orchestrator/Cargo.toml +++ b/crates/terraphim_orchestrator/Cargo.toml @@ -63,6 +63,7 @@ terraphim_persistence = { path = "../terraphim_persistence", version = "1.4.10", # Quickwit integration (optional) reqwest = { workspace = true, optional = true } +ulid = "1.2.1" [dev-dependencies] tokio-test = "0.4" diff --git a/crates/terraphim_orchestrator/orchestrator.example.toml b/crates/terraphim_orchestrator/orchestrator.example.toml index a1de6d822..f5b52e60d 100644 --- a/crates/terraphim_orchestrator/orchestrator.example.toml +++ b/crates/terraphim_orchestrator/orchestrator.example.toml @@ -22,6 +22,21 @@ max_duration_secs = 1800 repo_path = "/opt/ai-dark-factory/workspace" create_prs = false +# ============================================================================= +# ROUTING (telemetry-aware model selection) +# ============================================================================= +# Uncomment to enable KG-driven and telemetry-aware model routing. +# Requires taxonomy markdown files in the taxonomy_path directory. +# When use_routing_engine is true, the control-plane routing engine +# combines KG routing, keyword routing, provider health, budget pressure, +# and live telemetry signals for model selection. 
+# +# [routing] +# taxonomy_path = "/opt/ai-dark-factory/kg/routing" +# probe_ttl_secs = 300 +# probe_on_startup = true +# use_routing_engine = false + # ============================================================================= # SAFETY LAYER (always running) # ============================================================================= diff --git a/crates/terraphim_orchestrator/src/config.rs b/crates/terraphim_orchestrator/src/config.rs index 5c7778906..fbb9aa242 100644 --- a/crates/terraphim_orchestrator/src/config.rs +++ b/crates/terraphim_orchestrator/src/config.rs @@ -111,7 +111,16 @@ pub struct RoutingConfig { /// Run provider probes on startup (default: true). #[serde(default = "default_true_routing")] pub probe_on_startup: bool, - /// Use RoutingDecisionEngine instead of inline model selection (default: false). + /// Use RoutingDecisionEngine instead of inline model selection. + /// + /// When enabled, `spawn_agent()` delegates model selection to the + /// control-plane routing engine which combines KG routing, keyword + /// routing, provider health, budget pressure, and live telemetry + /// signals (throughput, latency, subscription limits). + /// + /// Telemetry data is persisted across restarts and restored on startup. + /// + /// Default: `false` (uses inline model selection logic). 
#[serde(default)] pub use_routing_engine: bool, } diff --git a/crates/terraphim_orchestrator/src/control_plane/routing.rs b/crates/terraphim_orchestrator/src/control_plane/routing.rs index ee9ce4bf8..3fbc34dd2 100644 --- a/crates/terraphim_orchestrator/src/control_plane/routing.rs +++ b/crates/terraphim_orchestrator/src/control_plane/routing.rs @@ -7,7 +7,7 @@ use crate::control_plane::telemetry::TelemetryStore; use crate::cost_tracker::BudgetVerdict; -use crate::{kg_router::KgRouter, provider_probe::ProviderHealthMap}; +use crate::kg_router::KgRouter; use std::path::PathBuf; use std::sync::Arc; use terraphim_types::capability::{CostLevel, Latency, Provider, ProviderType}; @@ -126,7 +126,8 @@ struct CollectedCandidates { pub struct RoutingDecisionEngine { kg_router: Option>, - provider_health: Arc, + /// Snapshot of unhealthy provider names at construction time. + unhealthy_providers: Vec, router: terraphim_router::Router, telemetry_store: Option>, } @@ -134,13 +135,13 @@ pub struct RoutingDecisionEngine { impl RoutingDecisionEngine { pub fn new( kg_router: Option>, - provider_health: Arc, + unhealthy_providers: Vec, router: terraphim_router::Router, telemetry_store: Option>, ) -> Self { Self { kg_router, - provider_health, + unhealthy_providers, router, telemetry_store, } @@ -172,7 +173,7 @@ impl RoutingDecisionEngine { None => return Vec::new(), }; - let unhealthy = self.provider_health.unhealthy_providers(); + let unhealthy = &self.unhealthy_providers; let mut candidates = Vec::new(); for route in &decision.fallback_routes { @@ -262,7 +263,7 @@ impl RoutingDecisionEngine { base * (1.0 - penalty) } - pub fn decide_route( + pub async fn decide_route( &self, ctx: &DispatchContext, budget_verdict: &BudgetVerdict, @@ -357,16 +358,10 @@ impl RoutingDecisionEngine { // Apply telemetry-based scoring adjustments let mut telemetry_influenced = false; if let Some(ref store) = self.telemetry_store { - let performances: Vec = - tokio::task::block_in_place(|| { - 
tokio::runtime::Handle::current().block_on(async { - let mut perfs = Vec::new(); - for candidate in &all_candidates { - perfs.push(store.model_performance(&candidate.model).await); - } - perfs - }) - }); + let mut performances = Vec::with_capacity(all_candidates.len()); + for candidate in &all_candidates { + performances.push(store.model_performance(&candidate.model).await); + } for (i, perf) in performances.iter().enumerate() { if perf.is_subscription_limited() { @@ -492,14 +487,7 @@ mod tests { } fn test_engine() -> RoutingDecisionEngine { - RoutingDecisionEngine::new( - None, - Arc::new(crate::provider_probe::ProviderHealthMap::new( - std::time::Duration::from_secs(300), - )), - terraphim_router::Router::new(), - None, - ) + RoutingDecisionEngine::new(None, Vec::new(), terraphim_router::Router::new(), None) } fn test_engine_with_spent( @@ -510,22 +498,16 @@ mod tests { let mut ct = CostTracker::new(); ct.register(agent_name, budget_cents); ct.record_cost(agent_name, spend_usd); - let engine = RoutingDecisionEngine::new( - None, - Arc::new(crate::provider_probe::ProviderHealthMap::new( - std::time::Duration::from_secs(300), - )), - terraphim_router::Router::new(), - None, - ); + let engine = + RoutingDecisionEngine::new(None, Vec::new(), terraphim_router::Router::new(), None); (engine, ct) } - #[test] - fn test_cli_default_for_unsupported_tool() { + #[tokio::test] + async fn test_cli_default_for_unsupported_tool() { let engine = test_engine(); let ctx = create_test_context_with_cli("test-agent", "Implement a feature", "codex"); - let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped); + let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped).await; assert_eq!(decision.candidate.source, RouteSource::CliDefault); assert!(decision.candidate.model.is_empty()); @@ -535,15 +517,15 @@ mod tests { assert!(!decision.budget_influenced); } - #[test] - fn test_static_model_selected_when_only_signal() { + #[tokio::test] + async fn 
test_static_model_selected_when_only_signal() { let engine = test_engine(); let ctx = create_test_context_with_static_model( "test-agent", "Implement a feature", "claude-3-opus", ); - let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped); + let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped).await; assert_eq!(decision.candidate.source, RouteSource::StaticConfig); assert_eq!(decision.candidate.model, "claude-3-opus"); @@ -551,8 +533,8 @@ mod tests { assert_eq!(decision.dominant_signal, RouteSource::StaticConfig); } - #[test] - fn test_unsupported_cli_ignores_static_model() { + #[tokio::test] + async fn test_unsupported_cli_ignores_static_model() { let engine = test_engine(); let ctx = DispatchContext { agent_name: "test-agent".to_string(), @@ -562,49 +544,49 @@ mod tests { layer: crate::config::AgentLayer::Core, session_id: None, }; - let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped); + let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped).await; assert_eq!(decision.candidate.source, RouteSource::CliDefault); assert_eq!(decision.dominant_signal, RouteSource::CliDefault); } - #[test] - fn test_opencode_gets_static_model() { + #[tokio::test] + async fn test_opencode_gets_static_model() { let engine = test_engine(); let ctx = create_test_context_with_static_model( "test-agent", "Implement a feature", "kimi-for-coding/k2p5", ); - let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped); + let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped).await; assert_eq!(decision.candidate.source, RouteSource::StaticConfig); assert_eq!(decision.candidate.model, "kimi-for-coding/k2p5"); } - #[test] - fn test_cli_default_when_no_signals_match() { + #[tokio::test] + async fn test_cli_default_when_no_signals_match() { let engine = test_engine(); let ctx = create_test_context_with_cli("test-agent", "do something", "opencode"); - let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped); + let 
decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped).await; assert_eq!(decision.candidate.source, RouteSource::CliDefault); assert!(decision.rationale.contains("No routing signal matched")); assert_eq!(decision.dominant_signal, RouteSource::CliDefault); } - #[test] - fn test_rationale_records_dominant_signal() { + #[tokio::test] + async fn test_rationale_records_dominant_signal() { let engine = test_engine(); let ctx = create_test_context_with_static_model("agent", "task", "model-x"); - let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped); + let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped).await; assert!(decision.rationale.contains("static config")); assert!(decision.rationale.contains("Selected model-x")); } - #[test] - fn test_all_candidates_collected_from_multiple_sources() { + #[tokio::test] + async fn test_all_candidates_collected_from_multiple_sources() { let engine = test_engine(); let ctx = DispatchContext { agent_name: "test-agent".to_string(), @@ -614,7 +596,7 @@ mod tests { layer: crate::config::AgentLayer::Core, session_id: None, }; - let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped); + let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped).await; assert!(!decision.all_candidates.is_empty()); assert!(decision @@ -623,8 +605,8 @@ mod tests { .any(|c| c.source == RouteSource::StaticConfig)); } - #[test] - fn test_combined_kg_keyword_when_models_agree() { + #[tokio::test] + async fn test_combined_kg_keyword_when_models_agree() { use std::fs; use tempfile::tempdir; @@ -638,15 +620,13 @@ mod tests { let kg_router = Arc::new(crate::kg_router::KgRouter::load(dir.path()).unwrap()); let engine = RoutingDecisionEngine::new( Some(kg_router), - Arc::new(crate::provider_probe::ProviderHealthMap::new( - std::time::Duration::from_secs(300), - )), + Vec::new(), terraphim_router::Router::new(), None, ); let ctx = create_test_context_with_cli("agent", "implement feature", "opencode"); - let decision = 
engine.decide_route(&ctx, &BudgetVerdict::Uncapped); + let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped).await; assert!( decision.candidate.source == RouteSource::KnowledgeGraph @@ -658,8 +638,8 @@ mod tests { assert!(decision.primary_available); } - #[test] - fn test_kg_only_no_keyword_match() { + #[tokio::test] + async fn test_kg_only_no_keyword_match() { use std::fs; use tempfile::tempdir; @@ -673,26 +653,24 @@ mod tests { let kg_router = Arc::new(crate::kg_router::KgRouter::load(dir.path()).unwrap()); let engine = RoutingDecisionEngine::new( Some(kg_router), - Arc::new(crate::provider_probe::ProviderHealthMap::new( - std::time::Duration::from_secs(300), - )), + Vec::new(), terraphim_router::Router::new(), None, ); let ctx = create_test_context_with_cli("agent", "security audit the codebase", "opencode"); - let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped); + let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped).await; assert_eq!(decision.candidate.source, RouteSource::KnowledgeGraph); assert!(decision.candidate.model.contains("opus")); assert_eq!(decision.dominant_signal, RouteSource::KnowledgeGraph); } - #[test] - fn test_keyword_only_no_kg_match() { + #[tokio::test] + async fn test_keyword_only_no_kg_match() { let engine = test_engine(); let ctx = create_test_context_with_cli("agent", "implement a feature", "opencode"); - let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped); + let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped).await; assert!( decision.candidate.source == RouteSource::KeywordRouting @@ -700,8 +678,8 @@ mod tests { ); } - #[test] - fn test_dispatch_context_session_id() { + #[tokio::test] + async fn test_dispatch_context_session_id() { let ctx = DispatchContext { agent_name: "test-agent".to_string(), task: "Do something".to_string(), @@ -713,8 +691,8 @@ mod tests { assert_eq!(ctx.session_id, Some("sess-123".to_string())); } - #[test] - fn test_route_source_display() { + 
#[tokio::test] + async fn test_route_source_display() { assert_eq!(RouteSource::KnowledgeGraph.to_string(), "KG"); assert_eq!(RouteSource::KeywordRouting.to_string(), "keyword"); assert_eq!(RouteSource::StaticConfig.to_string(), "static"); @@ -722,8 +700,8 @@ mod tests { assert_eq!(RouteSource::CliDefault.to_string(), "CLI default"); } - #[test] - fn test_make_agent_provider() { + #[tokio::test] + async fn test_make_agent_provider() { let provider = make_agent_provider("my-agent", "opencode"); assert!(provider.id.contains("my-agent")); if let ProviderType::Agent { @@ -739,36 +717,36 @@ mod tests { } } - #[test] - fn test_budget_pressure_no_pressure_for_uncapped() { + #[tokio::test] + async fn test_budget_pressure_no_pressure_for_uncapped() { let engine = test_engine(); let ctx = create_test_context_with_static_model("test-agent", "task", "model-x"); - let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped); + let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped).await; assert_eq!(decision.budget_pressure, BudgetPressure::NoPressure); assert!(!decision.budget_influenced); } - #[test] - fn test_budget_pressure_near_exhaustion_detected() { + #[tokio::test] + async fn test_budget_pressure_near_exhaustion_detected() { let (engine, ct) = test_engine_with_spent("test-agent", Some(10000), 85.0); let ctx = create_test_context_with_static_model("test-agent", "task", "model-x"); - let decision = engine.decide_route(&ctx, &ct.check("test-agent")); + let decision = engine.decide_route(&ctx, &ct.check("test-agent")).await; assert_eq!(decision.budget_pressure, BudgetPressure::NearExhaustion); } - #[test] - fn test_budget_pressure_exhausted_detected() { + #[tokio::test] + async fn test_budget_pressure_exhausted_detected() { let (engine, ct) = test_engine_with_spent("test-agent", Some(10000), 100.0); let ctx = create_test_context_with_static_model("test-agent", "task", "model-x"); - let decision = engine.decide_route(&ctx, &ct.check("test-agent")); + let 
decision = engine.decide_route(&ctx, &ct.check("test-agent")).await; assert_eq!(decision.budget_pressure, BudgetPressure::Exhausted); } - #[test] - fn test_budget_pressure_penalty_calculation() { + #[tokio::test] + async fn test_budget_pressure_penalty_calculation() { let no_pressure = BudgetPressure::NoPressure; assert_eq!(no_pressure.cost_penalty(&CostLevel::Cheap), 0.0); assert_eq!(no_pressure.cost_penalty(&CostLevel::Moderate), 0.0); @@ -785,17 +763,17 @@ mod tests { assert!((exhausted.cost_penalty(&CostLevel::Expensive) - 0.70).abs() < 0.001); } - #[test] - fn test_budget_influences_rationale_when_pressure() { + #[tokio::test] + async fn test_budget_influences_rationale_when_pressure() { let (engine, ct) = test_engine_with_spent("test-agent", Some(10000), 85.0); let ctx = create_test_context_with_static_model("test-agent", "task", "model-x"); - let decision = engine.decide_route(&ctx, &ct.check("test-agent")); + let decision = engine.decide_route(&ctx, &ct.check("test-agent")).await; assert_eq!(decision.budget_pressure, BudgetPressure::NearExhaustion); } - #[test] - fn test_budget_verdict_conversion() { + #[tokio::test] + async fn test_budget_verdict_conversion() { assert_eq!( BudgetPressure::from_verdict(&BudgetVerdict::Uncapped), BudgetPressure::NoPressure @@ -820,8 +798,8 @@ mod tests { ); } - #[test] - fn test_score_candidate_with_budget_pressure() { + #[tokio::test] + async fn test_score_candidate_with_budget_pressure() { let candidate = RouteCandidate { provider: Provider { id: "test".to_string(), @@ -852,4 +830,88 @@ mod tests { assert!(score_no_pressure > score_near); assert!(score_near > score_exhausted); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_telemetry_penalises_subscription_limited_model() { + use crate::control_plane::telemetry::{CompletionEvent, TelemetryStore, TokenBreakdown}; + + let store = TelemetryStore::new(3600); + store + .record(CompletionEvent { + model: "limited-model".to_string(), + 
session_id: "test".to_string(), + completed_at: chrono::Utc::now(), + latency_ms: 0, + success: false, + tokens: TokenBreakdown::default(), + cost_usd: 0.0, + error: Some("weekly session limit reached".to_string()), + }) + .await; + + let engine = RoutingDecisionEngine::new( + None, + Vec::new(), + terraphim_router::Router::new(), + Some(Arc::new(store)), + ); + + let ctx = create_test_context_with_static_model("agent", "task", "limited-model"); + let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped).await; + + assert!( + decision.telemetry_influenced, + "telemetry should influence when subscription limited" + ); + assert!( + decision.rationale.contains("Telemetry"), + "rationale should mention telemetry" + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_telemetry_boosts_high_success_model() { + use crate::control_plane::telemetry::{CompletionEvent, TelemetryStore, TokenBreakdown}; + + let store = TelemetryStore::new(3600); + // Record 10 successful completions with good latency + for _ in 0..10 { + store + .record(CompletionEvent { + model: "fast-model".to_string(), + session_id: "test".to_string(), + completed_at: chrono::Utc::now(), + latency_ms: 200, + success: true, + tokens: TokenBreakdown { + total: 500, + input: 400, + output: 100, + ..Default::default() + }, + cost_usd: 0.005, + error: None, + }) + .await; + } + + let engine = RoutingDecisionEngine::new( + None, + Vec::new(), + terraphim_router::Router::new(), + Some(Arc::new(store)), + ); + + let ctx = create_test_context_with_static_model("agent", "implement feature", "fast-model"); + let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped).await; + + assert!( + decision.telemetry_influenced, + "telemetry should influence with high success rate" + ); + assert!( + decision.rationale.contains("Telemetry"), + "rationale should mention telemetry" + ); + } } diff --git a/crates/terraphim_orchestrator/src/control_plane/telemetry.rs 
b/crates/terraphim_orchestrator/src/control_plane/telemetry.rs index 33a753639..02c6e28c3 100644 --- a/crates/terraphim_orchestrator/src/control_plane/telemetry.rs +++ b/crates/terraphim_orchestrator/src/control_plane/telemetry.rs @@ -155,6 +155,80 @@ pub struct TelemetrySummary { pub exported_at: DateTime, } +/// Compute a [`ModelPerformanceSnapshot`] from raw inner state without acquiring +/// any locks. Both `model_performance` and `all_model_performances` delegate to +/// this helper so the snapshot logic lives in exactly one place. +fn compute_snapshot( + inner: &TelemetryStoreInner, + model: &str, + now: DateTime, +) -> ModelPerformanceSnapshot { + let events = inner.events.get(model); + + let (successful, failed, avg_latency) = match events { + None => (0u64, 0u64, 0.0), + Some(evts) => { + let mut success_count = 0u64; + let mut fail_count = 0u64; + let mut latency_sum = 0.0f64; + let mut latency_count = 0u64; + + for e in evts { + if e.success { + success_count += 1; + latency_sum += e.latency_ms as f64; + latency_count += 1; + } else { + fail_count += 1; + } + } + + let avg = if latency_count > 0 { + latency_sum / latency_count as f64 + } else { + 0.0 + }; + + (success_count, fail_count, avg) + } + }; + + let total = successful + failed; + let success_rate = if total > 0 { + successful as f64 / total as f64 + } else { + 0.0 + }; + let throughput = if inner.window_secs > 0 { + successful as f64 / inner.window_secs as f64 + } else { + 0.0 + }; + + let last_event_at = events.and_then(|evts| evts.last().map(|e| e.completed_at)); + + let subscription_limit_reached = inner + .subscription_limits + .get(model) + .map(|expires| now < *expires) + .unwrap_or(false); + + let subscription_limit_expires_at = inner.subscription_limits.get(model).copied(); + + ModelPerformanceSnapshot { + model: model.to_string(), + successful_completions: successful, + failed_completions: failed, + window_secs: inner.window_secs, + throughput, + avg_latency_ms: avg_latency, + 
success_rate, + last_event_at, + subscription_limit_reached, + subscription_limit_expires_at, + } +} + /// In-memory telemetry store backed by terraphim_persistence. /// /// Stores rolling completion events and exposes performance/usage snapshots @@ -226,70 +300,7 @@ impl TelemetryStore { /// Get performance snapshot for a specific model. pub async fn model_performance(&self, model: &str) -> ModelPerformanceSnapshot { let inner = self.inner.read().await; - let events = inner.events.get(model); - - let (successful, failed, avg_latency) = match events { - None => (0u64, 0u64, 0.0), - Some(evts) => { - let mut success_count = 0u64; - let mut fail_count = 0u64; - let mut latency_sum = 0.0f64; - let mut latency_count = 0u64; - - for e in evts { - if e.success { - success_count += 1; - latency_sum += e.latency_ms as f64; - latency_count += 1; - } else { - fail_count += 1; - } - } - - let avg = if latency_count > 0 { - latency_sum / latency_count as f64 - } else { - 0.0 - }; - - (success_count, fail_count, avg) - } - }; - - let total = successful + failed; - let success_rate = if total > 0 { - successful as f64 / total as f64 - } else { - 0.0 - }; - let throughput = if inner.window_secs > 0 { - successful as f64 / inner.window_secs as f64 - } else { - 0.0 - }; - - let last_event_at = events.and_then(|evts| evts.last().map(|e| e.completed_at)); - - let subscription_limit_reached = inner - .subscription_limits - .get(model) - .map(|expires| Utc::now() < *expires) - .unwrap_or(false); - - let subscription_limit_expires_at = inner.subscription_limits.get(model).copied(); - - ModelPerformanceSnapshot { - model: model.to_string(), - successful_completions: successful, - failed_completions: failed, - window_secs: inner.window_secs, - throughput, - avg_latency_ms: avg_latency, - success_rate, - last_event_at, - subscription_limit_reached, - subscription_limit_expires_at, - } + compute_snapshot(&inner, model, Utc::now()) } /// Get usage snapshot for a specific session. 
@@ -332,13 +343,47 @@ impl TelemetryStore { } /// Get performance snapshots for all known models. + /// + /// Acquires the read lock exactly once and computes all snapshots in a + /// single pass, avoiding the N+1 lock pattern of the previous implementation. pub async fn all_model_performances(&self) -> Vec { - let models = self.known_models().await; - let mut snapshots = Vec::with_capacity(models.len()); - for model in &models { - snapshots.push(self.model_performance(model).await); + let inner = self.inner.read().await; + let now = Utc::now(); + inner + .events + .keys() + .map(|m| compute_snapshot(&inner, m, now)) + .collect() + } + + /// Record multiple completion events in a single write-lock acquisition. + /// + /// Prefer this over calling `record` in a loop when multiple events are + /// available at once, to reduce lock contention. + pub async fn record_batch(&self, events: Vec) { + if events.is_empty() { + return; + } + let mut inner = self.inner.write().await; + let now = Utc::now(); + let cutoff = now - chrono::Duration::seconds(inner.window_secs as i64); + for event in events { + if let Some(ref error) = event.error { + if is_subscription_limit_error(error) { + let expires = + now + chrono::Duration::seconds(inner.subscription_limit_ttl_secs as i64); + inner + .subscription_limits + .insert(event.model.clone(), expires); + } + } + let bucket = inner.events.entry(event.model.clone()).or_default(); + bucket.push(event); + } + // Prune all buckets once after inserting all events. + for bucket in inner.events.values_mut() { + bucket.retain(|e| e.completed_at > cutoff); } - snapshots } /// Export a serialisable summary of current telemetry state. 
diff --git a/crates/terraphim_orchestrator/src/lib.rs b/crates/terraphim_orchestrator/src/lib.rs index 4d159730c..7bc13c647 100644 --- a/crates/terraphim_orchestrator/src/lib.rs +++ b/crates/terraphim_orchestrator/src/lib.rs @@ -143,7 +143,7 @@ struct ManagedAgent { worktree_path: Option, /// KG-routed model selected at spawn time (None = CLI default). Used for logging. routed_model: Option, - /// Session ID for telemetry tracking (format: "{agent_name}-{uuid}"). + /// Session ID for telemetry tracking (format: "{agent_name}-{ulid}"). session_id: String, } @@ -522,6 +522,9 @@ impl AgentOrchestrator { } } + // Restore persisted telemetry from previous runs + self.restore_telemetry().await; + // Spawn Safety-layer agents immediately let immediate = self.scheduler.immediate_agents(); for agent_def in &immediate { @@ -620,7 +623,7 @@ impl AgentOrchestrator { } // Graceful shutdown of all agents - self.persist_telemetry().await; + self.persist_telemetry(); self.shutdown_all_agents().await; Ok(()) } @@ -970,13 +973,11 @@ impl AgentOrchestrator { .kg_router .as_ref() .map(|r| std::sync::Arc::new(r.clone())); - let provider_health_arc = std::sync::Arc::new(provider_probe::ProviderHealthMap::new( - std::time::Duration::from_secs(300), - )); + let unhealthy = self.provider_health.unhealthy_providers(); let telemetry_arc = std::sync::Arc::new(self.telemetry_store.clone()); let engine = control_plane::RoutingDecisionEngine::new( kg_arc, - provider_health_arc, + unhealthy, terraphim_router::Router::new(), Some(telemetry_arc), ); @@ -989,7 +990,7 @@ impl AgentOrchestrator { session_id: None, }; let budget_verdict = self.cost_tracker.check(&def.name); - let decision = engine.decide_route(&ctx, &budget_verdict); + let decision = engine.decide_route(&ctx, &budget_verdict).await; info!( agent = %def.name, rationale = %decision.rationale, @@ -1259,7 +1260,7 @@ impl AgentOrchestrator { spawned_by_mention: false, worktree_path, routed_model: model.clone(), - session_id: 
format!("{}-{}", def.name, uuid::Uuid::new_v4()), + session_id: format!("{}-{}", def.name, ulid::Ulid::new()), }, ); @@ -2571,7 +2572,7 @@ impl AgentOrchestrator { // 15. Periodic telemetry persistence (every 60 ticks = ~5 min at 5s interval) if self.tick_count % 60 == 0 { - self.persist_telemetry().await; + self.persist_telemetry(); } } @@ -2754,45 +2755,22 @@ impl AgentOrchestrator { crate::OutputEvent::Stdout { line, .. } => { stdout_lines.push(line.clone()); output_lines.push(line.clone()); - let parsed = match cli_tool.as_str() { - "opencode" => control_plane::output_parser::parse_opencode_line( - line, - &session_id, - &model, - None, - ), - "claude" => control_plane::output_parser::parse_claude_line( - line, - &session_id, - &model, - ), - _ => control_plane::output_parser::ParsedOutput::Ignored, - }; - if let control_plane::output_parser::ParsedOutput::Completion(ce) = - parsed - { + if let Some(ce) = Self::parse_stdout_for_telemetry( + &cli_tool, + line, + &session_id, + &model, + ) { exit_telemetry.push((name.clone(), ce)); } } crate::OutputEvent::Stderr { line, .. 
} => { stderr_lines.push(line.clone()); output_lines.push(format!("[stderr] {}", line)); - if let Some(limit_model) = - control_plane::output_parser::parse_stderr_for_limit_errors(line) + if let Some(ce) = + Self::parse_stderr_for_telemetry(line, &session_id, &model) { - exit_telemetry.push(( - name.clone(), - control_plane::telemetry::CompletionEvent { - model: limit_model, - session_id: session_id.clone(), - completed_at: chrono::Utc::now(), - latency_ms: 0, - success: false, - tokens: control_plane::telemetry::TokenBreakdown::default(), - cost_usd: 0.0, - error: Some(line.clone()), - }, - )); + exit_telemetry.push((name.clone(), ce)); } } _ => {} @@ -3149,48 +3127,28 @@ impl AgentOrchestrator { }) .unwrap_or_default(); - let parsed = match cli_tool.as_str() { - "opencode" => control_plane::output_parser::parse_opencode_line( - line, - &session_id, - &model, - None, - ), - "claude" => control_plane::output_parser::parse_claude_line( - line, - &session_id, - &model, - ), - _ => control_plane::output_parser::ParsedOutput::Ignored, - }; - - if let control_plane::output_parser::ParsedOutput::Completion(ce) = parsed { + if let Some(ce) = + Self::parse_stdout_for_telemetry(&cli_tool, line, &session_id, &model) + { completion_events.push((name.clone(), ce)); } } OutputEvent::Stderr { line, .. 
} => { - if let Some(limit_model) = - control_plane::output_parser::parse_stderr_for_limit_errors(line) - { - let session_id = self - .active_agents - .get(name) - .map(|m| m.session_id.clone()) - .unwrap_or_default(); - - completion_events.push(( - name.clone(), - control_plane::telemetry::CompletionEvent { - model: limit_model, - session_id, - completed_at: chrono::Utc::now(), - latency_ms: 0, - success: false, - tokens: control_plane::telemetry::TokenBreakdown::default(), - cost_usd: 0.0, - error: Some(line.clone()), - }, - )); + let (session_id, model) = self + .active_agents + .get(name) + .map(|m| { + ( + m.session_id.clone(), + m.routed_model + .clone() + .or_else(|| m.definition.model.clone()) + .unwrap_or_default(), + ) + }) + .unwrap_or_default(); + if let Some(ce) = Self::parse_stderr_for_telemetry(line, &session_id, &model) { + completion_events.push((name.clone(), ce)); } } _ => {} @@ -3231,30 +3189,106 @@ impl AgentOrchestrator { } /// Record parsed telemetry events into the telemetry store and cost tracker. + /// + /// Cost accounting is performed per-agent before the batch write so that + /// agent-level spend is still tracked individually. The telemetry store + /// write uses a single lock acquisition via `record_batch`. async fn record_telemetry( &self, events: Vec<(String, control_plane::telemetry::CompletionEvent)>, ) { - for (agent_name, event) in events { - let cost = event.cost_usd; - self.telemetry_store.record(event).await; - if cost > 0.0 { - self.cost_tracker.record_cost(&agent_name, cost); + // Record costs per-agent first (no lock involved). + for (agent_name, event) in &events { + if event.cost_usd > 0.0 { + self.cost_tracker.record_cost(agent_name, event.cost_usd); + } + } + // Write all events in one lock acquisition. 
+ let completion_events: Vec = + events.into_iter().map(|(_, e)| e).collect(); + self.telemetry_store.record_batch(completion_events).await; + } + + /// Attempt to restore persisted telemetry summary from durable storage. + /// + /// Best-effort: if no summary exists or loading fails, logs and continues + /// with an empty telemetry store. Called once at the start of `run()`. + async fn restore_telemetry(&self) { + use terraphim_persistence::Persistable; + let mut summary = control_plane::TelemetrySummary::new("telemetry_summary".to_string()); + match summary.load().await { + Ok(loaded) => { + self.telemetry_store.import_summary(loaded).await; + info!("restored persisted telemetry summary"); + } + Err(_) => { + info!("no persisted telemetry summary found, starting fresh"); } } } /// Persist telemetry summary to durable storage via fire-and-forget spawn. - async fn persist_telemetry(&self) { - let summary = self.telemetry_store.export_summary().await; + /// + /// Clones the Arc-backed store and moves both export and save into the + /// spawned task so the reconcile loop is not blocked by the read lock. + fn persist_telemetry(&self) { + let store = self.telemetry_store.clone(); tokio::spawn(async move { use terraphim_persistence::Persistable; + let summary = store.export_summary().await; if let Err(e) = summary.save().await { tracing::warn!(error = %e, "failed to persist telemetry summary"); } }); } + /// Parse a stdout line from a CLI tool into a CompletionEvent, if the line + /// represents a completed agent session. + /// + /// Returns `None` for lines that do not carry completion telemetry (tool + /// calls, status updates, ignored formats, or unrecognised cli_tool). 
+    fn parse_stdout_for_telemetry(
+        cli_tool: &str,
+        line: &str,
+        session_id: &str,
+        model: &str,
+    ) -> Option<control_plane::telemetry::CompletionEvent> {
+        let parsed = match cli_tool {
+            "opencode" => {
+                control_plane::output_parser::parse_opencode_line(line, session_id, model, None)
+            }
+            "claude" => control_plane::output_parser::parse_claude_line(line, session_id, model),
+            _ => control_plane::output_parser::ParsedOutput::Ignored,
+        };
+        match parsed {
+            control_plane::output_parser::ParsedOutput::Completion(ce) => Some(ce),
+            _ => None,
+        }
+    }
+
+    /// Parse a stderr line into a CompletionEvent representing a subscription
+    /// limit error.
+    ///
+    /// Returns `None` when the line does not match any known limit-error
+    /// pattern.
+    fn parse_stderr_for_telemetry(
+        line: &str,
+        session_id: &str,
+        model: &str,
+    ) -> Option<control_plane::telemetry::CompletionEvent> {
+        control_plane::output_parser::parse_stderr_for_limit_errors(line)?;
+        Some(control_plane::telemetry::CompletionEvent {
+            model: model.to_string(),
+            session_id: session_id.to_string(),
+            completed_at: chrono::Utc::now(),
+            latency_ms: 0,
+            success: false,
+            tokens: control_plane::telemetry::TokenBreakdown::default(),
+            cost_usd: 0.0,
+            error: Some(line.to_string()),
+        })
+    }
+
     /// Check flow schedules and trigger due flows.
     async fn check_flow_schedules(&mut self) {
         let now = chrono::Utc::now();
@@ -3490,6 +3524,12 @@ impl AgentOrchestrator {
         self.last_run_commits
             .insert(agent_name.to_string(), commit.to_string());
     }
+
+    /// Test helper: access the telemetry store for assertions.
+    #[doc(hidden)]
+    pub fn telemetry_store(&self) -> &control_plane::TelemetryStore {
+        &self.telemetry_store
+    }
 }
 
 /// Check whether any changed file matches any of the watch path prefixes.
diff --git a/crates/terraphim_orchestrator/tests/orchestrator_tests.rs b/crates/terraphim_orchestrator/tests/orchestrator_tests.rs index 777bd827b..c032f7710 100644 --- a/crates/terraphim_orchestrator/tests/orchestrator_tests.rs +++ b/crates/terraphim_orchestrator/tests/orchestrator_tests.rs @@ -1,6 +1,7 @@ use std::path::PathBuf; use std::time::Duration; +use serial_test::serial; use terraphim_orchestrator::{ AgentDefinition, AgentLayer, AgentOrchestrator, CompoundReviewConfig, HandoffContext, NightwatchConfig, OrchestratorConfig, OrchestratorError, TrackerConfig, TrackerStates, @@ -488,6 +489,78 @@ async fn test_git_diff_non_matching_changes_skips() { assert!(!orch.is_agent_active("sentinel")); // no matching changes } +/// Integration test: telemetry summary round-trips through persistence. +#[tokio::test] +#[serial] +async fn test_telemetry_persistence_round_trip() { + use terraphim_orchestrator::control_plane::telemetry::TokenBreakdown; + use terraphim_orchestrator::control_plane::{ + CompletionEvent, TelemetryStore, TelemetrySummary, + }; + use terraphim_persistence::{DeviceStorage, Persistable}; + + DeviceStorage::init_memory_only().await.unwrap(); + + let store = TelemetryStore::new(3600); + store + .record(CompletionEvent { + model: "test-model".to_string(), + session_id: "test-session".to_string(), + completed_at: chrono::Utc::now(), + latency_ms: 150, + success: true, + tokens: TokenBreakdown { + total: 1000, + input: 800, + output: 200, + ..Default::default() + }, + cost_usd: 0.01, + error: None, + }) + .await; + + let summary = store.export_summary().await; + // Use save() which writes to all available profiles, avoiding profile-name + // sensitivity when the global DeviceStorage was already initialised by + // another test with dashmap+sqlite rather than memory-only. 
+ summary.save().await.unwrap(); + + let mut loaded = TelemetrySummary::new("telemetry_summary".to_string()); + loaded = loaded.load().await.unwrap(); + + assert_eq!(loaded.model_performances.len(), 1); + assert_eq!(loaded.model_performances[0].model, "test-model"); + assert_eq!(loaded.model_performances[0].successful_completions, 1); + + // Import into fresh store and verify + let restored = TelemetryStore::new(3600); + restored.import_summary(loaded).await; + + let perf = restored.model_performance("test-model").await; + assert!(perf.successful_completions > 0); + assert!(perf.avg_latency_ms > 0.0); +} + +/// Integration test: orchestrator constructs successfully with routing config. +#[test] +fn test_orchestrator_with_routing_config() { + let mut config = test_config(); + config.routing = Some(terraphim_orchestrator::config::RoutingConfig { + taxonomy_path: std::path::PathBuf::from("/tmp/nonexistent-taxonomy"), + probe_ttl_secs: 300, + probe_results_dir: None, + probe_on_startup: false, + use_routing_engine: true, + }); + let orch = AgentOrchestrator::new(config); + assert!( + orch.is_ok(), + "orchestrator should construct with routing config: {:?}", + orch.err() + ); +} + /// Gitea-issue: no comments on issue -> spawn (Findings). /// This tests the path where fetch_comments returns empty vec. /// Without a real Gitea server, we test via the fail-open path.