Skip to content

Commit

Permalink
feat: 🎸 support for quorum nodes with raft consensus
Browse files Browse the repository at this point in the history
Added support for quorum nodes with raft consensus mechanism. Folded
quorum protocol info into node info (captured every minute by default)
rather than emitting them with node stats every second.
  • Loading branch information
ziegfried committed Jan 16, 2020
1 parent 1948fcb commit e2186e2
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 65 deletions.
1 change: 0 additions & 1 deletion defaults.ethlogger.yaml
Expand Up @@ -53,7 +53,6 @@ output:
pendingtx: 'ethereum:transaction:pending'
nodeInfo: 'ethereum:node:info'
nodeMetrics: 'ethereum:node:metrics'
quorumProtocol: 'ethereum:quorum:protocol'
gethPeer: 'ethereum:geth:peer'
checkpoint:
filename: checkpoints.json
Expand Down
9 changes: 8 additions & 1 deletion src/blockwatcher.ts
Expand Up @@ -15,6 +15,7 @@ import { createModuleDebug } from './utils/debug';
import { ManagedResource } from './utils/resource';
import { linearBackoff, resolveWaitTime, retry, WaitTime } from './utils/retry';
import { AggregateMetric } from './utils/stats';
import { bigIntToNumber } from './utils/bn';

const { debug, info, warn, error, trace } = createModuleDebug('blockwatcher');

Expand All @@ -32,7 +33,13 @@ export function parseBlockTime(timestamp: number | string): number {
if (typeof timestamp === 'number') {
return timestamp * 1000;
}
// TODO: handle quorum/raft timestamps

if (typeof timestamp === 'string') {
// Timestamp on Quorum/Raft nodes are emitted as nanoseconds since unix epoch
// so translate it to milliseconds
const ms = BigInt(timestamp) / BigInt(1_000_000);
return bigIntToNumber(ms);
}
throw new Error(`Unable to parse block timestamp "${timestamp}"`);
}

Expand Down
2 changes: 0 additions & 2 deletions src/config.ts
Expand Up @@ -271,8 +271,6 @@ export interface SourcetypesSchema {
nodeInfo?: string;
/** @default "ethereum:node:metrics" */
nodeMetrics?: string;
/** @default "ethereum:quorum:protocol" */
quorumProtocol?: string;
/** @default "ethereum:geth:peer" */
gethPeer?: string;
}
Expand Down
10 changes: 6 additions & 4 deletions src/msgs.ts
Expand Up @@ -222,6 +222,9 @@ export interface NodeInfo {
/** Describes transport used to access node information (jsonprc+URL) */
transport: string;

/** Detailed information from the quorum consensus mechanism */
quorumProtocol?: QuorumProtocolInfo;

[k: string]: any;
}

Expand All @@ -237,10 +240,9 @@ export interface GethPeerMessage {
body: GethPeer;
}

