Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ const stream = await client.getOrCreateStream({
name: 'My awesome stream created via the API',
})
console.log(`Stream ${stream.id} has been created!`)

// Optional: to enable historical data resends, add the stream to a storage node
await stream.addToStorageNode(StorageNode.STREAMR_GERMANY)

// Do something with the stream, for example call stream.publish(message)
```

Expand Down
3 changes: 2 additions & 1 deletion src/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { BytesLike } from '@ethersproject/bytes'
import { isAddress } from '@ethersproject/address'
import has from 'lodash/has'
import get from 'lodash/get'
import { StorageNode } from './stream/StorageNode'

export type EthereumConfig = ExternalProvider|JsonRpcFetchFunc

Expand Down Expand Up @@ -144,7 +145,7 @@ export const STREAM_CLIENT_DEFAULTS: StrictStreamrClientOptions = {
templateSidechainAddress: '0xf1E9d6E254BeA3f0129018AcA1A50AEcb7D528be',
},
storageNode: {
address: '0x31546eEA76F2B2b3C5cC06B1c93601dc35c9D916',
address: StorageNode.STREAMR_GERMANY.getAddress(),
url: 'https://corea1.streamr.network:8001'
},
cache: {
Expand Down
4 changes: 3 additions & 1 deletion src/rest/StreamEndpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { EthereumAddress } from '../types'
import { StreamrClient } from '../StreamrClient'
// TODO change this import when streamr-client-protocol exports StreamMessage type or the enums types directly
import { ContentType, EncryptionType, SignatureType, StreamMessageType } from 'streamr-client-protocol/dist/src/protocol/message_layer/StreamMessage'
import { StorageNode } from '../stream/StorageNode'

const debug = debugFactory('StreamrClient')

Expand Down Expand Up @@ -255,7 +256,8 @@ export class StreamEndpoints {
return json
}

async getStreamPartsByStorageNode(address: EthereumAddress) {
async getStreamPartsByStorageNode(node: StorageNode|EthereumAddress) {
const address = (node instanceof StorageNode) ? node.getAddress() : node
type ItemType = { id: string, partitions: number}
const json = await authFetch<ItemType[]>(getEndpointUrl(this.client.options.restUrl, 'storageNodes', address, 'streams'), this.client.session)
let result: StreamPart[] = []
Expand Down
3 changes: 3 additions & 0 deletions src/stream/StorageNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ import { EthereumAddress } from '../types'

export class StorageNode {

static STREAMR_GERMANY = new StorageNode('0x31546eEA76F2B2b3C5cC06B1c93601dc35c9D916')
static STREAMR_DOCKER_DEV = new StorageNode('0xde1112f631486CfC759A50196853011528bC5FA0')

private _address: EthereumAddress

constructor(address: EthereumAddress) {
Expand Down
7 changes: 5 additions & 2 deletions src/stream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import authFetch from '../rest/authFetch'

import { StorageNode } from './StorageNode'
import { StreamrClient } from '../StreamrClient'
import { EthereumAddress } from '../types'

// TODO explicit types: e.g. we never provide both streamId and id, or both streamPartition and partition
export type StreamPartDefinitionOptions = { streamId?: string, streamPartition?: number, id?: string, partition?: number, stream?: Stream|string }
Expand Down Expand Up @@ -223,7 +224,8 @@ export class Stream {
await this.update()
}

async addToStorageNode(address: string, { timeout = 30000 }: { timeout: number } = { timeout: 30000 }) {
async addToStorageNode(node: StorageNode|EthereumAddress, { timeout = 30000 }: { timeout: number } = { timeout: 30000 }) {
const address = (node instanceof StorageNode) ? node.getAddress() : node
// currently we support only one storage node
// -> we can validate that the given address is that address
// -> remove this comparison when we start to support multiple storage nodes
Expand Down Expand Up @@ -258,7 +260,8 @@ export class Stream {
throw new Error(`Unexpected response code ${response.status} when fetching stream storage status`)
}

async removeFromStorageNode(address: string) {
async removeFromStorageNode(node: StorageNode|EthereumAddress) {
const address = (node instanceof StorageNode) ? node.getAddress() : node
await authFetch(
getEndpointUrl(this._client.options.restUrl, 'streams', this.id, 'storageNodes', address),
this._client.session,
Expand Down
3 changes: 2 additions & 1 deletion test/integration/Encryption.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Defer } from '../../src/utils'
import { StreamrClient } from '../../src/StreamrClient'
import { GroupKey } from '../../src/stream/Encryption'
import Connection from '../../src/Connection'
import { StorageNode } from '../../src/stream/StorageNode'

import config from './config'

Expand Down Expand Up @@ -107,7 +108,7 @@ describe('decryption', () => {
requireEncryptedData: true,
})

await stream.addToStorageNode(config.clientOptions.storageNode.address)
await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV)

publishTestMessages = getPublishTestMessages(client, {
stream
Expand Down
3 changes: 2 additions & 1 deletion test/integration/GapFill.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { Stream } from '../../src/stream'
import { Subscriber, Subscription } from '../../src/subscribe'
import { MessageRef } from 'streamr-client-protocol/dist/src/protocol/message_layer'
import { StreamrClientOptions } from '../../src'
import { StorageNode } from '../../src/stream/StorageNode'

const MAX_MESSAGES = 10

Expand Down Expand Up @@ -47,7 +48,7 @@ describeRepeats('GapFill with resends', () => {
requireSignedData: true,
name: uid('stream')
})
await stream.addToStorageNode(config.clientOptions.storageNode.address)
await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV)

client.debug('connecting before test <<')
publishTestMessages = getPublishTestMessages(client, stream.id)
Expand Down
3 changes: 2 additions & 1 deletion test/integration/MultipleClients.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { describeRepeats, uid, fakePrivateKey, getWaitForStorage, getPublishTest
import { StreamrClient } from '../../src/StreamrClient'
import { counterId, Defer, pLimitFn } from '../../src/utils'
import Connection from '../../src/Connection'
import { StorageNode } from '../../src/stream/StorageNode'

import config from './config'

Expand Down Expand Up @@ -51,7 +52,7 @@ describeRepeats('PubSub with multiple clients', () => {
stream = await mainClient.createStream({
name: uid('stream')
})
await stream.addToStorageNode(config.clientOptions.storageNode.address)
await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV)
})

afterEach(async () => {
Expand Down
3 changes: 2 additions & 1 deletion test/integration/ResendReconnect.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import config from './config'
import { Stream } from '../../src/stream'
import { Subscription } from '../../src'
import { PublishRequest } from 'streamr-client-protocol/dist/src/protocol/control_layer'
import { StorageNode } from '../../src/stream/StorageNode'

const createClient = (opts = {}) => new StreamrClient({
...config.clientOptions,
Expand Down Expand Up @@ -35,7 +36,7 @@ describe('resend/reconnect', () => {
name: uid('resends')
})

await stream.addToStorageNode(config.clientOptions.storageNode.address)
await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV)
}, 10000)

beforeEach(async () => {
Expand Down
5 changes: 3 additions & 2 deletions test/integration/Resends.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import Connection from '../../src/Connection'

import config from './config'
import { Stream } from '../../src/stream'
import { StorageNode } from '../../src/stream/StorageNode'

const MAX_MESSAGES = 10
const WAIT_FOR_STORAGE_TIMEOUT = 6000
Expand Down Expand Up @@ -76,7 +77,7 @@ describe('StreamrClient resends', () => {
name: uid('resends')
})

await stream.addToStorageNode(config.clientOptions.storageNode.address)
await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV)
})

beforeEach(async () => {
Expand Down Expand Up @@ -337,7 +338,7 @@ describe('StreamrClient resends', () => {
name: uid('resends')
})

await stream.addToStorageNode(config.clientOptions.storageNode.address)
await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV)

publishTestMessages = getPublishTestMessages(client, {
stream
Expand Down
3 changes: 2 additions & 1 deletion test/integration/Stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { StreamrClient } from '../../src/StreamrClient'
import { Stream } from '../../src/stream'
import { uid, fakePrivateKey, getPublishTestMessages } from '../utils'
import { StorageNode } from '../../src/stream/StorageNode'

import config from './config'

Expand All @@ -25,7 +26,7 @@ describe('Stream', () => {
stream = await client.createStream({
name: uid('stream-integration-test')
})
await stream.addToStorageNode(config.clientOptions.storageNode.address)
await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV)
})

afterEach(async () => {
Expand Down
3 changes: 2 additions & 1 deletion test/integration/StreamConnectionState.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import config from './config'
import { Stream } from '../../src/stream'
import { Subscriber, Subscription } from '../../src/subscribe'
import { StreamrClientOptions } from '../../src'
import { StorageNode } from '../../src/stream/StorageNode'

const MAX_MESSAGES = 5

Expand Down Expand Up @@ -50,7 +51,7 @@ describeRepeats('Connection State', () => {
requireSignedData: true,
name: uid('stream')
})
await stream.addToStorageNode(config.clientOptions.storageNode.address)
await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV)

client.debug('connecting before test <<')
publishTestMessages = getPublishTestMessages(client, stream.id)
Expand Down
17 changes: 9 additions & 8 deletions test/integration/StreamEndpoints.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { ethers, Wallet } from 'ethers'
import { NotFoundError, ValidationError } from '../../src/rest/authFetch'
import { Stream, StreamOperation } from '../../src/stream'
import { StorageNode } from '../../src/stream/StorageNode'

import { StreamrClient } from '../../src/StreamrClient'
import { uid } from '../utils'
Expand Down Expand Up @@ -231,26 +232,26 @@ function TestStreamEndpoints(getName: () => string) {

describe('Storage node assignment', () => {
it('add', async () => {
const storageNodeAddress = client.options.storageNode.address
const storageNode = StorageNode.STREAMR_DOCKER_DEV
const stream = await client.createStream()
await stream.addToStorageNode(storageNodeAddress)
await stream.addToStorageNode(storageNode)
const storageNodes = await stream.getStorageNodes()
expect(storageNodes.length).toBe(1)
expect(storageNodes[0].getAddress()).toBe(storageNodeAddress)
const storedStreamParts = await client.getStreamPartsByStorageNode(storageNodeAddress)
expect(storageNodes[0].getAddress()).toBe(storageNode.getAddress())
const storedStreamParts = await client.getStreamPartsByStorageNode(storageNode)
expect(storedStreamParts.some(
(sp) => (sp.getStreamId() === stream.id) && (sp.getStreamPartition() === 0)
)).toBeTruthy()
})

it('remove', async () => {
const storageNodeAddress = client.options.storageNode.address
const storageNode = StorageNode.STREAMR_DOCKER_DEV
const stream = await client.createStream()
await stream.addToStorageNode(storageNodeAddress)
await stream.removeFromStorageNode(storageNodeAddress)
await stream.addToStorageNode(storageNode)
await stream.removeFromStorageNode(storageNode)
const storageNodes = await stream.getStorageNodes()
expect(storageNodes).toHaveLength(0)
const storedStreamParts = await client.getStreamPartsByStorageNode(storageNodeAddress)
const storedStreamParts = await client.getStreamPartsByStorageNode(storageNode)
expect(storedStreamParts.some(
(sp) => (sp.getStreamId() === stream.id)
)).toBeFalsy()
Expand Down
3 changes: 2 additions & 1 deletion test/integration/StreamrClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import Connection from '../../src/Connection'
import config from './config'
import { Stream } from '../../src/stream'
import { Subscription } from '../../src'
import { StorageNode } from '../../src/stream/StorageNode'

const { StreamMessage } = MessageLayer

Expand Down Expand Up @@ -104,7 +105,7 @@ describeRepeats('StreamrClient', () => {
requireSignedData,
...opts,
})
await s.addToStorageNode(config.clientOptions.storageNode.address)
await s.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV)

expect(s.id).toBeTruthy()
expect(s.name).toEqual(name)
Expand Down
3 changes: 2 additions & 1 deletion test/integration/StreamrClientResends.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import Connection from '../../src/Connection'

import config from './config'
import { Stream } from '../../src/stream'
import { StorageNode } from '../../src/stream/StorageNode'

describeRepeats('StreamrClient Resend', () => {
let expectErrors = 0 // check no errors by default
Expand Down Expand Up @@ -76,7 +77,7 @@ describeRepeats('StreamrClient Resend', () => {
requireSignedData,
...opts,
})
await s.addToStorageNode(config.clientOptions.storageNode.address)
await s.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV)

expect(s.id).toBeTruthy()
expect(s.name).toEqual(name)
Expand Down
3 changes: 2 additions & 1 deletion test/integration/Subscriber.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { uid, fakePrivateKey, describeRepeats, getPublishTestMessages, collect }
import { StreamrClient } from '../../src/StreamrClient'
import { Defer } from '../../src/utils'
import Connection from '../../src/Connection'
import { StorageNode } from '../../src/stream/StorageNode'

import config from './config'

Expand Down Expand Up @@ -54,7 +55,7 @@ describeRepeats('Subscriber', () => {
stream = await client.createStream({
name: uid('stream')
})
await stream.addToStorageNode(config.clientOptions.storageNode.address)
await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV)
client.debug('connecting before test <<')
publishTestMessages = getPublishTestMessages(client, {
stream
Expand Down
9 changes: 5 additions & 4 deletions test/integration/SubscriberResends.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { Defer } from '../../src/utils'
import config from './config'
import { Stream } from '../../src/stream'
import { Subscriber } from '../../src/subscribe'
import { StorageNode } from '../../src/stream/StorageNode'

const { ControlMessage } = ControlLayer

Expand Down Expand Up @@ -78,7 +79,7 @@ describeRepeats('resends', () => {

beforeAll(async () => {
client.debug('addToStorageNode >>')
await stream.addToStorageNode(config.clientOptions.storageNode.address, {
await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV, {
timeout: WAIT_FOR_STORAGE_TIMEOUT * 2,
})
client.debug('addToStorageNode <<')
Expand Down Expand Up @@ -138,7 +139,7 @@ describeRepeats('resends', () => {
emptyStream = await client.createStream({
name: uid('stream')
})
await emptyStream.addToStorageNode(config.clientOptions.storageNode.address)
await emptyStream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV)
await expect(async () => {
await subscriber.resend({
streamId: emptyStream.id,
Expand All @@ -151,7 +152,7 @@ describeRepeats('resends', () => {
emptyStream = await client.createStream({
name: uid('stream')
})
await emptyStream.addToStorageNode(config.clientOptions.storageNode.address)
await emptyStream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV)

const sub = await subscriber.resend({
streamId: emptyStream.id,
Expand All @@ -171,7 +172,7 @@ describeRepeats('resends', () => {
emptyStream = await client.createStream({
name: uid('stream')
})
await emptyStream.addToStorageNode(config.clientOptions.storageNode.address)
await emptyStream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV)

const sub = await subscriber.resendSubscribe({
streamId: emptyStream.id,
Expand Down
3 changes: 2 additions & 1 deletion test/integration/SubscriberResendsSequential.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import Connection from '../../src/Connection'
import config from './config'
import { Stream } from '../../src/stream'
import { Subscriber } from '../../src/subscribe'
import { StorageNode } from '../../src/stream/StorageNode'

/* eslint-disable no-await-in-loop */

Expand Down Expand Up @@ -66,7 +67,7 @@ describeRepeats('sequential resend subscribe', () => {
stream = await client.createStream({
name: uid('stream')
})
await stream.addToStorageNode(config.clientOptions.storageNode.address)
await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV)

publishTestMessages = getPublishTestMessages(client, {
stream,
Expand Down
3 changes: 2 additions & 1 deletion test/integration/Subscription.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { StreamrClient } from '../../src/StreamrClient'
import config from './config'
import { Stream } from '../../src/stream'
import { Subscription } from '../../src/subscribe'
import { StorageNode } from '../../src/stream/StorageNode'

const createClient = (opts = {}) => new StreamrClient({
...config.clientOptions,
Expand Down Expand Up @@ -72,7 +73,7 @@ describe('Subscription', () => {
stream = await client.createStream({
name: uid('stream')
})
await stream.addToStorageNode(config.clientOptions.storageNode.address)
await stream.addToStorageNode(StorageNode.STREAMR_DOCKER_DEV)
await client.connect()
})

Expand Down
5 changes: 3 additions & 2 deletions test/integration/config.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const { StorageNode } = require('../../src/stream/StorageNode')

const toNumber = (value) => {
return (value !== undefined) ? Number(value) : undefined
}
Expand All @@ -20,8 +22,7 @@ module.exports = {
templateSidechainAddress: process.env.DU_TEMPLATE_SIDECHAIN || '0x36afc8c9283CC866b8EB6a61C6e6862a83cd6ee8',
},
storageNode: {
// "broker-node-storage-1" on Docker environment
address: '0xde1112f631486CfC759A50196853011528bC5FA0',
address: StorageNode.STREAMR_DOCKER_DEV.getAddress(),
url: `http://${process.env.STREAMR_DOCKER_DEV_HOST || '10.200.10.1'}:8891`
},
sidechain: {
Expand Down