Skip to content

Commit

Permalink
fix: 🐛 extended peer information now with dedicated config
Browse files Browse the repository at this point in the history
Moved extended peer info collection (only available for geth atm) to
have its own configuration and schedule, to be consistent with how
pending transaction collection is configured.
  • Loading branch information
ziegfried committed Feb 13, 2020
1 parent 9d6cc3f commit 54bdfd9
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 7 deletions.
4 changes: 4 additions & 0 deletions defaults.ethlogger.yaml
Expand Up @@ -97,5 +97,9 @@ pendingTx:
enabled: false
collectInterval: 10s
retryWaitTime: 10s
peerInfo:
enabled: false
collectInterval: 10s
retryWaitTime: 10s
internalMetrics:
collectInterval: 1s
4 changes: 4 additions & 0 deletions examples/quorum/docker-compose.yaml
Expand Up @@ -344,6 +344,8 @@ services:
- SPLUNK_INTERNAL_INDEX=metrics
- SPLUNK_HEC_REJECT_INVALID_CERTS=false
- ABI_DIR=/abi
- COLLECT_PENDING_TX=true
- COLLECT_PEER_INFO=true
- DEBUG=ethlogger:abi:*
volumes:
- ./abi:/abi
Expand All @@ -370,6 +372,8 @@ services:
- SPLUNK_HEC_REJECT_INVALID_CERTS=false
- COLLECT_BLOCKS=false
- ABI_DIR=/abi
- COLLECT_PENDING_TX=true
- COLLECT_PEER_INFO=true
- DEBUG=ethlogger:abi:*
volumes:
- ./abi:/abi
Expand Down
5 changes: 5 additions & 0 deletions src/cliflags.ts
Expand Up @@ -53,6 +53,11 @@ export const CLI_FLAGS = {
env: 'COLLECT_PENDING_TX',
description: 'Enables collection of pending transactions',
}),
'collect-peer-info': flags.boolean({
allowNo: true,
env: 'COLLECT_PEER_INFO',
description: 'Enables collection of detailed peer information',
}),
'collect-internal-metrics': flags.boolean({
allowNo: true,
env: 'COLLECT_INTERNAL_METRICS',
Expand Down
30 changes: 30 additions & 0 deletions src/config.ts
Expand Up @@ -39,6 +39,8 @@ export interface EthloggerConfigSchema {
nodeInfo: NodeInfoConfigSchema;
/** Settings for collecting pending transactions from node */
pendingTx: PendingTxConfigSchema;
/** Settings for collecting peer informataion from the node */
peerInfo: PeerInfoConfigSchema;
/** Settings for internal metrics collection */
internalMetrics: InternalMetricsConfigSchema;
}
Expand All @@ -60,6 +62,7 @@ export interface EthloggerConfig {
nodeMetrics: NodeMetricsConfig;
nodeInfo: NodeInfoConfig;
pendingTx: PendingTxConfig;
peerInfo: PeerInfoConfig;
internalMetrics: InternalMetricsConfig;
}

Expand Down Expand Up @@ -239,6 +242,24 @@ export interface PendingTxConfig extends Omit<PendingTxConfigSchema, 'retryWaitT
retryWaitTime: WaitTime;
}

/**
* Periodic collection of detailed peer information.
* Note that this is only possible with certain types of ethereum nodes (geth atm)
*/
export interface PeerInfoConfigSchema {
/** Enable or disable collection of peer informataion */
enabled: boolean;
/** Interval in which to collect peer information */
collectInterval: DurationConfig;
/** Wait time before retrying to collect peer information after failure */
retryWaitTime: WaitTimeConfig;
}

export interface PeerInfoConfig extends Omit<PeerInfoConfigSchema, 'retryWaitTime'> {
collectInterval: Duration;
retryWaitTime: WaitTime;
}

/**
* Ethlogger-internal metrics allow for visibility into the operation of ethlogger itself.
*/
Expand Down Expand Up @@ -813,6 +834,15 @@ export async function loadEthloggerConfig(flags: CliFlags, dryRun: boolean = fal
collectInterval: parseDuration(defaults.pendingTx?.collectInterval) ?? 30000,
retryWaitTime: waitTimeFromConfig(defaults.pendingTx?.retryWaitTime) ?? 30000,
},
peerInfo: {
enabled:
flags['collect-peer-info'] ??
parseBooleanEnvVar(CLI_FLAGS['collect-peer-info'].env) ??
defaults.peerInfo?.enabled ??
false,
collectInterval: parseDuration(defaults.peerInfo?.collectInterval) ?? 10000,
retryWaitTime: waitTimeFromConfig(defaults.peerInfo?.retryWaitTime) ?? 10000,
},
};

