Skip to content

Commit

Permalink
feat: add autoRetry
Browse files Browse the repository at this point in the history
  • Loading branch information
danisharora099 committed Jun 18, 2024
1 parent 37ddf4d commit f97c6a9
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 22 deletions.
13 changes: 12 additions & 1 deletion packages/interfaces/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,16 @@ import type { IEncoder, IMessage } from "./message.js";
import { SDKProtocolResult } from "./protocols.js";

export interface ISender {
send: (encoder: IEncoder, message: IMessage) => Promise<SDKProtocolResult>;
send: (
encoder: IEncoder,
message: IMessage,
sendOptions?: SendOptions
) => Promise<SDKProtocolResult>;
}

export type SendOptions = {
autoRetry?: boolean;
maxAttempts?: number;
initialDelay?: number;
maxDelay?: number;
};
56 changes: 38 additions & 18 deletions packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager } from "@waku/core";
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { IBaseProtocolSDK } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { IBaseProtocolSDK, SendOptions } from "@waku/interfaces";
import { delay, Logger } from "@waku/utils";

interface Options {
numPeersToUse?: number;
Expand Down Expand Up @@ -73,29 +73,49 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {

/**
* Checks if there are peers to send a message to.
* If there are no peers, tries to find new peers from the ConnectionManager.
* If no peers are found, returns false.
* If peers are found, returns true.
* If there are connected peers, returns `true`.
* If `autoRetry` is `false`, returns `false`.
* If `autoRetry` is `true`, tries to find new peers from the ConnectionManager.
* If no peers are found after retries, returns `false`.
* If peers are found, returns `true`.
* @param autoRetry Optional flag to enable auto-retry with exponential backoff (default: false)
*/
protected hasPeers = async (): Promise<boolean> => {
protected hasPeers = async (
options: Partial<SendOptions> = {}
): Promise<boolean> => {
const {
autoRetry,
initialDelay: _initialDelay,
maxAttempts: _maxAttempts,
maxDelay: _maxDelay
} = options;
if (this.connectedPeers.length > 0) return true;
if (!autoRetry) return false;

let success = await this.maintainPeers();
let attempts = 0;
while (!success) {

const initialDelay = _initialDelay ?? 10;
const maxAttempts = _maxAttempts ?? 3;
const maxDelay = _maxDelay ?? 100;

while (!success && attempts < maxAttempts) {
attempts++;
const delayMs = Math.min(
initialDelay * Math.pow(2, attempts - 1),
maxDelay
);
await delay(delayMs);
success = await this.maintainPeers();
if (attempts > 3) {
if (this.peers.length === 0) {
this.log.error("Failed to find peers to send message to");
return false;
} else {
this.log.warn(
`Found only ${this.peers.length} peers, expected ${this.numPeersToUse}`
);
return true;
}
}
}

if (this.peers.length === 0) {
this.log.error("Failed to find peers to send message to");
return false;
} else if (this.peers.length < this.numPeersToUse) {
this.log.warn(
`Found only ${this.peers.length} peers, expected ${this.numPeersToUse}`
);
}
return true;
};
Expand Down
13 changes: 10 additions & 3 deletions packages/sdk/src/protocols/light_push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import {
type Libp2p,
type ProtocolCreateOptions,
ProtocolError,
SDKProtocolResult
SDKProtocolResult,
SendOptions
} from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";

Expand All @@ -31,7 +32,13 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
this.protocol = this.core as LightPushCore;
}

async send(encoder: IEncoder, message: IMessage): Promise<SDKProtocolResult> {
async send(
encoder: IEncoder,
message: IMessage,
options: SendOptions = {
autoRetry: true
}
): Promise<SDKProtocolResult> {
const successes: PeerId[] = [];
const failures: Failure[] = [];

Expand All @@ -50,7 +57,7 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
};
}

const hasPeers = await this.hasPeers();
const hasPeers = await this.hasPeers(options);
if (!hasPeers) {
return {
successes,
Expand Down
3 changes: 3 additions & 0 deletions packages/tests/tests/store/index.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,13 @@ describe("Waku Store, general", function () {
for await (const msg of query) {
if (msg) {
messages.push(msg as DecodedMessage);
console.log(bytesToUtf8(msg.payload!));
}
}
}

console.log(messages.length);

// Messages are ordered from oldest to latest within a page (1 page query)
expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText);
expect(bytesToUtf8(messages[1].payload!)).to.eq(symText);
Expand Down
3 changes: 3 additions & 0 deletions packages/utils/src/common/delay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export async function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
1 change: 1 addition & 0 deletions packages/utils/src/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export * from "./is_size_valid.js";
export * from "./sharding.js";
export * from "./push_or_init_map.js";
export * from "./relay_shard_codec.js";
export * from "./delay.js";

export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] {
const index = arr.indexOf(value);
Expand Down

0 comments on commit f97c6a9

Please sign in to comment.