Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
<h1 align="center">OpenAI Codex CLI</h1>

<p align="center"><code>npm i -g @openai/codex</code><br />or <code>brew install codex</code></p>

Expand Down Expand Up @@ -102,4 +101,3 @@ Codex CLI supports a rich set of configuration options, with preferences stored
## License

This repository is licensed under the [Apache-2.0 License](LICENSE).

20 changes: 20 additions & 0 deletions codex-rs/exec/src/exec_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ use ts_rs::TS;
pub enum ConversationEvent {
#[serde(rename = "session.created")]
SessionCreated(SessionCreatedEvent),
#[serde(rename = "turn.started")]
TurnStarted(TurnStartedEvent),
#[serde(rename = "turn.completed")]
TurnCompleted(TurnCompletedEvent),
#[serde(rename = "item.started")]
ItemStarted(ItemStartedEvent),
#[serde(rename = "item.updated")]
Expand All @@ -23,6 +27,22 @@ pub struct SessionCreatedEvent {
pub session_id: String,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS, Default)]
pub struct TurnStartedEvent {}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
pub struct TurnCompletedEvent {
pub usage: Usage,
}

/// Minimal usage summary for a turn.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS, Default)]
pub struct Usage {
pub input_tokens: u64,
pub cached_input_tokens: u64,
pub output_tokens: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
pub struct ItemStartedEvent {
pub item: ConversationItem,
Expand Down
42 changes: 38 additions & 4 deletions codex-rs/exec/src/experimental_event_processor_with_json_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ use crate::exec_events::ReasoningItem;
use crate::exec_events::SessionCreatedEvent;
use crate::exec_events::TodoItem;
use crate::exec_events::TodoListItem;
use crate::exec_events::TurnCompletedEvent;
use crate::exec_events::TurnStartedEvent;
use crate::exec_events::Usage;
use codex_core::config::Config;
use codex_core::plan_tool::StepStatus;
use codex_core::plan_tool::UpdatePlanArgs;
Expand All @@ -37,6 +40,7 @@ use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TaskStartedEvent;
use tracing::error;
use tracing::warn;

Expand All @@ -48,6 +52,7 @@ pub struct ExperimentalEventProcessorWithJsonOutput {
running_patch_applies: HashMap<String, PatchApplyBeginEvent>,
// Tracks the todo list for the current turn (at most one per turn).
running_todo_list: Option<RunningTodoList>,
last_total_token_usage: Option<codex_core::protocol::TokenUsage>,
}

#[derive(Debug, Clone)]
Expand All @@ -70,6 +75,7 @@ impl ExperimentalEventProcessorWithJsonOutput {
running_commands: HashMap::new(),
running_patch_applies: HashMap::new(),
running_todo_list: None,
last_total_token_usage: None,
}
}

Expand All @@ -82,14 +88,21 @@ impl ExperimentalEventProcessorWithJsonOutput {
EventMsg::ExecCommandEnd(ev) => self.handle_exec_command_end(ev),
EventMsg::PatchApplyBegin(ev) => self.handle_patch_apply_begin(ev),
EventMsg::PatchApplyEnd(ev) => self.handle_patch_apply_end(ev),
EventMsg::TokenCount(ev) => {
if let Some(info) = &ev.info {
self.last_total_token_usage = Some(info.total_token_usage.clone());
}
Vec::new()
}
EventMsg::TaskStarted(ev) => self.handle_task_started(ev),
EventMsg::TaskComplete(_) => self.handle_task_complete(),
EventMsg::Error(ev) => vec![ConversationEvent::Error(ConversationErrorEvent {
message: ev.message.clone(),
})],
EventMsg::StreamError(ev) => vec![ConversationEvent::Error(ConversationErrorEvent {
message: ev.message.clone(),
})],
EventMsg::PlanUpdate(ev) => self.handle_plan_update(ev),
EventMsg::TaskComplete(_) => self.handle_task_complete(),
_ => Vec::new(),
}
}
Expand Down Expand Up @@ -283,19 +296,40 @@ impl ExperimentalEventProcessorWithJsonOutput {
vec![ConversationEvent::ItemStarted(ItemStartedEvent { item })]
}

fn handle_task_started(&self, _: &TaskStartedEvent) -> Vec<ConversationEvent> {
vec![ConversationEvent::TurnStarted(TurnStartedEvent {})]
}

fn handle_task_complete(&mut self) -> Vec<ConversationEvent> {
let usage = if let Some(u) = &self.last_total_token_usage {
Usage {
input_tokens: u.input_tokens,
cached_input_tokens: u.cached_input_tokens,
output_tokens: u.output_tokens,
}
} else {
Usage::default()
};

let mut items = Vec::new();

if let Some(running) = self.running_todo_list.take() {
let item = ConversationItem {
id: running.item_id,
details: ConversationItemDetails::TodoList(TodoListItem {
items: running.items,
}),
};
return vec![ConversationEvent::ItemCompleted(ItemCompletedEvent {
items.push(ConversationEvent::ItemCompleted(ItemCompletedEvent {
item,
})];
}));
}
Vec::new()

items.push(ConversationEvent::TurnCompleted(TurnCompletedEvent {
usage,
}));

items
}
}

Expand Down
9 changes: 9 additions & 0 deletions codex-rs/exec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,13 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
info!("Sent prompt with event ID: {initial_prompt_task_id}");

