Conversation
ehayes2000
commented
Apr 8, 2026
- extract proxy logic to standalone crate
- move mcp from bin of DCS to standalone service
- add refresh token to auth
Note: Reviews paused
It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough
The PR migrates MCP OAuth functionality from document_cognition_service to dedicated crates mcp_auth_proxy and mcp_service. It relocates OAuth handlers, middleware, and state to mcp_auth_proxy with restructured domain-driven design, removes OAuth from document_cognition_service, adds FusionAuth refresh-token support, and updates FusionAuth configuration for local/dev environments.
🚥 Pre-merge checks | ✅ 1 | ❌ 1
❌ Failed checks (1 inconclusive)
✅ Passed checks (1 passed)
whutchinson98
left a comment
nit: not a huge fan of the name auth_proxy; perhaps make it more explicit, like mcp_auth_proxy and McpAuthProxyService, etc.
Also, I think we should add db persistence before merging, since without it we can't safely scale these services: refresh tokens could be lost.
- extract proxy logic to standalone crate
Actionable comments posted: 14
♻️ Duplicate comments (1)
rust/cloud-storage/mcp_service/src/main.rs (1)
47-57: 🧹 Nitpick | 🔵 Trivial
Background task panics will silently stop cleanup.
The spawned task stores no `JoinHandle`, so if it panics (e.g., due to an unexpected error variant), cleanup stops silently. Consider adding panic logging or using `tokio::spawn` with `catch_unwind`:
♻️ Option: Add panic handling
// Spawn background cleanup for expired OAuth entries
let cleanup_state = context.auth_proxy.clone();
-tokio::spawn(async move {
+tokio::spawn(async move {
    let mut interval = tokio::time::interval(AUTH_PROXY_CLEANUP_INTERVAL);
    loop {
        interval.tick().await;
        if let Err(error) = cleanup_state.cleanup_expired().await {
            tracing::error!(error=?error, "auth proxy cleanup task failed");
        }
    }
-});
+}.instrument(tracing::info_span!("auth_proxy_cleanup")));
Alternatively, store the `JoinHandle` and monitor it, or use `.inspect_err()` with a counter metric to detect repeated failures.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rust/cloud-storage/mcp_service/src/main.rs` around lines 47 - 57, The background cleanup tokio::spawn for cleanup_state.cleanup_expired() can panic and stop silently because its JoinHandle is ignored; wrap the spawned future in std::panic::AssertUnwindSafe + tokio::spawn(async move { match std::panic::catch_unwind(|| async move { ... }.into_future()) { ... }}) or use tokio::spawn with futures::FutureExt::catch_unwind to log panics (include tracing::error with the panic payload) and/or return a JoinHandle from the surrounding context and monitor it (store the JoinHandle and .await or .inspect_err() to emit metrics on repeated failures). Ensure references to cleanup_state, cleanup_expired(), AUTH_PROXY_CLEANUP_INTERVAL and the tokio::spawn call are updated so panics are caught and logged or the handle is stored and supervised.
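For illustration only, the "store the `JoinHandle` and monitor it" option might look roughly like the sketch below. It uses `std::thread` as a stand-in for `tokio::spawn` so that it needs no external crates; `cleanup_once` and `supervised_cleanup` are hypothetical names, not functions from this PR. In tokio the analogous signal is the `JoinError` returned when the handle is awaited.

```rust
use std::thread;

/// Hypothetical stand-in for one cleanup pass; panics when `fail` is true.
fn cleanup_once(fail: bool) -> u32 {
    if fail {
        panic!("unexpected error variant");
    }
    3 // pretend three expired entries were removed
}

/// Runs the cleanup on a worker thread and keeps its handle, so a panic
/// is surfaced to the caller as an error instead of being lost.
fn supervised_cleanup(fail: bool) -> Result<u32, String> {
    let handle = thread::spawn(move || cleanup_once(fail));
    handle.join().map_err(|payload| {
        // Panic payloads are usually `&str` or `String`.
        let msg = payload
            .downcast_ref::<&str>()
            .map(|s| s.to_string())
            .or_else(|| payload.downcast_ref::<String>().cloned())
            .unwrap_or_else(|| "non-string panic payload".to_string());
        format!("cleanup task panicked: {msg}")
    })
}
```

A supervisor would log the returned error (or bump a metric) and decide whether to restart the task; the important part is that the handle is not dropped.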
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@infra/stacks/fusionauth-instance/index.ts`:
- Around line 272-277: The localhost redirect URL list contains a redundant
entry: the pattern 'http://localhost:8085/*' already covers
'http://localhost:8085/oauth/*'; in the array built when (stack === 'local' ||
stack === 'dev') remove the duplicate 'http://localhost:8085/oauth/*' entry so
only 'http://localhost:8085/*' remains (locate the array construction around the
conditional that appends these URLs in
infra/stacks/fusionauth-instance/index.ts).
In
`@rust/cloud-storage/macro_db_client/migrations/20260408190000_mcp_auth_proxy_refresh_credentials.sql`:
- Around line 1-11: The migration stores sensitive tokens in plaintext (table
mcp_auth_proxy_refresh_credentials, columns refresh_token and access_token), so
update the schema and application flow to protect them: replace the primary key
refresh_token with a non-reversible identifier (e.g., refresh_token_hash or id)
stored as the PK and indexed, store only a hash (e.g., SHA-256) of the refresh
token for lookup, and change access_token to an encrypted column (or bytea) so
the application performs application-level encryption/decryption before
insert/select (or move secrets to a dedicated secrets vault), update related
code that inserts/queries this table (functions that currently use
refresh_token/access_token) to compute/compare the hash and to encrypt/decrypt
the access_token, and add a migration note documenting the threat-model decision
if you choose not to encrypt.
In `@rust/cloud-storage/mcp_auth_proxy/Cargo.toml`:
- Around line 25-26: The workspace Cargo.toml is missing the macro_user_id crate
in its members list; open rust/cloud-storage/Cargo.toml and add "macro_user_id"
to the workspace.members array (alongside existing entries like "macro_auth") so
the crate at rust/cloud-storage/macro_user_id is recognized and the dependency
macro_user_id in mcp_auth_proxy can be resolved by the workspace.
In `@rust/cloud-storage/mcp_auth_proxy/README.md`:
- Around line 1-6: Update the README.md so it correctly references the current
crate name `mcp_auth_proxy` (instead of `mcp_service`) and clarify its role as a
proxy to the auth server; also fix the Markdown link syntax by changing
occurrences of `(Official Docs)[https://...]` and `(Useful fusionauth
article)[https://...]` to the proper `[Official Docs](https://...)` and `[Useful
fusionauth article](https://...)` form, and ensure the links include brief
descriptive text and backtick-quoted crate names where appropriate (e.g.,
`mcp_auth_proxy`) for clarity.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/ports.rs`:
- Around line 27-49: Add an atomic consume-and-rotate primitive to the
RefreshCredentialsStore trait (e.g. fn consume_and_rotate<'a>(&'a self,
old_refresh_token: &'a RefreshToken, new_refresh_token: Option<&'a
RefreshToken>, new_credentials: Option<&'a RefreshCredentials>) -> Pin<Box<dyn
Future<Output = anyhow::Result<Option<RefreshCredentials>>> + Send + 'a>>) that
performs the read+delete+put in one transactional call and returns the consumed
credentials (or None) on success; implement this method atomically in the
Postgres adapter using a DB transaction/SELECT FOR UPDATE to ensure two
concurrent refreshes cannot both consume the same token, and update
domain/service.rs to call this new consume_and_rotate method instead of separate
get/delete/put calls.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs`:
- Around line 136-145: The code removes the auth code from self.codes (via
self.codes.remove(...)) and binds it to issued before persisting refresh
credentials with put, which can fail and leave the client unable to retry;
change the flow in the token exchange handler so the authorization code is not
removed until the refresh credentials persistence (the put call) succeeds —
either call self.codes.remove only after a successful put, or if you must remove
it first, capture the removed entry and reinsert it into self.codes on any
storage error (re-using the same key/value) and return
TokenExchangeError::InvalidOrExpiredCode only when the code truly doesn't exist;
update the logic around issued, self.codes.remove, and the put call to ensure
atomicity/retryability.
- Around line 53-55: The service currently stores OAuth flow state in in-memory
DashMaps (pending and codes on McpAuthProxyServiceImpl holding
PendingAuthorization and IssuedAuthorizationCode), which will break across
restarts or multi-replica routing and cause
UnknownOrExpiredSession/InvalidOrExpiredCode; move these state stores to a
durable/shared backend (e.g., Redis, DynamoDB, or database) and replace the
DashMap usage with a repository layer (e.g., AuthorizationRepository) used by
the authorize, /oauth/callback, and /token handlers so issued codes and pending
sessions survive process restarts and multi-replica traffic; ensure reads/writes
use TTLs and atomic operations where necessary and update
McpAuthProxyServiceImpl to depend on and call the new storage interface instead
of accessing pending and codes directly.
- Around line 312-317: The current redirect URL is built by string concatenation
which breaks if pending.client_redirect_uri already contains a query or
fragment; instead parse pending.client_redirect_uri into a url::Url, use
Url::parse(...) and then call url.query_pairs_mut().append_pair("code",
&issued_code).append_pair("state", &pending.client_state) to add the parameters
safely, and return Ok(url.into_string()); also handle Url::parse errors (map or
return the existing error type) so malformed client_redirect_uri is surfaced
instead of producing an invalid callback URL.
- Around line 288-298: The code removes a pending entry with
self.pending.remove(&session_id) but never enforces the TTL: after removal you
must check pending.expires_at against the current time and reject expired
sessions; update the block that obtains pending (the variable pending) to
compare pending.expires_at (or compute elapsed against PENDING_AUTH_TTL) and
return CompleteCallbackError::UnknownOrExpiredSession if expired before
proceeding to oauth_provider.exchange_authorization_code; ensure you use the
same time source used elsewhere (e.g., chrono::Utc::now() or the project time
helper) for the comparison.
In `@rust/cloud-storage/mcp_auth_proxy/src/inbound/middleware.rs`:
- Around line 68-72: Add tracing instrumentation to the middleware function
validate_bearer by annotating it with #[tracing::instrument(skip(jwt_args,
request, next))] so calls and spans are recorded for observability; ensure you
do not include any error field in the instrument macro (do not capture an `err`)
because this function returns an axum::response::Response rather than a Result,
and keep the skip list exactly jwt_args, request, next to avoid logging request
bodies or large state.
In `@rust/cloud-storage/mcp_auth_proxy/src/lib.rs`:
- Around line 16-19: Update the module-level comment to remove the claim that
refresh-token bookkeeping is "process-local" and kept "in memory" and instead
note that refresh credentials are persisted via the new
PgRefreshCredentialsStore (and that the broker uses Postgres-backed storage for
refresh_token -> access_token mapping); reference PgRefreshCredentialsStore and
any existing FusionAuth refresh API caveats, and keep the rest of the limitation
text accurate to the current implementation.
In `@rust/cloud-storage/mcp_auth_proxy/src/outbound/fusionauth.rs`:
- Around line 1-11: The file uses dyn Future<Output = ...> in the return types
(e.g., the impls referenced in fusionauth.rs where OAuthProvider methods are
implemented) but never imports Future; add an import for Future (for example use
std::future::Future;) at the top of the file so the dyn Future types
compile—keep existing Pin and Arc imports and ensure the Future import is in
scope for the async-trait-like return signatures.
In
`@rust/cloud-storage/mcp_auth_proxy/src/outbound/pg_refresh_credentials_store.rs`:
- Around line 1-11: The compilation error is caused by missing import of Future
used in trait-object/Pinned futures in this module; add the standard Future
import by inserting use std::future::Future; alongside the existing use
std::pin::Pin; so the trait implementations and signatures that reference Future
(e.g., related to RefreshCredentialsStore, and types handling RefreshCredentials
and RefreshToken) will compile.
In `@rust/cloud-storage/mcp_service/src/context.rs`:
- Line 74: The three bootstrap functions that return Results—build_context,
build_tool_context, and build_auth_proxy—lack tracing instrumentation; add the
attribute #[tracing::instrument(err)] above each function signature (e.g., above
pub async fn build_context() -> anyhow::Result<McpContext> and the corresponding
build_tool_context and build_auth_proxy definitions) so errors are recorded on
failure; ensure you use the err attribute only and do not add level = "info".
---
Duplicate comments:
In `@rust/cloud-storage/mcp_service/src/main.rs`:
- Around line 47-57: The background cleanup tokio::spawn for
cleanup_state.cleanup_expired() can panic and stop silently because its
JoinHandle is ignored; wrap the spawned future in std::panic::AssertUnwindSafe +
tokio::spawn(async move { match std::panic::catch_unwind(|| async move { ...
}.into_future()) { ... }}) or use tokio::spawn with
futures::FutureExt::catch_unwind to log panics (include tracing::error with the
panic payload) and/or return a JoinHandle from the surrounding context and
monitor it (store the JoinHandle and .await or .inspect_err() to emit metrics on
repeated failures). Ensure references to cleanup_state, cleanup_expired(),
AUTH_PROXY_CLEANUP_INTERVAL and the tokio::spawn call are updated so panics are
caught and logged or the handle is stored and supervised.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 07caf934-feed-4640-916e-21414b05344e
⛔ Files ignored due to path filters (1)
rust/cloud-storage/Cargo.lock is excluded by !**/*.lock, !**/Cargo.lock
📒 Files selected for processing (27)
- infra/stacks/fusionauth-instance/index.ts
- infra/stacks/mcp-server/mcp-server.ts
- rust/cloud-storage/Cargo.toml
- rust/cloud-storage/document_cognition_service/Cargo.toml
- rust/cloud-storage/document_cognition_service/src/lib.rs
- rust/cloud-storage/document_cognition_service/src/mcp_oauth/handlers.rs
- rust/cloud-storage/document_cognition_service/src/mcp_oauth/middleware.rs
- rust/cloud-storage/document_cognition_service/src/mcp_oauth/mod.rs
- rust/cloud-storage/document_cognition_service/src/mcp_oauth/state.rs
- rust/cloud-storage/macro_db_client/migrations/20260408190000_mcp_auth_proxy_refresh_credentials.sql
- rust/cloud-storage/mcp_auth_proxy/Cargo.toml
- rust/cloud-storage/mcp_auth_proxy/README.md
- rust/cloud-storage/mcp_auth_proxy/src/domain.rs
- rust/cloud-storage/mcp_auth_proxy/src/domain/models.rs
- rust/cloud-storage/mcp_auth_proxy/src/domain/ports.rs
- rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs
- rust/cloud-storage/mcp_auth_proxy/src/inbound.rs
- rust/cloud-storage/mcp_auth_proxy/src/inbound/axum_router.rs
- rust/cloud-storage/mcp_auth_proxy/src/inbound/middleware.rs
- rust/cloud-storage/mcp_auth_proxy/src/lib.rs
- rust/cloud-storage/mcp_auth_proxy/src/outbound.rs
- rust/cloud-storage/mcp_auth_proxy/src/outbound/fusionauth.rs
- rust/cloud-storage/mcp_auth_proxy/src/outbound/pg_refresh_credentials_store.rs
- rust/cloud-storage/mcp_service/Cargo.toml
- rust/cloud-storage/mcp_service/src/context.rs
- rust/cloud-storage/mcp_service/src/main.rs
- rust/cloud-storage/mcp_service/src/tool_service.rs
💤 Files with no reviewable changes (6)
- rust/cloud-storage/document_cognition_service/src/lib.rs
- rust/cloud-storage/document_cognition_service/Cargo.toml
- rust/cloud-storage/document_cognition_service/src/mcp_oauth/mod.rs
- rust/cloud-storage/document_cognition_service/src/mcp_oauth/middleware.rs
- rust/cloud-storage/document_cognition_service/src/mcp_oauth/handlers.rs
- rust/cloud-storage/document_cognition_service/src/mcp_oauth/state.rs
...((stack === 'local' || stack === 'dev')
  ? [
      'http://localhost:8085/*',
      'http://localhost:8085/oauth/*',
    ]
  : []),
🧹 Nitpick | 🔵 Trivial
Redundant URL pattern in localhost redirect URLs.
The pattern 'http://localhost:8085/*' already covers all paths including /oauth/*, making 'http://localhost:8085/oauth/*' redundant.
♻️ Suggested simplification
...((stack === 'local' || stack === 'dev')
? [
'http://localhost:8085/*',
- 'http://localhost:8085/oauth/*',
]
: []),
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
-...((stack === 'local' || stack === 'dev')
-  ? [
-      'http://localhost:8085/*',
-      'http://localhost:8085/oauth/*',
-    ]
-  : []),
+...((stack === 'local' || stack === 'dev')
+  ? [
+      'http://localhost:8085/*',
+    ]
+  : []),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@infra/stacks/fusionauth-instance/index.ts` around lines 272 - 277, The
localhost redirect URL list contains a redundant entry: the pattern
'http://localhost:8085/*' already covers 'http://localhost:8085/oauth/*'; in the
array built when (stack === 'local' || stack === 'dev') remove the duplicate
'http://localhost:8085/oauth/*' entry so only 'http://localhost:8085/*' remains
(locate the array construction around the conditional that appends these URLs in
infra/stacks/fusionauth-instance/index.ts).
CREATE TABLE IF NOT EXISTS mcp_auth_proxy_refresh_credentials
(
    refresh_token TEXT PRIMARY KEY,
    access_token TEXT NOT NULL,
    expires_at timestamptz NOT NULL,
    created_at timestamptz NOT NULL DEFAULT now(),
    updated_at timestamptz NOT NULL DEFAULT now()
);

CREATE INDEX IF NOT EXISTS idx_mcp_auth_proxy_refresh_credentials_expires_at
    ON mcp_auth_proxy_refresh_credentials (expires_at);
Consider security implications of storing tokens as plaintext.
Storing refresh_token and access_token as plain TEXT means these sensitive credentials are visible in database backups, logs, and to anyone with database access. Consider:
- Encrypting the `access_token` column at rest (application-level encryption)
- Using a hashed or opaque identifier as the primary key instead of the raw refresh token
If the current approach is acceptable for your threat model (e.g., database is already encrypted at rest, access is tightly controlled), document this decision.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@rust/cloud-storage/macro_db_client/migrations/20260408190000_mcp_auth_proxy_refresh_credentials.sql`
around lines 1 - 11, The migration stores sensitive tokens in plaintext (table
mcp_auth_proxy_refresh_credentials, columns refresh_token and access_token), so
update the schema and application flow to protect them: replace the primary key
refresh_token with a non-reversible identifier (e.g., refresh_token_hash or id)
stored as the PK and indexed, store only a hash (e.g., SHA-256) of the refresh
token for lookup, and change access_token to an encrypted column (or bytea) so
the application performs application-level encryption/decryption before
insert/select (or move secrets to a dedicated secrets vault), update related
code that inserts/queries this table (functions that currently use
refresh_token/access_token) to compute/compare the hash and to encrypt/decrypt
the access_token, and add a migration note documenting the threat-model decision
if you choose not to encrypt.
macro_auth = { path = "../macro_auth", optional = true }
macro_user_id = { path = "../macro_user_id", optional = true }
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check if macro_user_id crate exists
fd -t d "macro_user_id" rust/cloud-storage/ --max-depth 1
# Also check if it has a Cargo.toml
fd "Cargo.toml" rust/cloud-storage/macro_user_id/
Repository: macro-inc/macro
Length of output: 1778
🏁 Script executed:
#!/bin/bash
# Check if macro_user_id crate exists in rust/cloud-storage directory
find rust/cloud-storage -maxdepth 1 -type d -name "*macro_user_id*" 2>/dev/null
# Check workspace Cargo.toml for members list
if [ -f "rust/cloud-storage/Cargo.toml" ]; then
  echo "=== Workspace members in rust/cloud-storage/Cargo.toml ==="
  rg -A 50 '^\[workspace\]' rust/cloud-storage/Cargo.toml | head -60
fi
Repository: macro-inc/macro
Length of output: 1266
🏁 Script executed:
#!/bin/bash
# Extract the full members list and search for macro_user_id
rg -A 200 '^\[workspace\]' rust/cloud-storage/Cargo.toml | rg '"macro_user_id"'
# Also verify the macro_user_id directory has a Cargo.toml
if [ -f "rust/cloud-storage/macro_user_id/Cargo.toml" ]; then
  echo "=== macro_user_id/Cargo.toml exists ==="
  head -5 rust/cloud-storage/macro_user_id/Cargo.toml
else
  echo "=== macro_user_id/Cargo.toml NOT FOUND ==="
fi
Repository: macro-inc/macro
Length of output: 182
🏁 Script executed:
#!/bin/bash
# Get the complete members list from workspace Cargo.toml
rg -A 300 '^\[workspace\]' rust/cloud-storage/Cargo.toml | grep -E '^\s+"' | sort
Repository: macro-inc/macro
Length of output: 3238
Add macro_user_id to the workspace members list in rust/cloud-storage/Cargo.toml.
The crate exists at rust/cloud-storage/macro_user_id but is not declared in the workspace members list. This is required for the workspace to recognize it as a member crate and properly manage the dependency.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/Cargo.toml` around lines 25 - 26, The
workspace Cargo.toml is missing the macro_user_id crate in its members list;
open rust/cloud-storage/Cargo.toml and add "macro_user_id" to the
workspace.members array (alongside existing entries like "macro_auth") so the
crate at rust/cloud-storage/macro_user_id is recognized and the dependency
macro_user_id in mcp_auth_proxy can be resolved by the workspace.
The `mcp_service` acts as proxy to the auth server.

I believe this is needed because FusionAuth doesn't support DCR

(Official Docs)[https://modelcontextprotocol.io/docs/tutorials/security/authorization]
(Useful fusionauth article)[https://fusionauth.io/articles/ai/mcp-connecting-software-ai]
Fix Markdown link syntax and clarify crate reference.
- The README references `mcp_service` but this is the `mcp_auth_proxy` crate's README - consider clarifying.
- Markdown link syntax is reversed - should be `[text](url)` not `(text)[url]`.
📝 Proposed fix
-The `mcp_service` acts as proxy to the auth server.
+The `mcp_auth_proxy` crate provides an OAuth broker/proxy for the MCP service to the auth server.
I believe this is needed because FusionAuth doesn't support DCR
-(Official Docs)[https://modelcontextprotocol.io/docs/tutorials/security/authorization]
-(Useful fusionauth article)[https://fusionauth.io/articles/ai/mcp-connecting-software-ai]
+[Official Docs](https://modelcontextprotocol.io/docs/tutorials/security/authorization)
+[Useful FusionAuth article](https://fusionauth.io/articles/ai/mcp-connecting-software-ai)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/README.md` around lines 1 - 6, Update the
README.md so it correctly references the current crate name `mcp_auth_proxy`
(instead of `mcp_service`) and clarify its role as a proxy to the auth server;
also fix the Markdown link syntax by changing occurrences of `(Official
Docs)[https://...]` and `(Useful fusionauth article)[https://...]` to the proper
`[Official Docs](https://...)` and `[Useful fusionauth article](https://...)`
form, and ensure the links include brief descriptive text and backtick-quoted
crate names where appropriate (e.g., `mcp_auth_proxy`) for clarity.
/// Persistent store for refresh-token-backed credentials.
pub trait RefreshCredentialsStore: Send + Sync {
    /// Persists the current access token for a refresh token.
    fn put<'a>(
        &'a self,
        refresh_token: &'a RefreshToken,
        credentials: &'a RefreshCredentials,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;

    /// Loads credentials for a refresh token if present.
    fn get<'a>(
        &'a self,
        refresh_token: &'a RefreshToken,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Option<RefreshCredentials>>> + Send + 'a>>;

    /// Deletes the stored credentials for a refresh token.
    fn delete<'a>(
        &'a self,
        refresh_token: &'a RefreshToken,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'a>>;

    /// Removes expired credentials from storage.
    fn cleanup_expired(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
Add an atomic refresh-token rotation primitive.
The get/delete/put API forces refresh rotation to span multiple awaited operations. In rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs, that lets two concurrent refreshes read the same token before either delete happens, and a delete/put failure can strand the session. Please expose a single transactional consume/rotate method on the store and implement it atomically in the Postgres adapter.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/ports.rs` around lines 27 - 49,
Add an atomic consume-and-rotate primitive to the RefreshCredentialsStore trait
(e.g. fn consume_and_rotate<'a>(&'a self, old_refresh_token: &'a RefreshToken,
new_refresh_token: Option<&'a RefreshToken>, new_credentials: Option<&'a
RefreshCredentials>) -> Pin<Box<dyn Future<Output =
anyhow::Result<Option<RefreshCredentials>>> + Send + 'a>>) that performs the
read+delete+put in one transactional call and returns the consumed credentials
(or None) on success; implement this method atomically in the Postgres adapter
using a DB transaction/SELECT FOR UPDATE to ensure two concurrent refreshes
cannot both consume the same token, and update domain/service.rs to call this
new consume_and_rotate method instead of separate get/delete/put calls.
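As a rough sketch of the consume-and-rotate idea, the example below uses an in-memory map behind a single `Mutex` as a stand-in for the Postgres adapter. `InMemoryStore` and its plain `String` tokens are assumptions made for illustration; the real trait uses `RefreshToken` / `RefreshCredentials` and boxed futures. The point is only that the read, delete, and insert happen under one lock, so two concurrent refreshes cannot both consume the same token.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical in-memory stand-in for the Postgres-backed store.
#[derive(Default)]
struct InMemoryStore {
    // Maps refresh token -> access token.
    entries: Mutex<HashMap<String, String>>,
}

impl InMemoryStore {
    /// Stores the access token for a refresh token.
    fn put(&self, refresh_token: &str, access_token: &str) {
        self.entries
            .lock()
            .unwrap()
            .insert(refresh_token.to_string(), access_token.to_string());
    }

    /// Removes `old` and, only if it existed, inserts the replacement pair.
    /// Everything runs while holding one lock, so the steps cannot interleave
    /// with another caller's rotation.
    fn consume_and_rotate(&self, old: &str, new: Option<(&str, &str)>) -> Option<String> {
        let mut entries = self.entries.lock().unwrap();
        // Returns None early if the token was already consumed.
        let consumed = entries.remove(old)?;
        if let Some((new_refresh, new_access)) = new {
            entries.insert(new_refresh.to_string(), new_access.to_string());
        }
        Some(consumed)
    }
}
```

In a Postgres adapter the lock would presumably be replaced by a transaction, for example the `SELECT ... FOR UPDATE` approach mentioned above or a `DELETE ... RETURNING` followed by the insert.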
pub async fn validate_bearer(
    axum::extract::State(jwt_args): axum::extract::State<JwtValidationArgs>,
    mut request: Request<Body>,
    next: axum::middleware::Next,
) -> axum::response::Response {
🧹 Nitpick | 🔵 Trivial
Consider adding #[tracing::instrument(skip(jwt_args, request, next))] for observability.
The middleware function would benefit from instrumentation for debugging auth failures, but note that per coding guidelines, err should not be included since this returns Response, not Result.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/src/inbound/middleware.rs` around lines 68
- 72, Add tracing instrumentation to the middleware function validate_bearer by
annotating it with #[tracing::instrument(skip(jwt_args, request, next))] so
calls and spans are recorded for observability; ensure you do not include any
error field in the instrument macro (do not capture an `err`) because this
function returns an axum::response::Response rather than a Result, and keep the
skip list exactly jwt_args, request, next to avoid logging request bodies or
large state.
//! Postgres-backed storage for MCP auth proxy refresh credentials.

use std::pin::Pin;

use chrono::{DateTime, Utc};
use sqlx::{Pool, Postgres};

use crate::domain::{
    models::{RefreshCredentials, RefreshToken},
    ports::RefreshCredentialsStore,
};
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify compilation of the crate
cd rust/cloud-storage && cargo check -p mcp_auth_proxy --features outbound 2>&1 | head -50
Repository: macro-inc/macro
Length of output: 145
🏁 Script executed:
cat -n rust/cloud-storage/mcp_auth_proxy/src/outbound/pg_refresh_credentials_store.rs
Repository: macro-inc/macro
Length of output: 4089
Missing Future import will cause compilation error.
The code uses Future in trait object syntax at lines 31, 60, 83, and 99, but doesn't import it. Add the missing import:
🐛 Proposed fix
//! Postgres-backed storage for MCP auth proxy refresh credentials.
-use std::pin::Pin;
+use std::{future::Future, pin::Pin};
use chrono::{DateTime, Utc};
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
-//! Postgres-backed storage for MCP auth proxy refresh credentials.
-use std::pin::Pin;
-use chrono::{DateTime, Utc};
-use sqlx::{Pool, Postgres};
-use crate::domain::{
-    models::{RefreshCredentials, RefreshToken},
-    ports::RefreshCredentialsStore,
-};
+//! Postgres-backed storage for MCP auth proxy refresh credentials.
+use std::{future::Future, pin::Pin};
+use chrono::{DateTime, Utc};
+use sqlx::{Pool, Postgres};
+use crate::domain::{
+    models::{RefreshCredentials, RefreshToken},
+    ports::RefreshCredentialsStore,
+};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@rust/cloud-storage/mcp_auth_proxy/src/outbound/pg_refresh_credentials_store.rs`
around lines 1 - 11, The compilation error is caused by missing import of Future
used in trait-object/Pinned futures in this module; add the standard Future
import by inserting use std::future::Future; alongside the existing use
std::pin::Pin; so the trait implementations and signatures that reference Future
(e.g., related to RefreshCredentialsStore, and types handling RefreshCredentials
and RefreshToken) will compile.
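For reference, a minimal std-only sketch of the boxed-future signature style used by this trait, with the `Future` import in place. `Store`, `Fixed`, and `block_on` are hypothetical names for illustration; the explicit import matters on editions before 2024, where `Future` is not in the prelude.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Alias for the return type written out in full in the trait above.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Hypothetical, simplified port: one lookup returning a boxed future.
trait Store: Send + Sync {
    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<String>>;
}

struct Fixed;

impl Store for Fixed {
    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<String>> {
        // `Box::pin` turns the async block into the boxed trait object.
        Box::pin(async move { (key == "known").then(|| "access".to_string()) })
    }
}

/// Waker that does nothing; enough for futures that never actually suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Tiny busy-polling executor, only suitable for demos like this one.
fn block_on<T>(mut fut: BoxFuture<'_, T>) -> T {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

In the real service the futures are driven by the tokio runtime; `block_on` here exists only so the sketch runs without external crates.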
async fn main() -> anyhow::Result<()> {
    MacroEntrypoint::default().init();

pub async fn build_context() -> anyhow::Result<McpContext> {
🧩 Analysis chain
🏁 Script executed:
# Read the context.rs file around the specified lines
wc -l rust/cloud-storage/mcp_service/src/context.rs
Repository: macro-inc/macro
Length of output: 108
🏁 Script executed:
# Read sections around the mentioned lines
sed -n '70,80p' rust/cloud-storage/mcp_service/src/context.rs
Repository: macro-inc/macro
Length of output: 429
🏁 Script executed:
# Read section around line 130-137
sed -n '126,142p' rust/cloud-storage/mcp_service/src/context.rs
Repository: macro-inc/macro
Length of output: 658
🏁 Script executed:
# Read section around line 291-295
sed -n '287,302p' rust/cloud-storage/mcp_service/src/context.rs
Repository: macro-inc/macro
Length of output: 604
🏁 Script executed:
# Check lines immediately before line 74
sed -n '72,75p' rust/cloud-storage/mcp_service/src/context.rs
Repository: macro-inc/macro
Length of output: 209
🏁 Script executed:
# Check lines immediately before line 130
sed -n '128,132p' rust/cloud-storage/mcp_service/src/context.rs
Repository: macro-inc/macro
Length of output: 134
🏁 Script executed:
# Check lines immediately before line 291
sed -n '289,294p' rust/cloud-storage/mcp_service/src/context.rs
Repository: macro-inc/macro
Length of output: 199
Add #[tracing::instrument(err)] to these Result-returning bootstrap functions.
All three functions need instrumentation to make startup failures diagnosable:
- `build_context()` (line 74)
- `build_tool_context()` (line 130)
- `build_auth_proxy()` (line 291)
Per coding guidelines, functions returning Result must include the err attribute and must not include level = "info".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_service/src/context.rs` at line 74, The three
bootstrap functions that return Results—build_context, build_tool_context, and
build_auth_proxy—lack tracing instrumentation; add the attribute
#[tracing::instrument(err)] above each function signature (e.g., above pub async
fn build_context() -> anyhow::Result<McpContext> and the corresponding
build_tool_context and build_auth_proxy definitions) so errors are recorded on
failure; ensure you use the err attribute only and do not add level = "info".
87a42ee to
79bdb5e
Compare
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rust/cloud-storage/mcp_service/src/context.rs (1)
42-64: ⚠️ Potential issue | 🟠 Major
Add SearchServiceUrl to McpEnvVars and wire SearchServiceClient to it.
SearchServiceClient::new should receive a dedicated search service URL, not the document storage service URL. The memory service correctly configures this via config.search_service_url (memory/src/context.rs:56–58), but mcp_service (line 144) reuses dss_url, which will route all search requests to the wrong upstream. Add SearchServiceUrl to the McpEnvVars struct and update line 144 to use it instead of dss_url.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_service/src/context.rs` around lines 42 - 64, Add a SearchServiceUrl field to the McpEnvVars struct and wire it into the service config so SearchServiceClient::new receives the dedicated search URL instead of the document storage URL; specifically, add SearchServiceUrl to McpEnvVars, ensure the environment-to-config mapping fills config.search_service_url from that field, and replace the use of dss_url when constructing SearchServiceClient::new with config.search_service_url.
♻️ Duplicate comments (7)
rust/cloud-storage/mcp_service/src/context.rs (1)
74-75: 🛠️ Refactor suggestion | 🟠 Major
Instrument these new bootstrap helpers, but skip secret-bearing arguments.
build_context, build_tool_context, and build_auth_proxy are now the main startup failure boundaries, but none of them are instrumented. Please add #[tracing::instrument(err)] here, and use skip(...) on the helpers that take auth keys or env-derived secrets.
As per coding guidelines, "Use #[tracing::instrument(err)] on functions returning Result. Do NOT add err on functions returning Option, (), or non-Result types. Never include level = "info"."
Also applies to: 130-137, 291-295
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_service/src/context.rs` around lines 74 - 75, Add tracing instrumentation to the startup boundary functions that return Results: annotate build_context, build_tool_context, and build_auth_proxy with #[tracing::instrument(err)] and include skip(...) for any parameters that carry secrets or env-derived keys (e.g., env_vars, auth_key, api_key, secret, or similar argument names found in those functions) so secrets are not logged; ensure you only add err on functions that return Result and do not add level = "info".
rust/cloud-storage/macro_db_client/migrations/20260408190000_mcp_auth_proxy_refresh_credentials.sql (1)
1-7: ⚠️ Potential issue | 🟠 Major
Don't persist live OAuth tokens in plaintext.
refresh_token is the primary key here and access_token is stored as raw TEXT, so anyone with database read access, backups, or query logging can replay active credentials. Store a non-reversible lookup key for the refresh token and encrypt the access token before persistence.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/macro_db_client/migrations/20260408190000_mcp_auth_proxy_refresh_credentials.sql` around lines 1 - 7, The migration persists raw OAuth tokens (table mcp_auth_proxy_refresh_credentials; columns refresh_token and access_token) which is unsafe; change the schema to store a non-reversible lookup key for the refresh token (e.g., refresh_token_hash as the PRIMARY KEY or unique indexed column) and store the access token as an encrypted blob (e.g., access_token_encrypted) instead of TEXT. Update any code that inserts/queries this table (look for functions that write/read mcp_auth_proxy_refresh_credentials) to: 1) hash the incoming refresh_token with a strong KDF (salted HMAC/Argon2) and store only the hash as the lookup key, and 2) encrypt the access_token with a secrets-managed symmetric key and store ciphertext plus IV/metadata; also add a migration path for existing rows (re-encrypt or rotate tokens out of plaintext) and create appropriate indexes on the new hash column.
rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs (4)
136-181: ⚠️ Potential issue | 🟠 Major
Don't burn the broker code before refresh credentials are durable.
The code is removed from self.codes before put succeeds. If persistence fails after the upstream tokens have been issued, the client cannot retry /token and has to restart the whole auth flow. Keep the code until put succeeds, or reinsert it on storage failure.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs` around lines 136 - 181, The code currently removes the authorization code from self.codes via self.codes.remove(...) before persisting refresh credentials, which can lock out clients if refresh_credentials_store.put(...) fails; change the flow to not remove the code until put(...) completes successfully (i.e., retrieve/peek the entry first via self.codes.get or clone the value without removing), then call refresh_credentials_store.put(&refresh_token, ...). If put succeeds, then remove the code from self.codes (or remove atomically), but if put fails reinsert/keep the original code so the client can retry; ensure error mapping still returns TokenExchangeError::RefreshCredentialsStorage on put failure and preserve the same issued/access_token/refresh_token semantics used later.
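The "reinsert on storage failure" option can be sketched roughly as follows. This is a minimal, std-only illustration, not the PR's code: `IssuedCode`, `CredentialsStore`, `FlakyStore`, and `redeem_code` are hypothetical stand-ins for `IssuedAuthorizationCode`, `RefreshCredentialsStore`, and the token-exchange path, and a plain `HashMap` stands in for the `DashMap`.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the broker-issued code entry.
#[derive(Clone, Debug, PartialEq)]
struct IssuedCode {
    access_token: String,
    refresh_token: String,
}

/// Hypothetical stand-in for a fallible refresh-credentials store.
trait CredentialsStore {
    fn put(&mut self, refresh_token: &str, access_token: &str) -> Result<(), String>;
}

/// In-memory store that fails once when `fail_next` is set (illustration only).
struct FlakyStore {
    fail_next: bool,
    saved: Vec<(String, String)>,
}

impl CredentialsStore for FlakyStore {
    fn put(&mut self, refresh_token: &str, access_token: &str) -> Result<(), String> {
        if self.fail_next {
            self.fail_next = false;
            return Err("storage unavailable".to_string());
        }
        self.saved.push((refresh_token.to_string(), access_token.to_string()));
        Ok(())
    }
}

/// Redeems `code`; the code stays redeemable if the store write fails.
fn redeem_code(
    codes: &mut HashMap<String, IssuedCode>,
    store: &mut dyn CredentialsStore,
    code: &str,
) -> Result<IssuedCode, String> {
    // Take the entry out first so it cannot be redeemed twice.
    let issued = codes
        .remove(code)
        .ok_or_else(|| "invalid or expired code".to_string())?;

    if let Err(err) = store.put(&issued.refresh_token, &issued.access_token) {
        // Persistence failed: put the code back so the client can retry /token.
        codes.insert(code.to_string(), issued);
        return Err(err);
    }
    Ok(issued)
}
```

Removing first and reinserting on failure keeps the single-use property while the write is in flight; peeking with a read and removing afterwards would instead need a guard against two concurrent redemptions.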
288-292: ⚠️ Potential issue | 🟠 Major
Enforce PENDING_AUTH_TTL in complete_callback.
This path removes the pending session but never checks pending.expires_at, so a callback that arrives after the 10-minute TTL is still accepted until cleanup runs.
Suggested fix
  let pending = self
      .pending
      .remove(&session_id)
      .map(|(_, pending)| pending)
      .ok_or(CompleteCallbackError::UnknownOrExpiredSession)?;
+
+ if pending.expires_at <= Instant::now() {
+     return Err(CompleteCallbackError::UnknownOrExpiredSession);
+ }

  let (access_token, refresh_token) = self
      .oauth_provider
      .exchange_authorization_code(&params.code)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs` around lines 288 - 292, complete_callback currently removes the pending session via self.pending.remove(&session_id) without validating pending.expires_at against PENDING_AUTH_TTL, so callbacks after TTL are accepted; change the logic in complete_callback to first lookup (e.g., get or entry API) the pending tuple for session_id, verify pending.expires_at is within PENDING_AUTH_TTL (or compare to current time and return CompleteCallbackError::UnknownOrExpiredSession if expired), and only then remove the entry and proceed with completion to ensure expired sessions are rejected.
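As a rough, std-only sketch of the expiry check described above: `Pending`, `CallbackError`, and `take_pending` are hypothetical names, a `HashMap` stands in for the service's `DashMap`, and `now` is passed in as a parameter so the behaviour is easy to exercise.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for PendingAuthorization.
#[derive(Debug)]
struct Pending {
    client_state: String,
    expires_at: Instant,
}

/// Hypothetical stand-in for CompleteCallbackError.
#[derive(Debug, PartialEq)]
enum CallbackError {
    UnknownOrExpiredSession,
}

/// Removes the pending session and rejects it if its TTL has already elapsed.
fn take_pending(
    pending: &mut HashMap<String, Pending>,
    session_id: &str,
    now: Instant,
) -> Result<Pending, CallbackError> {
    let entry = pending
        .remove(session_id)
        .ok_or(CallbackError::UnknownOrExpiredSession)?;

    // An expired entry is dropped here rather than waiting for the cleanup task.
    if entry.expires_at <= now {
        return Err(CallbackError::UnknownOrExpiredSession);
    }
    Ok(entry)
}
```

Expired and unknown sessions map to the same error in this sketch, so a caller cannot tell whether a session ID ever existed.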
312-317: ⚠️ Potential issue | 🟠 Major
Appending query params with format! breaks valid redirect URIs.
If client_redirect_uri already has a query string or fragment, this produces an invalid callback URL and the MCP client will not receive code/state reliably. Build the redirect by parsing the URI and appending the parameters instead of concatenating strings.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs` around lines 312 - 317, The redirect is built by string concatenation which breaks when pending.client_redirect_uri already contains a query or fragment; instead parse pending.client_redirect_uri into a url::Url (e.g., using url::Url::parse), use url.query_pairs_mut().append_pair("code", &issued_code) and .append_pair("state", &pending.client_state) to add params safely, then return the composed url via url.into_string(); also handle Url::parse errors and convert them to the existing error type/path used in the surrounding function (the symbols to update are pending.client_redirect_uri, issued_code, pending.client_state and the function in service.rs that returns Ok(format!(...))).
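To make the failure mode concrete, here is a std-only sketch of what "append safely" has to handle: an existing query string and a trailing fragment. `percent_encode` and `append_query` are hypothetical helpers written for illustration; in the actual service the url crate approach named in the comment is probably the better fit, since it also validates the URI.

```rust
/// Percent-encodes every byte except RFC 3986 unreserved characters.
fn percent_encode(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for byte in value.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char)
            }
            _ => out.push_str(&format!("%{byte:02X}")),
        }
    }
    out
}

/// Appends query pairs to `redirect_uri`, keeping any existing query and fragment.
fn append_query(redirect_uri: &str, pairs: &[(&str, &str)]) -> String {
    // The fragment must stay at the very end, so split it off first.
    let (base, fragment) = match redirect_uri.split_once('#') {
        Some((base, fragment)) => (base, Some(fragment)),
        None => (redirect_uri, None),
    };

    let mut out = base.to_string();
    for (key, value) in pairs {
        // Use '?' for the first parameter and '&' when a query already exists.
        let separator = if !out.contains('?') {
            "?"
        } else if out.ends_with('?') || out.ends_with('&') {
            ""
        } else {
            "&"
        };
        out.push_str(separator);
        out.push_str(&percent_encode(key));
        out.push('=');
        out.push_str(&percent_encode(value));
    }

    if let Some(fragment) = fragment {
        out.push('#');
        out.push_str(fragment);
    }
    out
}
```

With the original `format!("{}?code={}&state={}", ...)`, a redirect URI such as `http://localhost:3000/cb?foo=1` would end up with two `?` characters, which is the case the separator logic above avoids.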
53-57: ⚠️ Potential issue | 🔴 Critical
Process-local auth state will break the standalone deployment.
pending and codes only live in one process. Any restart or multi-replica routing between /authorize, /oauth/callback, and /token turns valid flows into UnknownOrExpiredSession/InvalidOrExpiredCode. Move this state to a shared durable store with TTLs; single-replica is the only safe interim rollout.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs` around lines 53 - 57, The in-memory fields pending and codes on McpAuthProxyServiceImpl cause sessions and codes to be lost across restarts/replicas; replace these Arc<DashMap<String, ...>> fields with abstractions backed by a shared durable store (e.g., Redis or a database) that implement store traits (e.g., PendingAuthorizationStore and IssuedAuthorizationCodeStore) and are injected into McpAuthProxyServiceImpl in place of pending/codes; update the code paths that read/write these maps (authorize, oauth_callback, token exchange methods) to use the new async store API, ensure entries are set with appropriate TTLs on creation and checks remove expired entries, and keep existing semantics for lookups and deletes so multi-replica routing and restarts do not break flows.
rust/cloud-storage/mcp_auth_proxy/src/domain/ports.rs (1)
32-54: ⚠️ Potential issue | 🟠 Major
Add a single-use refresh-token consume/rotate operation.
get/delete/put forces refresh rotation across multiple awaited steps. Two refresh requests can read the same token before either delete happens, and a failure between delete and put can strand the session. The store needs one atomic consume/rotate primitive instead of these separate calls.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/ports.rs` around lines 32 - 54, The current RefreshCredentialsStore trait (methods put/get/delete/cleanup_expired) exposes separate operations that allow races during refresh; add a single atomic consume-and-rotate primitive (e.g., consume_and_rotate or consume_rotate) to the trait that takes a RefreshToken and an operation that either returns new RefreshCredentials or indicates deletion, and returns the consumed credentials (or an error) as an async result; update trait consumers to use this new method instead of separate get/delete/put so implementations can perform the rotation atomically and avoid two requests reading the same token or leaving the store in a stranded state.
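One way to picture the atomic primitive is an in-memory store that performs the remove and the insert under a single lock. This is only a synchronous, std-only sketch under assumed names (`Credentials`, `InMemoryStore`, `consume_and_rotate`); the real trait is async and backed by Postgres, where the equivalent would presumably be a single statement or transaction.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for RefreshCredentials.
#[derive(Clone, Debug, PartialEq)]
struct Credentials {
    access_token: String,
}

/// Hypothetical in-memory store keyed by refresh token.
#[derive(Default)]
struct InMemoryStore {
    inner: Mutex<HashMap<String, Credentials>>,
}

impl InMemoryStore {
    fn put(&self, token: &str, credentials: Credentials) {
        // A poisoned lock panics here; acceptable for a sketch.
        self.inner.lock().unwrap().insert(token.to_string(), credentials);
    }

    /// Swaps `old_token` for `new_token` while holding one lock.
    /// Returns the consumed credentials, or None if `old_token` is unknown
    /// (for example because another request already consumed it).
    fn consume_and_rotate(
        &self,
        old_token: &str,
        new_token: &str,
        new_credentials: Credentials,
    ) -> Option<Credentials> {
        let mut map = self.inner.lock().unwrap();
        let consumed = map.remove(old_token)?;
        map.insert(new_token.to_string(), new_credentials);
        Some(consumed)
    }
}
```

Because the remove and insert happen under the same lock, a second request presenting the same refresh token observes `None` instead of reading credentials that are about to be deleted, and there is no window in which the old token is gone but the new one is not yet stored.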
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs`:
- Around line 81-84: Add the attribute #[tracing::instrument(err)] to each
service method that returns a Result to capture span/error context: apply it to
the functions named refresh_token_exchange, exchange_authorization_code_token,
start_authorization, complete_callback, exchange_token, and cleanup_expired;
place the attribute directly above each async fn declaration and do not add this
attribute to any functions that return Option, (), or non-Result types, and do
not include any level = "info" on the attribute.
In `@rust/cloud-storage/mcp_auth_proxy/src/inbound/axum_router.rs`:
- Around line 106-168: The token handler returns plain-text tuples for
TokenExchangeError cases; update token (fn token) to return JSON OAuth error
responses instead: map each TokenExchangeError variant to an appropriate HTTP
status and a JSON body like {"error": "<oauth_error_code>", "error_description":
"<human message>"} and return Json(body).into_response() (preserve existing
tracing::error logs for server/upstream failures). You can add a small helper
(e.g., map_token_error or token_error_response) that takes TokenExchangeError
and returns (StatusCode, serde_json::Value) so each match arm uses Json(...)
into_response rather than a string tuple; ensure error codes follow OAuth names
(invalid_request, invalid_grant, unsupported_grant_type, invalid_client, etc.)
and include the human-readable description.
In `@rust/cloud-storage/mcp_auth_proxy/src/outbound/fusionauth.rs`:
- Around line 37-44: The tracing span on construct_authorize_url is currently
recording the sensitive OAuth `state`; update the attribute on the function to
skip that parameter so the state is not emitted (e.g., change
#[tracing::instrument(skip(self), err)] to skip the `state` argument as well),
ensuring you keep the existing `err` and still skip `self`; this targets the
construct_authorize_url function and prevents the OAuth state token from being
logged.
In `@rust/cloud-storage/mcp_service/src/main.rs`:
- Around line 49-55: Replace the current explicit if-let error branch inside the
tokio::spawn background task with an inspect_err call on the future result
returned by cleanup_state.cleanup_expired().await; i.e., where you currently
await cleanup_state.cleanup_expired().await and then use if let Err(error) to
log, instead call cleanup_state.cleanup_expired().await.inspect_err(|error|
tracing::error!(error=?error, "auth proxy cleanup task failed")).await so the
logging follows the repo convention (symbols: tokio::spawn,
AUTH_PROXY_CLEANUP_INTERVAL, interval.tick, cleanup_state.cleanup_expired).
---
Outside diff comments:
In `@rust/cloud-storage/mcp_service/src/context.rs`:
- Around line 42-64: Add a SearchServiceUrl field to the McpEnvVars struct and
wire it into the service config so SearchServiceClient::new receives the
dedicated search URL instead of the document storage URL; specifically, add
SearchServiceUrl to McpEnvVars, ensure the environment-to-config mapping fills
config.search_service_url from that field, and replace the use of dss_url when
constructing SearchServiceClient::new with config.search_service_url.
---
Duplicate comments:
In
`@rust/cloud-storage/macro_db_client/migrations/20260408190000_mcp_auth_proxy_refresh_credentials.sql`:
- Around line 1-7: The migration persists raw OAuth tokens (table
mcp_auth_proxy_refresh_credentials; columns refresh_token and access_token)
which is unsafe; change the schema to store a non-reversible lookup key for the
refresh token (e.g., refresh_token_hash as the PRIMARY KEY or unique indexed
column) and store the access token as an encrypted blob (e.g.,
access_token_encrypted) instead of TEXT. Update any code that inserts/queries
this table (look for functions that write/read
mcp_auth_proxy_refresh_credentials) to: 1) hash the incoming refresh_token with
a strong KDF (salted HMAC/Argon2) and store only the hash as the lookup key, and
2) encrypt the access_token with a secrets-managed symmetric key and store
ciphertext plus IV/metadata; also add a migration path for existing rows
(re-encrypt or rotate tokens out of plaintext) and create appropriate indexes on
the new hash column.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/ports.rs`:
- Around line 32-54: The current RefreshCredentialsStore trait (methods
put/get/delete/cleanup_expired) exposes separate operations that allow races
during refresh; add a single atomic consume-and-rotate primitive (e.g.,
consume_and_rotate or consume_rotate) to the trait that takes a RefreshToken and
an operation that either returns new RefreshCredentials or indicates deletion,
and returns the consumed credentials (or an error) as an async result; update
trait consumers to use this new method instead of separate get/delete/put so
implementations can perform the rotation atomically and avoid two requests
reading the same token or leaving the store in a stranded state.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs`:
- Around line 136-181: The code currently removes the authorization code from
self.codes via self.codes.remove(...) before persisting refresh credentials,
which can lock out clients if refresh_credentials_store.put(...) fails; change
the flow to not remove the code until put(...) completes successfully (i.e.,
retrieve/peek the entry first via self.codes.get or clone the value without
removing), then call refresh_credentials_store.put(&refresh_token, ...). If put
succeeds, then remove the code from self.codes (or remove atomically), but if
put fails reinsert/keep the original code so the client can retry; ensure error
mapping still returns TokenExchangeError::RefreshCredentialsStorage on put
failure and preserve the same issued/access_token/refresh_token semantics used
later.
- Around line 288-292: complete_callback currently removes the pending session
via self.pending.remove(&session_id) without validating pending.expires_at
against PENDING_AUTH_TTL, so callbacks after TTL are accepted; change the logic
in complete_callback to first lookup (e.g., get or entry API) the pending tuple
for session_id, verify pending.expires_at is within PENDING_AUTH_TTL (or compare
to current time and return CompleteCallbackError::UnknownOrExpiredSession if
expired), and only then remove the entry and proceed with completion to ensure
expired sessions are rejected.
- Around line 312-317: The redirect is built by string concatenation which
breaks when pending.client_redirect_uri already contains a query or fragment;
instead parse pending.client_redirect_uri into a url::Url (e.g., using
url::Url::parse), use url.query_pairs_mut().append_pair("code", &issued_code)
and .append_pair("state", &pending.client_state) to add params safely, then
return the composed url via url.into_string(); also handle Url::parse errors and
convert them to the existing error type/path used in the surrounding function
(the symbols to update are pending.client_redirect_uri, issued_code,
pending.client_state and the function in service.rs that returns
Ok(format!(...))).
- Around line 53-57: The in-memory fields pending and codes on
McpAuthProxyServiceImpl cause sessions and codes to be lost across
restarts/replicas; replace these Arc<DashMap<String, ...>> fields with
abstractions backed by a shared durable store (e.g., Redis or a database) that
implement store traits (e.g., PendingAuthorizationStore and
IssuedAuthorizationCodeStore) and are injected into McpAuthProxyServiceImpl in
place of pending/codes; update the code paths that read/write these maps
(authorize, oauth_callback, token exchange methods) to use the new async store
API, ensure entries are set with appropriate TTLs on creation and checks remove
expired entries, and keep existing semantics for lookups and deletes so
multi-replica routing and restarts do not break flows.
In `@rust/cloud-storage/mcp_service/src/context.rs`:
- Around line 74-75: Add tracing instrumentation to the startup boundary
functions that return Results: annotate build_context, build_tool_context, and
build_auth_proxy with #[tracing::instrument(err)] and include skip(...) for any
parameters that carry secrets or env-derived keys (e.g., env_vars, auth_key,
api_key, secret, or similar argument names found in those functions) so secrets
are not logged; ensure you only add err on functions that return Result and do
not add level = "info".
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 49467fb8-5e0c-42fb-8c4d-73bd4c7e7536
⛔ Files ignored due to path filters (1)
- rust/cloud-storage/Cargo.lock is excluded by !**/*.lock, !**/Cargo.lock
📒 Files selected for processing (11)
- rust/cloud-storage/macro_db_client/migrations/20260408190000_mcp_auth_proxy_refresh_credentials.sql
- rust/cloud-storage/mcp_auth_proxy/Cargo.toml
- rust/cloud-storage/mcp_auth_proxy/src/domain/models.rs
- rust/cloud-storage/mcp_auth_proxy/src/domain/ports.rs
- rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs
- rust/cloud-storage/mcp_auth_proxy/src/inbound/axum_router.rs
- rust/cloud-storage/mcp_auth_proxy/src/outbound.rs
- rust/cloud-storage/mcp_auth_proxy/src/outbound/fusionauth.rs
- rust/cloud-storage/mcp_auth_proxy/src/outbound/pg_refresh_credentials_store.rs
- rust/cloud-storage/mcp_service/src/context.rs
- rust/cloud-storage/mcp_service/src/main.rs
async fn refresh_token_exchange(
    &self,
    params: TokenRequest,
) -> Result<TokenResponse, TokenExchangeError> {
🛠️ Refactor suggestion | 🟠 Major
❓ Verification inconclusive
Script executed:
cat -n rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs | head -100
Repository: macro-inc/macro
Exit code: 0
stdout:
1 //! Service implementation for the MCP OAuth broker.
2
3 use base64::{Engine, engine::general_purpose::URL_SAFE_NO_PAD};
4 use chrono::{Duration as ChronoDuration, Utc};
5 use dashmap::DashMap;
6 use sha2::{Digest, Sha256};
7 use std::{
8 sync::Arc,
9 time::{Duration, Instant},
10 };
11
12 use super::{
13 models::{
14 AuthorizeRequest, CallbackRequest, IssuedAuthorizationCode, PendingAuthorization,
15 RefreshCredentials, TokenRequest, TokenResponse,
16 },
17 ports::{OAuthProvider, RefreshCredentialsStore},
18 };
19
20 const PENDING_AUTH_TTL: Duration = Duration::from_secs(10 * 60);
21 const AUTHORIZATION_CODE_TTL: Duration = Duration::from_secs(5 * 60);
22 const REFRESH_CREDENTIAL_TTL_DAYS: i64 = 30;
23
24 /// Domain interface for the MCP OAuth broker.
25 pub trait McpAuthProxyService: Clone + Send + Sync + 'static {
26 /// Returns OAuth authorization server discovery metadata.
27 fn authorization_server_metadata(&self) -> serde_json::Value;
28 /// Returns protected-resource metadata for MCP clients.
29 fn protected_resource_metadata(&self) -> serde_json::Value;
30 /// Registers a public MCP client dynamically.
31 fn register_client(&self, body: serde_json::Value) -> serde_json::Value;
32 /// Starts an OAuth authorization flow and returns the upstream authorize URL.
33 fn start_authorization(
34 &self,
35 params: AuthorizeRequest,
36 ) -> Result<String, StartAuthorizationError>;
37 /// Completes the upstream callback and returns the loopback redirect URL.
38 fn complete_callback(
39 &self,
40 params: CallbackRequest,
41 ) -> impl Future<Output = Result<String, CompleteCallbackError>> + Send;
42 /// Exchanges a broker-issued code or refresh token for bearer credentials.
43 fn exchange_token(
44 &self,
45 params: TokenRequest,
46 ) -> impl Future<Output = Result<TokenResponse, TokenExchangeError>> + Send;
47 /// Removes expired broker state from in-memory storage.
48 fn cleanup_expired(&self) -> impl Future<Output = anyhow::Result<()>> + Send;
49 }
50
51 /// Domain service backing the MCP OAuth broker.
52 #[derive(Clone)]
53 pub struct McpAuthProxyServiceImpl {
54 pending: Arc<DashMap<String, PendingAuthorization>>,
55 codes: Arc<DashMap<String, IssuedAuthorizationCode>>,
56 oauth_provider: Arc<dyn OAuthProvider>,
57 refresh_credentials_store: Arc<dyn RefreshCredentialsStore>,
58 public_url: String,
59 }
60
61 impl McpAuthProxyServiceImpl {
62 /// Creates a new auth proxy service backed by an upstream OAuth provider.
63 pub fn new(
64 public_url: String,
65 oauth_provider: Arc<dyn OAuthProvider>,
66 refresh_credentials_store: Arc<dyn RefreshCredentialsStore>,
67 ) -> Self {
68 Self {
69 pending: Arc::new(DashMap::new()),
70 codes: Arc::new(DashMap::new()),
71 oauth_provider,
72 refresh_credentials_store,
73 public_url,
74 }
75 }
76
77 fn refresh_credentials_expiration() -> chrono::DateTime<Utc> {
78 Utc::now() + ChronoDuration::days(REFRESH_CREDENTIAL_TTL_DAYS)
79 }
80
81 async fn refresh_token_exchange(
82 &self,
83 params: TokenRequest,
84 ) -> Result<TokenResponse, TokenExchangeError> {
85 let refresh_token = params
86 .refresh_token
87 .ok_or(TokenExchangeError::RefreshTokenRequired)?;
88
89 let existing = self
90 .refresh_credentials_store
91 .get(&refresh_token)
92 .await
93 .map_err(TokenExchangeError::RefreshCredentialsStorage)?
94 .ok_or(TokenExchangeError::InvalidRefreshToken)?;
95
96 if existing.expires_at <= Utc::now() {
97 self.refresh_credentials_store
98 .delete(&refresh_token)
99 .await
100 .map_err(TokenExchangeError::RefreshCredentialsStorage)?;
Script executed:
cat -n rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs | sed -n '75,350p'
Repository: macro-inc/macro
Exit code: 0
stdout:
75 }
76
77 fn refresh_credentials_expiration() -> chrono::DateTime<Utc> {
78 Utc::now() + ChronoDuration::days(REFRESH_CREDENTIAL_TTL_DAYS)
79 }
80
81 async fn refresh_token_exchange(
82 &self,
83 params: TokenRequest,
84 ) -> Result<TokenResponse, TokenExchangeError> {
85 let refresh_token = params
86 .refresh_token
87 .ok_or(TokenExchangeError::RefreshTokenRequired)?;
88
89 let existing = self
90 .refresh_credentials_store
91 .get(&refresh_token)
92 .await
93 .map_err(TokenExchangeError::RefreshCredentialsStorage)?
94 .ok_or(TokenExchangeError::InvalidRefreshToken)?;
95
96 if existing.expires_at <= Utc::now() {
97 self.refresh_credentials_store
98 .delete(&refresh_token)
99 .await
100 .map_err(TokenExchangeError::RefreshCredentialsStorage)?;
101 return Err(TokenExchangeError::InvalidRefreshToken);
102 }
103
104 let (access_token, new_refresh_token) = self
105 .oauth_provider
106 .refresh_access_token(&existing.access_token, &refresh_token)
107 .await
108 .map_err(TokenExchangeError::RefreshFailed)?;
109
110 self.refresh_credentials_store
111 .delete(&refresh_token)
112 .await
113 .map_err(TokenExchangeError::RefreshCredentialsStorage)?;
114 self.refresh_credentials_store
115 .put(
116 &new_refresh_token,
117 &RefreshCredentials {
118 access_token: access_token.clone(),
119 expires_at: Self::refresh_credentials_expiration(),
120 },
121 )
122 .await
123 .map_err(TokenExchangeError::RefreshCredentialsStorage)?;
124
125 Ok(TokenResponse {
126 access_token,
127 refresh_token: new_refresh_token,
128 token_type: "Bearer",
129 })
130 }
131
132 async fn exchange_authorization_code_token(
133 &self,
134 params: TokenRequest,
135 ) -> Result<TokenResponse, TokenExchangeError> {
136 let issued = self
137 .codes
138 .remove(
139 params
140 .code
141 .as_deref()
142 .ok_or(TokenExchangeError::CodeRequired)?,
143 )
144 .map(|(_, code)| code)
145 .ok_or(TokenExchangeError::InvalidOrExpiredCode)?;
146
147 if issued.expires_at < Instant::now() {
148 return Err(TokenExchangeError::CodeExpired);
149 }
150
151 match &params.redirect_uri {
152 Some(uri) if *uri != issued.redirect_uri => {
153 return Err(TokenExchangeError::RedirectUriMismatch);
154 }
155 None => return Err(TokenExchangeError::RedirectUriRequired),
156 _ => {}
157 }
158
159 match params.code_verifier {
160 Some(verifier) => {
161 let digest = Sha256::digest(verifier.as_bytes());
162 let computed = URL_SAFE_NO_PAD.encode(digest);
163 if computed != issued.code_challenge {
164 return Err(TokenExchangeError::PkceVerificationFailed);
165 }
166 }
167 None => return Err(TokenExchangeError::CodeVerifierRequired),
168 }
169
170 let access_token = issued.access_token;
171 let refresh_token = issued.refresh_token;
172 self.refresh_credentials_store
173 .put(
174 &refresh_token,
175 &RefreshCredentials {
176 access_token: access_token.clone(),
177 expires_at: Self::refresh_credentials_expiration(),
178 },
179 )
180 .await
181 .map_err(TokenExchangeError::RefreshCredentialsStorage)?;
182
183 Ok(TokenResponse {
184 access_token,
185 refresh_token,
186 token_type: "Bearer",
187 })
188 }
189 }
190
191 impl McpAuthProxyService for McpAuthProxyServiceImpl {
192 /// Authorization server discovery metadata.
193 fn authorization_server_metadata(&self) -> serde_json::Value {
194 tracing::debug!("oauth-authorization-server metadata requested");
195 let base = &self.public_url;
196 serde_json::json!({
197 "issuer": base,
198 "authorization_endpoint": format!("{base}/authorize"),
199 "token_endpoint": format!("{base}/token"),
200 "registration_endpoint": format!("{base}/register"),
201 "response_types_supported": ["code"],
202 "grant_types_supported": ["authorization_code", "refresh_token"],
203 "code_challenge_methods_supported": ["S256"],
204 })
205 }
206
207 /// Protected resource metadata for MCP clients.
208 fn protected_resource_metadata(&self) -> serde_json::Value {
209 tracing::debug!("oauth-protected-resource metadata requested");
210 let base = &self.public_url;
211 serde_json::json!({
212 "authorization_server": base,
213 "authorization_servers": [base],
214 })
215 }
216
217 /// Handles dynamic client registration for public MCP clients.
218 fn register_client(&self, body: serde_json::Value) -> serde_json::Value {
219 let client_id = uuid::Uuid::new_v4().to_string();
220 let client_name = body
221 .get("client_name")
222 .and_then(|v| v.as_str())
223 .unwrap_or("mcp-client");
224
225 tracing::info!(%client_id, %client_name, "dynamic client registration");
226
227 serde_json::json!({
228 "client_id": client_id,
229 "client_name": client_name,
230 "redirect_uris": body.get("redirect_uris").cloned().unwrap_or(serde_json::json!([])),
231 "grant_types": ["authorization_code", "refresh_token"],
232 "response_types": ["code"],
233 "token_endpoint_auth_method": "none",
234 })
235 }
236
237 /// Starts an OAuth authorization flow and returns the upstream authorize URL.
238 fn start_authorization(
239 &self,
240 params: AuthorizeRequest,
241 ) -> Result<String, StartAuthorizationError> {
242 if params.response_type != "code" {
243 return Err(StartAuthorizationError::UnsupportedResponseType);
244 }
245 if params.code_challenge_method != "S256" {
246 return Err(StartAuthorizationError::UnsupportedCodeChallengeMethod);
247 }
248 if !is_allowed_redirect_uri(&params.redirect_uri) {
249 return Err(StartAuthorizationError::InvalidRedirectUri);
250 }
251
252 let session_id = uuid::Uuid::new_v4().to_string();
253 tracing::info!(%session_id, "starting OAuth authorize flow");
254
255 self.pending.insert(
256 session_id.clone(),
257 PendingAuthorization {
258 code_challenge: params.code_challenge,
259 client_state: params.state,
260 client_redirect_uri: params.redirect_uri,
261 expires_at: Instant::now() + PENDING_AUTH_TTL,
262 },
263 );
264
265 self.oauth_provider
266 .construct_authorize_url(&session_id)
267 .map_err(StartAuthorizationError::ConstructAuthorizeUrl)
268 }
269
270 /// Completes the upstream OAuth callback and returns the redirect URL for
271 /// the MCP client loopback callback.
272 async fn complete_callback(
273 &self,
274 params: CallbackRequest,
275 ) -> Result<String, CompleteCallbackError> {
276 let session_id = params
277 .state
278 .as_deref()
279 .map(|state| state.trim_matches('"').to_string())
280 .ok_or(CompleteCallbackError::MissingState)?;
281
282 tracing::info!(
283 %session_id,
284 pending_count = self.pending.len(),
285 "oauth callback received"
286 );
287
288 let pending = self
289 .pending
290 .remove(&session_id)
291 .map(|(_, pending)| pending)
292 .ok_or(CompleteCallbackError::UnknownOrExpiredSession)?;
293
294 let (access_token, refresh_token) = self
295 .oauth_provider
296 .exchange_authorization_code(&params.code)
297 .await
298 .map_err(CompleteCallbackError::AuthorizationCodeExchangeFailed)?;
299
300 let issued_code = uuid::Uuid::new_v4().to_string();
301 self.codes.insert(
302 issued_code.clone(),
303 IssuedAuthorizationCode {
304 access_token,
305 refresh_token,
306 code_challenge: pending.code_challenge,
307 redirect_uri: pending.client_redirect_uri.clone(),
308 expires_at: Instant::now() + AUTHORIZATION_CODE_TTL,
309 },
310 );
311
312 Ok(format!(
313 "{}?code={}&state={}",
314 pending.client_redirect_uri,
315 urlencoding::encode(&issued_code),
316 urlencoding::encode(&pending.client_state),
317 ))
318 }
319
320 /// Exchanges a broker-issued authorization code for an upstream bearer
321 /// token after verifying redirect URI and PKCE.
322 async fn exchange_token(
323 &self,
324 params: TokenRequest,
325 ) -> Result<TokenResponse, TokenExchangeError> {
326 match params.grant_type.as_str() {
327 "authorization_code" => self.exchange_authorization_code_token(params).await,
328 "refresh_token" => self.refresh_token_exchange(params).await,
329 _ => Err(TokenExchangeError::UnsupportedGrantType),
330 }
331 }
332
333 /// Removes expired pending sessions and broker-issued codes.
334 async fn cleanup_expired(&self) -> anyhow::Result<()> {
335 let now = Instant::now();
336 self.pending.retain(|_, value| value.expires_at > now);
337 self.codes.retain(|_, value| value.expires_at > now);
338 self.refresh_credentials_store.cleanup_expired().await
339 }
340 }
341
342 fn is_allowed_redirect_uri(uri: &str) -> bool {
343 let Ok(parsed) = url::Url::parse(uri) else {
344 return false;
345 };
346
347 matches!(parsed.host_str(), Some("localhost" | "127.0.0.1" | "[::1]"))
348 }
349
350 /// Errors returned when starting authorization.
Add #[tracing::instrument(err)] to all Result-returning service methods.
These auth paths return structured domain errors, but without span instrumentation the failure context will be much harder to correlate once this runs as its own service.
Apply to:
- Line 81: refresh_token_exchange
- Line 132: exchange_authorization_code_token
- Line 238: start_authorization
- Line 272: complete_callback
- Line 322: exchange_token
- Line 334: cleanup_expired
Per coding guidelines: Use #[tracing::instrument(err)] on functions returning Result. Do NOT add err on functions returning Option, (), or non-Result types. Never include level = "info".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs` around lines 81 -
84, Add the attribute #[tracing::instrument(err)] to each service method that
returns a Result to capture span/error context: apply it to the functions named
refresh_token_exchange, exchange_authorization_code_token, start_authorization,
complete_callback, exchange_token, and cleanup_expired; place the attribute
directly above each async fn declaration and do not add this attribute to any
functions that return Option, (), or non-Result types, and do not include any
level = "info" on the attribute.
async fn token(
    State(auth_proxy): State<McpAuthProxyServiceImpl>,
    axum::Form(params): axum::Form<TokenRequest>,
) -> Response {
    match auth_proxy.exchange_token(params).await {
        Ok(response) => Json(response).into_response(),
        Err(TokenExchangeError::UnsupportedGrantType) => (
            axum::http::StatusCode::BAD_REQUEST,
            "unsupported grant_type",
        )
            .into_response(),
        Err(TokenExchangeError::CodeRequired) => {
            (axum::http::StatusCode::BAD_REQUEST, "code required").into_response()
        }
        Err(TokenExchangeError::InvalidOrExpiredCode) => (
            axum::http::StatusCode::BAD_REQUEST,
            "invalid or expired code",
        )
            .into_response(),
        Err(TokenExchangeError::CodeExpired) => {
            (axum::http::StatusCode::BAD_REQUEST, "code expired").into_response()
        }
        Err(TokenExchangeError::RedirectUriMismatch) => {
            (axum::http::StatusCode::BAD_REQUEST, "redirect_uri mismatch").into_response()
        }
        Err(TokenExchangeError::RedirectUriRequired) => {
            (axum::http::StatusCode::BAD_REQUEST, "redirect_uri required").into_response()
        }
        Err(TokenExchangeError::CodeVerifierRequired) => (
            axum::http::StatusCode::BAD_REQUEST,
            "code_verifier required",
        )
            .into_response(),
        Err(TokenExchangeError::PkceVerificationFailed) => (
            axum::http::StatusCode::BAD_REQUEST,
            "PKCE verification failed",
        )
            .into_response(),
        Err(TokenExchangeError::RefreshTokenRequired) => (
            axum::http::StatusCode::BAD_REQUEST,
            "refresh_token required",
        )
            .into_response(),
        Err(TokenExchangeError::InvalidRefreshToken) => {
            (axum::http::StatusCode::BAD_REQUEST, "invalid refresh token").into_response()
        }
        Err(TokenExchangeError::RefreshCredentialsStorage(error)) => {
            tracing::error!(error=?error, "refresh credential storage failed");
            (
                axum::http::StatusCode::INTERNAL_SERVER_ERROR,
                "refresh credential storage failed",
            )
                .into_response()
        }
        Err(TokenExchangeError::RefreshFailed(error)) => {
            tracing::error!(error=?error, "upstream refresh token exchange failed");
            (
                axum::http::StatusCode::BAD_GATEWAY,
                "refresh token exchange failed",
            )
                .into_response()
        }
    }
Return JSON OAuth errors from /token.
All of these failure paths return plain-text tuples. OAuth clients hitting the token endpoint expect machine-readable JSON errors, so these responses will be hard to handle correctly once MCP is used as a standalone auth server.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/src/inbound/axum_router.rs` around lines
106 - 168, The token handler returns plain-text tuples for TokenExchangeError
cases; update token (fn token) to return JSON OAuth error responses instead: map
each TokenExchangeError variant to an appropriate HTTP status and a JSON body
like {"error": "<oauth_error_code>", "error_description": "<human message>"} and
return Json(body).into_response() (preserve existing tracing::error logs for
server/upstream failures). You can add a small helper (e.g., map_token_error or
token_error_response) that takes TokenExchangeError and returns (StatusCode,
serde_json::Value) so each match arm uses Json(...) into_response rather than a
string tuple; ensure error codes follow OAuth names (invalid_request,
invalid_grant, unsupported_grant_type, invalid_client, etc.) and include the
human-readable description.
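The helper suggested above could look roughly like the following std-only sketch. It is illustrative, not the crate's code: the `TokenExchangeError` enum here is a local mirror of the variants matched in the handler (with the two payload-carrying variants reduced to unit variants), and `OAuthError`, `map_token_error`, and `to_json` are hypothetical names. The choice of `server_error` and `temporarily_unavailable` for the 500/502 cases follows the prompt text; RFC 6749 section 5.2 does not list those two codes for the token endpoint, so treat them as an assumption.

```rust
/// Local, simplified mirror of the variants matched in the /token handler.
/// Assumption: payloads on the storage/upstream variants are dropped here.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum TokenExchangeError {
    UnsupportedGrantType,
    CodeRequired,
    InvalidOrExpiredCode,
    CodeExpired,
    RedirectUriMismatch,
    RedirectUriRequired,
    CodeVerifierRequired,
    PkceVerificationFailed,
    RefreshTokenRequired,
    InvalidRefreshToken,
    RefreshCredentialsStorage,
    RefreshFailed,
}

/// HTTP status plus the two OAuth error fields (hypothetical type).
#[derive(Debug, PartialEq)]
struct OAuthError {
    status: u16,
    error: &'static str,
    error_description: &'static str,
}

impl OAuthError {
    /// Renders the JSON body. All strings are static and need no escaping.
    fn to_json(&self) -> String {
        format!(
            r#"{{"error":"{}","error_description":"{}"}}"#,
            self.error, self.error_description
        )
    }
}

/// Maps each domain error to a status code and an OAuth-style error code.
fn map_token_error(err: TokenExchangeError) -> OAuthError {
    use TokenExchangeError::*;
    let (status, error, error_description) = match err {
        UnsupportedGrantType => (400, "unsupported_grant_type", "unsupported grant_type"),
        // Missing parameters are malformed requests.
        CodeRequired => (400, "invalid_request", "code required"),
        RedirectUriRequired => (400, "invalid_request", "redirect_uri required"),
        CodeVerifierRequired => (400, "invalid_request", "code_verifier required"),
        RefreshTokenRequired => (400, "invalid_request", "refresh_token required"),
        // Present but unusable grants.
        InvalidOrExpiredCode => (400, "invalid_grant", "invalid or expired code"),
        CodeExpired => (400, "invalid_grant", "code expired"),
        RedirectUriMismatch => (400, "invalid_grant", "redirect_uri mismatch"),
        PkceVerificationFailed => (400, "invalid_grant", "PKCE verification failed"),
        InvalidRefreshToken => (400, "invalid_grant", "invalid refresh token"),
        // Internal and upstream failures keep their existing status codes.
        RefreshCredentialsStorage => (500, "server_error", "refresh credential storage failed"),
        RefreshFailed => (502, "temporarily_unavailable", "refresh token exchange failed"),
    };
    OAuthError { status, error, error_description }
}

fn main() {
    let mapped = map_token_error(TokenExchangeError::InvalidRefreshToken);
    println!("{} {}", mapped.status, mapped.to_json());
}
```

In the real handler each match arm would wrap the mapped value in `Json(...)` and keep the existing `tracing::error!` calls for the two server-side variants.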
#[tracing::instrument(skip(self), err)]
fn construct_authorize_url(&self, state: &str) -> anyhow::Result<String> {
    self.client.construct_oauth2_authorize_url(
        &self.google_idp_id,
        None,
        Some(state.to_owned()),
    )
}
🧩 Analysis chain
🏁 Script executed:
# First, verify the file exists and read the relevant section
wc -l rust/cloud-storage/mcp_auth_proxy/src/outbound/fusionauth.rs
Repository: macro-inc/macro
Length of output: 122
🏁 Script executed:
# Read the file around the specified lines to get full context
sed -n '30,50p' rust/cloud-storage/mcp_auth_proxy/src/outbound/fusionauth.rs
Repository: macro-inc/macro
Length of output: 680
🏁 Script executed:
# Look for other tracing::instrument usages in this file
rg 'tracing::instrument' rust/cloud-storage/mcp_auth_proxy/src/outbound/fusionauth.rs -A 3
Repository: macro-inc/macro
Length of output: 497
🏁 Script executed:
# Search for similar OAuth state handling patterns in the codebase
rg 'tracing::instrument.*state' rust/cloud-storage/ -A 2
Repository: macro-inc/macro
Length of output: 30138
🏁 Script executed:
# Check if there are any existing patterns of skipping state/token parameters in tracing
rg 'skip\([^)]*state' rust/cloud-storage/ -A 1
Repository: macro-inc/macro
Length of output: 22009
Skip the raw OAuth state parameter in this span.
#[tracing::instrument] records function arguments by default. This currently emits the OAuth state token (used for CSRF prevention) into trace data. The underlying FusionAuthClient::construct_oauth2_authorize_url call already skips this parameter; the wrapper should do the same to prevent security-sensitive values from appearing in logs.
Suggested fix
- #[tracing::instrument(skip(self), err)]
+ #[tracing::instrument(skip(self, state), err)]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/src/outbound/fusionauth.rs` around lines 37
- 44, The tracing span on construct_authorize_url is currently recording the
sensitive OAuth `state`; update the attribute on the function to skip that
parameter so the state is not emitted (e.g., change
#[tracing::instrument(skip(self), err)] to skip the `state` argument as well),
ensuring you keep the existing `err` and still skip `self`; this targets the
construct_authorize_url function and prevents the OAuth state token from being
logged.
tokio::spawn(async move {
    let mut interval = tokio::time::interval(AUTH_PROXY_CLEANUP_INTERVAL);
    loop {
        interval.tick().await;
        if let Err(error) = cleanup_state.cleanup_expired().await {
            tracing::error!(error=?error, "auth proxy cleanup task failed");
        }
🧹 Nitpick | 🔵 Trivial
Use .inspect_err() in the cleanup loop.
This is the new error-logging path for the background task, and the repo convention is to keep it as result.inspect_err(...) instead of if let Err(...).
As per coding guidelines, "Prefer .inspect_err() instead of if let Err(e) when doing logging in error handling".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_service/src/main.rs` around lines 49 - 55, Replace the
explicit if-let error branch inside the tokio::spawn background task with an
inspect_err call on the Result produced by awaiting
cleanup_state.cleanup_expired(): where the code currently awaits
cleanup_state.cleanup_expired() and then uses if let Err(error) to log, instead
call cleanup_state.cleanup_expired().await.inspect_err(|error|
tracing::error!(error=?error, "auth proxy cleanup task failed")) and discard the
returned Result (inspect_err returns a Result, not a future, so it takes no
trailing .await), so the logging follows the repo convention (symbols:
tokio::spawn, AUTH_PROXY_CLEANUP_INTERVAL, interval.tick,
cleanup_state.cleanup_expired).
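As a rough illustration of the `.inspect_err()` convention, here is a synchronous, std-only sketch. `cleanup_expired` and `run_cleanup` are hypothetical stand-ins (the real method is async and logs through `tracing::error!`); the sketch collects messages in a `Vec` so the behavior can be checked. It assumes Rust 1.76 or newer, where `Result::inspect_err` is stable.

```rust
/// Hypothetical stand-in for `cleanup_state.cleanup_expired()`.
/// Fails on odd ticks so both branches are exercised.
fn cleanup_expired(tick: u32) -> Result<(), String> {
    if tick % 2 == 1 {
        Err(format!("tick {tick} failed"))
    } else {
        Ok(())
    }
}

/// Runs `ticks` cleanup passes and returns the messages that were "logged".
fn run_cleanup(ticks: u32) -> Vec<String> {
    let mut logged = Vec::new();
    for tick in 0..ticks {
        // inspect_err runs the closure only on Err and hands the Result back
        // unchanged; the loop does not need it, so it is discarded.
        let _ = cleanup_expired(tick)
            .inspect_err(|error| logged.push(format!("auth proxy cleanup task failed: {error}")));
    }
    logged
}

fn main() {
    for line in run_cleanup(4) {
        println!("{line}");
    }
}
```

In the async loop the shape is the same, with `.await` placed before `.inspect_err(...)` rather than after it.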
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rust/cloud-storage/mcp_service/src/context.rs (1)
42-64: ⚠️ Potential issue | 🟠 Major
Add SearchServiceUrl to McpEnvVars config and pass it to SearchServiceClient.
SearchServiceClient is initialized with dss_url on line 144, but the memory service has a dedicated SearchServiceUrl env var that should be threaded through. McpEnvVars (lines 42-64) is missing this config field. Without it, all search tool calls from MCP will be routed to the document storage service instead of the search service.
Add SearchServiceUrl to the McpEnvVars struct and pass it to SearchServiceClient::new on line 144 instead of dss_url.
Also applies to: 138–145
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_service/src/context.rs` around lines 42 - 64, McpEnvVars is missing the SearchServiceUrl field so the search client is being initialized with the document storage URL (dss_url); add SearchServiceUrl to the McpEnvVars struct and update the initialization call to pass that value into SearchServiceClient::new (replace the dss_url argument with the SearchServiceUrl field) where the client is constructed (previously around the call to SearchServiceClient::new on/near line 144).
♻️ Duplicate comments (7)
infra/stacks/fusionauth-instance/index.ts (1)
271-272: ⚠️ Potential issue | 🟡 Minor
Remove redundant localhost redirect allowlist entry.
http://localhost:8085/* already covers http://localhost:8085/oauth/*, so Line 271 has a duplicate pattern.
Suggested simplification
- ? ['http://localhost:8085/*', 'http://localhost:8085/oauth/*']
+ ? ['http://localhost:8085/*']
  : []),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@infra/stacks/fusionauth-instance/index.ts` around lines 271 - 272, In the redirect URI allowlist array, remove the redundant entry 'http://localhost:8085/oauth/*' because 'http://localhost:8085/*' already covers it; locate the array expression that currently contains both 'http://localhost:8085/*' and 'http://localhost:8085/oauth/*' and delete the latter string so only the broader pattern remains.
rust/cloud-storage/macro_db_client/migrations/20260408190000_mcp_auth_proxy_refresh_credentials.sql (1)
1-7: ⚠️ Potential issue | 🟠 Major
Don't persist live OAuth credentials in plaintext.
refresh_token is the primary key here and access_token is stored as raw TEXT, so anyone with database or backup access can replay active credentials. Hash or replace the refresh token with an opaque lookup key, and encrypt or externalize the access token before persisting it.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/macro_db_client/migrations/20260408190000_mcp_auth_proxy_refresh_credentials.sql` around lines 1 - 7, The migration currently persists plaintext credentials in table mcp_auth_proxy_refresh_credentials (columns refresh_token PRIMARY KEY and access_token TEXT); change the schema to avoid storing raw tokens by replacing refresh_token with an opaque lookup_key (e.g., token_id) or by storing only a cryptographic hash of the refresh_token (e.g., refresh_token_hash) as the primary/unique key, and change access_token to an encrypted blob (e.g., encrypted_access_token BYTEA) or remove it and reference an external secrets store; also add metadata columns for encryption parameters (iv/salt) or a vault reference and update any code that inserts/queries this table (functions that use refresh_token and access_token) to hash/lookup or encrypt/decrypt tokens accordingly.
rust/cloud-storage/mcp_auth_proxy/src/inbound/axum_router.rs (1)
106-168: ⚠️ Potential issue | 🟠 Major
Return OAuth JSON errors from /token.
These branches currently emit plain-text tuples. Token clients expect machine-readable error/error_description fields, so failures like invalid_grant and invalid_request are hard to handle correctly.
🧩 Suggested response helper
+fn token_error(
+    status: axum::http::StatusCode,
+    error: &'static str,
+    description: &'static str,
+) -> Response {
+    (status, Json(serde_json::json!({
+        "error": error,
+        "error_description": description,
+    })))
+        .into_response()
+}
+
 async fn token(
     State(auth_proxy): State<McpAuthProxyServiceImpl>,
     axum::Form(params): axum::Form<TokenRequest>,
 ) -> Response {
     match auth_proxy.exchange_token(params).await {
         Ok(response) => Json(response).into_response(),
-        Err(TokenExchangeError::InvalidRefreshToken) => {
-            (axum::http::StatusCode::BAD_REQUEST, "invalid refresh token").into_response()
-        }
+        Err(TokenExchangeError::InvalidRefreshToken) => token_error(
+            axum::http::StatusCode::BAD_REQUEST,
+            "invalid_grant",
+            "invalid refresh token",
+        ),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/src/inbound/axum_router.rs` around lines 106 - 168, The token handler currently returns plain-text tuples for many TokenExchangeError branches; change each error branch in the token(...) async fn to return a JSON body with OAuth2-style fields { "error": "<oauth_error_code>", "error_description": "<human message>" } using Json(...). Map variants like UnsupportedGrantType, CodeRequired, InvalidOrExpiredCode, CodeExpired, RedirectUriMismatch, RedirectUriRequired, CodeVerifierRequired, PkceVerificationFailed, RefreshTokenRequired, InvalidRefreshToken to appropriate error codes ("unsupported_grant_type", "invalid_request", or "invalid_grant") and descriptive error_description strings, and keep status codes as before; for internal errors (RefreshCredentialsStorage(error) and RefreshFailed(error)) include logging (tracing::error!) and return a JSON body with "error":"server_error" or "error":"temporarily_unavailable" and an error_description, keeping HTTP 500/502 respectively. Ensure the returned type remains Response by calling Json(error_response).into_response() for each branch.
rust/cloud-storage/mcp_auth_proxy/src/outbound/pg_refresh_credentials_store.rs (1)
3-3: ⚠️ Potential issue | 🔴 Critical
Import Future or this adapter won't compile.
Lines 31, 60, 83, and 99 use dyn Future<...>, but Line 3 only imports Pin. Future isn't in the Rust prelude before the 2024 edition, so on earlier editions this file fails to compile as written.
🐛 Proposed fix
-use std::pin::Pin;
+use std::{future::Future, pin::Pin};
Run this read-only check to confirm the unresolved symbol is used without an import:
#!/bin/bash
set -euo pipefail
file="rust/cloud-storage/mcp_auth_proxy/src/outbound/pg_refresh_credentials_store.rs"
echo "=== top of file ==="
sed -n '1,12p' "$file"
echo
echo "=== Future references ==="
rg -n '\bFuture\b' "$file"
echo
echo "=== std/core Future imports ==="
rg -n 'use\s+(std|core)::.*\bFuture\b' "$file" || true
Also applies to: 31-31, 60-60, 83-83, 99-99
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/src/outbound/pg_refresh_credentials_store.rs` at line 3, The file imports Pin but also uses dyn Future in several places (e.g., in the outbound/pg_refresh_credentials_store module where functions return dyn Future at call sites like the async adapter types referenced on lines using dyn Future); add an import for std::future::Future at the top of the file (alongside the existing Pin import) so the dyn Future<T> references compile, and re-run the build to verify the unresolved symbol is resolved.
rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs (2)
288-292: ⚠️ Potential issue | 🟠 Major
Reject expired pending sessions before exchanging the upstream code.
remove() only proves the session exists. If cleanup has not run yet, callbacks past PENDING_AUTH_TTL are still accepted and exchanged upstream. Check pending.expires_at before calling exchange_authorization_code.
🛠️ Minimal fix
 let pending = self
     .pending
     .remove(&session_id)
     .map(|(_, pending)| pending)
     .ok_or(CompleteCallbackError::UnknownOrExpiredSession)?;
+
+if pending.expires_at <= Instant::now() {
+    return Err(CompleteCallbackError::UnknownOrExpiredSession);
+}
 let (access_token, refresh_token) = self
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs` around lines 288 - 292, The code currently removes the pending entry and proceeds to exchange the upstream code without checking expiry; update the flow in the method where you call self.pending.remove(&session_id) (the variable `pending`) to first retrieve and inspect pending.expires_at against the current time (e.g., Instant::now()) and if expired return CompleteCallbackError::UnknownOrExpiredSession instead of calling exchange_authorization_code; only call exchange_authorization_code for non-expired pending sessions (or alternatively check expiry before removing so you still remove on success).
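The expiry check for pending sessions can be sketched in isolation as follows. This is a std-only approximation: it uses a plain `HashMap` instead of the service's `DashMap`, the `PendingAuthorization` struct is trimmed to the one field that matters, and `take_pending` is a hypothetical helper name. The current time is passed in as a parameter so the check can be exercised without sleeping.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Trimmed-down, hypothetical version of the pending-session record.
struct PendingAuthorization {
    expires_at: Instant,
}

#[derive(Debug, PartialEq)]
enum CompleteCallbackError {
    UnknownOrExpiredSession,
}

/// Removes the session and rejects it if its TTL has already elapsed,
/// even when the periodic cleanup task has not run yet.
fn take_pending(
    pending: &mut HashMap<String, PendingAuthorization>,
    session_id: &str,
    now: Instant,
) -> Result<PendingAuthorization, CompleteCallbackError> {
    let entry = pending
        .remove(session_id)
        .ok_or(CompleteCallbackError::UnknownOrExpiredSession)?;
    if entry.expires_at <= now {
        return Err(CompleteCallbackError::UnknownOrExpiredSession);
    }
    Ok(entry)
}

fn main() {
    let start = Instant::now();
    let mut pending = HashMap::new();
    pending.insert(
        "stale".to_string(),
        PendingAuthorization { expires_at: start + Duration::from_secs(1) },
    );
    // Simulate a callback arriving two seconds after the session was created.
    let later = start + Duration::from_secs(2);
    let result = take_pending(&mut pending, "stale", later);
    println!("stale session accepted: {}", result.is_ok());
}
```

Only sessions that pass this check would go on to the upstream code exchange.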
53-58: ⚠️ Potential issue | 🔴 Critical
Move pending sessions and issued codes out of process memory.
These DashMaps only survive inside one instance. After extracting MCP into its own service, a restart or cross-replica hop between /authorize, /oauth/callback, and /token will make valid flows fail. If this has to ship first, a hard single-replica rollout is the only safe stopgap.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs` around lines 53 - 58, The pending and codes DashMap fields on McpAuthProxyServiceImpl (pending: Arc<DashMap<String, PendingAuthorization>> and codes: Arc<DashMap<String, IssuedAuthorizationCode>>) are in-process only and must be moved to an out-of-process store; replace these in-memory maps with an interface-backed persistent store (e.g., a trait like PendingSessionStore / AuthorizationCodeStore implemented by Redis/DB) and inject instances via the existing constructor alongside oauth_provider and refresh_credentials_store, then update all uses (get/set/remove) of PendingAuthorization and IssuedAuthorizationCode in McpAuthProxyServiceImpl methods to call the new store trait methods so sessions and codes survive restarts and cross-replica requests.
rust/cloud-storage/mcp_auth_proxy/src/domain/ports.rs (1)
29-50: ⚠️ Potential issue | 🟠 Major
Make refresh-token rotation a single store operation.
RefreshCredentialsStore only exposes get/delete/put, so domain/service.rs has to span refresh rotation across multiple awaited calls. That still leaves a race where two refreshes can read the same token before either delete lands, and a delete/put failure can strand the session after the upstream refresh already succeeded.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/ports.rs` around lines 29 - 50, RefreshCredentialsStore's separate get/delete/put causes race conditions during refresh-token rotation; add an atomic operation on the trait (e.g., a method named rotate_refresh_credentials or replace_refresh_credentials) that accepts the refresh_token plus the expected current RefreshCredentials (Option<RefreshCredentials>) and the new RefreshCredentials and performs a compare-and-swap-style replace returning the previous value or success/failure result; update callers in domain/service.rs to use this single atomic rotate method instead of sequential get/delete/put and implement the new trait method in all concrete stores.
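One way to picture a single-operation rotation is the in-memory sketch below. It is a simplification under stated assumptions: `InMemoryStore`, `rotate`, and `contains` are hypothetical names, the store is synchronous and guarded by one `Mutex`, and `RefreshCredentials` carries only an access token. A Postgres-backed store would need its own atomic equivalent (for example a single statement or transaction), which this sketch does not attempt to show.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Simplified, hypothetical credentials record.
#[derive(Debug, Clone, PartialEq)]
struct RefreshCredentials {
    access_token: String,
}

/// In-memory stand-in for a RefreshCredentialsStore implementation.
#[derive(Default)]
struct InMemoryStore {
    inner: Mutex<HashMap<String, RefreshCredentials>>,
}

impl InMemoryStore {
    fn put(&self, token: &str, creds: RefreshCredentials) {
        self.inner.lock().unwrap().insert(token.to_string(), creds);
    }

    fn contains(&self, token: &str) -> bool {
        self.inner.lock().unwrap().contains_key(token)
    }

    /// Swaps `old_token` for `new_token` under one lock acquisition.
    /// Returns the previous credentials, or None when `old_token` is unknown
    /// (for example because a concurrent refresh already rotated it).
    fn rotate(
        &self,
        old_token: &str,
        new_token: &str,
        new_creds: RefreshCredentials,
    ) -> Option<RefreshCredentials> {
        let mut map = self.inner.lock().unwrap();
        let previous = map.remove(old_token)?;
        map.insert(new_token.to_string(), new_creds);
        Some(previous)
    }
}

fn main() {
    let store = InMemoryStore::default();
    store.put("r1", RefreshCredentials { access_token: "a1".into() });

    let first = store.rotate("r1", "r2", RefreshCredentials { access_token: "a2".into() });
    let second = store.rotate("r1", "r3", RefreshCredentials { access_token: "a3".into() });
    println!("first rotation won: {}, second rotation won: {}", first.is_some(), second.is_some());
}
```

Because removal and insertion happen under the same lock, a second caller presenting the same old token observes `None` instead of silently rotating it again.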
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs`:
- Around line 136-167: The code currently removes the auth code from self.codes
immediately via self.codes.remove(...) and then performs redirect URI and PKCE
checks; change this so you first lookup/peek the entry (e.g., self.codes.get or
.get_key_value equivalent) to validate issued.redirect_uri, issued.expires_at
and PKCE using params.redirect_uri and params.code_verifier, and only call
self.codes.remove(...) to consume the code after all checks pass; keep the same
TokenExchangeError variants (TokenExchangeError::CodeRequired,
::InvalidOrExpiredCode, ::CodeExpired, ::RedirectUriRequired,
::RedirectUriMismatch, ::CodeVerifierRequired, ::PkceVerificationFailed) and
ensure the issued variable is obtained from the non-consuming lookup before
removal.
---
Outside diff comments:
In `@rust/cloud-storage/mcp_service/src/context.rs`:
- Around line 42-64: McpEnvVars is missing the SearchServiceUrl field so the
search client is being initialized with the document storage URL (dss_url); add
SearchServiceUrl to the McpEnvVars struct and update the initialization call to
pass that value into SearchServiceClient::new (replace the dss_url argument with
the SearchServiceUrl field) where the client is constructed (previously around
the call to SearchServiceClient::new on/near line 144).
---
Duplicate comments:
In `@infra/stacks/fusionauth-instance/index.ts`:
- Around line 271-272: In the redirect URI allowlist array, remove the redundant
entry 'http://localhost:8085/oauth/*' because 'http://localhost:8085/*' already
covers it; locate the array expression that currently contains both
'http://localhost:8085/*' and 'http://localhost:8085/oauth/*' and delete the
latter string so only the broader pattern remains.
In
`@rust/cloud-storage/macro_db_client/migrations/20260408190000_mcp_auth_proxy_refresh_credentials.sql`:
- Around line 1-7: The migration currently persists plaintext credentials in
table mcp_auth_proxy_refresh_credentials (columns refresh_token PRIMARY KEY and
access_token TEXT); change the schema to avoid storing raw tokens by replacing
refresh_token with an opaque lookup_key (e.g., token_id) or by storing only a
cryptographic hash of the refresh_token (e.g., refresh_token_hash) as the
primary/unique key, and change access_token to an encrypted blob (e.g.,
encrypted_access_token BYTEA) or remove it and reference an external secrets
store; also add metadata columns for encryption parameters (iv/salt) or a vault
reference and update any code that inserts/queries this table (functions that
use refresh_token and access_token) to hash/lookup or encrypt/decrypt tokens
accordingly.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/ports.rs`:
- Around line 29-50: RefreshCredentialsStore's separate get/delete/put causes
race conditions during refresh-token rotation; add an atomic operation on the
trait (e.g., a method named rotate_refresh_credentials or
replace_refresh_credentials) that accepts the refresh_token plus the expected
current RefreshCredentials (Option<RefreshCredentials>) and the new
RefreshCredentials and performs a compare-and-swap-style replace returning the
previous value or success/failure result; update callers in domain/service.rs to
use this single atomic rotate method instead of sequential get/delete/put and
implement the new trait method in all concrete stores.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs`:
- Around line 288-292: The code currently removes the pending entry and proceeds
to exchange the upstream code without checking expiry; update the flow in the
method where you call self.pending.remove(&session_id) (the variable `pending`)
to first retrieve and inspect pending.expires_at against the current time (e.g.,
Instant::now()) and if expired return CompleteCallbackError::UnknownOrExpiredSession
instead of calling exchange_authorization_code; only call
exchange_authorization_code for non-expired pending sessions (or alternatively
check expiry before removing so you still remove on success).
- Around line 53-58: The pending and codes DashMap fields on
McpAuthProxyServiceImpl (pending: Arc<DashMap<String, PendingAuthorization>> and
codes: Arc<DashMap<String, IssuedAuthorizationCode>>) are in-process only and
must be moved to an out-of-process store; replace these in-memory maps with an
interface-backed persistent store (e.g., a trait like PendingSessionStore /
AuthorizationCodeStore implemented by Redis/DB) and inject instances via the
existing constructor alongside oauth_provider and refresh_credentials_store,
then update all uses (get/set/remove) of PendingAuthorization and
IssuedAuthorizationCode in McpAuthProxyServiceImpl methods to call the new store
trait methods so sessions and codes survive restarts and cross-replica requests.
In `@rust/cloud-storage/mcp_auth_proxy/src/inbound/axum_router.rs`:
- Around line 106-168: The token handler currently returns plain-text tuples for
many TokenExchangeError branches; change each error branch in the token(...)
async fn to return a JSON body with OAuth2-style fields { "error":
"<oauth_error_code>", "error_description": "<human message>" } using Json(...) .
Map variants like UnsupportedGrantType, CodeRequired, InvalidOrExpiredCode,
CodeExpired, RedirectUriMismatch, RedirectUriRequired, CodeVerifierRequired,
PkceVerificationFailed, RefreshTokenRequired, InvalidRefreshToken to appropriate
error codes ("unsupported_grant_type", "invalid_request", or "invalid_grant") and descriptive
error_description strings, and keep status codes as before; for internal errors
(RefreshCredentialsStorage(error) and RefreshFailed(error)) include logging
(tracing::error!) and return a JSON body with "error":"server_error" or
"error":"temporarily_unavailable" and an error_description, keeping HTTP 500/502
respectively. Ensure the returned type remains Response by calling
Json(error_response).into_response() for each branch.
In
`@rust/cloud-storage/mcp_auth_proxy/src/outbound/pg_refresh_credentials_store.rs`:
- Line 3: The file imports Pin but also uses dyn Future in several places (e.g.,
in the outbound/pg_refresh_credentials_store module where functions return dyn
Future at call sites like the async adapter types referenced on lines using dyn
Future); add an import for std::future::Future at the top of the file (alongside
the existing Pin import) so the dyn Future<T> references compile, and re-run the
build to verify the unresolved symbol is resolved.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: e544eb35-2e25-459e-b5ba-11382e9a3b62
⛔ Files ignored due to path filters (1)
- rust/cloud-storage/Cargo.lock is excluded by !**/*.lock, !**/Cargo.lock
📒 Files selected for processing (13)
- infra/stacks/fusionauth-instance/index.ts
- infra/stacks/mcp-server/mcp-server.ts
- rust/cloud-storage/macro_db_client/migrations/20260408190000_mcp_auth_proxy_refresh_credentials.sql
- rust/cloud-storage/mcp_auth_proxy/Cargo.toml
- rust/cloud-storage/mcp_auth_proxy/src/domain/models.rs
- rust/cloud-storage/mcp_auth_proxy/src/domain/ports.rs
- rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs
- rust/cloud-storage/mcp_auth_proxy/src/inbound/axum_router.rs
- rust/cloud-storage/mcp_auth_proxy/src/outbound.rs
- rust/cloud-storage/mcp_auth_proxy/src/outbound/fusionauth.rs
- rust/cloud-storage/mcp_auth_proxy/src/outbound/pg_refresh_credentials_store.rs
- rust/cloud-storage/mcp_service/src/context.rs
- rust/cloud-storage/mcp_service/src/main.rs
let issued = self
    .codes
    .remove(
        params
            .code
            .as_deref()
            .ok_or(TokenExchangeError::CodeRequired)?,
    )
    .map(|(_, code)| code)
    .ok_or(TokenExchangeError::InvalidOrExpiredCode)?;

if issued.expires_at < Instant::now() {
    return Err(TokenExchangeError::CodeExpired);
}

match &params.redirect_uri {
    Some(uri) if *uri != issued.redirect_uri => {
        return Err(TokenExchangeError::RedirectUriMismatch);
    }
    None => return Err(TokenExchangeError::RedirectUriRequired),
    _ => {}
}

match params.code_verifier {
    Some(verifier) => {
        let digest = Sha256::digest(verifier.as_bytes());
        let computed = URL_SAFE_NO_PAD.encode(digest);
        if computed != issued.code_challenge {
            return Err(TokenExchangeError::PkceVerificationFailed);
        }
    }
    None => return Err(TokenExchangeError::CodeVerifierRequired),
Delay code consumption until after local validation.
self.codes.remove(...) runs before the redirect URI and PKCE checks. A caller that only learns the broker code can send an invalid /token request and permanently burn it without the verifier, because the legitimate client will later see InvalidOrExpiredCode.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs` around lines 136 -
167, The code currently removes the auth code from self.codes immediately via
self.codes.remove(...) and then performs redirect URI and PKCE checks; change
this so you first lookup/peek the entry (e.g., self.codes.get or .get_key_value
equivalent) to validate issued.redirect_uri, issued.expires_at and PKCE using
params.redirect_uri and params.code_verifier, and only call
self.codes.remove(...) to consume the code after all checks pass; keep the same
TokenExchangeError variants (TokenExchangeError::CodeRequired,
::InvalidOrExpiredCode, ::CodeExpired, ::RedirectUriRequired,
::RedirectUriMismatch, ::CodeVerifierRequired, ::PkceVerificationFailed) and
ensure the issued variable is obtained from the non-consuming lookup before
removal.
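The validate-then-consume ordering described above might be sketched like this. Several things here are assumptions for illustration: the map is a `Mutex<HashMap>` rather than the service's `DashMap`, `redeem` is a hypothetical function name, and the PKCE challenge is passed in already computed (the real code derives it from the verifier with SHA-256 and URL-safe base64, which std does not provide). The lock is held across lookup and removal so two concurrent callers cannot both pass validation.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Simplified, hypothetical version of the broker-issued code record.
struct IssuedCode {
    redirect_uri: String,
    code_challenge: String,
    expires_at: Instant,
}

#[derive(Debug, PartialEq)]
enum TokenExchangeError {
    InvalidOrExpiredCode,
    CodeExpired,
    RedirectUriMismatch,
    PkceVerificationFailed,
}

/// Validates the request against the stored code first and removes the code
/// only once every check has passed. A failed attempt leaves the code intact.
fn redeem(
    codes: &Mutex<HashMap<String, IssuedCode>>,
    code: &str,
    redirect_uri: &str,
    computed_challenge: &str,
    now: Instant,
) -> Result<IssuedCode, TokenExchangeError> {
    let mut map = codes.lock().unwrap();
    // Non-consuming lookup.
    let issued = map.get(code).ok_or(TokenExchangeError::InvalidOrExpiredCode)?;
    if issued.expires_at < now {
        return Err(TokenExchangeError::CodeExpired);
    }
    if issued.redirect_uri != redirect_uri {
        return Err(TokenExchangeError::RedirectUriMismatch);
    }
    if issued.code_challenge != computed_challenge {
        return Err(TokenExchangeError::PkceVerificationFailed);
    }
    // All checks passed: consume the code.
    map.remove(code).ok_or(TokenExchangeError::InvalidOrExpiredCode)
}

fn main() {
    let now = Instant::now();
    let codes = Mutex::new(HashMap::new());
    codes.lock().unwrap().insert(
        "abc".to_string(),
        IssuedCode {
            redirect_uri: "http://localhost:3000/cb".to_string(),
            code_challenge: "expected-challenge".to_string(),
            expires_at: now + Duration::from_secs(60),
        },
    );

    let attacker = redeem(&codes, "abc", "http://localhost:3000/cb", "wrong", now);
    let client = redeem(&codes, "abc", "http://localhost:3000/cb", "expected-challenge", now);
    println!("attacker ok: {}, client ok: {}", attacker.is_ok(), client.is_ok());
}
```

On a concurrent map, a separate get followed by a later remove would reopen a race between two redeemers, so the check and the removal need to be one atomic step there as well.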
I think I've addressed all your feedback. The donkey (coderabbit) is pointing out that I need to encrypt the refresh tokens and he's probably right. I'll do that before merging.
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (7)
rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs (5)
81-84: 🛠️ Refactor suggestion | 🟠 Major
Instrument the Result-returning service methods.
These are the core auth paths, and every failure currently loses span/error context at the point where it originates. Please add #[tracing::instrument(err)] to refresh_token_exchange, exchange_authorization_code_token, start_authorization, complete_callback, exchange_token, and cleanup_expired.
As per coding guidelines: Use #[tracing::instrument(err)] on functions returning Result. Do NOT add err on functions returning Option, (), or non-Result types. Never include level = "info".
Also applies to: 132-135, 238-241, 272-275, 322-325, 334-338
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs` around lines 81 - 84, The listed service methods lose tracing error context; add the attribute #[tracing::instrument(err)] to each Result-returning method: refresh_token_exchange, exchange_authorization_code_token, start_authorization, complete_callback, exchange_token, and cleanup_expired (also apply at the other occurrences around the indicated ranges), making sure you only add err on functions that return Result and do NOT add err to Option/() or any non-Result functions and do not include level = "info".
312-317: ⚠️ Potential issue | 🟠 Major
Append code/state with Url, not format!.
If client_redirect_uri already has a query string or fragment, this produces an invalid loopback callback URL. Build the redirect with url::Url and query_pairs_mut() instead.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs` around lines 312 - 317, The current code builds the redirect URI using format! with pending.client_redirect_uri, which breaks when the redirect already has query params or fragments; instead parse pending.client_redirect_uri with url::Url, use url::Url::parse or Url::from_str, call url.query_pairs_mut().append_pair("code", &issued_code).append_pair("state", &pending.client_state), then return the Url as the string (e.g., url.into_string()) so that code and state are appended safely and correctly; reference pending.client_redirect_uri, issued_code, pending.client_state, and use url::Url and query_pairs_mut() to implement the fix.
53-58: ⚠️ Potential issue | 🔴 Critical
Move broker session/code state out of process memory.
This standalone service still keeps pending and codes in per-process DashMaps. Any restart or replica hop between /authorize, /oauth/callback, and /token will turn valid flows into UnknownOrExpiredSession / InvalidOrExpiredCode.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs` around lines 53 - 58, The service keeps ephemeral per-process state in McpAuthProxyServiceImpl fields pending and codes (DashMap<String, PendingAuthorization> and DashMap<String, IssuedAuthorizationCode>), which must be persisted so OAuth flows survive restarts/replicas; refactor to move pending and codes into a durable store (reuse or extend RefreshCredentialsStore or add a new trait like PersistentSessionStore) and update methods that read/write these maps (authorize, oauth_callback, token handlers) to use the persistent store APIs (get/set/delete) instead of the in-memory DashMaps so sessions and issued codes survive process restarts and cross-replica routing.
288-292: ⚠️ Potential issue | 🟠 Major
Reject expired pending sessions on callback.
cleanup_expired() is only periodic, so removing the entry is not enough. A callback that arrives after PENDING_AUTH_TTL but before the next sweep is still accepted here.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs` around lines 288 - 292, The callback path currently removes the pending entry but doesn't check its age, allowing callbacks after PENDING_AUTH_TTL (but before cleanup_expired runs) to succeed; update the removal logic in the function using self.pending.remove(&session_id) (the tuple currently deconstructed as |(_, pending)| pending) to inspect the stored timestamp/creation time, compare it to now against PENDING_AUTH_TTL, and if expired discard it and return CompleteCallbackError::UnknownOrExpiredSession; only proceed with the pending data when the timestamp is within the TTL.
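The TTL check on callback could look roughly like the sketch below. It is std-only and assumes a created_at timestamp on the pending entry, a 10-minute PENDING_AUTH_TTL, and a plain HashMap; all three are illustrative assumptions, and take_pending is a hypothetical helper name.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Assumed value for illustration; the real constant lives in the service.
const PENDING_AUTH_TTL: Duration = Duration::from_secs(600);

/// Simplified stand-in for the pending authorization record (fields assumed).
struct PendingAuthorization {
    client_state: String,
    created_at: Instant,
}

#[derive(Debug, PartialEq)]
enum CallbackError {
    UnknownOrExpiredSession,
}

/// Removes the pending session and rejects it if it has outlived the TTL,
/// independent of when the periodic sweep last ran.
fn take_pending(
    pending: &mut HashMap<String, PendingAuthorization>,
    session_id: &str,
    now: Instant,
) -> Result<PendingAuthorization, CallbackError> {
    let entry = pending
        .remove(session_id)
        .ok_or(CallbackError::UnknownOrExpiredSession)?;

    // saturating_duration_since avoids a panic if `now` precedes `created_at`.
    if now.saturating_duration_since(entry.created_at) > PENDING_AUTH_TTL {
        // The expired entry is already removed, so it is simply dropped here.
        return Err(CallbackError::UnknownOrExpiredSession);
    }
    Ok(entry)
}
```

Removing first and checking afterwards is fine in this case, since an expired session has no legitimate future use.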
136-181: ⚠️ Potential issue | 🟠 Major
Only consume the broker code after validation and durable persistence.
self.codes.remove(...) happens before expiry / redirect URI / PKCE checks and before the refresh credentials are written. That means a bad /token request can burn a valid code, and a transient put() failure forces a full re-auth instead of letting the client retry.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs` around lines 136 - 181, The code currently calls self.codes.remove(...) immediately which consumes the authorization code before validating expires_at/redirect_uri/PKCE and before persisting refresh credentials; change the flow to first lookup the entry without removing (e.g., read/peek issued via self.codes.get or equivalent using params.code.as_deref()), perform expiry/redirect_uri/code_verifier checks against the fetched issued, then call self.refresh_credentials_store.put(...) and only after a successful put remove the code from self.codes (self.codes.remove(&code)). Ensure TokenExchangeError variants are preserved on validation failures and map storage errors from put as before so transient put failures do not burn the code.
rust/cloud-storage/mcp_auth_proxy/src/inbound/axum_router.rs (1)
106-168: ⚠️ Potential issue | 🟠 Major
Return OAuth JSON errors from /token.
The success path is JSON, but every TokenExchangeError arm still returns a plain-text tuple. OAuth/MCP clients need machine-readable error/error_description bodies here, especially on refresh and code-exchange failures.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rust/cloud-storage/mcp_auth_proxy/src/inbound/axum_router.rs` around lines 106 - 168, The token handler returns plain-text tuples for TokenExchangeError branches but must return machine-readable OAuth JSON; update async fn token (handling TokenExchangeError) to convert each error arm into a JSON response with fields like error and error_description (and include any internal details only in error_description when appropriate) and use the same status codes currently used; for example, replace tuple responses in match arms for TokenExchangeError::UnsupportedGrantType, ::CodeRequired, ::InvalidOrExpiredCode, ::CodeExpired, ::RedirectUriMismatch, ::RedirectUriRequired, ::CodeVerifierRequired, ::PkceVerificationFailed, ::RefreshTokenRequired, ::InvalidRefreshToken, ::RefreshCredentialsStorage(error), and ::RefreshFailed(error) with Json(oauth_error_struct).into_response() (or serde_json::json! macro) including the error identifier (e.g. "invalid_grant", "invalid_request", "invalid_client", etc.) and a descriptive error_description string, and keep the tracing::error logging for internal errors (RefreshCredentialsStorage, RefreshFailed) but do not expose stack traces in the JSON body.
rust/cloud-storage/mcp_auth_proxy/src/domain/ports.rs (1)
28-50: ⚠️ Potential issue | 🟠 Major
Add an atomic consume/rotate primitive to the store API.
The current get/delete/put contract forces rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs:89-123 to rotate a refresh token across multiple awaited calls. That lets concurrent refreshes race on the same token and leaves the session stranded if the delete succeeds but the subsequent put fails.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rust/cloud-storage/mcp_auth_proxy/src/domain/ports.rs` around lines 28 - 50, The store API must provide an atomic consume/rotate primitive to avoid races when rotating a refresh token; add a new async method on the RefreshCredentialsStore trait (e.g. consume_and_replace or rotate_credentials) that accepts the target RefreshToken and the new RefreshCredentials and performs the read-delete-put atomically, returning the consumed old RefreshCredentials (Option<RefreshCredentials>) or an error; update the rotation logic in the service that currently calls get/delete/put to use this new atomic method (replace the multi-await sequence in the rotation code path with a single await of the new method) so concurrent refreshes cannot interleave and leave the session stranded.
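To make the shape of such a primitive concrete, here is a minimal std-only sketch using an in-memory map behind a Mutex. InMemoryStore, rotate, and the string-keyed tokens are hypothetical simplifications; the real RefreshCredentialsStore is async and Postgres-backed, where the same effect would presumably come from a single statement or transaction.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Simplified stand-in for the stored credentials (fields assumed).
#[derive(Clone, Debug, PartialEq)]
struct RefreshCredentials {
    access_token: String,
}

/// In-memory stand-in for the store, keyed by refresh token.
struct InMemoryStore {
    inner: Mutex<HashMap<String, RefreshCredentials>>,
}

impl InMemoryStore {
    fn new() -> Self {
        Self { inner: Mutex::new(HashMap::new()) }
    }

    fn put(&self, token: &str, creds: RefreshCredentials) {
        self.inner.lock().expect("store lock poisoned").insert(token.to_string(), creds);
    }

    fn get(&self, token: &str) -> Option<RefreshCredentials> {
        self.inner.lock().expect("store lock poisoned").get(token).cloned()
    }

    /// Consumes `old_token` and stores `replacement` under `new_token` while
    /// holding one lock, so no other caller can observe the intermediate state.
    /// Returns the consumed credentials, or None (writing nothing) if
    /// `old_token` is unknown or was already rotated.
    fn rotate(
        &self,
        old_token: &str,
        new_token: &str,
        replacement: RefreshCredentials,
    ) -> Option<RefreshCredentials> {
        let mut map = self.inner.lock().expect("store lock poisoned");
        let old = map.remove(old_token)?;
        map.insert(new_token.to_string(), replacement);
        Some(old)
    }
}
```

Because the delete and the insert happen under one lock, a second concurrent rotation of the same token gets None and cannot leave the session without any stored credentials.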
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@rust/cloud-storage/mcp_auth_proxy/src/outbound/pg_refresh_credentials_store.rs`:
- Around line 48-65: The put() path currently binds credentials.access_token as
plaintext; replace that by encrypting the access token with the application
envelope/AEAD before binding (e.g., produce an encrypted bytes or base64 string)
and change the stored column handling if needed (store bytea or base64 string) —
specifically update the code around the INSERT/ON CONFLICT that binds
access_token (and the similar block at the other occurrence) to call your
encryption helper (e.g., encrypt_access_token()) and bind the ciphertext
instead; likewise update get() to read the encrypted access_token, decrypt it
via decrypt_access_token() and return the plaintext in the Credentials struct,
and surface decryption errors properly. Ensure key management is used (KMS or
local master key), add unit tests for round-trip encrypt/decrypt for
refresh_token_hash/access_token, and handle failures (return error) rather than
returning plaintext on error.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 45c6453b-00e7-462f-8a25-436f27e206b9
⛔ Files ignored due to path filters (1)
- rust/cloud-storage/Cargo.lock is excluded by !**/*.lock, !**/Cargo.lock
📒 Files selected for processing (13)
- infra/stacks/fusionauth-instance/index.ts
- infra/stacks/mcp-server/mcp-server.ts
- rust/cloud-storage/macro_db_client/migrations/20260408190000_mcp_auth_proxy_refresh_credentials.sql
- rust/cloud-storage/mcp_auth_proxy/Cargo.toml
- rust/cloud-storage/mcp_auth_proxy/src/domain/models.rs
- rust/cloud-storage/mcp_auth_proxy/src/domain/ports.rs
- rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs
- rust/cloud-storage/mcp_auth_proxy/src/inbound/axum_router.rs
- rust/cloud-storage/mcp_auth_proxy/src/outbound.rs
- rust/cloud-storage/mcp_auth_proxy/src/outbound/fusionauth.rs
- rust/cloud-storage/mcp_auth_proxy/src/outbound/pg_refresh_credentials_store.rs
- rust/cloud-storage/mcp_service/src/context.rs
- rust/cloud-storage/mcp_service/src/main.rs
| sqlx::query( | ||
| r#" | ||
| INSERT INTO mcp_auth_proxy_refresh_credentials ( | ||
| refresh_token_hash, | ||
| access_token, | ||
| expires_at | ||
| ) | ||
| VALUES ($1, $2, $3) | ||
| ON CONFLICT (refresh_token_hash) DO UPDATE | ||
| SET access_token = EXCLUDED.access_token, | ||
| expires_at = EXCLUDED.expires_at, | ||
| updated_at = now() | ||
| "#, | ||
| ) | ||
| .bind(refresh_token_hash) | ||
| .bind(credentials.access_token.as_str()) | ||
| .bind(credentials.expires_at) | ||
| .execute(&self.db) |
There was a problem hiding this comment.
Encrypt persisted bearer tokens before writing them to Postgres.
put() inserts credentials.access_token as raw text, and get() reads the same value back unprotected. A DB leak now exposes live upstream bearer credentials, so this field needs application-level encryption or envelope encryption before this store is safe to run in production.
Also applies to: 78-92
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@rust/cloud-storage/mcp_auth_proxy/src/outbound/pg_refresh_credentials_store.rs`
around lines 48 - 65, The put() path currently binds credentials.access_token as
plaintext; replace that by encrypting the access token with the application
envelope/AEAD before binding (e.g., produce an encrypted bytes or base64 string)
and change the stored column handling if needed (store bytea or base64 string) —
specifically update the code around the INSERT/ON CONFLICT that binds
access_token (and the similar block at the other occurrence) to call your
encryption helper (e.g., encrypt_access_token()) and bind the ciphertext
instead; likewise update get() to read the encrypted access_token, decrypt it
via decrypt_access_token() and return the plaintext in the Credentials struct,
and surface decryption errors properly. Ensure key management is used (KMS or
local master key), add unit tests for round-trip encrypt/decrypt for
refresh_token_hash/access_token, and handle failures (return error) rather than
returning plaintext on error.
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rust/cloud-storage/fusionauth/src/oauth/refresh_token.rs`:
- Around line 61-88: The non-OK branch should preserve FusionAuth "invalid
grant"/invalid refresh-token failures instead of collapsing them to Generic: in
the error arm of the match in oauth/refresh_token.rs (the res.status() _ =>
branch) detect FusionAuth's refresh-grant failure (either by status code
commonly returned for invalid grant or by inspecting the response body for the
OAuth error string "invalid_grant"/"invalid_refresh_token"), and return
FusionAuthClientError::InvalidGrant (or the specific InvalidRefreshToken variant
you use) rather than FusionAuthClientError::Generic; keep the existing
tracing::error log but map the parsed body/text to the correct
FusionAuthClientError variant so FusionAuthOAuthProvider::refresh_access_token
can surface RefreshFailed/InvalidRefreshToken downstream.
- Around line 93-95: The tracing span on complete_refresh_token_grant should
record returned errors; update the tracing attribute on the function (pub async
fn complete_refresh_token_grant) to include the err option (e.g. add , err to
#[tracing::instrument(skip(self), fields(...), err)]) so failures are recorded
on the span when the Result is Err.
In `@rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs`:
- Around line 1-17: The trait OAuthProvider (in super::ports) uses impl
Future<Output = ...> in its method return types (e.g., the async-returning
methods such as authorize, callback, token), but Future is not imported; fix
this by adding an import for Future (e.g., add use std::future::Future;) near
the top of the file so the trait method return types compile.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: dc40a371-f1f4-4c56-9daa-dcdc354ecd56
📒 Files selected for processing (11)
- infra/stacks/fusionauth-instance/index.ts
- infra/stacks/mcp-server/mcp-server.ts
- rust/cloud-storage/fusionauth/src/oauth/mod.rs
- rust/cloud-storage/fusionauth/src/oauth/refresh_token.rs
- rust/cloud-storage/mcp_auth_proxy/src/domain/models.rs
- rust/cloud-storage/mcp_auth_proxy/src/domain/ports.rs
- rust/cloud-storage/mcp_auth_proxy/src/domain/service.rs
- rust/cloud-storage/mcp_auth_proxy/src/inbound/axum_router.rs
- rust/cloud-storage/mcp_auth_proxy/src/lib.rs
- rust/cloud-storage/mcp_auth_proxy/src/outbound/fusionauth.rs
- rust/cloud-storage/mcp_service/src/main.rs
| match res.status() { | ||
| reqwest::StatusCode::OK => { | ||
| tracing::info!("refresh token grant complete"); | ||
| let body = res | ||
| .json::<RefreshTokenGrantCompleteResponse>() | ||
| .await | ||
| .map_err(|e| { | ||
| tracing::error!(error=?e, "unable to decode successful oauth2 token response"); | ||
| FusionAuthClientError::Generic(GenericErrorResponse { | ||
| message: e.to_string(), | ||
| }) | ||
| })?; | ||
| | ||
| Ok((body.access_token.into(), body.refresh_token.into())) | ||
| } | ||
| _ => { | ||
| let body = res.text().await.map_err(|e| { | ||
| FusionAuthClientError::Generic(GenericErrorResponse { | ||
| message: e.to_string(), | ||
| }) | ||
| })?; | ||
| | ||
| tracing::error!(body=%body, "unexpected response from fusionauth"); | ||
| | ||
| Err(FusionAuthClientError::Generic(GenericErrorResponse { | ||
| message: body, | ||
| })) | ||
| } |
There was a problem hiding this comment.
Preserve invalid refresh-token failures instead of collapsing them to Generic.
This branch turns every non-200 refresh response into a generic upstream failure. When FusionAuth rejects an expired/revoked refresh token, that now flows through FusionAuthOAuthProvider::refresh_access_token as RefreshFailed, and /token returns a 502 for a client-side auth error. Map the known refresh-grant failures to FusionAuthClientError::InvalidGrant / InvalidRefreshToken here so the broker can return the right OAuth error downstream.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/fusionauth/src/oauth/refresh_token.rs` around lines 61 -
88, The non-OK branch should preserve FusionAuth "invalid grant"/invalid
refresh-token failures instead of collapsing them to Generic: in the error arm
of the match in oauth/refresh_token.rs (the res.status() _ => branch) detect
FusionAuth's refresh-grant failure (either by status code commonly returned for
invalid grant or by inspecting the response body for the OAuth error string
"invalid_grant"/"invalid_refresh_token"), and return
FusionAuthClientError::InvalidGrant (or the specific InvalidRefreshToken variant
you use) rather than FusionAuthClientError::Generic; keep the existing
tracing::error log but map the parsed body/text to the correct
FusionAuthClientError variant so FusionAuthOAuthProvider::refresh_access_token
can surface RefreshFailed/InvalidRefreshToken downstream.
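One way the non-OK branch might separate a rejected refresh token from a genuine upstream failure is sketched below. It assumes FusionAuth follows the standard OAuth 2.0 error shape (a 400 or 401 status with an "error": "invalid_grant" JSON field); the exact response has not been verified here, and RefreshError and classify_refresh_failure are hypothetical names. The substring match only keeps the sketch std-only; real code would parse the JSON body.

```rust
/// Hypothetical two-way split of refresh failures for illustration.
#[derive(Debug, PartialEq)]
enum RefreshError {
    /// The refresh token was rejected (expired, revoked, or unknown).
    InvalidGrant,
    /// Anything else: treated as an upstream failure, carrying the raw body.
    Upstream(String),
}

/// Classifies a non-200 token-endpoint response.
/// Assumption: client-side grant failures arrive as 400/401 with an
/// `invalid_grant` error code somewhere in the JSON body.
fn classify_refresh_failure(status: u16, body: &str) -> RefreshError {
    let client_error = status == 400 || status == 401;
    // Match the quoted token so unrelated text containing the word is ignored.
    let mentions_invalid_grant = body.contains("\"invalid_grant\"");

    if client_error && mentions_invalid_grant {
        RefreshError::InvalidGrant
    } else {
        RefreshError::Upstream(body.to_string())
    }
}
```

The broker can then map InvalidGrant to a 400-class OAuth error on /token and keep 502 for the Upstream case.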
| /// Completes the OAuth2 refresh token grant flow. | ||
| #[tracing::instrument(skip(self), fields(application_id=%self.client_id, fusion_auth_base_url=%self.fusion_auth_base_url))] | ||
| pub async fn complete_refresh_token_grant( |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
🧩 Analysis chain
🏁 Script executed:
cat -n rust/cloud-storage/fusionauth/src/oauth/refresh_token.rs | sed -n '90,120p'
Repository: macro-inc/macro
Length of output: 990
Add err to this tracing span.
This method returns Result, so failures should be recorded on the span too.
♻️ Suggested change
- #[tracing::instrument(skip(self), fields(application_id=%self.client_id, fusion_auth_base_url=%self.fusion_auth_base_url))]
+ #[tracing::instrument(
+ err,
+ skip(self),
+ fields(application_id=%self.client_id, fusion_auth_base_url=%self.fusion_auth_base_url)
+ )]
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// Completes the OAuth2 refresh token grant flow. | |
| #[tracing::instrument(skip(self), fields(application_id=%self.client_id, fusion_auth_base_url=%self.fusion_auth_base_url))] | |
| pub async fn complete_refresh_token_grant( | |
| /// Completes the OAuth2 refresh token grant flow. | |
| #[tracing::instrument( | |
| err, | |
| skip(self), | |
| fields(application_id=%self.client_id, fusion_auth_base_url=%self.fusion_auth_base_url) | |
| )] | |
| pub async fn complete_refresh_token_grant( |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rust/cloud-storage/fusionauth/src/oauth/refresh_token.rs` around lines 93 -
95, The tracing span on complete_refresh_token_grant should record returned
errors; update the tracing attribute on the function (pub async fn
complete_refresh_token_grant) to include the err option (e.g. add , err to
#[tracing::instrument(skip(self), fields(...), err)]) so failures are recorded
on the span when the Result is Err.