Skip to content

Commit

Permalink
feat: fall back to getLogs if filters are not supported (#76)
Browse files Browse the repository at this point in the history
* feat: fall back to getLogs if filters are not supported

* Update src/actions/public/watchContractEvent.test.ts

Co-authored-by: awkweb <tom@meagher.co>

* pr review

---------

Co-authored-by: awkweb <tom@meagher.co>
  • Loading branch information
jxom and tmm committed Feb 20, 2023
1 parent 45c7f8d commit b2669bd
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 32 deletions.
2 changes: 2 additions & 0 deletions site/docs/actions/public/watchEvent.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ Watches and returns emitted [Event Logs](/docs/glossary/terms#TODO).

This Action will batch up all the Event Logs found within the [`pollingInterval`](#pollinginterval-optional), and invoke them via [`onLogs`](#onLogs).

`watchEvent` will attempt to create an [Event Filter](/docs/actions/public/watchEvent) and listen to changes to the Filter per polling interval, however, if the RPC Provider does not support Filters (ie. `eth_newFilter`), then `watchEvent` will fall back to using [`getLogs`](/docs/actions/public/getLogs) instead.

## Import

```ts
Expand Down
2 changes: 2 additions & 0 deletions site/docs/contract/watchContractEvent.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ Watches and returns emitted contract event logs.

This Action will batch up all the event logs found within the [`pollingInterval`](#pollinginterval-optional), and invoke them via [`onLogs`](#onLogs).

`watchContractEvent` will attempt to create an [Event Filter](/docs/contract/createContractEventFilter) and listen to changes to the Filter per polling interval, however, if the RPC Provider does not support Filters (ie. `eth_newFilter`), then `watchContractEvent` will fall back to using [`getLogs`](/docs/actions/public/getLogs) instead.

## Import

```ts
Expand Down
3 changes: 3 additions & 0 deletions src/_test/setup.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { afterAll, beforeAll, beforeEach, vi } from 'vitest'

import { cleanupCache, listenersCache } from '../utils/observe'
import { promiseCache, responseCache } from '../utils/promise/withCache'

beforeAll(() => {
Expand All @@ -17,6 +18,8 @@ beforeAll(() => {
beforeEach(() => {
promiseCache.clear()
responseCache.clear()
listenersCache.clear()
cleanupCache.clear()
})

afterAll(() => {
Expand Down
57 changes: 57 additions & 0 deletions src/actions/public/watchContractEvent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import {
import { impersonateAccount, stopImpersonatingAccount } from '../test'
import { writeContract } from '../wallet'
import * as createContractEventFilter from './createContractEventFilter'
import * as getBlockNumber from './getBlockNumber'
import * as getFilterChanges from './getFilterChanges'
import * as getLogs from './getLogs'
import { OnLogsResponse, watchContractEvent } from './watchContractEvent'

beforeAll(async () => {
Expand Down Expand Up @@ -421,8 +423,63 @@ test('args: args unnamed', async () => {
expect(logs[0][1].eventName).toEqual('Transfer')
})

test(
'falls back to `getLogs` if `createContractEventFilter` throws',
async () => {
// TODO: Something weird going on where the `getFilterChanges` spy is taking
// results of the previous test. This `wait` fixes it. ¯\_(ツ)_/¯
await wait(1)
const getFilterChangesSpy = vi.spyOn(getFilterChanges, 'getFilterChanges')
const getLogsSpy = vi.spyOn(getLogs, 'getLogs')
vi.spyOn(
createContractEventFilter,
'createContractEventFilter',
).mockRejectedValueOnce(new Error('foo'))

let logs: OnLogsResponse[] = []

const unwatch = watchContractEvent(publicClient, {
abi: usdcContractConfig.abi,
onLogs: (logs_) => logs.push(logs_),
})

await wait(1000)
await writeContract(walletClient, {
...usdcContractConfig,
functionName: 'transfer',
args: [accounts[0].address, 1n],
from: address.vitalik,
})
await writeContract(walletClient, {
...usdcContractConfig,
functionName: 'transfer',
args: [accounts[0].address, 1n],
from: address.vitalik,
})
await wait(1000)
await writeContract(walletClient, {
...usdcContractConfig,
functionName: 'transfer',
args: [accounts[1].address, 1n],
from: address.vitalik,
})
await wait(2000)
unwatch()

expect(logs.length).toBe(2)
expect(logs[0].length).toBe(2)
expect(logs[1].length).toBe(1)
expect(getFilterChangesSpy).toBeCalledTimes(0)
expect(getLogsSpy).toBeCalled()
},
{ retry: 3 },
)

describe('errors', () => {
test('handles error thrown from creating filter', async () => {
vi.spyOn(getBlockNumber, 'getBlockNumber').mockRejectedValueOnce(
new Error('foo'),
)
vi.spyOn(
createContractEventFilter,
'createContractEventFilter',
Expand Down
65 changes: 49 additions & 16 deletions src/actions/public/watchContractEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ import type {
Filter,
Log,
} from '../../types'
import type { GetAbiItemArgs } from '../../utils'
import { getAbiItem } from '../../utils'
import { observe } from '../../utils/observe'
import { poll } from '../../utils/poll'
import {
createContractEventFilter,
CreateContractEventFilterArgs,
} from './createContractEventFilter'
import { getBlockNumber } from './getBlockNumber'
import { getFilterChanges } from './getFilterChanges'
import { getLogs, GetLogsArgs } from './getLogs'
import { uninstallFilter } from './uninstallFilter'

export type OnLogsResponse<
Expand Down Expand Up @@ -75,31 +79,60 @@ export function watchContractEvent<
])

return observe(observerId, { onLogs, onError }, (emit) => {
let filter: Filter<'event', TAbi, TEventName>
let currentBlockNumber: bigint
let filter: Filter<'event', TAbi, TEventName> | undefined
let initialized = false

const unwatch = poll(
async () => {
if (!initialized) {
try {
filter = (await createContractEventFilter(client, {
abi,
address,
args,
eventName,
} as unknown as CreateContractEventFilterArgs)) as Filter<
'event',
TAbi,
TEventName
>
} catch {}
initialized = true
return
}

try {
if (!filter) {
try {
filter = (await createContractEventFilter(client, {
abi,
let logs: Log[]
if (filter) {
logs = await getFilterChanges(client, { filter })
} else {
// If the filter doesn't exist, we will fall back to use `getLogs`.
// The fall back exists because some RPC Providers do not support filters.

// Fetch the block number to use for `getLogs`.
const blockNumber = await getBlockNumber(client)

// If the block number has changed, we will need to fetch the logs.
// If the block number doesn't exist, we are yet to reach the first poll interval,
// so do not emit any logs.
if (currentBlockNumber && currentBlockNumber !== blockNumber) {
logs = await getLogs(client, {
address,
args,
eventName,
} as unknown as CreateContractEventFilterArgs)) as Filter<
'event',
TAbi,
TEventName
>
return
} catch (err) {
unwatch()
throw err
fromBlock: blockNumber,
toBlock: blockNumber,
event: getAbiItem({
abi,
name: eventName,
} as unknown as GetAbiItemArgs),
} as unknown as GetLogsArgs)
} else {
logs = []
}
currentBlockNumber = blockNumber
}

const logs = await getFilterChanges(client, { filter })
if (logs.length === 0) return
if (batch) emit.onLogs(logs as any)
else logs.forEach((log) => emit.onLogs([log] as any))
Expand Down
51 changes: 51 additions & 0 deletions src/actions/public/watchEvent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import {
import { impersonateAccount, mine, stopImpersonatingAccount } from '../test'
import { sendTransaction, writeContract } from '../wallet'
import * as createEventFilter from './createEventFilter'
import * as getBlockNumber from './getBlockNumber'
import * as getLogs from './getLogs'
import * as getFilterChanges from './getFilterChanges'
import { OnLogsResponse, watchEvent } from './watchEvent'

Expand Down Expand Up @@ -216,8 +218,57 @@ test('args: address + event', async () => {

test.todo('args: args')

test('falls back to `getLogs` if `createEventFilter` throws', async () => {
// Something weird going on where the `getFilterChanges` spy is taking
// results of the previous test. This `wait` fixes it. ¯\_(ツ)_/¯
await wait(1)
const getFilterChangesSpy = vi.spyOn(getFilterChanges, 'getFilterChanges')
const getLogsSpy = vi.spyOn(getLogs, 'getLogs')
vi.spyOn(createEventFilter, 'createEventFilter').mockRejectedValueOnce(
new Error('foo'),
)

let logs: OnLogsResponse[] = []

const unwatch = watchEvent(publicClient, {
onLogs: (logs_) => logs.push(logs_),
})

await wait(1000)
await writeContract(walletClient, {
...usdcContractConfig,
functionName: 'transfer',
args: [accounts[0].address, 1n],
from: address.vitalik,
})
await writeContract(walletClient, {
...usdcContractConfig,
functionName: 'transfer',
args: [accounts[0].address, 1n],
from: address.vitalik,
})
await wait(1000)
await writeContract(walletClient, {
...usdcContractConfig,
functionName: 'transfer',
args: [accounts[1].address, 1n],
from: address.vitalik,
})
await wait(2000)
unwatch()

expect(logs.length).toBe(2)
expect(logs[0].length).toBe(2)
expect(logs[1].length).toBe(1)
expect(getFilterChangesSpy).toBeCalledTimes(0)
expect(getLogsSpy).toBeCalled()
})

describe('errors', () => {
test('handles error thrown from creating filter', async () => {
vi.spyOn(getBlockNumber, 'getBlockNumber').mockRejectedValueOnce(
new Error('foo'),
)
vi.spyOn(createEventFilter, 'createEventFilter').mockRejectedValueOnce(
new Error('foo'),
)
Expand Down
56 changes: 42 additions & 14 deletions src/actions/public/watchEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import type {
import { observe } from '../../utils/observe'
import { poll } from '../../utils/poll'
import { createEventFilter, CreateEventFilterArgs } from './createEventFilter'
import { getBlockNumber } from './getBlockNumber'
import { getFilterChanges } from './getFilterChanges'
import { getLogs } from './getLogs'
import { uninstallFilter } from './uninstallFilter'

export type OnLogsResponse<
Expand Down Expand Up @@ -73,30 +75,56 @@ export function watchEvent<
])

return observe(observerId, { onLogs, onError }, (emit) => {
let currentBlockNumber: bigint
let filter: Filter<'event', [TAbiEvent], TEventName, any>
let initialized = false

const unwatch = poll(
async () => {
if (!initialized) {
try {
filter = (await createEventFilter(client, {
address,
args,
event: event!,
} as unknown as CreateEventFilterArgs)) as unknown as Filter<
'event',
[TAbiEvent],
TEventName
>
} catch {}
initialized = true
return
}

try {
if (!filter) {
try {
filter = (await createEventFilter(client, {
let logs: Log[]
if (filter) {
logs = await getFilterChanges(client, { filter })
} else {
// If the filter doesn't exist, we will fall back to use `getLogs`.
// The fall back exists because some RPC Providers do not support filters.

// Fetch the block number to use for `getLogs`.
const blockNumber = await getBlockNumber(client)

// If the block number has changed, we will need to fetch the logs.
// If the block number doesn't exist, we are yet to reach the first poll interval,
// so do not emit any logs.
if (currentBlockNumber && currentBlockNumber !== blockNumber) {
logs = await getLogs(client, {
address,
args,
event,
} as CreateEventFilterArgs)) as unknown as Filter<
'event',
[TAbiEvent],
TEventName
>
return
} catch (err) {
unwatch()
throw err
fromBlock: blockNumber,
toBlock: blockNumber,
event: event!,
})
} else {
logs = []
}
currentBlockNumber = blockNumber
}

const logs = await getFilterChanges(client, { filter })
if (logs.length === 0) return
if (batch) emit.onLogs(logs as any)
else logs.forEach((log) => emit.onLogs([log] as any))
Expand Down
7 changes: 5 additions & 2 deletions src/utils/observe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ import type { MaybePromise } from '../types/utils'
type Callback = ((...args: any[]) => any) | undefined
type Callbacks = Record<string, Callback>

const listenersCache = new Map<string, { id: number; fns: Callbacks }[]>()
const cleanupCache = new Map<string, () => void>()
export const listenersCache = new Map<
string,
{ id: number; fns: Callbacks }[]
>()
export const cleanupCache = new Map<string, () => void>()

type EmitFunction<TCallbacks extends Callbacks> = (
emit: TCallbacks,
Expand Down

2 comments on commit b2669bd

@vercel
Copy link

@vercel vercel bot commented on b2669bd Feb 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

viem-playground – ./playgrounds/browser

viem-playground-wagmi-dev.vercel.app
viem-playground.vercel.app
viem-playground-git-main-wagmi-dev.vercel.app

@vercel
Copy link

@vercel vercel bot commented on b2669bd Feb 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

viem-site – ./site

viem-site.vercel.app
viem-site-wagmi-dev.vercel.app
viem-site-git-main-wagmi-dev.vercel.app

Please sign in to comment.