Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(sdk): refactor Filter into different modules #1968

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export * from "./waku.js";

export { createLightNode, createNode } from "./light-node/index.js";
export { wakuLightPush } from "./protocols/light_push.js";
export { wakuFilter } from "./protocols/filter.js";
export { wakuFilter } from "./protocols/filter/index.js";
export { wakuStore } from "./protocols/store.js";

export * as waku from "@waku/core";
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/light-node/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type Libp2pComponents, type LightNode } from "@waku/interfaces";

import { wakuFilter } from "../protocols/filter.js";
import { wakuFilter } from "../protocols/filter/index.js";
import { wakuLightPush } from "../protocols/light_push.js";
import { wakuStore } from "../protocols/store.js";
import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js";
Expand Down
135 changes: 135 additions & 0 deletions packages/sdk/src/protocols/filter/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import { FilterCore } from "@waku/core";
import {
type Callback,
DefaultPubsubTopic,
type IAsyncIterator,
type IDecodedMessage,
type IDecoder,
type IFilterSDK,
type Libp2p,
type ProtocolCreateOptions,
type PubsubTopic,
type SingleShardInfo,
type Unsubscribe
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import {
ensurePubsubTopicIsConfigured,
groupByContentTopic,
Logger,
singleShardInfoToPubsubTopic,
toAsyncIterator
} from "@waku/utils";

import { BaseProtocolSDK } from "../base_protocol";

import { Subscription } from "./subscription";

const log = new Logger("sdk:filter");

class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
public readonly protocol: FilterCore;
private subscriptions: Map<string, Subscription>;

private async handleIncomingMessage(
pubsubTopic: PubsubTopic,
wakuMessage: WakuMessage
): Promise<void> {
const subscription = this.getSubscription(pubsubTopic);
if (!subscription) {
log.error(`No subscription locally registered for topic ${pubsubTopic}`);
return;
}

await subscription.processIncomingMessage(wakuMessage);
}

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super({ numPeersToUse: options?.numPeersToUse });
this.protocol = new FilterCore(
this.handleIncomingMessage.bind(this),
libp2p,
options
);
this.subscriptions = new Map();
}

private getSubscription(pubsubTopic: PubsubTopic): Subscription | undefined {
return this.subscriptions.get(pubsubTopic);
}

private setSubscription(
pubsubTopic: PubsubTopic,
subscription: Subscription
): Subscription {
this.subscriptions.set(pubsubTopic, subscription);
return subscription;
}

private async getOrCreateSubscription(
pubsubTopic: PubsubTopic
): Promise<Subscription> {
const subscription = this.getSubscription(pubsubTopic);
if (subscription) {
return subscription;
}

log.info("Creating filter subscription.");

const peers = await this.protocol.getPeers();
if (peers.length === 0) {
throw new Error("No peer found to initiate subscription.");
}
log.info(
`Created filter subscription with ${peers.length} peers: `,
peers.map((peer) => peer.id.toString())
);

const newSubscription = new Subscription(pubsubTopic, peers, this.protocol);
return this.setSubscription(pubsubTopic, newSubscription);
}

async createSubscription(
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic
): Promise<Subscription> {
const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: singleShardInfoToPubsubTopic(pubsubTopicShardInfo);

ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);

return this.getOrCreateSubscription(pubsubTopic);
}

async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<Unsubscribe> {
const subscription = await this.createSubscription();

await subscription.subscribe(decoders, callback);

const contentTopics = Array.from(
groupByContentTopic(
Array.isArray(decoders) ? decoders : [decoders]
).keys()
);

return async () => {
await subscription.unsubscribe(contentTopics);
};
}

public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders);
}
}

