Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion codex-rs/app-server-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ pub use codex_app_server::in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
pub use codex_app_server::in_process::InProcessServerEvent;
use codex_app_server::in_process::InProcessStartArgs;
use codex_app_server::in_process::LogDbLayer;
pub use codex_app_server::in_process::StateDbHandle;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
Expand All @@ -47,6 +46,7 @@ use codex_config::LoaderOverrides;
use codex_config::NoopThreadConfigLoader;
use codex_config::RemoteThreadConfigLoader;
use codex_config::ThreadConfigLoader;
pub use codex_core::StateDbHandle;
use codex_core::config::Config;
pub use codex_exec_server::EnvironmentManager;
pub use codex_exec_server::EnvironmentManagerArgs;
Expand Down
6 changes: 4 additions & 2 deletions codex-rs/app-server/src/bespoke_event_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2629,7 +2629,8 @@ mod tests {
config.model_provider.clone(),
config.codex_home.to_path_buf(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
),
)
.await,
);
let codex_core::NewThread {
thread_id: conversation_id,
Expand Down Expand Up @@ -3214,7 +3215,8 @@ mod tests {
config.model_provider.clone(),
config.codex_home.to_path_buf(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
),
)
.await,
);
let codex_core::NewThread {
thread_id: conversation_id,
Expand Down
25 changes: 18 additions & 7 deletions codex-rs/app-server/src/in_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,12 @@ use codex_config::CloudRequirementsLoader;
use codex_config::LoaderOverrides;
use codex_config::ThreadConfigLoader;
use codex_core::config::Config;
use codex_core::init_state_db_from_config;
use codex_exec_server::EnvironmentManager;
use codex_feedback::CodexFeedback;
use codex_login::AuthManager;
use codex_protocol::protocol::SessionSource;
pub use codex_rollout::StateDbHandle;
use codex_rollout::state_db::StateDbHandle;
pub use codex_state::log_db::LogDbLayer;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -127,7 +128,7 @@ pub struct InProcessStartArgs {
pub feedback: CodexFeedback,
/// SQLite tracing layer used to flush recently emitted logs before feedback upload.
pub log_db: Option<LogDbLayer>,
/// Process-wide SQLite state handle shared with embedded app-server consumers.
/// Optional state DB handle to use for the in-process runtime.
pub state_db: Option<StateDbHandle>,
/// Environment manager used by core execution and filesystem operations.
pub environment_manager: Arc<EnvironmentManager>,
Expand Down Expand Up @@ -344,7 +345,7 @@ impl InProcessClientHandle {
/// the runtime is shut down and an `InvalidData` error is returned.
pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
let initialize = args.initialize.clone();
let client = start_uninitialized(args);
let client = start_uninitialized(args).await;

let initialize_response = client
.request(ClientRequest::Initialize {
Expand All @@ -364,8 +365,12 @@ pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle>
Ok(client)
}

fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
async fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
let channel_capacity = args.channel_capacity.max(1);
let state_db = match args.state_db.clone() {
Some(state_db) => Some(state_db),
None => init_state_db_from_config(args.config.as_ref()).await,
};
let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);

Expand Down Expand Up @@ -414,6 +419,12 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
);
let (processor_tx, mut processor_rx) = mpsc::channel::<ProcessorCommand>(channel_capacity);
let mut processor_handle = tokio::spawn(async move {
let Some(state_db) = state_db else {
warn!(
"in-process app-server state db initialization failed; shutting down processor task"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will we observe this correctly?

);
return;
};
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing: Arc::clone(&processor_outgoing),
analytics_events_client,
Expand All @@ -423,7 +434,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
environment_manager: args.environment_manager,
feedback: args.feedback,
log_db: args.log_db,
state_db: args.state_db,
state_db,
config_warnings: args.config_warnings,
session_source: args.session_source,
auth_manager,
Expand Down Expand Up @@ -761,7 +772,7 @@ mod tests {
) -> InProcessClientHandle {
let codex_home = TempDir::new().expect("temp dir");
let config = Arc::new(build_test_config(codex_home.path()).await);
let state_db = codex_rollout::state_db::try_init(config.as_ref())
let state_db = init_state_db_from_config(config.as_ref())
.await
.expect("state db should initialize for in-process test");
let args = InProcessStartArgs {
Expand Down Expand Up @@ -819,7 +830,7 @@ mod tests {
}

#[tokio::test]
async fn in_process_allows_device_key_requests_to_reach_device_key_processor() {
async fn in_process_allows_device_key_requests_to_reach_device_key_api() {
let client = start_test_client(SessionSource::Cli).await;
const MALFORMED_KEY_ID_MESSAGE: &str = concat!(
"invalid device key payload: keyId must be dk_hse_, dk_tpm_, or dk_osn_ ",
Expand Down
38 changes: 14 additions & 24 deletions codex-rs/app-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ use codex_config::TextRange as CoreTextRange;
use codex_core::ExecPolicyError;
use codex_core::check_execpolicy_for_warnings;
use codex_core::config::find_codex_home;
use codex_core::init_state_db_from_config;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::ExecServerRuntimePaths;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use codex_rollout::state_db as rollout_state_db;
use codex_state::log_db;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -487,9 +487,9 @@ pub async fn run_main_with_transport_options(
}
};

let state_db_result = rollout_state_db::try_init(&config).await;
let state_db_init_error = state_db_result.as_ref().err().map(ToString::to_string);
let state_db = state_db_result.ok();
let state_db = init_state_db_from_config(&config)
.await
.ok_or_else(|| std::io::Error::other("failed to initialize sqlite state db"))?;

if should_run_personality_migration {
let effective_toml = config.config_layer_stack.effective_config();
Expand Down Expand Up @@ -598,10 +598,12 @@ pub async fn run_main_with_transport_options(

let feedback_layer = feedback.logger_layer();
let feedback_metadata_layer = feedback.metadata_layer();
let log_db = state_db.clone().map(log_db::start);
let log_db_layer = log_db
.clone()
.map(|layer| layer.with_filter(Targets::new().with_default(Level::TRACE)));
let log_db = log_db::start(state_db.clone());
let log_db_layer = Some(
log_db
.clone()
.with_filter(Targets::new().with_default(Level::TRACE)),
);
let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());
let _ = tracing_subscriber::registry()
Expand All @@ -618,10 +620,6 @@ pub async fn run_main_with_transport_options(
None => error!("{}", warning.summary),
}
}
if let Some(err) = &state_db_init_error {
error!("failed to initialize sqlite state db: {err}");
}

let transport_shutdown_token = CancellationToken::new();
let mut transport_accept_handles = Vec::<JoinHandle<()>>::new();

Expand Down Expand Up @@ -666,25 +664,17 @@ pub async fn run_main_with_transport_options(
let auth_manager =
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;

let remote_control_config_enabled = config.features.enabled(Feature::RemoteControl);
let remote_control_enabled = remote_control_config_enabled && state_db.is_some();
if remote_control_config_enabled && state_db.is_none() {
error!("remote control disabled because sqlite state db is unavailable");
}
let remote_control_enabled = config.features.enabled(Feature::RemoteControl);
if transport_accept_handles.is_empty() && !remote_control_enabled {
return Err(std::io::Error::new(
ErrorKind::InvalidInput,
if remote_control_config_enabled && state_db.is_none() {
"no transport configured; remote control disabled because sqlite state db is unavailable"
} else {
"no transport configured; use --listen or enable remote control"
},
"no transport configured; use --listen or enable remote control",
));
}

let (remote_control_accept_handle, remote_control_handle) = start_remote_control(
config.chatgpt_base_url.clone(),
state_db.clone(),
Some(state_db.clone()),
auth_manager.clone(),
transport_event_tx.clone(),
transport_shutdown_token.clone(),
Expand Down Expand Up @@ -768,7 +758,7 @@ pub async fn run_main_with_transport_options(
config_manager,
environment_manager,
feedback: feedback.clone(),
log_db,
log_db: Some(log_db),
state_db: state_db.clone(),
config_warnings,
session_source,
Expand Down
11 changes: 7 additions & 4 deletions codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use codex_app_server_protocol::experimental_required_message;
use codex_arg0::Arg0DispatchPaths;
use codex_chatgpt::workspace_settings;
use codex_core::ThreadManager;
use codex_core::agent_graph_store_from_state_db;
use codex_core::config::Config;
use codex_core::thread_store_from_config;
use codex_exec_server::EnvironmentManager;
Expand Down Expand Up @@ -254,7 +255,7 @@ pub(crate) struct MessageProcessorArgs {
pub(crate) environment_manager: Arc<EnvironmentManager>,
pub(crate) feedback: CodexFeedback,
pub(crate) log_db: Option<LogDbLayer>,
pub(crate) state_db: Option<StateDbHandle>,
pub(crate) state_db: StateDbHandle,
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
pub(crate) session_source: SessionSource,
pub(crate) auth_manager: Arc<AuthManager>,
Expand Down Expand Up @@ -291,14 +292,16 @@ impl MessageProcessor {
// affect per-thread behavior, but they must not move newly started,
// resumed, or forked threads to a different persistence backend/root.
let thread_store = thread_store_from_config(config.as_ref(), state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager.clone(),
session_source,
environment_manager,
Some(analytics_events_client.clone()),
Arc::clone(&thread_store),
state_db.clone(),
Arc::clone(&thread_store),
agent_graph_store.clone(),
));
thread_manager
.plugins_manager()
Expand Down Expand Up @@ -344,7 +347,7 @@ impl MessageProcessor {
Arc::clone(&config),
feedback,
log_db,
state_db.clone(),
Some(state_db.clone()),
);
let git_processor = GitRequestProcessor::new();
let initialize_processor = InitializeRequestProcessor::new(
Expand Down Expand Up @@ -395,7 +398,7 @@ impl MessageProcessor {
thread_watch_manager.clone(),
Arc::clone(&thread_list_state_permit),
thread_goal_processor.clone(),
state_db.clone(),
Some(state_db.clone()),
);
let turn_processor = TurnRequestProcessor::new(
auth_manager.clone(),
Expand Down
6 changes: 5 additions & 1 deletion codex-rs/app-server/src/message_processor_tracing_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use codex_config::CloudRequirementsLoader;
use codex_config::LoaderOverrides;
use codex_core::config::Config;
use codex_core::config::ConfigBuilder;
use codex_core::init_state_db_from_config;
use codex_exec_server::EnvironmentManager;
use codex_feedback::CodexFeedback;
use codex_login::AuthManager;
Expand Down Expand Up @@ -281,6 +282,9 @@ async fn build_test_processor(
outgoing_tx,
analytics_events_client.clone(),
));
let state_db = init_state_db_from_config(config.as_ref())
.await
.expect("tracing test processor requires state db");
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing,
analytics_events_client,
Expand All @@ -290,7 +294,7 @@ async fn build_test_processor(
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
feedback: CodexFeedback::new(),
log_db: None,
state_db: None,
state_db,
config_warnings: Vec::new(),
session_source: SessionSource::VSCode,
auth_manager,
Expand Down
22 changes: 6 additions & 16 deletions codex-rs/app-server/src/request_processors/device_key_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use codex_device_key::RemoteControlClientConnectionAudience;
use codex_device_key::RemoteControlClientConnectionSignPayload;
use codex_device_key::RemoteControlClientEnrollmentAudience;
use codex_device_key::RemoteControlClientEnrollmentSignPayload;
use codex_rollout::state_db::StateDbHandle;
use codex_state::DeviceKeyBindingRecord;
use codex_state::StateRuntime;

#[derive(Clone)]
pub(crate) struct DeviceKeyRequestProcessor {
Expand All @@ -43,10 +43,7 @@ pub(crate) struct DeviceKeyRequestProcessor {
}

impl DeviceKeyRequestProcessor {
pub(crate) fn new(
outgoing: Arc<OutgoingMessageSender>,
state_db: Option<Arc<StateRuntime>>,
) -> Self {
pub(crate) fn new(outgoing: Arc<OutgoingMessageSender>, state_db: StateDbHandle) -> Self {
Self {
outgoing,
store: DeviceKeyStore::new(Arc::new(StateDeviceKeyBindingStore::new(state_db))),
Expand Down Expand Up @@ -170,33 +167,26 @@ async fn sign_device_key(
}

struct StateDeviceKeyBindingStore {
state_db: Option<Arc<StateRuntime>>,
state_db: StateDbHandle,
}

impl StateDeviceKeyBindingStore {
fn new(state_db: Option<Arc<StateRuntime>>) -> Self {
fn new(state_db: StateDbHandle) -> Self {
Self { state_db }
}

async fn state_db(&self) -> Result<Arc<StateRuntime>, DeviceKeyError> {
self.state_db
.clone()
.ok_or_else(|| DeviceKeyError::Platform("sqlite state db unavailable".to_string()))
}
}

impl fmt::Debug for StateDeviceKeyBindingStore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StateDeviceKeyBindingStore")
.field("has_state_db", &self.state_db.is_some())
.finish_non_exhaustive()
}
}

#[async_trait]
impl DeviceKeyBindingStore for StateDeviceKeyBindingStore {
async fn get_binding(&self, key_id: &str) -> Result<Option<DeviceKeyBinding>, DeviceKeyError> {
let state_db = self.state_db().await?;
let state_db = self.state_db.clone();
state_db
.get_device_key_binding(key_id)
.await
Expand All @@ -214,7 +204,7 @@ impl DeviceKeyBindingStore for StateDeviceKeyBindingStore {
key_id: &str,
binding: &DeviceKeyBinding,
) -> Result<(), DeviceKeyError> {
let state_db = self.state_db().await?;
let state_db = self.state_db.clone();
state_db
.upsert_device_key_binding(&DeviceKeyBindingRecord {
key_id: key_id.to_string(),
Expand Down
Loading
Loading