Skip to content

Commit

Permalink
fix: resolve #776
Browse files Browse the repository at this point in the history
  • Loading branch information
jxom committed Apr 21, 2024
1 parent 601ed60 commit 0b0df52
Show file tree
Hide file tree
Showing 14 changed files with 900 additions and 93 deletions.
5 changes: 5 additions & 0 deletions .changeset/dry-walls-sneeze.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"viem": patch
---

Fixed issue where fallback transports with a webSocket transport would not utilize `eth_subscribe` in watcher actions.
Binary file modified bun.lockb
Binary file not shown.
109 changes: 108 additions & 1 deletion src/actions/public/watchBlockNumber.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { describe, expect, test, vi } from 'vitest'

import { localHttpUrl } from '~test/src/constants.js'
import { publicClient, testClient, webSocketClient } from '~test/src/utils.js'
import {
anvilChain,
httpClient,
publicClient,
testClient,
webSocketClient,
} from '~test/src/utils.js'
import { localhost } from '../../chains/index.js'
import {
type PublicClient,
Expand All @@ -11,6 +17,9 @@ import { http } from '../../clients/transports/http.js'
import { wait } from '../../utils/wait.js'
import { mine } from '../test/mine.js'

import { createClient } from '../../clients/createClient.js'
import { fallback } from '../../clients/transports/fallback.js'
import { webSocket } from '../../clients/transports/webSocket.js'
import * as getBlockNumber from './getBlockNumber.js'
import {
type OnBlockNumberParameter,
Expand Down Expand Up @@ -100,6 +109,51 @@ describe('poll', () => {
})
})

describe('transports', () => {
test('http transport', async () => {
const blockNumbers: OnBlockNumberParameter[] = []
const unwatch = watchBlockNumber(httpClient, {
onBlockNumber: (blockNumber) => blockNumbers.push(blockNumber),
pollingInterval: 100,
})
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
unwatch()
expect(blockNumbers.length).toBe(4)
})

test('fallback transport', async () => {
const client = createClient({
chain: anvilChain,
transport: fallback([http(), webSocket()]),
pollingInterval: 200,
})

const blockNumbers: OnBlockNumberParameter[] = []
const unwatch = watchBlockNumber(client, {
onBlockNumber: (blockNumber) => blockNumbers.push(blockNumber),
poll: true,
pollingInterval: 100,
})
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
unwatch()
expect(blockNumbers.length).toBe(4)
})
})

