feat(swap-service): tri-state verificationStatus + decoupled tx/verify polling #37
…fy polling
Adds VerificationStatus enum (PENDING/SUCCESS/FAILED) and verificationStatus
column to the Swap model, decouples affiliate verification from pollSwapStatus,
and gates verification on tx-status terminal so each axis can succeed
independently. Per-verifier mapping applied for Near and Relay; other verifiers
blanket-mapped to SUCCESS.
Manual backfill SQL after migration deploy:
UPDATE swaps SET "verificationStatus"='SUCCESS' WHERE "isAffiliateVerified" IS NOT NULL;
Open TODOs (out of scope for this PR): PENDING termination criterion, admin
re-verify endpoint, per-verifier mapping for the remaining verifiers, per-call
HTTP timeouts.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ogging
Replaces stamped-out short-circuit objects across all verifiers with a
noAffiliateResult(status, reason) helper, and drops per-verifier try/catch
wraps so HTTP/parse failures flow to the top-level catch (PENDING + retry)
instead of incorrectly terminalizing as SUCCESS.
Refines per-verifier status mappings under one rule:
- missing identifier we need to query upstream -> FAILED
- upstream returned no data for our tx -> PENDING (indexer lag)
- got our tx, no affiliate marker present -> SUCCESS (definitive)
Notable corrections: Thorchain/Maya !memo is now SUCCESS (memo is immutable
on-chain - absence is definitive). ButterSwap !bridgeInfo is now PENDING
(didn't get data for our tx). HTTP errors across all verifiers now route to
PENDING via the top-level catch.
Single boundary log in verifySwap replaces logVerification (per-verifier) and
the redundant log in swaps.service.ts:verifySwap. Logs now fire for every path
(happy/short-circuit/thrown).
Renames error -> noAffiliateReason on SwapVerificationResult - the field is
only populated when hasAffiliate is false, so "error" was misleading for
SUCCESS-no-affiliate cases.
Renames pollSwapStatus -> checkSwapStatus (polling lives in pollOne; this
method does a single fetch).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
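The three-way mapping rule and the top-level catch described in this commit message can be sketched as below. This is a minimal illustration, not the code from the PR: only the names noAffiliateResult, hasAffiliate, noAffiliateReason and the three status values come from the commit message. The result shape, `UpstreamTx`, `verifyExample` and `verifyAtBoundary` are hypothetical.

```typescript
// Hypothetical shapes. Only noAffiliateResult, hasAffiliate, noAffiliateReason
// and the three status values are taken from the commit message.
type VerificationStatus = 'PENDING' | 'SUCCESS' | 'FAILED'

interface SwapVerificationResult {
  verificationStatus: VerificationStatus
  hasAffiliate: boolean
  // Only populated when hasAffiliate is false.
  noAffiliateReason?: string
}

// Builds the short-circuit result a verifier returns when no affiliate is found.
function noAffiliateResult(status: VerificationStatus, reason: string): SwapVerificationResult {
  return { verificationStatus: status, hasAffiliate: false, noAffiliateReason: reason }
}

// Hypothetical upstream record. `undefined` from the lookup means "no data for our tx yet".
interface UpstreamTx {
  affiliateMarker?: string
}

// Illustrative verifier applying the rule from the commit message.
function verifyExample(
  identifier: string | undefined,
  lookup: (id: string) => UpstreamTx | undefined,
): SwapVerificationResult {
  // Missing identifier needed to query upstream: terminal FAILED.
  if (!identifier) return noAffiliateResult('FAILED', 'missing identifier')
  const tx = lookup(identifier) // may throw on HTTP/parse failure
  // Upstream has nothing for our tx (indexer lag): retry later.
  if (!tx) return noAffiliateResult('PENDING', 'no upstream data yet')
  // We got our tx and it carries no affiliate marker: definitive.
  if (!tx.affiliateMarker) return noAffiliateResult('SUCCESS', 'no affiliate marker')
  return { verificationStatus: 'SUCCESS', hasAffiliate: true }
}

// Single boundary: anything a verifier throws becomes PENDING so the next poll retries.
function verifyAtBoundary(run: () => SwapVerificationResult): SwapVerificationResult {
  try {
    return run()
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err)
    return noAffiliateResult('PENDING', reason)
  }
}
```

Keeping the catch at one boundary means an individual verifier cannot accidentally turn a transient HTTP failure into a terminal status.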
Decouples the two axes that were sharing a single 5s cron + worker
pool. CowSwap and other multi-call verifications could starve
tx-status workers, and verification retries were hammering upstream
APIs at tx-status cadence (5s).
Polling layer now has two crons with independent isPolling guards
and shared worker concurrency cap (10):
- pollPendingTxStatus every 5s
- pollPendingVerification every 30s
Each fetches its own focused swap set:
- getPendingTxSwaps status IN ('IDLE','PENDING')
- getPendingVerificationSwaps verificationStatus='PENDING'
AND status IN ('SUCCESS','FAILED')
The status filters are mutually exclusive, so swaps never appear in
both queues. Worker pools are also independent — long-running
verifier I/O can't block tx-status checks.
Collapses markVerificationFailed into verifySwap as a status='FAILED'
short-circuit (the only caller was the poll loop's else branch).
Drops unused getPendingSwaps service method and GET /swaps/pending
controller route — no callers in the monorepo or web repo.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
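As a rough illustration of why the two queues described in this commit cannot overlap, the selection filters can be written as in-memory predicates. This is a sketch under assumptions: `SwapRow`, `needsTxPoll`, `needsVerification` and `guarded` are hypothetical names, and the real queries run through Prisma and may carry additional filters.

```typescript
type TxStatus = 'IDLE' | 'PENDING' | 'SUCCESS' | 'FAILED'
type VerificationStatus = 'PENDING' | 'SUCCESS' | 'FAILED'

// Hypothetical row shape with only the two columns the filters look at.
interface SwapRow {
  id: string
  status: TxStatus
  verificationStatus: VerificationStatus
}

// Mirrors getPendingTxSwaps: status IN ('IDLE','PENDING').
const needsTxPoll = (s: SwapRow): boolean => s.status === 'IDLE' || s.status === 'PENDING'

// Mirrors getPendingVerificationSwaps:
// verificationStatus='PENDING' AND status IN ('SUCCESS','FAILED').
const needsVerification = (s: SwapRow): boolean =>
  s.verificationStatus === 'PENDING' && (s.status === 'SUCCESS' || s.status === 'FAILED')

// Re-entrancy guard in the spirit of the per-cron isPolling flags: a tick that
// fires while the previous one is still running is skipped and resolves to false.
function guarded(tick: () => Promise<void>): () => Promise<boolean> {
  let isPolling = false
  return async () => {
    if (isPolling) return false
    isPolling = true
    try {
      await tick()
      return true
    } finally {
      isPolling = false
    }
  }
}
```

Because the tx filter requires a non-terminal status and the verification filter requires a terminal one, no row can satisfy both, which is the property the commit message relies on.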
Previous catch silently dropped the error and logged only a generic message.
Capture err and pass it to logger.error so the stack trace makes it to the log
output.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
📝 Walkthrough
This pull request refactors swap verification and polling logic: it introduces an enum-based verification status ('PENDING', 'SUCCESS', 'FAILED') to replace boolean verification, adds database schema and affiliate-tracking fields, splits polling into concurrent TX-status and verification paths, and rewrites verification logic across all swapper integrations to compute and persist affiliate details.
Changes
Swap Verification & Polling Refactoring
Sequence Diagram
sequenceDiagram
participant Scheduler as Scheduler (Cron)
participant PollingService as SwapPollingService
participant SwapsService as SwapsService
participant VerificationService as SwapVerificationService
participant Swapper as SwapperService
participant DB as Database
participant WS as WebSocket
rect rgb(135, 206, 250, 0.5)
Note over PollingService,WS: New TX Status Polling (5s cron)
Scheduler->>PollingService: pollPendingTxStatus()
PollingService->>SwapsService: getPendingTxSwaps()
SwapsService->>DB: fetch swaps (sellTxHash != null, status in [IDLE, PENDING])
DB-->>SwapsService: pending TX swaps
SwapsService-->>PollingService: swaps[]
par Concurrent Workers
PollingService->>PollingService: runWorkers(swaps, pollTxStatus)
PollingService->>SwapsService: checkSwapStatus(swapId)
SwapsService->>Swapper: get trade status
Swapper-->>SwapsService: status response
SwapsService-->>PollingService: normalized status
alt Status Changed
PollingService->>SwapsService: updateSwapStatus()
SwapsService->>DB: persist status change
SwapsService->>WS: notify user (status delta)
end
end
end
rect rgb(144, 238, 144, 0.5)
Note over PollingService,WS: New Verification Polling (30s cron)
Scheduler->>PollingService: pollPendingVerification()
PollingService->>SwapsService: getPendingVerificationSwaps()
SwapsService->>DB: fetch swaps (verificationStatus = PENDING, status in [SUCCESS, FAILED])
DB-->>SwapsService: pending verification swaps
SwapsService-->>PollingService: swaps[]
par Concurrent Workers
PollingService->>PollingService: runWorkers(swaps, pollVerification)
PollingService->>SwapsService: verifySwap(swap)
SwapsService->>VerificationService: verifySwap(swap)
VerificationService->>VerificationService: route by swapperName
alt Via Swapper API
VerificationService->>Swapper: fetch verification data
Swapper-->>VerificationService: trade/order data
end
VerificationService-->>VerificationService: compute verificationStatus & affiliate details
VerificationService-->>SwapsService: SwapVerificationResult
alt Verification Complete (SUCCESS or FAILED)
SwapsService->>DB: persist verificationStatus & affiliate fields
end
SwapsService-->>PollingService: result
PollingService->>PollingService: log changes if status updated
end
end
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~90 minutes
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
fe9a326 to
4a09ffa
Compare
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
apps/swap-service/src/swaps/swaps.service.ts (2)
235-256: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Bound the polling result set with take + deterministic order.
getPendingTxSwaps and getPendingVerificationSwaps are called every 5s / 30s and both findMany without a take or orderBy. With backlog growth (e.g., a verifier outage that holds many swaps in verificationStatus='PENDING'), each tick will load the entire backlog into memory before runWorkers processes it 10 at a time, and the next cron tick is gated by the isPolling* flag, so a slow tick can also delay reaction time. Adding a bounded page (with deterministic ordering for fairness) keeps each tick predictable in memory and latency.
🛡️ Suggested bound
 async getPendingTxSwaps(): Promise<Swap[]> {
   const swaps = await this.prisma.swap.findMany({
     where: {
       sellTxHash: { not: null },
       status: { in: ['IDLE', 'PENDING'] },
     },
+    orderBy: { updatedAt: 'asc' },
+    take: 200,
   })
   return swaps.map(toSwap)
 }
 async getPendingVerificationSwaps(): Promise<Swap[]> {
   const swaps = await this.prisma.swap.findMany({
     where: {
       sellTxHash: { not: null },
       verificationStatus: 'PENDING',
       status: { in: ['SUCCESS', 'FAILED'] },
     },
+    orderBy: { updatedAt: 'asc' },
+    take: 200,
   })
   return swaps.map(toSwap)
 }
Pair this with the existing index on (verificationStatus, sellTxHash) so the verification query is index-served; consider a similar partial/composite index for the tx query if EXPLAIN shows a seq scan at scale.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/swap-service/src/swaps/swaps.service.ts` around lines 235 - 256, Both getPendingTxSwaps and getPendingVerificationSwaps currently call prisma.swap.findMany without a limit or ordering which can load an unbounded backlog; update each function (the prisma.swap.findMany calls) to include a bounded take (e.g., take: 100) and a deterministic orderBy (for example orderBy: [{ createdAt: 'asc' }, { id: 'asc' }]) so each poll returns a predictable, fair page; keep existing where filters unchanged and ensure the chosen order columns are indexed (e.g., createdAt or id) for performance.
246-256: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Remove sellTxHash: { not: null } filter from the verification query.
The current filter in getPendingVerificationSwaps() excludes FAILED swaps that lack a sellTxHash, but verifySwap() already short-circuits on any status='FAILED' swap without requiring sellTxHash. This creates a gap where FAILED swaps originating from pre-existing data, other services, or direct API calls could remain at verificationStatus='PENDING' indefinitely. The short-circuit logic in verifySwap() is sufficient to handle these swaps safely; dropping the sellTxHash filter aligns the query with the actual implementation behavior.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/swap-service/src/swaps/swaps.service.ts` around lines 246 - 256, The query in getPendingVerificationSwaps() currently filters out records with sellTxHash null which can leave FAILED swaps stuck in verification; remove the sellTxHash: { not: null } condition from the Prisma findMany where clause so the method returns all swaps with verificationStatus 'PENDING' and status in ['SUCCESS','FAILED'], leaving verifySwap()'s internal short-circuit handling of FAILED/no-sellTxHash cases intact; update the function getPendingVerificationSwaps() accordingly so it no longer references sellTxHash in the where filter.
🧹 Nitpick comments (1)
apps/swap-service/src/polling/swap-polling.service.ts (1)
8-67: ⚡ Quick win
Concurrency cap is per-cron, not shared: total concurrent workers can reach 2 × POLL_CONCURRENCY.
POLL_CONCURRENCY = 10 is consumed independently by pollPendingTxStatus and pollPendingVerification, so when both crons are mid-flight (the 30s verification tick will frequently overlap several 5s tx ticks) up to 20 handlers can be in flight, each potentially holding a DB connection and an outbound HTTP socket. The PR description says "shared concurrency cap", which suggests a single semaphore was intended. Two options:
- Update the description if independent caps are the actual intent (and consider naming them e.g. TX_POLL_CONCURRENCY / VERIFICATION_POLL_CONCURRENCY for clarity).
- Introduce a shared semaphore (or pass a single queue/limiter into runWorkers) so both crons draw from one cap.
Either is fine, but the current state is a behavior/description mismatch worth resolving before merge so capacity planning (Prisma pool, upstream rate limits) lines up.
♻️ Sketch of a shared limiter
-const POLL_CONCURRENCY = 10
+const POLL_CONCURRENCY = 10
+
+class Semaphore {
+  private active = 0
+  private waiters: Array<() => void> = []
+  constructor(private readonly limit: number) {}
+  async acquire(): Promise<void> {
+    if (this.active < this.limit) {
+      this.active += 1
+      return
+    }
+    await new Promise<void>(resolve => this.waiters.push(resolve))
+    this.active += 1
+  }
+  release(): void {
+    this.active -= 1
+    const next = this.waiters.shift()
+    if (next) next()
+  }
+}
@@
-  private isPollingTx = false
-  private isPollingVerification = false
+  private isPollingTx = false
+  private isPollingVerification = false
+  private readonly limiter = new Semaphore(POLL_CONCURRENCY)
@@
-  private async runWorkers(swaps: Swap[], handler: (swap: Swap) => Promise<void>): Promise<void> {
-    const queue = [...swaps]
-    const workers = Array.from({ length: Math.min(POLL_CONCURRENCY, queue.length) }, async () => {
-      while (queue.length > 0) {
-        const swap = queue.shift()
-        if (swap) await handler(swap)
-      }
-    })
-    await Promise.all(workers)
-  }
+  private async runWorkers(swaps: Swap[], handler: (swap: Swap) => Promise<void>): Promise<void> {
+    await Promise.all(
+      swaps.map(async swap => {
+        await this.limiter.acquire()
+        try {
+          await handler(swap)
+        } finally {
+          this.limiter.release()
+        }
+      }),
+    )
+  }
(Or use a battle-tested limiter like p-limit if you'd rather not roll your own.)
Please confirm whether "shared concurrency cap" in the PR description was meant literally; if so, either the limiter or the description should change before merge.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/swap-service/src/polling/swap-polling.service.ts` around lines 8 - 67, The PR claims a "shared concurrency cap" but POLL_CONCURRENCY is applied separately inside pollPendingTxStatus and pollPendingVerification via runWorkers, so both crons can run up to POLL_CONCURRENCY each (totaling up to 2× capacity); either update the PR/variable naming to reflect independent caps (e.g., TX_POLL_CONCURRENCY / VERIFICATION_POLL_CONCURRENCY) or implement a shared limiter and pass it into runWorkers so both pollPendingTxStatus and pollPendingVerification draw from the same semaphore/limiter (alternative: use a library like p-limit) — update references to POLL_CONCURRENCY, pollPendingTxStatus, pollPendingVerification, and runWorkers accordingly.
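If the shared-limiter route is taken, one detail in the reviewer's sketch may be worth tightening: release() decrements the counter before the woken waiter re-increments it, so an acquire() arriving in between could briefly push the count past the limit. A possible variant hands the freed slot directly to the next waiter instead. The class below is a hypothetical sketch, not code from the PR; `runLimited` stands in for runWorkers and its name is an assumption.

```typescript
// Counting semaphore that transfers a released slot straight to the next
// waiter, so the active count never exceeds the limit.
class Semaphore {
  private activeCount = 0
  private readonly waiters: Array<() => void> = []
  private readonly limit: number

  constructor(limit: number) {
    this.limit = limit
  }

  get active(): number {
    return this.activeCount
  }

  get waiting(): number {
    return this.waiters.length
  }

  acquire(): Promise<void> {
    if (this.activeCount < this.limit) {
      this.activeCount += 1
      return Promise.resolve()
    }
    // No free slot: queue until release() hands one over.
    return new Promise<void>(resolve => this.waiters.push(resolve))
  }

  release(): void {
    const next = this.waiters.shift()
    if (next) {
      // The slot moves to the waiter, so activeCount stays unchanged.
      next()
    } else {
      this.activeCount -= 1
    }
  }
}

// Hypothetical stand-in for runWorkers: every caller that shares one
// Semaphore instance draws from the same cap.
async function runLimited<T>(
  limiter: Semaphore,
  items: T[],
  handler: (item: T) => Promise<void>,
): Promise<void> {
  await Promise.all(
    items.map(async item => {
      await limiter.acquire()
      try {
        await handler(item)
      } finally {
        limiter.release()
      }
    }),
  )
}
```

Passing the same Semaphore instance to both cron handlers would make the cap genuinely shared; giving each cron its own instance keeps the caps independent.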
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@apps/swap-service/src/verification/__tests__/fixtures/relay/swap.ts`:
- Line 77: The fixture has verificationStatus: 'SUCCESS' but status remains
'PENDING', creating an impossible state; update the fixture's status field (the
surrounding "status" property in the swap fixture object that also contains
buyTxHash and "Swap complete") to a terminal state consistent with verification
(e.g., 'SUCCESS' or whatever project uses for completed swaps) so verification
tests reflect the invariant that verification runs only after tx status is
terminal.
In `@apps/swap-service/src/verification/swap-verification.service.ts`:
- Around line 97-99: Several branches in swap-verification (notably the
SwapperName.Debridge and SwapperName.Test cases that call
noAffiliateResult('SUCCESS', ...)) incorrectly mark unimplemented verifiers as
SUCCESS; change these to return a non-terminal state (e.g., 'PENDING' or
explicit 'FAILED') so unverified attribution is not persisted as successful.
Update the cases referencing SwapperName.Debridge and SwapperName.Test to call
noAffiliateResult with 'PENDING' (or 'FAILED' if you prefer explicit failure)
instead of 'SUCCESS', and make the same change for the other listed blocks
(around the additional ranges) that currently return 'SUCCESS' without
performing verification; ensure any code that persists verificationStatus (the
result from noAffiliateResult) will not treat the new state as a final success.
- Around line 534-548: The code logs full Bebop request/response payloads on
every poll (JSON.stringify(response.data)) which is noisy and leaks user trade
data; change logging to only emit stable identifiers (keep logging requestUrl,
txHash, and the startNano/endNano/swapTimestamp values) via this.logger.log, and
move the full payload log (response.data) to a debug-level call
(this.logger.debug) or remove it entirely; update the block around the call to
this.httpService.get and firstValueFrom so JSON.stringify(response.data) is no
longer logged at info level and any verbose payloads are gated behind debug (use
this.logger.debug for the payload and keep the rest as-is).
- Around line 494-503: The affiliateBps calculation can produce floating-point
artifacts; update the affiliateBps assignment in swap-verification.service.ts
(the integratorFee -> affiliateBps logic) to normalize to an integer bps value
by parsing integratorFee as a number, multiplying by 10000, and rounding to the
nearest integer (preserving undefined when integratorFee is absent) so the
returned affiliateBps is an integer rather than a floating-point artifact.
In `@prisma/migrations/20260505235634_add_swap_verification_status/migration.sql`:
- Around line 5-8: The migration currently adds swaps.verificationStatus with
NOT NULL DEFAULT 'PENDING', which will mark all historical rows as PENDING;
instead, extend the migration to backfill historical rows deterministically:
after adding the verificationStatus column (and before committing/creating the
index), run UPDATE(s) that set verificationStatus = 'SUCCESS' for swaps
already in your terminal-success state(s) and verificationStatus = 'FAILED' for
swaps already in terminal-failure state(s) (use your existing status/phase
columns or other canonical indicators), leaving only truly in-flight rows as
'PENDING'; keep the DEFAULT as-is and then create the
swaps_verificationStatus_sellTxHash_idx index. Ensure you reference the "swaps"
table, the "verificationStatus" column and the "VerificationStatus" enum when
applying the UPDATE logic so the migration is idempotent and deterministic.
In `@prisma/schema/swap-service.prisma`:
- Around line 50-55: The compound index on the swaps model currently uses
(verificationStatus, sellTxHash) which doesn't match the worker's query that
filters on verificationStatus and status; update the Prisma index definition for
verificationStatus to either @@index([verificationStatus, status]) or
@@index([verificationStatus, status, sellTxHash]) so the verification poll can
use the index efficiently—locate the index line referencing verificationStatus
and sellTxHash in the swaps model and change the field list accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 45e6e6fa-ac30-4bde-acae-f311577b7aba
📒 Files selected for processing (12)
- apps/swap-service/src/polling/swap-polling.service.ts
- apps/swap-service/src/swaps/swaps.controller.ts
- apps/swap-service/src/swaps/swaps.service.ts
- apps/swap-service/src/verification/__tests__/fixtures/near/swap.ts
- apps/swap-service/src/verification/__tests__/fixtures/relay/swap.ts
- apps/swap-service/src/verification/__tests__/near.test.ts
- apps/swap-service/src/verification/__tests__/relay.test.ts
- apps/swap-service/src/verification/swap-verification.service.ts
- apps/swap-service/src/verification/utils.ts
- packages/shared-types/src/index.ts
- prisma/migrations/20260505235634_add_swap_verification_status/migration.sql
- prisma/schema/swap-service.prisma
💤 Files with no reviewable changes (1)
- apps/swap-service/src/swaps/swaps.controller.ts
- Fix relay fixture: status was PENDING but completion fields (buyTxHash,
  statusMessage, actualBuy) indicated a finished swap. Match the new invariant
  that verification only runs after tx-status terminalizes.
- Round 0x integrator fee bps to integer to avoid float artifacts when fees are
  decimal-encoded.
- Re-target the verification poll index to (verificationStatus, status) so the
  30s cron's predicate is fully covered by the index.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
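The bps rounding mentioned in this commit could look roughly like the following. This is a sketch under assumptions: the helper name `feeToBps` is hypothetical, and it assumes the integrator fee arrives as a decimal fraction (for example "0.0055"), which matches the review's "multiply by 10000" wording but is not confirmed by the source.

```typescript
// Converts a decimal-encoded fee fraction to integer basis points.
// Returns undefined when the fee is absent or is not a finite number.
function feeToBps(integratorFee: string | number | undefined): number | undefined {
  if (integratorFee === undefined) return undefined
  const fee = Number(integratorFee)
  if (!Number.isFinite(fee)) return undefined
  // Rounding removes binary floating-point residue left by the multiplication.
  return Math.round(fee * 10000)
}
```

Rounding to the nearest integer keeps the persisted value comparable with configured bps values without having to tolerate tiny floating-point differences.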
Description
Decouples affiliate verification from tx-status polling and replaces the boolean isAffiliateVerified retry signal with a tri-state verificationStatus enum (PENDING/SUCCESS/FAILED) on the Swap model.
Previously, verification ran inside pollSwapStatus and was gated by tx-status: once the tx terminalized (SUCCESS/FAILED) the swap fell out of the IDLE/PENDING polling query and verification stopped retrying forever. Each axis can now succeed independently:
- tx-status poll (@Cron EVERY_5_SECONDS): selects status IN ('IDLE','PENDING')
- verification poll (@Cron EVERY_30_SECONDS): selects verificationStatus = 'PENDING' AND status IN ('SUCCESS','FAILED')
Verification is gated on tx-status terminal: FAILED short-circuits verificationStatus to FAILED; SUCCESS runs the verifier and the verifier's tri-state output is persisted. Verifier outputs were consolidated through a noAffiliateResult(status, reason) helper and a single logResult call; the per-verifier try/catch + unverified() boilerplate is gone.
Per-verifier mapping (Near + Relay only on this branch; Thorchain mapping lives on a sibling WIP branch):
- Near: missing depositAddress → FAILED; happy path → SUCCESS; SDK throw → top-level catch PENDING (transient retry)
- Relay: missing relayId → FAILED; "no request data" → PENDING; HTTP throw → top-level catch PENDING; happy path → SUCCESS
- Any throw reaching the top-level catch in verifySwap → PENDING (transient retry)
- Other verifiers: SUCCESS terminal (no verifier, don't retry forever)
Schema: single migration adds the VerificationStatus enum, the column with @default(PENDING), and an index on (verificationStatus, sellTxHash). Existing isAffiliateVerified Boolean? is kept as the orthogonal "is-shapeshift-affiliate" derived flag; a two-axis split is more flexible than flattening into one enum. After deploy, manually backfill historical rows: UPDATE swaps SET "verificationStatus"='SUCCESS' WHERE "isAffiliateVerified" IS NOT NULL; (NULL rows keep default PENDING and re-verify automatically).
Removes the unused GET /swaps/pending controller route (only the cron called it).
Out of scope (deferred follow-ups)
SUCCESS)
Testing
- yarn db:generate after pulling, then yarn db:migrate to apply 20260505235634_add_swap_verification_status
- yarn workspace @shapeshift/swap-service test verification
- verificationStatus transitions PENDING → SUCCESS once tx terminalizes
- FAILED: confirm verificationStatus short-circuits to FAILED without an outbound verifier call
- SUCCESS
Summary by CodeRabbit
Breaking Changes
- /swaps/pending endpoint; pending swaps now queried separately for transactions and verifications.
- verificationStatus ('PENDING' | 'SUCCESS' | 'FAILED') instead of isVerified boolean.
New Features