From f97c6a93cb2b2ffb67910f661a94c5c9cfb47fc8 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 18 Jun 2024 15:53:06 -0400 Subject: [PATCH] feat: add autoRetry --- packages/interfaces/src/sender.ts | 13 ++++- packages/sdk/src/protocols/base_protocol.ts | 56 +++++++++++++------ packages/sdk/src/protocols/light_push.ts | 13 ++++- packages/tests/tests/store/index.node.spec.ts | 3 + packages/utils/src/common/delay.ts | 3 + packages/utils/src/common/index.ts | 1 + 6 files changed, 67 insertions(+), 22 deletions(-) create mode 100644 packages/utils/src/common/delay.ts diff --git a/packages/interfaces/src/sender.ts b/packages/interfaces/src/sender.ts index 2dbe72def9..792ebfcea4 100644 --- a/packages/interfaces/src/sender.ts +++ b/packages/interfaces/src/sender.ts @@ -2,5 +2,16 @@ import type { IEncoder, IMessage } from "./message.js"; import { SDKProtocolResult } from "./protocols.js"; export interface ISender { - send: (encoder: IEncoder, message: IMessage) => Promise; + send: ( + encoder: IEncoder, + message: IMessage, + sendOptions?: SendOptions + ) => Promise; } + +export type SendOptions = { + autoRetry?: boolean; + maxAttempts?: number; + initialDelay?: number; + maxDelay?: number; +}; diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 3360ae1227..acfe6ac5ff 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -1,8 +1,8 @@ import type { Peer, PeerId } from "@libp2p/interface"; import { ConnectionManager } from "@waku/core"; import { BaseProtocol } from "@waku/core/lib/base_protocol"; -import { IBaseProtocolSDK } from "@waku/interfaces"; -import { Logger } from "@waku/utils"; +import { IBaseProtocolSDK, SendOptions } from "@waku/interfaces"; +import { delay, Logger } from "@waku/utils"; interface Options { numPeersToUse?: number; @@ -73,29 +73,49 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { /** * Checks if there are peers to send a message to. - * If there are no peers, tries to find new peers from the ConnectionManager. - * If no peers are found, returns false. - * If peers are found, returns true. + * If there are connected peers, returns `true`. + * If `autoRetry` is `false`, returns `false`. + * If `autoRetry` is `true`, tries to find new peers from the ConnectionManager. + * If no peers are found after retries, returns `false`. + * If peers are found, returns `true`. + * @param autoRetry Optional flag to enable auto-retry with exponential backoff (default: false) */ - protected hasPeers = async (): Promise => { + protected hasPeers = async ( + options: Partial = {} + ): Promise => { + const { + autoRetry, + initialDelay: _initialDelay, + maxAttempts: _maxAttempts, + maxDelay: _maxDelay + } = options; if (this.connectedPeers.length > 0) return true; + if (!autoRetry) return false; let success = await this.maintainPeers(); let attempts = 0; - while (!success) { + + const initialDelay = _initialDelay ?? 10; + const maxAttempts = _maxAttempts ?? 3; + const maxDelay = _maxDelay ?? 100; + + while (!success && attempts < maxAttempts) { attempts++; + const delayMs = Math.min( + initialDelay * Math.pow(2, attempts - 1), + maxDelay + ); + await delay(delayMs); success = await this.maintainPeers(); - if (attempts > 3) { - if (this.peers.length === 0) { - this.log.error("Failed to find peers to send message to"); - return false; - } else { - this.log.warn( - `Found only ${this.peers.length} peers, expected ${this.numPeersToUse}` - ); - return true; - } - } + } + + if (this.peers.length === 0) { + this.log.error("Failed to find peers to send message to"); + return false; + } else if (this.peers.length < this.numPeersToUse) { + this.log.warn( + `Found only ${this.peers.length} peers, expected ${this.numPeersToUse}` + ); } return true; }; diff --git a/packages/sdk/src/protocols/light_push.ts b/packages/sdk/src/protocols/light_push.ts index b915123ff4..a2d36df10d 100644 --- a/packages/sdk/src/protocols/light_push.ts +++ b/packages/sdk/src/protocols/light_push.ts @@ -8,7 +8,8 @@ import { type Libp2p, type ProtocolCreateOptions, ProtocolError, - SDKProtocolResult + SDKProtocolResult, + SendOptions } from "@waku/interfaces"; import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; @@ -31,7 +32,13 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { this.protocol = this.core as LightPushCore; } - async send(encoder: IEncoder, message: IMessage): Promise { + async send( + encoder: IEncoder, + message: IMessage, + options: SendOptions = { + autoRetry: true + } + ): Promise { const successes: PeerId[] = []; const failures: Failure[] = []; @@ -50,7 +57,7 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { }; } - const hasPeers = await this.hasPeers(); + const hasPeers = await this.hasPeers(options); if (!hasPeers) { return { successes, diff --git a/packages/tests/tests/store/index.node.spec.ts b/packages/tests/tests/store/index.node.spec.ts index 595c0abc2d..05a981e8bd 100644 --- a/packages/tests/tests/store/index.node.spec.ts +++ b/packages/tests/tests/store/index.node.spec.ts @@ -304,10 +304,13 @@ describe("Waku Store, general", function () { for await (const msg of query) { if (msg) { messages.push(msg as DecodedMessage); + console.log(bytesToUtf8(msg.payload!)); } } } + console.log(messages.length); + // Messages are ordered from oldest to latest within a page (1 page query) expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText); expect(bytesToUtf8(messages[1].payload!)).to.eq(symText); diff --git a/packages/utils/src/common/delay.ts b/packages/utils/src/common/delay.ts new file mode 100644 index 0000000000..6069f04bf7 --- /dev/null +++ b/packages/utils/src/common/delay.ts @@ -0,0 +1,3 @@ +export async function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/packages/utils/src/common/index.ts b/packages/utils/src/common/index.ts index 906bfb948d..025a06f03c 100644 --- a/packages/utils/src/common/index.ts +++ b/packages/utils/src/common/index.ts @@ -7,6 +7,7 @@ export * from "./is_size_valid.js"; export * from "./sharding.js"; export * from "./push_or_init_map.js"; export * from "./relay_shard_codec.js"; +export * from "./delay.js"; export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] { const index = arr.indexOf(value);