describe('behavior', () => {
test('does not emit when no new incoming blocks', async () => {
const blockNumbers: OnBlockNumberParameter[] = []
Expand Down Expand Up @@ -373,6 +427,59 @@ describe('subscribe', () => {
expect(blockNumbers.length).toBe(5)
})

test('fallback transport', async () => {
const client = createClient({
chain: anvilChain,
transport: fallback([webSocket(), http()]),
pollingInterval: 200,
})

const blockNumbers: OnBlockNumberParameter[] = []
const unwatch = watchBlockNumber(client, {
onBlockNumber: (blockNumber) => blockNumbers.push(blockNumber),
})
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
unwatch()
expect(blockNumbers.length).toBe(5)
})

test('fallback transport (poll: false)', async () => {
const client = createClient({
chain: anvilChain,
transport: fallback([http(), webSocket()]),
pollingInterval: 200,
})

const blockNumbers: OnBlockNumberParameter[] = []
const unwatch = watchBlockNumber(client, {
poll: false,
onBlockNumber: (blockNumber) => blockNumbers.push(blockNumber),
})
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
unwatch()
expect(blockNumbers.length).toBe(5)
})

describe('behavior', () => {
test('watch > unwatch > watch', async () => {
let blockNumbers: OnBlockNumberParameter[] = []
Expand Down
53 changes: 36 additions & 17 deletions src/actions/public/watchBlockNumber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { Client } from '../../clients/createClient.js'
import type { Transport } from '../../clients/transports/createTransport.js'
import type { ErrorType } from '../../errors/utils.js'
import type { Chain } from '../../types/chain.js'
import type { GetTransportConfig } from '../../types/transport.js'
import type { HasTransportType } from '../../types/transport.js'
import { hexToBigInt } from '../../utils/encoding/fromHex.js'
import { getAction } from '../../utils/getAction.js'
import { observe } from '../../utils/observe.js'
Expand All @@ -28,7 +28,7 @@ export type WatchBlockNumberParameters<
/** The callback to call when an error occurred when trying to get for a new block. */
onError?: ((error: Error) => void) | undefined
} & (
| (GetTransportConfig<TTransport>['type'] extends 'webSocket'
| (HasTransportType<TTransport, 'webSocket'> extends true
? {
emitMissed?: undefined
emitOnBegin?: undefined
Expand Down Expand Up @@ -91,8 +91,16 @@ export function watchBlockNumber<
pollingInterval = client.pollingInterval,
}: WatchBlockNumberParameters<TTransport>,
): WatchBlockNumberReturnType {
const enablePolling =
typeof poll_ !== 'undefined' ? poll_ : client.transport.type !== 'webSocket'
const enablePolling = (() => {
if (typeof poll_ !== 'undefined') return poll_
if (client.transport.type === 'webSocket') return false
if (
client.transport.type === 'fallback' &&
client.transport.transports[0].config.type === 'webSocket'
)
return false
return true
})()

let prevBlockNumber: GetBlockNumberReturnType | undefined

Expand Down Expand Up @@ -161,19 +169,30 @@ export function watchBlockNumber<
let unsubscribe = () => (active = false)
;(async () => {
try {
const { unsubscribe: unsubscribe_ } =
await client.transport.subscribe({
params: ['newHeads'],
onData(data: any) {
if (!active) return
const blockNumber = hexToBigInt(data.result?.number)
emit.onBlockNumber(blockNumber, prevBlockNumber)
prevBlockNumber = blockNumber
},
onError(error: Error) {
emit.onError?.(error)
},
})
const transport = (() => {
if (client.transport.type === 'fallback') {
const transport = client.transport.transports.find(
(transport: ReturnType<Transport>) =>
transport.config.type === 'webSocket',
)
if (!transport) return client.transport
return transport.value
}
return client.transport
})()

const { unsubscribe: unsubscribe_ } = await transport.subscribe({
params: ['newHeads'],
onData(data: any) {
if (!active) return
const blockNumber = hexToBigInt(data.result?.number)
emit.onBlockNumber(blockNumber, prevBlockNumber)
prevBlockNumber = blockNumber
},
onError(error: Error) {
emit.onError?.(error)
},
})
unsubscribe = unsubscribe_
if (!active) unsubscribe()
} catch (err) {
Expand Down
127 changes: 127 additions & 0 deletions src/actions/public/watchBlocks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { describe, expect, test, vi } from 'vitest'

import { accounts, localHttpUrl } from '~test/src/constants.js'
import {
anvilChain,
httpClient,
publicClient,
testClient,
walletClient,
Expand All @@ -20,6 +22,9 @@ import { wait } from '../../utils/wait.js'
import { mine } from '../test/mine.js'
import { sendTransaction } from '../wallet/sendTransaction.js'

import { createClient } from '../../clients/createClient.js'
import { fallback } from '../../clients/transports/fallback.js'
import { webSocket } from '../../clients/transports/webSocket.js'
import * as getBlock from './getBlock.js'
import { type OnBlockParameter, watchBlocks } from './watchBlocks.js'

Expand Down Expand Up @@ -156,6 +161,63 @@ describe('poll', () => {
expect(block.randomness).toBeDefined()
})

describe('transports', () => {
test('http transport', async () => {
const blocks: OnBlockParameter[] = []
const prevBlocks: OnBlockParameter[] = []
const unwatch = watchBlocks(httpClient, {
onBlock: (block, prevBlock) => {
blocks.push(block)
prevBlock && block !== prevBlock && prevBlocks.push(prevBlock)
},
pollingInterval: 100,
})
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
unwatch()
expect(blocks.length).toBe(4)
expect(prevBlocks.length).toBe(3)
})

test('fallback transport', async () => {
const client = createClient({
chain: anvilChain,
transport: fallback([http(), webSocket()]),
pollingInterval: 200,
})

const blocks: OnBlockParameter[] = []
const prevBlocks: OnBlockParameter[] = []
const unwatch = watchBlocks(client, {
onBlock: (block, prevBlock) => {
blocks.push(block)
prevBlock && block !== prevBlock && prevBlocks.push(prevBlock)
},
})
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
unwatch()
expect(blocks.length).toBe(5)
expect(prevBlocks.length).toBe(4)
expect(typeof blocks[0].number).toBe('bigint')
})
})

describe('behavior', () => {
test('does not emit when no new incoming blocks', async () => {
const blocks: OnBlockParameter[] = []
Expand Down Expand Up @@ -699,6 +761,71 @@ describe('subscribe', () => {
})
})

test('fallback transport', async () => {
const client = createClient({
chain: anvilChain,
transport: fallback([webSocket(), http()]),
pollingInterval: 200,
})

const blocks: OnBlockParameter[] = []
const prevBlocks: OnBlockParameter[] = []
const unwatch = watchBlocks(client, {
onBlock: (block, prevBlock) => {
blocks.push(block)
prevBlock && block !== prevBlock && prevBlocks.push(prevBlock)
},
})
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
unwatch()
expect(blocks.length).toBe(5)
expect(prevBlocks.length).toBe(4)
expect(typeof blocks[0].number).toBe('bigint')
})

test('fallback transport (poll: false)', async () => {
const client = createClient({
chain: anvilChain,
transport: fallback([http(), webSocket()]),
pollingInterval: 200,
})

const blocks: OnBlockParameter[] = []
const prevBlocks: OnBlockParameter[] = []
const unwatch = watchBlocks(client, {
poll: false,
onBlock: (block, prevBlock) => {
blocks.push(block)
prevBlock && block !== prevBlock && prevBlocks.push(prevBlock)
},
})
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
await mine(testClient, { blocks: 1 })
await wait(200)
unwatch()
expect(blocks.length).toBe(5)
expect(prevBlocks.length).toBe(4)
expect(typeof blocks[0].number).toBe('bigint')
})

describe('errors', () => {
test('handles error thrown on init', async () => {
const client = {
Expand Down

0 comments on commit 0b0df52

Please sign in to comment.