export interface QuorumProtocolMessage {
type: 'quorumProtocol';
time: number;
body: { consensusMechanism: 'istanbul' | 'raft'; [k: string]: any };
export interface QuorumProtocolInfo {
consensusMechanism: 'istanbul' | 'raft';
[k: string]: any;
}

export interface NodeMetrics {
Expand Down
4 changes: 0 additions & 4 deletions src/output.ts
Expand Up @@ -7,7 +7,6 @@ import {
NodeInfoMessage,
NodeMetricsMessage,
PendingTransactionMessage,
QuorumProtocolMessage,
TransactionMessage,
} from './msgs';
import { createDebug } from './utils/debug';
Expand All @@ -22,7 +21,6 @@ export const defaultSourcetypes = {
pendingtx: 'ethereum:transaction:pending',
nodeInfo: 'ethereum:node:info',
nodeMetrics: 'ethereum:node:metrics',
quorumProtocol: 'ethereum:quorum:protocol',
gethPeer: 'ethereum:geth:peer',
};

Expand All @@ -33,7 +31,6 @@ export type OutputMessage =
| LogEventMessage
| NodeInfoMessage
| NodeMetricsMessage
| QuorumProtocolMessage
| GethPeerMessage;

export interface Output extends ManagedResource {
Expand All @@ -52,7 +49,6 @@ export class HecOutput implements Output, ManagedResource {
case 'event':
case 'pendingtx':
case 'nodeInfo':
case 'quorumProtocol':
case 'gethPeer':
this.eventsHec.pushEvent({
time: msg.time,
Expand Down
87 changes: 36 additions & 51 deletions src/platforms/quorum.ts
@@ -1,57 +1,45 @@
import { createModuleDebug } from '../utils/debug';
import { GethAdapter } from './geth';
import { EthereumClient } from '../eth/client';
import { OutputMessage } from '../output';
import {
quroumIstanbulSnapshot,
quorumIstanbulCandidates,
quorumRaftRole,
quorumRaftLeader,
quorumRaftCluster,
quorumRaftLeader,
quorumRaftRole,
quroumIstanbulSnapshot,
} from '../eth/requests';
import { QuorumProtocolInfo } from '../msgs';
import { createModuleDebug } from '../utils/debug';
import { GethAdapter } from './geth';

const { debug, warn } = createModuleDebug('platforms:quorum');

export type QUORUM_CONSENSUS = 'istanbul' | 'raft';

export async function captureIstanbulData(ethClient: EthereumClient, captureTime: number): Promise<OutputMessage[]> {
export async function captureIstanbulData(ethClient: EthereumClient): Promise<QuorumProtocolInfo> {
debug('Capturing istanbul data from quorum node');
const [snapshot, candidates] = await Promise.all([
ethClient.request(quroumIstanbulSnapshot()),
ethClient.request(quorumIstanbulCandidates()),
]);
return [
{
type: 'quorumProtocol',
time: captureTime,
body: {
consensusMechanism: 'istanbul',
snapshot,
candidates,
},
},
];
return {
consensusMechanism: 'istanbul',
snapshot,
candidates,
};
}

export async function captureRaftData(ethClient: EthereumClient, captureTime: number): Promise<OutputMessage[]> {
export async function captureRaftData(ethClient: EthereumClient): Promise<QuorumProtocolInfo> {
debug('Capturing raft data from quorum node');
const [role, leader, cluster] = await Promise.all([
ethClient.request(quorumRaftRole()),
ethClient.request(quorumRaftLeader()),
ethClient.request(quorumRaftCluster()),
]);
return [
{
type: 'quorumProtocol',
time: captureTime,
body: {
consensusMechanism: 'raft',
role,
leader,
cluster,
},
},
];
return {
consensusMechanism: 'raft',
role,
leader,
cluster,
};
}

export class QuorumAdapter extends GethAdapter {
Expand All @@ -62,38 +50,35 @@ export class QuorumAdapter extends GethAdapter {
if ('istanbul' in (this.nodeInfo?.gethInfo?.protocols ?? {})) {
this.consensus = 'istanbul';
} else {
// TODO check for raft
warn(
'Unable to determine quorum consensus mechanism by inspecting nodeInfo protocols: %o',
this.gethNodeInfo?.protocols
);
try {
const raftInfo = await captureRaftData(ethClient);
debug('Successfully retrieved raft data from node: %o, assuming consensus mechanism is raft', raftInfo);
this.consensus = 'raft';
} catch (e) {
warn(
'Unable to determine quorum consensus mechanism by inspecting nodeInfo protocols: %o',
this.gethNodeInfo?.protocols
);
}
}
}

public async captureNodeInfo(ethClient: EthereumClient) {
const gethResult = await super.captureNodeInfo(ethClient);
const quorumProtocol =
this.consensus === 'istanbul'
? await captureIstanbulData(ethClient)
: this.consensus === 'raft'
? await captureRaftData(ethClient)
: undefined;
return {
...gethResult,
name: this.name,
quorum: {
consensus: this.consensus ?? null,
},
quorumProtocol,
};
}

public get name() {
return this.consensus == null ? 'quorum' : `quorum:${this.consensus}`;
}

public async captureNodeStats(ethClient: EthereumClient, captureTime: number) {
const [baseGethMsgs, quorumProtocolMsgs] = await Promise.all([
super.captureNodeStats(ethClient, captureTime),
this.consensus === 'istanbul'
? captureIstanbulData(ethClient, captureTime)
: this.consensus === 'raft'
? captureRaftData(ethClient, captureTime)
: [],
]);
return [...baseGethMsgs, ...quorumProtocolMsgs];
}
}
2 changes: 1 addition & 1 deletion src/utils/bn.ts
Expand Up @@ -19,7 +19,7 @@ export function parseBigInt(input: number | string): number | string {
* If the value exeeds the safe bounds of integers in JS it will throw an error.
* @param input "0x"-prefixed hex string, base 10 number string or number
*/
export function bigIntToNumber(input: number | string): number {
export function bigIntToNumber(input: number | string | BigInt): number {
const n = BigInt(input);
if (n > MAX_NUMBER || n < MIN_NUMBER) {
throw new Error(`BigInt overflow for "${input}" - cannot convert to number`);
Expand Down
1 change: 0 additions & 1 deletion test/config.test.ts
Expand Up @@ -109,7 +109,6 @@ test('defaults', async () => {
"nodeInfo": "ethereum:node:info",
"nodeMetrics": "ethereum:node:metrics",
"pendingtx": "ethereum:transaction:pending",
"quorumProtocol": "ethereum:quorum:protocol",
"transaction": "ethereum:transaction",
},
"type": "hec",
Expand Down

0 comments on commit e2186e2

Please sign in to comment.