Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions codex-rs/analytics/src/analytics_client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ use codex_app_server_protocol::TurnSteerParams;
use codex_app_server_protocol::TurnSteerResponse;
use codex_app_server_protocol::UserInput;
use codex_login::default_client::DEFAULT_ORIGINATOR;
use codex_login::default_client::originator;
use codex_login::default_client::Originator;
use codex_plugin::AppConnectorId;
use codex_plugin::PluginCapabilitySummary;
use codex_plugin::PluginId;
Expand Down Expand Up @@ -965,7 +965,7 @@ fn app_mentioned_event_serializes_expected_shape() {
"thread_id": "thread-1",
"turn_id": "turn-1",
"app_name": "Calendar",
"product_client_id": originator().value,
"product_client_id": Originator::process_default().value().to_string(),
"invoke_type": "explicit",
"model_slug": "gpt-5"
}
Expand Down Expand Up @@ -1003,7 +1003,7 @@ fn app_used_event_serializes_expected_shape() {
"thread_id": "thread-2",
"turn_id": "turn-2",
"app_name": "Google Drive",
"product_client_id": originator().value,
"product_client_id": Originator::process_default().value().to_string(),
"invoke_type": "implicit",
"model_slug": "gpt-5"
}
Expand Down Expand Up @@ -2774,7 +2774,7 @@ fn plugin_used_event_serializes_expected_shape() {
"has_skills": true,
"mcp_server_count": 2,
"connector_ids": ["calendar", "drive"],
"product_client_id": originator().value,
"product_client_id": Originator::process_default().value().to_string(),
"thread_id": "thread-3",
"turn_id": "turn-3",
"model_slug": "gpt-5"
Expand Down Expand Up @@ -2803,7 +2803,7 @@ fn plugin_management_event_serializes_expected_shape() {
"has_skills": true,
"mcp_server_count": 2,
"connector_ids": ["calendar", "drive"],
"product_client_id": originator().value
"product_client_id": Originator::process_default().value().to_string()
}
})
);
Expand Down Expand Up @@ -3009,7 +3009,7 @@ async fn reducer_ingests_skill_invoked_fact() {
"skill_id": expected_skill_id,
"skill_name": "doc",
"event_params": {
"product_client_id": originator().value,
"product_client_id": Originator::process_default().value().to_string(),
"skill_scope": "user",
"plugin_id": null,
"repo_url": null,
Expand Down Expand Up @@ -3170,7 +3170,7 @@ async fn reducer_ingests_plugin_state_changed_fact() {
"has_skills": true,
"mcp_server_count": 2,
"connector_ids": ["calendar", "drive"],
"product_client_id": originator().value
"product_client_id": Originator::process_default().value().to_string()
}
}])
);
Expand Down
4 changes: 3 additions & 1 deletion codex-rs/analytics/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerResponse;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_login::default_client::Originator;
use codex_login::default_client::create_client;
use codex_plugin::PluginTelemetryMetadata;
use codex_protocol::request_permissions::RequestPermissionsResponse;
Expand Down Expand Up @@ -440,7 +441,8 @@ async fn send_track_events_request(auth: &CodexAuth, url: &str, events: Vec<Trac

let payload = TrackEventsRequest { events };

let response = create_client()
let originator = Originator::process_default();
let response = create_client(&originator)
.post(url)
.timeout(ANALYTICS_EVENTS_TIMEOUT)
.headers(codex_model_provider::auth_provider_from_auth(auth).to_auth_headers())
Expand Down
6 changes: 3 additions & 3 deletions codex-rs/analytics/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::facts::TurnSubmissionType;
use crate::now_unix_millis;
use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::CommandExecutionSource;
use codex_login::default_client::originator;
use codex_login::default_client::Originator;
use codex_plugin::PluginTelemetryMetadata;
use codex_protocol::approvals::NetworkApprovalProtocol;
use codex_protocol::models::AdditionalPermissionProfile;
Expand Down Expand Up @@ -890,7 +890,7 @@ pub(crate) fn codex_app_metadata(
thread_id: Some(tracking.thread_id.clone()),
turn_id: Some(tracking.turn_id.clone()),
app_name: app.app_name,
product_client_id: Some(originator().value),
product_client_id: Some(Originator::process_default().value().to_string()),
invoke_type: app.invocation_type,
model_slug: Some(tracking.model_slug.clone()),
}
Expand Down Expand Up @@ -920,7 +920,7 @@ pub(crate) fn codex_plugin_metadata(plugin: PluginTelemetryMetadata) -> CodexPlu
.map(|connector_id| connector_id.0)
.collect()
}),
product_client_id: Some(originator().value),
product_client_id: Some(Originator::process_default().value().to_string()),
}
}

