Skip to content

Commit

Permalink
feat: expose Realtime options on SupabaseClient (#377)
Browse files Browse the repository at this point in the history
* Update RealtimeClientV2.swift

Provide a means to configure RealtimeV2 options

* Update SupabaseClient.swift

* Update RealtimeClientV2.swift

* feat(realtime): add RealtimeClientOptions and expose it to SupbaseClient

* Add deprecated init to avoid breaking changes

* use renamed for deprecation message

* merge headers in-place

* fix realtime integration tests

* test logger instance

---------

Co-authored-by: Guilherme Souza <grsouza@pm.me>
  • Loading branch information
bryandubno and grdsdev committed May 13, 2024
1 parent 866a039 commit 9cfafdb
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 80 deletions.
9 changes: 8 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,18 @@ let package = Package(
"Functions",
]
),
.testTarget(name: "SupabaseTests", dependencies: ["Supabase"]),
.testTarget(
name: "SupabaseTests",
dependencies: [
"Supabase",
.product(name: "CustomDump", package: "swift-custom-dump"),
]
),
.target(
name: "TestHelpers",
dependencies: [
.product(name: "ConcurrencyExtras", package: "swift-concurrency-extras"),
.product(name: "XCTestDynamicOverlay", package: "xctest-dynamic-overlay"),
"Auth",
]
),
Expand Down
2 changes: 1 addition & 1 deletion Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public actor RealtimeChannelV2 {
/// Subscribes to the channel
public func subscribe() async {
if await socket?.status != .connected {
if socket?.config.connectOnSubscribe != true {
if socket?.options.connectOnSubscribe != true {
fatalError(
"You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?"
)
Expand Down
111 changes: 73 additions & 38 deletions Sources/Realtime/V2/RealtimeClientV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import Foundation
public typealias JSONObject = _Helpers.JSONObject

public actor RealtimeClientV2 {
@available(*, deprecated, renamed: "RealtimeClientOptions")
public struct Configuration: Sendable {
var url: URL
var apiKey: String
Expand Down Expand Up @@ -64,10 +65,12 @@ public actor RealtimeClientV2 {
}
}

let config: Configuration
let url: URL
let options: RealtimeClientOptions
let ws: any WebSocketClient

var accessToken: String?
let apikey: String?
var ref = 0
var pendingHeartbeatRef: Int?

Expand All @@ -79,34 +82,66 @@ public actor RealtimeClientV2 {

private let statusEventEmitter = EventEmitter<Status>(initialEvent: .disconnected)

/// AsyncStream that emits when connection status change.
///
/// You can also use ``onStatusChange(_:)`` for a closure based method.
public var statusChange: AsyncStream<Status> {
statusEventEmitter.stream()
}

/// The current connection status.
public private(set) var status: Status {
get { statusEventEmitter.lastEvent.value }
set { statusEventEmitter.emit(newValue) }
}

/// Listen for connection status changes.
/// - Parameter listener: Closure that will be called when connection status changes.
/// - Returns: An observation handle that can be used to stop listening.
///
/// - Note: Use ``statusChange`` if you prefer to use Async/Await.
public func onStatusChange(
_ listener: @escaping @Sendable (Status) -> Void
) -> ObservationToken {
statusEventEmitter.attach(listener)
}

@available(*, deprecated, renamed: "RealtimeClientV2.init(url:options:)")
public init(config: Configuration) {
self.init(config: config, ws: WebSocket(config: config))
self.init(
url: config.url,
options: RealtimeClientOptions(
headers: config.headers,
heartbeatInterval: config.heartbeatInterval,
reconnectDelay: config.reconnectDelay,
timeoutInterval: config.timeoutInterval,
disconnectOnSessionLoss: config.disconnectOnSessionLoss,
connectOnSubscribe: config.connectOnSubscribe,
logger: config.logger
)
)
}

init(config: Configuration, ws: any WebSocketClient) {
self.config = config
self.ws = ws
public init(url: URL, options: RealtimeClientOptions) {
self.init(
url: url,
options: options,
ws: WebSocket(
realtimeURL: Self.realtimeWebSocketURL(
baseURL: Self.realtimeBaseURL(url: url),
apikey: options.apikey
),
options: options
)
)
}

if let customJWT = config.headers["Authorization"]?.split(separator: " ").last {
accessToken = String(customJWT)
} else {
accessToken = config.apiKey
}
init(url: URL, options: RealtimeClientOptions, ws: any WebSocketClient) {
self.url = url
self.options = options
self.ws = ws
accessToken = options.accessToken ?? options.apikey
apikey = options.apikey
}

deinit {
Expand All @@ -126,16 +161,16 @@ public actor RealtimeClientV2 {
if status == .disconnected {
connectionTask = Task {
if reconnect {
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(config.reconnectDelay))
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(options.reconnectDelay))

if Task.isCancelled {
config.logger?.debug("Reconnect cancelled, returning")
options.logger?.debug("Reconnect cancelled, returning")
return
}
}

if status == .connected {
config.logger?.debug("WebsSocket already connected")
options.logger?.debug("WebsSocket already connected")
return
}

Expand Down Expand Up @@ -165,7 +200,7 @@ public actor RealtimeClientV2 {

private func onConnected(reconnect: Bool) async {
status = .connected
config.logger?.debug("Connected to realtime WebSocket")
options.logger?.debug("Connected to realtime WebSocket")
listenForMessages()
startHeartbeating()
if reconnect {
Expand All @@ -174,17 +209,17 @@ public actor RealtimeClientV2 {
}

private func onDisconnected() async {
config.logger?
options.logger?
.debug(
"WebSocket disconnected. Trying again in \(config.reconnectDelay)"
"WebSocket disconnected. Trying again in \(options.reconnectDelay)"
)
await reconnect()
}

private func onError(_ error: (any Error)?) async {
config.logger?
options.logger?
.debug(
"WebSocket error \(error?.localizedDescription ?? "<none>"). Trying again in \(config.reconnectDelay)"
"WebSocket error \(error?.localizedDescription ?? "<none>"). Trying again in \(options.reconnectDelay)"
)
await reconnect()
}
Expand All @@ -208,7 +243,7 @@ public actor RealtimeClientV2 {
topic: "realtime:\(topic)",
config: config,
socket: self,
logger: self.config.logger
logger: self.options.logger
)
}

Expand All @@ -224,7 +259,7 @@ public actor RealtimeClientV2 {
subscriptions[channel.topic] = nil

if subscriptions.isEmpty {
config.logger?.debug("No more subscribed channel in socket")
options.logger?.debug("No more subscribed channel in socket")
disconnect()
}
}
Expand Down Expand Up @@ -254,18 +289,18 @@ public actor RealtimeClientV2 {
await onMessage(message)
}
} catch {
config.logger?.debug(
"Error while listening for messages. Trying again in \(config.reconnectDelay) \(error)"
options.logger?.debug(
"Error while listening for messages. Trying again in \(options.reconnectDelay) \(error)"
)
await reconnect()
}
}
}

private func startHeartbeating() {
heartbeatTask = Task { [weak self, config] in
heartbeatTask = Task { [weak self, options] in
while !Task.isCancelled {
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(config.heartbeatInterval))
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(options.heartbeatInterval))
if Task.isCancelled {
break
}
Expand All @@ -277,7 +312,7 @@ public actor RealtimeClientV2 {
private func sendHeartbeat() async {
if pendingHeartbeatRef != nil {
pendingHeartbeatRef = nil
config.logger?.debug("Heartbeat timeout")
options.logger?.debug("Heartbeat timeout")

await reconnect()
return
Expand All @@ -297,7 +332,7 @@ public actor RealtimeClientV2 {
}

public func disconnect() {
config.logger?.debug("Closing WebSocket connection")
options.logger?.debug("Closing WebSocket connection")
ref = 0
messageTask?.cancel()
heartbeatTask?.cancel()
Expand All @@ -323,9 +358,9 @@ public actor RealtimeClientV2 {

if let ref = message.ref, Int(ref) == pendingHeartbeatRef {
pendingHeartbeatRef = nil
config.logger?.debug("heartbeat received")
options.logger?.debug("heartbeat received")
} else {
config.logger?
options.logger?
.debug("Received event \(message.event) for channel \(channel?.topic ?? "null")")
await channel?.onMessage(message)
}
Expand All @@ -335,14 +370,14 @@ public actor RealtimeClientV2 {
/// - Parameter message: The message to push through the socket.
public func push(_ message: RealtimeMessageV2) async {
guard status == .connected else {
config.logger?.warning("Trying to push a message while socket is not connected. This is not supported yet.")
options.logger?.warning("Trying to push a message while socket is not connected. This is not supported yet.")
return
}

do {
try await ws.send(message)
} catch {
config.logger?.debug("""
options.logger?.debug("""
Failed to send message:
\(message)
Expand All @@ -356,10 +391,8 @@ public actor RealtimeClientV2 {
ref += 1
return ref
}
}

extension RealtimeClientV2.Configuration {
var realtimeBaseURL: URL {
static func realtimeBaseURL(url: URL) -> URL {
guard var components = URLComponents(url: url, resolvingAgainstBaseURL: false) else {
return url
}
Expand All @@ -377,21 +410,23 @@ extension RealtimeClientV2.Configuration {
return url
}

var realtimeWebSocketURL: URL {
guard var components = URLComponents(url: realtimeBaseURL, resolvingAgainstBaseURL: false)
static func realtimeWebSocketURL(baseURL: URL, apikey: String?) -> URL {
guard var components = URLComponents(url: baseURL, resolvingAgainstBaseURL: false)
else {
return realtimeBaseURL
return baseURL
}

components.queryItems = components.queryItems ?? []
components.queryItems!.append(URLQueryItem(name: "apikey", value: apiKey))
if let apikey {
components.queryItems!.append(URLQueryItem(name: "apikey", value: apikey))
}
components.queryItems!.append(URLQueryItem(name: "vsn", value: "1.0.0"))

components.path.append("/websocket")
components.path = components.path.replacingOccurrences(of: "//", with: "/")

guard let url = components.url else {
return realtimeBaseURL
return baseURL
}

return url
Expand Down
55 changes: 55 additions & 0 deletions Sources/Realtime/V2/Types.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//
// Types.swift
//
//
// Created by Guilherme Souza on 13/05/24.
//

import _Helpers
import Foundation

/// Options for initializing ``RealtimeClientV2``.
public struct RealtimeClientOptions: Sendable {
package var headers: HTTPHeaders
var heartbeatInterval: TimeInterval
var reconnectDelay: TimeInterval
var timeoutInterval: TimeInterval
var disconnectOnSessionLoss: Bool
var connectOnSubscribe: Bool
package var logger: (any SupabaseLogger)?

public static let defaultHeartbeatInterval: TimeInterval = 15
public static let defaultReconnectDelay: TimeInterval = 7
public static let defaultTimeoutInterval: TimeInterval = 10
public static let defaultDisconnectOnSessionLoss = true
public static let defaultConnectOnSubscribe: Bool = true

public init(
headers: [String: String] = [:],
heartbeatInterval: TimeInterval = Self.defaultHeartbeatInterval,
reconnectDelay: TimeInterval = Self.defaultReconnectDelay,
timeoutInterval: TimeInterval = Self.defaultTimeoutInterval,
disconnectOnSessionLoss: Bool = Self.defaultDisconnectOnSessionLoss,
connectOnSubscribe: Bool = Self.defaultConnectOnSubscribe,
logger: (any SupabaseLogger)? = nil
) {
self.headers = HTTPHeaders(headers)
self.heartbeatInterval = heartbeatInterval
self.reconnectDelay = reconnectDelay
self.timeoutInterval = timeoutInterval
self.disconnectOnSessionLoss = disconnectOnSessionLoss
self.connectOnSubscribe = connectOnSubscribe
self.logger = logger
}

var apikey: String? {
headers["apikey"]
}

var accessToken: String? {
guard let accessToken = headers["Authorization"]?.split(separator: " ").last else {
return nil
}
return String(accessToken)
}
}
8 changes: 4 additions & 4 deletions Sources/Realtime/V2/WebSocketClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @

private let mutableState = LockIsolated(MutableState())

init(config: RealtimeClientV2.Configuration) {
realtimeURL = config.realtimeWebSocketURL
init(realtimeURL: URL, options: RealtimeClientOptions) {
self.realtimeURL = realtimeURL

let sessionConfiguration = URLSessionConfiguration.default
sessionConfiguration.httpAdditionalHeaders = config.headers
sessionConfiguration.httpAdditionalHeaders = options.headers.dictionary
configuration = sessionConfiguration
logger = config.logger
logger = options.logger
}

func connect() -> AsyncStream<ConnectionStatus> {
Expand Down

0 comments on commit 9cfafdb

Please sign in to comment.