Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ See "Subscription options" for resend options
### Programmatically creating a stream

```js
const stream = await client.getOrCreateStream({
name: 'My awesome stream created via the API',
const stream = await client.createStream({
id: '/foo/bar', // or 0x1234567890123456789012345678901234567890/foo/bar or mydomain.eth/foo/bar
})
console.log(`Stream ${stream.id} has been created!`)

Expand Down Expand Up @@ -328,7 +328,7 @@ All the below functions return a Promise which gets resolved with the result.
| getStream(streamId) | Fetches a stream object from the API. |
| listStreams(query) | Fetches an array of stream objects from the API. For the query params, consult the [API docs](https://api-explorer.streamr.com). |
| getStreamByName(name) | Fetches a stream which exactly matches the given name. |
| createStream(\[properties]) | Creates a stream with the given properties. For more information on the stream properties, consult the [API docs](https://api-explorer.streamr.com). |
| createStream(\[properties]) | Creates a stream with the given properties. For more information on the stream properties, consult the [API docs](https://api-explorer.streamr.com). If you specify `id`, it can be a full streamId or a path (e.g. `/foo/bar` will create a stream with id `<your-etherereum-address>/foo/bar` if you have authenticated with a private key)|
| getOrCreateStream(properties) | Gets a stream with the id or name given in `properties`, or creates it if one is not found. |
| publish(streamId, message, timestamp, partitionKey) | Publishes a new message to the given stream. |

Expand Down
10 changes: 7 additions & 3 deletions src/rest/StreamEndpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import qs from 'qs'
import debugFactory from 'debug'

import { getEndpointUrl } from '../utils'
import { validateOptions } from '../stream/utils'
import { createStreamId, validateOptions } from '../stream/utils'
import { Stream, StreamOperation, StreamProperties } from '../stream'
import { StreamPart } from '../stream/StreamPart'
import { isKeyExchangeStream } from '../stream/KeyExchange'
Expand Down Expand Up @@ -133,18 +133,22 @@ export class StreamEndpoints {

/**
* @category Important
* @param props - if id is specified, it can be full streamId or path
*/
async createStream(props?: Partial<StreamProperties>) {
this.client.debug('createStream %o', {
props,
})

const body = (props?.id !== undefined) ? {
...props,
id: await createStreamId(props.id, () => this.client.getAddress())
} : props
const json = await authFetch<StreamProperties>(
getEndpointUrl(this.client.options.restUrl, 'streams'),
this.client.session,
{
method: 'POST',
body: JSON.stringify(props),
body: JSON.stringify(body),
},
)
return new Stream(this.client, json)
Expand Down
21 changes: 20 additions & 1 deletion src/stream/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { inspect } from 'util'
import { ControlLayer } from 'streamr-client-protocol'

import { pTimeout } from '../utils'
import { Todo } from '../types'
import { EthereumAddress, Todo } from '../types'
import { StreamrClient } from '../StreamrClient'
import { StreamPartDefinition, ValidatedStreamPartDefinition } from '.'

Expand Down Expand Up @@ -199,3 +199,22 @@ export async function waitForRequestResponse(client: StreamrClient, request: Tod
...opts, // e.g. timeout, rejectOnTimeout
})
}

export const createStreamId = async (streamIdOrPath: string, ownerProvider?: () => Promise<EthereumAddress|undefined>) => {
if (streamIdOrPath === undefined) {
throw new Error('Missing stream id')
}

if (!streamIdOrPath.startsWith('/')) {
return streamIdOrPath
}

if (ownerProvider === undefined) {
throw new Error(`Owner provider missing for stream id: ${streamIdOrPath}`)
}
const owner = await ownerProvider()
if (owner === undefined) {
throw new Error(`Owner missing for stream id: ${streamIdOrPath}`)
}
return owner.toLowerCase() + streamIdOrPath
}
55 changes: 47 additions & 8 deletions test/integration/StreamEndpoints.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Stream, StreamOperation } from '../../src/stream'
import { StorageNode } from '../../src/stream/StorageNode'

import { StreamrClient } from '../../src/StreamrClient'
import { uid } from '../utils'
import { uid, fakeAddress } from '../utils'

import config from './config'

Expand All @@ -15,6 +15,7 @@ import config from './config'
function TestStreamEndpoints(getName: () => string) {
let client: StreamrClient
let wallet: Wallet
let createdStreamPath: string
let createdStream: Stream

const createClient = (opts = {}) => new StreamrClient({
Expand All @@ -34,7 +35,9 @@ function TestStreamEndpoints(getName: () => string) {
})

beforeAll(async () => {
createdStreamPath = `/StreamEndpoints-${Date.now()}`
createdStream = await client.createStream({
id: `${wallet.address}${createdStreamPath}`,
name: getName(),
requireSignedData: true,
requireEncryptedData: false,
Expand All @@ -55,6 +58,22 @@ function TestStreamEndpoints(getName: () => string) {
expect(stream.requireEncryptedData).toBe(true)
})

it('valid id', async () => {
const newId = `${wallet.address}/StreamEndpoints-createStream-newId-${Date.now()}`
const newStream = await client.createStream({
id: newId,
})
expect(newStream.id).toEqual(newId)
})

it('valid path', async () => {
const newPath = `/StreamEndpoints-createStream-newPath-${Date.now()}`
const newStream = await client.createStream({
id: newPath,
})
expect(newStream.id).toEqual(`${wallet.address.toLowerCase()}${newPath}`)
})

it('invalid id', () => {
return expect(() => client.createStream({ id: 'invalid.eth/foobar' })).rejects.toThrow(ValidationError)
})
Expand All @@ -68,7 +87,7 @@ function TestStreamEndpoints(getName: () => string) {
})

it('get a non-existing Stream', async () => {
const id = `${wallet.address}/StreamEndpoints-integration-nonexisting-${Date.now()}`
const id = `${wallet.address}/StreamEndpoints-nonexisting-${Date.now()}`
return expect(() => client.getStream(id)).rejects.toThrow(NotFoundError)
})
})
Expand All @@ -81,43 +100,63 @@ function TestStreamEndpoints(getName: () => string) {
})

it('get a non-existing Stream', async () => {
const name = `${wallet.address}/StreamEndpoints-integration-nonexisting-${Date.now()}`
const name = `${wallet.address}/StreamEndpoints-nonexisting-${Date.now()}`
return expect(() => client.getStreamByName(name)).rejects.toThrow(NotFoundError)
})
})

describe('getOrCreate', () => {
it('getOrCreate an existing Stream by name', async () => {
it('existing Stream by name', async () => {
const existingStream = await client.getOrCreateStream({
name: createdStream.name,
})
expect(existingStream.id).toBe(createdStream.id)
expect(existingStream.name).toBe(createdStream.name)
})

it('getOrCreate an existing Stream by id', async () => {
it('existing Stream by id', async () => {
const existingStream = await client.getOrCreateStream({
id: createdStream.id,
})
expect(existingStream.id).toBe(createdStream.id)
expect(existingStream.name).toBe(createdStream.name)
})

it('getOrCreate a new Stream by name', async () => {
it('new Stream by name', async () => {
const newName = uid('stream')
const newStream = await client.getOrCreateStream({
name: newName,
})
expect(newStream.name).toEqual(newName)
})

it('getOrCreate a new Stream by id', async () => {
const newId = `${wallet.address}/StreamEndpoints-integration-${Date.now()}`
it('new Stream by id', async () => {
const newId = `${wallet.address}/StreamEndpoints-getOrCreate-newId-${Date.now()}`
const newStream = await client.getOrCreateStream({
id: newId,
})
expect(newStream.id).toEqual(newId)
})

it('new Stream by path', async () => {
const newPath = `/StreamEndpoints-getOrCreate-newPath-${Date.now()}`
const newStream = await client.getOrCreateStream({
id: newPath,
})
expect(newStream.id).toEqual(`${wallet.address.toLowerCase()}${newPath}`)
})

it('fails if stream prefixed with other users address', async () => {
// can't create streams for other users
const otherAddress = `0x${fakeAddress()}`
const newPath = `/StreamEndpoints-getOrCreate-newPath-${Date.now()}`
// backend should error
await expect(async () => {
await client.getOrCreateStream({
id: `${otherAddress}${newPath}`,
})
}).rejects.toThrow('Validation')
})
})

describe('listStreams', () => {
Expand Down
119 changes: 85 additions & 34 deletions test/unit/StreamUtils.test.ts
Original file line number Diff line number Diff line change
@@ -1,50 +1,101 @@
import { Stream } from '../../src/stream'
import { validateOptions } from '../../src/stream/utils'
import { createStreamId, validateOptions } from '../../src/stream/utils'

describe('Stream utils', () => {

it('no definition', () => {
expect(() => validateOptions(undefined as any)).toThrow()
expect(() => validateOptions(null as any)).toThrow()
expect(() => validateOptions({})).toThrow()
})
describe('validateOptions', () => {

it('string', () => {
expect(validateOptions('foo')).toMatchObject({
streamId: 'foo',
streamPartition: 0,
key: 'foo::0'
it('no definition', () => {
expect(() => validateOptions(undefined as any)).toThrow()
expect(() => validateOptions(null as any)).toThrow()
expect(() => validateOptions({})).toThrow()
})
})

it('object', () => {
expect(validateOptions({ streamId: 'foo' })).toMatchObject({
streamId: 'foo',
streamPartition: 0,
key: 'foo::0'
it('string', () => {
expect(validateOptions('foo')).toMatchObject({
streamId: 'foo',
streamPartition: 0,
key: 'foo::0'
})
})
expect(validateOptions({ streamId: 'foo', streamPartition: 123 })).toMatchObject({
streamId: 'foo',
streamPartition: 123,
key: 'foo::123'

it('object', () => {
expect(validateOptions({ streamId: 'foo' })).toMatchObject({
streamId: 'foo',
streamPartition: 0,
key: 'foo::0'
})
expect(validateOptions({ streamId: 'foo', streamPartition: 123 })).toMatchObject({
streamId: 'foo',
streamPartition: 123,
key: 'foo::123'
})
expect(validateOptions({ id: 'foo', partition: 123 })).toMatchObject({
streamId: 'foo',
streamPartition: 123,
key: 'foo::123'
})
})
expect(validateOptions({ id: 'foo', partition: 123 })).toMatchObject({
streamId: 'foo',
streamPartition: 123,
key: 'foo::123'

it('stream', () => {
const stream = new Stream(undefined as any, {
id: 'foo',
name: 'bar'
})
expect(validateOptions({ stream })).toMatchObject({
streamId: 'foo',
streamPartition: 0,
key: 'foo::0'
})
})

})

it('stream', () => {
const stream = new Stream(undefined as any, {
id: 'foo',
name: 'bar'
describe('createStreamId', () => {
const ownerProvider = () => Promise.resolve('0xaAAAaaaaAA123456789012345678901234567890')

it('path', async () => {
const path = '/foo/BAR'
const actual = await createStreamId(path, ownerProvider)
expect(actual).toBe('0xaaaaaaaaaa123456789012345678901234567890/foo/BAR')
})
expect(validateOptions({ stream })).toMatchObject({
streamId: 'foo',
streamPartition: 0,
key: 'foo::0'

it('path: no owner', () => {
const path = '/foo/BAR'
return expect(createStreamId(path, async () => undefined)).rejects.toThrowError('Owner missing for stream id: /foo/BAR')
})

it('path: no owner provider', () => {
const path = '/foo/BAR'
return expect(createStreamId(path, undefined)).rejects.toThrowError('Owner provider missing for stream id: /foo/BAR')
})
})

it('full: ethereum address', async () => {
const id = '0xbbbbbBbBbB123456789012345678901234567890/foo/BAR'
const actual = await createStreamId(id)
expect(actual).toBe(id)
})

it('full: ENS domain', async () => {
const id = 'example.eth/foo/BAR'
const actual = await createStreamId(id)
expect(actual).toBe(id)
})

it('legacy', async () => {
const id = 'abcdeFGHJI1234567890ab'
const actual = await createStreamId(id)
expect(actual).toBe(id)
})

it('system', async () => {
const id = 'SYSTEM/keyexchange/0xcccccccccc123456789012345678901234567890'
const actual = await createStreamId(id)
expect(actual).toBe(id)
})

it('undefined', () => {
return expect(createStreamId(undefined as any)).rejects.toThrowError('Missing stream id')
})
})
})
4 changes: 4 additions & 0 deletions test/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ export function fakePrivateKey() {
return crypto.randomBytes(32).toString('hex')
}

export function fakeAddress() {
return crypto.randomBytes(32).toString('hex').slice(0, 40)
}

const TEST_REPEATS = (process.env.TEST_REPEATS) ? parseInt(process.env.TEST_REPEATS, 10) : 1

export function describeRepeats(msg: any, fn: any, describeFn = describe) {
Expand Down