Skip to content

Commit

Permalink
Extracted Timers from StragglerTimer. Added unit tests. Made
Browse files Browse the repository at this point in the history
StraggleTimeoutMS configurable
  • Loading branch information
pieterjan84 committed Mar 13, 2024
1 parent 74f019e commit cd1acfa
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 23 deletions.
8 changes: 6 additions & 2 deletions src/__tests__/crawler.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ it('should crawl, listen for validating nodes and harvest quorumSets', async ()
flowControlSendMoreBatchSizeBytes: 100000
};

const crawler = createCrawler(new CrawlerConfiguration(nodeConfig));
const crawlerConfig = new CrawlerConfiguration(nodeConfig);
crawlerConfig.peerStraggleTimeoutMS = 2000;
const crawler = createCrawler(crawlerConfig);
const crawlState = new CrawlState(
trustedQSet,
new Map(),
Expand Down Expand Up @@ -216,7 +218,9 @@ it('should hit the max crawl limit', async function () {
const nodeConfig = getConfigFromEnv();
nodeConfig.network = Networks.TESTNET;

const crawler = createCrawler(new CrawlerConfiguration(nodeConfig, 25, 1000));
const crawler = createCrawler(
new CrawlerConfiguration(nodeConfig, 25, 1000, new Set(), 1000)
);
const crawlState = new CrawlState(
trustedQSet,
new Map(),
Expand Down
3 changes: 2 additions & 1 deletion src/crawler-configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export class CrawlerConfiguration {
public nodeConfig: NodeConfig, //How many connections can be open at the same time. The higher the number, the faster the crawl
public maxOpenConnections = 25,
public maxCrawlTime = 1800000, //max nr of ms the crawl will last. Safety guard in case crawler is stuck.
public blackList = new Set<PublicKey>()
public blackList = new Set<PublicKey>(),
public peerStraggleTimeoutMS = 10000 //time in ms that we listen to a node to determine if it is validating a confirmed closed ledger
) {}
}
10 changes: 9 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import { OnPeerConnectionClosed } from './network-observer/peer-event-handler/on
import { OnPeerData } from './network-observer/peer-event-handler/on-peer-data';
import { NetworkObserverStateManager } from './network-observer/network-observer-state-manager';
import { PeerEventHandler } from './network-observer/peer-event-handler/peer-event-handler';
import { Timers } from './utilities/timers';
import { TimerFactory } from './utilities/timer-factory';

export { Crawler } from './crawler';
export { CrawlResult } from './crawl-result';
Expand Down Expand Up @@ -62,7 +64,13 @@ export function createCrawler(
logger
);

const stragglerTimer = new StragglerTimer(connectionManager, logger);
const timers = new Timers(new TimerFactory());
const stragglerTimer = new StragglerTimer(
connectionManager,
timers,
config.peerStraggleTimeoutMS,
logger
);
const peerEventHandler = new PeerEventHandler(
new OnPeerConnected(stragglerTimer, connectionManager, logger),
new OnPeerConnectionClosed(quorumSetManager, logger),
Expand Down
82 changes: 82 additions & 0 deletions src/network-observer/__tests__/straggler-timer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { mock } from 'jest-mock-extended';
import { P } from 'pino';
import { ConnectionManager } from '../connection-manager';
import { StragglerTimer } from '../straggler-timer';
import { Timers } from '../../utilities/timers';

describe('StragglerTimer', () => {
const logger = mock<P.Logger>();
const connectionManager = mock<ConnectionManager>();
const timers = mock<Timers>();
const stragglerHandler = new StragglerTimer(
connectionManager,
timers,
1000,
logger
);

beforeEach(() => {
jest.clearAllMocks();
});

it('should start straggler timeout', () => {
const addresses = ['address1', 'address2'];
stragglerHandler.startStragglerTimeout(addresses);
const callback1 = timers.startTimer.mock.calls[0][1];
expect(timers.startTimer).toHaveBeenCalledTimes(1);
callback1();

expect(connectionManager.disconnectByAddress).toHaveBeenCalledWith(
'address1'
);
expect(connectionManager.disconnectByAddress).toHaveBeenCalledWith(
'address2'
);
});

it('should not start timeout if addresses is empty', () => {
stragglerHandler.startStragglerTimeout([]);
expect(timers.startTimer).not.toHaveBeenCalled();
});

it('should start straggler timeout for active non top tier peers', () => {
connectionManager.getActiveConnectionAddresses.mockReturnValue([
'peerAddress',
'topTierAddress'
]);
stragglerHandler.startStragglerTimeoutForActivePeers(
false,
new Set(['topTierAddress'])
);
const callback1 = timers.startTimer.mock.calls[0][1];
expect(timers.startTimer).toHaveBeenCalledTimes(1);
callback1();

expect(connectionManager.disconnectByAddress).toHaveBeenCalledWith(
'peerAddress'
);
expect(connectionManager.disconnectByAddress).toHaveBeenCalledTimes(1);
});

it('should start straggler timeout for active top tier peers', () => {
connectionManager.getActiveConnectionAddresses.mockReturnValue([
'peerAddress',
'topTierAddress'
]);
stragglerHandler.startStragglerTimeoutForActivePeers(
true,
new Set('topTierAddress')
);
const callback1 = timers.startTimer.mock.calls[0][1];
expect(timers.startTimer).toHaveBeenCalledTimes(1);
callback1();

expect(connectionManager.disconnectByAddress).toHaveBeenCalledTimes(2);
expect(connectionManager.disconnectByAddress).toHaveBeenCalledWith(
'peerAddress'
);
expect(connectionManager.disconnectByAddress).toHaveBeenCalledWith(
'topTierAddress'
);
});
});
2 changes: 1 addition & 1 deletion src/network-observer/network-observer-state-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class NetworkObserverStateManager {

public moveToIdleState(callback: () => void) {
assert(this._state === NetworkObserverState.Stopping);
this.stragglerTimer.stopTimers(); //a node could have disconnected during the straggler timeout
this.stragglerTimer.stopStragglerTimeouts(); //a node could have disconnected during the straggler timeout
this.connectionManager.shutdown();
this._state = NetworkObserverState.Idle;
callback();
Expand Down
24 changes: 6 additions & 18 deletions src/network-observer/straggler-timer.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import { Timer } from '../utilities/timer';
import { ConnectionManager } from './connection-manager';
import { P } from 'pino';
import { Timers } from '../utilities/timers';

export class StragglerTimer {
public static readonly PEER_STRAGGLE_TIMEOUT = 10000; //if the network has externalized, a node has 10 seconds to catch up.

private timers: Set<Timer> = new Set();

constructor(
private connectionManager: ConnectionManager,
private timers: Timers,
private straggleTimeoutMS: number,
private logger: P.Logger
) {}

Expand All @@ -26,25 +24,15 @@ export class StragglerTimer {

public startStragglerTimeout(addresses: string[]) {
if (addresses.length === 0) return;
this.startTimer(StragglerTimer.PEER_STRAGGLE_TIMEOUT, () => {
this.timers.startTimer(this.straggleTimeoutMS, () => {
this.logger.debug({ addresses }, 'Straggler timeout hit');
addresses.forEach((address) => {
this.connectionManager.disconnectByAddress(address);
});
});
}

private startTimer(time: number, callback: () => void) {
const timer = new Timer();
const myCallback = () => {
this.timers.delete(timer);
callback();
};
timer.startTimer(time, callback);
this.timers.add(timer);
}

stopTimers() {
this.timers.forEach((timer) => timer.stopTimer());
public stopStragglerTimeouts() {
this.timers.stopTimers();
}
}
41 changes: 41 additions & 0 deletions src/utilities/__tests__/timers.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { TimerFactory } from '../timer-factory';
import { Timers } from '../timers';
import { mock } from 'jest-mock-extended';
import { Timer } from '../timer';

describe('timers', () => {
const timerFactory = mock<TimerFactory>();
const timers = new Timers(timerFactory);

beforeEach(() => {
jest.clearAllMocks();
});

it('should start timer', () => {
const callback = jest.fn();
const timer = mock<Timer>();
timerFactory.createTimer.mockReturnValue(timer);
timers.startTimer(1000, callback);

const calledTime = timer.startTimer.mock.calls[0][0];
const timerCallback = timer.startTimer.mock.calls[0][1];

timerCallback();

expect(calledTime).toBe(1000);
expect(timerFactory.createTimer).toHaveBeenCalled();
expect(timer.startTimer).toHaveBeenCalled();
expect(timers.hasActiveTimers()).toBeFalsy();
});

it('should stop timers', () => {
const timer = mock<Timer>();
timerFactory.createTimer.mockReturnValue(timer);
const callback = jest.fn();
timers.startTimer(1000, callback);
timers.stopTimers();
expect(timer.stopTimer).toHaveBeenCalled();
expect(timers.hasActiveTimers()).toBeFalsy();
expect(callback).not.toHaveBeenCalled();
});
});
7 changes: 7 additions & 0 deletions src/utilities/timer-factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { Timer } from './timer';

export class TimerFactory {
createTimer() {
return new Timer();
}
}
27 changes: 27 additions & 0 deletions src/utilities/timers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { Timer } from './timer';
import { TimerFactory } from './timer-factory';

export class Timers {
private timers: Set<Timer> = new Set();

constructor(private timerFactory: TimerFactory) {}

public startTimer(time: number, callback: () => void) {
const timer = this.timerFactory.createTimer();
const myCallback = () => {
this.timers.delete(timer);
callback();
};
timer.startTimer(time, myCallback);
this.timers.add(timer);
}

public stopTimers() {
this.timers.forEach((timer) => timer.stopTimer());
this.timers = new Set();
}

public hasActiveTimers() {
return this.timers.size > 0;
}
}

0 comments on commit cd1acfa

Please sign in to comment.