diff --git a/README.md b/README.md index 46be08f20..f2a088ead 100644 --- a/README.md +++ b/README.md @@ -303,7 +303,7 @@ All the below functions return a Promise which gets resolved with the result. | getStream(streamId) | Fetches a stream object from the API. | | listStreams(query) | Fetches an array of stream objects from the API. For the query params, consult the [API docs](https://api-explorer.streamr.com). | | getStreamByName(name) | Fetches a stream which exactly matches the given name. | -| createStream(properties) | Creates a stream with the given properties. For more information on the stream properties, consult the [API docs](https://api-explorer.streamr.com). | +| createStream(\[properties]) | Creates a stream with the given properties. For more information on the stream properties, consult the [API docs](https://api-explorer.streamr.com). | | getOrCreateStream(properties) | Gets a stream with the id or name given in `properties`, or creates it if one is not found. | | publish(streamId, message, timestamp, partitionKey) | Publishes a new message to the given stream. | diff --git a/src/rest/ErrorCode.ts b/src/rest/ErrorCode.ts new file mode 100644 index 000000000..f7f1b95a2 --- /dev/null +++ b/src/rest/ErrorCode.ts @@ -0,0 +1,20 @@ +export enum ErrorCode { + NOT_FOUND = 'NOT_FOUND', + VALIDATION_ERROR = 'VALIDATION_ERROR', + UNKNOWN = 'UNKNOWN' +} + +export const parseErrorCode = (body: string) => { + let json + try { + json = JSON.parse(body) + } catch (err) { + return ErrorCode.UNKNOWN + } + const code = json.code + const keys = Object.keys(ErrorCode) + if (keys.includes(code)) { + return code as ErrorCode + } + return ErrorCode.UNKNOWN +} diff --git a/src/rest/StreamEndpoints.ts b/src/rest/StreamEndpoints.ts index 01c1c5a6a..3960bdfaa 100644 --- a/src/rest/StreamEndpoints.ts +++ b/src/rest/StreamEndpoints.ts @@ -10,9 +10,10 @@ import Stream, { StreamOperation, StreamProperties } from '../stream' import StreamPart from '../stream/StreamPart' import { isKeyExchangeStream } from '../stream/KeyExchange' -import authFetch from './authFetch' +import authFetch, { AuthFetchError } from './authFetch' import { Todo } from '../types' import StreamrClient from '../StreamrClient' +import { ErrorCode } from './ErrorCode' // TODO change this import when streamr-client-protocol exports StreamMessage type or the enums types directly import { ContentType, EncryptionType, SignatureType, StreamMessageType } from 'streamr-client-protocol/dist/src/protocol/message_layer/StreamMessage' @@ -97,18 +98,11 @@ export class StreamEndpoints { } const url = getEndpointUrl(this.client.options.restUrl, 'streams', streamId) - try { - const json = await authFetch(url, this.client.session) - return new Stream(this.client, json) - } catch (e) { - if (e.response && e.response.status === 404) { - return undefined - } - throw e - } + const json = await authFetch(url, this.client.session) + return new Stream(this.client, json) } - async listStreams(query: StreamListQuery = {}) { + async listStreams(query: StreamListQuery = {}): Promise { this.client.debug('listStreams %o', { query, }) @@ -126,10 +120,10 @@ export class StreamEndpoints { // @ts-expect-error public: false, }) - return json[0] ? new Stream(this.client, json[0]) : undefined + return json[0] ? new Stream(this.client, json[0]) : Promise.reject(new AuthFetchError('', undefined, undefined, ErrorCode.NOT_FOUND)) } - async createStream(props: StreamProperties) { + async createStream(props?: StreamProperties) { this.client.debug('createStream %o', { props, }) @@ -142,34 +136,31 @@ export class StreamEndpoints { body: JSON.stringify(props), }, ) - return json ? new Stream(this.client, json) : undefined + return new Stream(this.client, json) } async getOrCreateStream(props: { id?: string, name?: string }) { this.client.debug('getOrCreateStream %o', { props, }) - let json: any - // Try looking up the stream by id or name, whichever is defined - if (props.id) { - json = await this.getStream(props.id) - } else if (props.name) { - json = await this.getStreamByName(props.name) - } - - // If not found, try creating the stream - if (!json) { - json = await this.createStream(props) - debug('Created stream: %s (%s)', props.name, json.id) + try { + if (props.id) { + const stream = await this.getStream(props.id) + return stream + } + const stream = await this.getStreamByName(props.name!) + return stream + } catch (err) { + const isNotFoundError = (err instanceof AuthFetchError) && (err.errorCode === ErrorCode.NOT_FOUND) + if (!isNotFoundError) { + throw err + } } - // If still nothing, throw - if (!json) { - throw new Error(`Unable to find or create stream: ${props.name || props.id}`) - } else { - return new Stream(this.client, json) - } + const stream = await this.createStream(props) + debug('Created stream: %s (%s)', props.name, stream.id) + return stream } async getStreamPublishers(streamId: string) { diff --git a/src/rest/authFetch.ts b/src/rest/authFetch.ts index 55083faf4..16a299bad 100644 --- a/src/rest/authFetch.ts +++ b/src/rest/authFetch.ts @@ -2,6 +2,7 @@ import fetch, { Response } from 'node-fetch' import Debug from 'debug' import { getVersionString } from '../utils' +import { ErrorCode, parseErrorCode } from './ErrorCode' import Session from '../Session' export const DEFAULT_HEADERS = { @@ -9,15 +10,17 @@ export const DEFAULT_HEADERS = { } export class AuthFetchError extends Error { - response: Response + response?: Response body?: any + errorCode?: ErrorCode - constructor(message: string, response: Response, body?: any) { + constructor(message: string, response?: Response, body?: any, errorCode?: ErrorCode) { // add leading space if there is a body set const bodyMessage = body ? ` ${(typeof body === 'string' ? body : JSON.stringify(body).slice(0, 1024))}...` : '' super(message + bodyMessage) this.response = response this.body = body + this.errorCode = errorCode if (Error.captureStackTrace) { Error.captureStackTrace(this, this.constructor) @@ -75,6 +78,6 @@ export default async function authFetch(url: string, session?: return authFetch(url, session, options, true) } else { debug('%d %s – failed', id, url) - throw new AuthFetchError(`Request ${id} to ${url} returned with error code ${response.status}.`, response, body) + throw new AuthFetchError(`Request ${id} to ${url} returned with error code ${response.status}.`, response, body, parseErrorCode(body)) } } diff --git a/test/integration/StreamEndpoints.test.js b/test/integration/StreamEndpoints.test.js index 6be2ee3c9..0fd2ebf9e 100644 --- a/test/integration/StreamEndpoints.test.js +++ b/test/integration/StreamEndpoints.test.js @@ -51,6 +51,10 @@ function TestStreamEndpoints(getName) { expect(stream.requireSignedData).toBe(true) expect(stream.requireEncryptedData).toBe(true) }) + + it('invalid id', () => { + return expect(() => client.createStream({ id: 'invalid.eth/foobar' })).rejects.toThrow() + }) }) describe('getStream', () => { @@ -62,13 +66,25 @@ function TestStreamEndpoints(getName) { it('get a non-existing Stream', async () => { const id = `${wallet.address}/StreamEndpoints-integration-nonexisting-${Date.now()}` - const stream = await client.getStream(id) - expect(stream).toBe(undefined) + return expect(() => client.getStream(id)).rejects.toThrow() + }) + }) + + describe('getStreamByName', () => { + it('get an existing Stream', async () => { + const stream = await client.createStream() + const existingStream = await client.getStreamByName(stream.name) + expect(existingStream.id).toEqual(stream.id) + }) + + it('get a non-existing Stream', async () => { + const name = `${wallet.address}/StreamEndpoints-integration-nonexisting-${Date.now()}` + return expect(() => client.getStreamByName(name)).rejects.toThrow() }) }) describe('getOrCreate', () => { - it('getOrCreate an existing Stream', async () => { + it('getOrCreate an existing Stream by name', async () => { const existingStream = await client.getOrCreateStream({ name: createdStream.name, }) @@ -76,12 +92,19 @@ function TestStreamEndpoints(getName) { expect(existingStream.name).toBe(createdStream.name) }) + it('getOrCreate an existing Stream by id', async () => { + const existingStream = await client.getOrCreateStream({ + id: createdStream.id, + }) + expect(existingStream.id).toBe(createdStream.id) + expect(existingStream.name).toBe(createdStream.name) + }) + it('getOrCreate a new Stream by name', async () => { const newName = uid('stream') const newStream = await client.getOrCreateStream({ name: newName, }) - expect(newStream.name).toEqual(newName) }) @@ -90,7 +113,6 @@ function TestStreamEndpoints(getName) { const newStream = await client.getOrCreateStream({ id: newId, }) - expect(newStream.id).toEqual(newId) }) }) @@ -201,7 +223,7 @@ function TestStreamEndpoints(getName) { describe('Stream deletion', () => { it('Stream.delete', async () => { await createdStream.delete() - expect(await client.getStream(createdStream.id)).toBe(undefined) + return expect(() => client.getStream(createdStream.id)).rejects.toThrow() }) }) diff --git a/test/utils.ts b/test/utils.ts index f260ca07b..3c068aba9 100644 --- a/test/utils.ts +++ b/test/utils.ts @@ -237,7 +237,7 @@ export function getPublishTestMessages(client: StreamrClient, defaultOpts = {}) export const createMockAddress = () => '0x000000000000000000000000000' + Date.now() -export const createClient = (providerSidechain: providers.JsonRpcProvider) => { +export const createClient = (providerSidechain?: providers.JsonRpcProvider) => { const wallet = new Wallet(`0x100000000000000000000000000000000000000012300000001${Date.now()}`, providerSidechain) return new StreamrClient({ ...config.clientOptions,