Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@
"pretty-bytes": "^5.6.0",
"process": "^0.11.10",
"sinon": "^9.2.4",
"streamr-test-utils": "^1.3.1",
"terser-webpack-plugin": "^5.1.1",
"ts-jest": "^26.5.1",
"ts-loader": "^8.0.17",
Expand Down Expand Up @@ -160,6 +159,7 @@
"quick-lru": "^6.0.0",
"readable-stream": "^3.6.0",
"streamr-client-protocol": "^8.0.0-beta.2",
"streamr-test-utils": "^1.3.1",
"ts-toolbelt": "^9.3.12",
"uuid": "^8.3.2",
"webpack-node-externals": "^2.5.2",
Expand Down
13 changes: 11 additions & 2 deletions src/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ export type StrictStreamrClientOptions = {
templateMainnetAddress: EthereumAddress
templateSidechainAddress: EthereumAddress
},
storageNode: {
address: EthereumAddress
url: string
}
cache: {
maxSize: number,
maxAge: number
Expand Down Expand Up @@ -139,6 +143,10 @@ export const STREAM_CLIENT_DEFAULTS: StrictStreamrClientOptions = {
templateMainnetAddress: '0x5FE790E3751dd775Cb92e9086Acd34a2adeB8C7b',
templateSidechainAddress: '0xf1E9d6E254BeA3f0129018AcA1A50AEcb7D528be',
},
storageNode: {
address: '0x31546eEA76F2B2b3C5cC06B1c93601dc35c9D916',
url: 'https://corea1.streamr.network:8001'
},
cache: {
maxSize: 10000,
maxAge: 30 * 60 * 1000, // 30 minutes
Expand All @@ -160,7 +168,8 @@ export default function ClientConfig(opts: StreamrClientOptions = {}) {
'dataUnion.factoryMainnetAddress',
'dataUnion.factorySidechainAddress',
'dataUnion.templateMainnetAddress',
'dataUnion.templateSidechainAddress'
'dataUnion.templateSidechainAddress',
'storageNode.address'
])

const options: StrictStreamrClientOptions = {
Expand All @@ -174,7 +183,7 @@ export default function ClientConfig(opts: StreamrClientOptions = {}) {
...STREAM_CLIENT_DEFAULTS.cache,
...opts.cache,
}
// NOTE: sidechain is not merged with the defaults
// NOTE: sidechain and storageNode settings are not merged with the defaults
}

const parts = options.url!.split('?')
Expand Down
2 changes: 1 addition & 1 deletion src/rest/StreamEndpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import debugFactory from 'debug'
import { getEndpointUrl } from '../utils'
import { validateOptions } from '../stream/utils'
import { Stream, StreamOperation, StreamProperties } from '../stream'
import StreamPart from '../stream/StreamPart'
import { StreamPart } from '../stream/StreamPart'
import { isKeyExchangeStream } from '../stream/KeyExchange'

import authFetch, { ErrorCode, NotFoundError } from './authFetch'
Expand Down
2 changes: 1 addition & 1 deletion src/stream/StorageNode.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EthereumAddress } from '../types'

