Skip to content
This repository has been archived by the owner on Mar 15, 2023. It is now read-only.

Commit

Permalink
near: abide by rate limits, add logging
Browse files Browse the repository at this point in the history
  • Loading branch information
heyitaki committed Jan 4, 2023
1 parent 7364bf3 commit 5f423b1
Show file tree
Hide file tree
Showing 8 changed files with 678 additions and 1,807 deletions.
2,349 changes: 599 additions & 1,750 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions watcher/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"firebase-admin": "^11.4.0",
"js-sha512": "^0.8.0",
"near-api-js": "^1.1.0",
"ora": "^5.4.1",
"winston": "^3.8.2"
},
"devDependencies": {
Expand Down
30 changes: 21 additions & 9 deletions watcher/scripts/backfillNear.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import { ChainName, CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN } from '@wormhole-foundation/wormhole-monitor-common';
import { BlockResult } from 'near-api-js/lib/providers/provider';
import ora from 'ora';
import { initDb } from '../src/databases/utils';
import { getArchivalRpcProvider, getTransactionsByAccountId } from '../src/utils/near';
import {
getRateLimitedProvider,
getTransactionsByAccountId,
NEAR_ARCHIVE_RPC,
} from '../src/utils/near';
import { getMessagesFromBlockResults } from '../src/watchers/NearWatcher';

// This script exists because NEAR RPC nodes do not support querying blocks older than 5 epochs
Expand All @@ -18,29 +23,36 @@ const BATCH_SIZE = 1000;
(async () => {
const db = initDb();
const chain: ChainName = 'near';
const provider = await getArchivalRpcProvider();
const provider = await getRateLimitedProvider(NEAR_ARCHIVE_RPC);
const fromBlock = Number(
(await db.getLastBlockByChain(chain)) ?? INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN[chain] ?? 0
);

// fetch all transactions for core bridge contract from explorer
let log = ora('Fetching transactions from NEAR Explorer...').start();
const toBlock = await provider.block({ finality: 'final' });
const transactions = await getTransactionsByAccountId(
CONTRACTS.MAINNET.near.core,
BATCH_SIZE,
toBlock.header.timestamp.toString()
toBlock.header.timestamp.toString().padEnd(19, '9') // pad to nanoseconds
);
log.succeed(`Fetched ${transactions.length} transactions from NEAR Explorer`);

// filter out transactions that precede last seen block
const blocks: BlockResult[] = [];
const blockHashes = [...new Set(transactions.map((tx) => tx.blockHash))];
blockHashes.forEach(async (hash) => {
const block = await provider.block(hash);
if (block.header.height >= fromBlock && block.header.height <= toBlock.header.height) {
const blockHashes = [...new Set(transactions.map((tx) => tx.blockHash))]; // de-dup blocks
log = ora('Fetching blocks...').start();
for (let i = 0; i < blockHashes.length; i++) {
log.text = `Fetching blocks... ${i + 1}/${blockHashes.length}`;
const block = await provider.block({ blockId: blockHashes[i] });
if (block.header.height > fromBlock && block.header.height <= toBlock.header.height) {
blocks.push(block);
}
});
}

const vaasByBlock = await getMessagesFromBlockResults(provider, blocks);
log.text = `Fetched ${blocks.length} blocks`;
log.succeed();
const vaasByBlock = await getMessagesFromBlockResults(provider, blocks, true);
await db.storeVaasByBlock(chain, vaasByBlock);
log.succeed('Uploaded messages to db successfully');
})();
2 changes: 1 addition & 1 deletion watcher/src/databases/JsonDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export class JsonDatabase extends Database {
// this will always overwrite the "last" block, so take caution if manually backfilling gaps
const blockInfos = Object.keys(vaasByBlock);
if (blockInfos.length) {
this.lastBlockByChain[chainId] = blockInfos[blockInfos.length - 1];
this.lastBlockByChain[chainId] = blockInfos.sort()[blockInfos.length - 1];
writeFileSync(this.dbLastBlockFile, JSON.stringify(this.lastBlockByChain), ENCODING);
}
}
Expand Down
25 changes: 13 additions & 12 deletions watcher/src/utils/near.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,19 @@ import {
Transaction,
} from '../types/near';

const NEAR_ARCHIVE_RPC = 'https://archival-rpc.mainnet.near.org';
const NEAR_EXPLORER_TRANSACTION_URL =
'https://backend-mainnet-1713.onrender.com/trpc/transaction.listByAccountId';
export const ARCHIVAL_NODE_RATE_LIMIT_MS = 100;
export const NEAR_ARCHIVE_RPC = 'https://archival-rpc.mainnet.near.org';
export const NEAR_ARCHIVE_NODE_RATE_LIMIT_MS = 100;

export const getArchivalRpcProvider = async (): Promise<Provider> => {
const connection = await connect({ nodeUrl: NEAR_ARCHIVE_RPC, networkId: 'mainnet' });
const provider = connection.connection.provider as JsonRpcProvider;
const originalFn = provider.sendJsonRpc;
provider.sendJsonRpc = async function <T>(method: string, params: object) {
await sleep(ARCHIVAL_NODE_RATE_LIMIT_MS); // respect rate limits: 600req/min
export const getRateLimitedProvider = async (rpc: string): Promise<Provider> => {
const connection = await connect({ nodeUrl: rpc, networkId: 'mainnet' });
const provider = connection.connection.provider;
const originalFn = (provider as JsonRpcProvider).sendJsonRpc;
(provider as JsonRpcProvider).sendJsonRpc = async function <T>(method: string, params: object) {
await sleep(NEAR_ARCHIVE_NODE_RATE_LIMIT_MS); // respect rate limits: 600req/min
return originalFn.call(this, method, params) as Promise<T>;
};

return provider;
};

Expand Down Expand Up @@ -50,7 +49,9 @@ export const getTransactionsByAccountId = async (
).data as GetTransactionsByAccountIdResponse
)[0];
if ('error' in res) throw new Error(res.error.message);
return res.result.data.items.filter(
(tx) => tx.status === 'success' && tx.actions.some((a) => a.kind === 'functionCall') // other actions don't generate logs
);
return res.result.data.items
.filter(
(tx) => tx.status === 'success' && tx.actions.some((a) => a.kind === 'functionCall') // other actions don't generate logs
)
.reverse(); // return chronological order
};
File renamed without changes.
63 changes: 35 additions & 28 deletions watcher/src/watchers/NearWatcher.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { connect } from 'near-api-js';
import { Provider, TypedError } from 'near-api-js/lib/providers';
import { BlockResult, ExecutionStatus } from 'near-api-js/lib/providers/provider';
import ora from 'ora';
import { RPCS_BY_CHAIN } from '../consts';
import { VaasByBlock } from '../databases/types';
import { makeBlockKey, makeVaaKey } from '../databases/utils';
import { EventLog } from '../types/near';
import { isWormholePublishEventLog } from '../utils/types';
import { getRateLimitedProvider } from '../utils/near';
import { isWormholePublishEventLog } from '../utils/validation';
import { Watcher } from './Watcher';

export class NearWatcher extends Watcher {
Expand Down Expand Up @@ -53,45 +54,51 @@ export class NearWatcher extends Watcher {
}

async getProvider(): Promise<Provider> {
if (!this.provider) {
const connection = await connect({ nodeUrl: RPCS_BY_CHAIN.near!, networkId: 'mainnet' });
this.provider = connection.connection.provider;
}
return this.provider;
return (this.provider = this.provider || (await getRateLimitedProvider(RPCS_BY_CHAIN.near!)));
}
}

export const getMessagesFromBlockResults = async (
provider: Provider,
blocks: BlockResult[]
blocks: BlockResult[],
debug: boolean = false
): Promise<VaasByBlock> => {
const vaasByBlock: VaasByBlock = {};
for (const block of blocks) {
const chunks = await Promise.all(
block.chunks.map(({ chunk_hash }) => provider.chunk(chunk_hash))
);
let log: ora.Ora;
if (debug) log = ora(`Fetching messages from ${blocks.length} blocks...`).start();
for (let i = 0; i < blocks.length; i++) {
if (debug) log!.text = `Fetching messages from block ${i + 1}/${blocks.length}...`;
const chunks = [];
for (const chunk of blocks[i].chunks) {
chunks.push(await provider.chunk(chunk.chunk_hash));
}

const transactions = chunks.flatMap(({ transactions }) => transactions);
for (const tx of transactions) {
const outcome = await provider.txStatus(tx.hash, CONTRACTS.MAINNET.near.core);
if (
(outcome.status as ExecutionStatus).SuccessValue ||
(outcome.status as ExecutionStatus).SuccessReceiptId
) {
const logs = outcome.receipts_outcome
.filter(({ outcome }) => (outcome as any).executor_id === CONTRACTS.MAINNET.near.core)
.flatMap(({ outcome }) => outcome.logs)
.filter((log) => log.startsWith('EVENT_JSON:')) // https://nomicon.io/Standards/EventsFormat
.map((log) => JSON.parse(log.slice(11)) as EventLog)
.filter(isWormholePublishEventLog);
for (const log of logs) {
const { height, timestamp } = block.header;
const blockKey = makeBlockKey(height.toString(), timestamp.toString());
const vaaKey = makeVaaKey(tx.hash, 'near', log.emitter, log.seq.toString());
vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
}
const logs = outcome.receipts_outcome
.filter(
({ outcome }) =>
(outcome as any).executor_id === CONTRACTS.MAINNET.near.core &&
(outcome.status as ExecutionStatus).SuccessValue
)
.flatMap(({ outcome }) => outcome.logs)
.filter((log) => log.startsWith('EVENT_JSON:')) // https://nomicon.io/Standards/EventsFormat
.map((log) => JSON.parse(log.slice(11)) as EventLog)
.filter(isWormholePublishEventLog);
for (const log of logs) {
const { height, timestamp } = blocks[i].header;
const blockKey = makeBlockKey(height.toString(), timestamp.toString());
const vaaKey = makeVaaKey(tx.hash, 'near', log.emitter, log.seq.toString());
vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
}
}
}

if (debug) {
const numMessages = Object.values(vaasByBlock).flat().length;
log!.succeed(`Fetched ${numMessages} messages from ${blocks.length} blocks`);
}

return vaasByBlock;
};
15 changes: 8 additions & 7 deletions watcher/src/watchers/__tests__/NearWatcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ import { CONTRACTS } from '@certusone/wormhole-sdk';
import { describe, expect, jest, test } from '@jest/globals';
import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN } from '@wormhole-foundation/wormhole-monitor-common/dist/consts';
import {
ARCHIVAL_NODE_RATE_LIMIT_MS,
getArchivalRpcProvider,
getRateLimitedProvider,
getTransactionsByAccountId,
NEAR_ARCHIVE_NODE_RATE_LIMIT_MS,
NEAR_ARCHIVE_RPC,
} from '../../utils/near';
import { getMessagesFromBlockResults, NearWatcher } from '../NearWatcher';

Expand All @@ -26,14 +27,14 @@ test('getMessagesForBlocks', async () => {
});

test('getArchivalRpcProvider', async () => {
const provider = await getArchivalRpcProvider();
const provider = await getRateLimitedProvider(NEAR_ARCHIVE_RPC);
const start = performance.now();

// grab first block with activity from core contract
expect(
await provider.block({ blockId: 'Asie8hpJFKaipvw8jh1wPfBwwbjP6JUfsQdCuQvwr3Sz' })
).toBeTruthy();
expect(performance.now() - start).toBeGreaterThan(ARCHIVAL_NODE_RATE_LIMIT_MS);
expect(performance.now() - start).toBeGreaterThan(NEAR_ARCHIVE_NODE_RATE_LIMIT_MS);
});

test('getTransactionsByAccountId', async () => {
Expand All @@ -43,7 +44,7 @@ test('getTransactionsByAccountId', async () => {
'1669732480649090392'
);
expect(transactions.length).toEqual(10);
expect(transactions[0].hash).toEqual('8Bnf3ehhB5AWLxjVXMcTUmku1SdKgRLZpVxN67ViDgiD');
expect(transactions[0].hash).toEqual('7jDrPnvErjbi3EHbQBcKT9wtiUPo77J9tpxXjE3KHcUp');

// test custom timestamp, filtering out non function call actions, and querying last page
transactions = await getTransactionsByAccountId(
Expand All @@ -52,7 +53,7 @@ test('getTransactionsByAccountId', async () => {
'1661429914932000000'
);
expect(transactions.length).toEqual(2);
expect(transactions.at(-1)?.hash).toEqual('3VivTHp1W5ErWgsASUQvW1qwoTCsxYeke4498apDJsss');
expect(transactions[0].hash).toEqual('3VivTHp1W5ErWgsASUQvW1qwoTCsxYeke4498apDJsss');
});

describe('getMessagesFromBlockResults', () => {
Expand All @@ -66,7 +67,7 @@ describe('getMessagesFromBlockResults', () => {
});

test('with ArchivalProvider', async () => {
const provider = await getArchivalRpcProvider();
const provider = await getRateLimitedProvider(NEAR_ARCHIVE_RPC);
const messages = await getMessagesFromBlockResults(provider, [
await provider.block({ blockId: 'Bzjemj99zxe1h8kVp8H2hwVifmbQL8HT34LyPHzEK5qp' }),
await provider.block({ blockId: '4SHFxSo8DdP8DhMauS5iFqfmdLwLET3W3e8Lg9PFvBSn' }),
Expand Down

0 comments on commit 5f423b1

Please sign in to comment.