Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 56 additions & 41 deletions packages/core/realtime-js/src/lib/serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ export type Msg<T> = {

export default class Serializer {
HEADER_LENGTH = 1
META_LENGTH = 4
USER_BROADCAST_PUSH_META_LENGTH = 5
USER_BROADCAST_PUSH_META_LENGTH = 6
KINDS = { userBroadcastPush: 3, userBroadcast: 4 }
BINARY_ENCODING = 0
JSON_ENCODING = 1
BROADCAST_EVENT = 'broadcast'

allowedMetadataKeys: string[] = []

constructor(allowedMetadataKeys?: string[] | null) {
this.allowedMetadataKeys = allowedMetadataKeys ?? []
}

encode(msg: Msg<{ [key: string]: any }>, callback: (result: ArrayBuffer | string) => any) {
if (
msg.event === this.BROADCAST_EVENT &&
Expand All @@ -41,57 +46,58 @@ export default class Serializer {
}

private _encodeBinaryUserBroadcastPush(message: Msg<{ event: string } & { [key: string]: any }>) {
const topic = message.topic
const ref = message.ref ?? ''
const joinRef = message.join_ref ?? ''
const userEvent = message.payload.event
const userPayload = message.payload?.payload ?? new ArrayBuffer(0)

const metaLength =
this.USER_BROADCAST_PUSH_META_LENGTH +
joinRef.length +
ref.length +
topic.length +
userEvent.length

const header = new ArrayBuffer(this.HEADER_LENGTH + metaLength)
let view = new DataView(header)
let offset = 0

view.setUint8(offset++, this.KINDS.userBroadcastPush) // kind
view.setUint8(offset++, joinRef.length)
view.setUint8(offset++, ref.length)
view.setUint8(offset++, topic.length)
view.setUint8(offset++, userEvent.length)
view.setUint8(offset++, this.BINARY_ENCODING)
Array.from(joinRef, (char) => view.setUint8(offset++, char.charCodeAt(0)))
Array.from(ref, (char) => view.setUint8(offset++, char.charCodeAt(0)))
Array.from(topic, (char) => view.setUint8(offset++, char.charCodeAt(0)))
Array.from(userEvent, (char) => view.setUint8(offset++, char.charCodeAt(0)))

var combined = new Uint8Array(header.byteLength + userPayload.byteLength)
combined.set(new Uint8Array(header), 0)
combined.set(new Uint8Array(userPayload), header.byteLength)

return combined.buffer
return this._encodeUserBroadcastPush(message, this.BINARY_ENCODING, userPayload)
}

private _encodeJsonUserBroadcastPush(message: Msg<{ event: string } & { [key: string]: any }>) {
const userPayload = message.payload?.payload ?? {}
const encoder = new TextEncoder()
const encodedUserPayload = encoder.encode(JSON.stringify(userPayload)).buffer
return this._encodeUserBroadcastPush(message, this.JSON_ENCODING, encodedUserPayload)
}

private _encodeUserBroadcastPush(
message: Msg<{ event: string } & { [key: string]: any }>,
encodingType: number,
encodedPayload: ArrayBuffer
) {
const topic = message.topic
const ref = message.ref ?? ''
const joinRef = message.join_ref ?? ''
const userEvent = message.payload.event
const userPayload = message.payload?.payload ?? {}

const encoder = new TextEncoder() // Encodes to UTF-8
const encodedUserPayload = encoder.encode(JSON.stringify(userPayload)).buffer
// Filter metadata based on allowed keys
const rest = this.allowedMetadataKeys
? this._pick(message.payload, this.allowedMetadataKeys)
: {}

const metadata = Object.keys(rest).length === 0 ? '' : JSON.stringify(rest)

// Validate lengths don't exceed uint8 max value (255)
if (joinRef.length > 255) {
throw new Error(`joinRef length ${joinRef.length} exceeds maximum of 255`)
}
if (ref.length > 255) {
throw new Error(`ref length ${ref.length} exceeds maximum of 255`)
}
if (topic.length > 255) {
throw new Error(`topic length ${topic.length} exceeds maximum of 255`)
}
if (userEvent.length > 255) {
throw new Error(`userEvent length ${userEvent.length} exceeds maximum of 255`)
}
if (metadata.length > 255) {
throw new Error(`metadata length ${metadata.length} exceeds maximum of 255`)
}

const metaLength =
this.USER_BROADCAST_PUSH_META_LENGTH +
joinRef.length +
ref.length +
topic.length +
userEvent.length
userEvent.length +
metadata.length

const header = new ArrayBuffer(this.HEADER_LENGTH + metaLength)
let view = new DataView(header)
Expand All @@ -102,15 +108,17 @@ export default class Serializer {
view.setUint8(offset++, ref.length)
view.setUint8(offset++, topic.length)
view.setUint8(offset++, userEvent.length)
view.setUint8(offset++, this.JSON_ENCODING)
view.setUint8(offset++, metadata.length)
view.setUint8(offset++, encodingType)
Array.from(joinRef, (char) => view.setUint8(offset++, char.charCodeAt(0)))
Array.from(ref, (char) => view.setUint8(offset++, char.charCodeAt(0)))
Array.from(topic, (char) => view.setUint8(offset++, char.charCodeAt(0)))
Array.from(userEvent, (char) => view.setUint8(offset++, char.charCodeAt(0)))
Array.from(metadata, (char) => view.setUint8(offset++, char.charCodeAt(0)))

var combined = new Uint8Array(header.byteLength + encodedUserPayload.byteLength)
var combined = new Uint8Array(header.byteLength + encodedPayload.byteLength)
combined.set(new Uint8Array(header), 0)
combined.set(new Uint8Array(encodedUserPayload), header.byteLength)
combined.set(new Uint8Array(encodedPayload), header.byteLength)

return combined.buffer
}
Expand Down Expand Up @@ -185,4 +193,11 @@ export default class Serializer {
private _isArrayBuffer(buffer: any): boolean {
return buffer instanceof ArrayBuffer || buffer?.constructor?.name === 'ArrayBuffer'
}

private _pick(obj: Record<string, any> | null | undefined, keys: string[]): Record<string, any> {
if (!obj || typeof obj !== 'object') {
return {}
}
return Object.fromEntries(Object.entries(obj).filter(([key]) => keys.includes(key)))
}
}
Loading
Loading