
feat(proxy): per-credential and global concurrency caps (#210) #211

Open
hashedone wants to merge 4 commits into feature/llm-proxy from feat/proxy-concurrency-caps

Conversation

@hashedone
Contributor

Closes #210. Sub-issue of #181.

What this PR ships

Schema (migrations/025_user_anthropic_keys_max_concurrent.sql)

ALTER TABLE user_anthropic_keys
    ADD COLUMN max_concurrent INTEGER NOT NULL DEFAULT 8
    CHECK (max_concurrent > 0 AND max_concurrent <= 256);

Existing rows pick up the default 8 via the migration. Bounds mirror the API/UI validation so an out-of-range row is structurally impossible.

API

  • GET /api/v1/me/anthropic-key now returns max_concurrent: i32 | null.
  • PUT /api/v1/me/anthropic-key accepts an optional max_concurrent. Omitting it (None) preserves the existing value on rotation and falls back to the DB default of 8 on first insert. Out-of-range values are rejected with a clear 400 before they reach the DB CHECK.
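
The bounds check described above could look roughly like the sketch below. The helper name `validate_max_concurrent` and the `String` error type are assumptions made for illustration; the PR description does not show the handler's actual code. Only the 1..=256 range comes from the migration's CHECK constraint.

```rust
/// Bounds taken from the CHECK constraint in migration 025.
pub const MAX_CONCURRENT_MIN: i32 = 1;
pub const MAX_CONCURRENT_MAX: i32 = 256;

/// Hypothetical helper (not from the PR): validates the optional
/// `max_concurrent` of a PUT body before any SQL runs.
/// `None` means "not supplied" and is passed through untouched.
pub fn validate_max_concurrent(requested: Option<i32>) -> Result<Option<i32>, String> {
    match requested {
        None => Ok(None),
        Some(n) if (MAX_CONCURRENT_MIN..=MAX_CONCURRENT_MAX).contains(&n) => Ok(Some(n)),
        // The caller would turn this message into a 400 response.
        Some(n) => Err(format!(
            "max_concurrent must be between {} and {}, got {}",
            MAX_CONCURRENT_MIN, MAX_CONCURRENT_MAX, n
        )),
    }
}
```

Keeping the range in one pair of constants is one way to stop the API check and the DB CHECK from drifting apart.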

Proxy handler

Two semaphores acquired on every request, both held for the full lifetime of the response stream:

  1. Global semaphore: Option<Arc<Semaphore>> from PROXY_MAX_GLOBAL_CONCURRENT. Unset = unlimited (default; correct for current small-team deployments). Operator turns it on after capacity testing.
  2. Per-credential semaphore: DashMap<Uuid, Arc<Semaphore>> on AppState, keyed by user_anthropic_keys.user_id. Lazily created on first request per credential, sized to the row's max_concurrent at that moment.

Acquisition order is global → per-credential. Permits drop in reverse order via RAII (per-credential first), which is exception-safe regardless of where a request fails.
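
The ordering can be illustrated with a std-only sketch. Everything here is a stand-in: `Cap`, `Permit`, `RequestPermits`, `Rejection` and `acquire_permits` are hypothetical names, and the atomic counter only imitates a non-blocking semaphore acquire. The PR itself uses `Arc<Semaphore>`, and whether it acquires in exactly this non-blocking way is an assumption.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Std-only stand-in for a counting semaphore (not the PR's type).
pub struct Cap {
    limit: usize,
    in_flight: AtomicUsize,
}

/// RAII permit: gives its slot back when dropped.
pub struct Permit {
    cap: Arc<Cap>,
}

impl Cap {
    pub fn new(limit: usize) -> Arc<Cap> {
        Arc::new(Cap { limit, in_flight: AtomicUsize::new(0) })
    }

    pub fn in_flight(&self) -> usize {
        self.in_flight.load(Ordering::SeqCst)
    }

    /// Non-blocking acquire: `None` when the cap is already reached.
    pub fn try_acquire(cap: &Arc<Cap>) -> Option<Permit> {
        let limit = cap.limit;
        cap.in_flight
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| {
                if n < limit { Some(n + 1) } else { None }
            })
            .ok()
            .map(|_| Permit { cap: Arc::clone(cap) })
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        self.cap.in_flight.fetch_sub(1, Ordering::SeqCst);
    }
}

#[derive(Debug, PartialEq)]
pub enum Rejection {
    GlobalCap,
    PerCredentialCap { cap: usize },
}

/// Struct fields drop in declaration order, so declaring the
/// per-credential permit first releases it before the global one.
pub struct RequestPermits {
    _per_credential: Permit,
    _global: Option<Permit>,
}

pub fn acquire_permits(
    global: Option<&Arc<Cap>>,
    per_credential: &Arc<Cap>,
) -> Result<RequestPermits, Rejection> {
    // Global first; `None` means no global cap is configured.
    let global_permit = match global {
        Some(cap) => Some(Cap::try_acquire(cap).ok_or(Rejection::GlobalCap)?),
        None => None,
    };
    // On failure the early return drops `global_permit`, releasing it.
    let per_credential_permit = Cap::try_acquire(per_credential)
        .ok_or(Rejection::PerCredentialCap { cap: per_credential.limit })?;
    Ok(RequestPermits { _per_credential: per_credential_permit, _global: global_permit })
}
```

Note the difference between locals and fields: local variables drop in reverse declaration order, while struct fields drop in declaration order, which is why the field order in `RequestPermits` is the reverse of the acquisition order.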

Critical detail: permits ride with the response body stream (PermitHoldingStream wrapper) rather than with the handler stack frame. Without this, SSE responses would release the permit the moment the upstream headers came back, breaking the cap for long generations. Tested.
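
To make the "permit rides with the body" idea concrete, here is a synchronous analogue. It wraps an `Iterator` rather than an async `Stream`, and `Permit`, `PermitHoldingBody` and `handle` are hypothetical names; the PR's actual `PermitHoldingStream` is not shown in this description, so treat this only as a sketch of the ownership pattern.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical RAII permit: decrements a shared in-flight counter on drop.
pub struct Permit {
    in_flight: Arc<AtomicUsize>,
}

impl Permit {
    pub fn take(in_flight: &Arc<AtomicUsize>) -> Permit {
        in_flight.fetch_add(1, Ordering::SeqCst);
        Permit { in_flight: Arc::clone(in_flight) }
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        self.in_flight.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Wraps a response body (an `Iterator` of chunks standing in for an async
/// `Stream`) and owns the permit for as long as the body exists.
pub struct PermitHoldingBody<I> {
    inner: I,
    _permit: Permit,
}

impl<I: Iterator> Iterator for PermitHoldingBody<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        // Chunks pass through unchanged; the wrapper only extends the
        // permit's lifetime.
        self.inner.next()
    }
}

/// Plays the role of the handler: it returns as soon as the body is
/// available, but the permit leaves with the body instead of being
/// dropped at the end of this stack frame.
pub fn handle<I: Iterator>(in_flight: &Arc<AtomicUsize>, upstream: I) -> PermitHoldingBody<I> {
    let permit = Permit::take(in_flight);
    PermitHoldingBody { inner: upstream, _permit: permit }
}
```

In this sketch the slot is freed only when the wrapped body is dropped, which is the property the SSE case depends on.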

Update semantics

Lazy. A PUT that changes max_concurrent updates the DB row but does not invalidate the in-memory semaphore. The new cap applies on the next process restart, or after the entry is evicted. Avoids atomic-swap edge cases on cap changes. Documented in code and surfaced in the UI ("New value applies on the next server restart").
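
A small sketch of why the lazy semantics fall out of get-or-create: once an entry exists, the size passed on later calls is ignored. The types are stand-ins chosen so the example needs only the standard library: `u128` replaces `Uuid`, `Mutex<HashMap<..>>` replaces `DashMap`, and `Cap` replaces the semaphore. All names are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Stand-in for the semaphore; only its size matters here.
pub struct Cap {
    pub limit: usize,
}

/// Stand-in for the per-credential map on AppState.
pub struct PerCredentialCaps {
    by_user: Mutex<HashMap<u128, Arc<Cap>>>,
}

impl PerCredentialCaps {
    pub fn new() -> Self {
        PerCredentialCaps { by_user: Mutex::new(HashMap::new()) }
    }

    /// Returns the existing entry, or creates one sized to the row's
    /// `max_concurrent` as read for this request. An existing entry wins,
    /// so a changed DB value is not picked up by this call.
    pub fn get_or_create(&self, user_id: u128, row_max_concurrent: usize) -> Arc<Cap> {
        let mut map = self.by_user.lock().expect("cap map lock poisoned");
        map.entry(user_id)
            .or_insert_with(|| Arc::new(Cap { limit: row_max_concurrent }))
            .clone()
    }
}
```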

Rejection behavior

Anthropic-shaped 429 with error.type = "overloaded_error" — same type the real upstream returns under load, so unmodified clients (Claude Code, GSD2) already know to back off.

  • Per-credential rejection message names the cap: "Too many concurrent requests against this credential (cap: N). Retry shortly."
  • Global rejection message: "Server is at capacity. Retry shortly."

Every reject logged via tracing::warn! with structured fields (reason = per_credential_cap | global_cap, cap_value, user_id, path) so post-merge we can distinguish "cap too low" from "someone is attacking."
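
For illustration, the two rejection paths might be assembled as below. The `Reject` enum and `rejection_response` function are hypothetical, and the outer `{"type":"error","error":{...}}` envelope is an assumption about the Anthropic error shape; the messages and `reason` values are the ones quoted above.

```rust
/// Why a request was turned away; `reason()` mirrors the `reason` log field.
pub enum Reject {
    PerCredentialCap { cap: usize },
    GlobalCap,
}

impl Reject {
    pub fn reason(&self) -> &'static str {
        match self {
            Reject::PerCredentialCap { .. } => "per_credential_cap",
            Reject::GlobalCap => "global_cap",
        }
    }

    pub fn message(&self) -> String {
        match self {
            Reject::PerCredentialCap { cap } => format!(
                "Too many concurrent requests against this credential (cap: {}). Retry shortly.",
                cap
            ),
            Reject::GlobalCap => "Server is at capacity. Retry shortly.".to_string(),
        }
    }
}

/// Returns `(status, json_body)`. The envelope layout is assumed, not
/// taken from the PR's code.
pub fn rejection_response(reject: &Reject) -> (u16, String) {
    // Both messages contain no characters that need JSON escaping, so plain
    // formatting is enough for this sketch; real code would use a JSON library.
    let body = format!(
        r#"{{"type":"error","error":{{"type":"overloaded_error","message":"{}"}}}}"#,
        reject.message()
    );
    (429, body)
}
```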

Web

Numeric input on /me/proxy/ next to the key field, saved together. Pre-filled with the current value on rotation, defaults to 8 for fresh keys. Client-side bounds match the DB CHECK so the user sees an inline error instead of a 400. Existing config row shows the current cap inline ("· cap 8") next to the "Configured / last set" line.

Out of scope

  • Org-level Anthropic keys — cap mechanism will generalize; separate slice.
  • Routing between personal and org keys — comes before org keys per feat: LLM proxy service — universal integration and org-level key management #181 plan.
  • Per-token sub-caps — waits for a proxy_tokens table.
  • Per-IP or per-source-tool caps — different axis.
  • Adaptive caps based on observed upstream 429s.
  • Cap eviction / TTL on the DashMap — revisit if active credentials exceed ~10k.

Verification

| Layer | Status |
| --- | --- |
| cargo fmt --check | |
| cargo clippy --workspace --tests --all-targets -- -D warnings | |
| cargo test --workspace | ✅ all green |
| proxy_integration tests | 16 / 16 (3 new for caps + 13 pre-existing) |
| repo_user_anthropic_keys_test | 11 / 11 (4 new for max_concurrent + 7 pre-existing) |
| pnpm check | ✅ 0 errors, 0 warnings |

New tests

  • proxy_rejects_when_per_credential_cap_exceeded — 2 in-flight succeed, 3rd returns 429 with cap: 2 in message.
  • proxy_frees_permit_when_request_completes — sequential request after first completes proves the permit drops correctly when the streaming body finishes.
  • proxy_rejects_when_global_cap_exceeded — second user blocked by global cap even with their own per-credential budget free.
  • upsert_persists_explicit_max_concurrent — cap roundtrip through repo.
  • upsert_without_cap_preserves_existing_value — key rotation without cap argument keeps existing value.
  • upsert_with_new_cap_overrides_existing_value — explicit cap on update wins.
  • status_reflects_presence — GET endpoint exposes the cap.
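
The three upsert cases listed above, plus the default-on-insert case, can be modelled in plain Rust. This is only a model of the intended outcome: the real resolution happens in SQL via COALESCE, and `resolve_cap` is a hypothetical name.

```rust
/// Default from migration 025.
pub const DEFAULT_MAX_CONCURRENT: i32 = 8;

/// Models which cap ends up stored after an upsert.
/// `existing` is `None` when no row exists yet (first insert);
/// `requested` is `None` when the caller did not pass a cap.
pub fn resolve_cap(existing: Option<i32>, requested: Option<i32>) -> i32 {
    // An explicit request wins, then the stored value, then the default.
    requested.or(existing).unwrap_or(DEFAULT_MAX_CONCURRENT)
}
```

For example, `resolve_cap(Some(32), None)` models a key rotation without a cap argument and keeps 32.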

Files

  • crates/tracevault-server/migrations/025_user_anthropic_keys_max_concurrent.sql (new)
  • crates/tracevault-server/src/api/proxy.rs: permits, RAII stream wrapper, 429 paths
  • crates/tracevault-server/src/api/me.rs: max_concurrent in request/response
  • crates/tracevault-server/src/repo/user_anthropic_keys.rs: StoredCredential, StoredStatus, COALESCE-based upsert
  • crates/tracevault-server/src/lib.rs: two new AppState fields
  • crates/tracevault-server/src/main.rs: env-driven global semaphore, DashMap construction
  • crates/tracevault-server/Cargo.toml: dashmap = "6", futures-util = "0.3"
  • web/src/routes/me/proxy/+page.svelte: numeric input + status display

hashedone added 3 commits May 28, 2026 11:24
Add migration 025 with `max_concurrent INTEGER NOT NULL DEFAULT 8`
(CHECK 1..=256) on user_anthropic_keys. Extend UserAnthropicKeyRepo:

* upsert() now takes an Option<i32> cap; COALESCE preserves the existing
  value on update and falls back to the DB default 8 on insert.
* get_credential() returns the cap alongside ciphertext+nonce so the
  proxy hot path stays one DB round-trip.
* status() (renames the old configured_at()) returns the cap to the
  GET /me/anthropic-key endpoint, which now exposes it in the JSON
  response.
* PUT /me/anthropic-key accepts an optional max_concurrent with
  client-side bounds matching the DB CHECK.

Repo-layer tests cover: default-applied-on-insert, explicit-set,
preserve-on-rotate-without-new-cap, override-on-rotate-with-new-cap.
Existing tests updated to pass the new None for the optional cap.

Part of #210.

Add two semaphores to AppState and wire them into the proxy handler:

* `proxy_per_credential_semaphores` — `DashMap<Uuid, Arc<Semaphore>>`
  keyed by user_anthropic_keys.user_id. Lazily created on first request
  per credential, sized to the row's max_concurrent at that moment.
  Update semantics are intentionally lazy: a PUT changes the DB row but
  not the in-memory semaphore until restart, avoiding atomic-swap edge
  cases on cap changes.
* `proxy_global_semaphore` — `Option<Arc<Semaphore>>` from the new
  `PROXY_MAX_GLOBAL_CONCURRENT` env var. Unset = unlimited (the right
  default for current single-team deployments). Operator turns it on
  after capacity testing.

Acquisition order is global → per-credential so the per-credential
permit drops first (RAII), keeping the global cap free for other users
the moment a credential finishes.

Permits ride with the response body via a tiny PermitHoldingStream
wrapper. This is structurally critical for SSE: if permits dropped on
handler return rather than stream completion, the cap would only bound
'time to first byte' instead of 'time to last byte' — agents streaming
long generations would not be counted against the cap they're really
using.

Rejection paths return Anthropic-shaped 429 with `overloaded_error`,
matching what real Anthropic returns under load. Per-credential message
names the cap ("cap: N") so users can debug from their /me/proxy/ UI.
Telemetry: structured warn-level log on every reject with reason,
cap_value, user_id, path.

Adds dashmap = "6" and futures-util = "0.3" (already transitive) as
direct deps.

Three integration tests:
* proxy_rejects_when_per_credential_cap_exceeded — 3rd in-flight gets
  429, message names cap.
* proxy_frees_permit_when_request_completes — sequential request
  succeeds after first completes (proves permits release).
* proxy_rejects_when_global_cap_exceeded — second user blocked by
  global cap even with own per-credential budget free.

Closes #210 (backend half).

Add a numeric input next to the Anthropic key field. Saved together
with the key in one PUT, defaulting to 8 (the DB default) for fresh
keys, pre-filled with the current value on rotation.

Client-side bounds (1..=256) match the DB CHECK constraint so the user
sees a clear inline error instead of a 400 from the API.

Shows the current cap alongside the 'Configured / last set' line so
users can see what they've stored at a glance, and a help note
explaining that changes apply on next server restart (per the lazy
in-memory update semantics).

Closes #210 (frontend half).

/// decrypt the key. Returns `(plaintext, max_concurrent)` on success or an
/// Anthropic-shaped error envelope on any failure (no key configured, no
/// master key on this server, ciphertext corrupted, DB error).
async fn load_credential(state: &AppState, user_id: Uuid) -> Result<(String, i32), Response> {

The match over UserAnthropicKeyRepo::get_credential introduces nesting around error/none cases; prefer explicit guard-style early returns for Ok(None)/Err to flatten the happy path and improve readability.

Details

✨ AI Reasoning
​The new load_credential() code performs a DB fetch then handles Ok(Some), Ok(None), and Err cases in a single match. Although error cases already return, the match wraps the main-path credential extraction. Rewriting with guard-style checks (e.g., early-return on Ok(None) and Err before continuing to the happy path) would reduce the visual nesting and make the successful-path logic stand out more clearly.

🔧 How do I fix it?
Place parameter validation and guard clauses at the function start. Use early returns to reduce nesting levels and improve readability.

Reply @AikidoSec feedback: [FEEDBACK] to get better review comments in the future.
Reply @AikidoSec ignore: [REASON] to ignore this issue.

// we ship to today. Operators turn this on after capacity testing; a
// sensible starting value is 256.
let proxy_global_semaphore: Option<std::sync::Arc<tokio::sync::Semaphore>> =
match std::env::var("PROXY_MAX_GLOBAL_CONCURRENT") {

The nested match used to parse and validate PROXY_MAX_GLOBAL_CONCURRENT is dense; consider simplifying the parsing/validation into a small helper to reduce nesting and aid readability.
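
One possible shape for such a helper is sketched below. The `GlobalCap` enum and `parse_global_cap` function are hypothetical names, not part of the PR. The function is kept pure (no env access, no logging) so the caller decides how to warn, and it follows the existing behaviour of treating any value that is not a positive integer as invalid.

```rust
/// Outcome of interpreting the raw PROXY_MAX_GLOBAL_CONCURRENT value.
#[derive(Debug, PartialEq)]
pub enum GlobalCap {
    /// Variable not set: no global limit.
    Unlimited,
    /// Variable set to a positive integer.
    Limited(usize),
    /// Variable set but unusable; the caller would log a warning and
    /// carry on without a global limit.
    Invalid(String),
}

/// Hypothetical helper: pure parsing, no side effects.
pub fn parse_global_cap(raw: Option<&str>) -> GlobalCap {
    let s = match raw {
        Some(s) => s,
        None => return GlobalCap::Unlimited,
    };
    match s.parse::<usize>() {
        Ok(n) if n > 0 => GlobalCap::Limited(n),
        // Covers zero, negatives, whitespace and non-numeric input alike.
        _ => GlobalCap::Invalid(s.to_string()),
    }
}
```

A call site could pass `std::env::var("PROXY_MAX_GLOBAL_CONCURRENT").ok().as_deref()` and match on the three outcomes, which keeps the logging and the `Semaphore` construction in one flat `match`.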

Details

✨ AI Reasoning
​A match expression is nested inside another match to parse and validate an environment variable and log on failure; this increases the mental steps to follow the success and multiple failure branches all at the declaration site. The code tries to parse PROXY_MAX_GLOBAL_CONCURRENT, logs on parse failure, and returns different Option states depending on nested matches — it's a multi-branch decision tree introduced in this change.

🔧 How do I fix it?
Break long lines to enhance clarity. Aim for a maximum of 80-120 characters per line, depending on the context and coding standards.


Comment on lines +53 to +57
// max_concurrent is NULL when the caller didn't specify one and
// the DB default kicks in for the column.
sqlx::query(
-    "INSERT INTO user_anthropic_keys (user_id, key_encrypted, key_nonce)
-     VALUES ($1, $2, $3)
+    "INSERT INTO user_anthropic_keys (user_id, key_encrypted, key_nonce, max_concurrent)
+     VALUES ($1, $2, $3, COALESCE($4, 8))

Comment says DB default applies on insert, but VALUES (..., COALESCE($4, 8)) always supplies 8 directly. The documented control path is impossible under this query.

Suggested change
// max_concurrent is NULL when the caller didn't specify one and
// the DB default kicks in for the column.
sqlx::query(
"INSERT INTO user_anthropic_keys (user_id, key_encrypted, key_nonce)
VALUES ($1, $2, $3)
"INSERT INTO user_anthropic_keys (user_id, key_encrypted, key_nonce, max_concurrent)
VALUES ($1, $2, $3, COALESCE($4, 8))
// max_concurrent is NULL when the caller didn't specify one and
VALUES ($1, $2, $3, $4)
Details

✨ AI Reasoning
​The code intends to preserve existing caps when omitted and use a default on inserts. However, the inserted value is always computed in SQL with a fallback literal, so the described fallback mechanism is not what executes. This creates a direct contradiction between behavior and explanatory text, which can mislead future maintenance and debugging around cap semantics.


Comment on lines +75 to +95
// Optional global concurrency cap across all proxy requests. Unset = no
// global limit; this is the right default for the small-team deployments
// we ship to today. Operators turn this on after capacity testing; a
// sensible starting value is 256.
let proxy_global_semaphore: Option<std::sync::Arc<tokio::sync::Semaphore>> =
    match std::env::var("PROXY_MAX_GLOBAL_CONCURRENT") {
        Ok(s) => match s.parse::<usize>() {
            Ok(n) if n > 0 => {
                tracing::info!(cap = n, "proxy global concurrency cap enabled");
                Some(std::sync::Arc::new(tokio::sync::Semaphore::new(n)))
            }
            _ => {
                tracing::warn!(
                    value = %s,
                    "PROXY_MAX_GLOBAL_CONCURRENT is set but not a positive integer; ignoring"
                );
                None
            }
        },
        Err(_) => None,
    };

Nested match expressions parse PROXY_MAX_GLOBAL_CONCURRENT; prefer early-return/guard-style parsing to flatten control flow (e.g., if let Ok(s) = env::var(...) { if let Ok(n) = s.parse::<usize>() { ... } } or return/continue on parse failure).

Show fix
Suggested change
// Optional global concurrency cap across all proxy requests. Unset = no
// global limit; this is the right default for the small-team deployments
// we ship to today. Operators turn this on after capacity testing; a
// sensible starting value is 256.
let proxy_global_semaphore: Option<std::sync::Arc<tokio::sync::Semaphore>> =
    match std::env::var("PROXY_MAX_GLOBAL_CONCURRENT") {
        Ok(s) => match s.parse::<usize>() {
            Ok(n) if n > 0 => {
                tracing::info!(cap = n, "proxy global concurrency cap enabled");
                Some(std::sync::Arc::new(tokio::sync::Semaphore::new(n)))
            }
            _ => {
                tracing::warn!(
                    value = %s,
                    "PROXY_MAX_GLOBAL_CONCURRENT is set but not a positive integer; ignoring"
                );
                None
            }
        },
        Err(_) => None,
    };
// Optional global concurrency cap across all proxy requests. Unset = no
let proxy_global_semaphore: Option<std::sync::Arc<tokio::sync::Semaphore>> = (|| {
    let s = std::env::var("PROXY_MAX_GLOBAL_CONCURRENT").ok()?;
    let n = s.parse::<usize>().ok()?;
    if n == 0 {
        tracing::warn!(
            value = %s,
            "PROXY_MAX_GLOBAL_CONCURRENT is set but not a positive integer; ignoring"
        );
        return None;
    }
    tracing::info!(cap = n, "proxy global concurrency cap enabled");
    Some(std::sync::Arc::new(tokio::sync::Semaphore::new(n)))
})();
Details

✨ AI Reasoning
​The added code in main() parses an optional environment variable and uses a match nesting (match std::env::var { Ok(s) => match s.parse() { ... } Err(_) => None }). This is input validation and initialization logic that is currently nested two levels. Converting the inner branches to be handled with early returns/guards (or using if-let chains) would flatten the control flow and make the initialization intent clearer, reducing visual indentation and easing future edits.


Previously the PUT /me/anthropic-key endpoint required a new key on every
request, which made 'just bump the concurrency cap' awkward — rotating
the key just to nudge max_concurrent felt wrong, and the user couldn't
do it from the UI at all without saving a new key.

Backend:
* PUT now accepts an optional `key` field. At least one of
  `key`/`max_concurrent` must be present; both-absent returns 400.
* Settings-only PUT (cap without key) requires an existing row — we
  refuse to insert a half-row with cap but no ciphertext.
* New `UserAnthropicKeyRepo::update_max_concurrent` does the cap-only
  UPDATE without touching key_encrypted / key_nonce.
* On any successful PUT, drop the in-memory per-credential semaphore
  from the AppState DashMap. The next proxy request rebuilds it
  against the freshly-persisted cap. In-flight requests keep their
  permits on the now-orphaned Arc<Semaphore> until they finish —
  effective cap is briefly (old + new) but only for the duration of
  the in-flight responses, which is the natural quiet point for the
  change to take effect.
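
The eviction behaviour in the last bullet can be sketched with std-only stand-ins (`u128` for `Uuid`, `Mutex<HashMap<..>>` for `DashMap`, an atomic counter for the semaphore). All names are hypothetical; the point is only that permits taken from the old `Arc` stay valid after the map entry is removed, while new requests see a freshly sized cap.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Stand-in for the per-credential semaphore.
pub struct Cap {
    limit: usize,
    in_flight: AtomicUsize,
}

/// RAII permit tied to the `Cap` it was taken from, even after eviction.
pub struct Permit {
    cap: Arc<Cap>,
}

impl Drop for Permit {
    fn drop(&mut self) {
        self.cap.in_flight.fetch_sub(1, Ordering::SeqCst);
    }
}

impl Cap {
    pub fn in_flight(&self) -> usize {
        self.in_flight.load(Ordering::SeqCst)
    }
}

/// Non-blocking acquire: `None` when the cap is already reached.
pub fn try_acquire(cap: &Arc<Cap>) -> Option<Permit> {
    let limit = cap.limit;
    cap.in_flight
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| {
            if n < limit { Some(n + 1) } else { None }
        })
        .ok()
        .map(|_| Permit { cap: Arc::clone(cap) })
}

pub struct Caps {
    by_user: Mutex<HashMap<u128, Arc<Cap>>>,
}

impl Caps {
    pub fn new() -> Self {
        Caps { by_user: Mutex::new(HashMap::new()) }
    }

    pub fn get_or_create(&self, user_id: u128, limit: usize) -> Arc<Cap> {
        let mut map = self.by_user.lock().expect("cap map lock poisoned");
        map.entry(user_id)
            .or_insert_with(|| Arc::new(Cap { limit, in_flight: AtomicUsize::new(0) }))
            .clone()
    }

    /// Called after a successful PUT: the next request rebuilds the entry.
    pub fn evict(&self, user_id: u128) {
        self.by_user.lock().expect("cap map lock poisoned").remove(&user_id);
    }
}
```

With an old cap of 1 and a new cap of 2, one request still in flight on the old entry plus two on the new one gives three concurrent requests, which is the "old + new" window described above.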

Frontend:
* Field label: 'Rotate key (optional)' when configured, 'Set key'
  otherwise.
* Button label: 'Update' / 'Save' instead of 'Replace'.
* Submit gated on a `hasUnsavedChange` derived: typed key OR cap
  changed from stored value.
* Body only includes `key` when the user typed one — preserves the
  ciphertext on cap-only updates.
* Help-text updated: 'New value applies on the next proxy request'
  (was 'on the next server restart' — outdated by this change).

Three new integration tests:
* me_anthropic_key_put_updates_cap_only — proves ciphertext preserved,
  cap updated, semaphore dropped.
* me_anthropic_key_put_rejects_cap_only_when_unconfigured — 400 when
  no row exists.
* me_anthropic_key_put_rejects_empty_body — 400 on neither-present.
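
The request rules exercised by these tests can be summarised as a small decision function. `PutAction`, `PutError` and `plan_put` are hypothetical names used only for this sketch; both error variants correspond to the 400 responses described above.

```rust
/// What the handler should do with a valid PUT body.
#[derive(Debug, PartialEq)]
pub enum PutAction<'a> {
    /// A key was supplied: insert or rotate, optionally setting the cap.
    UpsertKey { key: &'a str, max_concurrent: Option<i32> },
    /// Only the cap was supplied and a row already exists.
    UpdateCapOnly { max_concurrent: i32 },
}

/// Both variants would be reported as 400.
#[derive(Debug, PartialEq)]
pub enum PutError {
    EmptyBody,
    CapOnlyWithoutExistingKey,
}

pub fn plan_put<'a>(
    key: Option<&'a str>,
    max_concurrent: Option<i32>,
    row_exists: bool,
) -> Result<PutAction<'a>, PutError> {
    match (key, max_concurrent) {
        (None, None) => Err(PutError::EmptyBody),
        (Some(key), cap) => Ok(PutAction::UpsertKey { key, max_concurrent: cap }),
        (None, Some(cap)) if row_exists => Ok(PutAction::UpdateCapOnly { max_concurrent: cap }),
        // Refuse to create a row that has a cap but no ciphertext.
        (None, Some(_)) => Err(PutError::CapOnlyWithoutExistingKey),
    }
}
```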

Part of #210.
/// True when the user has either typed a new key or changed the cap
/// away from what's stored. The submit button is gated on this — it
/// prevents the user from submitting a no-op request.
const hasUnsavedChange = $derived.by(() => {

Anonymous arrow function passed to $derived.by is multi-step and non-trivial; extract into a named function to improve readability and testability (e.g., computeHasUnsavedChange()).

Details

✨ AI Reasoning
​An anonymous arrow callback with several distinct steps was added as the argument to a reactive derived helper. It computes keyTyped, capChanged, then applies conditional logic depending on status.configured. This mixes multiple related but separate decisions in an unnamed lambda, which reduces readability and testability. Extracting or naming the callback would clarify its purpose and make unit testing or reuse easier.

🔧 How do I fix it?
Extract complex anonymous functions into named functions with descriptive names, or add explanatory comments for their purpose.

