diff --git a/.changeset/nasty-waves-divide.md b/.changeset/nasty-waves-divide.md new file mode 100644 index 0000000000..f85dd96047 --- /dev/null +++ b/.changeset/nasty-waves-divide.md @@ -0,0 +1,46 @@ +--- +"@latticexyz/block-logs-stream": minor +--- + +Add block logs stream package + +```ts +import { filter, map, mergeMap } from "rxjs"; +import { createPublicClient, parseAbi } from "viem"; +import { + createBlockStream, + isNonPendingBlock, + groupLogsByBlockNumber, + blockRangeToLogs, +} from "@latticexyz/block-logs-stream"; + +const publicClient = createPublicClient({ + // your viem public client config here +}); + +const latestBlock$ = await createBlockStream({ publicClient, blockTag: "latest" }); + +const latestBlockNumber$ = latestBlock$.pipe( + filter(isNonPendingBlock), + map((block) => block.number) +); + +latestBlockNumber$ + .pipe( + map((latestBlockNumber) => ({ startBlock: 0n, endBlock: latestBlockNumber })), + blockRangeToLogs({ + publicClient, + address, + events: parseAbi([ + "event StoreDeleteRecord(bytes32 table, bytes32[] key)", + "event StoreSetField(bytes32 table, bytes32[] key, uint8 schemaIndex, bytes data)", + "event StoreSetRecord(bytes32 table, bytes32[] key, bytes data)", + "event StoreEphemeralRecord(bytes32 table, bytes32[] key, bytes data)", + ]), + }), + mergeMap(({ logs }) => from(groupLogsByBlockNumber(logs))) + ) + .subscribe((block) => { + console.log("got events for block", block); + }); +``` diff --git a/packages/block-logs-stream/.eslintrc b/packages/block-logs-stream/.eslintrc new file mode 100644 index 0000000000..6db0063ad7 --- /dev/null +++ b/packages/block-logs-stream/.eslintrc @@ -0,0 +1,6 @@ +{ + "extends": ["../../.eslintrc"], + "rules": { + "@typescript-eslint/explicit-function-return-type": "error" + } +} diff --git a/packages/block-logs-stream/.gitignore b/packages/block-logs-stream/.gitignore new file mode 100644 index 0000000000..1521c8b765 --- /dev/null +++ b/packages/block-logs-stream/.gitignore @@ -0,0 +1 @@ +dist diff --git a/packages/block-logs-stream/.npmignore b/packages/block-logs-stream/.npmignore new file mode 100644 index 0000000000..84815f1eba --- /dev/null +++ b/packages/block-logs-stream/.npmignore @@ -0,0 +1,6 @@ +* + +!dist/** +!src/** +!package.json +!README.md diff --git a/packages/block-logs-stream/README.md b/packages/block-logs-stream/README.md new file mode 100644 index 0000000000..1cbdc26dfd --- /dev/null +++ b/packages/block-logs-stream/README.md @@ -0,0 +1,49 @@ +# Block logs stream + +A set of utilities for efficiently retrieving blockchain event logs. Built on top of [viem][0] and [RxJS][1]. + +[0]: https://viem.sh/ +[1]: https://rxjs.dev/ + +## Example + +```ts +import { filter, map, mergeMap } from "rxjs"; +import { createPublicClient, parseAbi } from "viem"; +import { + createBlockStream, + isNonPendingBlock, + groupLogsByBlockNumber, + blockRangeToLogs, +} from "@latticexyz/block-logs-stream"; + +const publicClient = createPublicClient({ + // your viem public client config here +}); + +const latestBlock$ = await createBlockStream({ publicClient, blockTag: "latest" }); + +const latestBlockNumber$ = latestBlock$.pipe( + filter(isNonPendingBlock), + map((block) => block.number) +); + +latestBlockNumber$ + .pipe( + map((latestBlockNumber) => ({ startBlock: 0n, endBlock: latestBlockNumber })), + blockRangeToLogs({ + publicClient, + address, + events: parseAbi([ + "event StoreDeleteRecord(bytes32 table, bytes32[] key)", + "event StoreSetField(bytes32 table, bytes32[] key, uint8 schemaIndex, bytes data)", + "event StoreSetRecord(bytes32 table, bytes32[] key, bytes data)", + "event StoreEphemeralRecord(bytes32 table, bytes32[] key, bytes data)", + ]), + }), + mergeMap(({ logs }) => from(groupLogsByBlockNumber(logs))) + ) + .subscribe((block) => { + console.log("got events for block", block); + }); +``` diff --git a/packages/block-logs-stream/package.json b/packages/block-logs-stream/package.json new file mode 100644 index 0000000000..15efcc768e --- /dev/null +++ b/packages/block-logs-stream/package.json @@ -0,0 +1,43 @@ +{ + "name": "@latticexyz/block-logs-stream", + "version": "1.42.0", + "description": "Create a stream of EVM block logs for events", + "repository": { + "type": "git", + "url": "https://github.com/latticexyz/mud.git", + "directory": "packages/block-logs-stream" + }, + "license": "MIT", + "type": "module", + "exports": { + ".": "./dist/index.js" + }, + "types": "src/index.ts", + "scripts": { + "build": "pnpm run build:js", + "build:js": "tsup", + "clean": "pnpm run clean:js", + "clean:js": "rimraf dist", + "dev": "tsup --watch", + "lint": "eslint .", + "test": "vitest typecheck --run --passWithNoTests && vitest --run --passWithNoTests" + }, + "dependencies": { + "@latticexyz/common": "workspace:*", + "@latticexyz/config": "workspace:*", + "@latticexyz/schema-type": "workspace:*", + "abitype": "0.8.7", + "debug": "^4.3.4", + "rxjs": "7.5.5", + "viem": "1.1.7" + }, + "devDependencies": { + "@types/debug": "^4.1.7", + "tsup": "^6.7.0", + "vitest": "0.31.4" + }, + "publishConfig": { + "access": "public" + }, + "gitHead": "914a1e0ae4a573d685841ca2ea921435057deb8f" +} diff --git a/packages/block-logs-stream/src/blockRangeToLogs.test.ts b/packages/block-logs-stream/src/blockRangeToLogs.test.ts new file mode 100644 index 0000000000..eb3c8e04fb --- /dev/null +++ b/packages/block-logs-stream/src/blockRangeToLogs.test.ts @@ -0,0 +1,128 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { blockRangeToLogs } from "./blockRangeToLogs"; +import { Subject, lastValueFrom, map, toArray } from "rxjs"; +import { EIP1193RequestFn, RpcLog, Transport, createPublicClient, createTransport } from "viem"; +import { wait } from "./utils"; + +// TODO: there is a chance that these tests will need to be written differently with timers to avoid flakiness + +const mockedTransportRequest = vi.fn, ReturnType>(); +const mockTransport: Transport = () => + createTransport({ + key: "mock", + name: "Mock Transport", + request: mockedTransportRequest as any, + type: "mock", + }); + +const publicClient = createPublicClient({ + transport: mockTransport, +}); + +describe("blockRangeToLogs", () => { + beforeEach(() => { + mockedTransportRequest.mockClear(); + }); + + it("processes block ranges in order", async () => { + const requests: any[] = []; + mockedTransportRequest.mockImplementation(async ({ method, params }): Promise => { + requests.push(params); + if (method !== "eth_getLogs") throw new Error("not implemented"); + await wait(450); + return []; + }); + + const latestBlockNumber$ = new Subject(); + + const logs$ = latestBlockNumber$.pipe( + map((endBlock) => ({ startBlock: 0n, endBlock })), + blockRangeToLogs({ + publicClient, + address: "0x", + events: [], + }) + ); + + (async (): Promise => { + for (let blockNumber = 1000n; blockNumber <= 1010n; blockNumber++) { + await wait(100); + latestBlockNumber$.next(blockNumber); + } + await wait(100); + latestBlockNumber$.complete(); + })(); + + const results = await lastValueFrom(logs$.pipe(toArray())); + + expect(requests).toMatchInlineSnapshot(` + [ + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x3e8", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x3e9", + "toBlock": "0x3ec", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x3ed", + "toBlock": "0x3f0", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x3f1", + "toBlock": "0x3f2", + "topics": [ + [], + ], + }, + ], + ] + `); + + expect(results).toMatchInlineSnapshot(` + [ + { + "fromBlock": 0n, + "logs": [], + "toBlock": 1000n, + }, + { + "fromBlock": 1001n, + "logs": [], + "toBlock": 1004n, + }, + { + "fromBlock": 1005n, + "logs": [], + "toBlock": 1008n, + }, + { + "fromBlock": 1009n, + "logs": [], + "toBlock": 1010n, + }, + ] + `); + }); +}); diff --git a/packages/block-logs-stream/src/blockRangeToLogs.ts b/packages/block-logs-stream/src/blockRangeToLogs.ts new file mode 100644 index 0000000000..07df9e2512 --- /dev/null +++ b/packages/block-logs-stream/src/blockRangeToLogs.ts @@ -0,0 +1,72 @@ +import { EMPTY, OperatorFunction, concatMap, from, pipe, tap } from "rxjs"; +import { FetchLogsResult, fetchLogs } from "./fetchLogs"; +import { AbiEvent, Address } from "abitype"; +import { BlockNumber, PublicClient } from "viem"; + +export type BlockRangeToLogsOptions = { + /** + * [viem `PublicClient`][0] used for fetching logs from the RPC. + * + * [0]: https://viem.sh/docs/clients/public.html + */ + publicClient: PublicClient; + /** + * Optional contract address(es) to fetch logs for. + */ + address?: Address | Address[]; + /** + * Events to fetch logs for. + */ + events: TAbiEvents; + /** + * Optional maximum block range, if your RPC limits the amount of blocks fetched at a time. + */ + maxBlockRange?: bigint; +}; + +export type BlockRangeToLogsResult = OperatorFunction< + { startBlock: BlockNumber; endBlock: BlockNumber }, + FetchLogsResult +>; + +/** + * Takes in an observable of `Observable<{ startBlock: bigint, endBlock: bigint }>` and uses a viem `publicClient` to get logs for the contract `address` and matching `events` and emits the logs as they are fetched. + * + * @param {BlockRangeToLogsOptions} options See `BlockRangeToLogsOptions`. + * @returns {BlockRangeToLogsResult} An operator function that transforms a stream of block ranges into a stream of fetched logs. + */ +export function blockRangeToLogs({ + publicClient, + address, + events, + maxBlockRange, +}: BlockRangeToLogsOptions): BlockRangeToLogsResult { + let fromBlock: bigint; + let toBlock: bigint; + + return pipe( + tap(({ endBlock, startBlock }) => { + fromBlock ??= startBlock; + toBlock = endBlock; + }), + // concatMap only processes the next emission once the inner observable completes, + // so it always uses the latest`toBlock` value. + concatMap(() => { + if (fromBlock > toBlock) return EMPTY; + return from( + fetchLogs({ + publicClient, + address, + events, + fromBlock, + toBlock, + maxBlockRange, + }) + ).pipe( + tap(({ toBlock }) => { + fromBlock = toBlock + 1n; + }) + ); + }) + ); +} diff --git a/packages/block-logs-stream/src/createBlockStream.ts b/packages/block-logs-stream/src/createBlockStream.ts new file mode 100644 index 0000000000..b51e11ff6b --- /dev/null +++ b/packages/block-logs-stream/src/createBlockStream.ts @@ -0,0 +1,20 @@ +import { Observable } from "rxjs"; +import type { Block, BlockTag, PublicClient } from "viem"; + +export type CreateBlockStreamOptions = { + publicClient: PublicClient; + blockTag: BlockTag; +}; + +export type CreateBlockStreamResult = Observable; + +export function createBlockStream({ publicClient, blockTag }: CreateBlockStreamOptions): CreateBlockStreamResult { + return new Observable(function subscribe(subscriber) { + return publicClient.watchBlocks({ + blockTag, + emitOnBegin: true, + onBlock: (block) => subscriber.next(block), + onError: (error) => subscriber.error(error), + }); + }); +} diff --git a/packages/block-logs-stream/src/debug.ts b/packages/block-logs-stream/src/debug.ts new file mode 100644 index 0000000000..b6536cc8aa --- /dev/null +++ b/packages/block-logs-stream/src/debug.ts @@ -0,0 +1,3 @@ +import createDebug from "debug"; + +export const debug = createDebug("mud:block-events-stream"); diff --git a/packages/block-logs-stream/src/fetchLogs.test.ts b/packages/block-logs-stream/src/fetchLogs.test.ts new file mode 100644 index 0000000000..7f95f507b9 --- /dev/null +++ b/packages/block-logs-stream/src/fetchLogs.test.ts @@ -0,0 +1,440 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { + EIP1193RequestFn, + LimitExceededRpcError, + RpcLog, + RpcRequestError, + Transport, + createPublicClient, + createTransport, + hexToNumber, +} from "viem"; +import { fetchLogs } from "./fetchLogs"; + +const mockedTransportRequest = vi.fn, ReturnType>(); +const mockTransport: Transport = () => + createTransport({ + key: "mock", + name: "Mock Transport", + request: mockedTransportRequest as any, + type: "mock", + }); + +const publicClient = createPublicClient({ + transport: mockTransport, +}); + +describe("fetchLogs", () => { + beforeEach(() => { + mockedTransportRequest.mockClear(); + }); + + it("yields chunks of logs for the block range", async () => { + const requests: any[] = []; + mockedTransportRequest.mockImplementation(async ({ method, params }): Promise => { + requests.push(params); + if (method !== "eth_getLogs") throw new Error("not implemented"); + return []; + }); + + const results = []; + for await (const result of fetchLogs({ + publicClient, + address: "0x", + events: [], + fromBlock: 0n, + toBlock: 500n, + maxBlockRange: 100n, + })) { + results.push(result); + } + + expect(requests).toMatchInlineSnapshot(` + [ + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x64", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x65", + "toBlock": "0xc9", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0xca", + "toBlock": "0x12e", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x12f", + "toBlock": "0x193", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x194", + "toBlock": "0x1f4", + "topics": [ + [], + ], + }, + ], + ] + `); + + expect(results).toMatchInlineSnapshot(` + [ + { + "fromBlock": 0n, + "logs": [], + "toBlock": 100n, + }, + { + "fromBlock": 101n, + "logs": [], + "toBlock": 201n, + }, + { + "fromBlock": 202n, + "logs": [], + "toBlock": 302n, + }, + { + "fromBlock": 303n, + "logs": [], + "toBlock": 403n, + }, + { + "fromBlock": 404n, + "logs": [], + "toBlock": 500n, + }, + ] + `); + }); + + it("reduces block range if block range is exceeded", async () => { + const requests: any[] = []; + mockedTransportRequest.mockImplementation(async ({ method, params }): Promise => { + if (method !== "eth_getLogs") throw new Error("not implemented"); + requests.push(params); + + if (hexToNumber((params as any)[0].toBlock) - hexToNumber((params as any)[0].fromBlock) > 500) { + throw new LimitExceededRpcError( + new RpcRequestError({ + body: (params as any)[0], + url: "https://mud.dev", + error: { + code: -32005, + message: "block range exceeded", + }, + }) + ); + } + + return []; + }); + + const results = []; + for await (const result of fetchLogs({ + publicClient, + address: "0x", + events: [], + fromBlock: 0n, + toBlock: 2000n, + })) { + results.push(result); + } + + expect(requests).toMatchInlineSnapshot(` + [ + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x3e8", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x3e8", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x3e8", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x3e8", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x1f4", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x1f5", + "toBlock": "0x5dd", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x1f5", + "toBlock": "0x5dd", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x1f5", + "toBlock": "0x5dd", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x1f5", + "toBlock": "0x5dd", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x1f5", + "toBlock": "0x3e9", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x3ea", + "toBlock": "0x7d0", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x3ea", + "toBlock": "0x7d0", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x3ea", + "toBlock": "0x7d0", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x3ea", + "toBlock": "0x7d0", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x3ea", + "toBlock": "0x5dd", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x5de", + "toBlock": "0x7d0", + "topics": [ + [], + ], + }, + ], + ] + `); + + expect(results).toMatchInlineSnapshot(` + [ + { + "fromBlock": 0n, + "logs": [], + "toBlock": 500n, + }, + { + "fromBlock": 501n, + "logs": [], + "toBlock": 1001n, + }, + { + "fromBlock": 1002n, + "logs": [], + "toBlock": 1501n, + }, + { + "fromBlock": 1502n, + "logs": [], + "toBlock": 2000n, + }, + ] + `); + }); + + it("retries if rate limit is exceeded", async () => { + const requests: any[] = []; + mockedTransportRequest.mockImplementation(async ({ method, params }): Promise => { + if (method !== "eth_getLogs") throw new Error("not implemented"); + requests.push(params); + + if (requests.length < 3) { + throw new LimitExceededRpcError( + new RpcRequestError({ + body: (params as any)[0], + url: "https://viem.sh", + error: { + code: -32005, + message: "rate limit exceeded", + }, + }) + ); + } + + return []; + }); + + const results = []; + for await (const result of fetchLogs({ + publicClient, + address: "0x", + events: [], + fromBlock: 0n, + toBlock: 500n, + })) { + results.push(result); + } + + expect(requests).toMatchInlineSnapshot(` + [ + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x1f4", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x1f4", + "topics": [ + [], + ], + }, + ], + [ + { + "address": "0x", + "fromBlock": "0x0", + "toBlock": "0x1f4", + "topics": [ + [], + ], + }, + ], + ] + `); + + expect(results).toMatchInlineSnapshot(` + [ + { + "fromBlock": 0n, + "logs": [], + "toBlock": 500n, + }, + ] + `); + }); +}); diff --git a/packages/block-logs-stream/src/fetchLogs.ts b/packages/block-logs-stream/src/fetchLogs.ts new file mode 100644 index 0000000000..1031c856e8 --- /dev/null +++ b/packages/block-logs-stream/src/fetchLogs.ts @@ -0,0 +1,101 @@ +import { AbiEvent, Address } from "abitype"; +import { PublicClient, BlockNumber } from "viem"; +import { GetLogsResult, getLogs } from "./getLogs"; +import { bigIntMin, wait } from "./utils"; +import { debug } from "./debug"; + +export type FetchLogsOptions = { + /** + * [viem `PublicClient`][0] used for fetching logs from the RPC. + * + * [0]: https://viem.sh/docs/clients/public.html + */ + publicClient: PublicClient; + /** + * Optional contract address(es) to fetch logs for. + */ + address?: Address | Address[]; + /** + * Events to fetch logs for. + */ + events: TAbiEvents; + /** + * The block number to start fetching logs from (inclusive). + */ + fromBlock: BlockNumber; + /** + * The block number to stop fetching logs at (inclusive). + */ + toBlock: BlockNumber; + /** + * Optional maximum block range, if your RPC limits the amount of blocks fetched at a time. Defaults to 1000n. + */ + maxBlockRange?: bigint; + /** + * Optional maximum amount of retries if the RPC returns a rate limit error. Defaults to 3. + */ + maxRetryCount?: number; +}; + +export type FetchLogsResult = { + fromBlock: BlockNumber; + toBlock: BlockNumber; + logs: GetLogsResult; +}; + +/** + * An asynchronous generator function that fetches logs from the blockchain in a range of blocks. + * + * @remarks + * The function will fetch logs according to the given options. + * It will iteratively move forward in the block range, yielding fetched logs as they become available. + * If the function encounters rate limits, it will retry until `maxRetryCount` is reached. + * If the function encounters a block range that is too large, it will half the block range and retry, until the block range can't be halved anymore. + * + * @param {FetchLogsOptions} options See `FetchLogsOptions`. + * + * @yields The result of the fetched logs for each block range in the given range. + * + * @throws Will throw an error if the block range can't be reduced any further. + */ +export async function* fetchLogs({ + maxBlockRange = 1000n, + maxRetryCount = 3, + ...getLogsOpts +}: FetchLogsOptions): AsyncGenerator> { + let fromBlock = getLogsOpts.fromBlock; + let blockRange = bigIntMin(maxBlockRange, getLogsOpts.toBlock - fromBlock); + let retryCount = 0; + + while (fromBlock <= getLogsOpts.toBlock) { + try { + const toBlock = fromBlock + blockRange; + const logs = await getLogs({ ...getLogsOpts, fromBlock, toBlock }); + yield { fromBlock, toBlock, logs }; + fromBlock = toBlock + 1n; + blockRange = bigIntMin(maxBlockRange, getLogsOpts.toBlock - fromBlock); + } catch (error: unknown) { + if (!(error instanceof Error)) throw error; + + if (error.message.includes("rate limit exceeded") && retryCount < maxRetryCount) { + const seconds = 2 * retryCount; + debug(`too many requests, retrying in ${seconds}s`, error); + await wait(1000 * seconds); + retryCount += 1; + continue; + } + + if (error.message.includes("block range exceeded")) { + blockRange /= 2n; + if (blockRange <= 0n) { + throw new Error("can't reduce block range any further"); + } + debug("block range exceeded, trying a smaller block range", error); + // TODO: adjust maxBlockRange down if we consistently hit this for a given block range size + continue; + } + + throw error; + } + } +} diff --git a/packages/block-logs-stream/src/getLogs.ts b/packages/block-logs-stream/src/getLogs.ts new file mode 100644 index 0000000000..0d3a4cc11f --- /dev/null +++ b/packages/block-logs-stream/src/getLogs.ts @@ -0,0 +1,72 @@ +import { AbiEvent } from "abitype"; +import { + Address, + BlockNumber, + BlockTag, + Log, + PublicClient, + decodeEventLog, + encodeEventTopics, + numberToHex, + formatLog, +} from "viem"; +import { isDefined } from "@latticexyz/common/utils"; + +// Based on https://github.com/wagmi-dev/viem/blob/main/src/actions/public/getLogs.ts +// TODO: swap this out once viem has support for multiple events: https://github.com/wagmi-dev/viem/pull/633 + +export type GetLogsOptions = { + publicClient: PublicClient; + address?: Address | Address[]; + events: TAbiEvents; + fromBlock: BlockNumber | BlockTag; + toBlock: BlockNumber | BlockTag; +}; + +export type GetLogsResult = Log< + bigint, + number, + TAbiEvents[number], + true, + TAbiEvents +>[]; + +export async function getLogs({ + publicClient, + address, + events, + fromBlock, + toBlock, +}: GetLogsOptions): Promise> { + const topics = [events.flatMap((event) => encodeEventTopics({ abi: [event], eventName: event.name }))]; + + const logs = await publicClient.request({ + method: "eth_getLogs", + params: [ + { + address, + topics, + fromBlock: typeof fromBlock === "bigint" ? numberToHex(fromBlock) : fromBlock, + toBlock: typeof toBlock === "bigint" ? numberToHex(toBlock) : toBlock, + }, + ], + }); + + return logs + .map((log) => { + try { + const { eventName, args } = decodeEventLog({ + abi: events, + data: log.data, + topics: log.topics, + strict: true, + }); + return formatLog(log, { args, eventName }); + } catch (err) { + // We're using strict mode, so just skip if there is an error decoding. + // https://viem.sh/docs/actions/public/getLogs.html#strict-mode + return; + } + }) + .filter(isDefined) as GetLogsResult; +} diff --git a/packages/block-logs-stream/src/groupLogsByBlockNumber.test.ts b/packages/block-logs-stream/src/groupLogsByBlockNumber.test.ts new file mode 100644 index 0000000000..5c8633a832 --- /dev/null +++ b/packages/block-logs-stream/src/groupLogsByBlockNumber.test.ts @@ -0,0 +1,110 @@ +import { describe, it, expect } from "vitest"; +import { groupLogsByBlockNumber } from "./groupLogsByBlockNumber"; +import { Log } from "viem"; + +describe("groupLogsByBlockNumber", () => { + it("groups logs by block number and correctly sorts them", () => { + const logs = [ + { + blockNumber: 1n, + blockHash: "0x", + logIndex: 4, + transactionHash: "0x", + transactionIndex: 0, + }, + { + blockNumber: 5n, + blockHash: "0x", + logIndex: 0, + transactionHash: "0x", + transactionIndex: 0, + }, + { + blockNumber: 1n, + blockHash: "0x", + logIndex: 0, + transactionHash: "0x", + transactionIndex: 0, + }, + { + blockNumber: 1n, + blockHash: "0x", + logIndex: 2, + transactionHash: "0x", + transactionIndex: 0, + }, + { + blockNumber: null, + blockHash: null, + logIndex: null, + transactionHash: null, + transactionIndex: null, + }, + { + blockNumber: 3n, + blockHash: "0x", + logIndex: 3, + transactionHash: "0x", + transactionIndex: 0, + }, + ] as any as Log[]; + + expect(groupLogsByBlockNumber(logs)).toMatchInlineSnapshot(` + [ + { + "blockHash": "0x", + "blockNumber": 1n, + "logs": [ + { + "blockHash": "0x", + "blockNumber": 1n, + "logIndex": 0, + "transactionHash": "0x", + "transactionIndex": 0, + }, + { + "blockHash": "0x", + "blockNumber": 1n, + "logIndex": 2, + "transactionHash": "0x", + "transactionIndex": 0, + }, + { + "blockHash": "0x", + "blockNumber": 1n, + "logIndex": 4, + "transactionHash": "0x", + "transactionIndex": 0, + }, + ], + }, + { + "blockHash": "0x", + "blockNumber": 3n, + "logs": [ + { + "blockHash": "0x", + "blockNumber": 3n, + "logIndex": 3, + "transactionHash": "0x", + "transactionIndex": 0, + }, + ], + }, + { + "blockHash": "0x", + "blockNumber": 5n, + "logs": [ + { + "blockHash": "0x", + "blockNumber": 5n, + "logIndex": 0, + "transactionHash": "0x", + "transactionIndex": 0, + }, + ], + }, + ] + `); + }); +}); diff --git a/packages/block-logs-stream/src/groupLogsByBlockNumber.ts b/packages/block-logs-stream/src/groupLogsByBlockNumber.ts new file mode 100644 index 0000000000..0c24ce1312 --- /dev/null +++ b/packages/block-logs-stream/src/groupLogsByBlockNumber.ts @@ -0,0 +1,50 @@ +import { BlockNumber, Hex, Log } from "viem"; +import { NonPendingLog, isNonPendingLog } from "./isNonPendingLog"; +import { bigIntSort } from "./utils"; +import { isDefined } from "@latticexyz/common/utils"; +import { debug } from "./debug"; + +/** + * Groups logs by their block number. + * + * @remarks + * This function takes an array of logs and returns a new array where each item corresponds to a distinct block number. + * Each item in the output array includes the block number, the block hash, and an array of all logs for that block. + * Pending logs are filtered out before processing, as they don't have block numbers. + * + * @param logs The logs to group by block number. + * + * @returns An array of objects where each object represents a distinct block and includes the block number, + * the block hash, and an array of logs for that block. + */ +export function groupLogsByBlockNumber( + logs: readonly TLog[] +): { blockNumber: BlockNumber; blockHash: Hex; logs: readonly NonPendingLog[] }[] { + // Pending logs don't have block numbers, so filter them out. + const nonPendingLogs = logs.filter(isNonPendingLog); + if (logs.length !== nonPendingLogs.length) { + debug( + "pending logs discarded", + logs.filter((log) => !isNonPendingLog(log)) + ); + } + + const blockNumbers = Array.from(new Set(nonPendingLogs.map((log) => log.blockNumber))); + blockNumbers.sort(bigIntSort); + + return blockNumbers + .map((blockNumber) => { + const blockLogs = nonPendingLogs.filter((log) => log.blockNumber === blockNumber); + if (!blockLogs.length) return; + blockLogs.sort((a, b) => (a.logIndex < b.logIndex ? -1 : a.logIndex > b.logIndex ? 1 : 0)); + + if (!blockLogs.length) return; + + return { + blockNumber, + blockHash: blockLogs[0].blockHash, + logs: blockLogs, + }; + }) + .filter(isDefined); +} diff --git a/packages/block-logs-stream/src/index.ts b/packages/block-logs-stream/src/index.ts new file mode 100644 index 0000000000..f7ee62ef5b --- /dev/null +++ b/packages/block-logs-stream/src/index.ts @@ -0,0 +1,6 @@ +export * from "./blockRangeToLogs"; +export * from "./createBlockStream"; +export * from "./fetchLogs"; +export * from "./groupLogsByBlockNumber"; +export * from "./isNonPendingBlock"; +export * from "./isNonPendingLog"; diff --git a/packages/block-logs-stream/src/isNonPendingBlock.ts b/packages/block-logs-stream/src/isNonPendingBlock.ts new file mode 100644 index 0000000000..5545fc23a5 --- /dev/null +++ b/packages/block-logs-stream/src/isNonPendingBlock.ts @@ -0,0 +1,12 @@ +import type { Block } from "viem"; + +export type NonPendingBlock = TBlock & { + hash: NonNullable; + logsBloom: NonNullable; + nonce: NonNullable; + number: NonNullable; +}; + +export function isNonPendingBlock(block: TBlock): block is NonPendingBlock { + return block.hash != null && block.logsBloom != null && block.nonce != null && block.number != null; +} diff --git a/packages/block-logs-stream/src/isNonPendingLog.ts b/packages/block-logs-stream/src/isNonPendingLog.ts new file mode 100644 index 0000000000..6110d18171 --- /dev/null +++ b/packages/block-logs-stream/src/isNonPendingLog.ts @@ -0,0 +1,19 @@ +import type { Log } from "viem"; + +export type NonPendingLog = TLog & { + blockHash: NonNullable; + blockNumber: NonNullable; + logIndex: NonNullable; + transactionHash: NonNullable; + transactionIndex: NonNullable; +}; + +export function isNonPendingLog(log: TLog): log is NonPendingLog { + return ( + log.blockHash != null && + log.blockNumber != null && + log.logIndex != null && + log.transactionHash != null && + log.transactionIndex != null + ); +} diff --git a/packages/block-logs-stream/src/utils.ts b/packages/block-logs-stream/src/utils.ts new file mode 100644 index 0000000000..b365dd9dd1 --- /dev/null +++ b/packages/block-logs-stream/src/utils.ts @@ -0,0 +1,17 @@ +// javascript, y u no support bigints better? + +export function bigIntMin(...args: bigint[]): bigint { + return args.reduce((m, e) => (e < m ? e : m)); +} + +export function bigIntMax(...args: bigint[]): bigint { + return args.reduce((m, e) => (e > m ? e : m)); +} + +export function bigIntSort(a: bigint, b: bigint): -1 | 0 | 1 { + return a < b ? -1 : a > b ? 1 : 0; +} + +export function wait(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/packages/block-logs-stream/tsconfig.json b/packages/block-logs-stream/tsconfig.json new file mode 100644 index 0000000000..e590f0c026 --- /dev/null +++ b/packages/block-logs-stream/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "es2021", + "module": "esnext", + "moduleResolution": "node", + "declaration": true, + "sourceMap": true, + "outDir": "dist", + "isolatedModules": true, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "strict": true + } +} diff --git a/packages/block-logs-stream/tsup.config.ts b/packages/block-logs-stream/tsup.config.ts new file mode 100644 index 0000000000..b755469f90 --- /dev/null +++ b/packages/block-logs-stream/tsup.config.ts @@ -0,0 +1,11 @@ +import { defineConfig } from "tsup"; + +export default defineConfig({ + entry: ["src/index.ts"], + target: "esnext", + format: ["esm"], + dts: false, + sourcemap: true, + clean: true, + minify: true, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e8467a5773..5e59ef7512 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -48,6 +48,40 @@ importers: specifier: ^4.9.5 version: 4.9.5 + packages/block-logs-stream: + dependencies: + '@latticexyz/common': + specifier: workspace:* + version: link:../common + '@latticexyz/config': + specifier: workspace:* + version: link:../config + '@latticexyz/schema-type': + specifier: workspace:* + version: link:../schema-type + abitype: + specifier: 0.8.7 + version: 0.8.7(typescript@5.0.4) + debug: + specifier: ^4.3.4 + version: 4.3.4(supports-color@8.1.1) + rxjs: + specifier: 7.5.5 + version: 7.5.5 + viem: + specifier: 1.1.7 + version: 1.1.7(typescript@5.0.4) + devDependencies: + '@types/debug': + specifier: ^4.1.7 + version: 4.1.7 + tsup: + specifier: ^6.7.0 + version: 6.7.0(postcss@8.4.23)(typescript@5.0.4) + vitest: + specifier: 0.31.4 + version: 0.31.4 + packages/cli: dependencies: '@ethersproject/abi':