Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 6 additions & 138 deletions packages/core/realtime-js/src/lib/serializer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
// This file draws heavily from https://github.com/phoenixframework/phoenix/commit/cf098e9cf7a44ee6479d31d911a97d3c7430c6fe
// License: https://github.com/phoenixframework/phoenix/blob/master/LICENSE.md
import { CHANNEL_EVENTS } from '../lib/constants'

export type Msg<T> = {
join_ref?: string | null
ref?: string | null
Expand All @@ -14,21 +12,14 @@ export default class Serializer {
HEADER_LENGTH = 1
META_LENGTH = 4
USER_BROADCAST_PUSH_META_LENGTH = 5
KINDS = { push: 0, reply: 1, broadcast: 2, userBroadcastPush: 3, userBroadcast: 4 }
KINDS = { userBroadcastPush: 3, userBroadcast: 4 }
BINARY_ENCODING = 0
JSON_ENCODING = 1
BROADCAST = 'broadcast'

encode(
msg: Msg<{ [key: string]: any } | ArrayBuffer>,
callback: (result: ArrayBuffer | string) => any
) {
if (this._isArrayBuffer(msg.payload)) {
return callback(this._binaryEncodePush(msg as Msg<ArrayBuffer>))
}
BROADCAST_EVENT = 'broadcast'

encode(msg: Msg<{ [key: string]: any }>, callback: (result: ArrayBuffer | string) => any) {
if (
msg.event === this.BROADCAST &&
msg.event === this.BROADCAST_EVENT &&
!(msg.payload instanceof ArrayBuffer) &&
typeof msg.payload.event === 'string'
) {
Expand All @@ -41,34 +32,6 @@ export default class Serializer {
return callback(JSON.stringify(payload))
}

private _binaryEncodePush(message: Msg<ArrayBuffer>) {
const { event, topic, payload } = message
const ref = message.ref ?? ''
const joinRef = message.join_ref ?? ''

const metaLength = this.META_LENGTH + joinRef.length + ref.length + topic.length + event.length

const header = new ArrayBuffer(this.HEADER_LENGTH + metaLength)
let view = new DataView(header)
let offset = 0

view.setUint8(offset++, this.KINDS.push) // kind
view.setUint8(offset++, joinRef.length)
view.setUint8(offset++, ref.length)
view.setUint8(offset++, topic.length)
view.setUint8(offset++, event.length)
Array.from(joinRef, (char) => view.setUint8(offset++, char.charCodeAt(0)))
Array.from(ref, (char) => view.setUint8(offset++, char.charCodeAt(0)))
Array.from(topic, (char) => view.setUint8(offset++, char.charCodeAt(0)))
Array.from(event, (char) => view.setUint8(offset++, char.charCodeAt(0)))

var combined = new Uint8Array(header.byteLength + payload.byteLength)
combined.set(new Uint8Array(header), 0)
combined.set(new Uint8Array(payload), header.byteLength)

return combined.buffer
}

private _binaryEncodeUserBroadcastPush(message: Msg<{ event: string } & { [key: string]: any }>) {
if (this._isArrayBuffer(message.payload?.payload)) {
return this._encodeBinaryUserBroadcastPush(message)
Expand Down Expand Up @@ -172,106 +135,11 @@ export default class Serializer {
const kind = view.getUint8(0)
const decoder = new TextDecoder()
switch (kind) {
case this.KINDS.push:
return this._decodePush(buffer, view, decoder)
case this.KINDS.reply:
return this._decodeReply(buffer, view, decoder)
case this.KINDS.broadcast:
return this._decodeBroadcast(buffer, view, decoder)
case this.KINDS.userBroadcast:
return this._decodeUserBroadcast(buffer, view, decoder)
}
}

private _decodePush(
buffer: ArrayBuffer,
view: DataView,
decoder: TextDecoder
): {
join_ref: string
ref: null
topic: string
event: string
payload: { [key: string]: any }
} {
const joinRefSize = view.getUint8(1)
const topicSize = view.getUint8(2)
const eventSize = view.getUint8(3)
let offset = this.HEADER_LENGTH + this.META_LENGTH - 1 // pushes have no ref
const joinRef = decoder.decode(buffer.slice(offset, offset + joinRefSize))
offset = offset + joinRefSize
const topic = decoder.decode(buffer.slice(offset, offset + topicSize))
offset = offset + topicSize
const event = decoder.decode(buffer.slice(offset, offset + eventSize))
offset = offset + eventSize
const data = JSON.parse(decoder.decode(buffer.slice(offset, buffer.byteLength)))
return {
join_ref: joinRef,
ref: null,
topic: topic,
event: event,
payload: data,
}
}

private _decodeReply(
buffer: ArrayBuffer,
view: DataView,
decoder: TextDecoder
): {
join_ref: string
ref: string
topic: string
event: CHANNEL_EVENTS.reply
payload: { status: string; response: { [key: string]: any } }
} {
const joinRefSize = view.getUint8(1)
const refSize = view.getUint8(2)
const topicSize = view.getUint8(3)
const eventSize = view.getUint8(4)
let offset = this.HEADER_LENGTH + this.META_LENGTH
const joinRef = decoder.decode(buffer.slice(offset, offset + joinRefSize))
offset = offset + joinRefSize
const ref = decoder.decode(buffer.slice(offset, offset + refSize))
offset = offset + refSize
const topic = decoder.decode(buffer.slice(offset, offset + topicSize))
offset = offset + topicSize
const event = decoder.decode(buffer.slice(offset, offset + eventSize))
offset = offset + eventSize
const data = JSON.parse(decoder.decode(buffer.slice(offset, buffer.byteLength)))
const payload = { status: event, response: data }
return {
join_ref: joinRef,
ref: ref,
topic: topic,
event: CHANNEL_EVENTS.reply,
payload: payload,
}
}

private _decodeBroadcast(
buffer: ArrayBuffer,
view: DataView,
decoder: TextDecoder
): {
join_ref: null
ref: null
topic: string
event: string
payload: { [key: string]: any }
} {
const topicSize = view.getUint8(1)
const eventSize = view.getUint8(2)
let offset = this.HEADER_LENGTH + 2
const topic = decoder.decode(buffer.slice(offset, offset + topicSize))
offset = offset + topicSize
const event = decoder.decode(buffer.slice(offset, offset + eventSize))
offset = offset + eventSize
const data = JSON.parse(decoder.decode(buffer.slice(offset, buffer.byteLength)))

return { join_ref: null, ref: null, topic: topic, event: event, payload: data }
}

private _decodeUserBroadcast(
buffer: ArrayBuffer,
view: DataView,
Expand Down Expand Up @@ -301,7 +169,7 @@ export default class Serializer {
payloadEncoding === this.JSON_ENCODING ? JSON.parse(decoder.decode(payload)) : payload

const data: { [key: string]: any } = {
type: this.BROADCAST,
type: this.BROADCAST_EVENT,
event: userEvent,
payload: parsedPayload,
}
Expand All @@ -311,7 +179,7 @@ export default class Serializer {
data['meta'] = JSON.parse(metadata)
}

return { join_ref: null, ref: null, topic: topic, event: this.BROADCAST, payload: data }
return { join_ref: null, ref: null, topic: topic, event: this.BROADCAST_EVENT, payload: data }
}

private _isArrayBuffer(buffer: any): boolean {
Expand Down
94 changes: 0 additions & 94 deletions packages/core/realtime-js/test/serializer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,57 +67,6 @@ describe('JSON', () => {
})

describe('binary', () => {
it('encodes push', async () => {
let buffer = binPayload()
let bin = '\0\x01\x01\x01\x0101te\x01\x04'
const result = await encodeAsync(serializer, {
join_ref: '0',
ref: '1',
topic: 't',
event: 'e',
payload: buffer,
})
expect(decoder.decode(result as ArrayBuffer)).toBe(bin)
})

it('encodes push with undefined join_ref and ref', async () => {
let buffer = binPayload()
let bin = '\0\x00\x00\x01\x01te\x01\x04'
const result = await encodeAsync(serializer, {
join_ref: undefined,
ref: undefined,
topic: 't',
event: 'e',
payload: buffer,
})
expect(decoder.decode(result as ArrayBuffer)).toBe(bin)
})

it('encodes push with no join_ref no ref', async () => {
let buffer = binPayload()
let bin = '\0\x00\x00\x01\x01te\x01\x04'
const result = await encodeAsync(serializer, {
topic: 't',
event: 'e',
payload: buffer,
})
expect(decoder.decode(result as ArrayBuffer)).toBe(bin)
})

it('encodes variable length segments', async () => {
let buffer = binPayload()
let bin = '\0\x02\x01\x03\x02101topev\x01\x04'

const result = await encodeAsync(serializer, {
join_ref: '10',
ref: '1',
topic: 'top',
event: 'ev',
payload: buffer,
})
expect(decoder.decode(result as ArrayBuffer)).toBe(bin)
})

it('encodes user broadcast push with JSON payload', async () => {
// 3 -> user_broadcast_push
// 2 join_ref length
Expand Down Expand Up @@ -226,49 +175,6 @@ describe('binary', () => {
expect(decoder.decode(result as ArrayBuffer)).toBe(bin)
})

it('decodes push payload as JSON', async () => {
let bin = '\0\x03\x03\n123topsome-event{"a":"b"}'
let buffer = new TextEncoder().encode(bin).buffer

const result = await decodeAsync(serializer, buffer)

expect(result.join_ref).toBe('123')
expect(result.ref).toBeNull()
expect(result.topic).toBe('top')
expect(result.event).toBe('some-event')
expect(result.payload.constructor).toBe(Object)
expect(result.payload).toStrictEqual({ a: 'b' })
})

it('decodes reply payload as JSON', async () => {
let bin = '\x01\x03\x02\x03\x0210012topok{"a":"b"}'
let buffer = new TextEncoder().encode(bin).buffer

const result = await decodeAsync(serializer, buffer)

expect(result.join_ref).toBe('100')
expect(result.ref).toBe('12')
expect(result.topic).toBe('top')
expect(result.event).toBe('phx_reply')
expect(result.payload.status).toBe('ok')
expect(result.payload.response.constructor).toBe(Object)
expect(result.payload.response).toStrictEqual({ a: 'b' })
})

it('decodes broadcast payload as JSON', async () => {
let bin = '\x02\x03\ntopsome-event{"a":"b"}'
let buffer = new TextEncoder().encode(bin).buffer

const result = await decodeAsync(serializer, buffer)

expect(result.join_ref).toBeNull()
expect(result.ref).toBeNull()
expect(result.topic).toBe('top')
expect(result.event).toBe('some-event')
expect(result.payload.constructor).toBe(Object)
expect(result.payload).toStrictEqual({ a: 'b' })
})

it('decodes user broadcast with JSON payload and no metadata', async () => {
// 4 -> user_broadcast
// 3 for topic length
Expand Down
Loading