Skip to content

Commit

Permalink
Added tests for NetworkObserver. Added ObservationFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
pieterjan84 committed Mar 13, 2024
1 parent d7cff13 commit 39264fe
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 6 deletions.
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { PeerEventHandler } from './network-observer/peer-event-handler/peer-eve
import { Timers } from './utilities/timers';
import { TimerFactory } from './utilities/timer-factory';
import { ConsensusTimer } from './network-observer/consensus-timer';
import { ObservationFactory } from './network-observer/observation-factory';

export { Crawler } from './crawler';
export { CrawlResult } from './crawl-result';
Expand Down Expand Up @@ -94,6 +95,7 @@ export function createCrawler(
logger
);
const peerNetworkManager = new NetworkObserver(
new ObservationFactory(),
connectionManager,
quorumSetManager,
peerEventHandler,
Expand Down
160 changes: 160 additions & 0 deletions src/network-observer/__tests__/network-observer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import { NetworkObserver } from '../network-observer';
import { mock } from 'jest-mock-extended';
import { P } from 'pino';
import {
ClosePayload,
ConnectionManager,
DataPayload
} from '../connection-manager';
import { QuorumSetManager } from '../quorum-set-manager';
import { PeerEventHandler } from '../peer-event-handler/peer-event-handler';
import { ObservationManager } from '../observation-manager';
import { CrawlState } from '../../crawl-state';
import { NodeAddress } from '../../node-address';
import { ObservationFactory } from '../observation-factory';
import { Observation } from '../observation';
import { ObservationState } from '../observation-state';
import { EventEmitter } from 'events';
import { Ledger } from '../../crawler';
import { nextTick } from 'async';

describe('network-observer', () => {
const observationFactory = mock<ObservationFactory>();
const connectionManager = mock<ConnectionManager>();
const connectionManagerEmitter = new EventEmitter();
connectionManager.on.mockImplementation((event, listener) => {
connectionManagerEmitter.on(event, listener);
return connectionManager;
});
const quorumSetManager = mock<QuorumSetManager>();
const peerEventHandler = mock<PeerEventHandler>();
const observationManager = mock<ObservationManager>();

const networkObserver = new NetworkObserver(
observationFactory,
connectionManager,
quorumSetManager,
peerEventHandler,
observationManager
);

beforeEach(() => {
jest.clearAllMocks();
});

it('should observe', async () => {
const topTierNodes: NodeAddress[] = [];
connectionManager.getNumberOfActiveConnections.mockReturnValue(1);
const observation = mock<Observation>();
observationFactory.createObservation.mockReturnValue(observation);
const crawlState = mock<CrawlState>();
const result = await networkObserver.observe(topTierNodes, crawlState);
expect(result).toBe(1);
expect(observationFactory.createObservation).toHaveBeenCalled();
expect(observationManager.startSync).toHaveBeenCalledWith(observation);
});

it('should connect to node', async () => {
const ip = 'localhost';
const port = 11625;
const observation = new Observation([['localhost', 11625]], mock(), mock());
observation.state = ObservationState.Synced;
observationFactory.createObservation.mockReturnValue(observation);
await networkObserver.observe([mock<NodeAddress>()], mock<CrawlState>());
networkObserver.connectToNode(ip, port);
expect(connectionManager.connectToNode).toHaveBeenCalledWith(ip, port);
});

it('should stop', async () => {
const observation = new Observation([['localhost', 11625]], mock(), mock());
observation.state = ObservationState.Synced;
observationFactory.createObservation.mockReturnValue(observation);
await networkObserver.observe([mock<NodeAddress>()], mock<CrawlState>());
observationManager.stopObservation.mockImplementation((observation, cb) => {
cb();
});
const result = await networkObserver.stop();
expect(result).toBe(observation);
});

it('should handle peer data', async () => {
const data = mock<DataPayload>();
peerEventHandler.onData.mockReturnValue({
closedLedger: null,
peers: []
});
const observation = new Observation([['localhost', 11625]], mock(), mock());
observation.state = ObservationState.Synced;
observationFactory.createObservation.mockReturnValue(observation);
await networkObserver.observe([mock<NodeAddress>()], mock<CrawlState>());
connectionManagerEmitter.emit('data', data);
expect(peerEventHandler.onData).toHaveBeenCalledWith(data, observation);
});

it('should handle closed ledger through peer data event', async () => {
const data = mock<DataPayload>();
peerEventHandler.onData.mockReturnValue({
closedLedger: mock<Ledger>(),
peers: []
});
const observation = new Observation([['localhost', 11625]], mock(), mock());
observation.state = ObservationState.Synced;
observationFactory.createObservation.mockReturnValue(observation);
await networkObserver.observe([mock<NodeAddress>()], mock<CrawlState>());
connectionManagerEmitter.emit('data', data);
expect(peerEventHandler.onData).toHaveBeenCalledWith(data, observation);
expect(observationManager.ledgerCloseConfirmed).toHaveBeenCalledWith(
observation,
peerEventHandler.onData(data, observation).closedLedger
);
});

it('should emit peers event through peer data event', async () => {
const data = mock<DataPayload>();
peerEventHandler.onData.mockReturnValue({
closedLedger: null,
peers: [['localhost', 11625]]
});
const observation = new Observation([['localhost', 11625]], mock(), mock());
networkObserver.on('peers', (peers) => {
expect(peers).toEqual([['localhost', 11625]]);
});
observation.state = ObservationState.Synced;
observationFactory.createObservation.mockReturnValue(observation);
await networkObserver.observe([mock<NodeAddress>()], mock<CrawlState>());
connectionManagerEmitter.emit('data', data);
expect(peerEventHandler.onData).toHaveBeenCalledWith(data, observation);
expect(observationManager.ledgerCloseConfirmed).not.toHaveBeenCalled();
nextTick(() => {}); //to make sure event is checked
});

it('should handle connected event', async () => {
const data = mock<DataPayload>();
const observation = new Observation([['localhost', 11625]], mock(), mock());
observation.state = ObservationState.Synced;
observationFactory.createObservation.mockReturnValue(observation);
await networkObserver.observe([mock<NodeAddress>()], mock<CrawlState>());
connectionManagerEmitter.emit('connected', data);
expect(peerEventHandler.onConnected).toHaveBeenCalledWith(
data,
observation
);
});

it('should handle close event', async () => {
const data = mock<DataPayload>();
const observation = new Observation([['localhost', 11625]], mock(), mock());
observation.state = ObservationState.Synced;
observationFactory.createObservation.mockReturnValue(observation);
networkObserver.on('disconnect', (close: ClosePayload) => {
expect(close).toEqual(data);
});
await networkObserver.observe([mock<NodeAddress>()], mock<CrawlState>());
connectionManagerEmitter.emit('close', data);
expect(peerEventHandler.onConnectionClose).toHaveBeenCalledWith(
data,
observation
);
nextTick(() => {}); //to make sure close event is checked
});
});
135 changes: 135 additions & 0 deletions src/network-observer/__tests__/observation-manager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import { ObservationManager } from '../observation-manager';
import { mock } from 'jest-mock-extended';
import { ConnectionManager } from '../connection-manager';
import { ConsensusTimer } from '../consensus-timer';
import { StragglerTimer } from '../straggler-timer';
import { P } from 'pino';
import { Observation } from '../observation';
import { PeerNodeCollection } from '../../peer-node-collection';
import { CrawlState } from '../../crawl-state';
import { ObservationState } from '../observation-state';

describe('ObservationManager', () => {
const connectionManager = mock<ConnectionManager>();
const consensusTimer = mock<ConsensusTimer>();
const stragglerTimer = mock<StragglerTimer>();
const logger = mock<P.Logger>();

const observationManager = new ObservationManager(
connectionManager,
consensusTimer,
stragglerTimer,
200,
logger
);

beforeEach(() => {
jest.clearAllMocks();
});

it('should start syncing', (resolve) => {
const peerNodes = new PeerNodeCollection();
const observation = new Observation(
[['localhost', 11625]],
peerNodes,
mock<CrawlState>()
);
observationManager.startSync(observation).then(() => {
expect(connectionManager.connectToNode).toHaveBeenCalledWith(
observation.topTierAddresses[0][0],
observation.topTierAddresses[0][1]
);
expect(consensusTimer.start).toHaveBeenCalled();
expect(observation.state).toBe(ObservationState.Synced);
resolve();
});

expect(observation.state).toBe(ObservationState.Syncing);
});

it('should stop observation immediately if no more active nodes', (resolve) => {
connectionManager.getNumberOfActiveConnections.mockReturnValue(0);
const peerNodes = new PeerNodeCollection();
const observation = new Observation(
[['localhost', 11625]],
peerNodes,
mock<CrawlState>()
);
observation.moveToSyncingState();
observation.moveToSyncedState();
observationManager.stopObservation(observation, () => {});

expect(observation.state).toBe(ObservationState.Stopped);
expect(consensusTimer.stop).toHaveBeenCalled();
expect(stragglerTimer.stopStragglerTimeouts).toHaveBeenCalled();
expect(connectionManager.shutdown).toHaveBeenCalled();
resolve();
});

it('should stop observation after all active nodes are disconnected', (resolve) => {
connectionManager.getNumberOfActiveConnections.mockReturnValue(1);
const peerNodes = new PeerNodeCollection();
const observation = new Observation(
[['localhost', 11625]],
peerNodes,
mock<CrawlState>()
);
observation.moveToSyncingState();
observation.moveToSyncedState();
const callback = () => {
expect(observation.state).toBe(ObservationState.Stopped);
expect(stragglerTimer.stopStragglerTimeouts).toHaveBeenCalled();
expect(connectionManager.shutdown).toHaveBeenCalled();
resolve();
};
observationManager.stopObservation(observation, callback);
expect(observation.state).toBe(ObservationState.Stopping);
expect(consensusTimer.stop).toHaveBeenCalled();
expect(
stragglerTimer.startStragglerTimeoutForActivePeers
).toHaveBeenCalled();
expect(stragglerTimer.startStragglerTimeoutForActivePeers).toBeCalledWith(
true,
observation.topTierAddressesSet,
expect.any(Function)
);

const onLastNodesDisconnected = stragglerTimer
.startStragglerTimeoutForActivePeers.mock.calls[0][2] as () => void;
onLastNodesDisconnected();
});

it('should handle ledger close confirmed', () => {
const peerNodes = new PeerNodeCollection();
const observation = new Observation(
[['localhost', 11625]],
peerNodes,
mock<CrawlState>()
);
observation.moveToSyncingState();
observation.moveToSyncedState();
observationManager.ledgerCloseConfirmed(observation, {} as any);
expect(
stragglerTimer.startStragglerTimeoutForActivePeers
).toHaveBeenCalled();
expect(consensusTimer.start).toHaveBeenCalled();
});

it('should handle network halted', async () => {
const peerNodes = new PeerNodeCollection();
const observation = new Observation(
[['localhost', 11625]],
peerNodes,
mock<CrawlState>()
);
await observationManager.startSync(observation);
expect(observation.networkHalted).toBe(false);
const networkHaltedCallback = consensusTimer.start.mock
.calls[0][0] as () => void;
networkHaltedCallback();
expect(observation.networkHalted).toBe(true);
expect(
stragglerTimer.startStragglerTimeoutForActivePeers
).toHaveBeenCalled();
});
});
8 changes: 7 additions & 1 deletion src/network-observer/network-observer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import { PeerEventHandler } from './peer-event-handler/peer-event-handler';
import { Observation } from './observation';
import * as assert from 'assert';
import { ObservationState } from './observation-state';
import { ObservationFactory } from './observation-factory';

export class NetworkObserver extends EventEmitter {
private _observation: Observation | null = null;

constructor(
private observationFactory: ObservationFactory,
private connectionManager: ConnectionManager,
private quorumSetManager: QuorumSetManager,
private peerEventHandler: PeerEventHandler,
Expand Down Expand Up @@ -59,7 +61,11 @@ export class NetworkObserver extends EventEmitter {
crawlState: CrawlState,
topTierAddresses: NodeAddress[]
): Observation {
return new Observation(topTierAddresses, crawlState.peerNodes, crawlState);
return this.observationFactory.createObservation(
topTierAddresses,
crawlState.peerNodes,
crawlState
);
}

private setupPeerEventHandlers() {
Expand Down
14 changes: 14 additions & 0 deletions src/network-observer/observation-factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Observation } from './observation';
import { NodeAddress } from '../node-address';
import { PeerNodeCollection } from '../peer-node-collection';
import { CrawlState } from '../crawl-state';

export class ObservationFactory {
public createObservation(
topTierAddresses: NodeAddress[],
peerNodes: PeerNodeCollection,
crawlState: CrawlState
): Observation {
return new Observation(topTierAddresses, peerNodes, crawlState);
}
}
13 changes: 8 additions & 5 deletions src/network-observer/observation-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class ObservationManager {
return this.syncCompleted(observation);
}

public syncCompleted(observation: Observation) {
private syncCompleted(observation: Observation) {
this.logger.info('Moving to synced state');
observation.moveToSyncedState();
this.startNetworkConsensusTimer(observation);
Expand Down Expand Up @@ -55,19 +55,22 @@ export class ObservationManager {
);
}

public stopObservation(observation: Observation, doneCallback: () => void) {
public stopObservation(
observation: Observation,
onStoppedCallback: () => void
) {
this.logger.info('Moving to stopping state');
observation.moveToStoppingState();

this.consensusTimer.stop();
if (this.connectionManager.getActiveConnectionAddresses().length === 0) {
return this.onLastNodesDisconnected(observation, doneCallback);
if (this.connectionManager.getNumberOfActiveConnections() === 0) {
return this.onLastNodesDisconnected(observation, onStoppedCallback);
}

this.stragglerTimer.startStragglerTimeoutForActivePeers(
true,
observation.topTierAddressesSet,
() => this.onLastNodesDisconnected(observation, doneCallback)
() => this.onLastNodesDisconnected(observation, onStoppedCallback)
);
}

Expand Down

0 comments on commit 39264fe

Please sign in to comment.