Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/terraphim_orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ reqwest = { workspace = true, optional = true }
[dev-dependencies]
tokio-test = "0.4"
tempfile = { workspace = true }
serial_test = "3"


[[bin]]
Expand Down
3 changes: 3 additions & 0 deletions crates/terraphim_orchestrator/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ pub struct RoutingConfig {
/// Run provider probes on startup (default: true).
#[serde(default = "default_true_routing")]
pub probe_on_startup: bool,
/// Use RoutingDecisionEngine instead of inline model selection (default: false).
#[serde(default)]
pub use_routing_engine: bool,
}

fn default_probe_ttl() -> u64 {
Expand Down
3 changes: 2 additions & 1 deletion crates/terraphim_orchestrator/src/control_plane/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod output_parser;
pub mod policy;
pub mod routing;
pub mod telemetry;
pub mod telemetry_persist;

pub use routing::{DispatchContext, RouteCandidate, RoutingDecision, RoutingDecisionEngine};
pub use telemetry::{CompletionEvent, TelemetryStore};
pub use telemetry::{CompletionEvent, TelemetryStore, TelemetrySummary};
132 changes: 82 additions & 50 deletions crates/terraphim_orchestrator/src/control_plane/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
//! recorded in the decision rationale. Budget pressure biases route selection
//! toward cheaper models when an agent's spend approaches its limit.

use crate::cost_tracker::{BudgetVerdict, CostTracker};
use crate::control_plane::telemetry::TelemetryStore;
use crate::cost_tracker::BudgetVerdict;
use crate::{kg_router::KgRouter, provider_probe::ProviderHealthMap};
use std::path::PathBuf;
use std::sync::Arc;
Expand Down Expand Up @@ -93,6 +94,7 @@ pub struct RoutingDecision {
pub dominant_signal: RouteSource,
pub budget_pressure: BudgetPressure,
pub budget_influenced: bool,
pub telemetry_influenced: bool,
}

fn make_agent_provider(agent_name: &str, cli_tool: &str) -> Provider {
Expand Down Expand Up @@ -125,22 +127,22 @@ struct CollectedCandidates {
pub struct RoutingDecisionEngine {
kg_router: Option<Arc<KgRouter>>,
provider_health: Arc<ProviderHealthMap>,
cost_tracker: CostTracker,
router: terraphim_router::Router,
telemetry_store: Option<Arc<TelemetryStore>>,
}

impl RoutingDecisionEngine {
pub fn new(
kg_router: Option<Arc<KgRouter>>,
provider_health: Arc<ProviderHealthMap>,
cost_tracker: CostTracker,
router: terraphim_router::Router,
telemetry_store: Option<Arc<TelemetryStore>>,
) -> Self {
Self {
kg_router,
provider_health,
cost_tracker,
router,
telemetry_store,
}
}

Expand All @@ -155,8 +157,8 @@ impl RoutingDecisionEngine {
matches!(cli_name, "claude" | "claude-code" | "opencode")
}

fn budget_pressure(&self, agent_name: &str) -> BudgetPressure {
BudgetPressure::from_verdict(&self.cost_tracker.check(agent_name))
fn budget_pressure(verdict: &BudgetVerdict) -> BudgetPressure {
BudgetPressure::from_verdict(verdict)
}

fn collect_kg_candidates(&self, ctx: &DispatchContext) -> Vec<RouteCandidate> {
Expand Down Expand Up @@ -260,9 +262,13 @@ impl RoutingDecisionEngine {
base * (1.0 - penalty)
}

pub fn decide_route(&self, ctx: &DispatchContext) -> RoutingDecision {
pub fn decide_route(
&self,
ctx: &DispatchContext,
budget_verdict: &BudgetVerdict,
) -> RoutingDecision {
let cli_name = Self::cli_name(ctx);
let pressure = self.budget_pressure(&ctx.agent_name);
let pressure = Self::budget_pressure(budget_verdict);

if !Self::supports_model_flag(cli_name) {
let candidate = RouteCandidate {
Expand All @@ -280,6 +286,7 @@ impl RoutingDecisionEngine {
dominant_signal: RouteSource::CliDefault,
budget_pressure: pressure,
budget_influenced: false,
telemetry_influenced: false,
};
}

Expand Down Expand Up @@ -335,18 +342,53 @@ impl RoutingDecisionEngine {
dominant_signal: RouteSource::CliDefault,
budget_pressure: pressure,
budget_influenced: false,
telemetry_influenced: false,
};
}

let no_pressure_scores: Vec<f64> = all_candidates
.iter()
.map(|c| Self::score_candidate(c, BudgetPressure::NoPressure))
.collect();
let pressured_scores: Vec<f64> = all_candidates
let mut pressured_scores: Vec<f64> = all_candidates
.iter()
.map(|c| Self::score_candidate(c, pressure))
.collect();

// Apply telemetry-based scoring adjustments
let mut telemetry_influenced = false;
if let Some(ref store) = self.telemetry_store {
let performances: Vec<crate::control_plane::telemetry::ModelPerformanceSnapshot> =
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
let mut perfs = Vec::new();
for candidate in &all_candidates {
perfs.push(store.model_performance(&candidate.model).await);
}
perfs
})
});

for (i, perf) in performances.iter().enumerate() {
if perf.is_subscription_limited() {
pressured_scores[i] *= 0.1;
telemetry_influenced = true;
} else if perf.successful_completions > 0 {
let success_bonus = (perf.success_rate - 0.5).max(0.0_f64) * 0.2_f64;
let latency_bonus = if perf.avg_latency_ms > 0.0 && perf.avg_latency_ms < 5000.0
{
(1.0_f64 - perf.avg_latency_ms / 10000.0_f64).max(0.0_f64) * 0.1_f64
} else {
0.0_f64
};
let bonus = success_bonus + latency_bonus;
if bonus > 0.01 {
pressured_scores[i] *= 1.0 + bonus;
telemetry_influenced = true;
}
}
}
}

let mut indexed: Vec<usize> = (0..all_candidates.len()).collect();
indexed.sort_by(|&a, &b| {
pressured_scores[b]
Expand Down Expand Up @@ -395,6 +437,9 @@ impl RoutingDecisionEngine {
if budget_influenced {
rationale.push_str(". Budget pressure biased selection toward cheaper model");
}
if telemetry_influenced {
rationale.push_str(". Telemetry data influenced selection");
}

let primary_available = !matches!(winner.source, RouteSource::CliDefault);

Expand All @@ -406,13 +451,15 @@ impl RoutingDecisionEngine {
dominant_signal,
budget_pressure: pressure,
budget_influenced,
telemetry_influenced,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::cost_tracker::CostTracker;

fn create_test_context_with_cli(
agent_name: &str,
Expand Down Expand Up @@ -450,50 +497,35 @@ mod tests {
Arc::new(crate::provider_probe::ProviderHealthMap::new(
std::time::Duration::from_secs(300),
)),
CostTracker::new(),
terraphim_router::Router::new(),
)
}

fn test_engine_with_agent_budget(
agent_name: &str,
budget_cents: Option<u64>,
) -> RoutingDecisionEngine {
let mut ct = CostTracker::new();
ct.register(agent_name, budget_cents);
RoutingDecisionEngine::new(
None,
Arc::new(crate::provider_probe::ProviderHealthMap::new(
std::time::Duration::from_secs(300),
)),
ct,
terraphim_router::Router::new(),
)
}

fn test_engine_with_spent(
agent_name: &str,
budget_cents: Option<u64>,
spend_usd: f64,
) -> RoutingDecisionEngine {
) -> (RoutingDecisionEngine, CostTracker) {
let mut ct = CostTracker::new();
ct.register(agent_name, budget_cents);
ct.record_cost(agent_name, spend_usd);
RoutingDecisionEngine::new(
let engine = RoutingDecisionEngine::new(
None,
Arc::new(crate::provider_probe::ProviderHealthMap::new(
std::time::Duration::from_secs(300),
)),
ct,
terraphim_router::Router::new(),
)
None,
);
(engine, ct)
}

#[test]
fn test_cli_default_for_unsupported_tool() {
let engine = test_engine();
let ctx = create_test_context_with_cli("test-agent", "Implement a feature", "codex");
let decision = engine.decide_route(&ctx);
let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped);

assert_eq!(decision.candidate.source, RouteSource::CliDefault);
assert!(decision.candidate.model.is_empty());
Expand All @@ -511,7 +543,7 @@ mod tests {
"Implement a feature",
"claude-3-opus",
);
let decision = engine.decide_route(&ctx);
let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped);

assert_eq!(decision.candidate.source, RouteSource::StaticConfig);
assert_eq!(decision.candidate.model, "claude-3-opus");
Expand All @@ -530,7 +562,7 @@ mod tests {
layer: crate::config::AgentLayer::Core,
session_id: None,
};
let decision = engine.decide_route(&ctx);
let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped);

assert_eq!(decision.candidate.source, RouteSource::CliDefault);
assert_eq!(decision.dominant_signal, RouteSource::CliDefault);
Expand All @@ -544,7 +576,7 @@ mod tests {
"Implement a feature",
"kimi-for-coding/k2p5",
);
let decision = engine.decide_route(&ctx);
let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped);

assert_eq!(decision.candidate.source, RouteSource::StaticConfig);
assert_eq!(decision.candidate.model, "kimi-for-coding/k2p5");
Expand All @@ -554,7 +586,7 @@ mod tests {
fn test_cli_default_when_no_signals_match() {
let engine = test_engine();
let ctx = create_test_context_with_cli("test-agent", "do something", "opencode");
let decision = engine.decide_route(&ctx);
let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped);

assert_eq!(decision.candidate.source, RouteSource::CliDefault);
assert!(decision.rationale.contains("No routing signal matched"));
Expand All @@ -565,7 +597,7 @@ mod tests {
fn test_rationale_records_dominant_signal() {
let engine = test_engine();
let ctx = create_test_context_with_static_model("agent", "task", "model-x");
let decision = engine.decide_route(&ctx);
let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped);

assert!(decision.rationale.contains("static config"));
assert!(decision.rationale.contains("Selected model-x"));
Expand All @@ -582,9 +614,9 @@ mod tests {
layer: crate::config::AgentLayer::Core,
session_id: None,
};
let decision = engine.decide_route(&ctx);
let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped);

assert!(decision.all_candidates.len() >= 1);
assert!(!decision.all_candidates.is_empty());
assert!(decision
.all_candidates
.iter()
Expand All @@ -609,12 +641,12 @@ mod tests {
Arc::new(crate::provider_probe::ProviderHealthMap::new(
std::time::Duration::from_secs(300),
)),
CostTracker::new(),
terraphim_router::Router::new(),
None,
);

let ctx = create_test_context_with_cli("agent", "implement feature", "opencode");
let decision = engine.decide_route(&ctx);
let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped);

assert!(
decision.candidate.source == RouteSource::KnowledgeGraph
Expand Down Expand Up @@ -644,12 +676,12 @@ mod tests {
Arc::new(crate::provider_probe::ProviderHealthMap::new(
std::time::Duration::from_secs(300),
)),
CostTracker::new(),
terraphim_router::Router::new(),
None,
);

let ctx = create_test_context_with_cli("agent", "security audit the codebase", "opencode");
let decision = engine.decide_route(&ctx);
let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped);

assert_eq!(decision.candidate.source, RouteSource::KnowledgeGraph);
assert!(decision.candidate.model.contains("opus"));
Expand All @@ -660,7 +692,7 @@ mod tests {
fn test_keyword_only_no_kg_match() {
let engine = test_engine();
let ctx = create_test_context_with_cli("agent", "implement a feature", "opencode");
let decision = engine.decide_route(&ctx);
let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped);

assert!(
decision.candidate.source == RouteSource::KeywordRouting
Expand Down Expand Up @@ -711,26 +743,26 @@ mod tests {
fn test_budget_pressure_no_pressure_for_uncapped() {
let engine = test_engine();
let ctx = create_test_context_with_static_model("test-agent", "task", "model-x");
let decision = engine.decide_route(&ctx);
let decision = engine.decide_route(&ctx, &BudgetVerdict::Uncapped);

assert_eq!(decision.budget_pressure, BudgetPressure::NoPressure);
assert!(!decision.budget_influenced);
}

#[test]
fn test_budget_pressure_near_exhaustion_detected() {
let engine = test_engine_with_spent("test-agent", Some(10000), 85.0);
let (engine, ct) = test_engine_with_spent("test-agent", Some(10000), 85.0);
let ctx = create_test_context_with_static_model("test-agent", "task", "model-x");
let decision = engine.decide_route(&ctx);
let decision = engine.decide_route(&ctx, &ct.check("test-agent"));

assert_eq!(decision.budget_pressure, BudgetPressure::NearExhaustion);
}

#[test]
fn test_budget_pressure_exhausted_detected() {
let engine = test_engine_with_spent("test-agent", Some(10000), 100.0);
let (engine, ct) = test_engine_with_spent("test-agent", Some(10000), 100.0);
let ctx = create_test_context_with_static_model("test-agent", "task", "model-x");
let decision = engine.decide_route(&ctx);
let decision = engine.decide_route(&ctx, &ct.check("test-agent"));

assert_eq!(decision.budget_pressure, BudgetPressure::Exhausted);
}
Expand All @@ -755,9 +787,9 @@ mod tests {

#[test]
fn test_budget_influences_rationale_when_pressure() {
let engine = test_engine_with_spent("test-agent", Some(10000), 85.0);
let (engine, ct) = test_engine_with_spent("test-agent", Some(10000), 85.0);
let ctx = create_test_context_with_static_model("test-agent", "task", "model-x");
let decision = engine.decide_route(&ctx);
let decision = engine.decide_route(&ctx, &ct.check("test-agent"));

assert_eq!(decision.budget_pressure, BudgetPressure::NearExhaustion);
}
Expand Down
Loading
Loading