Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions codex-rs/codex-api/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,6 @@ pub struct ResponseCreateWsRequest {
pub client_metadata: Option<HashMap<String, String>>,
}

#[derive(Debug, Serialize)]
pub struct ResponseProcessedWsRequest {
pub response_id: String,
}

pub fn response_create_client_metadata(
client_metadata: Option<HashMap<String, String>>,
trace: Option<&W3cTraceContext>,
Expand Down Expand Up @@ -272,8 +267,6 @@ pub fn response_create_client_metadata(
pub enum ResponsesWsRequest {
#[serde(rename = "response.create")]
ResponseCreate(ResponseCreateWsRequest),
#[serde(rename = "response.processed")]
ResponseProcessed(ResponseProcessedWsRequest),
}

pub fn create_text_param_for_request(
Expand Down
35 changes: 0 additions & 35 deletions codex-rs/codex-api/src/endpoint/responses_websocket.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::auth::SharedAuthProvider;
use crate::common::ResponseEvent;
use crate::common::ResponseProcessedWsRequest;
use crate::common::ResponseStream;
use crate::common::ResponsesWsRequest;
use crate::error::ApiError;
Expand Down Expand Up @@ -206,40 +205,6 @@ impl ResponsesWebsocketConnection {
self.stream.lock().await.is_none()
}

#[instrument(
name = "responses_websocket.send_response_processed",
level = "info",
skip_all,
fields(transport = "responses_websocket", api.path = "responses")
)]
#[expect(
clippy::await_holding_invalid_type,
reason = "the guard serializes exclusive use of the websocket while sending a request frame"
)]
pub async fn send_response_processed(&self, response_id: String) -> Result<(), ApiError> {
let request =
ResponsesWsRequest::ResponseProcessed(ResponseProcessedWsRequest { response_id });
let request_body = serde_json::to_value(&request).map_err(|err| {
ApiError::Stream(format!("failed to encode websocket request: {err}"))
})?;

let mut guard = self.stream.lock().await;
let Some(ws_stream) = guard.as_mut() else {
return Err(ApiError::Stream(
"websocket connection is closed".to_string(),
));
};

send_websocket_request(
ws_stream,
request_body,
self.idle_timeout,
self.telemetry.as_ref(),
/*connection_reused*/ true,
)
.await
}

#[instrument(
name = "responses_websocket.stream_request",
level = "info",
Expand Down
1 change: 0 additions & 1 deletion codex-rs/codex-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ pub use crate::common::RawMemoryMetadata;
pub use crate::common::Reasoning;
pub use crate::common::ResponseCreateWsRequest;
pub use crate::common::ResponseEvent;
pub use crate::common::ResponseProcessedWsRequest;
pub use crate::common::ResponseStream;
pub use crate::common::ResponsesApiRequest;
pub use crate::common::ResponsesWsRequest;
Expand Down
8 changes: 1 addition & 7 deletions codex-rs/core/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -556,9 +556,6 @@
"request_rule": {
"type": "boolean"
},
"responses_websocket_response_processed": {
"type": "boolean"
},
"responses_websockets": {
"type": "boolean"
},
Expand Down Expand Up @@ -4679,9 +4676,6 @@
"request_rule": {
"type": "boolean"
},
"responses_websocket_response_processed": {
"type": "boolean"
},
"responses_websockets": {
"type": "boolean"
},
Expand Down Expand Up @@ -5205,4 +5199,4 @@
},
"title": "ConfigToml",
"type": "object"
}
}
17 changes: 1 addition & 16 deletions codex-rs/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ use tokio::sync::oneshot::error::TryRecvError;
use tokio_tungstenite::tungstenite::Error;
use tokio_tungstenite::tungstenite::Message;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::instrument;
use tracing::trace;
use tracing::warn;
Expand Down Expand Up @@ -967,18 +966,6 @@ impl ModelClientSession {
.set_connection_reused(/*connection_reused*/ false);
}

pub(crate) async fn send_response_processed(&self, response_id: &str) {
let Some(connection) = self.websocket_session.connection.as_ref() else {
return;
};
if let Err(err) = connection
.send_response_processed(response_id.to_string())
.await
{
debug!("failed to send response.processed websocket request: {err}");
}
}

