Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Changes before Tatum release are not documented in this file.
### @streamr/sdk

#### Added
- Add support for using the plumtree optimization in stream partitions (https://github.com/streamr-dev/network/pull/3147)

#### Changed

Expand Down
24 changes: 18 additions & 6 deletions packages/sdk/src/NetworkNodeFacade.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
NetworkOptions,
StreamMessage as NewStreamMessage,
ProxyDirection,
StreamPartDeliveryOptions,
createNetworkNode as createNetworkNode_
} from '@streamr/trackerless-network'
import { Logger, MetricsContext, StreamPartID, StreamPartIDUtils, UserID } from '@streamr/utils'
Expand All @@ -32,9 +33,16 @@ export interface NetworkNodeStub {
getNodeId: () => DhtAddress
addMessageListener: (listener: (msg: NewStreamMessage) => void) => void
removeMessageListener: (listener: (msg: NewStreamMessage) => void) => void
join: (streamPartId: StreamPartID, neighborRequirement?: { minCount: number, timeout: number }) => Promise<void>
join: (
streamPartId: StreamPartID,
neighborRequirement?: { minCount: number, timeout: number },
deliveryOptions?: StreamPartDeliveryOptions
) => Promise<void>
leave: (streamPartId: StreamPartID) => Promise<void>
broadcast: (streamMessage: NewStreamMessage) => Promise<void>
broadcast: (
streamMessage: NewStreamMessage,
deliveryOptions?: StreamPartDeliveryOptions
) => Promise<void>
getStreamParts: () => StreamPartID[]
getNeighbors: (streamPartId: StreamPartID) => readonly DhtAddress[]
getPeerDescriptor: () => PeerDescriptor
Expand Down Expand Up @@ -214,19 +222,23 @@ export class NetworkNodeFacade {
return node.getNodeId()
}

async join(streamPartId: StreamPartID, neighborRequirement?: { minCount: number, timeout: number }): Promise<void> {
async join(
streamPartId: StreamPartID,
neighborRequirement?: { minCount: number, timeout: number },
deliveryOptions?: StreamPartDeliveryOptions
): Promise<void> {
const node = await this.getNode()
await node.join(streamPartId, neighborRequirement)
await node.join(streamPartId, neighborRequirement, deliveryOptions)
}

async leave(streamPartId: StreamPartID): Promise<void> {
const node = await this.getNode()
await node.leave(streamPartId)
}

async broadcast(msg: OldStreamMessage): Promise<void> {
async broadcast(msg: OldStreamMessage, deliveryOptions?: StreamPartDeliveryOptions): Promise<void> {
const node = await this.getNode()
node.broadcast(StreamMessageTranslator.toProtobuf(msg))
node.broadcast(StreamMessageTranslator.toProtobuf(msg), deliveryOptions)
}

addMessageListener(listener: (msg: OldStreamMessage) => void): void {
Expand Down
10 changes: 7 additions & 3 deletions packages/sdk/src/StreamrClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import 'reflect-metadata'
import './utils/PatchTsyringe'

import { DhtAddress } from '@streamr/dht'
import { ProxyDirection } from '@streamr/trackerless-network'
import { ProxyDirection, StreamPartDeliveryOptions } from '@streamr/trackerless-network'
import { DEFAULT_PARTITION_COUNT, EthereumAddress, HexString, Logger, StreamID, TheGraphClient, toEthereumAddress, toUserId } from '@streamr/utils'
import type { Overrides } from 'ethers'
import EventEmitter from 'eventemitter3'
Expand Down Expand Up @@ -79,6 +79,8 @@ export interface ExtraSubscribeOptions {
* The streamr client wallet address must be an authorized signer for the contract.
*/
erc1271Contract?: HexString

delivery?: StreamPartDeliveryOptions
}

const logger = new Logger(module)
Expand Down Expand Up @@ -165,9 +167,10 @@ export class StreamrClient {
async publish(
streamDefinition: StreamDefinition,
content: unknown,
metadata?: PublishMetadata
metadata?: PublishMetadata,
deliveryOptions?: StreamPartDeliveryOptions
Comment thread
teogeb marked this conversation as resolved.
): Promise<Message> {
const result = await this.publisher.publish(streamDefinition, content, metadata)
const result = await this.publisher.publish(streamDefinition, content, metadata, deliveryOptions)
this.eventEmitter.emit('messagePublished', result)
return convertStreamMessageToMessage(result)
}
Expand Down Expand Up @@ -227,6 +230,7 @@ export class StreamrClient {
streamPartId,
options.raw ?? false,
options.erc1271Contract !== undefined ? toEthereumAddress(options.erc1271Contract) : undefined,
options.delivery ?? {},
eventEmitter,
this.loggerFactory
)
Expand Down
1 change: 1 addition & 0 deletions packages/sdk/src/exports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export { convertBytesToStreamMessage, convertStreamMessageToBytes } from './prot
export type { DhtAddress } from '@streamr/dht'
export { ContentType, EncryptedGroupKey, EncryptionType,
ProxyDirection, SignatureType } from '@streamr/trackerless-network'
export type { StreamPartDeliveryOptions } from '@streamr/trackerless-network'
export type {
StreamID,
StreamPartID,
Expand Down
6 changes: 4 additions & 2 deletions packages/sdk/src/publish/Publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { createLazyMap, Mapping } from '../utils/Mapping'
import { GroupKeyQueue } from './GroupKeyQueue'
import { MessageFactory } from './MessageFactory'
import { ConfigInjectionToken, type StrictStreamrClientConfig } from '../Config'
import { StreamPartDeliveryOptions } from '@streamr/trackerless-network'

export interface PublishMetadata {
timestamp?: string | number | Date
Expand Down Expand Up @@ -87,7 +88,8 @@ export class Publisher {
async publish(
streamDefinition: StreamDefinition,
content: unknown,
metadata?: PublishMetadata
metadata?: PublishMetadata,
deliveryOptions?: StreamPartDeliveryOptions
): Promise<StreamMessage> {
const timestamp = parseTimestamp(metadata)
/*
Expand Down Expand Up @@ -117,7 +119,7 @@ export class Publisher {
},
partition
)
await this.node.broadcast(message)
await this.node.broadcast(message, deliveryOptions)
return message
} catch (e) {
const errorCode = (e instanceof StreamrClientError) ? e.code : 'UNKNOWN_ERROR'
Expand Down
4 changes: 4 additions & 0 deletions packages/sdk/src/subscribe/Subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { EthereumAddress, Logger, StreamPartID } from '@streamr/utils'
import EventEmitter from 'eventemitter3'
import { LoggerFactory } from '../utils/LoggerFactory'
import { MessageStream } from './MessageStream'
import { StreamPartDeliveryOptions } from '@streamr/trackerless-network'

/**
* Events emitted by {@link Subscription}.
Expand Down Expand Up @@ -29,6 +30,7 @@ export class Subscription extends MessageStream {
/** @internal */
readonly isRaw: boolean
readonly erc1271ContractAddress: EthereumAddress | undefined
readonly deliveryOptions: StreamPartDeliveryOptions | undefined
private readonly eventEmitter: EventEmitter<SubscriptionEvents>
private readonly logger: Logger

Expand All @@ -37,13 +39,15 @@ export class Subscription extends MessageStream {
streamPartId: StreamPartID,
isRaw: boolean,
erc1271ContractAddress: EthereumAddress | undefined,
deliveryOptions: StreamPartDeliveryOptions | undefined,
eventEmitter: EventEmitter<SubscriptionEvents>,
loggerFactory: LoggerFactory
) {
super()
this.streamPartId = streamPartId
this.isRaw = isRaw
this.erc1271ContractAddress = erc1271ContractAddress
this.deliveryOptions = deliveryOptions
this.eventEmitter = eventEmitter
this.logger = loggerFactory.createLogger(module)
this.onError.listen((err) => {
Expand Down
7 changes: 6 additions & 1 deletion packages/sdk/src/subscribe/SubscriptionSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import { Subscription } from './Subscription'
* A session contains one or more subscriptions to a single streamId + streamPartition pair.
*/

const getAnyItemFromSet = <T>(set: Set<T>): T | undefined => {
return set.values().next().value
}

export class SubscriptionSession {

public readonly streamPartId: StreamPartID
Expand Down Expand Up @@ -102,7 +106,8 @@ export class SubscriptionSession {
private async subscribe(): Promise<void> {
this.node.addMessageListener(this.onMessageInput)
if (!await this.node.isProxiedStreamPart(this.streamPartId)) {
Comment thread
juslesan marked this conversation as resolved.
await this.node.join(this.streamPartId)
const deliveryOptions = getAnyItemFromSet(this.subscriptions)!.deliveryOptions
Comment thread
juslesan marked this conversation as resolved.
await this.node.join(this.streamPartId, undefined, deliveryOptions)
Comment thread
juslesan marked this conversation as resolved.
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/sdk/src/utils/addStreamToStorageNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export const addStreamToStorageNode = async (
streamPartId,
false,
undefined,
undefined,
new EventEmitter<SubscriptionEvents>(),
loggerFactory
)
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/test/unit/SubscriptionSession.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const ADDRESS_ONE = randomEthereumAddress()
const ADDRESS_TWO = randomEthereumAddress()

function createSubscription(erc1271contractAddress?: EthereumAddress): Subscription {
return new Subscription(STREAM_PART_ID, false, erc1271contractAddress, mock(), mock())
return new Subscription(STREAM_PART_ID, false, erc1271contractAddress, undefined, mock(), mock())
}

describe('SubscriptionSession', () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/test/unit/resendSubscription.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ describe('resend subscription', () => {
gapFill = true
) => {
const eventEmitter = new EventEmitter<SubscriptionEvents>()
sub = new Subscription(STREAM_PART_ID, false, undefined, eventEmitter, mockLoggerFactory())
sub = new Subscription(STREAM_PART_ID, false, undefined, undefined, eventEmitter, mockLoggerFactory())
initResendSubscription(
sub,
{} as any,
Expand Down
4 changes: 2 additions & 2 deletions packages/trackerless-network/src/ContentDeliveryManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export interface ContentDeliveryManagerOptions {
}

export interface StreamPartDeliveryOptions {
plumTreeOptimisation?: boolean
plumtreeOptimization?: boolean
}

export const streamPartIdToDataKey = (streamPartId: StreamPartID): DhtAddress => {
Expand Down Expand Up @@ -288,7 +288,7 @@ export class ContentDeliveryManager extends EventEmitter<Events> {
neighborUpdateInterval: this.options.neighborUpdateInterval,
isLocalNodeEntryPoint,
doNotBufferWhileConnecting: this.options.doNotBufferWhileConnecting,
plumTreeOptimisation: streamPartDeliveryOptions?.plumTreeOptimisation
plumtreeOptimization: streamPartDeliveryOptions?.plumtreeOptimization
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type ContentDeliveryLayerNodeOptions = MarkOptional<StrictContentDeliveryLayerNo
neighborUpdateInterval?: number
maxPropagationBufferSize?: number
doNotBufferWhileConnecting?: boolean
plumTreeOptimisation?: boolean
plumtreeOptimization?: boolean
}

const createConfigWithDefaults = (options: ContentDeliveryLayerNodeOptions): StrictContentDeliveryLayerNodeOptions => {
Expand Down Expand Up @@ -68,7 +68,7 @@ const createConfigWithDefaults = (options: ContentDeliveryLayerNodeOptions): Str
streamPartId: options.streamPartId,
rpcCommunicator
}) : undefined
const plumtreeManager = options.plumTreeOptimisation ? new PlumtreeManager({
const plumtreeManager = options.plumtreeOptimization ? new PlumtreeManager({
neighbors,
localPeerDescriptor: options.localPeerDescriptor,
rpcCommunicator
Expand Down
2 changes: 1 addition & 1 deletion packages/trackerless-network/src/exports.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export { NetworkNode, createNetworkNode } from './NetworkNode'
export { type NetworkOptions, NetworkStack } from './NetworkStack'
export { type ContentDeliveryManagerOptions, streamPartIdToDataKey } from './ContentDeliveryManager'
export { type ContentDeliveryManagerOptions, type StreamPartDeliveryOptions, streamPartIdToDataKey } from './ContentDeliveryManager'
export {
AsymmetricEncryptionType,
ContentType,
Expand Down
4 changes: 2 additions & 2 deletions packages/trackerless-network/test/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export const createMockContentDeliveryLayerNodeAndDhtNode = async (
entryPointDescriptor: PeerDescriptor,
streamPartId: StreamPartID,
simulator: Simulator,
plumTreeOptimisation?: boolean
plumtreeOptimization?: boolean
): Promise<[ DiscoveryLayerNode, ContentDeliveryLayerNode ]> => {
const mockCm = new SimulatorTransport(localPeerDescriptor, simulator)
await mockCm.start()
Expand All @@ -62,7 +62,7 @@ export const createMockContentDeliveryLayerNodeAndDhtNode = async (
localPeerDescriptor,
rpcRequestTimeout: 5000,
isLocalNodeEntryPoint: () => false,
plumTreeOptimisation
plumtreeOptimization
})
return [discoveryLayerNode, contentDeliveryLayerNode]
}
Expand Down
Loading