diff --git a/README.md b/README.md index 6ea63d4ca..05bbe5463 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,10 @@ const stream = await client.getOrCreateStream({ name: 'My awesome stream created via the API', }) console.log(`Stream ${stream.id} has been created!`) + +// Optional: to enable historical data resends, add the stream to a storage node +await stream.addToStorageNode(StorageNode.STREAMR_GERMANY) + // Do something with the stream, for example call stream.publish(message) ``` diff --git a/src/Config.ts b/src/Config.ts index 6d48b1430..1708c0216 100644 --- a/src/Config.ts +++ b/src/Config.ts @@ -13,6 +13,7 @@ import { BytesLike } from '@ethersproject/bytes' import { isAddress } from '@ethersproject/address' import has from 'lodash/has' import get from 'lodash/get' +import { StorageNode } from './stream/StorageNode' export type EthereumConfig = ExternalProvider|JsonRpcFetchFunc @@ -144,7 +145,7 @@ export const STREAM_CLIENT_DEFAULTS: StrictStreamrClientOptions = { templateSidechainAddress: '0xf1E9d6E254BeA3f0129018AcA1A50AEcb7D528be', }, storageNode: { - address: '0x31546eEA76F2B2b3C5cC06B1c93601dc35c9D916', + address: StorageNode.STREAMR_GERMANY.getAddress(), url: 'https://corea1.streamr.network:8001' }, cache: { diff --git a/src/rest/StreamEndpoints.ts b/src/rest/StreamEndpoints.ts index 6233fe3a8..ca237af00 100644 --- a/src/rest/StreamEndpoints.ts +++ b/src/rest/StreamEndpoints.ts @@ -15,6 +15,7 @@ import { EthereumAddress } from '../types' import { StreamrClient } from '../StreamrClient' // TODO change this import when streamr-client-protocol exports StreamMessage type or the enums types directly import { ContentType, EncryptionType, SignatureType, StreamMessageType } from 'streamr-client-protocol/dist/src/protocol/message_layer/StreamMessage' +import { StorageNode } from '../stream/StorageNode' const debug = debugFactory('StreamrClient') @@ -255,7 +256,8 @@ export class StreamEndpoints { return json } - async getStreamPartsByStorageNode(address: EthereumAddress) { + async getStreamPartsByStorageNode(node: StorageNode|EthereumAddress) { + const address = (node instanceof StorageNode) ? node.getAddress() : node type ItemType = { id: string, partitions: number} const json = await authFetch(getEndpointUrl(this.client.options.restUrl, 'storageNodes', address, 'streams'), this.client.session) let result: StreamPart[] = [] diff --git a/src/stream/StorageNode.ts b/src/stream/StorageNode.ts index f16b938f0..51ddacff4 100644 --- a/src/stream/StorageNode.ts +++ b/src/stream/StorageNode.ts @@ -2,6 +2,9 @@ import { EthereumAddress } from '../types' export class StorageNode { + static STREAMR_GERMANY = new StorageNode('0x31546eEA76F2B2b3C5cC06B1c93601dc35c9D916') + static STREAMR_DOCKER_DEV = new StorageNode('0xde1112f631486CfC759A50196853011528bC5FA0') + private _address: EthereumAddress constructor(address: EthereumAddress) { diff --git a/src/stream/index.ts b/src/stream/index.ts index 1ca1d5d2e..397b5dda2 100644 --- a/src/stream/index.ts +++ b/src/stream/index.ts @@ -5,6 +5,7 @@ import authFetch from '../rest/authFetch' import { StorageNode } from './StorageNode' import { StreamrClient } from '../StreamrClient' +import { EthereumAddress } from '../types' // TODO explicit types: e.g. we never provide both streamId and id, or both streamPartition and partition export type StreamPartDefinitionOptions = { streamId?: string, streamPartition?: number, id?: string, partition?: number, stream?: Stream|string } @@ -223,7 +224,8 @@ export class Stream { await this.update() } - async addToStorageNode(address: string, { timeout = 30000 }: { timeout: number } = { timeout: 30000 }) { + async addToStorageNode(node: StorageNode|EthereumAddress, { timeout = 30000 }: { timeout: number } = { timeout: 30000 }) { + const address = (node instanceof StorageNode) ? node.getAddress() : node // currently we support only one storage node // -> we can validate that the given address is that address // -> remove this comparison when we start to support multiple storage nodes @@ -258,7 +260,8 @@ export class Stream { throw new Error(`Unexpected response code ${response.status} when fetching stream storage status`) } - async removeFromStorageNode(address: string) { + async removeFromStorageNode(node: StorageNode|EthereumAddress) { + const address = (node instanceof StorageNode) ? node.getAddress() : node await authFetch( getEndpointUrl(this._client.options.restUrl, 'streams', this.id, 'storageNodes', address), this._client.session, diff --git a/test/integration/Encryption.test.js b/test/integration/Encryption.test.js index b0f741ccf..b98c106ad 100644 --- a/test/integration/Encryption.test.js +++ b/test/integration/Encryption.test.js @@ -6,6 +6,7 @@ import { Defer } from '../../src/utils' import { StreamrClient } from '../../src/StreamrClient' import { GroupKey } from '../../src/stream/Encryption' import Connection from '../../src/Connection' +import { StorageNode } from '../../src/stream/StorageNode' import config from './config' @@ -107,7 +108,7 @@ describe('decryption', () => { requireEncryptedData: true, }) - await stream.addToStorageNode(config.clientOptions.storageNode.address) + await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV) publishTestMessages = getPublishTestMessages(client, { stream diff --git a/test/integration/GapFill.test.ts b/test/integration/GapFill.test.ts index b4c8e31d4..da57a5d87 100644 --- a/test/integration/GapFill.test.ts +++ b/test/integration/GapFill.test.ts @@ -9,6 +9,7 @@ import { Stream } from '../../src/stream' import { Subscriber, Subscription } from '../../src/subscribe' import { MessageRef } from 'streamr-client-protocol/dist/src/protocol/message_layer' import { StreamrClientOptions } from '../../src' +import { StorageNode } from '../../src/stream/StorageNode' const MAX_MESSAGES = 10 @@ -47,7 +48,7 @@ describeRepeats('GapFill with resends', () => { requireSignedData: true, name: uid('stream') }) - await stream.addToStorageNode(config.clientOptions.storageNode.address) + await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV) client.debug('connecting before test <<') publishTestMessages = getPublishTestMessages(client, stream.id) diff --git a/test/integration/MultipleClients.test.js b/test/integration/MultipleClients.test.js index 0b111fe39..101f0f5b6 100644 --- a/test/integration/MultipleClients.test.js +++ b/test/integration/MultipleClients.test.js @@ -5,6 +5,7 @@ import { describeRepeats, uid, fakePrivateKey, getWaitForStorage, getPublishTest import { StreamrClient } from '../../src/StreamrClient' import { counterId, Defer, pLimitFn } from '../../src/utils' import Connection from '../../src/Connection' +import { StorageNode } from '../../src/stream/StorageNode' import config from './config' @@ -51,7 +52,7 @@ describeRepeats('PubSub with multiple clients', () => { stream = await mainClient.createStream({ name: uid('stream') }) - await stream.addToStorageNode(config.clientOptions.storageNode.address) + await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV) }) afterEach(async () => { diff --git a/test/integration/ResendReconnect.test.ts b/test/integration/ResendReconnect.test.ts index a3efd3700..3619da6c0 100644 --- a/test/integration/ResendReconnect.test.ts +++ b/test/integration/ResendReconnect.test.ts @@ -8,6 +8,7 @@ import config from './config' import { Stream } from '../../src/stream' import { Subscription } from '../../src' import { PublishRequest } from 'streamr-client-protocol/dist/src/protocol/control_layer' +import { StorageNode } from '../../src/stream/StorageNode' const createClient = (opts = {}) => new StreamrClient({ ...config.clientOptions, @@ -35,7 +36,7 @@ describe('resend/reconnect', () => { name: uid('resends') }) - await stream.addToStorageNode(config.clientOptions.storageNode.address) + await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV) }, 10000) beforeEach(async () => { diff --git a/test/integration/Resends.test.ts b/test/integration/Resends.test.ts index e55d58636..7cd71d3b7 100644 --- a/test/integration/Resends.test.ts +++ b/test/integration/Resends.test.ts @@ -7,6 +7,7 @@ import Connection from '../../src/Connection' import config from './config' import { Stream } from '../../src/stream' +import { StorageNode } from '../../src/stream/StorageNode' const MAX_MESSAGES = 10 const WAIT_FOR_STORAGE_TIMEOUT = 6000 @@ -76,7 +77,7 @@ describe('StreamrClient resends', () => { name: uid('resends') }) - await stream.addToStorageNode(config.clientOptions.storageNode.address) + await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV) }) beforeEach(async () => { @@ -337,7 +338,7 @@ describe('StreamrClient resends', () => { name: uid('resends') }) - await stream.addToStorageNode(config.clientOptions.storageNode.address) + await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV) publishTestMessages = getPublishTestMessages(client, { stream diff --git a/test/integration/Stream.test.ts b/test/integration/Stream.test.ts index 05daf5c6a..e4ee94909 100644 --- a/test/integration/Stream.test.ts +++ b/test/integration/Stream.test.ts @@ -1,6 +1,7 @@ import { StreamrClient } from '../../src/StreamrClient' import { Stream } from '../../src/stream' import { uid, fakePrivateKey, getPublishTestMessages } from '../utils' +import { StorageNode } from '../../src/stream/StorageNode' import config from './config' @@ -25,7 +26,7 @@ describe('Stream', () => { stream = await client.createStream({ name: uid('stream-integration-test') }) - await stream.addToStorageNode(config.clientOptions.storageNode.address) + await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV) }) afterEach(async () => { diff --git a/test/integration/StreamConnectionState.test.ts b/test/integration/StreamConnectionState.test.ts index 9d57553d7..265bf2013 100644 --- a/test/integration/StreamConnectionState.test.ts +++ b/test/integration/StreamConnectionState.test.ts @@ -9,6 +9,7 @@ import config from './config' import { Stream } from '../../src/stream' import { Subscriber, Subscription } from '../../src/subscribe' import { StreamrClientOptions } from '../../src' +import { StorageNode } from '../../src/stream/StorageNode' const MAX_MESSAGES = 5 @@ -50,7 +51,7 @@ describeRepeats('Connection State', () => { requireSignedData: true, name: uid('stream') }) - await stream.addToStorageNode(config.clientOptions.storageNode.address) + await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV) client.debug('connecting before test <<') publishTestMessages = getPublishTestMessages(client, stream.id) diff --git a/test/integration/StreamEndpoints.test.ts b/test/integration/StreamEndpoints.test.ts index eb9fda535..892526b98 100644 --- a/test/integration/StreamEndpoints.test.ts +++ b/test/integration/StreamEndpoints.test.ts @@ -1,6 +1,7 @@ import { ethers, Wallet } from 'ethers' import { NotFoundError, ValidationError } from '../../src/rest/authFetch' import { Stream, StreamOperation } from '../../src/stream' +import { StorageNode } from '../../src/stream/StorageNode' import { StreamrClient } from '../../src/StreamrClient' import { uid } from '../utils' @@ -231,26 +232,26 @@ function TestStreamEndpoints(getName: () => string) { describe('Storage node assignment', () => { it('add', async () => { - const storageNodeAddress = client.options.storageNode.address + const storageNode = StorageNode.STREAMR_DOCKER_DEV const stream = await client.createStream() - await stream.addToStorageNode(storageNodeAddress) + await stream.addToStorageNode(storageNode) const storageNodes = await stream.getStorageNodes() expect(storageNodes.length).toBe(1) - expect(storageNodes[0].getAddress()).toBe(storageNodeAddress) - const storedStreamParts = await client.getStreamPartsByStorageNode(storageNodeAddress) + expect(storageNodes[0].getAddress()).toBe(storageNode.getAddress()) + const storedStreamParts = await client.getStreamPartsByStorageNode(storageNode) expect(storedStreamParts.some( (sp) => (sp.getStreamId() === stream.id) && (sp.getStreamPartition() === 0) )).toBeTruthy() }) it('remove', async () => { - const storageNodeAddress = client.options.storageNode.address + const storageNode = StorageNode.STREAMR_DOCKER_DEV const stream = await client.createStream() - await stream.addToStorageNode(storageNodeAddress) - await stream.removeFromStorageNode(storageNodeAddress) + await stream.addToStorageNode(storageNode) + await stream.removeFromStorageNode(storageNode) const storageNodes = await stream.getStorageNodes() expect(storageNodes).toHaveLength(0) - const storedStreamParts = await client.getStreamPartsByStorageNode(storageNodeAddress) + const storedStreamParts = await client.getStreamPartsByStorageNode(storageNode) expect(storedStreamParts.some( (sp) => (sp.getStreamId() === stream.id) )).toBeFalsy() diff --git a/test/integration/StreamrClient.test.ts b/test/integration/StreamrClient.test.ts index 6d3542975..8e44d43b6 100644 --- a/test/integration/StreamrClient.test.ts +++ b/test/integration/StreamrClient.test.ts @@ -12,6 +12,7 @@ import Connection from '../../src/Connection' import config from './config' import { Stream } from '../../src/stream' import { Subscription } from '../../src' +import { StorageNode } from '../../src/stream/StorageNode' const { StreamMessage } = MessageLayer @@ -104,7 +105,7 @@ describeRepeats('StreamrClient', () => { requireSignedData, ...opts, }) - await s.addToStorageNode(config.clientOptions.storageNode.address) + await s.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV) expect(s.id).toBeTruthy() expect(s.name).toEqual(name) diff --git a/test/integration/StreamrClientResends.test.ts b/test/integration/StreamrClientResends.test.ts index a36604905..5b3d5d7ca 100644 --- a/test/integration/StreamrClientResends.test.ts +++ b/test/integration/StreamrClientResends.test.ts @@ -6,6 +6,7 @@ import Connection from '../../src/Connection' import config from './config' import { Stream } from '../../src/stream' +import { StorageNode } from '../../src/stream/StorageNode' describeRepeats('StreamrClient Resend', () => { let expectErrors = 0 // check no errors by default @@ -76,7 +77,7 @@ describeRepeats('StreamrClient Resend', () => { requireSignedData, ...opts, }) - await s.addToStorageNode(config.clientOptions.storageNode.address) + await s.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV) expect(s.id).toBeTruthy() expect(s.name).toEqual(name) diff --git a/test/integration/Subscriber.test.js b/test/integration/Subscriber.test.js index c45b0a38b..2c98f3786 100644 --- a/test/integration/Subscriber.test.js +++ b/test/integration/Subscriber.test.js @@ -5,6 +5,7 @@ import { uid, fakePrivateKey, describeRepeats, getPublishTestMessages, collect } import { StreamrClient } from '../../src/StreamrClient' import { Defer } from '../../src/utils' import Connection from '../../src/Connection' +import { StorageNode } from '../../src/stream/StorageNode' import config from './config' @@ -54,7 +55,7 @@ describeRepeats('Subscriber', () => { stream = await client.createStream({ name: uid('stream') }) - await stream.addToStorageNode(config.clientOptions.storageNode.address) + await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV) client.debug('connecting before test <<') publishTestMessages = getPublishTestMessages(client, { stream diff --git a/test/integration/SubscriberResends.test.ts b/test/integration/SubscriberResends.test.ts index 615218e29..611f47382 100644 --- a/test/integration/SubscriberResends.test.ts +++ b/test/integration/SubscriberResends.test.ts @@ -19,6 +19,7 @@ import { Defer } from '../../src/utils' import config from './config' import { Stream } from '../../src/stream' import { Subscriber } from '../../src/subscribe' +import { StorageNode } from '../../src/stream/StorageNode' const { ControlMessage } = ControlLayer @@ -78,7 +79,7 @@ describeRepeats('resends', () => { beforeAll(async () => { client.debug('addToStorageNode >>') - await stream.addToStorageNode(config.clientOptions.storageNode.address, { + await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV, { timeout: WAIT_FOR_STORAGE_TIMEOUT * 2, }) client.debug('addToStorageNode <<') @@ -138,7 +139,7 @@ describeRepeats('resends', () => { emptyStream = await client.createStream({ name: uid('stream') }) - await emptyStream.addToStorageNode(config.clientOptions.storageNode.address) + await emptyStream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV) await expect(async () => { await subscriber.resend({ streamId: emptyStream.id, @@ -151,7 +152,7 @@ describeRepeats('resends', () => { emptyStream = await client.createStream({ name: uid('stream') }) - await emptyStream.addToStorageNode(config.clientOptions.storageNode.address) + await emptyStream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV) const sub = await subscriber.resend({ streamId: emptyStream.id, @@ -171,7 +172,7 @@ describeRepeats('resends', () => { emptyStream = await client.createStream({ name: uid('stream') }) - await emptyStream.addToStorageNode(config.clientOptions.storageNode.address) + await emptyStream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV) const sub = await subscriber.resendSubscribe({ streamId: emptyStream.id, diff --git a/test/integration/SubscriberResendsSequential.test.ts b/test/integration/SubscriberResendsSequential.test.ts index 026a8099c..275cfa98d 100644 --- a/test/integration/SubscriberResendsSequential.test.ts +++ b/test/integration/SubscriberResendsSequential.test.ts @@ -15,6 +15,7 @@ import Connection from '../../src/Connection' import config from './config' import { Stream } from '../../src/stream' import { Subscriber } from '../../src/subscribe' +import { StorageNode } from '../../src/stream/StorageNode' /* eslint-disable no-await-in-loop */ @@ -66,7 +67,7 @@ describeRepeats('sequential resend subscribe', () => { stream = await client.createStream({ name: uid('stream') }) - await stream.addToStorageNode(config.clientOptions.storageNode.address) + await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV) publishTestMessages = getPublishTestMessages(client, { stream, diff --git a/test/integration/Subscription.test.ts b/test/integration/Subscription.test.ts index 298258f3e..a66e2ab1d 100644 --- a/test/integration/Subscription.test.ts +++ b/test/integration/Subscription.test.ts @@ -6,6 +6,7 @@ import { StreamrClient } from '../../src/StreamrClient' import config from './config' import { Stream } from '../../src/stream' import { Subscription } from '../../src/subscribe' +import { StorageNode } from '../../src/stream/StorageNode' const createClient = (opts = {}) => new StreamrClient({ ...config.clientOptions, @@ -72,7 +73,7 @@ describe('Subscription', () => { stream = await client.createStream({ name: uid('stream') }) - await stream.addToStorageNode(config.clientOptions.storageNode.address) + await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV) await client.connect() }) diff --git a/test/integration/config.js b/test/integration/config.js index 013a61877..177b5cfef 100644 --- a/test/integration/config.js +++ b/test/integration/config.js @@ -1,3 +1,5 @@ +const { StorageNode } = require('../../src/stream/StorageNode') + const toNumber = (value) => { return (value !== undefined) ? Number(value) : undefined } @@ -20,8 +22,7 @@ module.exports = { templateSidechainAddress: process.env.DU_TEMPLATE_SIDECHAIN || '0x36afc8c9283CC866b8EB6a61C6e6862a83cd6ee8', }, storageNode: { - // "broker-node-storage-1" on Docker environment - address: '0xde1112f631486CfC759A50196853011528bC5FA0', + address: StorageNode.STREAMR_DOCKER_DEV.getAddress(), url: `http://${process.env.STREAMR_DOCKER_DEV_HOST || '10.200.10.1'}:8891` }, sidechain: {