Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose Realtime options on SupabaseClient #377

Merged
merged 9 commits into from
May 13, 2024
9 changes: 8 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,18 @@ let package = Package(
"Functions",
]
),
.testTarget(name: "SupabaseTests", dependencies: ["Supabase"]),
.testTarget(
name: "SupabaseTests",
dependencies: [
"Supabase",
.product(name: "CustomDump", package: "swift-custom-dump"),
]
),
.target(
name: "TestHelpers",
dependencies: [
.product(name: "ConcurrencyExtras", package: "swift-concurrency-extras"),
.product(name: "XCTestDynamicOverlay", package: "xctest-dynamic-overlay"),
"Auth",
]
),
Expand Down
10 changes: 5 additions & 5 deletions Sources/Auth/AuthClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -699,21 +699,21 @@ public final class AuthClient: Sendable {
/// - Parameter scope: Specifies which sessions should be logged out.
public func signOut(scope: SignOutScope = .global) async throws {
guard let accessToken = currentSession?.accessToken else {
configuration.logger?.warning("signOut called without a session")
return
configuration.logger?.warning("signOut called without a session")
return
}

if scope != .others {
await sessionManager.remove()
eventEmitter.emit(.signedOut, session: nil)
await sessionManager.remove()
eventEmitter.emit(.signedOut, session: nil)
}

do {
_ = try await api.execute(
.init(
url: configuration.url.appendingPathComponent("logout"),
method: .post,
query: [URLQueryItem(name: "scope", value: scope.rawValue)],
query: [URLQueryItem(name: "scope", value: scope.rawValue)],
headers: [.init(name: "Authorization", value: "Bearer \(accessToken)")]
)
)
Expand Down
2 changes: 1 addition & 1 deletion Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public actor RealtimeChannelV2 {
/// Subscribes to the channel
public func subscribe() async {
if await socket?.status != .connected {
if socket?.config.connectOnSubscribe != true {
if socket?.options.connectOnSubscribe != true {
fatalError(
"You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?"
)
Expand Down
111 changes: 73 additions & 38 deletions Sources/Realtime/V2/RealtimeClientV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import Foundation
public typealias JSONObject = _Helpers.JSONObject

public actor RealtimeClientV2 {
@available(*, deprecated, renamed: "RealtimeClientOptions")
public struct Configuration: Sendable {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be better marked as internal given it's not intended to be accessed by consumers

var url: URL
var apiKey: String
Expand Down Expand Up @@ -64,10 +65,12 @@ public actor RealtimeClientV2 {
}
}

let config: Configuration
let url: URL
let options: RealtimeClientOptions
let ws: any WebSocketClient

var accessToken: String?
let apikey: String?
var ref = 0
var pendingHeartbeatRef: Int?

Expand All @@ -79,34 +82,66 @@ public actor RealtimeClientV2 {

private let statusEventEmitter = EventEmitter<Status>(initialEvent: .disconnected)

/// AsyncStream that emits when connection status change.
///
/// You can also use ``onStatusChange(_:)`` for a closure based method.
public var statusChange: AsyncStream<Status> {
statusEventEmitter.stream()
}

/// The current connection status.
public private(set) var status: Status {
get { statusEventEmitter.lastEvent.value }
set { statusEventEmitter.emit(newValue) }
}

/// Listen for connection status changes.
/// - Parameter listener: Closure that will be called when connection status changes.
/// - Returns: An observation handle that can be used to stop listening.
///
/// - Note: Use ``statusChange`` if you prefer to use Async/Await.
public func onStatusChange(
_ listener: @escaping @Sendable (Status) -> Void
) -> ObservationToken {
statusEventEmitter.attach(listener)
}

@available(*, deprecated, renamed: "RealtimeClientV2.init(url:options:)")
public init(config: Configuration) {
self.init(config: config, ws: WebSocket(config: config))
self.init(
url: config.url,
options: RealtimeClientOptions(
headers: config.headers,
heartbeatInterval: config.heartbeatInterval,
reconnectDelay: config.reconnectDelay,
timeoutInterval: config.timeoutInterval,
disconnectOnSessionLoss: config.disconnectOnSessionLoss,
connectOnSubscribe: config.connectOnSubscribe,
logger: config.logger
)
)
}

init(config: Configuration, ws: any WebSocketClient) {
self.config = config
self.ws = ws
public init(url: URL, options: RealtimeClientOptions) {
self.init(
url: url,
options: options,
ws: WebSocket(
realtimeURL: Self.realtimeWebSocketURL(
baseURL: Self.realtimeBaseURL(url: url),
apikey: options.apikey
),
options: options
)
)
}

if let customJWT = config.headers["Authorization"]?.split(separator: " ").last {
accessToken = String(customJWT)
} else {
accessToken = config.apiKey
}
init(url: URL, options: RealtimeClientOptions, ws: any WebSocketClient) {
self.url = url
self.options = options
self.ws = ws
accessToken = options.accessToken ?? options.apikey
apikey = options.apikey
}

deinit {
Expand All @@ -126,16 +161,16 @@ public actor RealtimeClientV2 {
if status == .disconnected {
connectionTask = Task {
if reconnect {
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(config.reconnectDelay))
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(options.reconnectDelay))

if Task.isCancelled {
config.logger?.debug("Reconnect cancelled, returning")
options.logger?.debug("Reconnect cancelled, returning")
return
}
}

if status == .connected {
config.logger?.debug("WebsSocket already connected")
options.logger?.debug("WebsSocket already connected")
return
}

Expand Down Expand Up @@ -165,7 +200,7 @@ public actor RealtimeClientV2 {

private func onConnected(reconnect: Bool) async {
status = .connected
config.logger?.debug("Connected to realtime WebSocket")
options.logger?.debug("Connected to realtime WebSocket")
listenForMessages()
startHeartbeating()
if reconnect {
Expand All @@ -174,17 +209,17 @@ public actor RealtimeClientV2 {
}

private func onDisconnected() async {
config.logger?
options.logger?
.debug(
"WebSocket disconnected. Trying again in \(config.reconnectDelay)"
"WebSocket disconnected. Trying again in \(options.reconnectDelay)"
)
await reconnect()
}

private func onError(_ error: (any Error)?) async {
config.logger?
options.logger?
.debug(
"WebSocket error \(error?.localizedDescription ?? "<none>"). Trying again in \(config.reconnectDelay)"
"WebSocket error \(error?.localizedDescription ?? "<none>"). Trying again in \(options.reconnectDelay)"
)
await reconnect()
}
Expand All @@ -208,7 +243,7 @@ public actor RealtimeClientV2 {
topic: "realtime:\(topic)",
config: config,
socket: self,
logger: self.config.logger
logger: self.options.logger
)
}

Expand All @@ -224,7 +259,7 @@ public actor RealtimeClientV2 {
subscriptions[channel.topic] = nil

if subscriptions.isEmpty {
config.logger?.debug("No more subscribed channel in socket")
options.logger?.debug("No more subscribed channel in socket")
disconnect()
}
}
Expand Down Expand Up @@ -254,18 +289,18 @@ public actor RealtimeClientV2 {
await onMessage(message)
}
} catch {
config.logger?.debug(
"Error while listening for messages. Trying again in \(config.reconnectDelay) \(error)"
options.logger?.debug(
"Error while listening for messages. Trying again in \(options.reconnectDelay) \(error)"
)
await reconnect()
}
}
}

private func startHeartbeating() {
heartbeatTask = Task { [weak self, config] in
heartbeatTask = Task { [weak self, options] in
while !Task.isCancelled {
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(config.heartbeatInterval))
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(options.heartbeatInterval))
if Task.isCancelled {
break
}
Expand All @@ -277,7 +312,7 @@ public actor RealtimeClientV2 {
private func sendHeartbeat() async {
if pendingHeartbeatRef != nil {
pendingHeartbeatRef = nil
config.logger?.debug("Heartbeat timeout")
options.logger?.debug("Heartbeat timeout")

await reconnect()
return
Expand All @@ -297,7 +332,7 @@ public actor RealtimeClientV2 {
}

public func disconnect() {
config.logger?.debug("Closing WebSocket connection")
options.logger?.debug("Closing WebSocket connection")
ref = 0
messageTask?.cancel()
heartbeatTask?.cancel()
Expand All @@ -323,9 +358,9 @@ public actor RealtimeClientV2 {

if let ref = message.ref, Int(ref) == pendingHeartbeatRef {
pendingHeartbeatRef = nil
config.logger?.debug("heartbeat received")
options.logger?.debug("heartbeat received")
} else {
config.logger?
options.logger?
.debug("Received event \(message.event) for channel \(channel?.topic ?? "null")")
await channel?.onMessage(message)
}
Expand All @@ -335,14 +370,14 @@ public actor RealtimeClientV2 {
/// - Parameter message: The message to push through the socket.
public func push(_ message: RealtimeMessageV2) async {
guard status == .connected else {
config.logger?.warning("Trying to push a message while socket is not connected. This is not supported yet.")
options.logger?.warning("Trying to push a message while socket is not connected. This is not supported yet.")
return
}

do {
try await ws.send(message)
} catch {
config.logger?.debug("""
options.logger?.debug("""
Failed to send message:
\(message)

Expand All @@ -356,10 +391,8 @@ public actor RealtimeClientV2 {
ref += 1
return ref
}
}

extension RealtimeClientV2.Configuration {
var realtimeBaseURL: URL {
static func realtimeBaseURL(url: URL) -> URL {
guard var components = URLComponents(url: url, resolvingAgainstBaseURL: false) else {
return url
}
Expand All @@ -377,21 +410,23 @@ extension RealtimeClientV2.Configuration {
return url
}

var realtimeWebSocketURL: URL {
guard var components = URLComponents(url: realtimeBaseURL, resolvingAgainstBaseURL: false)
static func realtimeWebSocketURL(baseURL: URL, apikey: String?) -> URL {
guard var components = URLComponents(url: baseURL, resolvingAgainstBaseURL: false)
else {
return realtimeBaseURL
return baseURL
}

components.queryItems = components.queryItems ?? []
components.queryItems!.append(URLQueryItem(name: "apikey", value: apiKey))
if let apikey {
components.queryItems!.append(URLQueryItem(name: "apikey", value: apikey))
}
components.queryItems!.append(URLQueryItem(name: "vsn", value: "1.0.0"))

components.path.append("/websocket")
components.path = components.path.replacingOccurrences(of: "//", with: "/")

guard let url = components.url else {
return realtimeBaseURL
return baseURL
}

return url
Expand Down
55 changes: 55 additions & 0 deletions Sources/Realtime/V2/Types.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//
// Types.swift
//
//
// Created by Guilherme Souza on 13/05/24.
//

import _Helpers
import Foundation

/// Options for initializing ``RealtimeClientV2``.
public struct RealtimeClientOptions: Sendable {
package var headers: HTTPHeaders
var heartbeatInterval: TimeInterval
var reconnectDelay: TimeInterval
var timeoutInterval: TimeInterval
var disconnectOnSessionLoss: Bool
var connectOnSubscribe: Bool
package var logger: (any SupabaseLogger)?

public static let defaultHeartbeatInterval: TimeInterval = 15
public static let defaultReconnectDelay: TimeInterval = 7
public static let defaultTimeoutInterval: TimeInterval = 10
public static let defaultDisconnectOnSessionLoss = true
public static let defaultConnectOnSubscribe: Bool = true

public init(
headers: [String: String] = [:],
heartbeatInterval: TimeInterval = Self.defaultHeartbeatInterval,
reconnectDelay: TimeInterval = Self.defaultReconnectDelay,
timeoutInterval: TimeInterval = Self.defaultTimeoutInterval,
disconnectOnSessionLoss: Bool = Self.defaultDisconnectOnSessionLoss,
connectOnSubscribe: Bool = Self.defaultConnectOnSubscribe,
logger: (any SupabaseLogger)? = nil
) {
self.headers = HTTPHeaders(headers)
self.heartbeatInterval = heartbeatInterval
self.reconnectDelay = reconnectDelay
self.timeoutInterval = timeoutInterval
self.disconnectOnSessionLoss = disconnectOnSessionLoss
self.connectOnSubscribe = connectOnSubscribe
self.logger = logger
}

var apikey: String? {
headers["apikey"]
}

var accessToken: String? {
guard let accessToken = headers["Authorization"]?.split(separator: " ").last else {
return nil
}
return String(accessToken)
}
}
Loading
Loading