#[allow(clippy::too_many_arguments)]
/// Builds shared Responses API transport options and request-body options.
///
Expand Down Expand Up @@ -1672,9 +1659,7 @@ fn parse_turn_metadata_header(turn_metadata_header: Option<&str>) -> Option<Head
/// Meant to be called just before sending the request over the socket, to capture realistic
/// transport timing.
fn stamp_ws_stream_request_start_ms(request: &mut ResponsesWsRequest) {
let ResponsesWsRequest::ResponseCreate(payload) = request else {
return;
};
let ResponsesWsRequest::ResponseCreate(payload) = request;
payload
.client_metadata
.get_or_insert_with(HashMap::new)
Expand Down
25 changes: 5 additions & 20 deletions codex-rs/core/src/compact_remote_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use codex_analytics::CompactionImplementation;
use codex_analytics::CompactionPhase;
use codex_analytics::CompactionReason;
use codex_analytics::CompactionTrigger;
use codex_features::Feature;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::items::ContextCompactionItem;
Expand Down Expand Up @@ -281,7 +280,6 @@ async fn run_remote_compact_task_inner_impl(
);
let RemoteCompactionV2Output {
compaction_output,
response_id,
token_usage,
} = compaction_output_result?;
if let Some(token_usage) = token_usage {
Expand Down Expand Up @@ -314,18 +312,11 @@ async fn run_remote_compact_task_inner_impl(

sess.emit_turn_item_completed(turn_context, compaction_item)
.await;
if turn_context
.features
.enabled(Feature::ResponsesWebsocketResponseProcessed)
{
client_session.send_response_processed(&response_id).await;
}
Ok(())
}

struct RemoteCompactionV2Output {
compaction_output: ResponseItem,
response_id: String,
token_usage: Option<TokenUsage>,
}

Expand Down Expand Up @@ -409,7 +400,7 @@ async fn collect_compaction_output(
let mut output_item_count = 0usize;
let mut compaction_count = 0usize;
let mut compaction_output = None;
let mut completed_response_id = None;
let mut saw_completed = false;
let mut completed_token_usage = None;
while let Some(event) = stream.next().await {
match event? {
Expand All @@ -422,25 +413,21 @@ async fn collect_compaction_output(
}
}
}
ResponseEvent::Completed {
response_id,
token_usage,
..
} => {
completed_response_id = Some(response_id);
ResponseEvent::Completed { token_usage, .. } => {
saw_completed = true;
completed_token_usage = token_usage;
break;
}
_ => {}
}
}

let Some(response_id) = completed_response_id else {
if !saw_completed {
return Err(CodexErr::Stream(
"remote compaction v2 stream closed before response.completed".to_string(),
None,
));
};
}

if compaction_count != 1 {
return Err(CodexErr::Fatal(format!(
Expand All @@ -453,7 +440,6 @@ async fn collect_compaction_output(
};
Ok(RemoteCompactionV2Output {
compaction_output,
response_id,
token_usage: completed_token_usage,
})
}
Expand Down Expand Up @@ -804,7 +790,6 @@ mod tests {
.expect("compaction should be collected");

assert_eq!(output.compaction_output, compaction);
assert_eq!(output.response_id, "resp-compact");
assert_eq!(
output.token_usage,
Some(TokenUsage {
Expand Down
13 changes: 1 addition & 12 deletions codex-rs/core/src/session/turn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1827,7 +1827,6 @@ async fn try_run_sampling_request(
!sess.services.extensions.turn_item_contributors().is_empty();
let mut active_item_is_streaming_to_client = false;
let receiving_span = trace_span!("receiving_stream");
let mut completed_response_id: Option<String> = None;
let outcome: CodexResult<SamplingRequestResult> = loop {
let handle_responses = trace_span!(
parent: &receiving_span,
Expand Down Expand Up @@ -2071,9 +2070,9 @@ async fn try_run_sampling_request(
sess.services.models_manager.refresh_if_new_etag(etag).await;
}
ResponseEvent::Completed {
response_id,
token_usage,
end_turn,
..
} => {
flush_assistant_text_segments_all(
&sess,
Expand All @@ -2089,7 +2088,6 @@ async fn try_run_sampling_request(
if let Some(false) = end_turn {
needs_follow_up = true;
}
completed_response_id = Some(response_id);
break Ok(SamplingRequestResult {
needs_follow_up,
last_agent_message,
Expand Down Expand Up @@ -2213,15 +2211,6 @@ async fn try_run_sampling_request(
)
.await;

if sess
.features
.enabled(Feature::ResponsesWebsocketResponseProcessed)
&& outcome.is_ok()
&& let Some(response_id) = completed_response_id.as_deref()
{
client_session.send_response_processed(response_id).await;
}

drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;

if should_emit_token_count {
Expand Down
Loading
Loading