diff --git a/Sources/Realtime/RealtimeBinaryDecoder.swift b/Sources/Realtime/RealtimeBinaryDecoder.swift new file mode 100644 index 00000000..68432004 --- /dev/null +++ b/Sources/Realtime/RealtimeBinaryDecoder.swift @@ -0,0 +1,274 @@ +// +// RealtimeBinaryDecoder.swift +// +// +// Created by Guilherme Souza on 05/12/24. +// + +import Foundation + +/// Binary decoder for Realtime V2 messages. +/// +/// Supports decoding messages with: +/// - Binary payloads +/// - User broadcast messages with metadata +/// - Push, reply, broadcast, and user broadcast message types +final class RealtimeBinaryDecoder: Sendable { + private let headerLength = 1 + private let metaLength = 4 + + enum MessageKind: UInt8 { + case push = 0 + case reply = 1 + case broadcast = 2 + case userBroadcastPush = 3 + case userBroadcast = 4 + } + + enum PayloadEncoding: UInt8 { + case binary = 0 + case json = 1 + } + + /// Decodes binary data into a V3 Realtime message. + /// - Parameter data: Binary data to decode + /// - Returns: Decoded V3 message + func decode(_ data: Data) throws -> RealtimeMessageV3 { + let v2Message = try decodeToV2(data) + return RealtimeMessageV3.fromV2(v2Message) + } + + /// Decodes binary data into a V2 Realtime message (for backward compatibility). + /// - Parameter data: Binary data to decode + /// - Returns: Decoded V2 message + func decodeToV2(_ data: Data) throws -> RealtimeMessageV2 { + guard !data.isEmpty else { + throw RealtimeError("Empty binary data") + } + + let kind = data[0] + + guard let messageKind = MessageKind(rawValue: kind) else { + throw RealtimeError("Unknown message kind: \(kind)") + } + + switch messageKind { + case .push: + return try decodePush(data) + case .reply: + return try decodeReply(data) + case .broadcast: + return try decodeBroadcast(data) + case .userBroadcast: + return try decodeUserBroadcast(data) + case .userBroadcastPush: + throw RealtimeError("userBroadcastPush should not be received from server") + } + } + + // MARK: - Private Decoding Methods + + private func decodePush(_ data: Data) throws -> RealtimeMessageV2 { + guard data.count >= headerLength + metaLength - 1 else { + throw RealtimeError("Invalid push message length") + } + + let joinRefSize = Int(data[1]) + let topicSize = Int(data[2]) + let eventSize = Int(data[3]) + + var offset = headerLength + metaLength - 1 // pushes have no ref + + let joinRef = try decodeString(from: data, offset: offset, length: joinRefSize) + offset += joinRefSize + + let topic = try decodeString(from: data, offset: offset, length: topicSize) + offset += topicSize + + let event = try decodeString(from: data, offset: offset, length: eventSize) + offset += eventSize + + let payloadData = data.subdata(in: offset.. RealtimeMessageV2 { + guard data.count >= headerLength + metaLength else { + throw RealtimeError("Invalid reply message length") + } + + let joinRefSize = Int(data[1]) + let refSize = Int(data[2]) + let topicSize = Int(data[3]) + let eventSize = Int(data[4]) + + var offset = headerLength + metaLength + + let joinRef = try decodeString(from: data, offset: offset, length: joinRefSize) + offset += joinRefSize + + let ref = try decodeString(from: data, offset: offset, length: refSize) + offset += refSize + + let topic = try decodeString(from: data, offset: offset, length: topicSize) + offset += topicSize + + let event = try decodeString(from: data, offset: offset, length: eventSize) + offset += eventSize + + let responseData = data.subdata(in: offset.. RealtimeMessageV2 { + guard data.count >= headerLength + 2 else { + throw RealtimeError("Invalid broadcast message length") + } + + let topicSize = Int(data[1]) + let eventSize = Int(data[2]) + + var offset = headerLength + 2 + + let topic = try decodeString(from: data, offset: offset, length: topicSize) + offset += topicSize + + let event = try decodeString(from: data, offset: offset, length: eventSize) + offset += eventSize + + let payloadData = data.subdata(in: offset.. RealtimeMessageV2 { + guard data.count >= headerLength + 4 else { + throw RealtimeError("Invalid user broadcast message length") + } + + let topicSize = Int(data[1]) + let userEventSize = Int(data[2]) + let metadataSize = Int(data[3]) + let payloadEncoding = data[4] + + var offset = headerLength + 4 + + let topic = try decodeString(from: data, offset: offset, length: topicSize) + offset += topicSize + + let userEvent = try decodeString(from: data, offset: offset, length: userEventSize) + offset += userEventSize + + let metadata = try decodeString(from: data, offset: offset, length: metadataSize) + offset += metadataSize + + let payloadData = data.subdata(in: offset.. String { + guard offset + length <= data.count else { + throw RealtimeError("Invalid string offset/length") + } + + let stringData = data.subdata(in: offset..<(offset + length)) + guard let string = String(data: stringData, encoding: .utf8) else { + throw RealtimeError("Failed to decode string") + } + return string + } +} + +// MARK: - AnyJSON Extensions for Binary Support + +extension AnyJSON { + /// Creates an AnyJSON value from a Swift value. + init(value: Any) throws { + if let dict = value as? [String: Any] { + var object: JSONObject = [:] + for (key, val) in dict { + object[key] = try AnyJSON(value: val) + } + self = .object(object) + } else if let array = value as? [Any] { + self = .array(try array.map { try AnyJSON(value: $0) }) + } else if let string = value as? String { + self = .string(string) + } else if let bool = value as? Bool { + // Bool must be checked before Int because Bool can be cast to Int + self = .bool(bool) + } else if let int = value as? Int { + self = .integer(int) + } else if let double = value as? Double { + self = .double(double) + } else if value is NSNull { + self = .null + } else { + throw RealtimeError("Unsupported JSON value type: \(type(of: value))") + } + } +} diff --git a/Sources/Realtime/RealtimeBinaryEncoder.swift b/Sources/Realtime/RealtimeBinaryEncoder.swift new file mode 100644 index 00000000..f61ab9de --- /dev/null +++ b/Sources/Realtime/RealtimeBinaryEncoder.swift @@ -0,0 +1,188 @@ +// +// RealtimeBinaryEncoder.swift +// +// +// Created by Guilherme Souza on 05/12/24. +// + +import Foundation + +/// Binary encoder for Realtime V2 messages. +/// +/// Supports encoding messages with: +/// - Binary payloads +/// - User broadcast messages with metadata +/// - Reduced JSON encoding overhead +final class RealtimeBinaryEncoder: Sendable { + private let headerLength = 1 + private let metaLength = 4 + private let userBroadcastPushMetaLength = 6 + + enum MessageKind: UInt8 { + case push = 0 + case reply = 1 + case broadcast = 2 + case userBroadcastPush = 3 + case userBroadcast = 4 + } + + enum PayloadEncoding: UInt8 { + case binary = 0 + case json = 1 + } + + private let allowedMetadataKeys: [String] + + init(allowedMetadataKeys: [String] = []) { + self.allowedMetadataKeys = allowedMetadataKeys + } + + /// Encodes a V3 Realtime message to binary format. + /// - Parameter message: The message to encode + /// - Returns: Binary data representation + func encode(_ message: RealtimeMessageV3) throws -> Data { + // Check if this is a user broadcast push + if message.event == "broadcast", + case .json(let jsonPayload) = message.payload, + let event = jsonPayload["event"]?.stringValue + { + return try encodeUserBroadcastPush( + message: message, userEvent: event, jsonPayload: jsonPayload) + } + + // Check if this has a binary payload + if case .binary(let binaryPayload) = message.payload { + return try encodePush(message: message, binaryPayload: binaryPayload) + } + + // Fall back to JSON encoding for standard JSON messages + return try encodeAsJSON(message) + } + + /// Encodes a V2 Realtime message to binary format (for backward compatibility). + /// - Parameter message: The message to encode + /// - Returns: Binary data representation + func encodeV2(_ message: RealtimeMessageV2) throws -> Data { + try encode(RealtimeMessageV3.fromV2(message)) + } + + // MARK: - Private Encoding Methods + + private func encodePush(message: RealtimeMessageV3, binaryPayload: Data) throws -> Data { + let joinRef = message.joinRef ?? "" + let ref = message.ref ?? "" + let topic = message.topic + let event = message.event + + try validateFieldLength(joinRef, name: "joinRef") + try validateFieldLength(ref, name: "ref") + try validateFieldLength(topic, name: "topic") + try validateFieldLength(event, name: "event") + + let metaLength = + self.metaLength + joinRef.utf8.count + ref.utf8.count + topic.utf8.count + event.utf8.count + + var header = Data(capacity: headerLength + metaLength) + + header.append(MessageKind.push.rawValue) + header.append(UInt8(joinRef.utf8.count)) + header.append(UInt8(ref.utf8.count)) + header.append(UInt8(topic.utf8.count)) + header.append(UInt8(event.utf8.count)) + header.append(contentsOf: joinRef.utf8) + header.append(contentsOf: ref.utf8) + header.append(contentsOf: topic.utf8) + header.append(contentsOf: event.utf8) + + var combined = header + combined.append(binaryPayload) + return combined + } + + private func encodeUserBroadcastPush( + message: RealtimeMessageV3, + userEvent: String, + jsonPayload: JSONObject + ) throws -> Data { + let joinRef = message.joinRef ?? "" + let ref = message.ref ?? "" + let topic = message.topic + + // Extract the payload + let payload = jsonPayload["payload"] ?? .null + + // Encode payload + let encodedPayload: Data + let encoding: PayloadEncoding + + if let binaryData = RealtimeBinaryPayload.data(from: payload) { + encodedPayload = binaryData + encoding = .binary + } else { + encodedPayload = try JSONSerialization.data(withJSONObject: payload.value, options: []) + encoding = .json + } + + // Extract metadata based on allowed keys + let metadata: JSONObject + if !allowedMetadataKeys.isEmpty { + metadata = jsonPayload.filter { key, _ in + allowedMetadataKeys.contains(key) && key != "event" && key != "payload" && key != "type" + } + } else { + metadata = [:] + } + + let metadataString: String + if !metadata.isEmpty { + let metadataData = try JSONSerialization.data( + withJSONObject: metadata.mapValues(\.value), + options: [] + ) + metadataString = String(data: metadataData, encoding: .utf8) ?? "" + } else { + metadataString = "" + } + + // Validate lengths + try validateFieldLength(joinRef, name: "joinRef") + try validateFieldLength(ref, name: "ref") + try validateFieldLength(topic, name: "topic") + try validateFieldLength(userEvent, name: "userEvent") + try validateFieldLength(metadataString, name: "metadata") + + let metaLength = + userBroadcastPushMetaLength + joinRef.utf8.count + ref.utf8.count + topic.utf8.count + + userEvent.utf8.count + metadataString.utf8.count + + var header = Data(capacity: headerLength + metaLength) + + header.append(MessageKind.userBroadcastPush.rawValue) + header.append(UInt8(joinRef.utf8.count)) + header.append(UInt8(ref.utf8.count)) + header.append(UInt8(topic.utf8.count)) + header.append(UInt8(userEvent.utf8.count)) + header.append(UInt8(metadataString.utf8.count)) + header.append(encoding.rawValue) + header.append(contentsOf: joinRef.utf8) + header.append(contentsOf: ref.utf8) + header.append(contentsOf: topic.utf8) + header.append(contentsOf: userEvent.utf8) + header.append(contentsOf: metadataString.utf8) + + var combined = header + combined.append(encodedPayload) + return combined + } + + private func encodeAsJSON(_ message: RealtimeMessageV3) throws -> Data { + try JSONEncoder().encode(message) + } + + private func validateFieldLength(_ field: String, name: String) throws { + let length = field.utf8.count + guard length <= 255 else { + throw RealtimeError("\(name) length \(length) exceeds maximum of 255") + } + } +} diff --git a/Sources/Realtime/RealtimeBinaryPayload.swift b/Sources/Realtime/RealtimeBinaryPayload.swift new file mode 100644 index 00000000..1469ddf1 --- /dev/null +++ b/Sources/Realtime/RealtimeBinaryPayload.swift @@ -0,0 +1,50 @@ +// +// RealtimeBinaryPayload.swift +// +// +// Created by Guilherme Souza on 05/12/24. +// + +import Foundation + +/// Helper for creating and working with binary payloads in Realtime messages. +public enum RealtimeBinaryPayload { + /// Creates a JSON payload marker for binary data. + /// This can be used in `RealtimeMessageV2.payload` to indicate binary data. + /// + /// - Parameter data: The binary data to encode + /// - Returns: An AnyJSON object representing the binary data + public static func binary(_ data: Data) -> AnyJSON { + .object([ + "__binary__": .bool(true), + "data": .string(data.base64EncodedString()), + ]) + } + + /// Checks if a JSON value represents binary data. + /// - Parameter value: The AnyJSON value to check + /// - Returns: true if the value represents binary data + public static func isBinary(_ value: AnyJSON) -> Bool { + guard case .object(let obj) = value, + let isBinary = obj["__binary__"]?.boolValue + else { + return false + } + return isBinary + } + + /// Extracts binary data from a JSON value. + /// - Parameter value: The AnyJSON value containing binary data + /// - Returns: The decoded binary data, or nil if not a binary payload + public static func data(from value: AnyJSON) -> Data? { + guard case .object(let obj) = value, + let isBinary = obj["__binary__"]?.boolValue, + isBinary, + let base64String = obj["data"]?.stringValue, + let data = Data(base64Encoded: base64String) + else { + return nil + } + return data + } +} diff --git a/Sources/Realtime/RealtimeChannel+AsyncAwait.swift b/Sources/Realtime/RealtimeChannel+AsyncAwait.swift index 8a12a4d9..e7e68968 100644 --- a/Sources/Realtime/RealtimeChannel+AsyncAwait.swift +++ b/Sources/Realtime/RealtimeChannel+AsyncAwait.swift @@ -37,8 +37,8 @@ extension RealtimeChannelV2 { /// Listen for postgres changes in a channel. @available( *, - deprecated, - message: "Use the new filter syntax instead." + deprecated, + message: "Use the new filter syntax instead." ) @_disfavoredOverload public func postgresChange( @@ -65,8 +65,8 @@ extension RealtimeChannelV2 { /// Listen for postgres changes in a channel. @available( *, - deprecated, - message: "Use the new filter syntax instead." + deprecated, + message: "Use the new filter syntax instead." ) @_disfavoredOverload public func postgresChange( @@ -93,8 +93,8 @@ extension RealtimeChannelV2 { /// Listen for postgres changes in a channel. @available( *, - deprecated, - message: "Use the new filter syntax instead." + deprecated, + message: "Use the new filter syntax instead." ) @_disfavoredOverload public func postgresChange( @@ -120,8 +120,8 @@ extension RealtimeChannelV2 { /// Listen for postgres changes in a channel. @available( *, - deprecated, - message: "Use the new filter syntax instead." + deprecated, + message: "Use the new filter syntax instead." ) @_disfavoredOverload public func postgresChange( @@ -168,7 +168,7 @@ extension RealtimeChannelV2 { return stream } - + /// Listen for `system` event. public func system() -> AsyncStream { let (stream, continuation) = AsyncStream.makeStream() @@ -192,8 +192,8 @@ extension RealtimeChannelV2 { } // Helper to work around type ambiguity in macOS 13 -fileprivate extension AsyncStream { - func compactErase() -> AsyncStream { +extension AsyncStream { + fileprivate func compactErase() -> AsyncStream { AsyncStream(compactMap { $0.wrappedAction as? T } as AsyncCompactMapSequence) } } diff --git a/Sources/Realtime/RealtimeClientV2.swift b/Sources/Realtime/RealtimeClientV2.swift index f99c5173..93736d5c 100644 --- a/Sources/Realtime/RealtimeClientV2.swift +++ b/Sources/Realtime/RealtimeClientV2.swift @@ -25,6 +25,7 @@ protocol RealtimeClientProtocol: AnyObject, Sendable { func connect() async func push(_ message: RealtimeMessageV2) + func pushV3(_ message: RealtimeMessageV3) func _getAccessToken() async -> String? func makeRef() -> String func _remove(_ channel: any RealtimeChannelProtocol) @@ -55,6 +56,8 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { let mutableState = LockIsolated(MutableState()) let http: any HTTPClientType let apikey: String + let binaryEncoder: RealtimeBinaryEncoder? + let binaryDecoder: RealtimeBinaryDecoder? var conn: (any WebSocket)? { mutableState.conn @@ -156,6 +159,15 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { self.wsTransport = wsTransport self.http = http + // Initialize serializer based on version + if options.serializerVersion == "2.0.0" { + binaryEncoder = RealtimeBinaryEncoder(allowedMetadataKeys: options.allowedMetadataKeys) + binaryDecoder = RealtimeBinaryDecoder() + } else { + binaryEncoder = nil + binaryDecoder = nil + } + precondition(options.apikey != nil, "API key is required to connect to Realtime") apikey = options.apikey! @@ -205,7 +217,8 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { Self.realtimeWebSocketURL( baseURL: Self.realtimeBaseURL(url: url), apikey: options.apikey, - logLevel: options.logLevel + logLevel: options.logLevel, + serializerVersion: options.serializerVersion ), options.headers.dictionary ) @@ -374,10 +387,18 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { if Task.isCancelled { return } switch event { - case .binary: - self.options.logger?.error("Unsupported binary event received.") - break + case .binary(let data): + // Binary events are supported in V2 serializer + if let decoder = self.binaryDecoder { + let messageV3 = try decoder.decode(data) + let message = messageV3.toV2() + await onMessage(message) + } else { + self.options.logger?.error( + "Binary event received but V2 serializer is not enabled.") + } case .text(let text): + // Text events are always JSON let data = Data(text.utf8) let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data) await onMessage(message) @@ -522,16 +543,25 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { } } - /// Push out a message if the socket is connected. + /// Push out a V3 message if the socket is connected. /// /// If the socket is not connected, the message gets enqueued within a local buffer, and sent out when a connection is next established. - public func push(_ message: RealtimeMessageV2) { + public func pushV3(_ message: RealtimeMessageV3) { let callback = { @Sendable [weak self] in do { // Check cancellation before sending, because this push may have been cancelled before a connection was established. try Task.checkCancellation() - let data = try JSONEncoder().encode(message) - self?.conn?.send(String(decoding: data, as: UTF8.self)) + + // Use binary encoder if V2 serializer is enabled + if let encoder = self?.binaryEncoder { + let encoded = try encoder.encode(message) + // Binary encoder always returns Data + self?.conn?.send(encoded) + } else { + // Fall back to JSON encoding for V1 + let data = try JSONEncoder().encode(message) + self?.conn?.send(String(decoding: data, as: UTF8.self)) + } } catch { self?.options.logger?.error( """ @@ -554,6 +584,13 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { } } + /// Push out a V2 message if the socket is connected. + /// + /// If the socket is not connected, the message gets enqueued within a local buffer, and sent out when a connection is next established. + public func push(_ message: RealtimeMessageV2) { + pushV3(RealtimeMessageV3.fromV2(message)) + } + private func flushSendBuffer() { mutableState.withValue { $0.sendBuffer.forEach { $0() } @@ -586,7 +623,12 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { return url } - static func realtimeWebSocketURL(baseURL: URL, apikey: String?, logLevel: LogLevel?) -> URL { + static func realtimeWebSocketURL( + baseURL: URL, + apikey: String?, + logLevel: LogLevel?, + serializerVersion: String = "1.0.0" + ) -> URL { guard var components = URLComponents(url: baseURL, resolvingAgainstBaseURL: false) else { return baseURL @@ -596,7 +638,7 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { if let apikey { components.queryItems!.append(URLQueryItem(name: "apikey", value: apikey)) } - components.queryItems!.append(URLQueryItem(name: "vsn", value: "1.0.0")) + components.queryItems!.append(URLQueryItem(name: "vsn", value: serializerVersion)) if let logLevel { components.queryItems!.append(URLQueryItem(name: "log_level", value: logLevel.rawValue)) diff --git a/Sources/Realtime/RealtimeMessageV3.swift b/Sources/Realtime/RealtimeMessageV3.swift new file mode 100644 index 00000000..0ad3da32 --- /dev/null +++ b/Sources/Realtime/RealtimeMessageV3.swift @@ -0,0 +1,212 @@ +// +// RealtimeMessageV3.swift +// +// +// Created by Guilherme Souza on 05/12/24. +// + +import Foundation + +/// Payload type that can represent either JSON or binary data. +public enum RealtimePayload: Sendable, Hashable { + /// JSON payload represented as a dictionary + case json(JSONObject) + /// Binary payload + case binary(Data) + + /// Returns the JSON object if this is a JSON payload + public var jsonValue: JSONObject? { + if case .json(let object) = self { + return object + } + return nil + } + + /// Returns the binary data if this is a binary payload + public var binaryValue: Data? { + if case .binary(let data) = self { + return data + } + return nil + } + + /// Helper to get a value from the JSON payload + public subscript(key: String) -> AnyJSON? { + jsonValue?[key] + } +} + +extension RealtimePayload: Codable { + public init(from decoder: any Decoder) throws { + // When decoding, we always decode as JSON + let container = try decoder.singleValueContainer() + let object = try container.decode(JSONObject.self) + self = .json(object) + } + + public func encode(to encoder: any Encoder) throws { + var container = encoder.singleValueContainer() + switch self { + case .json(let object): + try container.encode(object) + case .binary(let data): + try container.encode(data) + // Binary payloads should be encoded using the binary encoder, not JSONEncoder + throw EncodingError.invalidValue( + self, + EncodingError.Context( + codingPath: encoder.codingPath, + debugDescription: "Binary payloads must be encoded using RealtimeBinaryEncoder" + ) + ) + } + } +} + +/// V3 Realtime message with proper support for both JSON and binary payloads. +/// +/// This type is designed to work seamlessly with the V2 serializer while maintaining +/// backward compatibility through conversion to/from `RealtimeMessageV2`. +public struct RealtimeMessageV3: Hashable, Sendable { + public let joinRef: String? + public let ref: String? + public let topic: String + public let event: String + public let payload: RealtimePayload + + public init( + joinRef: String?, + ref: String?, + topic: String, + event: String, + payload: RealtimePayload + ) { + self.joinRef = joinRef + self.ref = ref + self.topic = topic + self.event = event + self.payload = payload + } + + /// Convenience initializer for JSON payloads + public init( + joinRef: String?, + ref: String?, + topic: String, + event: String, + payload: JSONObject + ) { + self.init( + joinRef: joinRef, + ref: ref, + topic: topic, + event: event, + payload: .json(payload) + ) + } + + /// Convenience initializer for binary payloads + public init( + joinRef: String?, + ref: String?, + topic: String, + event: String, + binaryPayload: Data + ) { + self.init( + joinRef: joinRef, + ref: ref, + topic: topic, + event: event, + payload: .binary(binaryPayload) + ) + } + + /// Status for the received message if any. + public var status: PushStatus? { + payload["status"] + .flatMap(\.stringValue) + .flatMap(PushStatus.init(rawValue:)) + } + + /// Converts to V2 message format (for backward compatibility) + public func toV2() -> RealtimeMessageV2 { + let jsonPayload: JSONObject + switch payload { + case .json(let object): + jsonPayload = object + case .binary(let data): + // Wrap binary data in the special marker format + jsonPayload = ["payload": RealtimeBinaryPayload.binary(data)] + } + + return RealtimeMessageV2( + joinRef: joinRef, + ref: ref, + topic: topic, + event: event, + payload: jsonPayload + ) + } + + /// Creates from V2 message format + public static func fromV2(_ message: RealtimeMessageV2) -> RealtimeMessageV3 { + // Check if this is a direct binary payload (not a broadcast message with nested binary) + // A direct binary payload would have just the "payload" key with binary marker + if message.payload.count == 1, + let payloadValue = message.payload["payload"], + let binaryData = RealtimeBinaryPayload.data(from: payloadValue) + { + return RealtimeMessageV3( + joinRef: message.joinRef, + ref: message.ref, + topic: message.topic, + event: message.event, + binaryPayload: binaryData + ) + } + + // Otherwise it's a JSON payload (including broadcast messages with nested binary) + return RealtimeMessageV3( + joinRef: message.joinRef, + ref: message.ref, + topic: message.topic, + event: message.event, + payload: message.payload + ) + } +} + +extension RealtimeMessageV3: Codable { + private enum CodingKeys: String, CodingKey { + case joinRef = "join_ref" + case ref + case topic + case event + case payload + } + + public init(from decoder: any Decoder) throws { + let container = try decoder.container(keyedBy: CodingKeys.self) + joinRef = try container.decodeIfPresent(String.self, forKey: .joinRef) + ref = try container.decodeIfPresent(String.self, forKey: .ref) + topic = try container.decode(String.self, forKey: .topic) + event = try container.decode(String.self, forKey: .event) + payload = try container.decode(RealtimePayload.self, forKey: .payload) + } + + public func encode(to encoder: any Encoder) throws { + var container = encoder.container(keyedBy: CodingKeys.self) + try container.encodeIfPresent(joinRef, forKey: .joinRef) + try container.encodeIfPresent(ref, forKey: .ref) + try container.encode(topic, forKey: .topic) + try container.encode(event, forKey: .event) + try container.encode(payload, forKey: .payload) + } +} + +extension RealtimeMessageV3: HasRawMessage { + public var rawMessage: RealtimeMessageV2 { + toV2() + } +} diff --git a/Sources/Realtime/RealtimePostgresFilter.swift b/Sources/Realtime/RealtimePostgresFilter.swift index 8b8c7790..976d570a 100644 --- a/Sources/Realtime/RealtimePostgresFilter.swift +++ b/Sources/Realtime/RealtimePostgresFilter.swift @@ -17,19 +17,19 @@ public enum RealtimePostgresFilter { var value: String { switch self { - case let .eq(column, value): + case .eq(let column, let value): return "\(column)=eq.\(value.rawValue)" - case let .neq(column, value): + case .neq(let column, let value): return "\(column)=neq.\(value.rawValue)" - case let .gt(column, value): + case .gt(let column, let value): return "\(column)=gt.\(value.rawValue)" - case let .gte(column, value): + case .gte(let column, let value): return "\(column)=gte.\(value.rawValue)" - case let .lt(column, value): + case .lt(let column, let value): return "\(column)=lt.\(value.rawValue)" - case let .lte(column, value): + case .lte(let column, let value): return "\(column)=lte.\(value.rawValue)" - case let .in(column, values): + case .in(let column, let values): return "\(column)=in.(\(values.map(\.rawValue).joined(separator: ",")))" } } diff --git a/Sources/Realtime/RealtimeSerializer.swift b/Sources/Realtime/RealtimeSerializer.swift new file mode 100644 index 00000000..db53f369 --- /dev/null +++ b/Sources/Realtime/RealtimeSerializer.swift @@ -0,0 +1,21 @@ +// +// RealtimeSerializer.swift +// +// +// Created by Guilherme Souza on 05/12/24. +// + +import Foundation + +/// Protocol for encoding and decoding Realtime messages. +protocol RealtimeSerializer: Sendable { + /// Encodes a message for sending over the WebSocket connection. + /// - Parameter message: The message to encode + /// - Returns: Either Data (for binary encoding) or String (for JSON encoding) + func encode(_ message: RealtimeMessageV2) throws -> Any + + /// Decodes a message received from the WebSocket connection. + /// - Parameter data: Either Data (binary) or String (JSON) + /// - Returns: The decoded message + func decode(_ data: Any) throws -> RealtimeMessageV2 +} diff --git a/Sources/Realtime/Types.swift b/Sources/Realtime/Types.swift index cd0a44c3..366db1a4 100644 --- a/Sources/Realtime/Types.swift +++ b/Sources/Realtime/Types.swift @@ -28,12 +28,23 @@ public struct RealtimeClientOptions: Sendable { package var accessToken: (@Sendable () async throws -> String?)? package var logger: (any SupabaseLogger)? + /// Serializer version to use. Defaults to "1.0.0". + /// - "1.0.0": JSON-only serializer (default, backward compatible) + /// - "2.0.0": Binary serializer with support for binary payloads and metadata + var serializerVersion: String + + /// Allowed metadata keys for user broadcast push messages (V2 serializer only). + /// Only these keys will be included in the metadata section of binary messages. + var allowedMetadataKeys: [String] + public static let defaultHeartbeatInterval: TimeInterval = 25 public static let defaultReconnectDelay: TimeInterval = 7 public static let defaultTimeoutInterval: TimeInterval = 10 public static let defaultDisconnectOnSessionLoss = true public static let defaultConnectOnSubscribe: Bool = true public static let defaultMaxRetryAttempts: Int = 5 + public static let defaultSerializerVersion = "1.0.0" + public static let defaultAllowedMetadataKeys: [String] = [] public init( headers: [String: String] = [:], @@ -46,7 +57,9 @@ public struct RealtimeClientOptions: Sendable { logLevel: LogLevel? = nil, fetch: (@Sendable (_ request: URLRequest) async throws -> (Data, URLResponse))? = nil, accessToken: (@Sendable () async throws -> String?)? = nil, - logger: (any SupabaseLogger)? = nil + logger: (any SupabaseLogger)? = nil, + serializerVersion: String = Self.defaultSerializerVersion, + allowedMetadataKeys: [String] = Self.defaultAllowedMetadataKeys ) { self.headers = HTTPFields(headers) self.heartbeatInterval = heartbeatInterval @@ -59,6 +72,8 @@ public struct RealtimeClientOptions: Sendable { self.fetch = fetch self.accessToken = accessToken self.logger = logger + self.serializerVersion = serializerVersion + self.allowedMetadataKeys = allowedMetadataKeys } var apikey: String? { diff --git a/Tests/RealtimeTests/PushV2Tests.swift b/Tests/RealtimeTests/PushV2Tests.swift index 44882981..d511c52b 100644 --- a/Tests/RealtimeTests/PushV2Tests.swift +++ b/Tests/RealtimeTests/PushV2Tests.swift @@ -324,6 +324,10 @@ private final class MockRealtimeClient: RealtimeClientProtocol, @unchecked Senda } } + func pushV3(_ message: RealtimeMessageV3) { + push(message.toV2()) + } + func _getAccessToken() async -> String? { return nil } diff --git a/Tests/RealtimeTests/RealtimeSerializerTests.swift b/Tests/RealtimeTests/RealtimeSerializerTests.swift new file mode 100644 index 00000000..c2e33e96 --- /dev/null +++ b/Tests/RealtimeTests/RealtimeSerializerTests.swift @@ -0,0 +1,468 @@ +// +// RealtimeSerializerTests.swift +// +// +// Created by Guilherme Souza on 05/12/24. +// + +import Foundation +import XCTest + +@testable import Realtime + +final class RealtimeSerializerTests: XCTestCase { + // MARK: - Binary Encoder Tests (V3) + + func testEncodePushWithBinaryPayload() throws { + let encoder = RealtimeBinaryEncoder() + + let binaryData = Data([0x01, 0x04]) + let message = RealtimeMessageV3( + joinRef: "10", + ref: "1", + topic: "t", + event: "e", + binaryPayload: binaryData + ) + + let encoded = try encoder.encode(message) + XCTAssertTrue(encoded.count > 0) + + // Verify the structure + XCTAssertEqual(encoded[0], 0) // Kind: push + XCTAssertEqual(encoded[1], 2) // joinRef length + XCTAssertEqual(encoded[2], 1) // ref length + XCTAssertEqual(encoded[3], 1) // topic length + XCTAssertEqual(encoded[4], 1) // event length + + // Verify payload is appended + let headerEnd = 1 + 4 + 2 + 1 + 1 + 1 // header + meta + strings + let payloadStart = encoded.index(encoded.startIndex, offsetBy: headerEnd) + XCTAssertEqual(encoded[payloadStart], 0x01) + XCTAssertEqual(encoded[payloadStart + 1], 0x04) + } + + func testEncodeUserBroadcastPushWithJSONNoMetadata() throws { + let encoder = RealtimeBinaryEncoder() + + let message = RealtimeMessageV3( + joinRef: "10", + ref: "1", + topic: "top", + event: "broadcast", + payload: [ + "type": "broadcast", + "event": "user-event", + "payload": ["a": "b"], + ] + ) + + let encoded = try encoder.encode(message) + + // Verify the structure + XCTAssertEqual(encoded[0], 3) // Kind: userBroadcastPush + XCTAssertEqual(encoded[1], 2) // joinRef length + XCTAssertEqual(encoded[2], 1) // ref length + XCTAssertEqual(encoded[3], 3) // topic length ("top") + XCTAssertEqual(encoded[4], 10) // userEvent length ("user-event") + XCTAssertEqual(encoded[5], 0) // metadata length + XCTAssertEqual(encoded[6], 1) // JSON encoding + } + + func testEncodeUserBroadcastPushWithAllowedMetadata() throws { + let encoder = RealtimeBinaryEncoder(allowedMetadataKeys: ["extra"]) + + let message = RealtimeMessageV3( + joinRef: "10", + ref: "1", + topic: "top", + event: "broadcast", + payload: [ + "type": "broadcast", + "event": "user-event", + "extra": "bit", + "store": .bool(true), // Should not be included + "payload": ["a": "b"], + ] + ) + + let encoded = try encoder.encode(message) + + // Verify metadata is included + XCTAssertEqual(encoded[0], 3) // Kind: userBroadcastPush + XCTAssertEqual(encoded[5], 15) // metadata length ({"extra":"bit"}) + XCTAssertEqual(encoded[6], 1) // JSON encoding + } + + func testEncodeUserBroadcastPushWithBinaryPayload() throws { + let encoder = RealtimeBinaryEncoder() + + let binaryData = Data([0x01, 0x04]) + let message = RealtimeMessageV3( + joinRef: "10", + ref: "1", + topic: "top", + event: "broadcast", + payload: [ + "event": "user-event", + "payload": RealtimeBinaryPayload.binary(binaryData), + ] + ) + + let encoded = try encoder.encode(message) + + // Verify the structure + XCTAssertEqual(encoded[0], 3) // Kind: userBroadcastPush + XCTAssertEqual(encoded[6], 0) // Binary encoding + } + + func testThrowsErrorWhenJoinRefExceeds255() { + let encoder = RealtimeBinaryEncoder() + let longJoinRef = String(repeating: "a", count: 256) + + let message = RealtimeMessageV3( + joinRef: longJoinRef, + ref: "1", + topic: "top", + event: "broadcast", + payload: [ + "event": "user-event", + "payload": ["a": "b"], + ] + ) + + XCTAssertThrowsError(try encoder.encode(message)) { error in + XCTAssertTrue(error.localizedDescription.contains("joinRef length")) + } + } + + func testThrowsErrorWhenTopicExceeds255() { + let encoder = RealtimeBinaryEncoder() + let longTopic = String(repeating: "a", count: 256) + + let message = RealtimeMessageV3( + joinRef: "10", + ref: "1", + topic: longTopic, + event: "broadcast", + payload: [ + "event": "user-event", + "payload": ["a": "b"], + ] + ) + + XCTAssertThrowsError(try encoder.encode(message)) { error in + XCTAssertTrue(error.localizedDescription.contains("topic length")) + } + } + + // MARK: - Binary Decoder Tests + + func testDecodePushWithJSONPayload() throws { + let decoder = RealtimeBinaryDecoder() + + // Construct: kind(1) + joinRefLen(1) + topicLen(1) + eventLen(1) + strings + payload + var data = Data() + data.append(0) // kind: push + data.append(3) // joinRef length + data.append(3) // topic length + data.append(10) // event length + data.append(contentsOf: "123".utf8) + data.append(contentsOf: "top".utf8) + data.append(contentsOf: "some-event".utf8) + data.append(contentsOf: #"{"a":"b"}"#.utf8) + + let message = try decoder.decode(data) + + XCTAssertEqual(message.joinRef, "123") + XCTAssertNil(message.ref) + XCTAssertEqual(message.topic, "top") + XCTAssertEqual(message.event, "some-event") + XCTAssertEqual(message.payload["a"]?.stringValue, "b") + } + + func testDecodeReplyWithJSONPayload() throws { + let decoder = RealtimeBinaryDecoder() + + var data = Data() + data.append(1) // kind: reply + data.append(3) // joinRef length + data.append(2) // ref length + data.append(3) // topic length + data.append(2) // event/status length + data.append(contentsOf: "100".utf8) + data.append(contentsOf: "12".utf8) + data.append(contentsOf: "top".utf8) + data.append(contentsOf: "ok".utf8) + data.append(contentsOf: #"{"a":"b"}"#.utf8) + + let message = try decoder.decode(data) + + XCTAssertEqual(message.joinRef, "100") + XCTAssertEqual(message.ref, "12") + XCTAssertEqual(message.topic, "top") + XCTAssertEqual(message.event, "phx_reply") + XCTAssertEqual(message.payload["status"]?.stringValue, "ok") + XCTAssertEqual(message.payload["response"]?.objectValue?["a"]?.stringValue, "b") + } + + func testDecodeBroadcastWithJSONPayload() throws { + let decoder = RealtimeBinaryDecoder() + + var data = Data() + data.append(2) // kind: broadcast + data.append(3) // topic length + data.append(10) // event length + data.append(contentsOf: "top".utf8) + data.append(contentsOf: "some-event".utf8) + data.append(contentsOf: #"{"a":"b"}"#.utf8) + + let message = try decoder.decode(data) + + XCTAssertNil(message.joinRef) + XCTAssertNil(message.ref) + XCTAssertEqual(message.topic, "top") + XCTAssertEqual(message.event, "some-event") + XCTAssertEqual(message.payload["a"]?.stringValue, "b") + } + + func testDecodeUserBroadcastWithJSONPayloadNoMetadata() throws { + let decoder = RealtimeBinaryDecoder() + + var data = Data() + data.append(4) // kind: userBroadcast + data.append(3) // topic length + data.append(10) // userEvent length + data.append(0) // metadata length + data.append(1) // JSON encoding + data.append(contentsOf: "top".utf8) + data.append(contentsOf: "user-event".utf8) + // no metadata + data.append(contentsOf: #"{"a":"b"}"#.utf8) + + let message = try decoder.decode(data) + + XCTAssertNil(message.joinRef) + XCTAssertNil(message.ref) + XCTAssertEqual(message.topic, "top") + XCTAssertEqual(message.event, "broadcast") + XCTAssertEqual(message.payload["type"]?.stringValue, "broadcast") + XCTAssertEqual(message.payload["event"]?.stringValue, "user-event") + XCTAssertEqual(message.payload["payload"]?.objectValue?["a"]?.stringValue, "b") + } + + func testDecodeUserBroadcastWithJSONPayloadAndMetadata() throws { + let decoder = RealtimeBinaryDecoder() + + var data = Data() + data.append(4) // kind: userBroadcast + data.append(3) // topic length + data.append(10) // userEvent length + data.append(17) // metadata length + data.append(1) // JSON encoding + data.append(contentsOf: "top".utf8) + data.append(contentsOf: "user-event".utf8) + data.append(contentsOf: #"{"replayed":true}"#.utf8) + data.append(contentsOf: #"{"a":"b"}"#.utf8) + + let message = try decoder.decode(data) + + XCTAssertEqual(message.event, "broadcast") + XCTAssertEqual(message.payload["event"]?.stringValue, "user-event") + XCTAssertEqual(message.payload["meta"]?.objectValue?["replayed"]?.boolValue, true) + XCTAssertEqual(message.payload["payload"]?.objectValue?["a"]?.stringValue, "b") + } + + func testDecodeUserBroadcastWithBinaryPayloadNoMetadata() throws { + let decoder = RealtimeBinaryDecoder() + + var data = Data() + data.append(4) // kind: userBroadcast + data.append(3) // topic length + data.append(10) // userEvent length + data.append(0) // metadata length + data.append(0) // binary encoding + data.append(contentsOf: "top".utf8) + data.append(contentsOf: "user-event".utf8) + // no metadata + data.append(0x01) + data.append(0x04) + + let message = try decoder.decode(data) + + XCTAssertEqual(message.event, "broadcast") + + // For V3 messages, check the payload structure + guard case .json(let jsonPayload) = message.payload else { + XCTFail("Expected JSON payload") + return + } + + XCTAssertEqual(jsonPayload["type"]?.stringValue, "broadcast") + XCTAssertEqual(jsonPayload["event"]?.stringValue, "user-event") + + // Check binary payload + let binaryPayload = RealtimeBinaryPayload.data(from: jsonPayload["payload"]!) + XCTAssertNotNil(binaryPayload) + XCTAssertEqual(binaryPayload, Data([0x01, 0x04])) + } + + func testDecodeUserBroadcastWithBinaryPayloadAndMetadata() throws { + let decoder = RealtimeBinaryDecoder() + + var data = Data() + data.append(4) // kind: userBroadcast + data.append(3) // topic length + data.append(10) // userEvent length + data.append(17) // metadata length + data.append(0) // binary encoding + data.append(contentsOf: "top".utf8) + data.append(contentsOf: "user-event".utf8) + data.append(contentsOf: #"{"replayed":true}"#.utf8) + data.append(0x01) + data.append(0x04) + + let message = try decoder.decode(data) + + guard case .json(let jsonPayload) = message.payload else { + XCTFail("Expected JSON payload") + return + } + + XCTAssertEqual(jsonPayload["event"]?.stringValue, "user-event") + XCTAssertEqual(jsonPayload["meta"]?.objectValue?["replayed"]?.boolValue, true) + + let binaryPayload = RealtimeBinaryPayload.data(from: jsonPayload["payload"]!) + XCTAssertNotNil(binaryPayload) + XCTAssertEqual(binaryPayload, Data([0x01, 0x04])) + } + + // MARK: - Binary Payload Helper Tests + + func testBinaryPayloadHelper() { + let data = Data([0x01, 0x02, 0x03]) + let payload = RealtimeBinaryPayload.binary(data) + + XCTAssertTrue(RealtimeBinaryPayload.isBinary(payload)) + + let extractedData = RealtimeBinaryPayload.data(from: payload) + XCTAssertEqual(extractedData, data) + } + + func testBinaryPayloadHelperWithNonBinary() { + let payload: AnyJSON = .string("test") + + XCTAssertFalse(RealtimeBinaryPayload.isBinary(payload)) + XCTAssertNil(RealtimeBinaryPayload.data(from: payload)) + } + + // MARK: - Round-trip Tests + + func testRoundTripUserBroadcastWithBinary() throws { + let encoder = RealtimeBinaryEncoder() + + let originalData = Data([0x01, 0x02, 0x03, 0x04]) + let originalMessage = RealtimeMessageV3( + joinRef: "10", + ref: "1", + topic: "test-topic", + event: "broadcast", + payload: [ + "event": "test-event", + "payload": RealtimeBinaryPayload.binary(originalData), + ] + ) + + let encoded = try encoder.encode(originalMessage) + + // Note: We can't directly decode what we encode because the server + // would send it back as userBroadcast (kind 4) not userBroadcastPush (kind 3) + // But we can verify the encoding structure is correct + XCTAssertTrue(encoded.count > 0) + XCTAssertEqual(encoded[0], 3) // userBroadcastPush + } + + // MARK: - V2 / V3 Conversion Tests + + func testV2ToV3ConversionWithJSON() { + let v2Message = RealtimeMessageV2( + joinRef: "10", + ref: "1", + topic: "test", + event: "broadcast", + payload: ["key": "value"] + ) + + let v3Message = RealtimeMessageV3.fromV2(v2Message) + + XCTAssertEqual(v3Message.joinRef, "10") + XCTAssertEqual(v3Message.ref, "1") + XCTAssertEqual(v3Message.topic, "test") + XCTAssertEqual(v3Message.event, "broadcast") + XCTAssertEqual(v3Message.payload["key"]?.stringValue, "value") + } + + func testV2ToV3ConversionWithBinary() { + let binaryData = Data([0x01, 0x02, 0x03]) + let v2Message = RealtimeMessageV2( + joinRef: "10", + ref: "1", + topic: "test", + event: "msg", + payload: ["payload": RealtimeBinaryPayload.binary(binaryData)] + ) + + let v3Message = RealtimeMessageV3.fromV2(v2Message) + + XCTAssertEqual(v3Message.joinRef, "10") + XCTAssertEqual(v3Message.ref, "1") + XCTAssertEqual(v3Message.topic, "test") + XCTAssertEqual(v3Message.event, "msg") + + guard case .binary(let extractedData) = v3Message.payload else { + XCTFail("Expected binary payload") + return + } + XCTAssertEqual(extractedData, binaryData) + } + + func testV3ToV2ConversionWithJSON() { + let v3Message = RealtimeMessageV3( + joinRef: "10", + ref: "1", + topic: "test", + event: "broadcast", + payload: ["key": "value"] + ) + + let v2Message = v3Message.toV2() + + XCTAssertEqual(v2Message.joinRef, "10") + XCTAssertEqual(v2Message.ref, "1") + XCTAssertEqual(v2Message.topic, "test") + XCTAssertEqual(v2Message.event, "broadcast") + XCTAssertEqual(v2Message.payload["key"]?.stringValue, "value") + } + + func testV3ToV2ConversionWithBinary() { + let binaryData = Data([0x01, 0x02, 0x03]) + let v3Message = RealtimeMessageV3( + joinRef: "10", + ref: "1", + topic: "test", + event: "msg", + binaryPayload: binaryData + ) + + let v2Message = v3Message.toV2() + + XCTAssertEqual(v2Message.joinRef, "10") + XCTAssertEqual(v2Message.ref, "1") + XCTAssertEqual(v2Message.topic, "test") + XCTAssertEqual(v2Message.event, "msg") + + // Binary data should be wrapped in the special marker format + let extractedData = RealtimeBinaryPayload.data(from: v2Message.payload["payload"]!) + XCTAssertEqual(extractedData, binaryData) + } +}