Skip to content

Commit b56e5aa

Browse files
committed
feat(reku): enhance BaseCrossChecker to support multiple addresses and add retry options for log retrieval
1 parent 3317194 commit b56e5aa

File tree

3 files changed

+119
-15
lines changed

3 files changed

+119
-15
lines changed

packages/reku/src/event/crosschecker/basechecker.ts

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { ethers } from 'ethers'
2+
import type { ContractAddress } from '@ora-io/utils'
23
import { timeoutWithRetry, to } from '@ora-io/utils'
34
import { ETH_BLOCK_COUNT_ONE_HOUR } from '../../constants'
45
import type { Providers } from '../../types/w3'
@@ -112,18 +113,46 @@ export class BaseCrossChecker {
112113
// get period logs
113114
const { fromBlock, toBlock, address, topics } = options
114115
debug('start crosscheck from %d to %d', fromBlock, toBlock)
115-
const params = {
116-
fromBlock,
117-
toBlock,
118-
...(address && { address }),
119-
...(topics && { topics }),
120-
}
116+
121117
if (this.provider.provider) {
122-
const logs = await timeoutWithRetry(() => {
123-
if (!this.provider || !this.provider.provider)
124-
throw new Error('provider not ready')
125-
return this.provider?.provider?.getLogs(params)
126-
}, 15 * 1000, 3)
118+
const addresses: ContractAddress[][] = []
119+
if (Array.isArray(address)) {
120+
if (options.addressGroupLimit) {
121+
for (let i = 0; i < address.length; i += options.addressGroupLimit)
122+
addresses.push(address.slice(i, i + options.addressGroupLimit))
123+
}
124+
else {
125+
addresses.push(address)
126+
}
127+
}
128+
else {
129+
addresses.push([address])
130+
}
131+
132+
const requests = addresses.map((address) => {
133+
const params = {
134+
fromBlock,
135+
toBlock,
136+
...(address && { address }),
137+
...(topics && { topics }),
138+
}
139+
140+
const fn = async () => {
141+
if (!this.provider || !this.provider.provider)
142+
throw new Error('provider not ready')
143+
return this.provider?.provider?.getLogs(params)
144+
}
145+
if (options.retryOptions)
146+
return timeoutWithRetry(fn, options.retryOptions.timeout || 15 * 1000, options.retryOptions.retries || 3)
147+
148+
else
149+
return fn()
150+
})
151+
152+
const logs = (await Promise.all(requests)).reduce((acc, curr) => {
153+
return acc.concat(curr)
154+
}, [])
155+
127156
// get ignoreLogs keys
128157
const ignoreLogs = options.ignoreLogs
129158

packages/reku/src/event/crosschecker/interface.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ export type FnOnMissingLog = (log: ethers.Log) => Awaitable<void>
66
export interface SimpleLog { transactionHash: string; index?: number }
77

88
export interface LogFilterParam {
9-
address: ContractAddress
9+
address: ContractAddress | ContractAddress[]
1010
topics: string[]
1111
fromBlock?: number
1212
toBlock?: number
@@ -15,6 +15,11 @@ export interface LogFilterParam {
1515
export interface BaseCrossCheckParam extends LogFilterParam {
1616
onMissingLog: FnOnMissingLog
1717
ignoreLogs?: SimpleLog[]
18+
addressGroupLimit?: number // how many addresses to get logs at most
19+
retryOptions?: {
20+
timeout?: Milliseconds
21+
retries?: number
22+
}
1823
}
1924

2025
export interface CrossCheckFromParam extends BaseCrossCheckParam {

packages/reku/tests/basechecker.test.ts

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ describe('BaseCrossChecker', () => {
312312
expect(mockProvider.provider.getLogs).toHaveBeenCalledWith({
313313
fromBlock: 1000,
314314
toBlock: 2000,
315-
address: '0xcontract1',
315+
address: ['0xcontract1'],
316316
topics: ['0xtopic1'],
317317
})
318318
expect(onMissingLogSpy).toHaveBeenCalledTimes(2)
@@ -324,7 +324,7 @@ describe('BaseCrossChecker', () => {
324324
const options: CrossCheckRangeParam = {
325325
fromBlock: 1000,
326326
toBlock: 2000,
327-
address: '0xcontract1',
327+
address: ['0xcontract1'],
328328
topics: ['0xtopic1'],
329329
onMissingLog: onMissingLogSpy,
330330
}
@@ -348,7 +348,7 @@ describe('BaseCrossChecker', () => {
348348
expect(mockProvider.provider.getLogs).toHaveBeenCalledWith({
349349
fromBlock: 1000,
350350
toBlock: 2000,
351-
address: '0xcontract1',
351+
address: ['0xcontract1'],
352352
topics: ['0xtopic1'],
353353
})
354354
})
@@ -368,5 +368,75 @@ describe('BaseCrossChecker', () => {
368368

369369
expect(onMissingLogSpy).toHaveBeenCalledTimes(3)
370370
})
371+
372+
it('should handle addressGroupLimit', async () => {
373+
mockProvider.provider.getLogs.mockResolvedValue(mockLogs)
374+
375+
const options: CrossCheckRangeParam = {
376+
fromBlock: 1000,
377+
toBlock: 2000,
378+
address: ['0xcontract1', '0xcontract2'],
379+
addressGroupLimit: 1,
380+
onMissingLog: onMissingLogSpy,
381+
topics: [],
382+
}
383+
384+
await (baseChecker as any)._crossCheck(options)
385+
386+
expect(mockProvider.provider.getLogs).toHaveBeenCalledTimes(2)
387+
})
388+
389+
it('should handle addressGroupLimit with single address', async () => {
390+
mockProvider.provider.getLogs.mockResolvedValue(mockLogs)
391+
392+
const options: CrossCheckRangeParam = {
393+
fromBlock: 1000,
394+
toBlock: 2000,
395+
address: '0xcontract1',
396+
addressGroupLimit: 1,
397+
onMissingLog: onMissingLogSpy,
398+
topics: [],
399+
}
400+
401+
await (baseChecker as any)._crossCheck(options)
402+
403+
expect(mockProvider.provider.getLogs).toHaveBeenCalledTimes(1)
404+
405+
expect(mockProvider.provider.getLogs).toHaveBeenCalledWith({
406+
fromBlock: 1000,
407+
toBlock: 2000,
408+
address: ['0xcontract1'],
409+
topics: [],
410+
})
411+
})
412+
413+
it.only('should handle retryOptions', async () => {
414+
mockProvider.provider.getLogs.mockRejectedValue(Error('provider not ready'))
415+
416+
const options: CrossCheckRangeParam = {
417+
fromBlock: 1000,
418+
toBlock: 2000,
419+
address: '0xcontract1',
420+
topics: ['0xtopic1'],
421+
onMissingLog: onMissingLogSpy,
422+
retryOptions: {
423+
timeout: 1000,
424+
retries: 1,
425+
},
426+
}
427+
428+
const defaultCallTimes = 1
429+
430+
try {
431+
await (baseChecker as any)._crossCheck(options)
432+
}
433+
catch (error) {
434+
expect(error).toBeInstanceOf(Error)
435+
expect((error as Error).message).toBe('provider not ready')
436+
}
437+
438+
expect(mockProvider.provider.getLogs).toHaveBeenCalledTimes(defaultCallTimes + options.retryOptions!.retries!)
439+
expect(onMissingLogSpy).toHaveBeenCalledTimes(0)
440+
})
371441
})
372442
})

0 commit comments

Comments
 (0)