diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 2eb80cfaa6..2ce0997075 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -107,6 +107,7 @@ use crate::rollout::RolloutRecorderParams; use crate::shell; use crate::state::ActiveTurn; use crate::state::SessionServices; +use crate::state::SessionState; use crate::state::TaskKind; use crate::tasks::CompactTask; use crate::tasks::RegularTask; @@ -169,7 +170,7 @@ impl Codex { let config = Arc::new(config); - let configure_session = ConfigureSession { + let session_configuration = SessionConfiguration { provider: config.model_provider.clone(), model: config.model.clone(), model_reasoning_effort: config.model_reasoning_effort, @@ -178,13 +179,13 @@ impl Codex { base_instructions: config.base_instructions.clone(), approval_policy: config.approval_policy, sandbox_policy: config.sandbox_policy.clone(), - notify: UserNotifier::new(config.notify.clone()), cwd: config.cwd.clone(), + original_config_do_not_use: Arc::clone(&config), }; // Generate a unique ID for the lifetime of this Codex session. - let (session, turn_context) = Session::new( - configure_session, + let session = Session::new( + session_configuration, config.clone(), auth_manager.clone(), tx_event.clone(), @@ -199,7 +200,7 @@ impl Codex { let conversation_id = session.conversation_id; // This task will run until Op::Shutdown is received. - tokio::spawn(submission_loop(session, turn_context, config, rx_sub)); + tokio::spawn(submission_loop(session, config, rx_sub)); let codex = Codex { next_id: AtomicU64::new(0), tx_sub, @@ -243,8 +244,6 @@ impl Codex { } } -use crate::state::SessionState; - /// Context for an initialized model agent /// /// A session has at most 1 running task at a time, and can be interrupted by user input. @@ -283,8 +282,8 @@ impl TurnContext { } } -/// Configure the model session. -struct ConfigureSession { +#[derive(Clone)] +pub(crate) struct SessionConfiguration { /// Provider identifier ("openai", "openrouter", ...). provider: ModelProviderInfo, @@ -305,8 +304,6 @@ struct ConfigureSession { /// How to sandbox commands executed in the system sandbox_policy: SandboxPolicy, - notify: UserNotifier, - /// Working directory that should be treated as the *root* of the /// session. All relative paths supplied by the model as well as the /// execution sandbox are resolved against this directory **instead** @@ -315,32 +312,118 @@ struct ConfigureSession { /// `ConfigureSession` operation so that the business-logic layer can /// operate deterministically. cwd: PathBuf, + + // TODO(pakrym): Remove config from here + original_config_do_not_use: Arc, +} + +impl SessionConfiguration { + pub(crate) fn apply(&self, updates: &SessionSettingsUpdate) -> Self { + let mut next_configuration = self.clone(); + if let Some(model) = updates.model.clone() { + next_configuration.model = model; + } + if let Some(effort) = updates.reasoning_effort { + next_configuration.model_reasoning_effort = effort; + } + if let Some(summary) = updates.reasoning_summary { + next_configuration.model_reasoning_summary = summary; + } + if let Some(approval_policy) = updates.approval_policy { + next_configuration.approval_policy = approval_policy; + } + if let Some(sandbox_policy) = updates.sandbox_policy.clone() { + next_configuration.sandbox_policy = sandbox_policy; + } + if let Some(cwd) = updates.cwd.clone() { + next_configuration.cwd = cwd; + } + next_configuration + } +} + +#[derive(Default, Clone)] +pub(crate) struct SessionSettingsUpdate { + pub(crate) cwd: Option, + pub(crate) approval_policy: Option, + pub(crate) sandbox_policy: Option, + pub(crate) model: Option, + pub(crate) reasoning_effort: Option>, + pub(crate) reasoning_summary: Option, + pub(crate) final_output_json_schema: Option>, } impl Session { + fn make_turn_context( + auth_manager: Option>, + otel_event_manager: &OtelEventManager, + provider: ModelProviderInfo, + session_configuration: &SessionConfiguration, + conversation_id: ConversationId, + ) -> TurnContext { + let config = session_configuration.original_config_do_not_use.clone(); + let model_family = find_family_for_model(&session_configuration.model) + .unwrap_or_else(|| config.model_family.clone()); + let mut per_turn_config = (*config).clone(); + per_turn_config.model = session_configuration.model.clone(); + per_turn_config.model_family = model_family.clone(); + per_turn_config.model_reasoning_effort = session_configuration.model_reasoning_effort; + per_turn_config.model_reasoning_summary = session_configuration.model_reasoning_summary; + if let Some(model_info) = get_model_info(&model_family) { + per_turn_config.model_context_window = Some(model_info.context_window); + } + + let otel_event_manager = otel_event_manager.clone().with_model( + session_configuration.model.as_str(), + session_configuration.model.as_str(), + ); + + let client = ModelClient::new( + Arc::new(per_turn_config), + auth_manager, + otel_event_manager, + provider, + session_configuration.model_reasoning_effort, + session_configuration.model_reasoning_summary, + conversation_id, + ); + + let tools_config = ToolsConfig::new(&ToolsConfigParams { + model_family: &model_family, + features: &config.features, + }); + + TurnContext { + client, + cwd: session_configuration.cwd.clone(), + base_instructions: session_configuration.base_instructions.clone(), + user_instructions: session_configuration.user_instructions.clone(), + approval_policy: session_configuration.approval_policy, + sandbox_policy: session_configuration.sandbox_policy.clone(), + shell_environment_policy: config.shell_environment_policy.clone(), + tools_config, + is_review_mode: false, + final_output_json_schema: None, + } + } + async fn new( - configure_session: ConfigureSession, + session_configuration: SessionConfiguration, config: Arc, auth_manager: Arc, tx_event: Sender, initial_history: InitialHistory, session_source: SessionSource, - ) -> anyhow::Result<(Arc, TurnContext)> { - let ConfigureSession { - provider, - model, - model_reasoning_effort, - model_reasoning_summary, - user_instructions, - base_instructions, - approval_policy, - sandbox_policy, - notify, - cwd, - } = configure_session; - debug!("Configuring session: model={model}; provider={provider:?}"); - if !cwd.is_absolute() { - return Err(anyhow::anyhow!("cwd is not absolute: {cwd:?}")); + ) -> anyhow::Result> { + debug!( + "Configuring session: model={}; provider={:?}", + session_configuration.model, session_configuration.provider + ); + if !session_configuration.cwd.is_absolute() { + return Err(anyhow::anyhow!( + "cwd is not absolute: {:?}", + session_configuration.cwd + )); } let (conversation_id, rollout_params) = match &initial_history { @@ -350,7 +433,7 @@ impl Session { conversation_id, RolloutRecorderParams::new( conversation_id, - user_instructions.clone(), + session_configuration.user_instructions.clone(), session_source, ), ) @@ -406,8 +489,6 @@ impl Session { anyhow::anyhow!("failed to initialize rollout recorder: {e:#}") })?; let rollout_path = rollout_recorder.rollout_path.clone(); - // Create the mutable state for the Session. - let state = SessionState::new(); // Handle MCP manager result and record any startup failures. let (mcp_connection_manager, failed_clients) = match mcp_res { @@ -475,45 +556,24 @@ impl Session { config.active_profile.clone(), ); - // Now that the conversation id is final (may have been updated by resume), - // construct the model client. - let client = ModelClient::new( - config.clone(), - Some(auth_manager.clone()), - otel_event_manager, - provider.clone(), - model_reasoning_effort, - model_reasoning_summary, - conversation_id, - ); - let turn_context = TurnContext { - client, - tools_config: ToolsConfig::new(&ToolsConfigParams { - model_family: &config.model_family, - features: &config.features, - }), - user_instructions, - base_instructions, - approval_policy, - sandbox_policy, - shell_environment_policy: config.shell_environment_policy.clone(), - cwd, - is_review_mode: false, - final_output_json_schema: None, - }; + // Create the mutable state for the Session. + let state = SessionState::new(session_configuration.clone()); + let services = SessionServices { mcp_connection_manager, session_manager: ExecSessionManager::default(), unified_exec_manager: UnifiedExecSessionManager::default(), - notifier: notify, + notifier: UserNotifier::new(config.notify.clone()), rollout: Mutex::new(Some(rollout_recorder)), user_shell: default_shell, show_raw_agent_reasoning: config.show_raw_agent_reasoning, executor: Executor::new(ExecutorConfig::new( - turn_context.sandbox_policy.clone(), - turn_context.cwd.clone(), + session_configuration.sandbox_policy.clone(), + session_configuration.cwd.clone(), config.codex_linux_sandbox_exe.clone(), )), + auth_manager: Arc::clone(&auth_manager), + otel_event_manager, }; let sess = Arc::new(Session { @@ -528,15 +588,14 @@ impl Session { // Dispatch the SessionConfiguredEvent first and then report any errors. // If resuming, include converted initial messages in the payload so UIs can render them immediately. let initial_messages = initial_history.get_event_msgs(); - sess.record_initial_history(&turn_context, initial_history) - .await; + sess.record_initial_history(initial_history).await; let events = std::iter::once(Event { id: INITIAL_SUBMIT_ID.to_owned(), msg: EventMsg::SessionConfigured(SessionConfiguredEvent { session_id: conversation_id, - model, - reasoning_effort: model_reasoning_effort, + model: session_configuration.model.clone(), + reasoning_effort: session_configuration.model_reasoning_effort, history_log_id, history_entry_count, initial_messages, @@ -548,7 +607,7 @@ impl Session { sess.send_event(event).await; } - Ok((sess, turn_context)) + Ok(sess) } pub(crate) fn get_tx_event(&self) -> Sender { @@ -562,15 +621,12 @@ impl Session { format!("auto-compact-{id}") } - async fn record_initial_history( - &self, - turn_context: &TurnContext, - conversation_history: InitialHistory, - ) { + async fn record_initial_history(&self, conversation_history: InitialHistory) { + let turn_context = self.new_turn(SessionSettingsUpdate::default()).await; match conversation_history { InitialHistory::New => { // Build and record initial items (user instructions + environment context) - let items = self.build_initial_context(turn_context); + let items = self.build_initial_context(&turn_context); self.record_conversation_items(&items).await; } InitialHistory::Resumed(_) | InitialHistory::Forked(_) => { @@ -579,7 +635,7 @@ impl Session { // Always add response items to conversation history let reconstructed_history = - self.reconstruct_history_from_rollout(turn_context, &rollout_items); + self.reconstruct_history_from_rollout(&turn_context, &rollout_items); if !reconstructed_history.is_empty() { self.record_into_history(&reconstructed_history).await; } @@ -592,6 +648,52 @@ impl Session { } } + pub(crate) async fn update_settings(&self, updates: SessionSettingsUpdate) { + let mut state = self.state.lock().await; + + state.session_configuration = state.session_configuration.apply(&updates); + } + + pub(crate) async fn new_turn(&self, updates: SessionSettingsUpdate) -> Arc { + let current_configuration = self.state.lock().await.session_configuration.clone(); + let session_configuration = current_configuration.apply(&updates); + + self.services.executor.update_environment( + session_configuration.sandbox_policy.clone(), + session_configuration.cwd.clone(), + ); + + let mut turn_context: TurnContext = Self::make_turn_context( + Some(Arc::clone(&self.services.auth_manager)), + &self.services.otel_event_manager, + session_configuration.provider.clone(), + &session_configuration, + self.conversation_id, + ); + if let Some(final_schema) = updates.final_output_json_schema { + turn_context.final_output_json_schema = final_schema; + } + Arc::new(turn_context) + } + + fn build_environment_update_item( + &self, + previous: Option<&Arc>, + next: &TurnContext, + ) -> Option { + let prev = previous?; + + let prev_context = EnvironmentContext::from(prev.as_ref()); + let next_context = EnvironmentContext::from(next); + if prev_context.equals_except_shell(&next_context) { + return None; + } + Some(ResponseItem::from(EnvironmentContext::diff( + prev.as_ref(), + next, + ))) + } + /// Persist the event to rollout and send it to clients. pub(crate) async fn send_event(&self, event: Event) { // Persist the event into rollout (recorder filters as needed) @@ -1188,14 +1290,8 @@ impl Session { } } -async fn submission_loop( - sess: Arc, - turn_context: TurnContext, - config: Arc, - rx_sub: Receiver, -) { - // Wrap once to avoid cloning TurnContext for each task. - let mut turn_context = Arc::new(turn_context); +async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiver) { + let mut previous_context: Option> = None; // To break out of this loop, send Op::Shutdown. while let Ok(sub) = rx_sub.recv().await { debug!(?sub, "Submission"); @@ -1211,174 +1307,58 @@ async fn submission_loop( effort, summary, } => { - // Recalculate the persistent turn context with provided overrides. - let prev = Arc::clone(&turn_context); - let provider = prev.client.get_provider(); - - // Effective model + family - let (effective_model, effective_family) = if let Some(ref m) = model { - let fam = - find_family_for_model(m).unwrap_or_else(|| config.model_family.clone()); - (m.clone(), fam) - } else { - (prev.client.get_model(), prev.client.get_model_family()) - }; - - // Effective reasoning settings - let effective_effort = effort.unwrap_or(prev.client.get_reasoning_effort()); - let effective_summary = summary.unwrap_or(prev.client.get_reasoning_summary()); - - let auth_manager = prev.client.get_auth_manager(); - - // Build updated config for the client - let mut updated_config = (*config).clone(); - updated_config.model = effective_model.clone(); - updated_config.model_family = effective_family.clone(); - if let Some(model_info) = get_model_info(&effective_family) { - updated_config.model_context_window = Some(model_info.context_window); - } - - let otel_event_manager = prev.client.get_otel_event_manager().with_model( - updated_config.model.as_str(), - updated_config.model_family.slug.as_str(), - ); - - let client = ModelClient::new( - Arc::new(updated_config), - auth_manager, - otel_event_manager, - provider, - effective_effort, - effective_summary, - sess.conversation_id, - ); - - let new_approval_policy = approval_policy.unwrap_or(prev.approval_policy); - let new_sandbox_policy = sandbox_policy - .clone() - .unwrap_or(prev.sandbox_policy.clone()); - let new_cwd = cwd.clone().unwrap_or_else(|| prev.cwd.clone()); - - let tools_config = ToolsConfig::new(&ToolsConfigParams { - model_family: &effective_family, - features: &config.features, - }); - - let new_turn_context = TurnContext { - client, - tools_config, - user_instructions: prev.user_instructions.clone(), - base_instructions: prev.base_instructions.clone(), - approval_policy: new_approval_policy, - sandbox_policy: new_sandbox_policy.clone(), - shell_environment_policy: prev.shell_environment_policy.clone(), - cwd: new_cwd.clone(), - is_review_mode: false, - final_output_json_schema: None, + let updates = SessionSettingsUpdate { + cwd, + approval_policy, + sandbox_policy, + model, + reasoning_effort: effort, + reasoning_summary: summary, + ..Default::default() }; + sess.update_settings(updates).await; + } - // Install the new persistent context for subsequent tasks/turns. - turn_context = Arc::new(new_turn_context); - - // Optionally persist changes to model / effort - if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() { - sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new( + Op::UserInput { .. } | Op::UserTurn { .. } => { + let (items, updates) = match sub.op { + Op::UserTurn { cwd, approval_policy, sandbox_policy, - // Shell is not configurable from turn to turn - None, - ))]) - .await; - } - } - Op::UserInput { items } => { - turn_context - .client - .get_otel_event_manager() - .user_prompt(&items); - // attempt to inject input into current task - if let Err(items) = sess.inject_input(items).await { - // no current task, spawn a new one - sess.spawn_task(Arc::clone(&turn_context), sub.id, items, RegularTask) - .await; - } - } - Op::UserTurn { - items, - cwd, - approval_policy, - sandbox_policy, - model, - effort, - summary, - final_output_json_schema, - } => { - turn_context + model, + effort, + summary, + final_output_json_schema, + items, + } => ( + items, + SessionSettingsUpdate { + cwd: Some(cwd), + approval_policy: Some(approval_policy), + sandbox_policy: Some(sandbox_policy), + model: Some(model), + reasoning_effort: Some(effort), + reasoning_summary: Some(summary), + final_output_json_schema: Some(final_output_json_schema), + }, + ), + Op::UserInput { items } => (items, SessionSettingsUpdate::default()), + _ => unreachable!(), + }; + let current_context = sess.new_turn(updates).await; + current_context .client .get_otel_event_manager() .user_prompt(&items); // attempt to inject input into current task if let Err(items) = sess.inject_input(items).await { - // Derive a fresh TurnContext for this turn using the provided overrides. - let provider = turn_context.client.get_provider(); - let auth_manager = turn_context.client.get_auth_manager(); - - // Derive a model family for the requested model; fall back to the session's. - let model_family = find_family_for_model(&model) - .unwrap_or_else(|| config.model_family.clone()); - - // Create a per‑turn Config clone with the requested model/family. - let mut per_turn_config = (*config).clone(); - per_turn_config.model = model.clone(); - per_turn_config.model_family = model_family.clone(); - if let Some(model_info) = get_model_info(&model_family) { - per_turn_config.model_context_window = Some(model_info.context_window); - } - - let otel_event_manager = - turn_context.client.get_otel_event_manager().with_model( - per_turn_config.model.as_str(), - per_turn_config.model_family.slug.as_str(), - ); - - // Build a new client with per‑turn reasoning settings. - // Reuse the same provider and session id; auth defaults to env/API key. - let client = ModelClient::new( - Arc::new(per_turn_config), - auth_manager, - otel_event_manager, - provider, - effort, - summary, - sess.conversation_id, - ); - - let fresh_turn_context = TurnContext { - client, - tools_config: ToolsConfig::new(&ToolsConfigParams { - model_family: &model_family, - features: &config.features, - }), - user_instructions: turn_context.user_instructions.clone(), - base_instructions: turn_context.base_instructions.clone(), - approval_policy, - sandbox_policy, - shell_environment_policy: turn_context.shell_environment_policy.clone(), - cwd, - is_review_mode: false, - final_output_json_schema, - }; - - // if the environment context has changed, record it in the conversation history - let previous_env_context = EnvironmentContext::from(turn_context.as_ref()); - let new_env_context = EnvironmentContext::from(&fresh_turn_context); - if !new_env_context.equals_except_shell(&previous_env_context) { - let env_response_item = ResponseItem::from(new_env_context); - sess.record_conversation_items(std::slice::from_ref(&env_response_item)) + if let Some(env_item) = sess + .build_environment_update_item(previous_context.as_ref(), ¤t_context) + { + sess.record_conversation_items(std::slice::from_ref(&env_item)) .await; for msg in map_response_item_to_event_messages( - &env_response_item, + &env_item, sess.show_raw_agent_reasoning(), ) { let event = Event { @@ -1389,12 +1369,9 @@ async fn submission_loop( } } - // Install the new persistent context for subsequent tasks/turns. - turn_context = Arc::new(fresh_turn_context); - - // no current task, spawn a new one with the per-turn context - sess.spawn_task(Arc::clone(&turn_context), sub.id, items, RegularTask) + sess.spawn_task(Arc::clone(¤t_context), sub.id, items, RegularTask) .await; + previous_context = Some(current_context); } } Op::ExecApproval { id, decision } => match decision { @@ -1500,6 +1477,7 @@ async fn submission_loop( sess.send_event(event).await; } Op::Compact => { + let turn_context = sess.new_turn(SessionSettingsUpdate::default()).await; // Attempt to inject input into current task if let Err(items) = sess .inject_input(vec![InputItem::Text { @@ -1569,6 +1547,7 @@ async fn submission_loop( sess.send_event(event).await; } Op::Review { review_request } => { + let turn_context = sess.new_turn(SessionSettingsUpdate::default()).await; spawn_review_thread( sess.clone(), config.clone(), @@ -2588,14 +2567,13 @@ mod tests { let (session, turn_context) = make_session_and_context(); let (rollout_items, expected) = sample_rollout(&session, &turn_context); - tokio_test::block_on(session.record_initial_history( - &turn_context, - InitialHistory::Resumed(ResumedHistory { + tokio_test::block_on(session.record_initial_history(InitialHistory::Resumed( + ResumedHistory { conversation_id: ConversationId::default(), history: rollout_items, rollout_path: PathBuf::from("/tmp/resume.jsonl"), - }), - )); + }, + ))); let actual = tokio_test::block_on(async { session.state.lock().await.history_snapshot() }); assert_eq!(expected, actual); @@ -2606,9 +2584,7 @@ mod tests { let (session, turn_context) = make_session_and_context(); let (rollout_items, expected) = sample_rollout(&session, &turn_context); - tokio_test::block_on( - session.record_initial_history(&turn_context, InitialHistory::Forked(rollout_items)), - ); + tokio_test::block_on(session.record_initial_history(InitialHistory::Forked(rollout_items))); let actual = tokio_test::block_on(async { session.state.lock().await.history_snapshot() }); assert_eq!(expected, actual); @@ -2832,53 +2808,56 @@ mod tests { let config = Arc::new(config); let conversation_id = ConversationId::default(); let otel_event_manager = otel_event_manager(conversation_id, config.as_ref()); - let client = ModelClient::new( - config.clone(), - None, - otel_event_manager, - config.model_provider.clone(), - config.model_reasoning_effort, - config.model_reasoning_summary, - conversation_id, - ); - let tools_config = ToolsConfig::new(&ToolsConfigParams { - model_family: &config.model_family, - features: &config.features, - }); - let turn_context = TurnContext { - client, - cwd: config.cwd.clone(), - base_instructions: config.base_instructions.clone(), + let auth_manager = AuthManager::shared(config.cwd.clone(), false); + + let session_configuration = SessionConfiguration { + provider: config.model_provider.clone(), + model: config.model.clone(), + model_reasoning_effort: config.model_reasoning_effort, + model_reasoning_summary: config.model_reasoning_summary, user_instructions: config.user_instructions.clone(), + base_instructions: config.base_instructions.clone(), approval_policy: config.approval_policy, sandbox_policy: config.sandbox_policy.clone(), - shell_environment_policy: config.shell_environment_policy.clone(), - tools_config, - is_review_mode: false, - final_output_json_schema: None, + cwd: config.cwd.clone(), + original_config_do_not_use: Arc::clone(&config), }; + + let state = SessionState::new(session_configuration.clone()); + let services = SessionServices { mcp_connection_manager: McpConnectionManager::default(), session_manager: ExecSessionManager::default(), unified_exec_manager: UnifiedExecSessionManager::default(), - notifier: UserNotifier::default(), + notifier: UserNotifier::new(None), rollout: Mutex::new(None), user_shell: shell::Shell::Unknown, show_raw_agent_reasoning: config.show_raw_agent_reasoning, executor: Executor::new(ExecutorConfig::new( - turn_context.sandbox_policy.clone(), - turn_context.cwd.clone(), + session_configuration.sandbox_policy.clone(), + session_configuration.cwd.clone(), None, )), + auth_manager: Arc::clone(&auth_manager), + otel_event_manager: otel_event_manager.clone(), }; + let session = Session { conversation_id, tx_event, - state: Mutex::new(SessionState::new()), + state: Mutex::new(state), active_turn: Mutex::new(None), services, next_internal_sub_id: AtomicU64::new(0), }; + + let turn_context = Session::make_turn_context( + Some(Arc::clone(&auth_manager)), + &otel_event_manager, + session_configuration.provider.clone(), + &session_configuration, + conversation_id, + ); (session, turn_context) } @@ -2900,53 +2879,56 @@ mod tests { let config = Arc::new(config); let conversation_id = ConversationId::default(); let otel_event_manager = otel_event_manager(conversation_id, config.as_ref()); - let client = ModelClient::new( - config.clone(), - None, - otel_event_manager, - config.model_provider.clone(), - config.model_reasoning_effort, - config.model_reasoning_summary, - conversation_id, - ); - let tools_config = ToolsConfig::new(&ToolsConfigParams { - model_family: &config.model_family, - features: &config.features, - }); - let turn_context = Arc::new(TurnContext { - client, - cwd: config.cwd.clone(), - base_instructions: config.base_instructions.clone(), + let auth_manager = AuthManager::shared(config.cwd.clone(), false); + + let session_configuration = SessionConfiguration { + provider: config.model_provider.clone(), + model: config.model.clone(), + model_reasoning_effort: config.model_reasoning_effort, + model_reasoning_summary: config.model_reasoning_summary, user_instructions: config.user_instructions.clone(), + base_instructions: config.base_instructions.clone(), approval_policy: config.approval_policy, sandbox_policy: config.sandbox_policy.clone(), - shell_environment_policy: config.shell_environment_policy.clone(), - tools_config, - is_review_mode: false, - final_output_json_schema: None, - }); + cwd: config.cwd.clone(), + original_config_do_not_use: Arc::clone(&config), + }; + + let state = SessionState::new(session_configuration.clone()); + let services = SessionServices { mcp_connection_manager: McpConnectionManager::default(), session_manager: ExecSessionManager::default(), unified_exec_manager: UnifiedExecSessionManager::default(), - notifier: UserNotifier::default(), + notifier: UserNotifier::new(None), rollout: Mutex::new(None), user_shell: shell::Shell::Unknown, show_raw_agent_reasoning: config.show_raw_agent_reasoning, executor: Executor::new(ExecutorConfig::new( - config.sandbox_policy.clone(), - config.cwd.clone(), + session_configuration.sandbox_policy.clone(), + session_configuration.cwd.clone(), None, )), + auth_manager: Arc::clone(&auth_manager), + otel_event_manager: otel_event_manager.clone(), }; + let session = Arc::new(Session { conversation_id, tx_event, - state: Mutex::new(SessionState::new()), + state: Mutex::new(state), active_turn: Mutex::new(None), services, next_internal_sub_id: AtomicU64::new(0), }); + + let turn_context = Arc::new(Session::make_turn_context( + Some(Arc::clone(&auth_manager)), + &otel_event_manager, + session_configuration.provider.clone(), + &session_configuration, + conversation_id, + )); (session, turn_context, rx_event) } diff --git a/codex-rs/core/src/environment_context.rs b/codex-rs/core/src/environment_context.rs index 8f3292a226..e7b2e19ffb 100644 --- a/codex-rs/core/src/environment_context.rs +++ b/codex-rs/core/src/environment_context.rs @@ -93,6 +93,25 @@ impl EnvironmentContext { && self.network_access == *network_access && self.writable_roots == *writable_roots } + + pub fn diff(before: &TurnContext, after: &TurnContext) -> Self { + let cwd = if before.cwd != after.cwd { + Some(after.cwd.clone()) + } else { + None + }; + let approval_policy = if before.approval_policy != after.approval_policy { + Some(after.approval_policy) + } else { + None + }; + let sandbox_policy = if before.sandbox_policy != after.sandbox_policy { + Some(after.sandbox_policy.clone()) + } else { + None + }; + EnvironmentContext::new(cwd, approval_policy, sandbox_policy, None) + } } impl From<&TurnContext> for EnvironmentContext { diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 994352eddf..b18f1b9940 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -1,9 +1,13 @@ +use std::sync::Arc; + +use crate::AuthManager; use crate::RolloutRecorder; use crate::exec_command::ExecSessionManager; use crate::executor::Executor; use crate::mcp_connection_manager::McpConnectionManager; use crate::unified_exec::UnifiedExecSessionManager; use crate::user_notification::UserNotifier; +use codex_otel::otel_event_manager::OtelEventManager; use tokio::sync::Mutex; pub(crate) struct SessionServices { @@ -15,4 +19,6 @@ pub(crate) struct SessionServices { pub(crate) user_shell: crate::shell::Shell, pub(crate) show_raw_agent_reasoning: bool, pub(crate) executor: Executor, + pub(crate) auth_manager: Arc, + pub(crate) otel_event_manager: OtelEventManager, } diff --git a/codex-rs/core/src/state/session.rs b/codex-rs/core/src/state/session.rs index 8310d91c0c..fa7543ff53 100644 --- a/codex-rs/core/src/state/session.rs +++ b/codex-rs/core/src/state/session.rs @@ -2,14 +2,15 @@ use codex_protocol::models::ResponseItem; +use crate::codex::SessionConfiguration; use crate::conversation_history::ConversationHistory; use crate::protocol::RateLimitSnapshot; use crate::protocol::TokenUsage; use crate::protocol::TokenUsageInfo; /// Persistent, session-scoped state previously stored directly on `Session`. -#[derive(Default)] pub(crate) struct SessionState { + pub(crate) session_configuration: SessionConfiguration, pub(crate) history: ConversationHistory, pub(crate) token_info: Option, pub(crate) latest_rate_limits: Option, @@ -17,10 +18,12 @@ pub(crate) struct SessionState { impl SessionState { /// Create a new session state mirroring previous `State::default()` semantics. - pub(crate) fn new() -> Self { + pub(crate) fn new(session_configuration: SessionConfiguration) -> Self { Self { + session_configuration, history: ConversationHistory::new(), - ..Default::default() + token_info: None, + latest_rate_limits: None, } } diff --git a/codex-rs/core/tests/suite/prompt_caching.rs b/codex-rs/core/tests/suite/prompt_caching.rs index fcfc960224..caa4cb68de 100644 --- a/codex-rs/core/tests/suite/prompt_caching.rs +++ b/codex-rs/core/tests/suite/prompt_caching.rs @@ -507,7 +507,7 @@ async fn overrides_turn_context_but_keeps_cached_prefix_and_key_constant() { {} "#, - writable.path().to_string_lossy() + writable.path().to_string_lossy(), ); let expected_env_msg_2 = serde_json::json!({ "type": "message", @@ -866,15 +866,14 @@ async fn send_user_turn_with_changes_sends_environment_context() { ]); assert_eq!(body1["input"], expected_input_1); - let expected_env_msg_2 = text_user_input(format!( + let expected_env_msg_2 = text_user_input( r#" - {} never danger-full-access enabled -"#, - default_cwd.to_string_lossy() - )); +"# + .to_string(), + ); let expected_user_message_2 = text_user_input("hello 2".to_string()); let expected_input_2 = serde_json::Value::Array(vec![ expected_ui_msg,