From 84805d4c91f12444fb7ca67fcb47c385327e84c8 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Tue, 21 Oct 2025 17:00:19 -0300 Subject: [PATCH] feat(realtime): drop deprecated realtime --- Sources/Realtime/Deprecated/Defaults.swift | 108 -- Sources/Realtime/Deprecated/Delegated.swift | 102 -- Sources/Realtime/Deprecated/Deprecated.swift | 80 -- .../Realtime/Deprecated/HeartbeatTimer.swift | 136 --- .../Deprecated/PhoenixTransport.swift | 316 ----- Sources/Realtime/Deprecated/Presence.swift | 417 ------- Sources/Realtime/Deprecated/Push.swift | 265 ---- .../Realtime/Deprecated/RealtimeChannel.swift | 1037 ---------------- .../Realtime/Deprecated/RealtimeClient.swift | 1071 ----------------- .../Realtime/Deprecated/RealtimeMessage.swift | 86 -- .../Realtime/Deprecated/TimeoutTimer.swift | 108 -- Sources/Realtime/Types.swift | 26 + Sources/Supabase/Deprecated.swift | 6 - Sources/Supabase/SupabaseClient.swift | 13 +- 14 files changed, 27 insertions(+), 3744 deletions(-) delete mode 100644 Sources/Realtime/Deprecated/Defaults.swift delete mode 100644 Sources/Realtime/Deprecated/Delegated.swift delete mode 100644 Sources/Realtime/Deprecated/Deprecated.swift delete mode 100644 Sources/Realtime/Deprecated/HeartbeatTimer.swift delete mode 100644 Sources/Realtime/Deprecated/PhoenixTransport.swift delete mode 100644 Sources/Realtime/Deprecated/Presence.swift delete mode 100644 Sources/Realtime/Deprecated/Push.swift delete mode 100644 Sources/Realtime/Deprecated/RealtimeChannel.swift delete mode 100644 Sources/Realtime/Deprecated/RealtimeClient.swift delete mode 100644 Sources/Realtime/Deprecated/RealtimeMessage.swift delete mode 100644 Sources/Realtime/Deprecated/TimeoutTimer.swift diff --git a/Sources/Realtime/Deprecated/Defaults.swift b/Sources/Realtime/Deprecated/Defaults.swift deleted file mode 100644 index e74f08bc7..000000000 --- a/Sources/Realtime/Deprecated/Defaults.swift +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -import Foundation - -/// A collection of default values and behaviors used across the Client -public enum Defaults { - /// Default timeout when sending messages - public static let timeoutInterval: TimeInterval = 10.0 - - /// Default interval to send heartbeats on - public static let heartbeatInterval: TimeInterval = 30.0 - - /// Default maximum amount of time which the system may delay heartbeat events in order to - /// minimize power usage - public static let heartbeatLeeway: DispatchTimeInterval = .milliseconds(10) - - /// Default reconnect algorithm for the socket - public static let reconnectSteppedBackOff: (Int) -> TimeInterval = { tries in - tries > 9 ? 5.0 : [0.01, 0.05, 0.1, 0.15, 0.2, 0.25, 0.5, 1.0, 2.0][tries - 1] - } - - /** Default rejoin algorithm for individual channels */ - public static let rejoinSteppedBackOff: (Int) -> TimeInterval = { tries in - tries > 3 ? 10 : [1, 2, 5][tries - 1] - } - - public static let vsn = "2.0.0" - - /// Default encode function, utilizing JSONSerialization.data - public static let encode: (Any) -> Data = { json in - try! JSONSerialization - .data( - withJSONObject: json, - options: JSONSerialization.WritingOptions() - ) - } - - /// Default decode function, utilizing JSONSerialization.jsonObject - public static let decode: (Data) -> Any? = { data in - guard - let json = - try? JSONSerialization - .jsonObject( - with: data, - options: JSONSerialization.ReadingOptions() - ) - else { return nil } - return json - } - - public static let heartbeatQueue: DispatchQueue = .init( - label: "com.phoenix.socket.heartbeat" - ) -} - -/// Represents the multiple states that a Channel can be in -/// throughout it's lifecycle. -public enum ChannelState: String { - case closed - case errored - case joined - case joining - case leaving -} - -/// Represents the different events that can be sent through -/// a channel regarding a Channel's lifecycle. -public enum ChannelEvent { - public static let join = "phx_join" - public static let leave = "phx_leave" - public static let close = "phx_close" - public static let error = "phx_error" - public static let reply = "phx_reply" - public static let system = "system" - public static let broadcast = "broadcast" - public static let accessToken = "access_token" - public static let presence = "presence" - public static let presenceDiff = "presence_diff" - public static let presenceState = "presence_state" - public static let postgresChanges = "postgres_changes" - - public static let heartbeat = "heartbeat" - - static func isLifecyleEvent(_ event: String) -> Bool { - switch event { - case join, leave, reply, error, close: true - default: false - } - } -} diff --git a/Sources/Realtime/Deprecated/Delegated.swift b/Sources/Realtime/Deprecated/Delegated.swift deleted file mode 100644 index 6e5489140..000000000 --- a/Sources/Realtime/Deprecated/Delegated.swift +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -/// Provides a memory-safe way of passing callbacks around while not creating -/// retain cycles. This file was copied from https://github.com/dreymonde/Delegated -/// instead of added as a dependency to reduce the number of packages that -/// ship with SwiftPhoenixClient -public struct Delegated { - private(set) var callback: ((Input) -> Output?)? - - public init() {} - - public mutating func delegate( - to target: Target, - with callback: @escaping (Target, Input) -> Output - ) { - self.callback = { [weak target] input in - guard let target else { - return nil - } - return callback(target, input) - } - } - - public func call(_ input: Input) -> Output? { - callback?(input) - } - - public var isDelegateSet: Bool { - callback != nil - } -} - -extension Delegated { - public mutating func stronglyDelegate( - to target: Target, - with callback: @escaping (Target, Input) -> Output - ) { - self.callback = { input in - callback(target, input) - } - } - - public mutating func manuallyDelegate(with callback: @escaping (Input) -> Output) { - self.callback = callback - } - - public mutating func removeDelegate() { - callback = nil - } -} - -extension Delegated where Input == Void { - public mutating func delegate( - to target: Target, - with callback: @escaping (Target) -> Output - ) { - delegate(to: target, with: { target, _ in callback(target) }) - } - - public mutating func stronglyDelegate( - to target: Target, - with callback: @escaping (Target) -> Output - ) { - stronglyDelegate(to: target, with: { target, _ in callback(target) }) - } -} - -extension Delegated where Input == Void { - public func call() -> Output? { - call(()) - } -} - -extension Delegated where Output == Void { - public func call(_ input: Input) { - callback?(input) - } -} - -extension Delegated where Input == Void, Output == Void { - public func call() { - call(()) - } -} diff --git a/Sources/Realtime/Deprecated/Deprecated.swift b/Sources/Realtime/Deprecated/Deprecated.swift deleted file mode 100644 index c0cb2937b..000000000 --- a/Sources/Realtime/Deprecated/Deprecated.swift +++ /dev/null @@ -1,80 +0,0 @@ -// -// Deprecated.swift -// -// -// Created by Guilherme Souza on 23/12/23. -// - -import Foundation - -@available(*, deprecated, renamed: "RealtimeMessage") -public typealias Message = RealtimeMessage - -extension RealtimeClientV2 { - @available(*, deprecated, renamed: "channels") - public var subscriptions: [String: RealtimeChannelV2] { - channels - } - - @available(*, deprecated, renamed: "RealtimeClientOptions") - public struct Configuration: Sendable { - var url: URL - var apiKey: String - var headers: [String: String] - var heartbeatInterval: TimeInterval - var reconnectDelay: TimeInterval - var timeoutInterval: TimeInterval - var disconnectOnSessionLoss: Bool - var connectOnSubscribe: Bool - var logger: (any SupabaseLogger)? - - public init( - url: URL, - apiKey: String, - headers: [String: String] = [:], - heartbeatInterval: TimeInterval = 15, - reconnectDelay: TimeInterval = 7, - timeoutInterval: TimeInterval = 10, - disconnectOnSessionLoss: Bool = true, - connectOnSubscribe: Bool = true, - logger: (any SupabaseLogger)? = nil - ) { - self.url = url - self.apiKey = apiKey - self.headers = headers - self.heartbeatInterval = heartbeatInterval - self.reconnectDelay = reconnectDelay - self.timeoutInterval = timeoutInterval - self.disconnectOnSessionLoss = disconnectOnSessionLoss - self.connectOnSubscribe = connectOnSubscribe - self.logger = logger - } - } - - @available(*, deprecated, renamed: "RealtimeClientStatus") - public typealias Status = RealtimeClientStatus - - @available(*, deprecated, renamed: "RealtimeClientV2.init(url:options:)") - public convenience init(config: Configuration) { - self.init( - url: config.url, - options: RealtimeClientOptions( - headers: config.headers, - heartbeatInterval: config.heartbeatInterval, - reconnectDelay: config.reconnectDelay, - timeoutInterval: config.timeoutInterval, - disconnectOnSessionLoss: config.disconnectOnSessionLoss, - connectOnSubscribe: config.connectOnSubscribe, - logger: config.logger - ) - ) - } -} - -extension RealtimeChannelV2 { - @available(*, deprecated, renamed: "RealtimeSubscription") - public typealias Subscription = ObservationToken - - @available(*, deprecated, renamed: "RealtimeChannelStatus") - public typealias Status = RealtimeChannelStatus -} diff --git a/Sources/Realtime/Deprecated/HeartbeatTimer.swift b/Sources/Realtime/Deprecated/HeartbeatTimer.swift deleted file mode 100644 index 7bd4ccbf0..000000000 --- a/Sources/Realtime/Deprecated/HeartbeatTimer.swift +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -import Foundation - -/** - Heartbeat Timer class which manages the lifecycle of the underlying - timer which triggers when a heartbeat should be fired. This heartbeat - runs on it's own Queue so that it does not interfere with the main - queue but guarantees thread safety. - */ - -class HeartbeatTimer { - // ---------------------------------------------------------------------- - - // MARK: - Dependencies - - // ---------------------------------------------------------------------- - // The interval to wait before firing the Timer - let timeInterval: TimeInterval - - /// The maximum amount of time which the system may delay the delivery of the timer events - let leeway: DispatchTimeInterval - - // The DispatchQueue to schedule the timers on - let queue: DispatchQueue - - // UUID which specifies the Timer instance. Verifies that timers are different - let uuid: String = UUID().uuidString - - // ---------------------------------------------------------------------- - - // MARK: - Properties - - // ---------------------------------------------------------------------- - // The underlying, cancelable, resettable, timer. - private var temporaryTimer: (any DispatchSourceTimer)? - // The event handler that is called by the timer when it fires. - private var temporaryEventHandler: (() -> Void)? - - /** - Create a new HeartbeatTimer - - - Parameters: - - timeInterval: Interval to fire the timer. Repeats - - queue: Queue to schedule the timer on - - leeway: The maximum amount of time which the system may delay the delivery of the timer events - */ - init( - timeInterval: TimeInterval, queue: DispatchQueue = Defaults.heartbeatQueue, - leeway: DispatchTimeInterval = Defaults.heartbeatLeeway - ) { - self.timeInterval = timeInterval - self.queue = queue - self.leeway = leeway - } - - /** - Create a new HeartbeatTimer - - - Parameter timeInterval: Interval to fire the timer. Repeats - */ - convenience init(timeInterval: TimeInterval) { - self.init(timeInterval: timeInterval, queue: Defaults.heartbeatQueue) - } - - func start(eventHandler: @escaping () -> Void) { - queue.sync { - // Create a new DispatchSourceTimer, passing the event handler - let timer = DispatchSource.makeTimerSource(flags: [], queue: queue) - timer.setEventHandler(handler: eventHandler) - - // Schedule the timer to first fire in `timeInterval` and then - // repeat every `timeInterval` - timer.schedule( - deadline: DispatchTime.now() + self.timeInterval, - repeating: self.timeInterval, - leeway: self.leeway - ) - - // Start the timer - timer.resume() - self.temporaryEventHandler = eventHandler - self.temporaryTimer = timer - } - } - - func stop() { - // Must be queued synchronously to prevent threading issues. - queue.sync { - // DispatchSourceTimer will automatically cancel when released - temporaryTimer = nil - temporaryEventHandler = nil - } - } - - /** - True if the Timer exists and has not been cancelled. False otherwise - */ - var isValid: Bool { - guard let timer = temporaryTimer else { return false } - return !timer.isCancelled - } - - /** - Calls the Timer's event handler immediately. This method - is primarily used in tests (not ideal) - */ - func fire() { - guard isValid else { return } - temporaryEventHandler?() - } -} - -extension HeartbeatTimer: Equatable { - static func == (lhs: HeartbeatTimer, rhs: HeartbeatTimer) -> Bool { - lhs.uuid == rhs.uuid - } -} diff --git a/Sources/Realtime/Deprecated/PhoenixTransport.swift b/Sources/Realtime/Deprecated/PhoenixTransport.swift deleted file mode 100644 index 79c854005..000000000 --- a/Sources/Realtime/Deprecated/PhoenixTransport.swift +++ /dev/null @@ -1,316 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -import Foundation - -#if canImport(FoundationNetworking) - import FoundationNetworking -#endif - -// ---------------------------------------------------------------------- - -// MARK: - Transport Protocol - -// ---------------------------------------------------------------------- -/** - Defines a `Socket`'s Transport layer. - */ -// sourcery: AutoMockable -public protocol PhoenixTransport { - /// The current `ReadyState` of the `Transport` layer - var readyState: PhoenixTransportReadyState { get } - - /// Delegate for the `Transport` layer - var delegate: (any PhoenixTransportDelegate)? { get set } - - /** - Connect to the server - - - Parameters: - - headers: Headers to include in the URLRequests when opening the Websocket connection. Can be empty [:] - */ - func connect(with headers: [String: String]) - - /** - Disconnect from the server. - - - Parameters: - - code: Status code as defined by Section 7.4 of RFC 6455. - - reason: Reason why the connection is closing. Optional. - */ - func disconnect(code: Int, reason: String?) - - /** - Sends a message to the server. - - - Parameter data: Data to send. - */ - func send(data: Data) -} - -// ---------------------------------------------------------------------- - -// MARK: - Transport Delegate Protocol - -// ---------------------------------------------------------------------- -/// Delegate to receive notifications of events that occur in the `Transport` layer -public protocol PhoenixTransportDelegate { - /** - Notified when the `Transport` opens. - - - Parameter response: Response from the server indicating that the WebSocket handshake was successful and the connection has been upgraded to webSockets - */ - func onOpen(response: URLResponse?) - - /** - Notified when the `Transport` receives an error. - - - Parameter error: Client-side error from the underlying `Transport` implementation - - Parameter response: Response from the server, if any, that occurred with the Error - - */ - func onError(error: any Error, response: URLResponse?) - - /** - Notified when the `Transport` receives a message from the server. - - - Parameter message: Message received from the server - */ - func onMessage(message: String) - - /** - Notified when the `Transport` closes. - - - Parameter code: Code that was sent when the `Transport` closed - - Parameter reason: A concise human-readable prose explanation for the closure - */ - func onClose(code: Int, reason: String?) -} - -// ---------------------------------------------------------------------- - -// MARK: - Transport Ready State Enum - -// ---------------------------------------------------------------------- -/// Available `ReadyState`s of a `Transport` layer. -public enum PhoenixTransportReadyState { - /// The `Transport` is opening a connection to the server. - case connecting - - /// The `Transport` is connected to the server. - case open - - /// The `Transport` is closing the connection to the server. - case closing - - /// The `Transport` has disconnected from the server. - case closed -} - -// ---------------------------------------------------------------------- - -// MARK: - Default Websocket Transport Implementation - -// ---------------------------------------------------------------------- -/// A `Transport` implementation that relies on URLSession's native WebSocket -/// implementation. -/// -/// This implementation ships default with SwiftPhoenixClient however -/// SwiftPhoenixClient supports earlier OS versions using one of the submodule -/// `Transport` implementations. Or you can create your own implementation using -/// your own WebSocket library or implementation. -@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, *) -open class URLSessionTransport: NSObject, PhoenixTransport, URLSessionWebSocketDelegate { - /// The URL to connect to - let url: URL - - /// The URLSession configuration - let configuration: URLSessionConfiguration - - /// The underling URLSession. Assigned during `connect()` - private var session: URLSession? = nil - - /// The ongoing task. Assigned during `connect()` - private var task: URLSessionWebSocketTask? = nil - - /** - Initializes a `Transport` layer built using URLSession's WebSocket - - Example: - - ```swift - let url = URL("wss://example.com/socket") - let transport: Transport = URLSessionTransport(url: url) - ``` - - Using a custom `URLSessionConfiguration` - - ```swift - let url = URL("wss://example.com/socket") - let configuration = URLSessionConfiguration.default - let transport: Transport = URLSessionTransport(url: url, configuration: configuration) - ``` - - - parameter url: URL to connect to - - parameter configuration: Provide your own URLSessionConfiguration. Uses `.default` if none provided - */ - public init(url: URL, configuration: URLSessionConfiguration = .default) { - // URLSession requires that the endpoint be "wss" instead of "https". - let endpoint = url.absoluteString - let wsEndpoint = - endpoint - .replacingOccurrences(of: "http://", with: "ws://") - .replacingOccurrences(of: "https://", with: "wss://") - - // Force unwrapping should be safe here since a valid URL came in and we just - // replaced the protocol. - self.url = URL(string: wsEndpoint)! - self.configuration = configuration - - super.init() - } - - // MARK: - Transport - - public var readyState: PhoenixTransportReadyState = .closed - public var delegate: (any PhoenixTransportDelegate)? = nil - - public func connect(with headers: [String: String]) { - // Set the transport state as connecting - readyState = .connecting - - // Create the session and websocket task - session = URLSession(configuration: configuration, delegate: self, delegateQueue: nil) - var request = URLRequest(url: url) - - for (key, value) in headers { - guard let value = value as? String else { continue } - request.addValue(value, forHTTPHeaderField: key) - } - - task = session?.webSocketTask(with: request) - - // Start the task - task?.resume() - } - - open func disconnect(code: Int, reason: String?) { - /* - TODO: - 1. Provide a "strict" mode that fails if an invalid close code is given - 2. If strict mode is disabled, default to CloseCode.invalid - 3. Provide default .normalClosure function - */ - guard let closeCode = URLSessionWebSocketTask.CloseCode(rawValue: code) else { - fatalError("Could not create a CloseCode with invalid code: [\(code)].") - } - - readyState = .closing - task?.cancel(with: closeCode, reason: reason?.data(using: .utf8)) - session?.finishTasksAndInvalidate() - } - - open func send(data: Data) { - Task { - try? await task?.send(.string(String(data: data, encoding: .utf8)!)) - } - } - - // MARK: - URLSessionWebSocketDelegate - - open func urlSession( - _: URLSession, - webSocketTask: URLSessionWebSocketTask, - didOpenWithProtocol _: String? - ) { - // The Websocket is connected. Set Transport state to open and inform delegate - readyState = .open - delegate?.onOpen(response: webSocketTask.response) - - // Start receiving messages - receive() - } - - open func urlSession( - _: URLSession, - webSocketTask _: URLSessionWebSocketTask, - didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, - reason: Data? - ) { - // A close frame was received from the server. - readyState = .closed - delegate?.onClose( - code: closeCode.rawValue, reason: reason.flatMap { String(data: $0, encoding: .utf8) } - ) - } - - open func urlSession( - _: URLSession, - task: URLSessionTask, - didCompleteWithError error: (any Error)? - ) { - // The task has terminated. Inform the delegate that the transport has closed abnormally - // if this was caused by an error. - guard let err = error else { return } - - abnormalErrorReceived(err, response: task.response) - } - - // MARK: - Private - - private func receive() { - Task { - do { - let result = try await task?.receive() - switch result { - case .data: - print("Data received. This method is unsupported by the Client") - case let .string(text): - self.delegate?.onMessage(message: text) - default: - fatalError("Unknown result was received. [\(String(describing: result))]") - } - - // Since `.receive()` is only good for a single message, it must - // be called again after a message is received in order to - // received the next message. - self.receive() - } catch { - print("Error when receiving \(error)") - self.abnormalErrorReceived(error, response: nil) - } - } - } - - private func abnormalErrorReceived(_ error: any Error, response: URLResponse?) { - // Set the state of the Transport to closed - readyState = .closed - - // Inform the Transport's delegate that an error occurred. - delegate?.onError(error: error, response: response) - - // An abnormal error is results in an abnormal closure, such as internet getting dropped - // so inform the delegate that the Transport has closed abnormally. This will kick off - // the reconnect logic. - delegate?.onClose( - code: RealtimeClient.CloseCode.abnormal.rawValue, reason: error.localizedDescription - ) - } -} diff --git a/Sources/Realtime/Deprecated/Presence.swift b/Sources/Realtime/Deprecated/Presence.swift deleted file mode 100644 index 2370697f7..000000000 --- a/Sources/Realtime/Deprecated/Presence.swift +++ /dev/null @@ -1,417 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -import Foundation - -/// The Presence object provides features for syncing presence information from -/// the server with the client and handling presences joining and leaving. -/// -/// ## Syncing state from the server -/// -/// To sync presence state from the server, first instantiate an object and pass -/// your channel in to track lifecycle events: -/// -/// let channel = socket.channel("some:topic") -/// let presence = Presence(channel) -/// -/// If you have custom syncing state events, you can configure the `Presence` -/// object to use those instead. -/// -/// let options = Options(events: [.state: "my_state", .diff: "my_diff"]) -/// let presence = Presence(channel, opts: options) -/// -/// Next, use the presence.onSync callback to react to state changes from the -/// server. For example, to render the list of users every time the list -/// changes, you could write: -/// -/// presence.onSync { renderUsers(presence.list()) } -/// -/// ## Listing Presences -/// -/// presence.list is used to return a list of presence information based on the -/// local state of metadata. By default, all presence metadata is returned, but -/// a listBy function can be supplied to allow the client to select which -/// metadata to use for a given presence. For example, you may have a user -/// online from different devices with a metadata status of "online", but they -/// have set themselves to "away" on another device. In this case, the app may -/// choose to use the "away" status for what appears on the UI. The example -/// below defines a listBy function which prioritizes the first metadata which -/// was registered for each user. This could be the first tab they opened, or -/// the first device they came online from: -/// -/// let listBy: (String, Presence.Map) -> Presence.Meta = { id, pres in -/// let first = pres["metas"]!.first! -/// first["count"] = pres["metas"]!.count -/// first["id"] = id -/// return first -/// } -/// let onlineUsers = presence.list(by: listBy) -/// -/// (NOTE: The underlying behavior is a `map` on the `presence.state`. You are -/// mapping the `state` dictionary into whatever datastructure suites your needs) -/// -/// ## Handling individual presence join and leave events -/// -/// The presence.onJoin and presence.onLeave callbacks can be used to react to -/// individual presences joining and leaving the app. For example: -/// -/// let presence = Presence(channel) -/// presence.onJoin { [weak self] (key, current, newPres) in -/// if let cur = current { -/// print("user additional presence", cur) -/// } else { -/// print("user entered for the first time", newPres) -/// } -/// } -/// -/// presence.onLeave { [weak self] (key, current, leftPres) in -/// if current["metas"]?.isEmpty == true { -/// print("user has left from all devices", leftPres) -/// } else { -/// print("user left from a device", current) -/// } -/// } -/// -/// presence.onSync { renderUsers(presence.list()) } -@available( - *, - deprecated, - renamed: "PresenceV2", - message: "Presence class is deprecated in favor of PresenceV2. See migration guide: https://github.com/supabase-community/supabase-swift/blob/main/docs/migrations/RealtimeV2%20Migration%20Guide.md" -) -public final class Presence { - // ---------------------------------------------------------------------- - - // MARK: - Enums and Structs - - // ---------------------------------------------------------------------- - /// Custom options that can be provided when creating Presence - /// - /// ### Example: - /// - /// let options = Options(events: [.state: "my_state", .diff: "my_diff"]) - /// let presence = Presence(channel, opts: options) - public struct Options { - let events: [Events: String] - - /// Default set of Options used when creating Presence. Uses the - /// phoenix events "presence_state" and "presence_diff" - public static let defaults = Options(events: [ - .state: "presence_state", - .diff: "presence_diff", - ]) - - public init(events: [Events: String]) { - self.events = events - } - } - - /// Presense Events - public enum Events: String { - case state - case diff - } - - // ---------------------------------------------------------------------- - - // MARK: - Typaliases - - // ---------------------------------------------------------------------- - /// Meta details of a Presence. Just a dictionary of properties - public typealias Meta = [String: Any] - - /// A mapping of a String to an array of Metas. e.g. {"metas": [{id: 1}]} - public typealias Map = [String: [Meta]] - - /// A mapping of a Presence state to a mapping of Metas - public typealias State = [String: Map] - - // Diff has keys "joins" and "leaves", pointing to a Presence.State each - // containing the users that joined and left. - public typealias Diff = [String: State] - - /// Closure signature of OnJoin callbacks - public typealias OnJoin = (_ key: String, _ current: Map?, _ new: Map) -> Void - - /// Closure signature for OnLeave callbacks - public typealias OnLeave = (_ key: String, _ current: Map, _ left: Map) -> Void - - //// Closure signature for OnSync callbacks - public typealias OnSync = () -> Void - - /// Collection of callbacks with default values - struct Caller { - var onJoin: OnJoin = { _, _, _ in } - var onLeave: OnLeave = { _, _, _ in } - var onSync: OnSync = {} - } - - // ---------------------------------------------------------------------- - - // MARK: - Properties - - // ---------------------------------------------------------------------- - /// The channel the Presence belongs to - weak var channel: RealtimeChannel? - - /// Caller to callback hooks - var caller: Caller - - /// The state of the Presence - public private(set) var state: State - - /// Pending `join` and `leave` diffs that need to be synced - public private(set) var pendingDiffs: [Diff] - - /// The channel's joinRef, set when state events occur - public private(set) var joinRef: String? - - public var isPendingSyncState: Bool { - guard let safeJoinRef = joinRef else { return true } - return safeJoinRef != channel?.joinRef - } - - /// Callback to be informed of joins - public var onJoin: OnJoin { - get { caller.onJoin } - set { caller.onJoin = newValue } - } - - /// Set the OnJoin callback - public func onJoin(_ callback: @escaping OnJoin) { - onJoin = callback - } - - /// Callback to be informed of leaves - public var onLeave: OnLeave { - get { caller.onLeave } - set { caller.onLeave = newValue } - } - - /// Set the OnLeave callback - public func onLeave(_ callback: @escaping OnLeave) { - onLeave = callback - } - - /// Callback to be informed of synces - public var onSync: OnSync { - get { caller.onSync } - set { caller.onSync = newValue } - } - - /// Set the OnSync callback - public func onSync(_ callback: @escaping OnSync) { - onSync = callback - } - - public init(channel: RealtimeChannel, opts: Options = Options.defaults) { - state = [:] - pendingDiffs = [] - self.channel = channel - joinRef = nil - caller = Caller() - - guard // Do not subscribe to events if they were not provided - let stateEvent = opts.events[.state], - let diffEvent = opts.events[.diff] - else { return } - - self.channel?.delegateOn(stateEvent, filter: ChannelFilter(), to: self) { (self, message) in - guard let newState = message.rawPayload as? State else { return } - - self.joinRef = self.channel?.joinRef - self.state = Presence.syncState( - self.state, - newState: newState, - onJoin: self.caller.onJoin, - onLeave: self.caller.onLeave - ) - - for diff in self.pendingDiffs { - self.state = Presence.syncDiff( - self.state, - diff: diff, - onJoin: self.caller.onJoin, - onLeave: self.caller.onLeave - ) - } - - self.pendingDiffs = [] - self.caller.onSync() - } - - self.channel?.delegateOn(diffEvent, filter: ChannelFilter(), to: self) { (self, message) in - guard let diff = message.rawPayload as? Diff else { return } - if self.isPendingSyncState { - self.pendingDiffs.append(diff) - } else { - self.state = Presence.syncDiff( - self.state, - diff: diff, - onJoin: self.caller.onJoin, - onLeave: self.caller.onLeave - ) - self.caller.onSync() - } - } - } - - /// Returns the array of presences, with deault selected metadata. - public func list() -> [Map] { - list(by: { _, pres in pres }) - } - - /// Returns the array of presences, with selected metadata - public func list(by transformer: (String, Map) -> T) -> [T] { - Presence.listBy(state, transformer: transformer) - } - - /// Filter the Presence state with a given function - public func filter(by filter: ((String, Map) -> Bool)?) -> State { - Presence.filter(state, by: filter) - } - - // ---------------------------------------------------------------------- - - // MARK: - Static - - // ---------------------------------------------------------------------- - - // Used to sync the list of presences on the server - // with the client's state. An optional `onJoin` and `onLeave` callback can - // be provided to react to changes in the client's local presences across - // disconnects and reconnects with the server. - // - // - returns: Presence.State - @discardableResult - public static func syncState( - _ currentState: State, - newState: State, - onJoin: OnJoin = { _, _, _ in }, - onLeave: OnLeave = { _, _, _ in } - ) -> State { - let state = currentState - var leaves: Presence.State = [:] - var joins: Presence.State = [:] - - for (key, presence) in state { - if newState[key] == nil { - leaves[key] = presence - } - } - - for (key, newPresence) in newState { - if let currentPresence = state[key] { - let newRefs = newPresence["metas"]!.map { $0["phx_ref"] as! String } - let curRefs = currentPresence["metas"]!.map { $0["phx_ref"] as! String } - - let joinedMetas = newPresence["metas"]!.filter { (meta: Meta) -> Bool in - !curRefs.contains { $0 == meta["phx_ref"] as! String } - } - let leftMetas = currentPresence["metas"]!.filter { (meta: Meta) -> Bool in - !newRefs.contains { $0 == meta["phx_ref"] as! String } - } - - if joinedMetas.count > 0 { - joins[key] = newPresence - joins[key]!["metas"] = joinedMetas - } - - if leftMetas.count > 0 { - leaves[key] = currentPresence - leaves[key]!["metas"] = leftMetas - } - } else { - joins[key] = newPresence - } - } - - return Presence.syncDiff( - state, - diff: ["joins": joins, "leaves": leaves], - onJoin: onJoin, - onLeave: onLeave - ) - } - - // Used to sync a diff of presence join and leave - // events from the server, as they happen. Like `syncState`, `syncDiff` - // accepts optional `onJoin` and `onLeave` callbacks to react to a user - // joining or leaving from a device. - // - // - returns: Presence.State - @discardableResult - public static func syncDiff( - _ currentState: State, - diff: Diff, - onJoin: OnJoin = { _, _, _ in }, - onLeave: OnLeave = { _, _, _ in } - ) -> State { - var state = currentState - diff["joins"]?.forEach { key, newPresence in - let currentPresence = state[key] - state[key] = newPresence - - if let curPresence = currentPresence { - let joinedRefs = state[key]!["metas"]!.map { $0["phx_ref"] as! String } - let curMetas = curPresence["metas"]!.filter { (meta: Meta) -> Bool in - !joinedRefs.contains { $0 == meta["phx_ref"] as! String } - } - state[key]!["metas"]!.insert(contentsOf: curMetas, at: 0) - } - - onJoin(key, currentPresence, newPresence) - } - - diff["leaves"]?.forEach { key, leftPresence in - guard var curPresence = state[key] else { return } - let refsToRemove = leftPresence["metas"]!.map { $0["phx_ref"] as! String } - let keepMetas = curPresence["metas"]!.filter { (meta: Meta) -> Bool in - !refsToRemove.contains { $0 == meta["phx_ref"] as! String } - } - - curPresence["metas"] = keepMetas - onLeave(key, curPresence, leftPresence) - - if keepMetas.count > 0 { - state[key]!["metas"] = keepMetas - } else { - state.removeValue(forKey: key) - } - } - - return state - } - - public static func filter( - _ presences: State, - by filter: ((String, Map) -> Bool)? - ) -> State { - let safeFilter = filter ?? { _, _ in true } - return presences.filter(safeFilter) - } - - public static func listBy( - _ presences: State, - transformer: (String, Map) -> T - ) -> [T] { - presences.map(transformer) - } -} diff --git a/Sources/Realtime/Deprecated/Push.swift b/Sources/Realtime/Deprecated/Push.swift deleted file mode 100644 index 7f681b6da..000000000 --- a/Sources/Realtime/Deprecated/Push.swift +++ /dev/null @@ -1,265 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -import Foundation - -/// Represnts pushing data to a `Channel` through the `Socket` -public class Push { - /// The channel sending the Push - public weak var channel: RealtimeChannel? - - /// The event, for example `phx_join` - public let event: String - - /// The payload, for example ["user_id": "abc123"] - public var payload: Payload - - /// The push timeout. Default is 10.0 seconds - public var timeout: TimeInterval - - /// The server's response to the Push - var receivedMessage: RealtimeMessage? - - /// Timer which triggers a timeout event - var timeoutTimer: TimerQueue - - /// WorkItem to be performed when the timeout timer fires - var timeoutWorkItem: DispatchWorkItem? - - /// Hooks into a Push. Where .receive("ok", callback(Payload)) are stored - var receiveHooks: [PushStatus: [Delegated]] - - /// True if the Push has been sent - var sent: Bool - - /// The reference ID of the Push - var ref: String? - - /// The event that is associated with the reference ID of the Push - var refEvent: String? - - /// Initializes a Push - /// - /// - parameter channel: The Channel - /// - parameter event: The event, for example ChannelEvent.join - /// - parameter payload: Optional. The Payload to send, e.g. ["user_id": "abc123"] - /// - parameter timeout: Optional. The push timeout. Default is 10.0s - init( - channel: RealtimeChannel, - event: String, - payload: Payload = [:], - timeout: TimeInterval = Defaults.timeoutInterval - ) { - self.channel = channel - self.event = event - self.payload = payload - self.timeout = timeout - receivedMessage = nil - timeoutTimer = TimerQueue.main - receiveHooks = [:] - sent = false - ref = nil - } - - /// Resets and sends the Push - /// - parameter timeout: Optional. The push timeout. Default is 10.0s - public func resend(_ timeout: TimeInterval = Defaults.timeoutInterval) { - self.timeout = timeout - reset() - send() - } - - /// Sends the Push. If it has already timed out, then the call will - /// be ignored and return early. Use `resend` in this case. - public func send() { - guard !hasReceived(status: .timeout) else { return } - - startTimeout() - sent = true - channel?.socket?.push( - topic: channel?.topic ?? "", - event: event, - payload: payload, - ref: ref, - joinRef: channel?.joinRef - ) - } - - /// Receive a specific event when sending an Outbound message. Subscribing - /// to status events with this method does not guarantees no retain cycles. - /// You should pass `weak self` in the capture list of the callback. You - /// can call `.delegateReceive(status:, to:, callback:) and the library will - /// handle it for you. - /// - /// Example: - /// - /// channel - /// .send(event:"custom", payload: ["body": "example"]) - /// .receive("error") { [weak self] payload in - /// print("Error: ", payload) - /// } - /// - /// - parameter status: Status to receive - /// - parameter callback: Callback to fire when the status is recevied - @discardableResult - public func receive( - _ status: PushStatus, - callback: @escaping ((RealtimeMessage) -> Void) - ) -> Push { - var delegated = Delegated() - delegated.manuallyDelegate(with: callback) - - return receive(status, delegated: delegated) - } - - /// Receive a specific event when sending an Outbound message. Automatically - /// prevents retain cycles. See `manualReceive(status:, callback:)` if you - /// want to handle this yourself. - /// - /// Example: - /// - /// channel - /// .send(event:"custom", payload: ["body": "example"]) - /// .delegateReceive("error", to: self) { payload in - /// print("Error: ", payload) - /// } - /// - /// - parameter status: Status to receive - /// - parameter owner: The class that is calling .receive. Usually `self` - /// - parameter callback: Callback to fire when the status is recevied - @discardableResult - public func delegateReceive( - _ status: PushStatus, - to owner: Target, - callback: @escaping ((Target, RealtimeMessage) -> Void) - ) -> Push { - var delegated = Delegated() - delegated.delegate(to: owner, with: callback) - - return receive(status, delegated: delegated) - } - - /// Shared behavior between `receive` calls - @discardableResult - func receive(_ status: PushStatus, delegated: Delegated) -> Push { - // If the message has already been received, pass it to the callback immediately - if hasReceived(status: status), let receivedMessage { - delegated.call(receivedMessage) - } - - if receiveHooks[status] == nil { - /// Create a new array of hooks if no previous hook is associated with status - receiveHooks[status] = [delegated] - } else { - /// A previous hook for this status already exists. Just append the new hook - receiveHooks[status]?.append(delegated) - } - - return self - } - - /// Resets the Push as it was after it was first tnitialized. - func reset() { - cancelRefEvent() - ref = nil - refEvent = nil - receivedMessage = nil - sent = false - } - - /// Finds the receiveHook which needs to be informed of a status response - /// - /// - parameter status: Status which was received, e.g. "ok", "error", "timeout" - /// - parameter response: Response that was received - private func matchReceive(_ status: PushStatus, message: RealtimeMessage) { - receiveHooks[status]?.forEach { $0.call(message) } - } - - /// Reverses the result on channel.on(ChannelEvent, callback) that spawned the Push - private func cancelRefEvent() { - guard let refEvent else { return } - channel?.off(refEvent) - } - - /// Cancel any ongoing Timeout Timer - func cancelTimeout() { - timeoutWorkItem?.cancel() - timeoutWorkItem = nil - } - - /// Starts the Timer which will trigger a timeout after a specific _timeout_ - /// time, in milliseconds, is reached. - func startTimeout() { - // Cancel any existing timeout before starting a new one - if let safeWorkItem = timeoutWorkItem, !safeWorkItem.isCancelled { - cancelTimeout() - } - - guard - let channel, - let socket = channel.socket - else { return } - - let ref = socket.makeRef() - let refEvent = channel.replyEventName(ref) - - self.ref = ref - self.refEvent = refEvent - - /// If a response is received before the Timer triggers, cancel timer - /// and match the received event to it's corresponding hook - channel.delegateOn(refEvent, filter: ChannelFilter(), to: self) { (self, message) in - self.cancelRefEvent() - self.cancelTimeout() - self.receivedMessage = message - - /// Check if there is event a status available - guard let status = message.status else { return } - self.matchReceive(status, message: message) - } - - /// Setup and start the Timeout timer. - let workItem = DispatchWorkItem { - self.trigger(.timeout, payload: [:]) - } - - timeoutWorkItem = workItem - timeoutTimer.queue(timeInterval: timeout, execute: workItem) - } - - /// Checks if a status has already been received by the Push. - /// - /// - parameter status: Status to check - /// - return: True if given status has been received by the Push. - func hasReceived(status: PushStatus) -> Bool { - receivedMessage?.status == status - } - - /// Triggers an event to be sent though the Channel - func trigger(_ status: PushStatus, payload: Payload) { - /// If there is no ref event, then there is nothing to trigger on the channel - guard let refEvent else { return } - - var mutPayload = payload - mutPayload["status"] = status.rawValue - - channel?.trigger(event: refEvent, payload: mutPayload) - } -} diff --git a/Sources/Realtime/Deprecated/RealtimeChannel.swift b/Sources/Realtime/Deprecated/RealtimeChannel.swift deleted file mode 100644 index 22169bc19..000000000 --- a/Sources/Realtime/Deprecated/RealtimeChannel.swift +++ /dev/null @@ -1,1037 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -import ConcurrencyExtras -import Foundation -import Swift -import HTTPTypes - -/// Container class of bindings to the channel -struct Binding { - let type: String - let filter: [String: String] - - // The callback to be triggered - let callback: Delegated - - let id: String? -} - -public struct ChannelFilter { - public var event: String? - public var schema: String? - public let table: String? - public let filter: String? - - public init( - event: String? = nil, schema: String? = nil, table: String? = nil, filter: String? = nil - ) { - self.event = event - self.schema = schema - self.table = table - self.filter = filter - } - - var asDictionary: [String: String] { - [ - "event": event, - "schema": schema, - "table": table, - "filter": filter, - ].compactMapValues { $0 } - } -} - -public enum ChannelResponse { - case ok, timedOut, error -} - -public enum RealtimeListenTypes: String { - case postgresChanges = "postgres_changes" - case broadcast - case presence -} - -/// Represents the broadcast and presence options for a channel. -public struct RealtimeChannelOptions { - /// Used to track presence payload across clients. Must be unique per client. If `nil`, the server - /// will generate one. - var presenceKey: String? - /// Enables the client to receive their own`broadcast` messages - var broadcastSelf: Bool - /// Instructs the server to acknowledge the client's `broadcast` messages - var broadcastAcknowledge: Bool - - public init( - presenceKey: String? = nil, - broadcastSelf: Bool = false, - broadcastAcknowledge: Bool = false - ) { - self.presenceKey = presenceKey - self.broadcastSelf = broadcastSelf - self.broadcastAcknowledge = broadcastAcknowledge - } - - /// Parameters used to configure the channel - var params: [String: [String: Any]] { - [ - "config": [ - "presence": [ - "key": presenceKey ?? "", - ], - "broadcast": [ - "ack": broadcastAcknowledge, - "self": broadcastSelf, - ], - ], - ] - } -} - -public enum RealtimeSubscribeStates { - case subscribed - case timedOut - case closed - case channelError -} - -/// -/// Represents a RealtimeChannel which is bound to a topic -/// -/// A RealtimeChannel can bind to multiple events on a given topic and -/// be informed when those events occur within a topic. -/// -/// ### Example: -/// -/// let channel = socket.channel("room:123", params: ["token": "Room Token"]) -/// channel.on("new_msg") { payload in print("Got message", payload") } -/// channel.push("new_msg, payload: ["body": "This is a message"]) -/// .receive("ok") { payload in print("Sent message", payload) } -/// .receive("error") { payload in print("Send failed", payload) } -/// .receive("timeout") { payload in print("Networking issue...", payload) } -/// -/// channel.join() -/// .receive("ok") { payload in print("RealtimeChannel Joined", payload) } -/// .receive("error") { payload in print("Failed ot join", payload) } -/// .receive("timeout") { payload in print("Networking issue...", payload) } -/// -@available( - *, - deprecated, - message: "Use new RealtimeChannelV2 class instead. See migration guide: https://github.com/supabase-community/supabase-swift/blob/main/docs/migrations/RealtimeV2%20Migration%20Guide.md" -) -public class RealtimeChannel { - /// The topic of the RealtimeChannel. e.g. "rooms:friends" - public let topic: String - - /// The params sent when joining the channel - public var params: Payload { - didSet { joinPush.payload = params } - } - - public private(set) lazy var presence = Presence(channel: self) - - /// The Socket that the channel belongs to - weak var socket: RealtimeClient? - - var subTopic: String - - /// Current state of the RealtimeChannel - var state: ChannelState - - /// Collection of event bindings - let bindings: LockIsolated<[String: [Binding]]> - - /// Timeout when attempting to join a RealtimeChannel - var timeout: TimeInterval - - /// Set to true once the channel calls .join() - var joinedOnce: Bool - - /// Push to send when the channel calls .join() - var joinPush: Push! - - /// Buffer of Pushes that will be sent once the RealtimeChannel's socket connects - var pushBuffer: [Push] - - /// Timer to attempt to rejoin - var rejoinTimer: TimeoutTimer - - /// Refs of stateChange hooks - var stateChangeRefs: [String] - - /// Initialize a RealtimeChannel - /// - /// - parameter topic: Topic of the RealtimeChannel - /// - parameter params: Optional. Parameters to send when joining. - /// - parameter socket: Socket that the channel is a part of - init(topic: String, params: [String: Any] = [:], socket: RealtimeClient) { - state = ChannelState.closed - self.topic = topic - subTopic = topic.replacingOccurrences(of: "realtime:", with: "") - self.params = params - self.socket = socket - bindings = LockIsolated([:]) - timeout = socket.timeout - joinedOnce = false - pushBuffer = [] - stateChangeRefs = [] - rejoinTimer = TimeoutTimer() - - // Setup Timer delgation - rejoinTimer.callback - .delegate(to: self) { (self) in - if self.socket?.isConnected == true { self.rejoin() } - } - - rejoinTimer.timerCalculation - .delegate(to: self) { (self, tries) -> TimeInterval in - self.socket?.rejoinAfter(tries) ?? 5.0 - } - - // Respond to socket events - let onErrorRef = self.socket?.delegateOnError( - to: self, - callback: { (self, _) in - self.rejoinTimer.reset() - } - ) - if let ref = onErrorRef { stateChangeRefs.append(ref) } - - let onOpenRef = self.socket?.delegateOnOpen( - to: self, - callback: { (self) in - self.rejoinTimer.reset() - if self.isErrored { self.rejoin() } - } - ) - if let ref = onOpenRef { stateChangeRefs.append(ref) } - - // Setup Push Event to be sent when joining - joinPush = Push( - channel: self, - event: ChannelEvent.join, - payload: self.params, - timeout: timeout - ) - - /// Handle when a response is received after join() - joinPush.delegateReceive(.ok, to: self) { (self, _) in - // Mark the RealtimeChannel as joined - self.state = ChannelState.joined - - // Reset the timer, preventing it from attempting to join again - self.rejoinTimer.reset() - - // Send and buffered messages and clear the buffer - self.pushBuffer.forEach { $0.send() } - self.pushBuffer = [] - } - - // Perform if RealtimeChannel errors while attempting to joi - joinPush.delegateReceive(.error, to: self) { (self, _) in - self.state = .errored - if self.socket?.isConnected == true { self.rejoinTimer.scheduleTimeout() } - } - - // Handle when the join push times out when sending after join() - joinPush.delegateReceive(.timeout, to: self) { (self, _) in - // log that the channel timed out - self.socket?.logItems( - "channel", "timeout \(self.topic) \(self.joinRef ?? "") after \(self.timeout)s" - ) - - // Send a Push to the server to leave the channel - let leavePush = Push( - channel: self, - event: ChannelEvent.leave, - timeout: self.timeout - ) - leavePush.send() - - // Mark the RealtimeChannel as in an error and attempt to rejoin if socket is connected - self.state = ChannelState.errored - self.joinPush.reset() - - if self.socket?.isConnected == true { self.rejoinTimer.scheduleTimeout() } - } - - /// Perfom when the RealtimeChannel has been closed - delegateOnClose(to: self) { (self, _) in - // Reset any timer that may be on-going - self.rejoinTimer.reset() - - // Log that the channel was left - self.socket?.logItems( - "channel", "close topic: \(self.topic) joinRef: \(self.joinRef ?? "nil")" - ) - - // Mark the channel as closed and remove it from the socket - self.state = ChannelState.closed - self.socket?.remove(self) - } - - /// Perfom when the RealtimeChannel errors - delegateOnError(to: self) { (self, message) in - // Log that the channel received an error - self.socket?.logItems( - "channel", "error topic: \(self.topic) joinRef: \(self.joinRef ?? "nil") mesage: \(message)" - ) - - // If error was received while joining, then reset the Push - if self.isJoining { - // Make sure that the "phx_join" isn't buffered to send once the socket - // reconnects. The channel will send a new join event when the socket connects. - if let safeJoinRef = self.joinRef { - self.socket?.removeFromSendBuffer(ref: safeJoinRef) - } - - // Reset the push to be used again later - self.joinPush.reset() - } - - // Mark the channel as errored and attempt to rejoin if socket is currently connected - self.state = ChannelState.errored - if self.socket?.isConnected == true { self.rejoinTimer.scheduleTimeout() } - } - - // Perform when the join reply is received - delegateOn(ChannelEvent.reply, filter: ChannelFilter(), to: self) { (self, message) in - // Trigger bindings - self.trigger( - event: self.replyEventName(message.ref), - payload: message.rawPayload, - ref: message.ref, - joinRef: message.joinRef - ) - } - } - - deinit { - rejoinTimer.reset() - } - - /// Overridable message hook. Receives all events for specialized message - /// handling before dispatching to the channel callbacks. - /// - /// - parameter msg: The Message received by the client from the server - /// - return: Must return the message, modified or unmodified - public var onMessage: (_ message: RealtimeMessage) -> RealtimeMessage = { message in - message - } - - /// Joins the channel - /// - /// - parameter timeout: Optional. Defaults to RealtimeChannel's timeout - /// - return: Push event - @discardableResult - public func subscribe( - timeout: TimeInterval? = nil, - callback: ((RealtimeSubscribeStates, (any Error)?) -> Void)? = nil - ) -> RealtimeChannel { - if socket?.isConnected == false { - socket?.connect() - } - - guard !joinedOnce else { - fatalError( - "tried to join multiple times. 'join' " - + "can only be called a single time per channel instance" - ) - } - - onError { message in - let values = message.payload.values.map { "\($0) " } - let error = RealtimeError(values.isEmpty ? "error" : values.joined(separator: ", ")) - callback?(.channelError, error) - } - - onClose { _ in - callback?(.closed, nil) - } - - // Join the RealtimeChannel - if let safeTimeout = timeout { - self.timeout = safeTimeout - } - - let broadcast = params["config", as: [String: Any].self]?["broadcast"] - let presence = params["config", as: [String: Any].self]?["presence"] - - var accessTokenPayload: Payload = [:] - var config: Payload = [ - "postgres_changes": bindings.value["postgres_changes"]?.map(\.filter) ?? [], - ] - - config["broadcast"] = broadcast - config["presence"] = presence - - if let accessToken = socket?.accessToken { - accessTokenPayload["access_token"] = accessToken - } - - params["config"] = config - - joinedOnce = true - rejoin() - - joinPush - .delegateReceive(.ok, to: self) { (self, message) in - if self.socket?.accessToken != nil { - self.socket?.setAuth(self.socket?.accessToken) - } - - guard let serverPostgresFilters = message.payload["postgres_changes"] as? [[String: Any]] - else { - callback?(.subscribed, nil) - return - } - - let clientPostgresBindings = self.bindings.value["postgres_changes"] ?? [] - let bindingsCount = clientPostgresBindings.count - var newPostgresBindings: [Binding] = [] - - for i in 0 ..< bindingsCount { - let clientPostgresBinding = clientPostgresBindings[i] - - let event = clientPostgresBinding.filter["event"] - let schema = clientPostgresBinding.filter["schema"] - let table = clientPostgresBinding.filter["table"] - let filter = clientPostgresBinding.filter["filter"] - - let serverPostgresFilter = serverPostgresFilters[i] - - if serverPostgresFilter["event", as: String.self] == event, - serverPostgresFilter["schema", as: String.self] == schema, - serverPostgresFilter["table", as: String.self] == table, - serverPostgresFilter["filter", as: String.self] == filter - { - newPostgresBindings.append( - Binding( - type: clientPostgresBinding.type, - filter: clientPostgresBinding.filter, - callback: clientPostgresBinding.callback, - id: serverPostgresFilter["id", as: Int.self].flatMap(String.init) - ) - ) - } else { - self.unsubscribe() - callback?( - .channelError, - RealtimeError("Mismatch between client and server bindings for postgres changes.") - ) - return - } - } - - self.bindings.withValue { [newPostgresBindings] in - $0["postgres_changes"] = newPostgresBindings - } - callback?(.subscribed, nil) - } - .delegateReceive(.error, to: self) { _, message in - let values = message.payload.values.map { "\($0) " } - let error = RealtimeError(values.isEmpty ? "error" : values.joined(separator: ", ")) - callback?(.channelError, error) - } - .delegateReceive(.timeout, to: self) { _, _ in - callback?(.timedOut, nil) - } - - return self - } - - public func presenceState() -> Presence.State { - presence.state - } - - public func track(_ payload: Payload, opts: Payload = [:]) async -> ChannelResponse { - await send( - type: .presence, - payload: [ - "event": "track", - "payload": payload, - ], - opts: opts - ) - } - - public func untrack(opts: Payload = [:]) async -> ChannelResponse { - await send( - type: .presence, - payload: ["event": "untrack"], - opts: opts - ) - } - - /// Hook into when the RealtimeChannel is closed. Does not handle retain cycles. - /// Use `delegateOnClose(to:)` for automatic handling of retain cycles. - /// - /// Example: - /// - /// let channel = socket.channel("topic") - /// channel.onClose() { [weak self] message in - /// self?.print("RealtimeChannel \(message.topic) has closed" - /// } - /// - /// - parameter handler: Called when the RealtimeChannel closes - /// - return: Ref counter of the subscription. See `func off()` - @discardableResult - public func onClose(_ handler: @escaping ((RealtimeMessage) -> Void)) -> RealtimeChannel { - on(ChannelEvent.close, filter: ChannelFilter(), handler: handler) - } - - /// Hook into when the RealtimeChannel is closed. Automatically handles retain - /// cycles. Use `onClose()` to handle yourself. - /// - /// Example: - /// - /// let channel = socket.channel("topic") - /// channel.delegateOnClose(to: self) { (self, message) in - /// self.print("RealtimeChannel \(message.topic) has closed" - /// } - /// - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the RealtimeChannel closes - /// - return: Ref counter of the subscription. See `func off()` - @discardableResult - public func delegateOnClose( - to owner: Target, - callback: @escaping ((Target, RealtimeMessage) -> Void) - ) -> RealtimeChannel { - delegateOn( - ChannelEvent.close, filter: ChannelFilter(), to: owner, callback: callback - ) - } - - /// Hook into when the RealtimeChannel receives an Error. Does not handle retain - /// cycles. Use `delegateOnError(to:)` for automatic handling of retain - /// cycles. - /// - /// Example: - /// - /// let channel = socket.channel("topic") - /// channel.onError() { [weak self] (message) in - /// self?.print("RealtimeChannel \(message.topic) has errored" - /// } - /// - /// - parameter handler: Called when the RealtimeChannel closes - /// - return: Ref counter of the subscription. See `func off()` - @discardableResult - public func onError(_ handler: @escaping ((_ message: RealtimeMessage) -> Void)) - -> RealtimeChannel - { - on(ChannelEvent.error, filter: ChannelFilter(), handler: handler) - } - - /// Hook into when the RealtimeChannel receives an Error. Automatically handles - /// retain cycles. Use `onError()` to handle yourself. - /// - /// Example: - /// - /// let channel = socket.channel("topic") - /// channel.delegateOnError(to: self) { (self, message) in - /// self.print("RealtimeChannel \(message.topic) has closed" - /// } - /// - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the RealtimeChannel closes - /// - return: Ref counter of the subscription. See `func off()` - @discardableResult - public func delegateOnError( - to owner: Target, - callback: @escaping ((Target, RealtimeMessage) -> Void) - ) -> RealtimeChannel { - delegateOn( - ChannelEvent.error, filter: ChannelFilter(), to: owner, callback: callback - ) - } - - /// Subscribes on channel events. Does not handle retain cycles. Use - /// `delegateOn(_:, to:)` for automatic handling of retain cycles. - /// - /// Subscription returns a ref counter, which can be used later to - /// unsubscribe the exact event listener - /// - /// Example: - /// - /// let channel = socket.channel("topic") - /// let ref1 = channel.on("event") { [weak self] (message) in - /// self?.print("do stuff") - /// } - /// let ref2 = channel.on("event") { [weak self] (message) in - /// self?.print("do other stuff") - /// } - /// channel.off("event", ref1) - /// - /// Since unsubscription of ref1, "do stuff" won't print, but "do other - /// stuff" will keep on printing on the "event" - /// - /// - parameter event: Event to receive - /// - parameter handler: Called with the event's message - /// - return: Ref counter of the subscription. See `func off()` - @discardableResult - public func on( - _ event: String, - filter: ChannelFilter, - handler: @escaping ((RealtimeMessage) -> Void) - ) -> RealtimeChannel { - var delegated = Delegated() - delegated.manuallyDelegate(with: handler) - - return on(event, filter: filter, delegated: delegated) - } - - /// Subscribes on channel events. Automatically handles retain cycles. Use - /// `on()` to handle yourself. - /// - /// Subscription returns a ref counter, which can be used later to - /// unsubscribe the exact event listener - /// - /// Example: - /// - /// let channel = socket.channel("topic") - /// let ref1 = channel.delegateOn("event", to: self) { (self, message) in - /// self?.print("do stuff") - /// } - /// let ref2 = channel.delegateOn("event", to: self) { (self, message) in - /// self?.print("do other stuff") - /// } - /// channel.off("event", ref1) - /// - /// Since unsubscription of ref1, "do stuff" won't print, but "do other - /// stuff" will keep on printing on the "event" - /// - /// - parameter event: Event to receive - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called with the event's message - /// - return: Ref counter of the subscription. See `func off()` - @discardableResult - public func delegateOn( - _ event: String, - filter: ChannelFilter, - to owner: Target, - callback: @escaping ((Target, RealtimeMessage) -> Void) - ) -> RealtimeChannel { - var delegated = Delegated() - delegated.delegate(to: owner, with: callback) - - return on(event, filter: filter, delegated: delegated) - } - - /// Shared method between `on` and `manualOn` - @discardableResult - private func on( - _ type: String, filter: ChannelFilter, delegated: Delegated - ) -> RealtimeChannel { - bindings.withValue { - $0[type.lowercased(), default: []].append( - Binding(type: type.lowercased(), filter: filter.asDictionary, callback: delegated, id: nil) - ) - } - - return self - } - - /// Unsubscribes from a channel event. If a `ref` is given, only the exact - /// listener will be removed. Else all listeners for the `event` will be - /// removed. - /// - /// Example: - /// - /// let channel = socket.channel("topic") - /// let ref1 = channel.on("event") { _ in print("ref1 event" } - /// let ref2 = channel.on("event") { _ in print("ref2 event" } - /// let ref3 = channel.on("other_event") { _ in print("ref3 other" } - /// let ref4 = channel.on("other_event") { _ in print("ref4 other" } - /// channel.off("event", ref1) - /// channel.off("other_event") - /// - /// After this, only "ref2 event" will be printed if the channel receives - /// "event" and nothing is printed if the channel receives "other_event". - /// - /// - parameter event: Event to unsubscribe from - /// - parameter ref: Ref counter returned when subscribing. Can be omitted - public func off(_ type: String, filter: [String: String] = [:]) { - bindings.withValue { - $0[type.lowercased()] = $0[type.lowercased(), default: []].filter { bind in - !(bind.type.lowercased() == type.lowercased() && bind.filter == filter) - } - } - } - - /// Push a payload to the RealtimeChannel - /// - /// Example: - /// - /// channel - /// .push("event", payload: ["message": "hello") - /// .receive("ok") { _ in { print("message sent") } - /// - /// - parameter event: Event to push - /// - parameter payload: Payload to push - /// - parameter timeout: Optional timeout - @discardableResult - public func push( - _ event: String, - payload: Payload, - timeout: TimeInterval = Defaults.timeoutInterval - ) -> Push { - guard joinedOnce else { - fatalError( - "Tried to push \(event) to \(topic) before joining. Use channel.join() before pushing events" - ) - } - - let pushEvent = Push( - channel: self, - event: event, - payload: payload, - timeout: timeout - ) - if canPush { - pushEvent.send() - } else { - pushEvent.startTimeout() - pushBuffer.append(pushEvent) - } - - return pushEvent - } - - public func send( - type: RealtimeListenTypes, - event: String? = nil, - payload: Payload, - opts: Payload = [:] - ) async -> ChannelResponse { - var payload = payload - payload["type"] = type.rawValue - if let event { - payload["event"] = event - } - - if !canPush, type == .broadcast { - var headers = socket?.headers ?? [:] - headers["Content-Type"] = "application/json" - headers["apikey"] = socket?.accessToken - - let body = [ - "messages": [ - "topic": subTopic, - "payload": payload, - "event": event as Any, - ], - ] - - do { - let request = try HTTPRequest( - url: broadcastEndpointURL, - method: .post, - headers: HTTPFields(headers.compactMapValues { $0 }), - body: JSONSerialization.data(withJSONObject: body) - ) - - let response = try await socket?.http.send(request) - guard let response, 200 ..< 300 ~= response.statusCode else { - return .error - } - return .ok - } catch { - return .error - } - } else { - return await withCheckedContinuation { continuation in - let push = self.push( - type.rawValue, payload: payload, - timeout: (opts["timeout"] as? TimeInterval) ?? self.timeout - ) - - if let type = payload["type"] as? String, type == "broadcast", - let config = self.params["config"] as? [String: Any], - let broadcast = config["broadcast"] as? [String: Any] - { - let ack = broadcast["ack"] as? Bool - if ack == nil || ack == false { - continuation.resume(returning: .ok) - return - } - } - - push - .receive(.ok) { _ in - continuation.resume(returning: .ok) - } - .receive(.timeout) { _ in - continuation.resume(returning: .timedOut) - } - } - } - } - - /// Leaves the channel - /// - /// Unsubscribes from server events, and instructs channel to terminate on - /// server - /// - /// Triggers onClose() hooks - /// - /// To receive leave acknowledgements, use the a `receive` - /// hook to bind to the server ack, ie: - /// - /// Example: - //// - /// channel.leave().receive("ok") { _ in { print("left") } - /// - /// - parameter timeout: Optional timeout - /// - return: Push that can add receive hooks - @discardableResult - public func unsubscribe(timeout: TimeInterval = Defaults.timeoutInterval) -> Push { - // If attempting a rejoin during a leave, then reset, cancelling the rejoin - rejoinTimer.reset() - - // Now set the state to leaving - state = .leaving - - /// Delegated callback for a successful or a failed channel leave - var onCloseDelegate = Delegated() - onCloseDelegate.delegate(to: self) { (self, _) in - self.socket?.logItems("channel", "leave \(self.topic)") - - // Triggers onClose() hooks - self.trigger(event: ChannelEvent.close, payload: ["reason": "leave"]) - } - - // Push event to send to the server - let leavePush = Push( - channel: self, - event: ChannelEvent.leave, - timeout: timeout - ) - - // Perform the same behavior if successfully left the channel - // or if sending the event timed out - leavePush - .receive(.ok, delegated: onCloseDelegate) - .receive(.timeout, delegated: onCloseDelegate) - leavePush.send() - - // If the RealtimeChannel cannot send push events, trigger a success locally - if !canPush { - leavePush.trigger(.ok, payload: [:]) - } - - // Return the push so it can be bound to - return leavePush - } - - /// Overridable message hook. Receives all events for specialized message - /// handling before dispatching to the channel callbacks. - /// - /// - parameter event: The event the message was for - /// - parameter payload: The payload for the message - /// - parameter ref: The reference of the message - /// - return: Must return the payload, modified or unmodified - public func onMessage(callback: @escaping (RealtimeMessage) -> RealtimeMessage) { - onMessage = callback - } - - // ---------------------------------------------------------------------- - - // MARK: - Internal - - // ---------------------------------------------------------------------- - /// Checks if an event received by the Socket belongs to this RealtimeChannel - func isMember(_ message: RealtimeMessage) -> Bool { - // Return false if the message's topic does not match the RealtimeChannel's topic - guard message.topic == topic else { return false } - - guard - let safeJoinRef = message.joinRef, - safeJoinRef != joinRef, - ChannelEvent.isLifecyleEvent(message.event) - else { return true } - - socket?.logItems( - "channel", "dropping outdated message", message.topic, message.event, message.rawPayload, - safeJoinRef - ) - return false - } - - /// Sends the payload to join the RealtimeChannel - func sendJoin(_ timeout: TimeInterval) { - state = ChannelState.joining - joinPush.resend(timeout) - } - - /// Rejoins the channel - func rejoin(_ timeout: TimeInterval? = nil) { - // Do not attempt to rejoin if the channel is in the process of leaving - guard !isLeaving else { return } - - // Leave potentially duplicate channels - socket?.leaveOpenTopic(topic: topic) - - // Send the joinPush - sendJoin(timeout ?? self.timeout) - } - - /// Triggers an event to the correct event bindings created by - /// `channel.on("event")`. - /// - /// - parameter message: Message to pass to the event bindings - func trigger(_ message: RealtimeMessage) { - let typeLower = message.event.lowercased() - - let events = Set([ - ChannelEvent.close, - ChannelEvent.error, - ChannelEvent.leave, - ChannelEvent.join, - ]) - - if message.ref != message.joinRef, events.contains(typeLower) { - return - } - - let handledMessage = message - - let bindings: [Binding] = if ["insert", "update", "delete"].contains(typeLower) { - self.bindings.value["postgres_changes", default: []].filter { bind in - bind.filter["event"] == "*" || bind.filter["event"] == typeLower - } - } else { - self.bindings.value[typeLower, default: []].filter { bind in - if ["broadcast", "presence", "postgres_changes"].contains(typeLower) { - let bindEvent = bind.filter["event"]?.lowercased() - - if let bindId = bind.id.flatMap(Int.init) { - let ids = message.payload["ids", as: [Int].self] ?? [] - return ids.contains(bindId) - && ( - bindEvent == "*" - || bindEvent - == message.payload["data", as: [String: Any].self]?["type", as: String.self]? - .lowercased() - ) - } - - return bindEvent == "*" - || bindEvent == message.payload["event", as: String.self]?.lowercased() - } - - return bind.type.lowercased() == typeLower - } - } - - bindings.forEach { $0.callback.call(handledMessage) } - } - - /// Triggers an event to the correct event bindings created by - //// `channel.on("event")`. - /// - /// - parameter event: Event to trigger - /// - parameter payload: Payload of the event - /// - parameter ref: Ref of the event. Defaults to empty - /// - parameter joinRef: Ref of the join event. Defaults to nil - func trigger( - event: String, - payload: Payload = [:], - ref: String = "", - joinRef: String? = nil - ) { - let message = RealtimeMessage( - ref: ref, - topic: topic, - event: event, - payload: payload, - joinRef: joinRef ?? self.joinRef - ) - trigger(message) - } - - /// - parameter ref: The ref of the event push - /// - return: The event name of the reply - func replyEventName(_ ref: String) -> String { - "chan_reply_\(ref)" - } - - /// The Ref send during the join message. - var joinRef: String? { - joinPush.ref - } - - /// - return: True if the RealtimeChannel can push messages, meaning the socket - /// is connected and the channel is joined - var canPush: Bool { - socket?.isConnected == true && isJoined - } - - var broadcastEndpointURL: URL { - var url = socket?.endPoint ?? "" - url = url.replacingOccurrences(of: "^ws", with: "http", options: .regularExpression, range: nil) - url = url.replacingOccurrences( - of: "(/socket/websocket|/socket|/websocket)/?$", with: "", options: .regularExpression, - range: nil - ) - url = - "\(url.replacingOccurrences(of: "/+$", with: "", options: .regularExpression, range: nil))/api/broadcast" - return URL(string: url)! - } -} - -// ---------------------------------------------------------------------- - -// MARK: - Public API - -// ---------------------------------------------------------------------- -extension RealtimeChannel { - /// - return: True if the RealtimeChannel has been closed - public var isClosed: Bool { - state == .closed - } - - /// - return: True if the RealtimeChannel experienced an error - public var isErrored: Bool { - state == .errored - } - - /// - return: True if the channel has joined - public var isJoined: Bool { - state == .joined - } - - /// - return: True if the channel has requested to join - public var isJoining: Bool { - state == .joining - } - - /// - return: True if the channel has requested to leave - public var isLeaving: Bool { - state == .leaving - } -} - -extension [String: Any] { - subscript(_ key: Key, as _: T.Type) -> T? { - self[key] as? T - } -} diff --git a/Sources/Realtime/Deprecated/RealtimeClient.swift b/Sources/Realtime/Deprecated/RealtimeClient.swift deleted file mode 100644 index d1eabe92f..000000000 --- a/Sources/Realtime/Deprecated/RealtimeClient.swift +++ /dev/null @@ -1,1071 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -import ConcurrencyExtras -import Foundation - -#if canImport(FoundationNetworking) - import FoundationNetworking -#endif - -public enum SocketError: Error { - case abnormalClosureError -} - -/// Alias for a JSON dictionary [String: Any] -public typealias Payload = [String: Any] - -/// Alias for a function returning an optional JSON dictionary (`Payload?`) -public typealias PayloadClosure = () -> Payload? - -/// Struct that gathers callbacks assigned to the Socket -struct StateChangeCallbacks { - var open: LockIsolated<[(ref: String, callback: Delegated)]> = .init([]) - var close: LockIsolated<[(ref: String, callback: Delegated<(Int, String?), Void>)]> = .init([]) - var error: LockIsolated<[(ref: String, callback: Delegated<(any Error, URLResponse?), Void>)]> = - .init([]) - var message: LockIsolated<[(ref: String, callback: Delegated)]> = .init([]) -} - -/// ## Socket Connection -/// A single connection is established to the server and -/// channels are multiplexed over the connection. -/// Connect to the server using the `RealtimeClient` class: -/// -/// ```swift -/// let socket = new RealtimeClient("/socket", paramsClosure: { ["userToken": "123" ] }) -/// socket.connect() -/// ``` -/// -/// The `RealtimeClient` constructor takes the mount point of the socket, -/// the authentication params, as well as options that can be found in -/// the Socket docs, such as configuring the heartbeat. -@available( - *, - deprecated, - message: "Use new RealtimeClientV2 class instead. See migration guide: https://github.com/supabase-community/supabase-swift/blob/main/docs/migrations/RealtimeV2%20Migration%20Guide.md" -) -public class RealtimeClient: PhoenixTransportDelegate { - // ---------------------------------------------------------------------- - - // MARK: - Public Attributes - - // ---------------------------------------------------------------------- - /// The string WebSocket endpoint (ie `"ws://example.com/socket"`, - /// `"wss://example.com"`, etc.) That was passed to the Socket during - /// initialization. The URL endpoint will be modified by the Socket to - /// include `"/websocket"` if missing. - public let endPoint: String - - /// The fully qualified socket URL - public private(set) var endPointUrl: URL - - /// Resolves to return the `paramsClosure` result at the time of calling. - /// If the `Socket` was created with static params, then those will be - /// returned every time. - public var params: Payload? { - paramsClosure?() - } - - /// The optional params closure used to get params when connecting. Must - /// be set when initializing the Socket. - public let paramsClosure: PayloadClosure? - - /// The WebSocket transport. Default behavior is to provide a - /// URLSessionWebsocketTask. See README for alternatives. - private let transport: (URL) -> any PhoenixTransport - - /// Phoenix serializer version, defaults to "2.0.0" - public let vsn: String - - /// Override to provide custom encoding of data before writing to the socket - public var encode: (Any) -> Data = Defaults.encode - - /// Override to provide custom decoding of data read from the socket - public var decode: (Data) -> Any? = Defaults.decode - - /// Timeout to use when opening connections - public var timeout: TimeInterval = Defaults.timeoutInterval - - /// Custom headers to be added to the socket connection request - public var headers: [String: String] = [:] - - /// Interval between sending a heartbeat - public var heartbeatInterval: TimeInterval = Defaults.heartbeatInterval - - /// The maximum amount of time which the system may delay heartbeats in order to optimize power - /// usage - public var heartbeatLeeway: DispatchTimeInterval = Defaults.heartbeatLeeway - - /// Interval between socket reconnect attempts, in seconds - public var reconnectAfter: (Int) -> TimeInterval = Defaults.reconnectSteppedBackOff - - /// Interval between channel rejoin attempts, in seconds - public var rejoinAfter: (Int) -> TimeInterval = Defaults.rejoinSteppedBackOff - - /// The optional function to receive logs - public var logger: ((String) -> Void)? - - /// Disables heartbeats from being sent. Default is false. - public var skipHeartbeat: Bool = false - - /// Enable/Disable SSL certificate validation. Default is false. This - /// must be set before calling `socket.connect()` in order to be applied - public var disableSSLCertValidation: Bool = false - - #if os(Linux) || os(Windows) || os(Android) - #else - /// Configure custom SSL validation logic, eg. SSL pinning. This - /// must be set before calling `socket.connect()` in order to apply. - // public var security: SSLTrustValidator? - - /// Configure the encryption used by your client by setting the - /// allowed cipher suites supported by your server. This must be - /// set before calling `socket.connect()` in order to apply. - public var enabledSSLCipherSuites: [SSLCipherSuite]? - #endif - - // ---------------------------------------------------------------------- - - // MARK: - Private Attributes - - // ---------------------------------------------------------------------- - /// Callbacks for socket state changes - var stateChangeCallbacks: StateChangeCallbacks = .init() - - /// Collection on channels created for the Socket - public internal(set) var channels: [RealtimeChannel] = [] - - /// Buffers messages that need to be sent once the socket has connected. It is an array - /// of tuples, with the ref of the message to send and the callback that will send the message. - var sendBuffer: [(ref: String?, callback: () throws -> Void)] = [] - - /// Ref counter for messages - var ref: UInt64 = .min // 0 (max: 18,446,744,073,709,551,615) - - /// Timer that triggers sending new Heartbeat messages - var heartbeatTimer: HeartbeatTimer? - - /// Ref counter for the last heartbeat that was sent - var pendingHeartbeatRef: String? - - /// Timer to use when attempting to reconnect - var reconnectTimer: TimeoutTimer - - /// Close status - var closeStatus: CloseStatus = .unknown - - /// The connection to the server - var connection: (any PhoenixTransport)? = nil - - /// The HTTPClient to perform HTTP requests. - let http: any HTTPClientType - - var accessToken: String? - - // ---------------------------------------------------------------------- - - // MARK: - Initialization - - // ---------------------------------------------------------------------- - @available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, *) - public convenience init( - _ endPoint: String, - headers: [String: String] = [:], - params: Payload? = nil, - vsn: String = Defaults.vsn - ) { - self.init( - endPoint: endPoint, - headers: headers, - transport: { url in URLSessionTransport(url: url) }, - paramsClosure: { params }, - vsn: vsn - ) - } - - @available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, *) - public convenience init( - _ endPoint: String, - headers: [String: String] = [:], - paramsClosure: PayloadClosure?, - vsn: String = Defaults.vsn - ) { - self.init( - endPoint: endPoint, - headers: headers, - transport: { url in URLSessionTransport(url: url) }, - paramsClosure: paramsClosure, - vsn: vsn - ) - } - - public init( - endPoint: String, - headers: [String: String] = [:], - transport: @escaping ((URL) -> any PhoenixTransport), - paramsClosure: PayloadClosure? = nil, - vsn: String = Defaults.vsn - ) { - self.transport = transport - self.paramsClosure = paramsClosure - self.endPoint = endPoint - self.vsn = vsn - - var headers = headers - if headers["X-Client-Info"] == nil { - headers["X-Client-Info"] = "realtime-swift/\(version)" - } - self.headers = headers - http = HTTPClient(fetch: { try await URLSession.shared.data(for: $0) }, interceptors: []) - - let params = paramsClosure?() - if let jwt = (params?["Authorization"] as? String)?.split(separator: " ").last { - accessToken = String(jwt) - } else { - accessToken = params?["apikey"] as? String - } - endPointUrl = RealtimeClient.buildEndpointUrl( - endpoint: endPoint, - paramsClosure: paramsClosure, - vsn: vsn - ) - - reconnectTimer = TimeoutTimer() - reconnectTimer.callback.delegate(to: self) { (self) in - self.logItems("Socket attempting to reconnect") - self.teardown(reason: "reconnection") { self.connect() } - } - reconnectTimer.timerCalculation - .delegate(to: self) { (self, tries) -> TimeInterval in - let interval = self.reconnectAfter(tries) - self.logItems("Socket reconnecting in \(interval)s") - return interval - } - } - - deinit { - reconnectTimer.reset() - } - - // ---------------------------------------------------------------------- - - // MARK: - Public - - // ---------------------------------------------------------------------- - /// - return: The socket protocol, wss or ws - public var websocketProtocol: String { - switch endPointUrl.scheme { - case "https": "wss" - case "http": "ws" - default: endPointUrl.scheme ?? "" - } - } - - /// - return: True if the socket is connected - public var isConnected: Bool { - connectionState == .open - } - - /// - return: The state of the connect. [.connecting, .open, .closing, .closed] - public var connectionState: PhoenixTransportReadyState { - connection?.readyState ?? .closed - } - - /// Sets the JWT access token used for channel subscription authorization and Realtime RLS. - /// - Parameter token: A JWT string. - public func setAuth(_ token: String?) { - accessToken = token - - for channel in channels { - if token != nil { - channel.params["user_token"] = token - } - - if channel.joinedOnce, channel.isJoined { - channel.push(ChannelEvent.accessToken, payload: ["access_token": token as Any]) - } - } - } - - /// Connects the Socket. The params passed to the Socket on initialization - /// will be sent through the connection. If the Socket is already connected, - /// then this call will be ignored. - public func connect() { - // Do not attempt to reconnect if the socket is currently connected - guard !isConnected else { return } - - // Reset the close status when attempting to connect - closeStatus = .unknown - - // We need to build this right before attempting to connect as the - // parameters could be built upon demand and change over time - endPointUrl = RealtimeClient.buildEndpointUrl( - endpoint: endPoint, - paramsClosure: paramsClosure, - vsn: vsn - ) - - connection = transport(endPointUrl) - connection?.delegate = self - // self.connection?.disableSSLCertValidation = disableSSLCertValidation - // - // #if os(Linux) - // #else - // self.connection?.security = security - // self.connection?.enabledSSLCipherSuites = enabledSSLCipherSuites - // #endif - - connection?.connect(with: headers) - } - - /// Disconnects the socket - /// - /// - parameter code: Optional. Closing status code - /// - parameter callback: Optional. Called when disconnected - public func disconnect( - code: CloseCode = CloseCode.normal, - reason: String? = nil, - callback: (() -> Void)? = nil - ) { - // The socket was closed cleanly by the User - closeStatus = CloseStatus(closeCode: code.rawValue) - - // Reset any reconnects and teardown the socket connection - reconnectTimer.reset() - teardown(code: code, reason: reason, callback: callback) - } - - func teardown( - code: CloseCode = CloseCode.normal, reason: String? = nil, callback: (() -> Void)? = nil - ) { - connection?.delegate = nil - connection?.disconnect(code: code.rawValue, reason: reason) - connection = nil - - // The socket connection has been turndown, heartbeats are not needed - heartbeatTimer?.stop() - - // Since the connection's delegate was nil'd out, inform all state - // callbacks that the connection has closed - stateChangeCallbacks.close.value.forEach { $0.callback.call((code.rawValue, reason)) } - callback?() - } - - // ---------------------------------------------------------------------- - - // MARK: - Register Socket State Callbacks - - // ---------------------------------------------------------------------- - - /// Registers callbacks for connection open events. Does not handle retain - /// cycles. Use `delegateOnOpen(to:)` for automatic handling of retain cycles. - /// - /// Example: - /// - /// socket.onOpen() { [weak self] in - /// self?.print("Socket Connection Open") - /// } - /// - /// - parameter callback: Called when the Socket is opened - @discardableResult - public func onOpen(callback: @escaping () -> Void) -> String { - onOpen { _ in callback() } - } - - /// Registers callbacks for connection open events. Does not handle retain - /// cycles. Use `delegateOnOpen(to:)` for automatic handling of retain cycles. - /// - /// Example: - /// - /// socket.onOpen() { [weak self] response in - /// self?.print("Socket Connection Open") - /// } - /// - /// - parameter callback: Called when the Socket is opened - @discardableResult - public func onOpen(callback: @escaping (URLResponse?) -> Void) -> String { - var delegated = Delegated() - delegated.manuallyDelegate(with: callback) - - return stateChangeCallbacks.open.withValue { [delegated] in - self.append(callback: delegated, to: &$0) - } - } - - /// Registers callbacks for connection open events. Automatically handles - /// retain cycles. Use `onOpen()` to handle yourself. - /// - /// Example: - /// - /// socket.delegateOnOpen(to: self) { self in - /// self.print("Socket Connection Open") - /// } - /// - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the Socket is opened - @discardableResult - public func delegateOnOpen( - to owner: T, - callback: @escaping ((T) -> Void) - ) -> String { - delegateOnOpen(to: owner) { owner, _ in callback(owner) } - } - - /// Registers callbacks for connection open events. Automatically handles - /// retain cycles. Use `onOpen()` to handle yourself. - /// - /// Example: - /// - /// socket.delegateOnOpen(to: self) { self, response in - /// self.print("Socket Connection Open") - /// } - /// - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the Socket is opened - @discardableResult - public func delegateOnOpen( - to owner: T, - callback: @escaping ((T, URLResponse?) -> Void) - ) -> String { - var delegated = Delegated() - delegated.delegate(to: owner, with: callback) - - return stateChangeCallbacks.open.withValue { [delegated] in - self.append(callback: delegated, to: &$0) - } - } - - /// Registers callbacks for connection close events. Does not handle retain - /// cycles. Use `delegateOnClose(_:)` for automatic handling of retain cycles. - /// - /// Example: - /// - /// socket.onClose() { [weak self] in - /// self?.print("Socket Connection Close") - /// } - /// - /// - parameter callback: Called when the Socket is closed - @discardableResult - public func onClose(callback: @escaping () -> Void) -> String { - onClose { _, _ in callback() } - } - - /// Registers callbacks for connection close events. Does not handle retain - /// cycles. Use `delegateOnClose(_:)` for automatic handling of retain cycles. - /// - /// Example: - /// - /// socket.onClose() { [weak self] code, reason in - /// self?.print("Socket Connection Close") - /// } - /// - /// - parameter callback: Called when the Socket is closed - @discardableResult - public func onClose(callback: @escaping (Int, String?) -> Void) -> String { - var delegated = Delegated<(Int, String?), Void>() - delegated.manuallyDelegate(with: callback) - - return stateChangeCallbacks.close.withValue { [delegated] in - self.append(callback: delegated, to: &$0) - } - } - - /// Registers callbacks for connection close events. Automatically handles - /// retain cycles. Use `onClose()` to handle yourself. - /// - /// Example: - /// - /// socket.delegateOnClose(self) { self in - /// self.print("Socket Connection Close") - /// } - /// - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the Socket is closed - @discardableResult - public func delegateOnClose( - to owner: T, - callback: @escaping ((T) -> Void) - ) -> String { - delegateOnClose(to: owner) { owner, _ in callback(owner) } - } - - /// Registers callbacks for connection close events. Automatically handles - /// retain cycles. Use `onClose()` to handle yourself. - /// - /// Example: - /// - /// socket.delegateOnClose(self) { self, code, reason in - /// self.print("Socket Connection Close") - /// } - /// - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the Socket is closed - @discardableResult - public func delegateOnClose( - to owner: T, - callback: @escaping ((T, (Int, String?)) -> Void) - ) -> String { - var delegated = Delegated<(Int, String?), Void>() - delegated.delegate(to: owner, with: callback) - - return stateChangeCallbacks.close.withValue { [delegated] in - self.append(callback: delegated, to: &$0) - } - } - - /// Registers callbacks for connection error events. Does not handle retain - /// cycles. Use `delegateOnError(to:)` for automatic handling of retain cycles. - /// - /// Example: - /// - /// socket.onError() { [weak self] (error) in - /// self?.print("Socket Connection Error", error) - /// } - /// - /// - parameter callback: Called when the Socket errors - @discardableResult - public func onError(callback: @escaping ((any Error, URLResponse?)) -> Void) -> String { - var delegated = Delegated<(any Error, URLResponse?), Void>() - delegated.manuallyDelegate(with: callback) - - return stateChangeCallbacks.error.withValue { [delegated] in - self.append(callback: delegated, to: &$0) - } - } - - /// Registers callbacks for connection error events. Automatically handles - /// retain cycles. Use `manualOnError()` to handle yourself. - /// - /// Example: - /// - /// socket.delegateOnError(to: self) { (self, error) in - /// self.print("Socket Connection Error", error) - /// } - /// - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the Socket errors - @discardableResult - public func delegateOnError( - to owner: T, - callback: @escaping ((T, (any Error, URLResponse?)) -> Void) - ) -> String { - var delegated = Delegated<(any Error, URLResponse?), Void>() - delegated.delegate(to: owner, with: callback) - - return stateChangeCallbacks.error.withValue { [delegated] in - self.append(callback: delegated, to: &$0) - } - } - - /// Registers callbacks for connection message events. Does not handle - /// retain cycles. Use `delegateOnMessage(_to:)` for automatic handling of - /// retain cycles. - /// - /// Example: - /// - /// socket.onMessage() { [weak self] (message) in - /// self?.print("Socket Connection Message", message) - /// } - /// - /// - parameter callback: Called when the Socket receives a message event - @discardableResult - public func onMessage(callback: @escaping (RealtimeMessage) -> Void) -> String { - var delegated = Delegated() - delegated.manuallyDelegate(with: callback) - - return stateChangeCallbacks.message.withValue { [delegated] in - append(callback: delegated, to: &$0) - } - } - - /// Registers callbacks for connection message events. Automatically handles - /// retain cycles. Use `onMessage()` to handle yourself. - /// - /// Example: - /// - /// socket.delegateOnMessage(self) { (self, message) in - /// self.print("Socket Connection Message", message) - /// } - /// - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the Socket receives a message event - @discardableResult - public func delegateOnMessage( - to owner: T, - callback: @escaping ((T, RealtimeMessage) -> Void) - ) -> String { - var delegated = Delegated() - delegated.delegate(to: owner, with: callback) - - return stateChangeCallbacks.message.withValue { [delegated] in - self.append(callback: delegated, to: &$0) - } - } - - private func append(callback: T, to array: inout [(ref: String, callback: T)]) - -> String - { - let ref = makeRef() - array.append((ref, callback)) - return ref - } - - /// Releases all stored callback hooks (onError, onOpen, onClose, etc.) You should - /// call this method when you are finished when the Socket in order to release - /// any references held by the socket. - public func releaseCallbacks() { - stateChangeCallbacks.open.setValue([]) - stateChangeCallbacks.close.setValue([]) - stateChangeCallbacks.error.setValue([]) - stateChangeCallbacks.message.setValue([]) - } - - // ---------------------------------------------------------------------- - - // MARK: - Channel Initialization - - // ---------------------------------------------------------------------- - /// Initialize a new Channel - /// - /// Example: - /// - /// let channel = socket.channel("rooms", params: ["user_id": "abc123"]) - /// - /// - parameter topic: Topic of the channel - /// - parameter params: Optional. Parameters for the channel - /// - return: A new channel - public func channel( - _ topic: String, - params: RealtimeChannelOptions = .init() - ) -> RealtimeChannel { - let channel = RealtimeChannel( - topic: "realtime:\(topic)", params: params.params, socket: self - ) - channels.append(channel) - - return channel - } - - /// Unsubscribes and removes a single channel - public func remove(_ channel: RealtimeChannel) { - channel.unsubscribe() - off(channel.stateChangeRefs) - channels.removeAll(where: { $0.joinRef == channel.joinRef }) - - if channels.isEmpty { - disconnect() - } - } - - /// Unsubscribes and removes all channels - public func removeAllChannels() { - for channel in channels { - remove(channel) - } - } - - /// Removes `onOpen`, `onClose`, `onError,` and `onMessage` registrations. - /// - /// - /// - Parameter refs: List of refs returned by calls to `onOpen`, `onClose`, etc - public func off(_ refs: [String]) { - stateChangeCallbacks.open.withValue { - $0 = $0.filter { - !refs.contains($0.ref) - } - } - stateChangeCallbacks.close.withValue { - $0 = $0.filter { - !refs.contains($0.ref) - } - } - stateChangeCallbacks.error.withValue { - $0 = $0.filter { - !refs.contains($0.ref) - } - } - stateChangeCallbacks.message.withValue { - $0 = $0.filter { - !refs.contains($0.ref) - } - } - } - - // ---------------------------------------------------------------------- - - // MARK: - Sending Data - - // ---------------------------------------------------------------------- - /// Sends data through the Socket. This method is internal. Instead, you - /// should call `push(_:, payload:, timeout:)` on the Channel you are - /// sending an event to. - /// - /// - parameter topic: - /// - parameter event: - /// - parameter payload: - /// - parameter ref: Optional. Defaults to nil - /// - parameter joinRef: Optional. Defaults to nil - func push( - topic: String, - event: String, - payload: Payload, - ref: String? = nil, - joinRef: String? = nil - ) { - let callback: (() throws -> Void) = { [weak self] in - guard let self else { return } - let body: [Any?] = [joinRef, ref, topic, event, payload] - let data = encode(body) - - logItems("push", "Sending \(String(data: data, encoding: String.Encoding.utf8) ?? "")") - connection?.send(data: data) - } - - /// If the socket is connected, then execute the callback immediately. - if isConnected { - try? callback() - } else { - /// If the socket is not connected, add the push to a buffer which will - /// be sent immediately upon connection. - sendBuffer.append((ref: ref, callback: callback)) - } - } - - /// - return: the next message ref, accounting for overflows - public func makeRef() -> String { - ref = (ref == UInt64.max) ? 0 : ref + 1 - return String(ref) - } - - /// Logs the message. Override Socket.logger for specialized logging. noops by default - /// - /// - parameter items: List of items to be logged. Behaves just like debugPrint() - func logItems(_ items: Any...) { - let msg = items.map { String(describing: $0) }.joined(separator: ", ") - logger?("SwiftPhoenixClient: \(msg)") - } - - // ---------------------------------------------------------------------- - - // MARK: - Connection Events - - // ---------------------------------------------------------------------- - /// Called when the underlying Websocket connects to it's host - func onConnectionOpen(response: URLResponse?) { - logItems("transport", "Connected to \(endPoint)") - - // Reset the close status now that the socket has been connected - closeStatus = .unknown - - // Send any messages that were waiting for a connection - flushSendBuffer() - - // Reset how the socket tried to reconnect - reconnectTimer.reset() - - // Restart the heartbeat timer - resetHeartbeat() - - // Inform all onOpen callbacks that the Socket has opened - stateChangeCallbacks.open.value.forEach { $0.callback.call(response) } - } - - func onConnectionClosed(code: Int, reason: String?) { - logItems("transport", "close") - - // Send an error to all channels - triggerChannelError() - - // Prevent the heartbeat from triggering if the - heartbeatTimer?.stop() - - // Only attempt to reconnect if the socket did not close normally, - // or if it was closed abnormally but on client side (e.g. due to heartbeat timeout) - if closeStatus.shouldReconnect { - reconnectTimer.scheduleTimeout() - } - - stateChangeCallbacks.close.value.forEach { $0.callback.call((code, reason)) } - } - - func onConnectionError(_ error: any Error, response: URLResponse?) { - logItems("transport", error, response ?? "") - - // Send an error to all channels - triggerChannelError() - - // Inform any state callbacks of the error - stateChangeCallbacks.error.value.forEach { $0.callback.call((error, response)) } - } - - func onConnectionMessage(_ rawMessage: String) { - logItems("receive ", rawMessage) - - guard - let data = rawMessage.data(using: String.Encoding.utf8), - let json = decode(data) as? [Any?], - let message = RealtimeMessage(json: json) - else { - logItems("receive: Unable to parse JSON: \(rawMessage)") - return - } - - // Clear heartbeat ref, preventing a heartbeat timeout disconnect - if message.ref == pendingHeartbeatRef { pendingHeartbeatRef = nil } - - if message.event == "phx_close" { - print("Close Event Received") - } - - // Dispatch the message to all channels that belong to the topic - channels - .filter { $0.isMember(message) } - .forEach { $0.trigger(message) } - - // Inform all onMessage callbacks of the message - stateChangeCallbacks.message.value.forEach { $0.callback.call(message) } - } - - /// Triggers an error event to all of the connected Channels - func triggerChannelError() { - for channel in channels { - // Only trigger a channel error if it is in an "opened" state - if !(channel.isErrored || channel.isLeaving || channel.isClosed) { - channel.trigger(event: ChannelEvent.error) - } - } - } - - /// Send all messages that were buffered before the socket opened - func flushSendBuffer() { - guard isConnected, sendBuffer.count > 0 else { return } - sendBuffer.forEach { try? $0.callback() } - sendBuffer = [] - } - - /// Removes an item from the sendBuffer with the matching ref - func removeFromSendBuffer(ref: String) { - sendBuffer = sendBuffer.filter { $0.ref != ref } - } - - /// Builds a fully qualified socket `URL` from `endPoint` and `params`. - static func buildEndpointUrl( - endpoint: String, paramsClosure params: PayloadClosure?, vsn: String - ) -> URL { - guard - let url = URL(string: endpoint), - var urlComponents = URLComponents(url: url, resolvingAgainstBaseURL: false) - else { fatalError("Malformed URL: \(endpoint)") } - - // Ensure that the URL ends with "/websocket - if !urlComponents.path.contains("/websocket") { - // Do not duplicate '/' in the path - if urlComponents.path.last != "/" { - urlComponents.path.append("/") - } - - // append 'websocket' to the path - urlComponents.path.append("websocket") - } - - urlComponents.queryItems = [URLQueryItem(name: "vsn", value: vsn)] - - // If there are parameters, append them to the URL - if let params = params?() { - urlComponents.queryItems?.append( - contentsOf: params.map { - URLQueryItem(name: $0.key, value: String(describing: $0.value)) - } - ) - } - - guard let qualifiedUrl = urlComponents.url - else { fatalError("Malformed URL while adding parameters") } - return qualifiedUrl - } - - // Leaves any channel that is open that has a duplicate topic - func leaveOpenTopic(topic: String) { - guard - let dupe = channels.first(where: { $0.topic == topic && ($0.isJoined || $0.isJoining) }) - else { return } - - logItems("transport", "leaving duplicate topic: [\(topic)]") - dupe.unsubscribe() - } - - // ---------------------------------------------------------------------- - - // MARK: - Heartbeat - - // ---------------------------------------------------------------------- - func resetHeartbeat() { - // Clear anything related to the heartbeat - pendingHeartbeatRef = nil - heartbeatTimer?.stop() - - // Do not start up the heartbeat timer if skipHeartbeat is true - guard !skipHeartbeat else { return } - - heartbeatTimer = HeartbeatTimer(timeInterval: heartbeatInterval, leeway: heartbeatLeeway) - heartbeatTimer?.start(eventHandler: { [weak self] in - self?.sendHeartbeat() - }) - } - - /// Sends a heartbeat payload to the phoenix servers - func sendHeartbeat() { - // Do not send if the connection is closed - guard isConnected else { return } - - // If there is a pending heartbeat ref, then the last heartbeat was - // never acknowledged by the server. Close the connection and attempt - // to reconnect. - if let _ = pendingHeartbeatRef { - pendingHeartbeatRef = nil - logItems( - "transport", - "heartbeat timeout. Attempting to re-establish connection" - ) - - // Close the socket manually, flagging the closure as abnormal. Do not use - // `teardown` or `disconnect` as they will nil out the websocket delegate. - abnormalClose("heartbeat timeout") - - return - } - - // The last heartbeat was acknowledged by the server. Send another one - pendingHeartbeatRef = makeRef() - push( - topic: "phoenix", - event: ChannelEvent.heartbeat, - payload: [:], - ref: pendingHeartbeatRef - ) - } - - func abnormalClose(_ reason: String) { - closeStatus = .abnormal - - /* - We use NORMAL here since the client is the one determining to close the - connection. However, we set to close status to abnormal so that - the client knows that it should attempt to reconnect. - - If the server subsequently acknowledges with code 1000 (normal close), - the socket will keep the `.abnormal` close status and trigger a reconnection. - */ - connection?.disconnect(code: CloseCode.normal.rawValue, reason: reason) - } - - // ---------------------------------------------------------------------- - - // MARK: - TransportDelegate - - // ---------------------------------------------------------------------- - public func onOpen(response: URLResponse?) { - onConnectionOpen(response: response) - } - - public func onError(error: any Error, response: URLResponse?) { - onConnectionError(error, response: response) - } - - public func onMessage(message: String) { - onConnectionMessage(message) - } - - public func onClose(code: Int, reason: String? = nil) { - closeStatus.update(transportCloseCode: code) - onConnectionClosed(code: code, reason: reason) - } -} - -// ---------------------------------------------------------------------- - -// MARK: - Close Codes - -// ---------------------------------------------------------------------- -extension RealtimeClient { - public enum CloseCode: Int { - case abnormal = 999 - - case normal = 1000 - - case goingAway = 1001 - } -} - -// ---------------------------------------------------------------------- - -// MARK: - Close Status - -// ---------------------------------------------------------------------- -extension RealtimeClient { - /// Indicates the different closure states a socket can be in. - enum CloseStatus { - /// Undetermined closure state - case unknown - /// A clean closure requested either by the client or the server - case clean - /// An abnormal closure requested by the client - case abnormal - - /// Temporarily close the socket, pausing reconnect attempts. Useful on mobile - /// clients when disconnecting a because the app resigned active but should - /// reconnect when app enters active state. - case temporary - - init(closeCode: Int) { - switch closeCode { - case CloseCode.abnormal.rawValue: - self = .abnormal - case CloseCode.goingAway.rawValue: - self = .temporary - default: - self = .clean - } - } - - mutating func update(transportCloseCode: Int) { - switch self { - case .unknown, .clean, .temporary: - // Allow transport layer to override these statuses. - self = .init(closeCode: transportCloseCode) - case .abnormal: - // Do not allow transport layer to override the abnormal close status. - // The socket itself should reset it on the next connection attempt. - // See `Socket.abnormalClose(_:)` for more information. - break - } - } - - var shouldReconnect: Bool { - switch self { - case .unknown, .abnormal: - true - case .clean, .temporary: - false - } - } - } -} diff --git a/Sources/Realtime/Deprecated/RealtimeMessage.swift b/Sources/Realtime/Deprecated/RealtimeMessage.swift deleted file mode 100644 index a993ae2d1..000000000 --- a/Sources/Realtime/Deprecated/RealtimeMessage.swift +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -import Foundation - -/// Data that is received from the Server. -public struct RealtimeMessage { - /// Reference number. Empty if missing - public let ref: String - - /// Join Reference number - let joinRef: String? - - /// Message topic - public let topic: String - - /// Message event - public let event: String - - /// The raw payload from the Message, including a nested response from - /// phx_reply events. It is recommended to use `payload` instead. - let rawPayload: Payload - - /// Message payload - public var payload: Payload { - guard let response = rawPayload["response"] as? Payload - else { return rawPayload } - return response - } - - /// Convenience accessor. Equivalent to getting the status as such: - /// ```swift - /// message.payload["status"] - /// ``` - public var status: PushStatus? { - (rawPayload["status"] as? String).flatMap(PushStatus.init(rawValue:)) - } - - init( - ref: String = "", - topic: String = "", - event: String = "", - payload: Payload = [:], - joinRef: String? = nil - ) { - self.ref = ref - self.topic = topic - self.event = event - rawPayload = payload - self.joinRef = joinRef - } - - init?(json: [Any?]) { - guard json.count > 4 else { return nil } - joinRef = json[0] as? String - ref = json[1] as? String ?? "" - - if let topic = json[2] as? String, - let event = json[3] as? String, - let payload = json[4] as? Payload - { - self.topic = topic - self.event = event - rawPayload = payload - } else { - return nil - } - } -} diff --git a/Sources/Realtime/Deprecated/TimeoutTimer.swift b/Sources/Realtime/Deprecated/TimeoutTimer.swift deleted file mode 100644 index b6b37c4c7..000000000 --- a/Sources/Realtime/Deprecated/TimeoutTimer.swift +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -/// Creates a timer that can perform calculated reties by setting -/// `timerCalculation` , such as exponential backoff. -/// -/// ### Example -/// -/// let reconnectTimer = TimeoutTimer() -/// -/// // Receive a callbcak when the timer is fired -/// reconnectTimer.callback.delegate(to: self) { (_) in -/// print("timer was fired") -/// } -/// -/// // Provide timer interval calculation -/// reconnectTimer.timerCalculation.delegate(to: self) { (_, tries) -> TimeInterval in -/// return tries > 2 ? 1000 : [1000, 5000, 10000][tries - 1] -/// } -/// -/// reconnectTimer.scheduleTimeout() // fires after 1000ms -/// reconnectTimer.scheduleTimeout() // fires after 5000ms -/// reconnectTimer.reset() -/// reconnectTimer.scheduleTimeout() // fires after 1000ms - -import Foundation - -// sourcery: AutoMockable -class TimeoutTimer { - /// Callback to be informed when the underlying Timer fires - var callback = Delegated() - - /// Provides TimeInterval to use when scheduling the timer - var timerCalculation = Delegated() - - /// The work to be done when the queue fires - var workItem: DispatchWorkItem? - - /// The number of times the underlyingTimer hass been set off. - var tries: Int = 0 - - /// The Queue to execute on. In testing, this is overridden - var queue: TimerQueue = .main - - /// Resets the Timer, clearing the number of tries and stops - /// any scheduled timeout. - func reset() { - tries = 0 - clearTimer() - } - - /// Schedules a timeout callback to fire after a calculated timeout duration. - func scheduleTimeout() { - // Clear any ongoing timer, not resetting the number of tries - clearTimer() - - // Get the next calculated interval, in milliseconds. Do not - // start the timer if the interval is returned as nil. - guard let timeInterval = timerCalculation.call(tries + 1) else { return } - - let workItem = DispatchWorkItem { - self.tries += 1 - self.callback.call() - } - - self.workItem = workItem - queue.queue(timeInterval: timeInterval, execute: workItem) - } - - /// Invalidates any ongoing Timer. Will not clear how many tries have been made - private func clearTimer() { - workItem?.cancel() - workItem = nil - } -} - -/// Wrapper class around a DispatchQueue. Allows for providing a fake clock -/// during tests. -class TimerQueue { - // Can be overriden in tests - static var main = TimerQueue() - - func queue(timeInterval: TimeInterval, execute: DispatchWorkItem) { - // TimeInterval is always in seconds. Multiply it by 1000 to convert - // to milliseconds and round to the nearest millisecond. - let dispatchInterval = Int(round(timeInterval * 1000)) - - let dispatchTime = DispatchTime.now() + .milliseconds(dispatchInterval) - DispatchQueue.main.asyncAfter(deadline: dispatchTime, execute: execute) - } -} diff --git a/Sources/Realtime/Types.swift b/Sources/Realtime/Types.swift index cd0a44c3a..63bffd3f4 100644 --- a/Sources/Realtime/Types.swift +++ b/Sources/Realtime/Types.swift @@ -121,3 +121,29 @@ struct BroadcastMessagePayload: Encodable { let `private`: Bool } } + +/// Represents the different events that can be sent through +/// a channel regarding a Channel's lifecycle. +enum ChannelEvent { + static let join = "phx_join" + static let leave = "phx_leave" + static let close = "phx_close" + static let error = "phx_error" + static let reply = "phx_reply" + static let system = "system" + static let broadcast = "broadcast" + static let accessToken = "access_token" + static let presence = "presence" + static let presenceDiff = "presence_diff" + static let presenceState = "presence_state" + static let postgresChanges = "postgres_changes" + + static let heartbeat = "heartbeat" + + static func isLifecyleEvent(_ event: String) -> Bool { + switch event { + case join, leave, reply, error, close: true + default: false + } + } +} diff --git a/Sources/Supabase/Deprecated.swift b/Sources/Supabase/Deprecated.swift index 5043e4119..bd58c9849 100644 --- a/Sources/Supabase/Deprecated.swift +++ b/Sources/Supabase/Deprecated.swift @@ -17,10 +17,4 @@ extension SupabaseClient { public var database: PostgrestClient { rest } - - /// Realtime client for Supabase - @available(*, deprecated, message: "Use realtimeV2") - public var realtime: RealtimeClient { - _realtime.value - } } diff --git a/Sources/Supabase/SupabaseClient.swift b/Sources/Supabase/SupabaseClient.swift index b419a94e8..62bdb6122 100644 --- a/Sources/Supabase/SupabaseClient.swift +++ b/Sources/Supabase/SupabaseClient.swift @@ -68,8 +68,6 @@ public final class SupabaseClient: Sendable { } } - let _realtime: UncheckedSendable - /// Realtime client for Supabase public var realtimeV2: RealtimeClientV2 { mutableState.withValue { @@ -184,14 +182,6 @@ public final class SupabaseClient: Sendable { autoRefreshToken: options.auth.autoRefreshToken ) - _realtime = UncheckedSendable( - RealtimeClient( - supabaseURL.appendingPathComponent("/realtime/v1").absoluteString, - headers: _headers.dictionary, - params: _headers.dictionary - ) - ) - if options.auth.accessToken == nil { listenForAuthEvents() } @@ -244,7 +234,7 @@ public final class SupabaseClient: Sendable { /// Returns all Realtime channels. public var channels: [RealtimeChannelV2] { - Array(realtimeV2.subscriptions.values) + Array(realtimeV2.channels.values) } /// Creates a Realtime channel with Broadcast, Presence, and Postgres Changes. @@ -387,7 +377,6 @@ public final class SupabaseClient: Sendable { return nil } - realtime.setAuth(accessToken) await realtimeV2.setAuth(accessToken) }