Skip to content

Commit

Permalink
Added CrawlProcesState.STOPPING + some tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
pieterjan84 committed Mar 8, 2024
1 parent 4794780 commit a5dc840
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 53 deletions.
15 changes: 5 additions & 10 deletions src/__tests__/crawl-queue-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ describe('CrawlQueueManager', () => {
crawlQueueManager.addCrawlTask({
connectCallback: () => {},
crawlState,
nodeAddress: ['localhost', 11625],
topTier: false
nodeAddress: ['localhost', 11625]
});

expect(crawlQueue.push).toHaveBeenCalled();
Expand All @@ -34,14 +33,12 @@ describe('CrawlQueueManager', () => {
crawlQueueManager.addCrawlTask({
connectCallback: () => {},
crawlState,
nodeAddress: ['localhost', 11625],
topTier: false
nodeAddress: ['localhost', 11625]
});
crawlQueueManager.addCrawlTask({
connectCallback: () => {},
crawlState,
nodeAddress: ['localhost', 11625],
topTier: false
nodeAddress: ['localhost', 11625]
});

expect(crawlQueue.push).toHaveBeenCalledTimes(1);
Expand All @@ -68,8 +65,7 @@ describe('CrawlQueueManager', () => {
const task: CrawlTask = {
connectCallback: jest.fn(),
crawlState,
nodeAddress: ['localhost', 11625],
topTier: false
nodeAddress: ['localhost', 11625]
};

crawlQueue.initialize.mockImplementation((callback) => {
Expand All @@ -85,8 +81,7 @@ describe('CrawlQueueManager', () => {
const task: CrawlTask = {
connectCallback: jest.fn(),
crawlState,
nodeAddress: ['localhost', 11625],
topTier: false
nodeAddress: ['localhost', 11625]
};

crawlQueue.initialize.mockImplementation((callback) => {
Expand Down
8 changes: 6 additions & 2 deletions src/crawl-logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ export class CrawlLogger {
this.loggingTimer = setInterval(() => {
this.logger.info({
queueLength: this.crawlQueueManager.queueLength(),
activeConnections: this.connectionManager.getNumberOfActiveConnections()
//topTierConnections: this.ledgerCloseDetector.getConnectedNodesCount()
activeConnections:
this.connectionManager.getNumberOfActiveConnections(),
activeTopTiers: this.connectionManager
.getActiveConnectionAddresses()
.filter((address) => this.crawlState.topTierAddresses.has(address))
.length
});
}, 10000);
}
Expand Down
12 changes: 0 additions & 12 deletions src/crawl-queue-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,4 @@ export class CrawlQueueManager {
);
}
}

public readyWithNonTopTierPeers(): boolean {
if (this.crawlQueue.length() !== 0) return false; //we don't know yet because there are still peers left to be crawled

return !this.workersListContainsNonTopTierPeers();
}

private workersListContainsNonTopTierPeers() {
return this.crawlQueue.activeTasks().some((worker) => {
return !worker.topTier;
});
}
}
4 changes: 3 additions & 1 deletion src/crawl-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ type PeerKey = string; //ip:port
export enum CrawlProcessState {
IDLE,
TOP_TIER_SYNC,
CRAWLING
CRAWLING,
STOPPING
}

export class QuorumSetState {
Expand Down Expand Up @@ -51,6 +52,7 @@ export class CrawlState {
quorumSetState: QuorumSetState = new QuorumSetState();
failedConnections: string[] = [];
topTierNodes: Set<PublicKey>;
topTierAddresses: Set<PeerKey> = new Set();
peerAddressesReceivedDuringSync: NodeAddress[] = [];

constructor(
Expand Down
1 change: 0 additions & 1 deletion src/crawl-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,5 @@ import { CrawlState } from './crawl-state';
export interface CrawlTask {
nodeAddress: NodeAddress;
crawlState: CrawlState;
topTier: boolean;
connectCallback: () => void;
}
49 changes: 34 additions & 15 deletions src/crawler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { QuorumSet } from '@stellarbeat/js-stellarbeat-shared';
import * as P from 'pino';
import { QuorumSetManager } from './peer-listener/quorum-set-manager';
import { CrawlProcessState, CrawlState } from './crawl-state';
import { CrawlResult } from './crawl-result';
import { CrawlerConfiguration } from './crawler-configuration';
Expand Down Expand Up @@ -43,7 +42,6 @@ export class Crawler {

constructor(
private config: CrawlerConfiguration,
private quorumSetManager: QuorumSetManager,
private stellarMessageHandler: StellarMessageHandler, //event bus would be cleaner...
private readonly connectionManager: ConnectionManager,
private crawlQueueManager: CrawlQueueManager,
Expand All @@ -67,7 +65,7 @@ export class Crawler {
data,
this.crawlState.peerNodes,
this.crawlState.topTierNodes.has(data.publicKey),
this.crawlState.state,
() => this.crawlState.state,
new Date()
);
});
Expand All @@ -79,6 +77,17 @@ export class Crawler {
this.crawlState.crawlQueueTaskDoneCallbacks,
data.address
);
if (
this.crawlState.state === CrawlProcessState.CRAWLING &&
this.connectionManager
.getActiveConnectionAddresses()
.every((address) => this.crawlState.topTierAddresses.has(address))
) {
this.crawlState.state = CrawlProcessState.STOPPING;
this.logger.info(
'Stopping crawl process, disconnecting top tier nodes when their state is fully determined'
);
}

if (!data.publicKey) {
this.crawlState.failedConnections.push(data.address);
Expand Down Expand Up @@ -116,12 +125,8 @@ export class Crawler {

private initializeCrawlState(
topTierQuorumSet: QuorumSet,
latestClosedLedger: Ledger = {
sequence: BigInt(0),
closeTime: new Date(0),
value: '', //todo: store and return value
localCloseTime: new Date(0)
},
topTierAddresses: NodeAddress[],
latestClosedLedger: Ledger,
quorumSets: Map<QuorumSetHash, QuorumSet> = new Map<
QuorumSetHash,
QuorumSet
Expand All @@ -139,6 +144,10 @@ export class Crawler {
this.logger
);

this._crawlState.topTierAddresses = new Set(
topTierAddresses.map((address) => `${address[0]}:${address[1]}`)
);

return CrawlStateValidator.validateCrawlState(this.crawlState, this.config);
}

Expand Down Expand Up @@ -183,7 +192,12 @@ export class Crawler {
nodeAddresses: NodeAddress[],
topTierNodeAddresses: NodeAddress[]
) {
this.initializeCrawlState(topTierQuorumSet, latestClosedLedger, quorumSets)
this.initializeCrawlState(
topTierQuorumSet,
topTierNodeAddresses,
latestClosedLedger,
quorumSets
)
.mapErr((error) => reject(error))
.map(() =>
this.syncTopTierAndCrawl(
Expand All @@ -207,7 +221,7 @@ export class Crawler {

setTimeout(() => {
this.startCrawlProcess(resolve, reject, crawlLogger, nodeAddresses);
}, 5000); //todo: after all top tier nodes have connected, not just timer
}, 10000); //todo: after all top tier nodes have connected, not just timer
}

private startCrawlProcess(
Expand All @@ -216,7 +230,13 @@ export class Crawler {
crawlLogger: CrawlLogger,
nodeAddresses: NodeAddress[]
) {
this.logger.info('Starting crawl process');
this.logger.info(
{
topTierConnectionCount:
this.connectionManager.getNumberOfActiveConnections()
},
'Starting crawl process'
);
this.crawlState.state = CrawlProcessState.CRAWLING;
this.setupCrawlCompletionHandlers(resolve, reject, crawlLogger);
if (
Expand All @@ -240,7 +260,7 @@ export class Crawler {
private startTopTierSync(topTierAddresses: NodeAddress[]) {
this.logger.info('Starting Top Tier sync');
this.crawlState.state = CrawlProcessState.TOP_TIER_SYNC;
topTierAddresses.forEach((address) => this.crawlPeerNode(address, true));
topTierAddresses.forEach((address) => this.crawlPeerNode(address));
}

private setupCrawlCompletionHandlers(
Expand Down Expand Up @@ -303,11 +323,10 @@ export class Crawler {
};
}

private crawlPeerNode(nodeAddress: NodeAddress, isTopTier = false): void {
private crawlPeerNode(nodeAddress: NodeAddress): void {
const crawlTask: CrawlTask = {
nodeAddress: nodeAddress,
crawlState: this.crawlState,
topTier: isTopTier,
connectCallback: () =>
this.connectionManager.connectToNode(nodeAddress[0], nodeAddress[1])
};
Expand Down
1 change: 0 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ export function createCrawler(

return new Crawler(
config,
quorumSetManager,
stellarMessageHandler,
connectionManager,
crawlQueueManager,
Expand Down
4 changes: 2 additions & 2 deletions src/peer-listener/__tests__/on-connected-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ describe('OnConnectedHandler', () => {
data,
peerNodes,
false,
CrawlProcessState.CRAWLING,
() => CrawlProcessState.CRAWLING,
localTime
);

Expand Down Expand Up @@ -84,7 +84,7 @@ describe('OnConnectedHandler', () => {
data,
peerNodes,
false,
CrawlProcessState.CRAWLING,
() => CrawlProcessState.CRAWLING,
localTime
);

Expand Down
6 changes: 3 additions & 3 deletions src/peer-listener/peer-listen-timeout-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export class PeerListenTimeoutManager {
timeoutCounter: number,
isTopTierNode: boolean,
disconnectCallback: () => void,
crawlProcessState: CrawlProcessState
getCrawlProcessState: () => CrawlProcessState
): void {
if (
!listenFurther(
Expand All @@ -28,7 +28,7 @@ export class PeerListenTimeoutManager {
PeerListenTimeoutManager.SCP_LISTEN_TIMEOUT
),
isTopTierNode,
crawlProcessState === CrawlProcessState.IDLE
getCrawlProcessState() === CrawlProcessState.STOPPING
)
) {
this.logger.debug(
Expand Down Expand Up @@ -64,7 +64,7 @@ export class PeerListenTimeoutManager {
timeoutCounter,
isTopTierNode,
disconnectCallback,
crawlProcessState
getCrawlProcessState
);
}, PeerListenTimeoutManager.SCP_LISTEN_TIMEOUT)
);
Expand Down
16 changes: 10 additions & 6 deletions src/peer-listener/peer-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class PeerListener {
data: ConnectedPayload,
peerNodes: PeerNodeCollection,
isTopTierNode: boolean,
crawlProcessState: CrawlProcessState,
getCrawlProcessState: () => CrawlProcessState,
localTime: Date
): undefined | Error {
this.logIfTopTierConnected(isTopTierNode, data);
Expand All @@ -36,7 +36,11 @@ export class PeerListener {
return peerNodeOrError;
}

this.startListenTimeout(peerNodeOrError, isTopTierNode, crawlProcessState);
this.startListenTimeout(
peerNodeOrError,
isTopTierNode,
getCrawlProcessState
);
}

public onConnectionClose(
Expand Down Expand Up @@ -73,14 +77,14 @@ export class PeerListener {
private startListenTimeout(
peerNodeOrError: PeerNode,
isTopTierNode: boolean,
crawlProcessState: CrawlProcessState
getCrawlProcessState: () => CrawlProcessState
) {
this.peerListenTimeoutManager.startTimer(
peerNodeOrError,
0,
isTopTierNode,
() => this.connectionManager.disconnectByAddress(peerNodeOrError.key),
crawlProcessState
getCrawlProcessState
);
}

Expand All @@ -100,7 +104,7 @@ export class PeerListener {

private logIfTopTierConnected(isTopTierNode: boolean, data: any) {
if (isTopTierNode) {
this.logger.info(
this.logger.debug(
{ pk: truncate(data.publicKey) },
'Top tier node connected'
);
Expand All @@ -113,7 +117,7 @@ export class PeerListener {
nodeAddress: string
) {
if (crawlState.topTierNodes.has(publicKey)) {
this.logger.info(
this.logger.debug(
{ pk: truncate(publicKey), address: nodeAddress },
'Top tier node disconnected'
);
Expand Down

0 comments on commit a5dc840

Please sign in to comment.