// Run the loop until the task is complete.
// Track whether a fatal error was reported by the server so we can
// exit with a non-zero status for automation-friendly signaling.
let mut error_seen = false;
while let Some(event) = rx.recv().await {
if matches!(event.msg, EventMsg::Error(_)) {
error_seen = true;
}
let shutdown: CodexStatus = event_processor.process_event(event);
match shutdown {
CodexStatus::Running => continue,
Expand All @@ -343,6 +349,9 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
}
}
}
if error_seen {
std::process::exit(1);
}
Comment on lines +352 to +354
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to return a special type of Error and handle std::process::exit() higher up?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but outside of this PR, there are many std::process::exits in that file that I'd like to clean up.


Ok(())
}
Expand Down
107 changes: 90 additions & 17 deletions codex-rs/exec/tests/event_processor_with_json_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ use codex_exec::exec_events::ReasoningItem;
use codex_exec::exec_events::SessionCreatedEvent;
use codex_exec::exec_events::TodoItem as ExecTodoItem;
use codex_exec::exec_events::TodoListItem as ExecTodoListItem;
use codex_exec::exec_events::TurnCompletedEvent;
use codex_exec::exec_events::TurnStartedEvent;
use codex_exec::exec_events::Usage;
use codex_exec::experimental_event_processor_with_json_output::ExperimentalEventProcessorWithJsonOutput;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
Expand Down Expand Up @@ -65,6 +68,22 @@ fn session_configured_produces_session_created_event() {
);
}

#[test]
fn task_started_produces_turn_started_event() {
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
let out = ep.collect_conversation_events(&event(
"t1",
EventMsg::TaskStarted(codex_core::protocol::TaskStartedEvent {
model_context_window: Some(32_000),
}),
));

assert_eq!(
out,
vec![ConversationEvent::TurnStarted(TurnStartedEvent {})]
);
}

#[test]
fn plan_update_emits_todo_list_started_updated_and_completed() {
use codex_core::plan_tool::PlanItemArg;
Expand Down Expand Up @@ -161,23 +180,28 @@ fn plan_update_emits_todo_list_started_updated_and_completed() {
let out_complete = ep.collect_conversation_events(&complete);
assert_eq!(
out_complete,
vec![ConversationEvent::ItemCompleted(ItemCompletedEvent {
item: ConversationItem {
id: "item_0".to_string(),
details: ConversationItemDetails::TodoList(ExecTodoListItem {
items: vec![
ExecTodoItem {
text: "step one".to_string(),
completed: true
},
ExecTodoItem {
text: "step two".to_string(),
completed: false
},
],
}),
},
})]
vec![
ConversationEvent::ItemCompleted(ItemCompletedEvent {
item: ConversationItem {
id: "item_0".to_string(),
details: ConversationItemDetails::TodoList(ExecTodoListItem {
items: vec![
ExecTodoItem {
text: "step one".to_string(),
completed: true
},
ExecTodoItem {
text: "step two".to_string(),
completed: false
},
],
}),
},
}),
ConversationEvent::TurnCompleted(TurnCompletedEvent {
usage: Usage::default(),
}),
]
);
}

Expand Down Expand Up @@ -585,3 +609,52 @@ fn patch_apply_failure_produces_item_completed_patchapply_failed() {
other => panic!("unexpected event: {other:?}"),
}
}

#[test]
fn task_complete_produces_turn_completed_with_usage() {
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);

// First, feed a TokenCount event with known totals.
let usage = codex_core::protocol::TokenUsage {
input_tokens: 1200,
cached_input_tokens: 200,
output_tokens: 345,
reasoning_output_tokens: 0,
total_tokens: 0,
};
let info = codex_core::protocol::TokenUsageInfo {
total_token_usage: usage.clone(),
last_token_usage: usage,
model_context_window: None,
};
let token_count_event = event(
"e1",
EventMsg::TokenCount(codex_core::protocol::TokenCountEvent {
info: Some(info),
rate_limits: None,
}),
);
assert!(
ep.collect_conversation_events(&token_count_event)
.is_empty()
);

// Then TaskComplete should produce turn.completed with the captured usage.
let complete_event = event(
"e2",
EventMsg::TaskComplete(codex_core::protocol::TaskCompleteEvent {
last_agent_message: Some("done".to_string()),
}),
);
let out = ep.collect_conversation_events(&complete_event);
assert_eq!(
out,
vec![ConversationEvent::TurnCompleted(TurnCompletedEvent {
usage: Usage {
input_tokens: 1200,
cached_input_tokens: 200,
output_tokens: 345,
},
})]
);
}
1 change: 1 addition & 0 deletions codex-rs/exec/tests/suite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ mod apply_patch;
mod output_schema;
mod resume;
mod sandbox;
mod server_error_exit;
34 changes: 34 additions & 0 deletions codex-rs/exec/tests/suite/server_error_exit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#![cfg(not(target_os = "windows"))]
#![allow(clippy::expect_used, clippy::unwrap_used)]

use core_test_support::responses;
use core_test_support::test_codex_exec::test_codex_exec;
use wiremock::matchers::any;

/// Verify that when the server reports an error, `codex-exec` exits with a
/// non-zero status code so automation can detect failures.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exits_non_zero_when_server_reports_error() -> anyhow::Result<()> {
let test = test_codex_exec();

// Mock a simple Responses API SSE stream that immediately reports a
// `response.failed` event with an error message.
let server = responses::start_mock_server().await;
let body = responses::sse(vec![serde_json::json!({
"type": "response.failed",
"response": {
"id": "resp_err_1",
"error": {"code": "rate_limit_exceeded", "message": "synthetic server error"}
}
})]);
responses::mount_sse_once(&server, any(), body).await;

test.cmd_with_server(&server)
.arg("--skip-git-repo-check")
.arg("tell me something")
.arg("--experimental-json")
.assert()
.code(1);

Ok(())
}
Loading