diff --git a/config.schema.json b/config.schema.json index 6092806..36cb707 100644 --- a/config.schema.json +++ b/config.schema.json @@ -35,6 +35,69 @@ }, "type": "object" }, + "BalanceWatcherConfigSchema": { + "description": "Balance watcher is a component tracking transfers of a token and reporting balances of its accounts.", + "properties": { + "blocksMaxChunkSize": { + "description": "Max. number of blocks to fetch at once", + "type": "number" + }, + "contractAddress": { + "description": "The address of the contract to watch.", + "type": "string" + }, + "decimals": { + "description": "The number of decimals to divide balances with.", + "type": "number" + }, + "enabled": { + "description": "Specify `false` to disable the balance watcher", + "type": "boolean" + }, + "maxParallelChunks": { + "description": "Max. number of chunks to process in parallel", + "type": "number" + }, + "pollInterval": { + "description": "Interval in which to look for the latest block number (if not busy processing the backlog)", + "type": ["string", "number"] + }, + "retryWaitTime": { + "anyOf": [ + { + "$ref": "#/definitions/ExponentialBackoffConfig" + }, + { + "$ref": "#/definitions/LinearBackoffConfig" + }, + { + "type": ["string", "number"] + } + ], + "description": "Wait time before retrying to fetch and process blocks after failure" + }, + "startAt": { + "anyOf": [ + { + "enum": ["genesis", "latest"], + "type": "string" + }, + { + "type": "number" + } + ], + "description": "If no checkpoint exists (yet), this specifies which block should be chosen as the starting point." + } + }, + "type": "object" + }, + "BalanceWatchersConfigSchema": { + "additionalProperties": { + "$ref": "#/definitions/BalanceWatcherConfigSchema" + }, + "description": "Balance watchers is a component tracking transfers of tokens and reporting balances of accounts.", + "type": "object" + }, "BlockWatcherConfigSchema": { "description": "Block watcher is the component that retrieves blocks, transactions, event logs from the node and sends\nthem to output.", "properties": { @@ -618,6 +681,10 @@ "SourcetypesSchema": { "description": "Configurable set of `sourcetype` field values emitted by ethlogger", "properties": { + "balance": { + "default": "ethereum:balance", + "type": "string" + }, "block": { "default": "ethereum:block", "type": "string" @@ -656,6 +723,10 @@ "$ref": "#/definitions/AbiRepositoryConfigSchema", "description": "ABI repository configuration" }, + "balanceWatchers": { + "$ref": "#/definitions/BalanceWatchersConfigSchema", + "description": "Balance watchers, tracking balance of ERC-20 token holders" + }, "blockWatcher": { "$ref": "#/definitions/BlockWatcherConfigSchema", "description": "Block watcher settings, configure how blocks, transactions, event logs are ingested" diff --git a/docs/configuration.md b/docs/configuration.md index a0002e5..743fef1 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -81,6 +81,7 @@ Root configuration schema for ethlogger | `abi` | [`AbiRepository`](#AbiRepository) | ABI repository configuration | | `contractInfo` | [`ContractInfo`](#ContractInfo) | Contract info cache settings | | `blockWatcher` | [`BlockWatcher`](#BlockWatcher) | Block watcher settings, configure how blocks, transactions, event logs are ingested | +| `balanceWatchers` | [`BalanceWatchers`](#BalanceWatchers) | Balance watchers, tracking balance of ERC-20 token holders | | `nodeMetrics` | [`NodeMetrics`](#NodeMetrics) | Settings for the node metrics collector | | `nodeInfo` | [`NodeInfo`](#NodeInfo) | Settings for the node info collector | | `pendingTx` | [`PendingTx`](#PendingTx) | Settings for collecting pending transactions from node | @@ -141,6 +142,7 @@ Configurable set of `sourcetype` field values emitted by ethlogger | `nodeInfo` | `string` | `"ethereum:node:info"` | | `nodeMetrics` | `string` | `"ethereum:node:metrics"` | | `gethPeer` | `string` | `"ethereum:geth:peer"` | +| `balance` | `string` | `"ethereum:balance"` | ### ConsoleOutput @@ -245,6 +247,29 @@ Block watcher is the component that retrieves blocks, transactions, event logs f | `retryWaitTime` | [`WaitTime`](#WaitTime) | Wait time before retrying to fetch and process blocks after failure | | `decryptPrivateTransactions` | `boolean` | For chains/nodes that do support private transactions, this setting instructs block watcher to attempt to load the decrypted payload for private transactions | +### BalanceWatchers + +Balance watchers is a component tracking transfers of tokens and reporting balances of accounts. + +| Name | Type | Description | +| ---- | ----------------------------------------------- | -------------------------------------------------------------------------- | +| `-` | map | Mapping of name => balancer watcher.

See BalanceWatcherConfigSchema | + +### BalanceWatcher + +Balance watcher is a component tracking transfers of a token and reporting balances of its accounts. + +| Name | Type | Description | +| -------------------- | --------------------------- | ------------------------------------------------------------------------------------------------- | +| `contractAddress` | `string` | The address of the contract to watch. | +| `decimals` | `number` | The number of decimals to divide balances with. | +| `startAt` | [`StartBlock`](#StartBlock) | If no checkpoint exists (yet), this specifies which block should be chosen as the starting point. | +| `enabled` | `boolean` | Specify `false` to disable the balance watcher | +| `pollInterval` | [`Duration`](#Duration) | Interval in which to look for the latest block number (if not busy processing the backlog) | +| `blocksMaxChunkSize` | `number` | Max. number of blocks to fetch at once | +| `maxParallelChunks` | `number` | Max. number of chunks to process in parallel | +| `retryWaitTime` | [`WaitTime`](#WaitTime) | Wait time before retrying to fetch and process blocks after failure | + ### NodeMetrics The node metrics collector retrieves numeric measurements from nodes on a periodic basis. diff --git a/examples/erc20-tracking/.gitignore b/examples/erc20-tracking/.gitignore new file mode 100644 index 0000000..e53ed4a --- /dev/null +++ b/examples/erc20-tracking/.gitignore @@ -0,0 +1,2 @@ +/.ethlogger-state.json +/checkpoints.json diff --git a/examples/erc20-tracking/README.md b/examples/erc20-tracking/README.md new file mode 100644 index 0000000..c55f7ea --- /dev/null +++ b/examples/erc20-tracking/README.md @@ -0,0 +1,32 @@ +# Watching ERC-20 token balances + +This is an example showing how to use ethlogger to track the activity of an ERC-20 token. + +Ethlogger configuration is provided in the form of [environment variables](../../docs/cli.md#environment-variables) in [docker-compose.yaml](./docker-compose.yaml#L25) and a configuration file, [ethlogger.yaml](./ethlogger.yaml). + +## Run + +1. Start docker-compose + +```sh-session +$ cd examples/erc20-tracking +$ docker-compose up -d +``` + +2. Wait for all containers to start. + You can rely on the output of `docker ps` to see the state of services. + +3. Then go to [http://localhost:8000](http://localhost:8000) to explore the data produced by ethlogger. + Login using user `admin` and password `changeme` + +## Note + +> This example is not meant to be used in a production setup. +> Using the logging driver to log to a container in the same docker-compose stack shouldn't be used in production. +> Splunk and ethlogger persist data using local volumes and a checkpoints file. If blocks are no longer being ingested, or if you want to change the blockchain you are using, you should clear this state. To start clean, run the following. + +```sh-session +$ docker-compose down +$ rm checkpoints.json +$ docker volume prune +``` diff --git a/examples/erc20-tracking/docker-compose.yaml b/examples/erc20-tracking/docker-compose.yaml new file mode 100644 index 0000000..6feab9b --- /dev/null +++ b/examples/erc20-tracking/docker-compose.yaml @@ -0,0 +1,43 @@ +version: '3.6' + +services: + splunk: + image: splunk/splunk:latest + container_name: splunk + environment: + - SPLUNK_START_ARGS=--accept-license + - SPLUNK_HEC_TOKEN=11111111-1111-1111-1111-1111111111113 + - SPLUNK_PASSWORD=changeme + - SPLUNK_APPS_URL=https://github.com/splunk/ethereum-basics/releases/download/latest/ethereum-basics.tgz + ports: + - 8000:8000 + - 8088:8088 + healthcheck: + test: ['CMD', 'curl', '-f', 'http://localhost:8000'] + interval: 5s + timeout: 5s + retries: 20 + volumes: + - ./splunk.yml:/tmp/defaults/default.yml + - /opt/splunk/var + - /opt/splunk/etc + ethlogger: + image: ghcr.io/splunkdlt/ethlogger:latest + container_name: ethlogger + command: -c /app/ethlogger.yaml + environment: + - ETH_RPC_URL=https://dai.poa.network + # Use these environment variables to connect to infura + # - ETH_RPC_URL=https://mainnet.infura.io/v3/ + - START_AT_BLOCK=latest + - SPLUNK_HEC_URL=https://splunk:8088 + - SPLUNK_HEC_TOKEN=11111111-1111-1111-1111-1111111111113 + - SPLUNK_EVENTS_INDEX=main + - SPLUNK_METRICS_INDEX=metrics + - SPLUNK_INTERNAL_INDEX=metrics + - SPLUNK_HEC_REJECT_INVALID_CERTS=false + volumes: + - ./:/app + depends_on: + - splunk + restart: always diff --git a/examples/erc20-tracking/ethlogger.yaml b/examples/erc20-tracking/ethlogger.yaml new file mode 100644 index 0000000..b84879c --- /dev/null +++ b/examples/erc20-tracking/ethlogger.yaml @@ -0,0 +1,11 @@ +balanceWatchers: + wxdai: + # https://blockscout.com/xdai/mainnet/address/0xe91D153E0b41518A2Ce8Dd3D7944Fa863463a97d/transactions + contractAddress: '0xe91d153e0b41518a2ce8dd3d7944fa863463a97d' + startAt: 19023469 +blockWatcher: + enabled: false +nodeMetrics: + enabled: false +nodeStats: + enabled: false diff --git a/examples/erc20-tracking/splunk.yml b/examples/erc20-tracking/splunk.yml new file mode 100644 index 0000000..6deadf3 --- /dev/null +++ b/examples/erc20-tracking/splunk.yml @@ -0,0 +1,11 @@ +splunk: + conf: + indexes: + directory: /opt/splunk/etc/apps/search/local + content: + metrics: + coldPath: $SPLUNK_DB/metrics/colddb + datatype: metric + homePath: $SPLUNK_DB/metrics/db + maxTotalDataSizeMB: 512000 + thawedPath: $SPLUNK_DB/metrics/thaweddb diff --git a/scripts/gendocs.ts b/scripts/gendocs.ts index 0236737..212aee3 100644 --- a/scripts/gendocs.ts +++ b/scripts/gendocs.ts @@ -14,8 +14,9 @@ type UnkownType = { type: 'unknown' }; type LiteralTypeInfo = { type: 'literal'; value: string }; type PrimitiveTypeInfo = { type: 'primitive'; name: string }; type ObjectTypeInfo = { type: 'object'; name: string }; +type MapTypeInfo = { type: 'map'; name: string }; type UnionTypeInfo = Array; -type TypeInfo = LiteralTypeInfo | PrimitiveTypeInfo | ObjectTypeInfo | UnionTypeInfo | UnkownType; +type TypeInfo = LiteralTypeInfo | PrimitiveTypeInfo | ObjectTypeInfo | UnionTypeInfo | MapTypeInfo | UnkownType; interface Field { name: string; @@ -29,6 +30,13 @@ interface Field { const inlineCode = (s: string) => '`' + s + '`'; const link = (to: string, label: string) => `[${label}](${to})`; +function formatFieldName(name: string): string { + if (name === '__index') { + return '-'; + } + return name; +} + function formatTypeInfo(type: TypeInfo): string { if (Array.isArray(type)) { return type.map(formatTypeInfo).join(` \\| `); @@ -40,6 +48,8 @@ function formatTypeInfo(type: TypeInfo): string { return inlineCode(type.value); } else if (type.type === 'object') { return link(`#${type.name}`, inlineCode(type.name)); + } else if (type.type === 'map') { + return 'map'; } throw new Error('INVALID TYPE: ' + JSON.stringify(type)); } @@ -175,6 +185,20 @@ function createConfigurationSchemaReference(): string { const memberType = typeChecker.getTypeAtLocation(member.declarations[0]); const resolveType = (type: ts.Type): TypeInfo => { const flags = type.flags; + if (flags & ts.TypeFlags.Any) { + if (member.name == '__index') { + const fieldMapType = member + .getDeclarations() + ?.values() + .next().value.type; + const name = fieldMapType?.typeName.escapedText + ?.replace(/Schema$/, '') + .replace(/Config$/, ''); + const mappedType = findType(fieldMapType?.typeName.escapedText, program, typeChecker); + appendSectionForType(mappedType); + return { type: 'map', name }; + } + } if (flags & ts.TypeFlags.StringLiteral) { return { type: 'literal', value: JSON.stringify((type as ts.LiteralType).value) }; } @@ -231,7 +255,7 @@ function createConfigurationSchemaReference(): string { const see = member.getJsDocTags().find(t => t.name === 'see')?.text; const defaultValue = member.getJsDocTags().find(t => t.name === 'default')?.text; section.fields.push({ - name: member.name, + name: formatFieldName(member.name), type: resolveType(memberType), description: docs.length ? docs.map(d => d.text).join(' ') : undefined, example, diff --git a/src/balancewatcher.ts b/src/balancewatcher.ts new file mode 100644 index 0000000..d0c9c88 --- /dev/null +++ b/src/balancewatcher.ts @@ -0,0 +1,321 @@ +import { createModuleDebug } from './utils/debug'; +import { ManagedResource } from './utils/resource'; +import { EthereumClient } from './eth/client'; +import { Output, OutputMessage } from './output'; +import { BalanceWatcherConfig } from './config'; +import { NodePlatformAdapter } from './platforms'; +import { ABORT, AbortHandle } from './utils/abort'; +import { linearBackoff, resolveWaitTime, retry, WaitTime } from './utils/retry'; +import { Cache } from './utils/cache'; +import { ContractInfo } from './abi/contract'; +import { AggregateMetric } from './utils/stats'; +import { blockNumber, ethCall, getBlock, getTransactionReceipt } from './eth/requests'; +import { BlockRange, blockRangeSize, blockRangeToArray, chunkedBlockRanges, serializeBlockRange } from './blockrange'; +import { parallel, sleep } from './utils/async'; +import { RawBlockResponse, RawTransactionResponse } from './eth/responses'; +import { bigIntToNumber } from './utils/bn'; +import { formatBlock, formatHexToFloatingPoint } from './format'; +import { parseBlockTime } from './blockwatcher'; +import { BalanceMessage, FormattedBlock } from './msgs'; +import { sha3 } from './abi/wasm'; +import { Checkpoint } from './state'; + +const { debug, info, warn, error, trace } = createModuleDebug('balancewatcher'); + +const transferEventSignature = sha3('Transfer(address,address,uint256)')!; + +const initialCounters = { + blocksProcessed: 0, + transfersProcessed: 0, +}; + +/** + * BalanceWatcher will watch the activity of a contract on chain, and keep track of all accounts transferring tokens. + */ +export class BalanceWatcher implements ManagedResource { + private active: boolean = true; + private ethClient: EthereumClient; + private checkpoint: Checkpoint; + private output: Output; + private config: BalanceWatcherConfig; + private nodePlatform: NodePlatformAdapter; + private abortHandle = new AbortHandle(); + private endCallbacks: Array<() => void> = []; + private waitAfterFailure: WaitTime; + private chunkQueueMaxSize: number; + private counters = { ...initialCounters }; + private aggregates = { + blockProcessTime: new AggregateMetric(), + txProcessTime: new AggregateMetric(), + eventProcessTime: new AggregateMetric(), + }; + + constructor({ + ethClient, + checkpoint, + output, + config, + waitAfterFailure = linearBackoff({ min: 0, step: 2500, max: 120_000 }), + chunkQueueMaxSize = 1000, + nodePlatform, + }: { + ethClient: EthereumClient; + output: Output; + checkpoint: Checkpoint; + config: BalanceWatcherConfig; + waitAfterFailure?: WaitTime; + chunkQueueMaxSize?: number; + contractInfoCache?: Cache>; + nodePlatform: NodePlatformAdapter; + }) { + this.ethClient = ethClient; + this.checkpoint = checkpoint; + this.output = output; + this.config = config; + this.waitAfterFailure = waitAfterFailure; + this.chunkQueueMaxSize = chunkQueueMaxSize; + this.nodePlatform = nodePlatform; + } + + async start(): Promise { + debug('Starting balances watcher for %s', this.config.contractAddress); + const { ethClient, checkpoint } = this; + + if (checkpoint.isEmpty()) { + if (typeof this.config.startAt === 'number') { + if (this.config.startAt < 0) { + const latestBlockNumber = await ethClient.request(blockNumber()); + checkpoint.setInitialBlockNumber(Math.max(0, latestBlockNumber + this.config.startAt)); + } else { + checkpoint.setInitialBlockNumber(this.config.startAt); + } + } else if (this.config.startAt === 'genesis') { + checkpoint.setInitialBlockNumber(0); + } else if (this.config.startAt === 'latest') { + const latestBlockNumber = await ethClient.request(blockNumber()); + checkpoint.setInitialBlockNumber(latestBlockNumber); + } else { + throw new Error(`Invalid start block: ${JSON.stringify(this.config.startAt)}`); + } + info( + 'Determined initial block number: %d from configured value %o', + checkpoint.getinitialBlockNumber(), + this.config.startAt + ); + } + + let failures = 0; + while (this.active) { + try { + const latestBlockNumber = await ethClient.request(blockNumber()); + debug('Received latest block number: %d', latestBlockNumber); + const todo = checkpoint.getIncompleteRanges(latestBlockNumber); + debug('Found %d block ranges to process', todo.length); + if (todo.length) { + info( + 'Latest block number is %d - processing remaining %d blocks', + latestBlockNumber, + todo.map(blockRangeSize).reduce((a, b) => a + b, 0) + ); + } + + for (const range of todo) { + if (!this.active) { + throw ABORT; + } + await parallel( + chunkedBlockRanges(range, this.config.blocksMaxChunkSize, this.chunkQueueMaxSize).map( + chunk => async () => { + if (!this.active) { + return; + } + await retry(() => this.processChunk(chunk), { + attempts: 100, + waitBetween: this.waitAfterFailure, + taskName: `block range ${serializeBlockRange(chunk)}`, + abortHandle: this.abortHandle, + warnOnError: true, + onRetry: attempt => + warn( + 'Retrying to process block range %s (attempt %d)', + serializeBlockRange(chunk), + attempt + ), + }); + } + ), + { maxConcurrent: this.config.maxParallelChunks, abortHandle: this.abortHandle } + ); + failures = 0; + } + } catch (e) { + if (e === ABORT) { + break; + } + error('Error in block watcher polling loop', e); + failures++; + const waitTime = resolveWaitTime(this.waitAfterFailure, failures); + if (waitTime > 0) { + warn('Waiting for %d ms after %d consecutive failures', waitTime, failures); + await this.abortHandle.race(sleep(waitTime)); + } + } + if (this.active) { + try { + await this.abortHandle.race(sleep(this.config.pollInterval)); + } catch (e) { + if (e === ABORT) { + break; + } else { + warn('Unexpected error while waiting for next polling loop iteration', e); + } + } + } + } + if (this.endCallbacks != null) { + this.endCallbacks.forEach(cb => cb()); + } + info('Balances watcher stopped'); + } + + async processChunk(chunk: BlockRange) { + const startTime = Date.now(); + info('Processing chunk %s', serializeBlockRange(chunk)); + + debug('Requesting block range', chunk); + const blockRequestStart = Date.now(); + const blocks = await this.ethClient + .requestBatch( + blockRangeToArray(chunk) + .filter(blockNumber => this.checkpoint.isIncomplete(blockNumber)) + .map(blockNumber => getBlock(blockNumber)) + ) + .catch(e => + Promise.reject(new Error(`Failed to request batch of blocks ${serializeBlockRange(chunk)}: ${e}`)) + ); + debug('Received %d blocks in %d ms', blocks.length, Date.now() - blockRequestStart); + for (const block of blocks) { + await this.processBlock(block); + } + info( + 'Completed %d blocks of chunk %s in %d ms', + blocks.length, + serializeBlockRange(chunk), + Date.now() - startTime + ); + } + + async processBlock(block: RawBlockResponse) { + if (block.number != null && !this.checkpoint.isIncomplete(bigIntToNumber(block.number))) { + warn('Skipping processing of block %d since it is marked complete in our checkpoint'); + return; + } + const startTime = Date.now(); + const formattedBlock = formatBlock(block); + if (formattedBlock.number == null) { + debug('Ignoring block %s without number', block.hash); + return; + } + const blockTime = parseBlockTime(formattedBlock.timestamp); + + if (!this.active) { + return; + } + const outputMessages = await this.abortHandle.race( + Promise.all(block.transactions.map(tx => this.processTransaction(tx, blockTime, formattedBlock))) + ); + outputMessages.forEach(msgs => msgs.forEach(msg => this.output.write(msg))); + this.checkpoint.markBlockComplete(formattedBlock.number); + this.counters.blocksProcessed++; + this.aggregates.blockProcessTime.push(Date.now() - startTime); + } + + public async shutdown() { + info('Shutting down balances watcher for %s', this.config.contractAddress); + this.active = false; + this.abortHandle.abort(); + await new Promise(resolve => { + this.endCallbacks.push(resolve); + }); + } + + public flushStats() { + const stats = { + ...this.counters, + ...this.aggregates.blockProcessTime.flush('blockProcessTime'), + ...this.aggregates.txProcessTime.flush('txProcessTime'), + ...this.aggregates.eventProcessTime.flush('eventProcessTime'), + abortHandles: this.abortHandle.size, + }; + this.counters = { ...initialCounters }; + return stats; + } + + async processTransaction( + rawTx: RawTransactionResponse | string, + blockTime: number, + formattedBlock: FormattedBlock + ): Promise { + if (!this.active) { + return []; + } + if (typeof rawTx === 'string') { + warn('Received raw transaction as string from block %d', formattedBlock.number); + return []; + } + const startTime = Date.now(); + trace('Processing transaction %s from block %d', rawTx.hash, formattedBlock.number); + + const receipt = await this.ethClient.request(getTransactionReceipt(rawTx.hash)); + + let outputMessages = Array(); + if (receipt != null && receipt.logs != null) { + const addresses = receipt.logs + ?.map(log => { + if (this.config.contractAddress == log.address) { + if (log.topics[0] == transferEventSignature && log.topics.length == 3) { + const from = '0x' + log.topics[1].substr(26); + const to = '0x' + log.topics[2].substr(26); + return [from, to]; + } + } + }) + .flat() + .filter(a => a !== undefined); + if (addresses.length > 0) { + this.counters.transfersProcessed++; + outputMessages = await this.abortHandle.race( + Promise.all(addresses.map(addr => this.getBalance(addr, formattedBlock, blockTime))) + ); + } + } + + this.aggregates.txProcessTime.push(Date.now() - startTime); + trace('Completed processing transaction %s in %d ms', rawTx.hash, Date.now() - startTime); + return outputMessages; + } + + async getBalance(address: string, formattedBlock: FormattedBlock, blockTime: number): Promise { + const response = await this.ethClient.request( + ethCall( + this.config.contractAddress, + 'balanceOf(address)', + ['0'.repeat(24) + address.substr(2)], + formattedBlock.number! + ) + ); + const balance = formatHexToFloatingPoint(response, this.config.decimals); + + return { + body: { + contract: this.config.contractAddress, + blockHash: formattedBlock.hash!, + blockNumber: formattedBlock.number!, + account: address, + balance: balance, + }, + time: blockTime, + type: 'balance', + }; + } +} diff --git a/src/blockwatcher.ts b/src/blockwatcher.ts index 33046c4..dff5d60 100644 --- a/src/blockwatcher.ts +++ b/src/blockwatcher.ts @@ -1,7 +1,7 @@ import { ContractInfo, getContractInfo } from './abi/contract'; import { AbiRepository } from './abi/repo'; import { BlockRange, blockRangeSize, blockRangeToArray, chunkedBlockRanges, serializeBlockRange } from './blockrange'; -import { Checkpoint } from './checkpoint'; +import { Checkpoint } from './state'; import { BlockWatcherConfig } from './config'; import { EthereumClient } from './eth/client'; import { blockNumber, getBlock, getTransactionReceipt } from './eth/requests'; @@ -60,7 +60,7 @@ const initialCounters = { export class BlockWatcher implements ManagedResource { private active: boolean = true; private ethClient: EthereumClient; - private checkpoints: Checkpoint; + private checkpoint: Checkpoint; private output: Output; private abiRepo?: AbiRepository; private config: BlockWatcherConfig; @@ -79,7 +79,7 @@ export class BlockWatcher implements ManagedResource { constructor({ ethClient, - checkpoints, + checkpoint, output, abiRepo, contractInfoCache, @@ -89,7 +89,7 @@ export class BlockWatcher implements ManagedResource { nodePlatform, }: { ethClient: EthereumClient; - checkpoints: Checkpoint; + checkpoint: Checkpoint; output: Output; abiRepo: AbiRepository; config: BlockWatcherConfig; @@ -99,7 +99,7 @@ export class BlockWatcher implements ManagedResource { nodePlatform: NodePlatformAdapter; }) { this.ethClient = ethClient; - this.checkpoints = checkpoints; + this.checkpoint = checkpoint; this.output = output; this.abiRepo = abiRepo; this.config = config; @@ -113,27 +113,27 @@ export class BlockWatcher implements ManagedResource { async start(): Promise { debug('Starting block watcher'); - const { ethClient, checkpoints } = this; + const { ethClient, checkpoint } = this; - if (checkpoints.isEmpty()) { + if (checkpoint.isEmpty()) { if (typeof this.config.startAt === 'number') { if (this.config.startAt < 0) { const latestBlockNumber = await ethClient.request(blockNumber()); - checkpoints.initialBlockNumber = Math.max(0, latestBlockNumber + this.config.startAt); + checkpoint.setInitialBlockNumber(Math.max(0, latestBlockNumber + this.config.startAt)); } else { - checkpoints.initialBlockNumber = this.config.startAt; + checkpoint.setInitialBlockNumber(this.config.startAt); } } else if (this.config.startAt === 'genesis') { - checkpoints.initialBlockNumber = 0; + checkpoint.setInitialBlockNumber(0); } else if (this.config.startAt === 'latest') { const latestBlockNumber = await ethClient.request(blockNumber()); - checkpoints.initialBlockNumber = latestBlockNumber; + checkpoint.setInitialBlockNumber(latestBlockNumber); } else { throw new Error(`Invalid start block: ${JSON.stringify(this.config.startAt)}`); } info( 'Determined initial block number: %d from configured value %o', - checkpoints.initialBlockNumber, + checkpoint.getinitialBlockNumber(), this.config.startAt ); } @@ -143,7 +143,7 @@ export class BlockWatcher implements ManagedResource { try { const latestBlockNumber = await ethClient.request(blockNumber()); debug('Received latest block number: %d', latestBlockNumber); - const todo = checkpoints.getIncompleteRanges(latestBlockNumber); + const todo = checkpoint.getIncompleteRanges(latestBlockNumber); debug('Found %d block ranges to process', todo.length); if (todo.length) { info( @@ -221,7 +221,7 @@ export class BlockWatcher implements ManagedResource { const blocks = await this.ethClient .requestBatch( blockRangeToArray(chunk) - .filter(blockNumber => this.checkpoints.isIncomplete(blockNumber)) + .filter(blockNumber => this.checkpoint.isIncomplete(blockNumber)) .map(blockNumber => getBlock(blockNumber)) ) .catch(e => @@ -240,7 +240,7 @@ export class BlockWatcher implements ManagedResource { } async processBlock(block: RawBlockResponse) { - if (block.number != null && !this.checkpoints.isIncomplete(bigIntToNumber(block.number))) { + if (block.number != null && !this.checkpoint.isIncomplete(bigIntToNumber(block.number))) { warn('Skipping processing of block %d since it is marked complete in our checkpoint'); return; } @@ -265,7 +265,7 @@ export class BlockWatcher implements ManagedResource { } outputMessages.forEach(msg => this.output.write(msg)); txMsgs.forEach(msgs => msgs.forEach(msg => this.output.write(msg))); - this.checkpoints.markBlockComplete(formattedBlock.number); + this.checkpoint.markBlockComplete(formattedBlock.number); this.counters.blocksProcessed++; this.aggregates.blockProcessTime.push(Date.now() - startTime); } diff --git a/src/config.ts b/src/config.ts index 84763ce..2057c84 100644 --- a/src/config.ts +++ b/src/config.ts @@ -33,6 +33,8 @@ export interface EthloggerConfigSchema { contractInfo: ContractInfoConfigSchema; /** Block watcher settings, configure how blocks, transactions, event logs are ingested */ blockWatcher: BlockWatcherConfigSchema; + /** Balance watchers, tracking balance of ERC-20 token holders */ + balanceWatchers: BalanceWatchersConfigSchema; /** Settings for the node metrics collector */ nodeMetrics: NodeMetricsConfigSchema; /** Settings for the node info collector */ @@ -59,6 +61,7 @@ export interface EthloggerConfig { abi: AbiRepositoryConfig; contractInfo: ContractInfoConfig; blockWatcher: BlockWatcherConfig; + balanceWatchers: Map; nodeMetrics: NodeMetricsConfig; nodeInfo: NodeInfoConfig; pendingTx: PendingTxConfig; @@ -210,10 +213,51 @@ export interface BlockWatcherConfigSchema { decryptPrivateTransactions: boolean; } +/** + * Balance watcher is a component tracking transfers of a token and reporting balances of its accounts. + */ +export interface BalanceWatcherConfigSchema { + /** + * The address of the contract to watch. + */ + contractAddress: string; + /** + * The number of decimals to divide balances with. + */ + decimals: number; + /** If no checkpoint exists (yet), this specifies which block should be chosen as the starting point. */ + startAt: StartBlock; + /** Specify `false` to disable the balance watcher */ + enabled: boolean; + /** Interval in which to look for the latest block number (if not busy processing the backlog) */ + pollInterval: DurationConfig; + /** Max. number of blocks to fetch at once */ + blocksMaxChunkSize: number; + /** Max. number of chunks to process in parallel */ + maxParallelChunks: number; + /** Wait time before retrying to fetch and process blocks after failure */ + retryWaitTime: WaitTimeConfig; +} + +/** + * Balance watchers is a component tracking transfers of tokens and reporting balances of accounts. + */ +interface BalanceWatchersConfigSchema { + /** + * Mapping of name => balancer watcher. + * @see BalanceWatcherConfigSchema + */ + [name: string]: BalanceWatcherConfigSchema; +} + export interface BlockWatcherConfig extends Omit { pollInterval: Duration; retryWaitTime: WaitTime; } +export interface BalanceWatcherConfig extends Omit { + pollInterval: Duration; + retryWaitTime: WaitTime; +} /** The node metrics collector retrieves numeric measurements from nodes on a periodic basis. */ export interface NodeMetricsConfigSchema { @@ -349,6 +393,8 @@ export interface SourcetypesSchema { nodeMetrics?: string; /** @default "ethereum:geth:peer" */ gethPeer?: string; + /** @default "ethereum:balance" */ + balance?: string; } /** Console output prints all generated events and metrics to STDOUT */ @@ -670,6 +716,19 @@ export async function loadEthloggerConfig(flags: CliFlags, dryRun: boolean = fal return val; }; + const configRequired = (flag: string, configValue: string | undefined): string => { + if (configValue == null) { + if (dryRun) { + error('Missing required field %s', flag); + return ''; + } else { + throw new ConfigError(`Missing required field ${flag}`); + } + } else { + return configValue; + } + }; + const parseSpecificHecConfig = ( defaults: DeepPartial | undefined, indexFlag: keyof CliFlags, @@ -729,6 +788,22 @@ export async function loadEthloggerConfig(flags: CliFlags, dryRun: boolean = fal }; } }; + const balanceWatchers = new Map(); + const defaultBalanceWatchers = defaults.balanceWatchers ?? {}; + for (const key in defaultBalanceWatchers) { + const value = defaultBalanceWatchers[key]; + + balanceWatchers.set(key, { + enabled: value?.enabled ?? true, + pollInterval: parseDuration(value?.pollInterval) ?? 500, + blocksMaxChunkSize: value?.blocksMaxChunkSize ?? 25, + maxParallelChunks: value?.maxParallelChunks ?? 3, + startAt: value?.startAt ?? 'genesis', + retryWaitTime: waitTimeFromConfig(value?.retryWaitTime) ?? 10000, + decimals: value?.decimals ?? 18, + contractAddress: configRequired('contractAddress', value?.contractAddress), + }); + } const config: EthloggerConfig = { eth: { @@ -807,6 +882,7 @@ export async function loadEthloggerConfig(flags: CliFlags, dryRun: boolean = fal defaults.blockWatcher?.decryptPrivateTransactions ?? false, }, + balanceWatchers: balanceWatchers, checkpoint: { filename: defaults.checkpoint?.filename ?? 'checkpoint.json', saveInterval: parseDuration(defaults.checkpoint?.saveInterval) ?? 100, diff --git a/src/eth/requests.ts b/src/eth/requests.ts index 3ce791c..aea4e39 100644 --- a/src/eth/requests.ts +++ b/src/eth/requests.ts @@ -15,6 +15,7 @@ import { RawTransactionResponse, SyncStatus, } from './responses'; +import { sha3 } from '../abi/wasm'; export interface EthRequest

{ method: string; @@ -149,6 +150,19 @@ export const quorumPrivateTransactionPayload = (id: string): EthRequest<[string] params: [id], }); +export const ethCall = ( + contractAddress: string, + methodSignature: string, + params: string[], + blockNumber: number +): EthRequest<[{ to: string; data: string }, string], string> => ({ + method: 'eth_call', + params: [ + { to: contractAddress, data: sha3(methodSignature)?.substr(0, 6) + params.join() }, + '0x' + blockNumber.toString(16), + ], +}); + // Parity specific requests /** Returns the node enode URI */ diff --git a/src/eth/responses.ts b/src/eth/responses.ts index 63d953f..375b009 100644 --- a/src/eth/responses.ts +++ b/src/eth/responses.ts @@ -140,6 +140,14 @@ export interface GethPeer { [k: string]: any; } +export interface BalanceBody { + contract: string; + blockHash: string; + blockNumber: number; + account: string; + balance: string; +} + export type GethPeers = GethPeer[]; export interface ParityNodeKind { diff --git a/src/format.ts b/src/format.ts index e30ab2c..2d8cdd0 100644 --- a/src/format.ts +++ b/src/format.ts @@ -124,3 +124,17 @@ export function formatLogEvent(evt: RawLogResponse, addressInfo?: AddressInfo, e event, }; } + +/** Format a value into a floating-point number divided by decimals. */ +export function formatHexToFloatingPoint(value: string, decimals: number): string { + // first get value into a decimal radix: + let formattedValue = value == '0x' ? '0' : BigInt(value).toString(10); + // pad the value with as many zeros as needed: + formattedValue = formattedValue.padStart(decimals + 1, '0'); + const integerPart = formattedValue.substr(0, formattedValue.length - decimals); + if (integerPart == formattedValue) { + return integerPart; + } + // now inject the floating number period: + return integerPart + '.' + formattedValue.substr(formattedValue.length - decimals); +} diff --git a/src/index.ts b/src/index.ts index d291cac..b2ce83e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,7 +4,7 @@ import { inspect } from 'util'; import { ContractInfo, getContractInfo } from './abi/contract'; import { AbiRepository } from './abi/repo'; import { BlockWatcher } from './blockwatcher'; -import { Checkpoint } from './checkpoint'; +import { State } from './state'; import { CLI_FLAGS } from './cliflags'; import { ConfigError, EthloggerConfig, loadEthloggerConfig } from './config'; import { BatchedEthereumClient, EthereumClient } from './eth/client'; @@ -21,6 +21,7 @@ import LRUCache from './utils/lru'; import { ManagedResource, shutdownAll } from './utils/resource'; import { waitForSignal } from './utils/signal'; import { InternalStatsCollector } from './utils/stats'; +import { BalanceWatcher } from './balancewatcher'; const { debug, error, info } = createModuleDebug('cli'); @@ -161,12 +162,11 @@ class Ethlogger extends Command { internalStatsCollector.addSource(transport, 'ethTransport'); internalStatsCollector.addSource(output, 'output'); - const checkpoints = addResource( - new Checkpoint({ - path: config.checkpoint.filename, - saveInterval: config.checkpoint.saveInterval, - }) - ); + const state = new State({ + path: config.checkpoint.filename, + saveInterval: config.checkpoint.saveInterval, + }); + const checkpoints = addResource(state); await checkpoints.initialize(); const abiRepo = new AbiRepository(config.abi); @@ -194,7 +194,7 @@ class Ethlogger extends Command { if (config.blockWatcher.enabled) { blockWatcher = new BlockWatcher({ - checkpoints, + checkpoint: state.getCheckpoint('main'), ethClient: client, output, abiRepo: abiRepo, @@ -207,11 +207,28 @@ class Ethlogger extends Command { } else { debug('Block watcher is disabled'); } + const balanceWatcherResources = []; + + for (const [name, balanceWatcherConfig] of config.balanceWatchers) { + if (balanceWatcherConfig.enabled) { + const balanceWatcher = new BalanceWatcher({ + checkpoint: state.getCheckpoint(name), + ethClient: client, + output, + config: balanceWatcherConfig, + contractInfoCache, + nodePlatform: platformAdapter, + }); + addResource(balanceWatcher); + internalStatsCollector.addSource(balanceWatcher, 'balanceWatcher-' + name); + balanceWatcherResources.push(balanceWatcher); + } + } internalStatsCollector.start(); return Promise.all( - [blockWatcher?.start(), nodeStatsCollector.start()].map(p => + [blockWatcher?.start(), nodeStatsCollector.start(), ...balanceWatcherResources.map(b => b.start())].map(p => p?.catch(e => { if (e !== ABORT) { error('Error in ethlogger task:', e); diff --git a/src/msgs.ts b/src/msgs.ts index 8759814..96885fc 100644 --- a/src/msgs.ts +++ b/src/msgs.ts @@ -1,5 +1,5 @@ import { DecodedStruct } from './abi/decode'; -import { GethPeer } from './eth/responses'; +import { BalanceBody, GethPeer } from './eth/responses'; export type Address = string; export type Value = string | number | boolean | Array; @@ -260,6 +260,12 @@ export interface GethPeerMessage { body: GethPeer; } +export interface BalanceMessage { + type: 'balance'; + time: number; + body: BalanceBody; +} + export interface QuorumProtocolInfo { consensusMechanism: 'istanbul' | 'raft'; [k: string]: any; diff --git a/src/output.ts b/src/output.ts index 02f7573..9581a9e 100644 --- a/src/output.ts +++ b/src/output.ts @@ -1,6 +1,7 @@ import { EthloggerConfig, HecOutputConfig } from './config'; import { HecClient } from './hec'; import { + BalanceMessage, BlockMessage, GethPeerMessage, LogEventMessage, @@ -22,6 +23,7 @@ export const defaultSourcetypes = { nodeInfo: 'ethereum:node:info', nodeMetrics: 'ethereum:node:metrics', gethPeer: 'ethereum:geth:peer', + balance: 'ethereum:balance', }; export type OutputMessage = @@ -31,7 +33,8 @@ export type OutputMessage = | LogEventMessage | NodeInfoMessage | NodeMetricsMessage - | GethPeerMessage; + | GethPeerMessage + | BalanceMessage; export interface Output extends ManagedResource { write(message: OutputMessage): void; @@ -50,6 +53,7 @@ export class HecOutput implements Output, ManagedResource { case 'pendingtx': case 'nodeInfo': case 'gethPeer': + case 'balance': this.eventsHec.pushEvent({ time: msg.time, body: msg.body, diff --git a/src/checkpoint.ts b/src/state.ts similarity index 51% rename from src/checkpoint.ts rename to src/state.ts index 4f235c6..fe21255 100644 --- a/src/checkpoint.ts +++ b/src/state.ts @@ -5,65 +5,53 @@ import { getInverseBlockRanges, parseBlockRange, serializeBlockRange, - blockRangeSize, blockRangeIncludes, } from './blockrange'; import { createModuleDebug } from './utils/debug'; import { ManagedResource } from './utils/resource'; import { alwaysResolve, sleep } from './utils/async'; -const { debug, info, error } = createModuleDebug('checkpoint'); +const { debug, info, error } = createModuleDebug('state'); -export interface CheckpointConfig { +export interface StateConfig { path: string; - initialBlockNumber?: number; saveInterval?: number; } -export class Checkpoint implements ManagedResource { - private active: boolean = true; - private completed: BlockRange[] = []; +export interface Checkpoint { + getID(): string; + getinitialBlockNumber(): number | undefined; + setInitialBlockNumber(blockNumber: number): void; + markBlockComplete(blockNumber: number): void; + markComplete(range: BlockRange): void; + isIncomplete(block: number): boolean; + getIncompleteRanges(latestBlock?: number): BlockRange[]; + isEmpty(): boolean; +} + +class CheckpointImpl implements Checkpoint { + private completed: BlockRange[]; public initialBlockNumber: number | undefined; - private readonly path: string; - private committedVersion: number = 0; - private pendingVersion: number = 0; - private savePromise: Promise | null = null; - private readonly saveInterval: number; + private saveCallback: Function; + public id: string; - constructor({ initialBlockNumber, path, saveInterval = 250 }: CheckpointConfig) { + constructor(id: string, saveFn: Function, initialBlockNumber?: number, completed: BlockRange[] = []) { + this.id = id; this.initialBlockNumber = initialBlockNumber; - this.path = path; - this.saveInterval = saveInterval; + this.saveCallback = saveFn; + this.completed = completed; } - public async initialize() { - try { - const fileContents = await readFile(this.path, { encoding: 'utf-8' }); - this.initializeFromCheckpointContents(fileContents); - info('Found existing checkpoints: %o', { - initialBlockNumber: this.initialBlockNumber, - completedBlockCount: this.completed.map(blockRangeSize).reduce((a, b) => a + b, 0), - }); - } catch (e) { - if (e.code === 'ENOENT') { - info('Checkpoint does not exist yet, starting from a clean slate'); - return; - } - throw new Error(`Failed to initialize checkpoints: ${e}`); - } + public getinitialBlockNumber(): number | undefined { + return this.initialBlockNumber; } - public initializeFromCheckpointContents(fileContents: string) { - const { v, init, ranges } = JSON.parse(fileContents); - if (v !== 1) { - throw new Error(`Invalid version number in checkpoint file: ${v}`); - } - if (init == null || isNaN(init)) { - throw new Error('Invalid checkpoint file - unable to parse inital block number'); - } - this.initialBlockNumber = init; - this.completed = compactRanges(ranges.map(parseBlockRange)); - debug('Loaded checkpoint from file with %d ranges', this.completed.length); + public setInitialBlockNumber(blockNumber: number) { + this.initialBlockNumber = blockNumber; + } + + public getID(): string { + return this.id; } public markBlockComplete(blockNumber: number) { @@ -71,12 +59,8 @@ export class Checkpoint implements ManagedResource { } public markComplete(range: BlockRange) { - if (!this.active) { - throw new Error('Checkpoint has already been shut down'); - } this.completed = compactRanges([...this.completed, range]); - this.pendingVersion++; - this.scheduleSave(); + this.saveCallback(); } public isIncomplete(block: number): boolean { @@ -90,16 +74,88 @@ export class Checkpoint implements ManagedResource { return getInverseBlockRanges(this.completed, this.initialBlockNumber, latestBlock != null ? latestBlock : null); } + public getCompletedRanges(): string[] { + return this.completed.map(serializeBlockRange); + } + public isEmpty(): boolean { return this.completed.length === 0; } +} + +export class State implements ManagedResource { + private active: boolean = true; + private readonly path: string; + private committedVersion: number = 0; + private pendingVersion: number = 0; + private savePromise: Promise | null = null; + private readonly saveInterval: number; + private checkpoints: Map; + + constructor({ path, saveInterval = 250 }: StateConfig) { + this.path = path; + this.saveInterval = saveInterval; + this.checkpoints = new Map(); + } + + public async initialize() { + try { + const fileContents = await readFile(this.path, { encoding: 'utf-8' }); + this.initializeFromCheckpointContents(fileContents); + info( + 'Found existing checkpoints: %o', + Array.from(this.checkpoints).map(([, value]) => value.toString()) + ); + } catch (e) { + if (e.code === 'ENOENT') { + info('Checkpoint does not exist yet, starting from a clean slate'); + return; + } + throw new Error(`Failed to initialize checkpoints: ${e}`); + } + } + + public initializeFromCheckpointContents(fileContents: string) { + const obj: any = JSON.parse(fileContents); + if (obj.v == 1) { + const { init, ranges } = JSON.parse(fileContents); + if (init == null || isNaN(init)) { + throw new Error('Invalid checkpoint file - unable to parse initial block number'); + } + const completed = compactRanges(ranges.map(parseBlockRange)); + this.checkpoints.set('main', new CheckpointImpl('main', this.save, init, completed)); + + debug('Loaded checkpoint from file with %d ranges', completed.length); + } else if (obj.v == 2) { + for (const k in obj) { + if (k == 'v') { + continue; + } + const { init, ranges } = obj[k]; + const completed = compactRanges(ranges.map(parseBlockRange)); + this.checkpoints.set(k as string, new CheckpointImpl(k, this.save, init, completed)); + } + } else { + throw new Error(`Invalid version number in checkpoint file: ${obj}`); + } + } + + public getCheckpoint(id: string): Checkpoint { + const checkpoint = this.checkpoints.get(id); + if (checkpoint == null) { + const newCheckpoint = new CheckpointImpl(id, this.save); + this.checkpoints.set(id, newCheckpoint); + return newCheckpoint; + } + return checkpoint; + } public serialize(): string { - return JSON.stringify( - { v: 1, init: this.initialBlockNumber, ranges: this.completed.map(serializeBlockRange) }, - null, - 2 - ); + const toSerialize: { [k: string]: any } = { v: 2 }; + this.checkpoints.forEach((value: CheckpointImpl) => { + toSerialize[value.getID()] = { init: value.initialBlockNumber, ranges: value.getCompletedRanges() }; + }); + return JSON.stringify(toSerialize, null, 2); } private scheduleSave() { @@ -127,6 +183,7 @@ export class Checkpoint implements ManagedResource { }; public async save() { + this.pendingVersion++; await this.savePromise; if (this.pendingVersion > this.committedVersion) { await this.saveInternal(); diff --git a/test/abi/testcases/ethdenver.test.ts b/test/abi/testcases/ethdenver.test.ts index 0f66393..29fae70 100644 --- a/test/abi/testcases/ethdenver.test.ts +++ b/test/abi/testcases/ethdenver.test.ts @@ -3,7 +3,7 @@ import { join } from 'path'; import { ContractInfo } from '../../../src/abi/contract'; import { AbiRepository } from '../../../src/abi/repo'; import { BlockWatcher } from '../../../src/blockwatcher'; -import { Checkpoint } from '../../../src/checkpoint'; +import { State } from '../../../src/state'; import { BatchedEthereumClient } from '../../../src/eth/client'; import { HttpTransport } from '../../../src/eth/http'; import { withRecorder } from '../../../src/eth/recorder'; @@ -43,16 +43,17 @@ test('blockwatcher', async () => { reconcileStructShapeFromTuples: false, }); await abiRepo.initialize(); - const checkpoints = new Checkpoint({ - initialBlockNumber: 0, + const checkpoints = new State({ path: join(__dirname, '../../../tmp/tmpcheckpoint.json'), saveInterval: 10000, }); + const checkpoint = checkpoints.getCheckpoint('main'); + checkpoint.setInitialBlockNumber(0); const output = new TestOutput(); const contractInfoCache = new LRUCache>({ maxSize: 100 }); const blockWatcher = new BlockWatcher({ abiRepo, - checkpoints, + checkpoint, config: { enabled: true, blocksMaxChunkSize: 1, diff --git a/test/abi/testcases/ost.test.ts b/test/abi/testcases/ost.test.ts index 80b0f65..2446ce7 100644 --- a/test/abi/testcases/ost.test.ts +++ b/test/abi/testcases/ost.test.ts @@ -3,7 +3,7 @@ import { join } from 'path'; import { ContractInfo } from '../../../src/abi/contract'; import { AbiRepository } from '../../../src/abi/repo'; import { BlockWatcher } from '../../../src/blockwatcher'; -import { Checkpoint } from '../../../src/checkpoint'; +import { State } from '../../../src/state'; import { BatchedEthereumClient } from '../../../src/eth/client'; import { HttpTransport } from '../../../src/eth/http'; import { withRecorder } from '../../../src/eth/recorder'; @@ -43,16 +43,17 @@ test('blockwatcher', async () => { reconcileStructShapeFromTuples: false, }); await abiRepo.initialize(); - const checkpoints = new Checkpoint({ - initialBlockNumber: 0, + const checkpoints = new State({ path: join(__dirname, '../../../tmp/tmpcheckpoint.json'), saveInterval: 10000, }); + const checkpoint = checkpoints.getCheckpoint('main'); + checkpoint.setInitialBlockNumber(0); const output = new TestOutput(); const contractInfoCache = new LRUCache>({ maxSize: 100 }); const blockWatcher = new BlockWatcher({ abiRepo, - checkpoints, + checkpoint, config: { enabled: true, blocksMaxChunkSize: 1, diff --git a/test/blockwatcher.test.ts b/test/blockwatcher.test.ts index e83a8f6..191dd95 100644 --- a/test/blockwatcher.test.ts +++ b/test/blockwatcher.test.ts @@ -2,7 +2,7 @@ import { join } from 'path'; import { ContractInfo } from '../src/abi/contract'; import { AbiRepository } from '../src/abi/repo'; import { BlockWatcher } from '../src/blockwatcher'; -import { Checkpoint } from '../src/checkpoint'; +import { State } from '../src/state'; import { BatchedEthereumClient } from '../src/eth/client'; import { HttpTransport } from '../src/eth/http'; import { withRecorder } from '../src/eth/recorder'; @@ -39,16 +39,17 @@ test('blockwatcher', async () => { reconcileStructShapeFromTuples: false, }); await abiRepo.initialize(); - const checkpoints = new Checkpoint({ - initialBlockNumber: 123, + const state = new State({ path: join(__dirname, '../tmp/tmpcheckpoint.json'), saveInterval: 10000, }); + const checkpoint = state.getCheckpoint('main'); + checkpoint.setInitialBlockNumber(123); const output = new TestOutput(); const contractInfoCache = new LRUCache>({ maxSize: 100 }); const blockWatcher = new BlockWatcher({ abiRepo, - checkpoints, + checkpoint, config: { enabled: true, blocksMaxChunkSize: 1, diff --git a/test/checkpoint.test.ts b/test/checkpoint.test.ts index 8ee9c8e..dad44c1 100644 --- a/test/checkpoint.test.ts +++ b/test/checkpoint.test.ts @@ -1,42 +1,46 @@ -import { Checkpoint } from '../src/checkpoint'; +import { State } from '../src/state'; test('Checkpoint', () => { - const empty = new Checkpoint({ path: '.foo' }); - expect(empty.isEmpty()).toBe(true); + const empty = new State({ path: '.foo' }); + const checkpoint = empty.getCheckpoint('main'); + expect(checkpoint.isEmpty()).toBe(true); - const checkpoints = new Checkpoint({ path: '.foo', initialBlockNumber: 0 }); - expect(checkpoints.serialize()).toMatchInlineSnapshot(` + const state = new State({ path: '.foo' }); + expect(state.serialize()).toMatchInlineSnapshot(` "{ - \\"v\\": 1, - \\"init\\": 0, - \\"ranges\\": [] + \\"v\\": 2 }" `); + state.getCheckpoint('main').setInitialBlockNumber(0); - checkpoints.markComplete({ from: 10, to: 20 }); - expect(checkpoints.serialize()).toMatchInlineSnapshot(` + state.getCheckpoint('main').markComplete({ from: 10, to: 20 }); + expect(state.serialize()).toMatchInlineSnapshot(` "{ - \\"v\\": 1, - \\"init\\": 0, - \\"ranges\\": [ - \\"10-20\\" - ] + \\"v\\": 2, + \\"main\\": { + \\"init\\": 0, + \\"ranges\\": [ + \\"10-20\\" + ] + } }" `); - checkpoints.markComplete({ from: 30, to: 40 }); - expect(checkpoints.serialize()).toMatchInlineSnapshot(` + state.getCheckpoint('main').markComplete({ from: 30, to: 40 }); + expect(state.serialize()).toMatchInlineSnapshot(` "{ - \\"v\\": 1, - \\"init\\": 0, - \\"ranges\\": [ - \\"10-20\\", - \\"30-40\\" - ] + \\"v\\": 2, + \\"main\\": { + \\"init\\": 0, + \\"ranges\\": [ + \\"10-20\\", + \\"30-40\\" + ] + } }" `); - expect(checkpoints.getIncompleteRanges(50)).toMatchInlineSnapshot(` + expect(state.getCheckpoint('main').getIncompleteRanges(50)).toMatchInlineSnapshot(` Array [ Object { "from": 0, @@ -53,7 +57,29 @@ test('Checkpoint', () => { ] `); - const restored = new Checkpoint({ path: '.foo' }); + const contractCheckpoint = state.getCheckpoint('0xdeadbeef'); + contractCheckpoint.setInitialBlockNumber(1000); + contractCheckpoint.markComplete({ from: 1100, to: 1200 }); + expect(state.serialize()).toMatchInlineSnapshot(` + "{ + \\"v\\": 2, + \\"main\\": { + \\"init\\": 0, + \\"ranges\\": [ + \\"10-20\\", + \\"30-40\\" + ] + }, + \\"0xdeadbeef\\": { + \\"init\\": 1000, + \\"ranges\\": [ + \\"1100-1200\\" + ] + } + }" + `); + + const restored = new State({ path: '.foo' }); restored.initializeFromCheckpointContents(` { "v": 1, @@ -63,8 +89,8 @@ test('Checkpoint', () => { "30-40" ] }`); - expect(restored.initialBlockNumber).toMatchInlineSnapshot(`0`); - expect(restored.getIncompleteRanges(50)).toMatchInlineSnapshot(` + expect(restored.getCheckpoint('main').getinitialBlockNumber()).toMatchInlineSnapshot(`0`); + expect(restored.getCheckpoint('main').getIncompleteRanges(50)).toMatchInlineSnapshot(` Array [ Object { "from": 0, @@ -80,4 +106,16 @@ test('Checkpoint', () => { }, ] `); + expect(restored.serialize()).toMatchInlineSnapshot(` + "{ + \\"v\\": 2, + \\"main\\": { + \\"init\\": 0, + \\"ranges\\": [ + \\"10-20\\", + \\"30-40\\" + ] + } + }" + `); }); diff --git a/test/config.test.ts b/test/config.test.ts index 44e7ddc..fba51a6 100644 --- a/test/config.test.ts +++ b/test/config.test.ts @@ -18,6 +18,7 @@ test('defaults', async () => { "reconcileStructShapeFromTuples": true, "requireContractMatch": true, }, + "balanceWatchers": Map {}, "blockWatcher": Object { "blocksMaxChunkSize": 25, "decryptPrivateTransactions": false, diff --git a/test/failures/mainnet_overflow.test.ts b/test/failures/mainnet_overflow.test.ts index c7da52b..c51c762 100644 --- a/test/failures/mainnet_overflow.test.ts +++ b/test/failures/mainnet_overflow.test.ts @@ -2,7 +2,7 @@ import { join } from 'path'; import { ContractInfo } from '../../src/abi/contract'; import { AbiRepository } from '../../src/abi/repo'; import { BlockWatcher } from '../../src/blockwatcher'; -import { Checkpoint } from '../../src/checkpoint'; +import { State } from '../../src/state'; import { BatchedEthereumClient } from '../../src/eth/client'; import { HttpTransport } from '../../src/eth/http'; import { withRecorder } from '../../src/eth/recorder'; @@ -39,16 +39,17 @@ test(`mainnet overflow ${BLOCK}`, async () => { reconcileStructShapeFromTuples: false, }); await abiRepo.initialize(); - const checkpoints = new Checkpoint({ - initialBlockNumber: 0, + const checkpoints = new State({ path: join(__dirname, `../../tmp/tmpcheckpoint_${BLOCK}.json`), saveInterval: 10000, }); + const checkpoint = checkpoints.getCheckpoint('main'); + checkpoint.setInitialBlockNumber(0); const output = new TestOutput(); const contractInfoCache = new LRUCache>({ maxSize: 100 }); const blockWatcher = new BlockWatcher({ abiRepo, - checkpoints, + checkpoint, config: { enabled: true, blocksMaxChunkSize: 1, diff --git a/test/format.test.ts b/test/format.test.ts index b919c36..da79460 100644 --- a/test/format.test.ts +++ b/test/format.test.ts @@ -1,4 +1,4 @@ -import { formatBlock, formatTransaction } from '../src/format'; +import { formatBlock, formatHexToFloatingPoint, formatTransaction } from '../src/format'; test('formatBlock', () => { expect( @@ -110,3 +110,14 @@ test('formatTransaction', () => { } `); }); + +test('formatHexToFloatingPoint', () => { + expect(formatHexToFloatingPoint('', 13)).toMatchInlineSnapshot(`"0.0000000000000"`); + expect(formatHexToFloatingPoint('0x', 13)).toMatchInlineSnapshot(`"0.0000000000000"`); + expect(formatHexToFloatingPoint('0x00', 13)).toMatchInlineSnapshot(`"0.0000000000000"`); + expect(formatHexToFloatingPoint('0x01', 13)).toMatchInlineSnapshot(`"0.0000000000001"`); + expect(formatHexToFloatingPoint('0xaf', 0)).toMatchInlineSnapshot(`"175"`); + expect(formatHexToFloatingPoint('0xaf', 1)).toMatchInlineSnapshot(`"17.5"`); + expect(formatHexToFloatingPoint('0xaf', 2)).toMatchInlineSnapshot(`"1.75"`); + expect(formatHexToFloatingPoint('0xdeadbeef', 8)).toMatchInlineSnapshot(`"37.35928559"`); +}); diff --git a/test/platforms/quorum.test.ts b/test/platforms/quorum.test.ts index 71ce73b..242041c 100644 --- a/test/platforms/quorum.test.ts +++ b/test/platforms/quorum.test.ts @@ -6,7 +6,7 @@ import { join } from 'path'; import { suppressDebugLogging } from '../../src/utils/debug'; import { BlockWatcher } from '../../src/blockwatcher'; import { AbiRepository } from '../../src/abi/repo'; -import { Checkpoint } from '../../src/checkpoint'; +import { State } from '../../src/state'; import { TestOutput } from '../testoutput'; let logHandle: any; @@ -56,15 +56,16 @@ test('decrypt private transaction', async () => { }, ], }); - + const state = new State({ + path: join(__dirname, '../../tmp'), + saveInterval: 1, + }); + const checkpoint = state.getCheckpoint('main'); + checkpoint.setInitialBlockNumber(0); const blockWatcher = new BlockWatcher({ ethClient: client, abiRepo, - checkpoints: new Checkpoint({ - initialBlockNumber: 0, - path: join(__dirname, '../../tmp'), - saveInterval: 1, - }), + checkpoint: checkpoint, config: { enabled: true, decryptPrivateTransactions: true,