From ad3f09d074b8a526b1de767dea2fdcf3a80c8a1e Mon Sep 17 00:00:00 2001 From: Louis-Amas Date: Fri, 1 Mar 2024 13:31:22 +0100 Subject: [PATCH] fix: logs not an array --- src/blockManager.ts | 313 +++++++++++++++++++++----------------------- 1 file changed, 151 insertions(+), 162 deletions(-) diff --git a/src/blockManager.ts b/src/blockManager.ts index 9f9bc8f..e79ad69 100644 --- a/src/blockManager.ts +++ b/src/blockManager.ts @@ -6,7 +6,8 @@ import LogSubscriber from "./logSubscriber"; import { Result } from "./util/types"; import { Mutex } from "async-mutex"; -const ZERO_ADDRESS = '0x0000000000000000000000000000000000000000000000000000000000000000'; +const ZERO_ADDRESS = + "0x0000000000000000000000000000000000000000000000000000000000000000"; // eslint-disable-next-line @typescript-eslint/no-namespace namespace BlockManager { @@ -52,8 +53,7 @@ namespace BlockManager { export type ErrorOrLogsWithCommonAncestor = Result< { logs: Log[]; // if commonAncestor exists than it's returning logs from commonAncestor.number + 1 to newblock.number - commonAncestor?: Block;// commonAncestor - + commonAncestor?: Block; // commonAncestor }, { error: @@ -101,8 +101,8 @@ namespace BlockManager { */ retryDelayGetLogsMs: number; /** - * Batch block size - */ + * Batch block size + */ batchSize: number; }; @@ -172,7 +172,7 @@ class BlockManager { public async getBlock( blockNumber: number, - exclusive: boolean = true, + exclusive: boolean = true ): Promise { if (!exclusive) { return this.blocksByNumber[blockNumber]; @@ -197,7 +197,7 @@ class BlockManager { * Initialize the BlockManager cache with block */ public async initialize(block: BlockManager.Block) { - logger.info('[BlockManager] initialize()', { data: { block ,}}); + logger.info("[BlockManager] initialize()", { data: { block } }); this.lastBlock = block; this.blocksByNumber = {}; @@ -213,8 +213,8 @@ class BlockManager { /* subscribeToLogs enables a subscription for all logs emitted for the contract at the address. * Only one subscription can exist by address. Calling a second time this function with the same - * address will result in cancelling the previous subscription. - * */ + * address will result in cancelling the previous subscription. + * */ public async subscribeToLogs( addressAndTopics: BlockManager.AddressAndTopics, subscriber: LogSubscriber @@ -252,16 +252,15 @@ class BlockManager { ]; this.countsBlocksCached--; } - } - /** - * Find commonAncestor between RPC is the local cache. - * This methods compare blocks between cache and RPC until it finds a matching block. - * It return the matching block - * This methods compares blocks between cache and RPC until it finds a matching block. - * It return the matching block. - */ + /** + * Find commonAncestor between RPC is the local cache. + * This methods compare blocks between cache and RPC until it finds a matching block. + * It return the matching block + * This methods compares blocks between cache and RPC until it finds a matching block. + * It return the matching block. + */ private async findCommonAncestor( rec: number = 0 ): Promise { @@ -278,7 +277,7 @@ class BlockManager { const rpcBlocks = await this.options.getBlocksBatch( this.lastBlock!.number - this.options.batchSize, - this.lastBlock!.number, + this.lastBlock!.number ); if (rpcBlocks.error) { @@ -294,10 +293,10 @@ class BlockManager { const cachedBlock = this.blocksByNumber[currentBlockNumber]; if (fetchedBlock.hash === cachedBlock.hash) { - return { - error: undefined, + return { + error: undefined, ok: cachedBlock, - } + }; } } @@ -328,7 +327,10 @@ class BlockManager { blocksPromises.push(this.options.getBlock(i)); } - const blocks = await this.options.getBlocksBatch(this.lastBlock!.number + 1, newBlock.number); + const blocks = await this.options.getBlocksBatch( + this.lastBlock!.number + 1, + newBlock.number + ); if (blocks.error) { return this.populateValidChainUntilBlock(newBlock, rec + 1); @@ -356,10 +358,10 @@ class BlockManager { /** * Establish a valid chain with last block = newBlock.number. - * + * * Returns found commonAncestor. */ private async handleReorg( - newBlock: BlockManager.Block, + newBlock: BlockManager.Block ): Promise { let { error, ok: commonAncestor } = await this.findCommonAncestor(); @@ -385,14 +387,11 @@ class BlockManager { }; } - logger.debug( - '[BlockManager] handleReorg(): commonAncestor', - { - data: { - commonAncestor, - } - } - ); + logger.debug("[BlockManager] handleReorg(): commonAncestor", { + data: { + commonAncestor, + }, + }); /* remove all blocks that has been reorged from cache */ for (let i = commonAncestor!.number + 1; i <= this.lastBlock!.number; ++i) { @@ -407,7 +406,7 @@ class BlockManager { return { error: undefined, ok: commonAncestor! }; } - /** + /** * * queryLogs function tries to get logs between fromBlock (excluded) to toBlock (included). This * function handles retry and reorg. The function expect that all blocks between fromBlock and toBlock @@ -418,17 +417,14 @@ class BlockManager { toBlock: BlockManager.Block, rec = 0, commonAncestor?: BlockManager.Block, - blocksMap?: Record, + blocksMap?: Record ): Promise { - logger.debug( - '[BlockManager] queryLogs()', - { - data: { - fromBlock, - toBlock, - } - } - ); + logger.debug("[BlockManager] queryLogs()", { + data: { + fromBlock, + toBlock, + }, + }); if (rec > this.options.maxRetryGetLogs) { return { error: { @@ -449,22 +445,25 @@ class BlockManager { /* the rpc might be a bit late, wait retryDelayGetLogsMs to let it catch up */ await sleep(this.options.retryDelayGetLogsMs); - if ((!error.includes("not processed yet") && !error.includes("cannot be found"))) { - logger.error( - '[BlockManager] queryLogs(): failure', - { - data: { - error, - fromBlock, - toBlock, - } - } - ); + if ( + !error.includes("not processed yet") && + !error.includes("cannot be found") + ) { + logger.error("[BlockManager] queryLogs(): failure", { + data: { + error, + fromBlock, + toBlock, + }, + }); } return this.queryLogs(fromBlock, toBlock, rec + 1); } - const logs = ok!; + let logs = ok!; + if (!Array.isArray(logs)) { + logs = []; + } /* DIRTY: if we detected a reorg we already repopulate the chain until toBlock.number */ if (!commonAncestor && !blocksMap) { @@ -472,8 +471,9 @@ class BlockManager { } for (const log of logs) { - const block = - blocksMap ? blocksMap[log.blockNumber] : this.blocksByNumber[log.blockNumber]; // TODO: verify that block exists + const block = blocksMap + ? blocksMap[log.blockNumber] + : this.blocksByNumber[log.blockNumber]; // TODO: verify that block exists if (!block) { return { @@ -498,10 +498,15 @@ class BlockManager { ok: undefined, }; } - /** Our cache is consistent again we retry queryLogs, - * we should retry with from = _commonAncestor, to get all rollbacked events. - * */ - return this.queryLogs(_commonAncestor, toBlock, rec + 1, _commonAncestor); + /** Our cache is consistent again we retry queryLogs, + * we should retry with from = _commonAncestor, to get all rollbacked events. + * */ + return this.queryLogs( + _commonAncestor, + toBlock, + rec + 1, + _commonAncestor + ); } } @@ -544,15 +549,12 @@ class BlockManager { const subscriber = this.subscribersByAddress[address]; subscriber.initializedAt = block; subscriber.lastSeenEventBlock = block; - logger.debug( - '[BlockManager] subscriberInitialize()', - { - data: { - address, - block, - } - } - ); + logger.debug("[BlockManager] subscriberInitialize()", { + data: { + address, + block, + }, + }); } } } @@ -591,29 +593,23 @@ class BlockManager { * it needs to be initialized again. **/ this.waitingToBeInitializedSet.add(address); - logger.info( - '[BlockManager] addToInitializeList()', - { - data: { - initializedAt: subscriber.initializedAt, - block, - } - } - ); + logger.info("[BlockManager] addToInitializeList()", { + data: { + initializedAt: subscriber.initializedAt, + block, + }, + }); } else if ( subscriber.lastSeenEventBlock && subscriber.lastSeenEventBlock.number > block.number ) { subscriber.rollback(block); - logger.info( - '[BlockManager] rollback()', - { - data: { - address, - block, - } - } - ); + logger.info("[BlockManager] rollback()", { + data: { + address, + block, + }, + }); } } } @@ -624,27 +620,31 @@ class BlockManager { this.checkLastBlockExist(); let from = this.lastBlock!.number + 1; - logger.info( - `[BlockManager] handleBatchBlock()`, { data: newBlock }, - ); + logger.info(`[BlockManager] handleBatchBlock()`, { data: newBlock }); const logs: Log[] = []; do { - const countBlocksLeft = (newBlock.number - from) + 1; // from is included + const countBlocksLeft = newBlock.number - from + 1; // from is included logger.debug( - `[BlockManager] handleBatchBlock() still ${countBlocksLeft} blocks left to handle`, + `[BlockManager] handleBatchBlock() still ${countBlocksLeft} blocks left to handle` ); - const to = this.options.batchSize >= countBlocksLeft ? newBlock.number : from + this.options.batchSize; + const to = + this.options.batchSize >= countBlocksLeft + ? newBlock.number + : from + this.options.batchSize; /* fetch all blocks between from and to */ const blocksResult = await this.options.getBlocksBatch(from - 1, to); if (blocksResult.error) { - return { error: blocksResult.error, ok: undefined}; + return { error: blocksResult.error, ok: undefined }; } - const blocks = blocksResult.ok.slice(1); /* extract blocks between from (included) and to (included) */ + const blocks = + blocksResult.ok.slice( + 1 + ); /* extract blocks between from (included) and to (included) */ /* build a block map number to block */ const blocksMap = blocksResult.ok.reduce((acc, block) => { @@ -656,31 +656,28 @@ class BlockManager { const toBlock = blocks[blocks.length - 1]; const fromBlock = blocks[0]; - /** - * when quering block with a multicall sometimes it return empty block hash for block latest - * we can override it our self, because later we check that chain valid see ref:A. - **/ + /** + * when quering block with a multicall sometimes it return empty block hash for block latest + * we can override it our self, because later we check that chain valid see ref:A. + **/ if (toBlock.hash === ZERO_ADDRESS) { if (toBlock.number === newBlock.number) { toBlock.hash = newBlock.hash; // repair problem with multi call } } - logger.debug( - '[BlockManager] handleBatchBlock()', - { - data: { - from: fromBlock, - to: toBlock, - } - } - ); + logger.debug("[BlockManager] handleBatchBlock()", { + data: { + from: fromBlock, + to: toBlock, + }, + }); /** - * ref: A - * Here we check if the batch of blocks we queried is consitent with the chain we have in cache - * if not then we detected a reorg - */ + * ref: A + * Here we check if the batch of blocks we queried is consitent with the chain we have in cache + * if not then we detected a reorg + */ if (this.lastBlock!.hash !== fromBlock.parentHash) { logger.warn(`[BlockManager] batch detected a reorg`, { data: { @@ -689,9 +686,8 @@ class BlockManager { }, }); - const { error: reorgError, ok: reorgAncestor } = await this.handleReorg( - newBlock, + newBlock ); if (reorgError) { @@ -713,7 +709,7 @@ class BlockManager { toBlock, 0, undefined, - blocksMap, + blocksMap ); if (queryLogsError) { @@ -736,36 +732,35 @@ class BlockManager { await this.handleSubscribersInitialize(newBlock); await this.handleBlockPostHooks(); - } else { - const blocksMap = blocks.reduce((acc, block) => { - acc[block.number] = block; - return acc; - }, {} as Record); + const blocksMap = blocks.reduce((acc, block) => { + acc[block.number] = block; + return acc; + }, {} as Record); - const { error: queryLogsError, ok: okLogs } = await this.queryLogs( - this.lastBlock!, - toBlock, - 0, - undefined, - blocksMap, - ); + const { error: queryLogsError, ok: okLogs } = await this.queryLogs( + this.lastBlock!, + toBlock, + 0, + undefined, + blocksMap + ); - if (queryLogsError) { - return { error: "FailedFetchingLog", ok: undefined }; - } - - if (okLogs.commonAncestor) { - this.rollbackSubscribers(okLogs.commonAncestor); - } else { - /* construct valid chain */ - for (const block of blocks) { - this.setLastBlock(block); - } + if (queryLogsError) { + return { error: "FailedFetchingLog", ok: undefined }; + } + + if (okLogs.commonAncestor) { + this.rollbackSubscribers(okLogs.commonAncestor); + } else { + /* construct valid chain */ + for (const block of blocks) { + this.setLastBlock(block); } + } - logs.push(...okLogs.logs); - await this.applyLogs(okLogs.logs); + logs.push(...okLogs.logs); + await this.applyLogs(okLogs.logs); } from = toBlock.number + 1; @@ -779,7 +774,7 @@ class BlockManager { ok: { logs: logs, }, - } + }; } async _handleBlock( @@ -790,13 +785,13 @@ class BlockManager { if (cachedBlock && cachedBlock.hash === newBlock.hash) { /* newBlock is already stored in cache bail out*/ logger.debug( - '[BlockManager] handleBlock() block already in cache, ignoring...', - { data: newBlock, }, + "[BlockManager] handleBlock() block already in cache, ignoring...", + { data: newBlock } ); return { error: undefined, ok: { logs: [], rollback: undefined } }; } - if ( (newBlock.number - this.lastBlock!.number) > 1) { + if (newBlock.number - this.lastBlock!.number > 1) { return await this.handleBatchBlock(newBlock); } @@ -804,15 +799,12 @@ class BlockManager { if (newBlock.parentHash !== this.lastBlock!.hash) { /* newBlock is not successor of this.lastBlock a reorg has been detected */ - logger.warn( - '[BlockManager] handleBlock() reorg', - { - data: { - last: this.lastBlock, - newBlock: newBlock, - } - } - ); + logger.warn("[BlockManager] handleBlock() reorg", { + data: { + last: this.lastBlock, + newBlock: newBlock, + }, + }); const { error: reorgError, ok: reorgAncestor } = await this.handleReorg( newBlock @@ -835,7 +827,7 @@ class BlockManager { reorgAncestor, newBlock, 0, - reorgAncestor, + reorgAncestor ); if (queryLogsError) { @@ -875,13 +867,10 @@ class BlockManager { }, }; } else { - logger.debug( - `[BlockManager] handleBlock() normal`, - { data: newBlock }, - ); + logger.debug(`[BlockManager] handleBlock() normal`, { data: newBlock }); const { error: queryLogsError, ok: okQueryLogs } = await this.queryLogs( this.lastBlock!, - newBlock, + newBlock ); if (queryLogsError) { @@ -914,7 +903,7 @@ class BlockManager { }, }; } -} + } /** * Add new block in BlockManager cache, detect reorganization, and ensure that cache is consistent @@ -923,7 +912,7 @@ class BlockManager { newBlock: BlockManager.Block ): Promise { return await this.mutex.runExclusive(async () => { - return this._handleBlock(newBlock) + return this._handleBlock(newBlock); }); } }