Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion crates/terraphim_orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,27 @@ fn validate_agent_name(name: &str) -> Result<(), OrchestratorError> {
impl AgentOrchestrator {
/// Create a new orchestrator from configuration.
pub fn new(config: OrchestratorConfig) -> Result<Self, OrchestratorError> {
let spawner = AgentSpawner::new().with_working_dir(&config.working_dir);
// Set CARGO_TARGET_DIR so worktree agents share the main build cache,
// and RUSTC_WRAPPER=sccache for cross-worktree compilation caching.
let mut spawn_env = std::collections::HashMap::new();
let target_dir = config.working_dir.join("target");
spawn_env.insert(
"CARGO_TARGET_DIR".to_string(),
target_dir.to_string_lossy().to_string(),
);
if std::process::Command::new("sccache")
.arg("--version")
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status()
.is_ok()
{
spawn_env.insert("RUSTC_WRAPPER".to_string(), "sccache".to_string());
info!("sccache detected, enabling shared compilation cache for worktrees");
}
let spawner = AgentSpawner::new()
.with_working_dir(&config.working_dir)
.with_env_vars(spawn_env);
let router = RoutingEngine::new();
let nightwatch = NightwatchMonitor::new(config.nightwatch.clone());
let scheduler = TimeScheduler::new(&config.agents, Some(&config.compound_review.schedule))?;
Expand Down
232 changes: 180 additions & 52 deletions crates/terraphim_orchestrator/src/provider_probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,12 @@ impl ProviderHealthMap {
}
}

// Update circuit breakers from probe results
// Update circuit breakers from probe results (keyed by provider:model)
for result in &results {
let key = format!("{}:{}", result.provider, result.model);
let breaker = self
.breakers
.entry(result.provider.clone())
.entry(key)
.or_insert_with(|| CircuitBreaker::new(self.cb_config.clone()));

match result.status {
Expand All @@ -140,106 +141,197 @@ impl ProviderHealthMap {
self.probed_at = Some(Instant::now());
}

/// Get health status for a provider.
/// Get health status for a specific provider+model combination.
///
/// Uses **probe results first**: if the latest probe for this provider
/// failed or timed out, it's unhealthy regardless of circuit breaker state.
/// Falls back to circuit breaker for providers not recently probed.
pub fn provider_health(&self, provider: &str) -> HealthStatus {
// Check latest probe results (most authoritative)
if let Some(status) = self.latest_probe_status(provider) {
return match status {
/// Uses **probe results first** at the model level, then falls back to
/// circuit breaker. This is per-model, not per-provider aggregate.
pub fn model_health(&self, provider: &str, model: &str) -> HealthStatus {
let key = format!("{provider}:{model}");

// Check latest probe result for this exact model
if let Some(result) = self
.results
.iter()
.find(|r| r.provider == provider && r.model == model)
{
return match result.status {
ProbeStatus::Success => HealthStatus::Healthy,
ProbeStatus::Error => HealthStatus::Unhealthy,
ProbeStatus::Timeout => HealthStatus::Unhealthy,
};
}

// Fall back to circuit breaker for unprobed providers
match self.breakers.get(provider) {
// Fall back to circuit breaker (keyed by provider:model)
match self.breakers.get(&key) {
Some(breaker) => match breaker.state() {
CircuitState::Closed => HealthStatus::Healthy,
CircuitState::HalfOpen => HealthStatus::Degraded,
CircuitState::Open => HealthStatus::Unhealthy,
},
None => HealthStatus::Healthy, // Unknown providers assumed healthy
None => HealthStatus::Healthy,
}
}

/// Get aggregate health status for a provider (any model healthy = provider healthy).
///
/// Probe results are authoritative: if any model of this provider probed
/// successfully the provider is healthy; if every probed model failed or
/// timed out it is unhealthy. With no probe data, falls back to the
/// per-model circuit breakers (keyed `provider:model`); a provider with no
/// breakers at all is assumed healthy (unknown).
pub fn provider_health(&self, provider: &str) -> HealthStatus {
    // Probe results at model level (most authoritative). Single pass:
    // short-circuit on the first success, remember whether anything probed.
    let mut probed = false;
    for result in self.results.iter().filter(|r| r.provider == provider) {
        if result.status == ProbeStatus::Success {
            return HealthStatus::Healthy;
        }
        probed = true;
    }
    if probed {
        // Every probed model for this provider failed or timed out.
        return HealthStatus::Unhealthy;
    }

    // Fall back to circuit breakers for this provider. Hoist the prefix
    // allocation out of the loop (was previously rebuilt per key).
    let prefix = format!("{provider}:");
    let mut has_breaker = false;
    for (key, breaker) in &self.breakers {
        if key.starts_with(&prefix) {
            if breaker.should_allow() {
                return HealthStatus::Healthy;
            }
            has_breaker = true;
        }
    }

    if has_breaker {
        // Breakers exist but none allows requests.
        HealthStatus::Unhealthy
    } else {
        HealthStatus::Healthy // Unknown provider: assume healthy.
    }
}

/// Check if a provider is healthy enough to dispatch to.
///
/// A provider is healthy if its latest probe succeeded OR it wasn't probed
/// and the circuit breaker allows requests. Degraded (half-open circuit)
/// still counts as dispatchable.
pub fn is_healthy(&self, provider: &str) -> bool {
    match self.provider_health(provider) {
        HealthStatus::Healthy | HealthStatus::Degraded => true,
        _ => false,
    }
}

/// List all unhealthy provider names (from probe results + circuit breakers).
/// Check if a specific provider+model combination is healthy.
///
/// Degraded (half-open circuit) still counts as dispatchable.
pub fn is_model_healthy(&self, provider: &str, model: &str) -> bool {
    match self.model_health(provider, model) {
        HealthStatus::Healthy | HealthStatus::Degraded => true,
        _ => false,
    }
}

/// List all unhealthy provider names.
///
/// A provider is unhealthy only if ALL its models are unhealthy.
pub fn unhealthy_providers(&self) -> Vec<String> {
let mut unhealthy: Vec<String> = Vec::new();
let mut providers: HashMap<String, (usize, usize)> = HashMap::new(); // (total, healthy)

// From probe results: any provider with failed/timeout probe
for result in &self.results {
if result.status != ProbeStatus::Success && !unhealthy.contains(&result.provider) {
unhealthy.push(result.provider.clone());
let entry = providers.entry(result.provider.clone()).or_insert((0, 0));
entry.0 += 1;
if result.status == ProbeStatus::Success {
entry.1 += 1;
}
}

// From circuit breakers: any open circuit not already in list
for (name, breaker) in &self.breakers {
if !breaker.should_allow() && !unhealthy.contains(name) {
unhealthy.push(name.clone());
}
}

unhealthy
}

/// Get the latest probe status for a provider (best result across all models).
fn latest_probe_status(&self, provider: &str) -> Option<ProbeStatus> {
let provider_results: Vec<_> = self
.results
.iter()
.filter(|r| r.provider == provider)
let mut unhealthy: Vec<String> = providers
.into_iter()
.filter(|(_, (total, healthy))| *total > 0 && *healthy == 0)
.map(|(name, _)| name)
.collect();

if provider_results.is_empty() {
return None;
// Also check circuit breakers for providers not in probe results
let mut cb_providers: HashMap<String, (usize, usize)> = HashMap::new();
for (key, breaker) in &self.breakers {
if let Some(provider) = key.split(':').next() {
if !unhealthy.contains(&provider.to_string()) {
let entry = cb_providers.entry(provider.to_string()).or_insert((0, 0));
entry.0 += 1;
if breaker.should_allow() {
entry.1 += 1;
}
}
}
}

// If ANY model for this provider succeeded, provider is healthy
if provider_results
.iter()
.any(|r| r.status == ProbeStatus::Success)
{
Some(ProbeStatus::Success)
} else {
// All models failed -- use the "least bad" status
Some(provider_results[0].status)
for (name, (total, healthy)) in cb_providers {
if total > 0 && healthy == 0 && !unhealthy.contains(&name) {
unhealthy.push(name);
}
}

unhealthy
}

/// Record a success for a provider (e.g., from ExitClassifier).
/// Record a success for a provider+model (e.g., from ExitClassifier).
pub fn record_success(&mut self, provider: &str) {
if let Some(breaker) = self.breakers.get_mut(provider) {
// Update all circuit breakers for this provider
let prefix = format!("{provider}:");
for (key, breaker) in &mut self.breakers {
if key.starts_with(&prefix) {
breaker.record_success();
}
}
}

/// Record a success for a specific model.
///
/// Looks up the breaker under the `provider:model` key; a no-op when that
/// model has no circuit breaker yet (nothing is tracked for it).
pub fn record_model_success(&mut self, provider: &str, model: &str) {
    let breaker_key = format!("{provider}:{model}");
    if let Some(cb) = self.breakers.get_mut(&breaker_key) {
        cb.record_success();
    }
}

/// Record a failure for a provider (e.g., from ExitClassifier ModelError).
/// Record a failure for a provider (affects all models for that provider).
///
/// Trips the circuit breaker of every `provider:model` entry in one pass.
/// If the provider has no breakers yet, a provider-wide `provider:*`
/// breaker is created so repeated failures can still open a circuit.
pub fn record_failure(&mut self, provider: &str) {
    let prefix = format!("{provider}:");

    // Single mutable pass over the breakers (mirrors `record_success`),
    // instead of collecting keys and re-looking each one up with unwrap().
    let mut matched = false;
    for (key, breaker) in &mut self.breakers {
        if key.starts_with(&prefix) {
            breaker.record_failure();
            matched = true;
        }
    }

    // No per-model breakers exist yet: track at the provider level. The
    // `provider:*` key also matches `prefix`, so later calls hit the loop
    // above — same behavior as re-scanning the key set, without the rescan.
    if !matched {
        let breaker = self
            .breakers
            .entry(format!("{provider}:*"))
            .or_insert_with(|| CircuitBreaker::new(self.cb_config.clone()));
        breaker.record_failure();
    }

    warn!(
        provider = provider,
        "provider failure recorded (all models)"
    );
}

/// Record a failure for a specific model.
pub fn record_model_failure(&mut self, provider: &str, model: &str) {
let key = format!("{provider}:{model}");
let breaker = self
.breakers
.entry(provider.to_string())
.entry(key)
.or_insert_with(|| CircuitBreaker::new(self.cb_config.clone()));
breaker.record_failure();
warn!(
provider = provider,
model = model,
state = %breaker.state(),
"provider failure recorded"
"model failure recorded"
);
}

Expand Down Expand Up @@ -489,5 +581,41 @@ mod tests {
];
// One model succeeded -> provider is healthy
assert!(map.is_healthy("minimax"));
// But the timed-out model is individually unhealthy
assert!(!map.is_model_healthy("minimax", "opencode-go/minimax-m2.5"));
assert!(map.is_model_healthy("minimax", "minimax-coding-plan/MiniMax-M2.5"));
}

#[test]
fn per_model_failure_does_not_affect_other_models() {
    let mut map = ProviderHealthMap::new(Duration::from_secs(300));

    // Small builder closure so each probe result reads as one line.
    let probe = |model: &str, status: ProbeStatus, latency: u64, err: Option<&str>| ProbeResult {
        provider: "anthropic".to_string(),
        model: model.to_string(),
        cli_tool: "claude".to_string(),
        status,
        latency_ms: Some(latency),
        error: err.map(str::to_string),
        timestamp: String::new(),
    };

    // Opus fails (rate limited) while sonnet works.
    map.results = vec![
        probe("claude-opus-4-6", ProbeStatus::Error, 5000, Some("rate limited")),
        probe("claude-sonnet-4-6", ProbeStatus::Success, 8000, None),
    ];

    // Provider stays healthy because at least one model (sonnet) succeeds.
    assert!(map.is_healthy("anthropic"));
    // The failing model is individually unhealthy; the working one is not.
    assert!(!map.is_model_healthy("anthropic", "claude-opus-4-6"));
    assert!(map.is_model_healthy("anthropic", "claude-sonnet-4-6"));
    // Sonnet keeps the provider out of the unhealthy list.
    assert!(!map.unhealthy_providers().contains(&"anthropic".to_string()));
}
}
Loading