diff --git a/CHANGELOG.md b/CHANGELOG.md index e2f6fb891b..5733c0639e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Changes before Tatum release are not documented in this file. ### @streamr/sdk #### Added +- Add support for using the plumtree optimization in stream partitions (https://github.com/streamr-dev/network/pull/3147) #### Changed diff --git a/packages/sdk/src/NetworkNodeFacade.ts b/packages/sdk/src/NetworkNodeFacade.ts index fa6fbdb067..50b678cd56 100644 --- a/packages/sdk/src/NetworkNodeFacade.ts +++ b/packages/sdk/src/NetworkNodeFacade.ts @@ -11,6 +11,7 @@ import { NetworkOptions, StreamMessage as NewStreamMessage, ProxyDirection, + StreamPartDeliveryOptions, createNetworkNode as createNetworkNode_ } from '@streamr/trackerless-network' import { Logger, MetricsContext, StreamPartID, StreamPartIDUtils, UserID } from '@streamr/utils' @@ -32,9 +33,16 @@ export interface NetworkNodeStub { getNodeId: () => DhtAddress addMessageListener: (listener: (msg: NewStreamMessage) => void) => void removeMessageListener: (listener: (msg: NewStreamMessage) => void) => void - join: (streamPartId: StreamPartID, neighborRequirement?: { minCount: number, timeout: number }) => Promise + join: ( + streamPartId: StreamPartID, + neighborRequirement?: { minCount: number, timeout: number }, + deliveryOptions?: StreamPartDeliveryOptions + ) => Promise leave: (streamPartId: StreamPartID) => Promise - broadcast: (streamMessage: NewStreamMessage) => Promise + broadcast: ( + streamMessage: NewStreamMessage, + deliveryOptions?: StreamPartDeliveryOptions + ) => Promise getStreamParts: () => StreamPartID[] getNeighbors: (streamPartId: StreamPartID) => readonly DhtAddress[] getPeerDescriptor: () => PeerDescriptor @@ -214,9 +222,13 @@ export class NetworkNodeFacade { return node.getNodeId() } - async join(streamPartId: StreamPartID, neighborRequirement?: { minCount: number, timeout: number }): Promise { + async join( + streamPartId: StreamPartID, + neighborRequirement?: { minCount: number, timeout: number }, + deliveryOptions?: StreamPartDeliveryOptions + ): Promise { const node = await this.getNode() - await node.join(streamPartId, neighborRequirement) + await node.join(streamPartId, neighborRequirement, deliveryOptions) } async leave(streamPartId: StreamPartID): Promise { @@ -224,9 +236,9 @@ export class NetworkNodeFacade { await node.leave(streamPartId) } - async broadcast(msg: OldStreamMessage): Promise { + async broadcast(msg: OldStreamMessage, deliveryOptions?: StreamPartDeliveryOptions): Promise { const node = await this.getNode() - node.broadcast(StreamMessageTranslator.toProtobuf(msg)) + node.broadcast(StreamMessageTranslator.toProtobuf(msg), deliveryOptions) } addMessageListener(listener: (msg: OldStreamMessage) => void): void { diff --git a/packages/sdk/src/StreamrClient.ts b/packages/sdk/src/StreamrClient.ts index ef7ab205e3..ae42ad6181 100644 --- a/packages/sdk/src/StreamrClient.ts +++ b/packages/sdk/src/StreamrClient.ts @@ -7,7 +7,7 @@ import 'reflect-metadata' import './utils/PatchTsyringe' import { DhtAddress } from '@streamr/dht' -import { ProxyDirection } from '@streamr/trackerless-network' +import { ProxyDirection, StreamPartDeliveryOptions } from '@streamr/trackerless-network' import { DEFAULT_PARTITION_COUNT, EthereumAddress, HexString, Logger, StreamID, TheGraphClient, toEthereumAddress, toUserId } from '@streamr/utils' import type { Overrides } from 'ethers' import EventEmitter from 'eventemitter3' @@ -79,6 +79,8 @@ export interface ExtraSubscribeOptions { * The streamr client wallet address must be an authorized signer for the contract. */ erc1271Contract?: HexString + + delivery?: StreamPartDeliveryOptions } const logger = new Logger(module) @@ -165,9 +167,10 @@ export class StreamrClient { async publish( streamDefinition: StreamDefinition, content: unknown, - metadata?: PublishMetadata + metadata?: PublishMetadata, + deliveryOptions?: StreamPartDeliveryOptions ): Promise { - const result = await this.publisher.publish(streamDefinition, content, metadata) + const result = await this.publisher.publish(streamDefinition, content, metadata, deliveryOptions) this.eventEmitter.emit('messagePublished', result) return convertStreamMessageToMessage(result) } @@ -227,6 +230,7 @@ export class StreamrClient { streamPartId, options.raw ?? false, options.erc1271Contract !== undefined ? toEthereumAddress(options.erc1271Contract) : undefined, + options.delivery ?? {}, eventEmitter, this.loggerFactory ) diff --git a/packages/sdk/src/exports.ts b/packages/sdk/src/exports.ts index 5fa699dc56..6ce3e540bd 100644 --- a/packages/sdk/src/exports.ts +++ b/packages/sdk/src/exports.ts @@ -65,6 +65,7 @@ export { convertBytesToStreamMessage, convertStreamMessageToBytes } from './prot export type { DhtAddress } from '@streamr/dht' export { ContentType, EncryptedGroupKey, EncryptionType, ProxyDirection, SignatureType } from '@streamr/trackerless-network' +export type { StreamPartDeliveryOptions } from '@streamr/trackerless-network' export type { StreamID, StreamPartID, diff --git a/packages/sdk/src/publish/Publisher.ts b/packages/sdk/src/publish/Publisher.ts index 6508c5935f..2af25a0471 100644 --- a/packages/sdk/src/publish/Publisher.ts +++ b/packages/sdk/src/publish/Publisher.ts @@ -16,6 +16,7 @@ import { createLazyMap, Mapping } from '../utils/Mapping' import { GroupKeyQueue } from './GroupKeyQueue' import { MessageFactory } from './MessageFactory' import { ConfigInjectionToken, type StrictStreamrClientConfig } from '../Config' +import { StreamPartDeliveryOptions } from '@streamr/trackerless-network' export interface PublishMetadata { timestamp?: string | number | Date @@ -87,7 +88,8 @@ export class Publisher { async publish( streamDefinition: StreamDefinition, content: unknown, - metadata?: PublishMetadata + metadata?: PublishMetadata, + deliveryOptions?: StreamPartDeliveryOptions ): Promise { const timestamp = parseTimestamp(metadata) /* @@ -117,7 +119,7 @@ export class Publisher { }, partition ) - await this.node.broadcast(message) + await this.node.broadcast(message, deliveryOptions) return message } catch (e) { const errorCode = (e instanceof StreamrClientError) ? e.code : 'UNKNOWN_ERROR' diff --git a/packages/sdk/src/subscribe/Subscription.ts b/packages/sdk/src/subscribe/Subscription.ts index e2a00aa5b6..f8459b2d79 100644 --- a/packages/sdk/src/subscribe/Subscription.ts +++ b/packages/sdk/src/subscribe/Subscription.ts @@ -2,6 +2,7 @@ import { EthereumAddress, Logger, StreamPartID } from '@streamr/utils' import EventEmitter from 'eventemitter3' import { LoggerFactory } from '../utils/LoggerFactory' import { MessageStream } from './MessageStream' +import { StreamPartDeliveryOptions } from '@streamr/trackerless-network' /** * Events emitted by {@link Subscription}. @@ -29,6 +30,7 @@ export class Subscription extends MessageStream { /** @internal */ readonly isRaw: boolean readonly erc1271ContractAddress: EthereumAddress | undefined + readonly deliveryOptions: StreamPartDeliveryOptions | undefined private readonly eventEmitter: EventEmitter private readonly logger: Logger @@ -37,6 +39,7 @@ export class Subscription extends MessageStream { streamPartId: StreamPartID, isRaw: boolean, erc1271ContractAddress: EthereumAddress | undefined, + deliveryOptions: StreamPartDeliveryOptions | undefined, eventEmitter: EventEmitter, loggerFactory: LoggerFactory ) { @@ -44,6 +47,7 @@ export class Subscription extends MessageStream { this.streamPartId = streamPartId this.isRaw = isRaw this.erc1271ContractAddress = erc1271ContractAddress + this.deliveryOptions = deliveryOptions this.eventEmitter = eventEmitter this.logger = loggerFactory.createLogger(module) this.onError.listen((err) => { diff --git a/packages/sdk/src/subscribe/SubscriptionSession.ts b/packages/sdk/src/subscribe/SubscriptionSession.ts index 4b7964d2c6..8d44dd1da7 100644 --- a/packages/sdk/src/subscribe/SubscriptionSession.ts +++ b/packages/sdk/src/subscribe/SubscriptionSession.ts @@ -12,6 +12,10 @@ import { Subscription } from './Subscription' * A session contains one or more subscriptions to a single streamId + streamPartition pair. */ +const getAnyItemFromSet = (set: Set): T | undefined => { + return set.values().next().value +} + export class SubscriptionSession { public readonly streamPartId: StreamPartID @@ -102,7 +106,8 @@ export class SubscriptionSession { private async subscribe(): Promise { this.node.addMessageListener(this.onMessageInput) if (!await this.node.isProxiedStreamPart(this.streamPartId)) { - await this.node.join(this.streamPartId) + const deliveryOptions = getAnyItemFromSet(this.subscriptions)!.deliveryOptions + await this.node.join(this.streamPartId, undefined, deliveryOptions) } } diff --git a/packages/sdk/src/utils/addStreamToStorageNode.ts b/packages/sdk/src/utils/addStreamToStorageNode.ts index f10f94f32a..a4cd5236e0 100644 --- a/packages/sdk/src/utils/addStreamToStorageNode.ts +++ b/packages/sdk/src/utils/addStreamToStorageNode.ts @@ -36,6 +36,7 @@ export const addStreamToStorageNode = async ( streamPartId, false, undefined, + undefined, new EventEmitter(), loggerFactory ) diff --git a/packages/sdk/test/unit/SubscriptionSession.test.ts b/packages/sdk/test/unit/SubscriptionSession.test.ts index 8531d6c44c..97cd59468a 100644 --- a/packages/sdk/test/unit/SubscriptionSession.test.ts +++ b/packages/sdk/test/unit/SubscriptionSession.test.ts @@ -14,7 +14,7 @@ const ADDRESS_ONE = randomEthereumAddress() const ADDRESS_TWO = randomEthereumAddress() function createSubscription(erc1271contractAddress?: EthereumAddress): Subscription { - return new Subscription(STREAM_PART_ID, false, erc1271contractAddress, mock(), mock()) + return new Subscription(STREAM_PART_ID, false, erc1271contractAddress, undefined, mock(), mock()) } describe('SubscriptionSession', () => { diff --git a/packages/sdk/test/unit/resendSubscription.test.ts b/packages/sdk/test/unit/resendSubscription.test.ts index fbfbfc867e..c56668f267 100644 --- a/packages/sdk/test/unit/resendSubscription.test.ts +++ b/packages/sdk/test/unit/resendSubscription.test.ts @@ -102,7 +102,7 @@ describe('resend subscription', () => { gapFill = true ) => { const eventEmitter = new EventEmitter() - sub = new Subscription(STREAM_PART_ID, false, undefined, eventEmitter, mockLoggerFactory()) + sub = new Subscription(STREAM_PART_ID, false, undefined, undefined, eventEmitter, mockLoggerFactory()) initResendSubscription( sub, {} as any, diff --git a/packages/trackerless-network/src/ContentDeliveryManager.ts b/packages/trackerless-network/src/ContentDeliveryManager.ts index 25bd0d1f00..8375bd96dc 100644 --- a/packages/trackerless-network/src/ContentDeliveryManager.ts +++ b/packages/trackerless-network/src/ContentDeliveryManager.ts @@ -75,7 +75,7 @@ export interface ContentDeliveryManagerOptions { } export interface StreamPartDeliveryOptions { - plumTreeOptimisation?: boolean + plumtreeOptimization?: boolean } export const streamPartIdToDataKey = (streamPartId: StreamPartID): DhtAddress => { @@ -288,7 +288,7 @@ export class ContentDeliveryManager extends EventEmitter { neighborUpdateInterval: this.options.neighborUpdateInterval, isLocalNodeEntryPoint, doNotBufferWhileConnecting: this.options.doNotBufferWhileConnecting, - plumTreeOptimisation: streamPartDeliveryOptions?.plumTreeOptimisation + plumtreeOptimization: streamPartDeliveryOptions?.plumtreeOptimization }) } diff --git a/packages/trackerless-network/src/content-delivery-layer/createContentDeliveryLayerNode.ts b/packages/trackerless-network/src/content-delivery-layer/createContentDeliveryLayerNode.ts index 9f07e88905..d44c8e3ff5 100644 --- a/packages/trackerless-network/src/content-delivery-layer/createContentDeliveryLayerNode.ts +++ b/packages/trackerless-network/src/content-delivery-layer/createContentDeliveryLayerNode.ts @@ -35,7 +35,7 @@ type ContentDeliveryLayerNodeOptions = MarkOptional { @@ -68,7 +68,7 @@ const createConfigWithDefaults = (options: ContentDeliveryLayerNodeOptions): Str streamPartId: options.streamPartId, rpcCommunicator }) : undefined - const plumtreeManager = options.plumTreeOptimisation ? new PlumtreeManager({ + const plumtreeManager = options.plumtreeOptimization ? new PlumtreeManager({ neighbors, localPeerDescriptor: options.localPeerDescriptor, rpcCommunicator diff --git a/packages/trackerless-network/src/exports.ts b/packages/trackerless-network/src/exports.ts index 36e340e7b0..29d337483c 100644 --- a/packages/trackerless-network/src/exports.ts +++ b/packages/trackerless-network/src/exports.ts @@ -1,6 +1,6 @@ export { NetworkNode, createNetworkNode } from './NetworkNode' export { type NetworkOptions, NetworkStack } from './NetworkStack' -export { type ContentDeliveryManagerOptions, streamPartIdToDataKey } from './ContentDeliveryManager' +export { type ContentDeliveryManagerOptions, type StreamPartDeliveryOptions, streamPartIdToDataKey } from './ContentDeliveryManager' export { AsymmetricEncryptionType, ContentType, diff --git a/packages/trackerless-network/test/utils/utils.ts b/packages/trackerless-network/test/utils/utils.ts index 2ce13b5837..6b08915b11 100644 --- a/packages/trackerless-network/test/utils/utils.ts +++ b/packages/trackerless-network/test/utils/utils.ts @@ -41,7 +41,7 @@ export const createMockContentDeliveryLayerNodeAndDhtNode = async ( entryPointDescriptor: PeerDescriptor, streamPartId: StreamPartID, simulator: Simulator, - plumTreeOptimisation?: boolean + plumtreeOptimization?: boolean ): Promise<[ DiscoveryLayerNode, ContentDeliveryLayerNode ]> => { const mockCm = new SimulatorTransport(localPeerDescriptor, simulator) await mockCm.start() @@ -62,7 +62,7 @@ export const createMockContentDeliveryLayerNodeAndDhtNode = async ( localPeerDescriptor, rpcRequestTimeout: 5000, isLocalNodeEntryPoint: () => false, - plumTreeOptimisation + plumtreeOptimization }) return [discoveryLayerNode, contentDeliveryLayerNode] }