export function wakuFilter(
init: ProtocolCreateOptions
): (libp2p: Libp2p) => IFilterSDK {
return (libp2p: Libp2p) => new FilterSDK(libp2p, init);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,24 @@ import type { Peer } from "@libp2p/interface";
import { FilterCore } from "@waku/core";
import {
type Callback,
ContentTopic,
DefaultPubsubTopic,
type IAsyncIterator,
type ContentTopic,
type IDecodedMessage,
type IDecoder,
type IFilterSDK,
IProtoMessage,
type Libp2p,
type ProtocolCreateOptions,
type PubsubTopic,
type SingleShardInfo,
type Unsubscribe
type IProtoMessage,
type PubsubTopic
} from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
import { WakuMessage } from "@waku/proto";
import {
ensurePubsubTopicIsConfigured,
groupByContentTopic,
Logger,
singleShardInfoToPubsubTopic,
toAsyncIterator
} from "@waku/utils";

import { BaseProtocolSDK } from "./base_protocol";
import { groupByContentTopic, Logger } from "@waku/utils";

type SubscriptionCallback<T extends IDecodedMessage> = {
decoders: IDecoder<T>[];
callback: Callback<T>;
};

const log = new Logger("sdk:filter");
const log = new Logger("sdk:filter:subscription");

export class SubscriptionManager {
export class Subscription {
private readonly pubsubTopic: PubsubTopic;
readonly peers: Peer[];
readonly receivedMessagesHashStr: string[] = [];
Expand Down Expand Up @@ -196,132 +181,6 @@ export class SubscriptionManager {
}
}

class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
public readonly protocol: FilterCore;

private activeSubscriptions = new Map<string, SubscriptionManager>();
private async handleIncomingMessage(
pubsubTopic: PubsubTopic,
wakuMessage: WakuMessage
): Promise<void> {
const subscription = this.getActiveSubscription(pubsubTopic);
if (!subscription) {
log.error(`No subscription locally registered for topic ${pubsubTopic}`);
return;
}

await subscription.processIncomingMessage(wakuMessage);
}

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super({ numPeersToUse: options?.numPeersToUse });
this.protocol = new FilterCore(
this.handleIncomingMessage.bind(this),
libp2p,
options
);
this.activeSubscriptions = new Map();
}

//TODO: move to SubscriptionManager
private getActiveSubscription(
pubsubTopic: PubsubTopic
): SubscriptionManager | undefined {
return this.activeSubscriptions.get(pubsubTopic);
}

private setActiveSubscription(
pubsubTopic: PubsubTopic,
subscription: SubscriptionManager
): SubscriptionManager {
this.activeSubscriptions.set(pubsubTopic, subscription);
return subscription;
}

/**
* Creates a new subscription to the given pubsub topic.
* The subscription is made to multiple peers for decentralization.
* @param pubsubTopicShardInfo The pubsub topic to subscribe to.
* @returns The subscription object.
*/
async createSubscription(
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic
): Promise<SubscriptionManager> {
const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: singleShardInfoToPubsubTopic(pubsubTopicShardInfo);

ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);

const peers = await this.protocol.getPeers();
if (peers.length === 0) {
throw new Error("No peer found to initiate subscription.");
}

log.info(
`Creating filter subscription with ${peers.length} peers: `,
peers.map((peer) => peer.id.toString())
);

const subscription =
this.getActiveSubscription(pubsubTopic) ??
this.setActiveSubscription(
pubsubTopic,
new SubscriptionManager(pubsubTopic, peers, this.protocol)
);

return subscription;
}

//TODO: remove this dependency on IReceiver
/**
* This method is used to satisfy the `IReceiver` interface.
*
* @hidden
*
* @param decoders The decoders to use for the subscription.
* @param callback The callback function to use for the subscription.
* @param opts Optional protocol options for the subscription.
*
* @returns A Promise that resolves to a function that unsubscribes from the subscription.
*
* @remarks
* This method should not be used directly.
* Instead, use `createSubscription` to create a new subscription.
*/
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<Unsubscribe> {
const subscription = await this.createSubscription();

await subscription.subscribe(decoders, callback);

const contentTopics = Array.from(
groupByContentTopic(
Array.isArray(decoders) ? decoders : [decoders]
).keys()
);

return async () => {
await subscription.unsubscribe(contentTopics);
};
}

public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders);
}
}

export function wakuFilter(
init: ProtocolCreateOptions
): (libp2p: Libp2p) => IFilterSDK {
return (libp2p: Libp2p) => new FilterSDK(libp2p, init);
}

async function pushMessage<T extends IDecodedMessage>(
subscriptionCallback: SubscriptionCallback<T>,
pubsubTopic: PubsubTopic,
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/relay-node/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { type FullNode, type RelayNode } from "@waku/interfaces";
import { RelayCreateOptions, wakuRelay } from "@waku/relay";

import { wakuFilter } from "../protocols/filter.js";
import { wakuFilter } from "../protocols/filter/index.js";
import { wakuLightPush } from "../protocols/light_push.js";
import { wakuStore } from "../protocols/store.js";
import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js";
Expand Down
Loading