Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions codex-rs/ext/goal/src/accounting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ impl GoalAccountingState {
self.inner().current_turn_id.clone()
}

pub(crate) fn turn_is_current_active_goal(&self, turn_id: &str) -> bool {
let inner = self.inner();
if inner.current_turn_id.as_deref() != Some(turn_id) {
return false;
}
let Some(turn) = inner.turns.get(turn_id) else {
return false;
};
turn.account_tokens && turn.active_goal_id.is_some()
}

pub(crate) fn record_token_usage(
&self,
turn_id: impl Into<String>,
Expand Down
48 changes: 48 additions & 0 deletions codex-rs/ext/goal/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,54 @@ impl GoalRuntimeHandle {
Ok(())
}

pub async fn usage_limit_active_goal_for_turn(&self, turn_id: &str) -> Result<(), String> {
Comment thread
jif-oai marked this conversation as resolved.
if !self.is_enabled() {
return Ok(());
}

if !self
.inner
.accounting_state
.turn_is_current_active_goal(turn_id)
{
return Ok(());
}

let progress_event_id = format!("{turn_id}:usage-limit-progress");
self.account_active_goal_progress(
turn_id,
progress_event_id.as_str(),
codex_state::GoalAccountingMode::ActiveOnly,
BudgetLimitedGoalDisposition::ClearActive,
)
.await?;

let previous_status = self
.current_goal_status_for_metrics(/*expected_goal_id*/ None)
.await?;
let Some(goal) = self
.inner
.state_dbs
.thread_goals()
.usage_limit_active_thread_goal(self.thread_id())
Comment thread
jif-oai marked this conversation as resolved.
Comment thread
jif-oai marked this conversation as resolved.
.await
.map_err(|err| err.to_string())?
else {
return Ok(());
};
self.inner
.metrics
.record_terminal_if_status_changed(previous_status, &goal);
self.inner.accounting_state.clear_active_goal();
let goal = protocol_goal_from_state(goal);
self.inner.event_emitter.thread_goal_updated(
format!("{turn_id}:usage-limit"),
Some(turn_id.to_string()),
goal,
);
Ok(())
}

pub async fn restore_after_resume(&self) -> Result<(), String> {
if !self.is_enabled() {
return Ok(());
Expand Down
245 changes: 244 additions & 1 deletion codex-rs/ext/goal/tests/goal_extension_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,243 @@ async fn budget_limited_goal_keeps_accounting_after_later_tool_finish() -> anyho
Ok(())
}

#[tokio::test]
async fn usage_limit_active_goal_accounts_progress_and_clears_accounting() -> anyhow::Result<()> {
let runtime = test_runtime().await?;
let thread_id = test_thread_id()?;
seed_thread_metadata(runtime.as_ref(), thread_id).await?;
let harness = GoalExtensionHarness::new(runtime.clone(), thread_id).await?;
harness.start_turn("turn-1", &TokenUsage::default()).await;

let tools = harness.tools();
let create_tool = tool_by_name(&tools, "create_goal");
create_tool
.handle(tool_call(
"create_goal",
"call-create-goal",
json!({ "objective": "ship goal extension backend" }),
))
.await?;
harness.sink.clear();

harness
.record_token_usage(
"turn-1",
&token_usage(
/*input_tokens*/ 20, /*cached_input_tokens*/ 5, /*output_tokens*/ 8,
/*reasoning_output_tokens*/ 2, /*total_tokens*/ 30,
),
)
.await;
harness
.runtime_handle()
.usage_limit_active_goal_for_turn("turn-1")
.await
.map_err(anyhow::Error::msg)?;

let goal = runtime
.thread_goals()
.get_thread_goal(thread_id)
.await?
.ok_or_else(|| anyhow::anyhow!("goal should exist"))?;
assert_eq!(23, goal.tokens_used);
assert_eq!(codex_state::ThreadGoalStatus::UsageLimited, goal.status);
assert_eq!(
vec![
CapturedGoalEvent {
event_id: "turn-1:usage-limit-progress".to_string(),
turn_id: Some("turn-1".to_string()),
status: ThreadGoalStatus::Active,
tokens_used: 23,
},
CapturedGoalEvent {
event_id: "turn-1:usage-limit".to_string(),
turn_id: Some("turn-1".to_string()),
status: ThreadGoalStatus::UsageLimited,
tokens_used: 23,
},
],
harness.sink.goal_events()
);

harness
.record_token_usage(
"turn-1",
&token_usage(
/*input_tokens*/ 50, /*cached_input_tokens*/ 5,
/*output_tokens*/ 20, /*reasoning_output_tokens*/ 0,
/*total_tokens*/ 70,
),
)
.await;
harness
.notify_tool_finish("turn-1", "call-shell-after-usage-limit", "shell")
.await;
harness.stop_turn("turn-1").await;

let goal = runtime
.thread_goals()
.get_thread_goal(thread_id)
.await?
.ok_or_else(|| anyhow::anyhow!("goal should exist"))?;
assert_eq!(23, goal.tokens_used);
assert_eq!(codex_state::ThreadGoalStatus::UsageLimited, goal.status);
Ok(())
}

#[tokio::test]
async fn usage_limit_budget_limited_goal_accounts_remaining_progress() -> anyhow::Result<()> {
let runtime = test_runtime().await?;
let thread_id = test_thread_id()?;
seed_thread_metadata(runtime.as_ref(), thread_id).await?;
let harness = GoalExtensionHarness::new(runtime.clone(), thread_id).await?;
harness.start_turn("turn-1", &TokenUsage::default()).await;

let tools = harness.tools();
let create_tool = tool_by_name(&tools, "create_goal");
create_tool
.handle(tool_call(
"create_goal",
"call-create-goal",
json!({
"objective": "ship goal extension backend",
"token_budget": 25,
}),
))
.await?;

harness
.record_token_usage(
"turn-1",
&token_usage(
/*input_tokens*/ 20, /*cached_input_tokens*/ 5,
/*output_tokens*/ 10, /*reasoning_output_tokens*/ 0,
/*total_tokens*/ 30,
),
)
.await;
harness
.notify_tool_finish("turn-1", "call-shell", "shell")
.await;
harness.sink.clear();

harness
.record_token_usage(
"turn-1",
&token_usage(
/*input_tokens*/ 24, /*cached_input_tokens*/ 5,
/*output_tokens*/ 16, /*reasoning_output_tokens*/ 0,
/*total_tokens*/ 40,
),
)
.await;
harness
.runtime_handle()
.usage_limit_active_goal_for_turn("turn-1")
.await
.map_err(anyhow::Error::msg)?;

let goal = runtime
.thread_goals()
.get_thread_goal(thread_id)
.await?
.ok_or_else(|| anyhow::anyhow!("goal should exist"))?;
assert_eq!(35, goal.tokens_used);
assert_eq!(codex_state::ThreadGoalStatus::UsageLimited, goal.status);
assert_eq!(
vec![
CapturedGoalEvent {
event_id: "turn-1:usage-limit-progress".to_string(),
turn_id: Some("turn-1".to_string()),
status: ThreadGoalStatus::BudgetLimited,
tokens_used: 35,
},
CapturedGoalEvent {
event_id: "turn-1:usage-limit".to_string(),
turn_id: Some("turn-1".to_string()),
status: ThreadGoalStatus::UsageLimited,
tokens_used: 35,
},
],
harness.sink.goal_events()
);
Ok(())
}

#[tokio::test]
async fn usage_limit_plan_turn_does_not_stop_goal() -> anyhow::Result<()> {
let runtime = test_runtime().await?;
let thread_id = test_thread_id()?;
seed_thread_metadata(runtime.as_ref(), thread_id).await?;
let harness = GoalExtensionHarness::new(runtime.clone(), thread_id).await?;

let tools = harness.tools();
let create_tool = tool_by_name(&tools, "create_goal");
create_tool
.handle(tool_call(
"create_goal",
"call-create-goal",
json!({ "objective": "ship goal extension backend" }),
))
.await?;

harness
.start_turn_with_mode("turn-plan", ModeKind::Plan, &TokenUsage::default())
.await;
harness.sink.clear();
harness
.runtime_handle()
.usage_limit_active_goal_for_turn("turn-plan")
.await
.map_err(anyhow::Error::msg)?;

let goal = runtime
.thread_goals()
.get_thread_goal(thread_id)
.await?
.ok_or_else(|| anyhow::anyhow!("goal should exist"))?;
assert_eq!(codex_state::ThreadGoalStatus::Active, goal.status);
assert_eq!(Vec::<CapturedGoalEvent>::new(), harness.sink.goal_events());
Ok(())
}

#[tokio::test]
async fn usage_limit_stale_turn_does_not_stop_current_goal() -> anyhow::Result<()> {
let runtime = test_runtime().await?;
let thread_id = test_thread_id()?;
seed_thread_metadata(runtime.as_ref(), thread_id).await?;
let harness = GoalExtensionHarness::new(runtime.clone(), thread_id).await?;
harness.start_turn("turn-1", &TokenUsage::default()).await;

let tools = harness.tools();
let create_tool = tool_by_name(&tools, "create_goal");
create_tool
.handle(tool_call(
"create_goal",
"call-create-goal",
json!({ "objective": "ship goal extension backend" }),
))
.await?;
harness.stop_turn("turn-1").await;
harness.start_turn("turn-2", &TokenUsage::default()).await;
harness.sink.clear();

harness
.runtime_handle()
.usage_limit_active_goal_for_turn("turn-1")
.await
.map_err(anyhow::Error::msg)?;

let goal = runtime
.thread_goals()
.get_thread_goal(thread_id)
.await?
.ok_or_else(|| anyhow::anyhow!("goal should exist"))?;
assert_eq!(codex_state::ThreadGoalStatus::Active, goal.status);
assert_eq!(Vec::<CapturedGoalEvent>::new(), harness.sink.goal_events());
Ok(())
}

#[tokio::test]
async fn update_goal_can_block_and_accounts_final_progress() -> anyhow::Result<()> {
let runtime = test_runtime().await?;
Expand Down Expand Up @@ -719,8 +956,14 @@ impl GoalExtensionHarness {
}

async fn start_turn(&self, turn_id: &str, usage: &TokenUsage) {
self.start_turn_with_mode(turn_id, ModeKind::Default, usage)
.await;
}

async fn start_turn_with_mode(&self, turn_id: &str, mode: ModeKind, usage: &TokenUsage) {
let turn_store = ExtensionData::new(turn_id);
let collaboration_mode = default_collaboration_mode();
let mut collaboration_mode = default_collaboration_mode();
collaboration_mode.mode = mode;
for contributor in self.registry.turn_lifecycle_contributors() {
contributor
.on_turn_start(TurnStartInput {
Expand Down
Loading