From 1e2e3b1c660725ae4d52541fe3a6ff659f19069e Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Thu, 4 Jun 2026 12:11:52 -0700 Subject: [PATCH] Remove response.processed websocket request --- codex-rs/codex-api/src/common.rs | 7 - .../src/endpoint/responses_websocket.rs | 35 ---- codex-rs/codex-api/src/lib.rs | 1 - codex-rs/core/config.schema.json | 8 +- codex-rs/core/src/client.rs | 17 +- codex-rs/core/src/compact_remote_v2.rs | 25 +-- codex-rs/core/src/session/turn.rs | 13 +- .../core/tests/suite/client_websockets.rs | 162 ------------------ codex-rs/features/src/lib.rs | 8 - codex-rs/features/src/tests.rs | 16 -- 10 files changed, 8 insertions(+), 284 deletions(-) diff --git a/codex-rs/codex-api/src/common.rs b/codex-rs/codex-api/src/common.rs index 91b251c41f6..50ac2685b44 100644 --- a/codex-rs/codex-api/src/common.rs +++ b/codex-rs/codex-api/src/common.rs @@ -239,11 +239,6 @@ pub struct ResponseCreateWsRequest { pub client_metadata: Option>, } -#[derive(Debug, Serialize)] -pub struct ResponseProcessedWsRequest { - pub response_id: String, -} - pub fn response_create_client_metadata( client_metadata: Option>, trace: Option<&W3cTraceContext>, @@ -272,8 +267,6 @@ pub fn response_create_client_metadata( pub enum ResponsesWsRequest { #[serde(rename = "response.create")] ResponseCreate(ResponseCreateWsRequest), - #[serde(rename = "response.processed")] - ResponseProcessed(ResponseProcessedWsRequest), } pub fn create_text_param_for_request( diff --git a/codex-rs/codex-api/src/endpoint/responses_websocket.rs b/codex-rs/codex-api/src/endpoint/responses_websocket.rs index f0a00198172..a7366a98717 100644 --- a/codex-rs/codex-api/src/endpoint/responses_websocket.rs +++ b/codex-rs/codex-api/src/endpoint/responses_websocket.rs @@ -1,6 +1,5 @@ use crate::auth::SharedAuthProvider; use crate::common::ResponseEvent; -use crate::common::ResponseProcessedWsRequest; use crate::common::ResponseStream; use crate::common::ResponsesWsRequest; use crate::error::ApiError; @@ -206,40 +205,6 @@ impl ResponsesWebsocketConnection { self.stream.lock().await.is_none() } - #[instrument( - name = "responses_websocket.send_response_processed", - level = "info", - skip_all, - fields(transport = "responses_websocket", api.path = "responses") - )] - #[expect( - clippy::await_holding_invalid_type, - reason = "the guard serializes exclusive use of the websocket while sending a request frame" - )] - pub async fn send_response_processed(&self, response_id: String) -> Result<(), ApiError> { - let request = - ResponsesWsRequest::ResponseProcessed(ResponseProcessedWsRequest { response_id }); - let request_body = serde_json::to_value(&request).map_err(|err| { - ApiError::Stream(format!("failed to encode websocket request: {err}")) - })?; - - let mut guard = self.stream.lock().await; - let Some(ws_stream) = guard.as_mut() else { - return Err(ApiError::Stream( - "websocket connection is closed".to_string(), - )); - }; - - send_websocket_request( - ws_stream, - request_body, - self.idle_timeout, - self.telemetry.as_ref(), - /*connection_reused*/ true, - ) - .await - } - #[instrument( name = "responses_websocket.stream_request", level = "info", diff --git a/codex-rs/codex-api/src/lib.rs b/codex-rs/codex-api/src/lib.rs index 08176d8dec6..8efebf076c7 100644 --- a/codex-rs/codex-api/src/lib.rs +++ b/codex-rs/codex-api/src/lib.rs @@ -32,7 +32,6 @@ pub use crate::common::RawMemoryMetadata; pub use crate::common::Reasoning; pub use crate::common::ResponseCreateWsRequest; pub use crate::common::ResponseEvent; -pub use crate::common::ResponseProcessedWsRequest; pub use crate::common::ResponseStream; pub use crate::common::ResponsesApiRequest; pub use crate::common::ResponsesWsRequest; diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index b281b44bb78..a1b3d2357a6 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -556,9 +556,6 @@ "request_rule": { "type": "boolean" }, - "responses_websocket_response_processed": { - "type": "boolean" - }, "responses_websockets": { "type": "boolean" }, @@ -4679,9 +4676,6 @@ "request_rule": { "type": "boolean" }, - "responses_websocket_response_processed": { - "type": "boolean" - }, "responses_websockets": { "type": "boolean" }, @@ -5205,4 +5199,4 @@ }, "title": "ConfigToml", "type": "object" -} \ No newline at end of file +} diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 124dfb18c08..2c6be6222a7 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -100,7 +100,6 @@ use tokio::sync::oneshot::error::TryRecvError; use tokio_tungstenite::tungstenite::Error; use tokio_tungstenite::tungstenite::Message; use tokio_util::sync::CancellationToken; -use tracing::debug; use tracing::instrument; use tracing::trace; use tracing::warn; @@ -967,18 +966,6 @@ impl ModelClientSession { .set_connection_reused(/*connection_reused*/ false); } - pub(crate) async fn send_response_processed(&self, response_id: &str) { - let Some(connection) = self.websocket_session.connection.as_ref() else { - return; - }; - if let Err(err) = connection - .send_response_processed(response_id.to_string()) - .await - { - debug!("failed to send response.processed websocket request: {err}"); - } - } - #[allow(clippy::too_many_arguments)] /// Builds shared Responses API transport options and request-body options. /// @@ -1672,9 +1659,7 @@ fn parse_turn_metadata_header(turn_metadata_header: Option<&str>) -> Option, } @@ -409,7 +400,7 @@ async fn collect_compaction_output( let mut output_item_count = 0usize; let mut compaction_count = 0usize; let mut compaction_output = None; - let mut completed_response_id = None; + let mut saw_completed = false; let mut completed_token_usage = None; while let Some(event) = stream.next().await { match event? { @@ -422,12 +413,8 @@ async fn collect_compaction_output( } } } - ResponseEvent::Completed { - response_id, - token_usage, - .. - } => { - completed_response_id = Some(response_id); + ResponseEvent::Completed { token_usage, .. } => { + saw_completed = true; completed_token_usage = token_usage; break; } @@ -435,12 +422,12 @@ async fn collect_compaction_output( } } - let Some(response_id) = completed_response_id else { + if !saw_completed { return Err(CodexErr::Stream( "remote compaction v2 stream closed before response.completed".to_string(), None, )); - }; + } if compaction_count != 1 { return Err(CodexErr::Fatal(format!( @@ -453,7 +440,6 @@ async fn collect_compaction_output( }; Ok(RemoteCompactionV2Output { compaction_output, - response_id, token_usage: completed_token_usage, }) } @@ -804,7 +790,6 @@ mod tests { .expect("compaction should be collected"); assert_eq!(output.compaction_output, compaction); - assert_eq!(output.response_id, "resp-compact"); assert_eq!( output.token_usage, Some(TokenUsage { diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 2765de76fb2..03b1c4b1dbd 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1827,7 +1827,6 @@ async fn try_run_sampling_request( !sess.services.extensions.turn_item_contributors().is_empty(); let mut active_item_is_streaming_to_client = false; let receiving_span = trace_span!("receiving_stream"); - let mut completed_response_id: Option = None; let outcome: CodexResult = loop { let handle_responses = trace_span!( parent: &receiving_span, @@ -2071,9 +2070,9 @@ async fn try_run_sampling_request( sess.services.models_manager.refresh_if_new_etag(etag).await; } ResponseEvent::Completed { - response_id, token_usage, end_turn, + .. } => { flush_assistant_text_segments_all( &sess, @@ -2089,7 +2088,6 @@ async fn try_run_sampling_request( if let Some(false) = end_turn { needs_follow_up = true; } - completed_response_id = Some(response_id); break Ok(SamplingRequestResult { needs_follow_up, last_agent_message, @@ -2213,15 +2211,6 @@ async fn try_run_sampling_request( ) .await; - if sess - .features - .enabled(Feature::ResponsesWebsocketResponseProcessed) - && outcome.is_ok() - && let Some(response_id) = completed_response_id.as_deref() - { - client_session.send_response_processed(response_id).await; - } - drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?; if should_emit_token_count { diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index 41db88f1add..9dee201d643 100755 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -185,168 +185,6 @@ async fn responses_websocket_streams_without_feature_flag_when_provider_supports server.shutdown().await; } -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn responses_websocket_sends_response_processed_when_feature_enabled() { - skip_if_no_network!(); - - let server = start_websocket_server(vec![vec![ - vec![ - ev_response_created("resp-prewarm"), - ev_completed("resp-prewarm"), - ], - vec![ - ev_response_created("resp-1"), - ev_assistant_message("msg-1", "hi"), - ev_completed("resp-1"), - ], - vec![], - ]]) - .await; - - let mut builder = test_codex().with_config(|config| { - config - .features - .enable(Feature::ResponsesWebsocketResponseProcessed) - .expect("test config should allow feature update"); - }); - let test = builder - .build_with_websocket_server(&server) - .await - .expect("build websocket codex"); - - test.submit_turn("hello") - .await - .expect("submission should send response.processed after processing"); - - let processed = server - .wait_for_request(/*connection_index*/ 0, /*request_index*/ 2) - .await; - assert_eq!( - processed.body_json(), - json!({ - "type": "response.processed", - "response_id": "resp-1", - }) - ); - - let connection = server.single_connection(); - assert_eq!(connection.len(), 3); - assert_eq!( - connection[1].body_json()["type"].as_str(), - Some("response.create") - ); - - server.shutdown().await; -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn responses_websocket_sends_response_processed_after_remote_compaction_v2() { - skip_if_no_network!(); - - let server = start_websocket_server(vec![vec![ - vec![ - ev_response_created("resp-prewarm"), - ev_completed("resp-prewarm"), - ], - vec![ - ev_response_created("resp-1"), - ev_assistant_message("msg-1", "hi"), - ev_completed("resp-1"), - ], - vec![], - vec![ - json!({ - "type": "response.output_item.done", - "item": { - "type": "compaction", - "encrypted_content": "ENCRYPTED_CONTEXT_COMPACTION_SUMMARY", - } - }), - ev_completed("resp-compact"), - ], - vec![], - ]]) - .await; - - let mut builder = test_codex().with_config(|config| { - config - .features - .enable(Feature::RemoteCompactionV2) - .expect("test config should allow feature update"); - config - .features - .enable(Feature::ResponsesWebsocketResponseProcessed) - .expect("test config should allow feature update"); - }); - let test = builder - .build_with_websocket_server(&server) - .await - .expect("build websocket codex"); - - test.submit_turn("hello") - .await - .expect("submission should send response.processed after processing"); - - test.codex - .submit(Op::Compact) - .await - .expect("compact submission should succeed"); - wait_for_event(&test.codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await; - - let compact_processed = server - .wait_for_request(/*connection_index*/ 0, /*request_index*/ 4) - .await; - assert_eq!( - compact_processed.body_json(), - json!({ - "type": "response.processed", - "response_id": "resp-compact", - }) - ); - - let connection = server.single_connection(); - assert_eq!(connection.len(), 5); - assert_eq!( - connection[3].body_json()["type"].as_str(), - Some("response.create") - ); - - server.shutdown().await; -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn responses_websocket_omits_response_processed_without_feature() { - skip_if_no_network!(); - - let server = start_websocket_server(vec![vec![ - vec![ - ev_response_created("resp-prewarm"), - ev_completed("resp-prewarm"), - ], - vec![ - ev_response_created("resp-1"), - ev_assistant_message("msg-1", "hi"), - ev_completed("resp-1"), - ], - vec![], - ]]) - .await; - let mut builder = test_codex(); - let test = builder - .build_with_websocket_server(&server) - .await - .expect("build websocket codex"); - - test.submit_turn("hello") - .await - .expect("submission should complete without response.processed"); - - let connection = server.single_connection(); - assert_eq!(connection.len(), 2); - - server.shutdown().await; -} - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn responses_websocket_reuses_connection_with_per_turn_trace_payloads() { skip_if_no_network!(); diff --git a/codex-rs/features/src/lib.rs b/codex-rs/features/src/lib.rs index ce1ee6940eb..772c8e2a0a6 100644 --- a/codex-rs/features/src/lib.rs +++ b/codex-rs/features/src/lib.rs @@ -206,8 +206,6 @@ pub enum Feature { RealtimeConversation, /// Prevent idle system sleep while a turn is actively running. PreventIdleSleep, - /// Send `response.processed` over Responses API websockets after a turn response is recorded. - ResponsesWebsocketResponseProcessed, /// Enable remote compaction v2 over the normal Responses API. RemoteCompactionV2, /// Enable workspace dependency support. @@ -1219,12 +1217,6 @@ pub const FEATURES: &[FeatureSpec] = &[ stage: Stage::Removed, default_enabled: false, }, - FeatureSpec { - id: Feature::ResponsesWebsocketResponseProcessed, - key: "responses_websocket_response_processed", - stage: Stage::UnderDevelopment, - default_enabled: false, - }, FeatureSpec { id: Feature::RemoteCompactionV2, key: "remote_compaction_v2", diff --git a/codex-rs/features/src/tests.rs b/codex-rs/features/src/tests.rs index 5d7087e90bd..aa5e0174bac 100644 --- a/codex-rs/features/src/tests.rs +++ b/codex-rs/features/src/tests.rs @@ -146,22 +146,6 @@ fn remote_compaction_v2_is_under_development() { ); } -#[test] -fn responses_websocket_response_processed_is_under_development() { - assert_eq!( - Feature::ResponsesWebsocketResponseProcessed.stage(), - Stage::UnderDevelopment - ); - assert_eq!( - Feature::ResponsesWebsocketResponseProcessed.default_enabled(), - false - ); - assert_eq!( - feature_for_key("responses_websocket_response_processed"), - Some(Feature::ResponsesWebsocketResponseProcessed) - ); -} - #[test] fn terminal_resize_reflow_is_experimental_and_enabled_by_default() { assert_eq!(