const result = config as EthloggerConfig;
Expand Down
7 changes: 4 additions & 3 deletions src/index.ts
@@ -1,14 +1,15 @@
import { Command } from '@oclif/command';
import debugModule from 'debug';
import { inspect } from 'util';
import { ContractInfo } from './abi/contract';
import { AbiRepository } from './abi/repo';
import { BlockWatcher } from './blockwatcher';
import { Checkpoint } from './checkpoint';
import { CLI_FLAGS } from './cliflags';
import { ConfigError, loadEthloggerConfig, EthloggerConfig } from './config';
import { ContractInfo } from './abi/contract';
import { ConfigError, EthloggerConfig, loadEthloggerConfig } from './config';
import { BatchedEthereumClient, EthereumClient } from './eth/client';
import { HttpTransport } from './eth/http';
import { checkHealthState, HealthStateMonitor } from './health';
import { HecClient } from './hec';
import { introspectTargetNodePlatform } from './introspect';
import { substituteVariablesInHecConfig } from './meta';
Expand All @@ -20,7 +21,6 @@ import LRUCache from './utils/lru';
import { ManagedResource, shutdownAll } from './utils/resource';
import { waitForSignal } from './utils/signal';
import { InternalStatsCollector } from './utils/stats';
import { checkHealthState, HealthStateMonitor } from './health';

const { debug, error, info } = createModuleDebug('cli');

Expand Down Expand Up @@ -154,6 +154,7 @@ class Ethlogger extends Command {
nodeMetrics: config.nodeMetrics,
nodeInfo: config.nodeInfo,
pendingTx: config.pendingTx,
peerInfo: config.peerInfo,
});
addResource(nodeStatsCollector);
internalStatsCollector.addSource(nodeStatsCollector, 'nodeStatsCollector');
Expand Down
40 changes: 39 additions & 1 deletion src/nodestats.ts
@@ -1,4 +1,4 @@
import { NodeInfoConfig, NodeMetricsConfig, PendingTxConfig } from './config';
import { NodeInfoConfig, NodeMetricsConfig, PendingTxConfig, PeerInfoConfig } from './config';
import { EthereumClient } from './eth/client';
import { Output, OutputMessage } from './output';
import { NodePlatformAdapter } from './platforms';
Expand All @@ -18,6 +18,8 @@ const initialCounters = {
metricsErrorCount: 0,
pendingTxCollectCount: 0,
pendingTxErrorCount: 0,
peerInfoCollectCount: 0,
peerInfoErrorCount: 0,
};

