Skip to content

Commit

Permalink
top tier sync is no longer a crawl task
Browse files Browse the repository at this point in the history
  • Loading branch information
pieterjan84 committed Mar 11, 2024
1 parent fc9d79d commit 81bff16
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 87 deletions.
16 changes: 0 additions & 16 deletions src/__tests__/crawl-queue-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,6 @@ describe('CrawlQueueManager', () => {
expect(crawlQueue.push).toHaveBeenCalled();
});

it('should not add a crawl task if the node has been crawled', () => {
const crawlQueueManager = new CrawlQueueManager(crawlQueue, logger);
crawlQueueManager.addCrawlTask({
connectCallback: () => {},
crawlState,
nodeAddress: ['localhost', 11625]
});
crawlQueueManager.addCrawlTask({
connectCallback: () => {},
crawlState,
nodeAddress: ['localhost', 11625]
});

expect(crawlQueue.push).toHaveBeenCalledTimes(1);
});

it('should call onDrain', () => {
const crawlQueueManager = new CrawlQueueManager(crawlQueue, logger);
crawlQueueManager.onDrain(() => {});
Expand Down
2 changes: 1 addition & 1 deletion src/__tests__/crawler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { CrawlerConfiguration, createCrawler } from '../index';
import { StellarMessageWork } from '@stellarbeat/js-stellar-node-connector/lib/connection/connection';
import { NodeAddress } from '../node-address';

jest.setTimeout(30000);
jest.setTimeout(40000);

let peerNodeAddress: NodeAddress;
let peerNetworkNode: NetworkNode;
Expand Down
26 changes: 0 additions & 26 deletions src/crawl-queue-manager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { CrawlState } from './crawl-state';
import * as P from 'pino';
import { nodeAddressToPeerKey } from './node-address';
import { AsyncResultCallback, CrawlQueue } from './crawl-queue';
import { CrawlTask } from './crawl-task';

Expand All @@ -10,13 +8,6 @@ export class CrawlQueueManager {
}

public addCrawlTask(crawlTask: CrawlTask): void {
const peerKey = nodeAddressToPeerKey(crawlTask.nodeAddress);
this.logNodeAddition(crawlTask.crawlState, peerKey);

if (this.hasNodeBeenCrawled(crawlTask.crawlState, peerKey)) return;

crawlTask.crawlState.crawledNodeAddresses.add(peerKey);

this.crawlQueue.push(crawlTask, (error?: Error) => {
if (error) {
this.logger.error(
Expand All @@ -27,18 +18,6 @@ export class CrawlQueueManager {
});
}

private hasNodeBeenCrawled(crawlState: CrawlState, peerKey: string): boolean {
return crawlState.crawledNodeAddresses.has(peerKey);
}

private logNodeAddition(crawlState: CrawlState, peerKey: string): void {
if (this.hasNodeBeenCrawled(crawlState, peerKey)) {
this.logger.debug({ peer: peerKey }, 'Address already crawled');
} else {
this.logger.debug({ peer: peerKey }, 'Adding address to crawl queue');
}
}

public onDrain(callback: () => void) {
this.crawlQueue.onDrain(callback);
}
Expand Down Expand Up @@ -67,11 +46,6 @@ export class CrawlQueueManager {
if (taskDoneCallback) {
taskDoneCallback();
crawlQueueTaskDoneCallbacks.delete(nodeAddress);
} else {
this.logger.error(
{ peer: nodeAddress },
'No crawlQueueTaskDoneCallback found'
);
}
}
}
70 changes: 51 additions & 19 deletions src/crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
import { err } from 'neverthrow';
import { StellarMessageHandler } from './peer-listener/stellar-message-handlers/stellar-message-handler';
import { CrawlQueueManager } from './crawl-queue-manager';
import { NodeAddress } from './node-address';
import { NodeAddress, nodeAddressToPeerKey } from './node-address';
import { CrawlTask } from './crawl-task';
import { PeerListener } from './peer-listener/peer-listener';

Expand Down Expand Up @@ -58,7 +58,6 @@ export class Crawler {
data,
this.crawlState.peerNodes,
this.crawlState.topTierNodes.has(data.publicKey),
() => this.crawlState.state,
new Date()
);
});
Expand All @@ -73,17 +72,6 @@ export class Crawler {
this.crawlState.crawlQueueTaskDoneCallbacks,
data.address
);
if (
this.crawlState.state === CrawlProcessState.CRAWLING &&
this.crawlQueueManager.queueLength() === 0 &&
this.connectionManager
.getActiveConnectionAddresses()
.every((address) => this.crawlState.topTierAddresses.has(address))
) {
this.peerListener.stop();
this.logger.info('Stopping crawl process');
this.crawlState.state = CrawlProcessState.STOPPING;
}

if (!data.publicKey) {
this.crawlState.failedConnections.push(data.address);
Expand Down Expand Up @@ -249,18 +237,38 @@ export class Crawler {
private startTopTierSync(topTierAddresses: NodeAddress[]) {
this.logger.info('Starting Top Tier sync');
this.crawlState.state = CrawlProcessState.TOP_TIER_SYNC;
topTierAddresses.forEach((address) => this.crawlPeerNode(address));

topTierAddresses.forEach((address) =>
this.connectionManager.connectToNode(address[0], address[1])
);
}

private setupCrawlCompletionHandlers(
resolve: (value: PromiseLike<CrawlResult> | CrawlResult) => void,
reject: (reason?: any) => void,
crawlLogger: CrawlLogger
) {
const maxCrawlTimeout = this.startMaxCrawlTimeout(this.crawlState);
const maxCrawlTimeout = this.startMaxCrawlTimeout(
resolve,
reject,
crawlLogger,
this.crawlState
);
this.crawlQueueManager.onDrain(() => {
clearTimeout(maxCrawlTimeout);
this.completeCrawlProcess(resolve, reject, crawlLogger);
if (
this.crawlState.state === CrawlProcessState.CRAWLING &&
this.crawlQueueManager.queueLength() === 0 &&
this.connectionManager
.getActiveConnectionAddresses()
.every((address) => this.crawlState.topTierAddresses.has(address))
) {
this.logger.info('Stopping crawl process');
this.peerListener.stop().then(() => {
this.finish(resolve, reject, crawlLogger);
this.crawlState.state = CrawlProcessState.STOPPING;
});
}
});
}

Expand All @@ -275,15 +283,22 @@ export class Crawler {
return crawlLogger;
}

private startMaxCrawlTimeout(crawlState: CrawlState) {
private startMaxCrawlTimeout(
resolve: (value: CrawlResult | PromiseLike<CrawlResult>) => void,
reject: (error: Error) => void,
crawlLogger: CrawlLogger,
crawlState: CrawlState
) {
return setTimeout(() => {
this.logger.fatal('Max crawl time hit, closing all connections');
this.connectionManager.shutdown();
this.peerListener
.stop()
.then(() => this.finish(resolve, reject, crawlLogger));
crawlState.maxCrawlTimeHit = true;
}, this.config.maxCrawlTime);
}

private completeCrawlProcess(
private finish(
resolve: (value: CrawlResult | PromiseLike<CrawlResult>) => void,
reject: (error: Error) => void,
crawlLogger: CrawlLogger
Expand Down Expand Up @@ -312,6 +327,12 @@ export class Crawler {
}

private crawlPeerNode(nodeAddress: NodeAddress): void {
const peerKey = nodeAddressToPeerKey(nodeAddress);

if (!this.canNodeBeCrawled(peerKey)) return;

this.logNodeAddition(peerKey);
this.crawlState.crawledNodeAddresses.add(peerKey);
const crawlTask: CrawlTask = {
nodeAddress: nodeAddress,
crawlState: this.crawlState,
Expand All @@ -321,4 +342,15 @@ export class Crawler {

this.crawlQueueManager.addCrawlTask(crawlTask);
}

private logNodeAddition(peerKey: string): void {
this.logger.debug({ peer: peerKey }, 'Adding address to crawl queue');
}

private canNodeBeCrawled(peerKey: string): boolean {
return (
!this.crawlState.crawledNodeAddresses.has(peerKey) &&
!this.crawlState.topTierAddresses.has(peerKey)
);
}
}
17 changes: 2 additions & 15 deletions src/peer-listener/__tests__/on-connected-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { PeerNodeCollection } from '../../peer-node-collection';
import { mock } from 'jest-mock-extended';
import { ConnectedPayload, ConnectionManager } from '../../connection-manager';
import { P } from 'pino';
import { CrawlProcessState } from '../../crawl-state';
import { PeerListener } from '../peer-listener';
import { QuorumSetManager } from '../quorum-set-manager';
import { StellarMessageHandler } from '../stellar-message-handlers/stellar-message-handler';
Expand Down Expand Up @@ -41,13 +40,7 @@ describe('OnConnectedHandler', () => {
};
const peerNodes = mock<PeerNodeCollection>();
const localTime = new Date();
onConnectedHandler.onConnected(
data,
peerNodes,
false,
() => CrawlProcessState.CRAWLING,
localTime
);
onConnectedHandler.onConnected(data, peerNodes, false, localTime);

expect(peerNodes.addSuccessfullyConnected).toHaveBeenCalledWith(
data.publicKey,
Expand Down Expand Up @@ -76,13 +69,7 @@ describe('OnConnectedHandler', () => {
const localTime = new Date();
const error = new Error('error');
peerNodes.addSuccessfullyConnected.mockReturnValue(error);
onConnectedHandler.onConnected(
data,
peerNodes,
false,
() => CrawlProcessState.CRAWLING,
localTime
);
onConnectedHandler.onConnected(data, peerNodes, false, localTime);

expect(peerNodes.addSuccessfullyConnected).toHaveBeenCalledWith(
data.publicKey,
Expand Down
33 changes: 23 additions & 10 deletions src/peer-listener/peer-listener.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { P } from 'pino';
import { CrawlProcessState, CrawlState } from '../crawl-state';
import { CrawlState } from '../crawl-state';
import { truncate } from '../utilities/truncate';
import {
ConnectedPayload,
Expand Down Expand Up @@ -34,14 +34,28 @@ export class PeerListener {
this.setNetworkConsensusTimer();
}

public stop() {
if (this.networkConsensusTimer) clearTimeout(this.networkConsensusTimer);
this.stopping = true;
if (this.connectionManager.getActiveConnectionAddresses().length === 0)
return;
setTimeout(() => {
this.connectionManager.shutdown(); //give straggling top tier nodes a chance and then shut down.
}, PeerListener.PEER_STRAGGLE_TIMEOUT);
/* public connectToTopTierNodes(topTierNodes: Set<NodeAddress>) {
topTierNodes.forEach((address) => {
this.connectionManager.connectToNode(address[0], address[1]);
});
}
public connectToNode(ip: string, port: number) {
this.connectionManager.connectToNode(ip, port);
}
*/

public async stop() {
return new Promise<void>((resolve) => {
if (this.networkConsensusTimer) clearTimeout(this.networkConsensusTimer);
this.stopping = true;
if (this.connectionManager.getActiveConnectionAddresses().length === 0)
return;
setTimeout(() => {
this.connectionManager.shutdown(); //give straggling top tier nodes a chance and then shut down.
resolve();
}, PeerListener.PEER_STRAGGLE_TIMEOUT);
});
}

private setNetworkConsensusTimer() {
Expand Down Expand Up @@ -81,7 +95,6 @@ export class PeerListener {
data: ConnectedPayload,
peerNodes: PeerNodeCollection,
isTopTierNode: boolean,
getCrawlProcessState: () => CrawlProcessState,
localTime: Date
): undefined | Error {
this.logIfTopTierConnected(isTopTierNode, data);
Expand Down

0 comments on commit 81bff16

Please sign in to comment.