Skip to content

Commit

Permalink
feat: 🎸 Support for Quorum private transaction payload
Browse files Browse the repository at this point in the history
A new settings enables ethlogger to fetch decrypted information for
private transactions. This also enables ABI decoding of the data.
  • Loading branch information
ziegfried committed Aug 15, 2020
1 parent b713dc6 commit a25aad9
Show file tree
Hide file tree
Showing 22 changed files with 237 additions and 80 deletions.
4 changes: 4 additions & 0 deletions config.schema.json
Expand Up @@ -38,6 +38,10 @@
"description": "Max. number of blocks to fetch at once",
"type": "number"
},
"decryptPrivateTransactions": {
"description": "For chains/nodes that do support private transcations, this settings instructs block watcher to\nattempt to load the decrypted payload for private transactions",
"type": "boolean"
},
"enabled": {
"description": "Specify `false` to disable the block watcher",
"type": "boolean"
Expand Down
1 change: 1 addition & 0 deletions defaults.ethlogger.yaml
Expand Up @@ -73,6 +73,7 @@ blockWatcher:
blocksMaxChunkSize: 25
maxParallelChunks: 3
startAt: genesis
decryptPrivateTransactions: false
retryWaitTime:
type: linear-backoff
min: 0
Expand Down
5 changes: 5 additions & 0 deletions docs/cli.md
Expand Up @@ -53,6 +53,10 @@ OPTIONS
--debug
Enable debug log output
--decrypt-private-txs
Enable to download decrypted content of private transactions (only available
for Quorum at the moment)
--[no-]eth-reject-invalid-certs
Disable to allow ethereum client to connect to HTTPS without rejecting
invalid (self-signed) certificates
Expand Down Expand Up @@ -146,6 +150,7 @@ OPTIONS
| `ETH_REJECT_INVALID_CERTS` | `boolean` | Disable to allow ethereum client to connect to HTTPS without rejecting invalid (self-signed) certificates |
| `ABI_DIR` | `string` | Directory containing ABI definitions (JSON files). This directory will be searched recursively |
| `START_AT_BLOCK` | `string` | First block to start ingesting from. Possible values are "genesis", "latest", an absolute block number or a negative number describing how many blocks before the latest one to start at |
| `DECRYPT_PRIVATE_TXS` | `boolean` | Enable to download decrypted content of private transactions (only available for Quorum at the moment) |
| `REJECT_INVALID_CERTS` | `boolean` | Disable to allow all HTTP clients (HEC and ETH) to connect to HTTPS without rejecting invalid (self-signed) certificates |
| `NETWORK_NAME` | `string` | The network name will be attached to all events sent to Splunk. This is typically either "mainnet" or "testnet". |
| `CHAIN_NAME` | `string` | The name of the chain that will be attached to all events sent to Splunk |
Expand Down
17 changes: 9 additions & 8 deletions docs/configuration.md
Expand Up @@ -233,14 +233,15 @@ Ethlogger checks for each address it encounters whether it is a smart contract b

Block watcher is the component that retrieves blocks, transactions, event logs from the node and sends them to output.

| Name | Type | Description |
| -------------------- | --------------------------- | ------------------------------------------------------------------------------------------------- |
| `enabled` | `boolean` | Specify `false` to disable the block watcher |
| `pollInterval` | [`Duration`](#Duration) | Interval in which to look for the latest block number (if not busy processing the backlog) |
| `blocksMaxChunkSize` | `number` | Max. number of blocks to fetch at once |
| `maxParallelChunks` | `number` | Max. number of chunks to process in parallel |
| `startAt` | [`StartBlock`](#StartBlock) | If no checkpoint exists (yet), this specifies which block should be chosen as the starting point. |
| `retryWaitTime` | [`WaitTime`](#WaitTime) | Wait time before retrying to fetch and process blocks after failure |
| Name | Type | Description |
| ---------------------------- | --------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `enabled` | `boolean` | Specify `false` to disable the block watcher |
| `pollInterval` | [`Duration`](#Duration) | Interval in which to look for the latest block number (if not busy processing the backlog) |
| `blocksMaxChunkSize` | `number` | Max. number of blocks to fetch at once |
| `maxParallelChunks` | `number` | Max. number of chunks to process in parallel |
| `startAt` | [`StartBlock`](#StartBlock) | If no checkpoint exists (yet), this specifies which block should be chosen as the starting point. |
| `retryWaitTime` | [`WaitTime`](#WaitTime) | Wait time before retrying to fetch and process blocks after failure |
| `decryptPrivateTransactions` | `boolean` | For chains/nodes that do support private transcations, this settings instructs block watcher to attempt to load the decrypted payload for private transactions |

### NodeMetrics

Expand Down
116 changes: 70 additions & 46 deletions src/blockwatcher.ts
Expand Up @@ -2,12 +2,14 @@ import { ContractInfo, getContractInfo } from './abi/contract';
import { AbiRepository } from './abi/repo';
import { BlockRange, blockRangeSize, blockRangeToArray, chunkedBlockRanges, serializeBlockRange } from './blockrange';
import { Checkpoint } from './checkpoint';
import { BlockWatcherConfig } from './config';
import { EthereumClient } from './eth/client';
import { blockNumber, getBlock, getTransactionReceipt } from './eth/requests';
import { RawBlockResponse, RawLogResponse, RawTransactionResponse } from './eth/responses';
import { formatBlock, formatLogEvent, formatTransaction } from './format';
import { Address, AddressInfo, FormattedBlock, LogEventMessage } from './msgs';
import { Address, AddressInfo, FormattedBlock, LogEventMessage, PrivateTransactionPayload } from './msgs';
import { Output, OutputMessage } from './output';
import { isEnterprisePlatform, NodePlatformAdapter } from './platforms';
import { ABORT, AbortHandle } from './utils/abort';
import { parallel, sleep } from './utils/async';
import { bigIntToNumber } from './utils/bn';
Expand Down Expand Up @@ -61,10 +63,8 @@ export class BlockWatcher implements ManagedResource {
private checkpoints: Checkpoint;
private output: Output;
private abiRepo?: AbiRepository;
private startAt: StartBlock;
private chunkSize: number = 25;
private maxParallelChunks: number = 3;
private pollInterval: number = 500;
private config: BlockWatcherConfig;
private nodePlatform: NodePlatformAdapter;
private abortHandle = new AbortHandle();
private endCallbacks: Array<() => void> = [];
private waitAfterFailure: WaitTime;
Expand All @@ -82,36 +82,30 @@ export class BlockWatcher implements ManagedResource {
checkpoints,
output,
abiRepo,
startAt = 'genesis',
contractInfoCache,
config,
waitAfterFailure = linearBackoff({ min: 0, step: 2500, max: 120_000 }),
chunkQueueMaxSize = 1000,
chunkSize = 25,
maxParallelChunks = 3,
pollInterval = 500,
nodePlatform,
}: {
ethClient: EthereumClient;
checkpoints: Checkpoint;
output: Output;
abiRepo: AbiRepository;
startAt: StartBlock;
config: BlockWatcherConfig;
waitAfterFailure?: WaitTime;
chunkQueueMaxSize?: number;
contractInfoCache?: Cache<string, Promise<ContractInfo>>;
chunkSize: number;
maxParallelChunks: number;
pollInterval: number;
nodePlatform: NodePlatformAdapter;
}) {
this.ethClient = ethClient;
this.checkpoints = checkpoints;
this.output = output;
this.abiRepo = abiRepo;
this.startAt = startAt;
this.config = config;
this.waitAfterFailure = waitAfterFailure;
this.chunkSize = chunkSize;
this.maxParallelChunks = maxParallelChunks;
this.pollInterval = pollInterval;
this.chunkQueueMaxSize = chunkQueueMaxSize;
this.nodePlatform = nodePlatform;
if (contractInfoCache) {
this.contractInfoCache = contractInfoCache;
}
Expand All @@ -122,25 +116,25 @@ export class BlockWatcher implements ManagedResource {
const { ethClient, checkpoints } = this;

if (checkpoints.isEmpty()) {
if (typeof this.startAt === 'number') {
if (this.startAt < 0) {
if (typeof this.config.startAt === 'number') {
if (this.config.startAt < 0) {
const latestBlockNumber = await ethClient.request(blockNumber());
checkpoints.initialBlockNumber = Math.max(0, latestBlockNumber + this.startAt);
checkpoints.initialBlockNumber = Math.max(0, latestBlockNumber + this.config.startAt);
} else {
checkpoints.initialBlockNumber = this.startAt;
checkpoints.initialBlockNumber = this.config.startAt;
}
} else if (this.startAt === 'genesis') {
} else if (this.config.startAt === 'genesis') {
checkpoints.initialBlockNumber = 0;
} else if (this.startAt === 'latest') {
} else if (this.config.startAt === 'latest') {
const latestBlockNumber = await ethClient.request(blockNumber());
checkpoints.initialBlockNumber = latestBlockNumber;
} else {
throw new Error(`Invalid start block: ${JSON.stringify(this.startAt)}`);
throw new Error(`Invalid start block: ${JSON.stringify(this.config.startAt)}`);
}
info(
'Determined initial block number: %d from configured value %o',
checkpoints.initialBlockNumber,
this.startAt
this.config.startAt
);
}

Expand All @@ -164,25 +158,27 @@ export class BlockWatcher implements ManagedResource {
throw ABORT;
}
await parallel(
chunkedBlockRanges(range, this.chunkSize, this.chunkQueueMaxSize).map(chunk => async () => {
if (!this.active) {
return;
chunkedBlockRanges(range, this.config.blocksMaxChunkSize, this.chunkQueueMaxSize).map(
chunk => async () => {
if (!this.active) {
return;
}
await retry(() => this.processChunk(chunk), {
attempts: 100,
waitBetween: this.waitAfterFailure,
taskName: `block range ${serializeBlockRange(chunk)}`,
abortHandle: this.abortHandle,
warnOnError: true,
onRetry: attempt =>
warn(
'Retrying to process block range %s (attempt %d)',
serializeBlockRange(chunk),
attempt
),
});
}
await retry(() => this.processChunk(chunk), {
attempts: 100,
waitBetween: this.waitAfterFailure,
taskName: `block range ${serializeBlockRange(chunk)}`,
abortHandle: this.abortHandle,
warnOnError: true,
onRetry: attempt =>
warn(
'Retrying to process block range %s (attempt %d)',
serializeBlockRange(chunk),
attempt
),
});
}),
{ maxConcurrent: this.maxParallelChunks, abortHandle: this.abortHandle }
),
{ maxConcurrent: this.config.maxParallelChunks, abortHandle: this.abortHandle }
);
failures = 0;
}
Expand All @@ -200,7 +196,7 @@ export class BlockWatcher implements ManagedResource {
}
if (this.active) {
try {
await this.abortHandle.race(sleep(this.pollInterval));
await this.abortHandle.race(sleep(this.config.pollInterval));
} catch (e) {
if (e === ABORT) {
break;
Expand Down Expand Up @@ -300,9 +296,36 @@ export class BlockWatcher implements ManagedResource {
contractAddresInfo = await this.lookupContractInfo(receipt.contractAddress);
}

let privatePayload: PrivateTransactionPayload | undefined;
if (this.config.decryptPrivateTransactions) {
if (isEnterprisePlatform(this.nodePlatform)) {
if (this.nodePlatform.isPrivateTransaction(rawTx)) {
debug('Retrieving decrypted private transaction input %o', {
txHash: rawTx.hash,
input: rawTx.input,
});
try {
const input = await this.nodePlatform.getRawTransactionInput(rawTx.input, this.ethClient);
debug('Successfully retrieved decrypted transaction payload %o', { input });
privatePayload = { input };
} catch (e) {
error('Failed to retrieve decrypted transaction payload %o', {
txHash: rawTx.hash,
input: rawTx.input,
});
}
}
} else {
warn(
"Decryption of private transactions is enabled, but detected node type (%s) doesn't support it",
this.nodePlatform.name
);
}
}

let callInfo;
if (this.abiRepo && toInfo && toInfo.isContract) {
callInfo = this.abiRepo.decodeFunctionCall(rawTx.input, {
callInfo = this.abiRepo.decodeFunctionCall(privatePayload?.input ?? rawTx.input, {
contractAddress: rawTx.to ?? undefined,
contractFingerprint: toInfo.fingerprint,
});
Expand All @@ -321,7 +344,8 @@ export class BlockWatcher implements ManagedResource {
toAddressInfo(fromInfo),
toAddressInfo(toInfo),
toAddressInfo(contractAddresInfo),
callInfo
callInfo,
privatePayload
),
},
...(await Promise.all(receipt?.logs?.map(l => this.processTransactionLog(l, blockTime)) ?? [])),
Expand Down
5 changes: 5 additions & 0 deletions src/cliflags.ts
Expand Up @@ -157,6 +157,11 @@ export const CLI_FLAGS = {
return n;
},
}),
'decrypt-private-txs': flags.boolean({
env: 'DECRYPT_PRIVATE_TXS',
description:
'Enable to download decrypted content of private transactions (only available for Quorum at the moment)',
}),

'reject-invalid-certs': flags.boolean({
env: 'REJECT_INVALID_CERTS',
Expand Down
10 changes: 10 additions & 0 deletions src/config.ts
Expand Up @@ -197,6 +197,11 @@ export interface BlockWatcherConfigSchema {
startAt: StartBlock;
/** Wait time before retrying to fetch and process blocks after failure */
retryWaitTime: WaitTimeConfig;
/**
* For chains/nodes that do support private transcations, this settings instructs block watcher to
* attempt to load the decrypted payload for private transactions
*/
decryptPrivateTransactions: boolean;
}

export interface BlockWatcherConfig extends Omit<BlockWatcherConfigSchema, 'retryWaitTime'> {
Expand Down Expand Up @@ -786,6 +791,11 @@ export async function loadEthloggerConfig(flags: CliFlags, dryRun: boolean = fal
pollInterval: parseDuration(defaults.blockWatcher?.pollInterval) ?? 500,
startAt: parseStartAt(flags['start-at-block'] ?? defaults.blockWatcher?.startAt) ?? 'genesis',
retryWaitTime: waitTimeFromConfig(defaults.blockWatcher?.retryWaitTime) ?? 60000,
decryptPrivateTransactions:
flags['decrypt-private-txs'] ??
parseBooleanEnvVar(CLI_FLAGS['decrypt-private-txs'].env) ??
defaults.blockWatcher?.decryptPrivateTransactions ??
false,
},
checkpoint: {
filename: defaults.checkpoint?.filename ?? 'checkpoint.json',
Expand Down
7 changes: 6 additions & 1 deletion src/eth/requests.ts
Expand Up @@ -124,7 +124,7 @@ export const gethTxpool = (): EthRequest<[], GethTxpool> => ({

// Quorum specific requests

export const quroumIstanbulSnapshot = (): EthRequest<[], any> => ({
export const quorumIstanbulSnapshot = (): EthRequest<[], any> => ({
method: 'istanbul_getSnapshot',
});

Expand All @@ -144,6 +144,11 @@ export const quorumRaftCluster = (): EthRequest<[], any> => ({
method: 'raft_cluster',
});

export const quorumPrivateTransactionPayload = (id: string): EthRequest<[string], string> => ({
method: 'eth_getQuorumPayload',
params: [id],
});

// Parity specific requests

/** Returns the node enode URI */
Expand Down
5 changes: 4 additions & 1 deletion src/format.ts
Expand Up @@ -9,6 +9,7 @@ import {
FormattedPendingTransaction,
FormattedTransaction,
FunctionCall,
PrivateTransactionPayload,
} from './msgs';
import { bigIntToNumber, parseBigInt } from './utils/bn';

Expand Down Expand Up @@ -72,7 +73,8 @@ export function formatTransaction(
fromInfo?: AddressInfo,
toInfo?: AddressInfo,
contractAddressInfo?: AddressInfo,
call?: FunctionCall
call?: FunctionCall,
privatePayload?: PrivateTransactionPayload
): FormattedTransaction {
return {
...formatBaseTransaction(rawTx),
Expand All @@ -87,6 +89,7 @@ export function formatTransaction(
toInfo,
contractAddressInfo,
call,
privatePayload,
};
}

Expand Down
6 changes: 2 additions & 4 deletions src/index.ts
Expand Up @@ -190,11 +190,9 @@ class Ethlogger extends Command {
ethClient: client,
output,
abiRepo: abiRepo,
startAt: config.blockWatcher.startAt,
config: config.blockWatcher,
contractInfoCache,
chunkSize: config.blockWatcher.blocksMaxChunkSize,
maxParallelChunks: config.blockWatcher.maxParallelChunks,
pollInterval: config.blockWatcher.pollInterval,
nodePlatform: platformAdapter,
});
addResource(blockWatcher);
internalStatsCollector.addSource(blockWatcher, 'blockWatcher');
Expand Down

0 comments on commit a25aad9

Please sign in to comment.