diff --git a/README.md b/README.md index 05bbe5463..57c9f7293 100644 --- a/README.md +++ b/README.md @@ -94,8 +94,8 @@ See "Subscription options" for resend options ### Programmatically creating a stream ```js -const stream = await client.getOrCreateStream({ - name: 'My awesome stream created via the API', +const stream = await client.createStream({ + id: '/foo/bar', // or 0x1234567890123456789012345678901234567890/foo/bar or mydomain.eth/foo/bar }) console.log(`Stream ${stream.id} has been created!`) @@ -328,7 +328,7 @@ All the below functions return a Promise which gets resolved with the result. | getStream(streamId) | Fetches a stream object from the API. | | listStreams(query) | Fetches an array of stream objects from the API. For the query params, consult the [API docs](https://api-explorer.streamr.com). | | getStreamByName(name) | Fetches a stream which exactly matches the given name. | -| createStream(\[properties]) | Creates a stream with the given properties. For more information on the stream properties, consult the [API docs](https://api-explorer.streamr.com). | +| createStream(\[properties]) | Creates a stream with the given properties. For more information on the stream properties, consult the [API docs](https://api-explorer.streamr.com). If you specify `id`, it can be a full streamId or a path (e.g. `/foo/bar` will create a stream with id `/foo/bar` if you have authenticated with a private key)| | getOrCreateStream(properties) | Gets a stream with the id or name given in `properties`, or creates it if one is not found. | | publish(streamId, message, timestamp, partitionKey) | Publishes a new message to the given stream. | diff --git a/src/rest/StreamEndpoints.ts b/src/rest/StreamEndpoints.ts index ca237af00..d4920d2d1 100644 --- a/src/rest/StreamEndpoints.ts +++ b/src/rest/StreamEndpoints.ts @@ -5,7 +5,7 @@ import qs from 'qs' import debugFactory from 'debug' import { getEndpointUrl } from '../utils' -import { validateOptions } from '../stream/utils' +import { createStreamId, validateOptions } from '../stream/utils' import { Stream, StreamOperation, StreamProperties } from '../stream' import { StreamPart } from '../stream/StreamPart' import { isKeyExchangeStream } from '../stream/KeyExchange' @@ -133,18 +133,22 @@ export class StreamEndpoints { /** * @category Important + * @param props - if id is specified, it can be full streamId or path */ async createStream(props?: Partial) { this.client.debug('createStream %o', { props, }) - + const body = (props?.id !== undefined) ? { + ...props, + id: await createStreamId(props.id, () => this.client.getAddress()) + } : props const json = await authFetch( getEndpointUrl(this.client.options.restUrl, 'streams'), this.client.session, { method: 'POST', - body: JSON.stringify(props), + body: JSON.stringify(body), }, ) return new Stream(this.client, json) diff --git a/src/stream/utils.ts b/src/stream/utils.ts index 0813c4540..5ba2f0cc4 100644 --- a/src/stream/utils.ts +++ b/src/stream/utils.ts @@ -7,7 +7,7 @@ import { inspect } from 'util' import { ControlLayer } from 'streamr-client-protocol' import { pTimeout } from '../utils' -import { Todo } from '../types' +import { EthereumAddress, Todo } from '../types' import { StreamrClient } from '../StreamrClient' import { StreamPartDefinition, ValidatedStreamPartDefinition } from '.' @@ -199,3 +199,22 @@ export async function waitForRequestResponse(client: StreamrClient, request: Tod ...opts, // e.g. timeout, rejectOnTimeout }) } + +export const createStreamId = async (streamIdOrPath: string, ownerProvider?: () => Promise) => { + if (streamIdOrPath === undefined) { + throw new Error('Missing stream id') + } + + if (!streamIdOrPath.startsWith('/')) { + return streamIdOrPath + } + + if (ownerProvider === undefined) { + throw new Error(`Owner provider missing for stream id: ${streamIdOrPath}`) + } + const owner = await ownerProvider() + if (owner === undefined) { + throw new Error(`Owner missing for stream id: ${streamIdOrPath}`) + } + return owner.toLowerCase() + streamIdOrPath +} diff --git a/test/integration/StreamEndpoints.test.ts b/test/integration/StreamEndpoints.test.ts index 892526b98..97e45400d 100644 --- a/test/integration/StreamEndpoints.test.ts +++ b/test/integration/StreamEndpoints.test.ts @@ -4,7 +4,7 @@ import { Stream, StreamOperation } from '../../src/stream' import { StorageNode } from '../../src/stream/StorageNode' import { StreamrClient } from '../../src/StreamrClient' -import { uid } from '../utils' +import { uid, fakeAddress } from '../utils' import config from './config' @@ -15,6 +15,7 @@ import config from './config' function TestStreamEndpoints(getName: () => string) { let client: StreamrClient let wallet: Wallet + let createdStreamPath: string let createdStream: Stream const createClient = (opts = {}) => new StreamrClient({ @@ -34,7 +35,9 @@ function TestStreamEndpoints(getName: () => string) { }) beforeAll(async () => { + createdStreamPath = `/StreamEndpoints-${Date.now()}` createdStream = await client.createStream({ + id: `${wallet.address}${createdStreamPath}`, name: getName(), requireSignedData: true, requireEncryptedData: false, @@ -55,6 +58,22 @@ function TestStreamEndpoints(getName: () => string) { expect(stream.requireEncryptedData).toBe(true) }) + it('valid id', async () => { + const newId = `${wallet.address}/StreamEndpoints-createStream-newId-${Date.now()}` + const newStream = await client.createStream({ + id: newId, + }) + expect(newStream.id).toEqual(newId) + }) + + it('valid path', async () => { + const newPath = `/StreamEndpoints-createStream-newPath-${Date.now()}` + const newStream = await client.createStream({ + id: newPath, + }) + expect(newStream.id).toEqual(`${wallet.address.toLowerCase()}${newPath}`) + }) + it('invalid id', () => { return expect(() => client.createStream({ id: 'invalid.eth/foobar' })).rejects.toThrow(ValidationError) }) @@ -68,7 +87,7 @@ function TestStreamEndpoints(getName: () => string) { }) it('get a non-existing Stream', async () => { - const id = `${wallet.address}/StreamEndpoints-integration-nonexisting-${Date.now()}` + const id = `${wallet.address}/StreamEndpoints-nonexisting-${Date.now()}` return expect(() => client.getStream(id)).rejects.toThrow(NotFoundError) }) }) @@ -81,13 +100,13 @@ function TestStreamEndpoints(getName: () => string) { }) it('get a non-existing Stream', async () => { - const name = `${wallet.address}/StreamEndpoints-integration-nonexisting-${Date.now()}` + const name = `${wallet.address}/StreamEndpoints-nonexisting-${Date.now()}` return expect(() => client.getStreamByName(name)).rejects.toThrow(NotFoundError) }) }) describe('getOrCreate', () => { - it('getOrCreate an existing Stream by name', async () => { + it('existing Stream by name', async () => { const existingStream = await client.getOrCreateStream({ name: createdStream.name, }) @@ -95,7 +114,7 @@ function TestStreamEndpoints(getName: () => string) { expect(existingStream.name).toBe(createdStream.name) }) - it('getOrCreate an existing Stream by id', async () => { + it('existing Stream by id', async () => { const existingStream = await client.getOrCreateStream({ id: createdStream.id, }) @@ -103,7 +122,7 @@ function TestStreamEndpoints(getName: () => string) { expect(existingStream.name).toBe(createdStream.name) }) - it('getOrCreate a new Stream by name', async () => { + it('new Stream by name', async () => { const newName = uid('stream') const newStream = await client.getOrCreateStream({ name: newName, @@ -111,13 +130,33 @@ function TestStreamEndpoints(getName: () => string) { expect(newStream.name).toEqual(newName) }) - it('getOrCreate a new Stream by id', async () => { - const newId = `${wallet.address}/StreamEndpoints-integration-${Date.now()}` + it('new Stream by id', async () => { + const newId = `${wallet.address}/StreamEndpoints-getOrCreate-newId-${Date.now()}` const newStream = await client.getOrCreateStream({ id: newId, }) expect(newStream.id).toEqual(newId) }) + + it('new Stream by path', async () => { + const newPath = `/StreamEndpoints-getOrCreate-newPath-${Date.now()}` + const newStream = await client.getOrCreateStream({ + id: newPath, + }) + expect(newStream.id).toEqual(`${wallet.address.toLowerCase()}${newPath}`) + }) + + it('fails if stream prefixed with other users address', async () => { + // can't create streams for other users + const otherAddress = `0x${fakeAddress()}` + const newPath = `/StreamEndpoints-getOrCreate-newPath-${Date.now()}` + // backend should error + await expect(async () => { + await client.getOrCreateStream({ + id: `${otherAddress}${newPath}`, + }) + }).rejects.toThrow('Validation') + }) }) describe('listStreams', () => { diff --git a/test/unit/StreamUtils.test.ts b/test/unit/StreamUtils.test.ts index 3cb5360aa..f4941eee4 100644 --- a/test/unit/StreamUtils.test.ts +++ b/test/unit/StreamUtils.test.ts @@ -1,50 +1,101 @@ import { Stream } from '../../src/stream' -import { validateOptions } from '../../src/stream/utils' +import { createStreamId, validateOptions } from '../../src/stream/utils' describe('Stream utils', () => { - it('no definition', () => { - expect(() => validateOptions(undefined as any)).toThrow() - expect(() => validateOptions(null as any)).toThrow() - expect(() => validateOptions({})).toThrow() - }) + describe('validateOptions', () => { - it('string', () => { - expect(validateOptions('foo')).toMatchObject({ - streamId: 'foo', - streamPartition: 0, - key: 'foo::0' + it('no definition', () => { + expect(() => validateOptions(undefined as any)).toThrow() + expect(() => validateOptions(null as any)).toThrow() + expect(() => validateOptions({})).toThrow() }) - }) - it('object', () => { - expect(validateOptions({ streamId: 'foo' })).toMatchObject({ - streamId: 'foo', - streamPartition: 0, - key: 'foo::0' + it('string', () => { + expect(validateOptions('foo')).toMatchObject({ + streamId: 'foo', + streamPartition: 0, + key: 'foo::0' + }) }) - expect(validateOptions({ streamId: 'foo', streamPartition: 123 })).toMatchObject({ - streamId: 'foo', - streamPartition: 123, - key: 'foo::123' + + it('object', () => { + expect(validateOptions({ streamId: 'foo' })).toMatchObject({ + streamId: 'foo', + streamPartition: 0, + key: 'foo::0' + }) + expect(validateOptions({ streamId: 'foo', streamPartition: 123 })).toMatchObject({ + streamId: 'foo', + streamPartition: 123, + key: 'foo::123' + }) + expect(validateOptions({ id: 'foo', partition: 123 })).toMatchObject({ + streamId: 'foo', + streamPartition: 123, + key: 'foo::123' + }) }) - expect(validateOptions({ id: 'foo', partition: 123 })).toMatchObject({ - streamId: 'foo', - streamPartition: 123, - key: 'foo::123' + + it('stream', () => { + const stream = new Stream(undefined as any, { + id: 'foo', + name: 'bar' + }) + expect(validateOptions({ stream })).toMatchObject({ + streamId: 'foo', + streamPartition: 0, + key: 'foo::0' + }) }) + }) - it('stream', () => { - const stream = new Stream(undefined as any, { - id: 'foo', - name: 'bar' + describe('createStreamId', () => { + const ownerProvider = () => Promise.resolve('0xaAAAaaaaAA123456789012345678901234567890') + + it('path', async () => { + const path = '/foo/BAR' + const actual = await createStreamId(path, ownerProvider) + expect(actual).toBe('0xaaaaaaaaaa123456789012345678901234567890/foo/BAR') }) - expect(validateOptions({ stream })).toMatchObject({ - streamId: 'foo', - streamPartition: 0, - key: 'foo::0' + + it('path: no owner', () => { + const path = '/foo/BAR' + return expect(createStreamId(path, async () => undefined)).rejects.toThrowError('Owner missing for stream id: /foo/BAR') + }) + + it('path: no owner provider', () => { + const path = '/foo/BAR' + return expect(createStreamId(path, undefined)).rejects.toThrowError('Owner provider missing for stream id: /foo/BAR') }) - }) + it('full: ethereum address', async () => { + const id = '0xbbbbbBbBbB123456789012345678901234567890/foo/BAR' + const actual = await createStreamId(id) + expect(actual).toBe(id) + }) + + it('full: ENS domain', async () => { + const id = 'example.eth/foo/BAR' + const actual = await createStreamId(id) + expect(actual).toBe(id) + }) + + it('legacy', async () => { + const id = 'abcdeFGHJI1234567890ab' + const actual = await createStreamId(id) + expect(actual).toBe(id) + }) + + it('system', async () => { + const id = 'SYSTEM/keyexchange/0xcccccccccc123456789012345678901234567890' + const actual = await createStreamId(id) + expect(actual).toBe(id) + }) + + it('undefined', () => { + return expect(createStreamId(undefined as any)).rejects.toThrowError('Missing stream id') + }) + }) }) diff --git a/test/utils.ts b/test/utils.ts index 7e817b76c..3ed8a4d77 100644 --- a/test/utils.ts +++ b/test/utils.ts @@ -17,6 +17,10 @@ export function fakePrivateKey() { return crypto.randomBytes(32).toString('hex') } +export function fakeAddress() { + return crypto.randomBytes(32).toString('hex').slice(0, 40) +} + const TEST_REPEATS = (process.env.TEST_REPEATS) ? parseInt(process.env.TEST_REPEATS, 10) : 1 export function describeRepeats(msg: any, fn: any, describeFn = describe) {