export default class StorageNode {
export class StorageNode {

private _address: EthereumAddress

Expand Down
2 changes: 1 addition & 1 deletion src/stream/StreamPart.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export default class StreamPart {
export class StreamPart {

_streamId: string
_streamPartition: number
Expand Down
29 changes: 28 additions & 1 deletion src/stream/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import fetch from 'node-fetch'
import { getAddress } from '@ethersproject/address'
import { waitForCondition } from 'streamr-test-utils'
import { getEndpointUrl } from '../utils'
import authFetch from '../rest/authFetch'

import StorageNode from './StorageNode'
import { StorageNode } from './StorageNode'
import { StreamrClient } from '../StreamrClient'

// TODO explicit types: e.g. we never provide both streamId and id, or both streamPartition and partition
Expand Down Expand Up @@ -221,6 +224,12 @@ export class Stream {
}

async addToStorageNode(address: string) {
// currently we support only one storage node
// -> we can validate that the given address is that address
// -> remove this comparison when we start to support multiple storage nodes
if (getAddress(address) !== this._client.options.storageNode.address) {
throw new Error('Unknown storage node: ' + address)
}
await authFetch(
getEndpointUrl(this._client.options.restUrl, 'streams', this.id, 'storageNodes'),
this._client.session,
Expand All @@ -231,6 +240,24 @@ export class Stream {
})
},
)
// wait for propagation: the storage node sees the database change in E&E and
// is ready to store the any stream data which we publish
const TIMEOUT = 30 * 1000
const POLL_INTERVAL = 500
await waitForCondition(() => this.isStreamStoredInStorageNode(this.id), TIMEOUT, POLL_INTERVAL,
() => `Propagation timeout when adding stream to a storage node: ${this.id}`)
}

private async isStreamStoredInStorageNode(streamId: string) {
const url = `${this._client.options.storageNode.url}/api/v1/streams/${encodeURIComponent(streamId)}/storage/partitions/0`
const response = await fetch(url)
if (response.status === 200) {
return true
}
if (response.status === 404) { // eslint-disable-line padding-line-between-statements
return false
}
throw new Error(`Unexpected response code ${response.status} when fetching stream storage status`)
}

async removeFromStorageNode(address: string) {
Expand Down
2 changes: 2 additions & 0 deletions test/integration/Encryption.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ describe('decryption', () => {
requireEncryptedData: true,
})

await stream.addToStorageNode(config.clientOptions.storageNode.address)

publishTestMessages = getPublishTestMessages(client, {
stream
})
Expand Down
1 change: 1 addition & 0 deletions test/integration/GapFill.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ describeRepeats('GapFill with resends', () => {
requireSignedData: true,
name: uid('stream')
})
await stream.addToStorageNode(config.clientOptions.storageNode.address)

client.debug('connecting before test <<')
publishTestMessages = getPublishTestMessages(client, stream.id)
Expand Down
1 change: 1 addition & 0 deletions test/integration/MultipleClients.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ describeRepeats('PubSub with multiple clients', () => {
stream = await mainClient.createStream({
name: uid('stream')
})
await stream.addToStorageNode(config.clientOptions.storageNode.address)
})

afterEach(async () => {
Expand Down
2 changes: 2 additions & 0 deletions test/integration/ResendReconnect.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ describe('resend/reconnect', () => {
name: uid('resends')
})

await stream.addToStorageNode(config.clientOptions.storageNode.address)

publishTestMessages = getPublishTestMessages(client, {
streamId: stream.id,
waitForLast: true,
Expand Down
5 changes: 5 additions & 0 deletions test/integration/Resends.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ describe('StreamrClient resends', () => {
name: uid('resends')
})

await stream.addToStorageNode(config.clientOptions.storageNode.address)

publishTestMessages = getPublishTestMessages(client, {
stream
})
Expand Down Expand Up @@ -335,6 +337,9 @@ describe('StreamrClient resends', () => {
})

client.debug('CREATED')

await stream.addToStorageNode(config.clientOptions.storageNode.address)

publishTestMessages = getPublishTestMessages(client, {
stream
})
Expand Down
1 change: 1 addition & 0 deletions test/integration/Stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ describe('Stream', () => {
stream = await client.createStream({
name: uid('stream-integration-test')
})
await stream.addToStorageNode(config.clientOptions.storageNode.address)
})

afterEach(async () => {
Expand Down
14 changes: 8 additions & 6 deletions test/integration/StreamEndpoints.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,27 +231,29 @@ function TestStreamEndpoints(getName: () => string) {

describe('Storage node assignment', () => {
it('add', async () => {
const storageNodeAddress = ethers.Wallet.createRandom().address
const storageNodeAddress = client.options.storageNode.address
const stream = await client.createStream()
await stream.addToStorageNode(storageNodeAddress)
const storageNodes = await stream.getStorageNodes()
expect(storageNodes.length).toBe(1)
expect(storageNodes[0].getAddress()).toBe(storageNodeAddress)
const storedStreamParts = await client.getStreamPartsByStorageNode(storageNodeAddress)
expect(storedStreamParts.length).toBe(1)
expect(storedStreamParts[0].getStreamId()).toBe(stream.id)
expect(storedStreamParts[0].getStreamPartition()).toBe(0)
expect(storedStreamParts.some(
(sp) => (sp.getStreamId() === stream.id) && (sp.getStreamPartition() === 0)
)).toBeTruthy()
})

it('remove', async () => {
const storageNodeAddress = ethers.Wallet.createRandom().address
const storageNodeAddress = client.options.storageNode.address
const stream = await client.createStream()
await stream.addToStorageNode(storageNodeAddress)
await stream.removeFromStorageNode(storageNodeAddress)
const storageNodes = await stream.getStorageNodes()
expect(storageNodes).toHaveLength(0)
const storedStreamParts = await client.getStreamPartsByStorageNode(storageNodeAddress)
expect(storedStreamParts).toHaveLength(0)
expect(storedStreamParts.some(
(sp) => (sp.getStreamId() === stream.id)
)).toBeFalsy()
})
})
}
Expand Down
1 change: 1 addition & 0 deletions test/integration/StreamrClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ describeRepeats('StreamrClient', () => {
requireSignedData,
...opts,
})
await s.addToStorageNode(config.clientOptions.storageNode.address)

expect(s.id).toBeTruthy()
expect(s.name).toEqual(name)
Expand Down
3 changes: 3 additions & 0 deletions test/integration/SubscriberResends.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ describeRepeats('resends', () => {
stream = await client.createStream({
name: uid('stream')
})
await stream.addToStorageNode(config.clientOptions.storageNode.address)
client.debug('connecting before test <<')

publishTestMessages = getPublishTestMessages(client, {
Expand Down Expand Up @@ -128,6 +129,7 @@ describeRepeats('resends', () => {
emptyStream = await client.createStream({
name: uid('stream')
})
await emptyStream.addToStorageNode(config.clientOptions.storageNode.address)

const sub = await subscriber.resend({
streamId: emptyStream.id,
Expand All @@ -147,6 +149,7 @@ describeRepeats('resends', () => {
emptyStream = await client.createStream({
name: uid('stream')
})
await emptyStream.addToStorageNode(config.clientOptions.storageNode.address)

const sub = await subscriber.resendSubscribe({
streamId: emptyStream.id,
Expand Down
1 change: 1 addition & 0 deletions test/integration/Subscription.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ describe('Subscription', () => {
stream = await client.createStream({
name: uid('stream')
})
await stream.addToStorageNode(config.clientOptions.storageNode.address)
await client.connect()
})

Expand Down
5 changes: 5 additions & 0 deletions test/integration/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ module.exports = {
templateMainnetAddress: process.env.DU_TEMPLATE_MAINNET || '0x7bFBAe10AE5b5eF45e2aC396E0E605F6658eF3Bc',
templateSidechainAddress: process.env.DU_TEMPLATE_SIDECHAIN || '0x36afc8c9283CC866b8EB6a61C6e6862a83cd6ee8',
},
storageNode: {
// "broker-node-storage-1" on Docker environment
address: '0xde1112f631486CfC759A50196853011528bC5FA0',
url: 'http://10.200.10.1:8891'
},
sidechain: {
url: process.env.SIDECHAIN_URL || 'http://10.200.10.1:8546',
timeout: toNumber(process.env.TEST_TIMEOUT),
Expand Down
3 changes: 2 additions & 1 deletion test/unit/Config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ describe('Config', () => {
'dataUnion.factoryMainnetAddress',
'dataUnion.factorySidechainAddress',
'dataUnion.templateMainnetAddress',
'dataUnion.templateSidechainAddress'
'dataUnion.templateSidechainAddress',
'storageNode.address'
]
for (const propertyPath of propertyPaths) {
it(propertyPath, () => {
Expand Down