Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/StreamrClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ export default class StreamrClient extends EventEmitter {
return this.publisher.rotateGroupKey(...args)
}

async subscribe(opts: Todo, onMessage: OnMessageCallback) {
async subscribe(opts: Todo, onMessage?: OnMessageCallback) {
let subTask: Todo
let sub: Todo
const hasResend = !!(opts.resend || opts.from || opts.to || opts.last)
Expand Down Expand Up @@ -372,7 +372,7 @@ export default class StreamrClient extends EventEmitter {
await this.subscriber.unsubscribe(opts)
}

async resend(opts: Todo, onMessage: OnMessageCallback) {
async resend(opts: Todo, onMessage?: OnMessageCallback) {
const task = this.subscriber.resend(opts)
if (typeof onMessage !== 'function') {
return task
Expand Down
59 changes: 54 additions & 5 deletions src/stream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,39 @@ export enum StreamOperation {

export type StreamProperties = Todo

export default class Stream {
const VALID_FIELD_TYPES = ['number', 'string', 'boolean', 'list', 'map'] as const

type Field = {
name: string;
type: typeof VALID_FIELD_TYPES[number];
}

function getFieldType(value: any): (Field['type'] | undefined) {
const type = typeof value
switch (true) {
case Array.isArray(value): {
return 'list'
}
case type === 'object': {
return 'map'
}
case (VALID_FIELD_TYPES as ReadonlyArray<string>).includes(type): {
// see https://github.com/microsoft/TypeScript/issues/36275
return type as Field['type']
}
default: {
return undefined
}
}
}

export default class Stream {
// TODO add field definitions for all fields
// @ts-expect-error
id: string
config: {
fields: Field[];
} = { fields: [] }
_client: StreamrClient

constructor(client: StreamrClient, props: StreamProperties) {
Expand Down Expand Up @@ -125,10 +153,31 @@ export default class Stream {
}

async detectFields() {
return authFetch(
getEndpointUrl(this._client.options.restUrl, 'streams', this.id, 'detectFields'),
this._client.session,
)
// Get last message of the stream to be used for field detecting
const sub = await this._client.resend({
stream: this.id,
resend: {
last: 1,
},
})

const receivedMsgs = await sub.collect()

if (!receivedMsgs.length) { return }

const [lastMessage] = receivedMsgs

const fields = Object.entries(lastMessage).map(([name, value]) => {
const type = getFieldType(value)
return !!type && {
name,
type,
}
}).filter(Boolean) as Field[] // see https://github.com/microsoft/TypeScript/issues/30621

// Save field config back to the stream
this.config.fields = fields
await this.update()
}

async addToStorageNode(address: string) {
Expand Down
117 changes: 117 additions & 0 deletions test/integration/Stream.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import StreamrClient from '../../src/StreamrClient'
import { uid, fakePrivateKey, getPublishTestMessages } from '../utils'

import config from './config'

const createClient = (opts = {}) => new StreamrClient({
...config.clientOptions,
auth: {
privateKey: fakePrivateKey(),
},
autoConnect: false,
autoDisconnect: false,
...opts,
})

describe('Stream', () => {
let client
let stream

beforeEach(async () => {
client = createClient()
await client.connect()

stream = await client.createStream({
name: uid('stream-integration-test')
})
})

afterEach(async () => {
await client.disconnect()
})

describe('detectFields()', () => {
it('does detect primitive types', async () => {
const msg = {
number: 123,
boolean: true,
object: {
k: 1,
v: 2,
},
array: [1, 2, 3],
string: 'test',
}
const publishTestMessages = getPublishTestMessages(client, {
streamId: stream.id,
waitForLast: true,
createMessage: () => msg,
})
await publishTestMessages(1)

expect(stream.config.fields).toEqual([])
await stream.detectFields()
const expectedFields = [
{
name: 'number',
type: 'number',
},
{
name: 'boolean',
type: 'boolean',
},
{
name: 'object',
type: 'map',
},
{
name: 'array',
type: 'list',
},
{
name: 'string',
type: 'string',
},
]

expect(stream.config.fields).toEqual(expectedFields)
const loadedStream = await client.getStream(stream.id)
expect(loadedStream.config.fields).toEqual(expectedFields)
})

it('skips unsupported types', async () => {
const msg = {
null: null,
empty: {},
func: () => null,
nonexistent: undefined,
symbol: Symbol('test'),
// TODO: bigint: 10n,
}
const publishTestMessages = getPublishTestMessages(client, {
streamId: stream.id,
waitForLast: true,
createMessage: () => msg,
})
await publishTestMessages(1)

expect(stream.config.fields).toEqual([])
await stream.detectFields()
const expectedFields = [
{
name: 'null',
type: 'map',
},
{
name: 'empty',
type: 'map',
},
]

expect(stream.config.fields).toEqual(expectedFields)

const loadedStream = await client.getStream(stream.id)
expect(loadedStream.config.fields).toEqual(expectedFields)
})
})
})
25 changes: 0 additions & 25 deletions test/integration/StreamEndpoints.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { ethers } from 'ethers'
import { wait } from 'streamr-test-utils'

import StreamrClient from '../../src/StreamrClient'
import { uid } from '../utils'
Expand Down Expand Up @@ -176,30 +175,6 @@ function TestStreamEndpoints(getName) {
})
})

describe.skip('Stream configuration', () => {
it('Stream.detectFields', async () => {
await client.connect()
await client.publish(createdStream.id, {
foo: 'bar',
count: 0,
})
// Need time to propagate to storage
await wait(10000)
const stream = await createdStream.detectFields()
expect(stream.config.fields).toEqual([
{
name: 'foo',
type: 'string',
},
{
name: 'count',
type: 'number',
},
])
await client.disconnect()
}, 15000)
})

describe('Stream permissions', () => {
it('Stream.getPermissions', async () => {
const permissions = await createdStream.getPermissions()
Expand Down