export class NodeStatsCollector implements ManagedResource {
Expand All @@ -28,6 +30,7 @@ export class NodeStatsCollector implements ManagedResource {
infoCollectDuration: new AggregateMetric(),
metricsCollectDuration: new AggregateMetric(),
pendingtxCollectDuration: new AggregateMetric(),
peerInfoCollectDuration: new AggregateMetric(),
};
constructor(
private config: {
Expand All @@ -37,6 +40,7 @@ export class NodeStatsCollector implements ManagedResource {
nodeInfo: NodeInfoConfig;
nodeMetrics: NodeMetricsConfig;
pendingTx: PendingTxConfig;
peerInfo: PeerInfoConfig;
}
) {}

Expand All @@ -47,6 +51,7 @@ export class NodeStatsCollector implements ManagedResource {
this.startCollectingNodeInfo(),
this.startCollectingNodeMetrics(),
this.startCollectingPendingTransactions(),
this.startCollectingPeerInfo(),
]);
this.donePromise = p;
await p;
Expand Down Expand Up @@ -132,6 +137,11 @@ export class NodeStatsCollector implements ManagedResource {
return await abort.race(platformAdapter.capturePendingTransactions(ethClient, time));
};

collectPeerInfo = async (abort: AbortHandle, time: number): Promise<OutputMessage[]> => {
const { platformAdapter, ethClient } = this.config;
return await abort.race(platformAdapter.capturePeerInfo!(ethClient, time));
};

private async startCollectingNodeInfo() {
if (!this.config.nodeInfo.enabled) {
debug('Node info collection is disabled');
Expand Down Expand Up @@ -192,6 +202,34 @@ export class NodeStatsCollector implements ManagedResource {
);
}

private async startCollectingPeerInfo() {
if (!this.config.peerInfo.enabled) {
debug('Peer info collection is disabled');
return;
}

const { platformAdapter, ethClient } = this.config;
const peerInfoSupported = await platformAdapter.supportsPeerInfo(ethClient);
if (!peerInfoSupported) {
warn(
'Collection of peer information is not supported by the ethereum node %s (platform %s)',
ethClient.transport.source,
platformAdapter.name
);
return;
}

await this.periodicallyCollect(
this.collectPeerInfo,
'peer info',
this.config.peerInfo.collectInterval,
this.config.peerInfo.retryWaitTime,
this.aggregates.peerInfoCollectDuration,
'peerInfoCollectCount',
'peerInfoErrorCount'
);
}

public flushStats() {
const stats = {
...this.counters,
Expand Down
8 changes: 7 additions & 1 deletion src/platforms/generic.ts
Expand Up @@ -224,7 +224,13 @@ export class GenericNodeAdapter implements NodePlatformAdapter {
return fetchPendingTransactions(ethClient, captureTime);
}

public async supportsPendingTransactions(): Promise<boolean> {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
public async supportsPendingTransactions(_: EthereumClient): Promise<boolean> {
return this.supports?.pendingTransactions ?? false;
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
public async supportsPeerInfo(_: EthereumClient): Promise<boolean> {
return false;
}
}
15 changes: 13 additions & 2 deletions src/platforms/geth.ts
Expand Up @@ -6,7 +6,7 @@ import { NodeInfo, PendingTransactionMessage, NodeMetricsMessage } from '../msgs
import { OutputMessage } from '../output';
import { createModuleDebug } from '../utils/debug';
import { durationStringToMs, parseAbbreviatedNumber } from '../utils/parse';
import { captureDefaultMetrics, GenericNodeAdapter } from './generic';
import { captureDefaultMetrics, GenericNodeAdapter, checkRpcMethodSupport } from './generic';

const { debug, error } = createModuleDebug('platforms:geth');

Expand Down Expand Up @@ -162,12 +162,23 @@ export class GethAdapter extends GenericNodeAdapter {
const [defaultMetrics, gethMetrics] = await Promise.all([
captureDefaultMetrics(ethClient, captureTime),
captureGethMetrics(ethClient, captureTime),
capturePeers(ethClient, captureTime),
]);
return [defaultMetrics, gethMetrics];
}

public async supportsPendingTransactions(ethClient: EthereumClient): Promise<boolean> {
return await checkRpcMethodSupport(ethClient, gethTxpool());
}

public async capturePendingTransactions(ethClient: EthereumClient, captureTime: number): Promise<OutputMessage[]> {
return await captureTxpoolData(ethClient, captureTime);
}

public async supportsPeerInfo(ethClient: EthereumClient) {
return await checkRpcMethodSupport(ethClient, gethPeers());
}

public async capturePeerInfo?(ethClient: EthereumClient, captureTime: number): Promise<OutputMessage[]> {
return await capturePeers(ethClient, captureTime);
}
}
2 changes: 2 additions & 0 deletions src/platforms/index.ts
Expand Up @@ -20,4 +20,6 @@ export interface NodePlatformAdapter {
captureNodeInfo(ethClient: EthereumClient): Promise<NodeInfo>;
capturePendingTransactions(ethClient: EthereumClient, captureTime: number): Promise<OutputMessage[]>;
supportsPendingTransactions(ethClient: EthereumClient): Promise<boolean>;
capturePeerInfo?(ethClient: EthereumClient, captureTime: number): Promise<OutputMessage[]>;
supportsPeerInfo(ethClient: EthereumClient): Promise<boolean>;
}
5 changes: 5 additions & 0 deletions test/config.test.ts
Expand Up @@ -119,6 +119,11 @@ test('defaults', async () => {
},
"type": "hec",
},
"peerInfo": Object {
"collectInterval": 10000,
"enabled": false,
"retryWaitTime": 10000,
},
"pendingTx": Object {
"collectInterval": 10000,
"enabled": false,
Expand Down

0 comments on commit 54bdfd9

Please sign in to comment.