Expand Down
4 changes: 2 additions & 2 deletions codex-rs/analytics/src/reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ use codex_app_server_protocol::UserInput;
use codex_app_server_protocol::WebSearchAction;
use codex_git_utils::collect_git_info;
use codex_git_utils::get_git_repo_root;
use codex_login::default_client::originator;
use codex_login::default_client::Originator;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary;
Expand Down Expand Up @@ -680,7 +680,7 @@ impl AnalyticsReducer {
turn_id: Some(tracking.turn_id.clone()),
invoke_type: Some(invocation.invocation_type),
model_slug: Some(tracking.model_slug.clone()),
product_client_id: Some(originator().value),
product_client_id: Some(Originator::process_default().value().to_string()),
repo_url,
skill_scope: Some(skill_scope.to_string()),
plugin_id: invocation.plugin_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use super::protocol::EnrollRemoteServerResponse;
use super::protocol::RemoteControlTarget;
use axum::http::HeaderMap;
use codex_api::SharedAuthProvider;
use codex_login::default_client::Originator;
use codex_login::default_client::build_reqwest_client;
use codex_state::RemoteControlEnrollmentRecord;
use codex_state::StateRuntime;
Expand Down Expand Up @@ -204,7 +205,8 @@ pub(super) async fn enroll_remote_control_server(
app_server_version: env!("CARGO_PKG_VERSION"),
installation_id: installation_id.to_string(),
};
let client = build_reqwest_client();
let originator = Originator::process_default();
let client = build_reqwest_client(&originator);
let mut auth_headers = HeaderMap::new();
auth.auth_provider.add_auth_headers(&mut auth_headers);
let http_request = client
Expand Down
115 changes: 64 additions & 51 deletions codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ use codex_login::auth::ExternalAuth;
use codex_login::auth::ExternalAuthRefreshContext;
use codex_login::auth::ExternalAuthRefreshReason;
use codex_login::auth::ExternalAuthTokens;
use codex_login::default_client::Originator;
use codex_protocol::ThreadId;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::W3cTraceContext;
Expand Down Expand Up @@ -193,8 +194,7 @@ pub(crate) struct ConnectionSessionState {
pub(crate) struct InitializedConnectionSessionState {
pub(crate) experimental_api_enabled: bool,
pub(crate) opted_out_notification_methods: HashSet<String>,
pub(crate) app_server_client_name: String,
pub(crate) client_version: String,
pub(crate) originator: Originator,
pub(crate) request_attestation: bool,
}

Expand Down Expand Up @@ -232,13 +232,21 @@ impl ConnectionSessionState {
pub(crate) fn app_server_client_name(&self) -> Option<&str> {
self.initialized
.get()
.map(|session| session.app_server_client_name.as_str())
.and_then(|session| session.originator.app_server_client())
.map(codex_login::default_client::AppServerClient::name)
}

pub(crate) fn client_version(&self) -> Option<&str> {
self.initialized
.get()
.map(|session| session.client_version.as_str())
.and_then(|session| session.originator.app_server_client())
.map(codex_login::default_client::AppServerClient::version)
}

pub(crate) fn originator(&self) -> Option<Originator> {
self.initialized
.get()
.map(|session| session.originator.clone())
}

pub(crate) fn request_attestation(&self) -> bool {
Expand Down Expand Up @@ -802,8 +810,12 @@ impl MessageProcessor {
);

let serialization_scope = codex_request.serialization_scope();
let app_server_client_name = session.app_server_client_name().map(str::to_string);
let client_version = session.client_version().map(str::to_string);
let Some(originator) = session.originator() else {
return Err(crate::error_code::internal_error(
"initialized session is missing originator",
));
};
let request_context = request_context.with_originator(originator);
let error_request_id = connection_request_id.clone();
let rpc_gate = Arc::clone(&session.rpc_gate);
let processor = Arc::clone(self);
Expand All @@ -817,8 +829,6 @@ impl MessageProcessor {
connection_request_id,
codex_request,
request_context,
app_server_client_name,
client_version,
)
.await;
if let Err(error) = result {
Expand Down Expand Up @@ -846,8 +856,6 @@ impl MessageProcessor {
connection_request_id: ConnectionRequestId,
codex_request: ClientRequest,
request_context: RequestContext,
app_server_client_name: Option<String>,
client_version: Option<String>,
) -> Result<(), JSONRPCErrorError> {
let connection_id = connection_request_id.connection_id;
let request_id = ConnectionRequestId {
Expand Down Expand Up @@ -876,7 +884,7 @@ impl MessageProcessor {
.map(|response| Some(response.into())),
ClientRequest::ExternalAgentConfigImport { params, .. } => self
.external_agent_config_processor
.import(request_id.clone(), params)
.import(request_id.clone(), params, &request_context)
.await
.map(|()| None),
ClientRequest::ConfigValueWrite { params, .. } => {
Expand All @@ -887,7 +895,7 @@ impl MessageProcessor {
}
ClientRequest::ExperimentalFeatureEnablementSet { params, .. } => {
self.config_processor
.experimental_feature_enablement_set(request_id.clone(), params)
.experimental_feature_enablement_set(&request_context, params)
.await
}
ClientRequest::RemoteControlEnable { .. } => self
Expand Down Expand Up @@ -962,13 +970,7 @@ impl MessageProcessor {
.map(|response| Some(response.into())),
ClientRequest::ThreadStart { params, .. } => {
self.thread_processor
.thread_start(
request_id.clone(),
params,
app_server_client_name.clone(),
client_version.clone(),
request_context,
)
.thread_start(request_id.clone(), params, request_context)
.await
}
ClientRequest::ThreadUnsubscribe { params, .. } => {
Expand All @@ -978,22 +980,12 @@ impl MessageProcessor {
}
ClientRequest::ThreadResume { params, .. } => {
self.thread_processor
.thread_resume(
request_id.clone(),
params,
app_server_client_name.clone(),
client_version.clone(),
)
.thread_resume(request_id.clone(), params, &request_context)
.await
}
ClientRequest::ThreadFork { params, .. } => {
self.thread_processor
.thread_fork(
request_id.clone(),
params,
app_server_client_name.clone(),
client_version.clone(),
)
.thread_fork(request_id.clone(), params, &request_context)
.await
}
ClientRequest::ThreadArchive { params, .. } => {
Expand Down Expand Up @@ -1100,48 +1092,72 @@ impl MessageProcessor {
self.marketplace_processor.marketplace_upgrade(params).await
}
ClientRequest::PluginList { params, .. } => {
self.plugin_processor.plugin_list(params).await
self.plugin_processor
.plugin_list(params, &request_context)
.await
}
ClientRequest::PluginInstalled { params, .. } => {
self.plugin_processor.plugin_installed(params).await
self.plugin_processor
.plugin_installed(params, &request_context)
.await
}
ClientRequest::PluginRead { params, .. } => {
self.plugin_processor.plugin_read(params).await
self.plugin_processor
.plugin_read(params, &request_context)
.await
}
ClientRequest::PluginSkillRead { params, .. } => {
self.plugin_processor.plugin_skill_read(params).await
self.plugin_processor
.plugin_skill_read(params, &request_context)
.await
}
ClientRequest::PluginShareSave { params, .. } => {
self.plugin_processor.plugin_share_save(params).await
self.plugin_processor
.plugin_share_save(params, &request_context)
.await
}
ClientRequest::PluginShareUpdateTargets { params, .. } => {
self.plugin_processor
.plugin_share_update_targets(params)
.plugin_share_update_targets(params, &request_context)
.await
}
ClientRequest::PluginShareList { params, .. } => {
self.plugin_processor.plugin_share_list(params).await
self.plugin_processor
.plugin_share_list(params, &request_context)
.await
}
ClientRequest::PluginShareCheckout { params, .. } => {
self.plugin_processor.plugin_share_checkout(params).await
self.plugin_processor
.plugin_share_checkout(params, &request_context)
.await
}
ClientRequest::PluginShareDelete { params, .. } => {
self.plugin_processor.plugin_share_delete(params).await
self.plugin_processor
.plugin_share_delete(params, &request_context)
.await
}
ClientRequest::AppsList { params, .. } => {
self.apps_processor.apps_list(&request_id, params).await
self.apps_processor
.apps_list(&request_context, params)
.await
}
ClientRequest::SkillsConfigWrite { params, .. } => {
self.catalog_processor.skills_config_write(params).await
}
ClientRequest::PluginInstall { params, .. } => {
self.plugin_processor.plugin_install(params).await
self.plugin_processor
.plugin_install(params, &request_context)
.await
}
ClientRequest::PluginUninstall { params, .. } => {
self.plugin_processor.plugin_uninstall(params).await
self.plugin_processor
.plugin_uninstall(params, &request_context)
.await
}
ClientRequest::ModelList { params, .. } => {
self.catalog_processor.model_list(params).await
self.catalog_processor
.model_list(params, &request_context)
.await
}
ClientRequest::ExperimentalFeatureList { params, .. } => {
self.catalog_processor
Expand All @@ -1158,12 +1174,7 @@ impl MessageProcessor {
}
ClientRequest::TurnStart { params, .. } => {
self.turn_processor
.turn_start(
request_id.clone(),
params,
app_server_client_name.clone(),
client_version.clone(),
)
.turn_start(request_id.clone(), params, &request_context)
.await
}
ClientRequest::ThreadInjectItems { params, .. } => {
Expand Down Expand Up @@ -1201,7 +1212,9 @@ impl MessageProcessor {
self.turn_processor.thread_realtime_list_voices().await
}
ClientRequest::ReviewStart { params, .. } => {
self.turn_processor.review_start(&request_id, params).await
self.turn_processor
.review_start(&request_id, params, &request_context)
.await
}
ClientRequest::McpServerOauthLogin { params, .. } => {
self.mcp_processor.mcp_server_oauth_login(params).await
Expand Down
Loading
Loading