Skip to content
99 changes: 63 additions & 36 deletions apps/swap-service/src/polling/swap-polling.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Injectable, Logger } from '@nestjs/common'
import { Cron, CronExpression } from '@nestjs/schedule'

import { SwapsService } from '../swaps/swaps.service'
import type { Swap } from '../swaps/types'
import { WebsocketGateway } from '../websocket/websocket.gateway'

const POLL_CONCURRENCY = 10
Expand All @@ -10,67 +11,93 @@ const POLL_CONCURRENCY = 10
export class SwapPollingService {
private readonly logger = new Logger(SwapPollingService.name)

private isPolling = false
private isPollingTx = false
private isPollingVerification = false

constructor(
private swapsService: SwapsService,
private websocketGateway: WebsocketGateway,
) {}

@Cron(CronExpression.EVERY_5_SECONDS)
async pollPendingSwaps() {
if (this.isPolling) return
this.isPolling = true
async pollPendingTxStatus() {
if (this.isPollingTx) return
this.isPollingTx = true

try {
// TODO: paginate with a batch size + oldest-first ordering once the in-flight
// queue grows enough that one cron tick can't drain it within the 5s interval.
const pendingSwaps = await this.swapsService.getPendingSwaps()
if (pendingSwaps.length === 0) return

this.logger.log(`Polling ${pendingSwaps.length} pending swaps`)

const queue = [...pendingSwaps]
const workers = Array.from({ length: Math.min(POLL_CONCURRENCY, queue.length) }, async () => {
while (queue.length > 0) {
const swap = queue.shift()
if (swap) await this.pollOne(swap)
}
})
await Promise.all(workers)
const swaps = await this.swapsService.getPendingTxSwaps()
if (swaps.length === 0) return

this.logger.log(`Polling tx status for ${swaps.length} swaps`)
await this.runWorkers(swaps, (swap) => this.pollTxStatus(swap))
} catch (err) {
this.logger.error('Failed to poll pending swaps:', err)
this.logger.error('Failed to poll pending tx status:', err)
} finally {
this.isPolling = false
this.isPollingTx = false
}
}

private async pollOne(swap: Awaited<ReturnType<SwapsService['getPendingSwaps']>>[number]): Promise<void> {
const statusUpdate = await (async () => {
try {
return await this.swapsService.pollSwapStatus(swap.swapId)
} catch (err) {
this.logger.error(`Failed to poll swap ${swap.swapId}:`, err)
return
}
})()
@Cron(CronExpression.EVERY_30_SECONDS)
async pollPendingVerification() {
if (this.isPollingVerification) return
this.isPollingVerification = true

if (!statusUpdate || statusUpdate.status === swap.status) return
try {
const swaps = await this.swapsService.getPendingVerificationSwaps()
if (swaps.length === 0) return

this.logger.log(`Status changed for swap ${swap.swapId}: ${swap.status} -> ${statusUpdate.status}`)
this.logger.log(`Polling verification for ${swaps.length} swaps`)
await this.runWorkers(swaps, (swap) => this.pollVerification(swap))
} catch (err) {
this.logger.error('Failed to poll pending verification:', err)
} finally {
this.isPollingVerification = false
}
}

private async runWorkers(swaps: Swap[], handler: (swap: Swap) => Promise<void>): Promise<void> {
const queue = [...swaps]
const workers = Array.from({ length: Math.min(POLL_CONCURRENCY, queue.length) }, async () => {
while (queue.length > 0) {
const swap = queue.shift()
if (swap) await handler(swap)
}
})
await Promise.all(workers)
}

private async pollTxStatus(swap: Swap): Promise<void> {
try {
const updatedSwap = await this.swapsService.updateSwapStatus({
const statusUpdate = await this.swapsService.checkSwapStatus(swap.swapId)

if (statusUpdate.status === swap.status) return

this.logger.log(`Status changed for swap ${swap.swapId}: ${swap.status} -> ${statusUpdate.status}`)

const updated = await this.swapsService.updateSwapStatus({
swapId: swap.swapId,
status: statusUpdate.status,
sellTxHash: statusUpdate.sellTxHash,
buyTxHash: statusUpdate.buyTxHash,
statusMessage: statusUpdate.statusMessage,
})

this.websocketGateway.sendSwapUpdateToUser(swap.userId, updatedSwap)
} catch (error) {
this.logger.error(`Failed to persist status change for swap ${swap.swapId}:`, error)
this.websocketGateway.sendSwapUpdateToUser(updated.userId, updated)
} catch (err) {
this.logger.error(`Failed to poll tx status for swap ${swap.swapId}:`, err)
}
}

private async pollVerification(swap: Swap): Promise<void> {
try {
const updated = await this.swapsService.verifySwap(swap)
if (updated.verificationStatus !== swap.verificationStatus) {
this.logger.log(
`Verification changed for swap ${swap.swapId}: ${swap.verificationStatus} -> ${updated.verificationStatus}`,
)
}
} catch (err) {
this.logger.error(`Failed to verify swap ${swap.swapId}:`, err)
}
}
}
5 changes: 0 additions & 5 deletions apps/swap-service/src/swaps/swaps.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ export class SwapsController {
return this.swapsService.createSwap(data)
}

@Get('pending')
async getPendingSwaps() {
return this.swapsService.getPendingSwaps()
}

@Get(':swapId')
async getSwapById(@Param('swapId') swapId: string) {
const swap = await this.swapsService.getSwapById(swapId)
Expand Down
80 changes: 45 additions & 35 deletions apps/swap-service/src/swaps/swaps.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,11 @@ export class SwapsService {

try {
await this.sendStatusUpdateNotification(swap)
} catch {
logger.error(`Failed to send notification for swap ${swap.swapId}`)
} catch (err) {
logger.error(`Failed to send notification for swap ${swap.swapId}:`, err)
}

logger.log(`Swap status updated: ${swap.swapId} -> ${swap.status}`)
logger.log(`Swap status updated for swap: ${swap.swapId} (${swap.status})`)

return swap
} catch (error) {
Expand Down Expand Up @@ -232,9 +232,24 @@ export class SwapsService {
return { swaps: rows.map(toSwap), nextCursor: getNextCursor(rows, limit) }
}

async getPendingSwaps(): Promise<Swap[]> {
async getPendingTxSwaps(): Promise<Swap[]> {
const swaps = await this.prisma.swap.findMany({
where: { status: { in: ['IDLE', 'PENDING'] }, sellTxHash: { not: null } },
where: {
sellTxHash: { not: null },
status: { in: ['IDLE', 'PENDING'] },
},
})

return swaps.map(toSwap)
}

async getPendingVerificationSwaps(): Promise<Swap[]> {
const swaps = await this.prisma.swap.findMany({
where: {
sellTxHash: { not: null },
verificationStatus: 'PENDING',
status: { in: ['SUCCESS', 'FAILED'] },
},
})

return swaps.map(toSwap)
Expand Down Expand Up @@ -347,8 +362,8 @@ export class SwapsService {
}
}

async pollSwapStatus(swapId: string): Promise<SwapStatusResponse> {
logger.log(`Polling status for swap: ${swapId}`)
async checkSwapStatus(swapId: string): Promise<SwapStatusResponse> {
logger.log(`Checking status for swap: ${swapId}`)

const prismaSwap = await this.prisma.swap.findUnique({ where: { swapId } })
if (!prismaSwap) throw new NotFoundException(`Swap not found: ${swapId}`)
Expand All @@ -374,61 +389,56 @@ export class SwapsService {

const statusMessage = Array.isArray(message) ? message[0] : message

await this.reconcileSwap(swap)

return {
status: status === TxStatus.Confirmed ? 'SUCCESS' : status === TxStatus.Failed ? 'FAILED' : 'PENDING',
sellTxHash: swap.sellTxHash,
buyTxHash,
statusMessage: typeof statusMessage === 'string' ? statusMessage : '',
}
} catch (error) {
logger.error(`Failed to poll swap status for ${swapId}:`, error)
logger.error(`Failed to check swap status for ${swapId}:`, error)
return {
status: 'PENDING',
statusMessage: `Error polling status: ${error instanceof Error ? error.message : 'Unknown error'}`,
}
}
}

private async reconcileSwap(swap: Swap): Promise<void> {
try {
const verificationResult = await this.swapVerificationService.verifySwap(swap)

logger.log(
[
`Swap verified: ${swap.swapId}`,
verificationResult.isVerified ? 'ok' : 'failed',
verificationResult.hasAffiliate &&
`affiliate ${verificationResult.affiliateAddress} (${verificationResult.affiliateBps} bps)`,
verificationResult.error && `error: ${verificationResult.error}`,
]
.filter(Boolean)
.join(' | '),
async verifySwap(swap: Swap): Promise<Swap> {
if (swap.status === 'FAILED') {
return toSwap(
await this.prisma.swap.update({
where: { swapId: swap.swapId },
data: { verificationStatus: 'FAILED', isAffiliateVerified: false },
}),
)
}

const isAffiliateVerified = verificationResult.isVerified && verificationResult.hasAffiliate
const verificationResult = await this.swapVerificationService.verifySwap(swap)
if (verificationResult.verificationStatus === 'PENDING') return swap

const affiliateVerificationDetails: AffiliateVerificationDetails = {
hasAffiliate: verificationResult.hasAffiliate,
affiliateBps: verificationResult.affiliateBps,
affiliateAddress: verificationResult.affiliateAddress,
verifiedSellAmountCryptoBaseUnit: verificationResult.verifiedSellAmountCryptoBaseUnit,
}
const isAffiliateVerified = verificationResult.verificationStatus === 'SUCCESS' && verificationResult.hasAffiliate

const affiliateVerificationDetails: AffiliateVerificationDetails = {
hasAffiliate: verificationResult.hasAffiliate,
affiliateBps: verificationResult.affiliateBps,
affiliateAddress: verificationResult.affiliateAddress,
verifiedSellAmountCryptoBaseUnit: verificationResult.verifiedSellAmountCryptoBaseUnit,
}

return toSwap(
await this.prisma.swap.update({
where: { swapId: swap.swapId },
data: {
verificationStatus: verificationResult.verificationStatus,
isAffiliateVerified,
affiliateFeeAssetId: verificationResult.actualAffiliateFeeAssetId,
affiliateAssetUsd: verificationResult.actualAffiliateFeeUsd,
affiliateVerificationDetails: isAffiliateVerified ? affiliateVerificationDetails : Prisma.DbNull,
actualBuyAmountCryptoBaseUnit: verificationResult.actualBuyAmountCryptoBaseUnit,
actualAffiliateFeeAmountCryptoBaseUnit: verificationResult.actualAffiliateFeeAmountCryptoBaseUnit,
},
})
} catch (error) {
logger.warn(`Failed to verify affiliate for swap ${swap.swapId}:`, error)
}
}),
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export default {
affiliateAssetUsd: '2288.4',
isAffiliateVerified: null,
affiliateVerificationDetails: null,
verificationStatus: 'PENDING',
affiliateAddress: '0xa44c286ba83bb771cd0107b2c1df678435bd1535',
affiliateBps: 60,
origin: 'api',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export default {
sellAmountCryptoBaseUnit: '1000000000000000',
expectedBuyAmountCryptoBaseUnit: '2272662',
actualBuyAmountCryptoBaseUnit: '2272662',
status: 'PENDING',
status: 'SUCCESS',
source: 'Relay',
swapperName: SwapperName.Relay,
sellAccountId: '3c70e97c6f86a5b5cfdf82dfd3380ac4ed3d8b89dabf86f631bdf739372926be',
Expand Down Expand Up @@ -74,6 +74,7 @@ export default {
buyAssetUsd: '0.9998',
affiliateAssetUsd: '0.9998',
isAffiliateVerified: true,
verificationStatus: 'SUCCESS',
Comment thread
coderabbitai[bot] marked this conversation as resolved.
affiliateVerificationDetails: {
affiliateBps: 60,
hasAffiliate: true,
Expand Down
16 changes: 8 additions & 8 deletions apps/swap-service/src/verification/__tests__/near.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe('verifyNearIntents', () => {
const result = await service.verifySwap(swap)

expect(result).toMatchObject({
isVerified: true,
verificationStatus: 'SUCCESS',
hasAffiliate: true,
affiliateBps: 30,
affiliateAddress: 'shapeshifttokenomics.sputnik-dao.near',
Expand Down Expand Up @@ -56,7 +56,7 @@ describe('verifyNearIntents', () => {

const result = await service.verifySwap(swap)

expect(result.isVerified).toBe(true)
expect(result.verificationStatus).toBe('SUCCESS')
expect(result.hasAffiliate).toBe(false)
expect(result.affiliateBps).toBeUndefined()
expect(result.affiliateAddress).toBeUndefined()
Expand Down Expand Up @@ -104,24 +104,24 @@ describe('verifyNearIntents', () => {
expect(result.verifiedSellAmountCryptoBaseUnit).toBe(response.quoteResponse.quote.amountIn)
})

it('returns unverified when nearIntentsSpecific.depositAddress is missing', async () => {
it('returns FAILED when nearIntentsSpecific.depositAddress is missing', async () => {
const swapWithoutMetadata = { ...swap, metadata: {} as SwapperSpecificMetadata } as Swap

const result = await service.verifySwap(swapWithoutMetadata)

expect(result).toMatchObject({
isVerified: false,
verificationStatus: 'FAILED',
hasAffiliate: false,
error: 'Missing depositAddress in nearIntentsSpecific metadata',
noAffiliateReason: 'Missing depositAddress in nearIntentsSpecific metadata',
})
})

it('returns unverified when the SDK throws', async () => {
it('returns PENDING when the SDK throws (transient — retry next tick)', async () => {
jest.spyOn(OneClickService, 'getExecutionStatus').mockRejectedValue(new Error('upstream 500'))

const result = await service.verifySwap(swap)

expect(result.isVerified).toBe(false)
expect(result.error).toBe('upstream 500')
expect(result.verificationStatus).toBe('PENDING')
expect(result.noAffiliateReason).toBe('upstream 500')
})
})
Loading
Loading