Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ let package = Package(
name: "RealtimeTests",
dependencies: [
.product(name: "CustomDump", package: "swift-custom-dump"),
.product(name: "InlineSnapshotTesting", package: "swift-snapshot-testing"),
"PostgREST",
"Realtime",
"TestHelpers",
Expand Down
2 changes: 1 addition & 1 deletion Sources/Realtime/Push.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ actor Push {

if channel?.config.broadcast.acknowledgeBroadcasts == true {
do {
return try await withTimeout(interval: channel?.socket.options().timeoutInterval ?? 10) {
return try await withTimeout(interval: channel?.socket.options.timeoutInterval ?? 10) {
await withCheckedContinuation {
self.receivedContinuation = $0
}
Expand Down
128 changes: 40 additions & 88 deletions Sources/Realtime/RealtimeChannel.swift
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
//
// RealtimeChannel.swift
//
//
// Created by Guilherme Souza on 26/12/23.
//

import ConcurrencyExtras
import Foundation
import Helpers
Expand Down Expand Up @@ -33,40 +26,7 @@ public struct RealtimeChannelConfig: Sendable {
public var isPrivate: Bool
}

struct Socket: Sendable {
var broadcastURL: @Sendable () -> URL
var status: @Sendable () -> RealtimeClient.Status
var options: @Sendable () -> RealtimeClientOptions
var accessToken: @Sendable () -> String?
var apiKey: @Sendable () -> String?
var makeRef: @Sendable () -> Int

var connect: @Sendable () async -> Void
var addChannel: @Sendable (_ channel: RealtimeChannel) -> Void
var removeChannel: @Sendable (_ channel: RealtimeChannel) async -> Void
var push: @Sendable (_ message: RealtimeMessage) async -> Void
var httpSend: @Sendable (_ request: HTTPRequest) async throws -> HTTPResponse
}

extension Socket {
init(client: RealtimeClient) {
self.init(
broadcastURL: { [weak client] in client?.broadcastURL ?? URL(string: "http://localhost")! },
status: { [weak client] in client?.status ?? .disconnected },
options: { [weak client] in client?.options ?? .init() },
accessToken: { [weak client] in client?.mutableState.accessToken },
apiKey: { [weak client] in client?.apikey },
makeRef: { [weak client] in client?.makeRef() ?? 0 },
connect: { [weak client] in await client?.connect() },
addChannel: { [weak client] in client?.addChannel($0) },
removeChannel: { [weak client] in await client?.removeChannel($0) },
push: { [weak client] in await client?.push($0) },
httpSend: { [weak client] in try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) }
)
}
}

public final class RealtimeChannel: Sendable {
public actor RealtimeChannel {
public typealias Subscription = ObservationToken

public enum Status: Sendable {
Expand All @@ -76,21 +36,23 @@ public final class RealtimeChannel: Sendable {
case unsubscribing
}

struct MutableState {
var clientChanges: [PostgresJoinConfig] = []
var joinRef: String?
var pushes: [String: Push] = [:]
}

private let mutableState = LockIsolated(MutableState())

let topic: String
let config: RealtimeChannelConfig
let logger: (any SupabaseLogger)?
let socket: Socket
private weak var _socket: RealtimeClient?

var socket: RealtimeClient {
guard let _socket else {
fatalError("Expected a RealtimeClient instance to be associated with this channel.")
}
return _socket
}

private let callbackManager = CallbackManager()
private let statusEventEmitter = EventEmitter<Status>(initialEvent: .unsubscribed)
private(set) var clientChanges: [PostgresJoinConfig] = []
private(set) var joinRef: String?
private(set) var pushes: [String: Push] = [:]

public private(set) var status: Status {
get { statusEventEmitter.lastEvent }
Expand All @@ -115,13 +77,13 @@ public final class RealtimeChannel: Sendable {
init(
topic: String,
config: RealtimeChannelConfig,
socket: Socket,
socket: RealtimeClient,
logger: (any SupabaseLogger)?
) {
self.topic = topic
self.config = config
self.logger = logger
self.socket = socket
_socket = socket
}

deinit {
Expand All @@ -130,34 +92,33 @@ public final class RealtimeChannel: Sendable {

/// Subscribes to the channel
public func subscribe() async {
if socket.status() != .connected {
if socket.options().connectOnSubscribe != true {
if await socket.status != .connected {
if socket.options.connectOnSubscribe != true {
fatalError(
"You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?"
)
}
await socket.connect()
}

socket.addChannel(self)
await socket.addChannel(self)

status = .subscribing
logger?.debug("subscribing to channel \(topic)")

let joinConfig = RealtimeJoinConfig(
broadcast: config.broadcast,
presence: config.presence,
postgresChanges: mutableState.clientChanges,
postgresChanges: clientChanges,
isPrivate: config.isPrivate
)

let payload = RealtimeJoinPayload(
let payload = await RealtimeJoinPayload(
config: joinConfig,
accessToken: socket.accessToken()
accessToken: socket.accessToken
)

let joinRef = socket.makeRef().description
mutableState.withValue { $0.joinRef = joinRef }
joinRef = await socket.makeRef().description

logger?.debug("subscribing to channel with body: \(joinConfig)")

Expand All @@ -172,7 +133,7 @@ public final class RealtimeChannel: Sendable {
)

do {
try await withTimeout(interval: socket.options().timeoutInterval) { [self] in
try await withTimeout(interval: socket.options.timeoutInterval) { [self] in
_ = await statusChange.first { @Sendable in $0 == .subscribed }
}
} catch {
Expand All @@ -191,7 +152,7 @@ public final class RealtimeChannel: Sendable {

await push(
RealtimeMessage(
joinRef: mutableState.joinRef,
joinRef: joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.leave,
Expand All @@ -204,7 +165,7 @@ public final class RealtimeChannel: Sendable {
logger?.debug("Updating auth token for channel \(topic)")
await push(
RealtimeMessage(
joinRef: mutableState.joinRef,
joinRef: joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.accessToken,
Expand Down Expand Up @@ -235,17 +196,18 @@ public final class RealtimeChannel: Sendable {
}

var headers = HTTPHeaders(["content-type": "application/json"])
if let apiKey = socket.apiKey() {
if let apiKey = socket.apikey {
headers["apikey"] = apiKey
}
if let accessToken = socket.accessToken() {

if let accessToken = await socket.accessToken {
headers["authorization"] = "Bearer \(accessToken)"
}

let task = Task { [headers] in
_ = try? await socket.httpSend(
_ = try? await socket.http.send(
HTTPRequest(
url: socket.broadcastURL(),
url: socket.broadcastURL,
method: .post,
headers: headers,
body: JSONEncoder().encode(
Expand All @@ -265,14 +227,14 @@ public final class RealtimeChannel: Sendable {
}

if config.broadcast.acknowledgeBroadcasts {
try? await withTimeout(interval: socket.options().timeoutInterval) {
try? await withTimeout(interval: socket.options.timeoutInterval) {
await task.value
}
}
} else {
await push(
RealtimeMessage(
joinRef: mutableState.joinRef,
joinRef: joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.broadcast,
Expand All @@ -298,7 +260,7 @@ public final class RealtimeChannel: Sendable {

await push(
RealtimeMessage(
joinRef: mutableState.joinRef,
joinRef: joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.presence,
Expand All @@ -314,7 +276,7 @@ public final class RealtimeChannel: Sendable {
public func untrack() async {
await push(
RealtimeMessage(
joinRef: mutableState.joinRef,
joinRef: joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.presence,
Expand All @@ -326,7 +288,7 @@ public final class RealtimeChannel: Sendable {
)
}

func onMessage(_ message: RealtimeMessage) {
func onMessage(_ message: RealtimeMessage) async {
do {
guard let eventType = message.eventType else {
logger?.debug("Received message without event type: \(message)")
Expand Down Expand Up @@ -437,13 +399,9 @@ public final class RealtimeChannel: Sendable {
callbackManager.triggerBroadcast(event: event, json: payload)

case .close:
Task { [weak self] in
guard let self else { return }

await socket.removeChannel(self)
logger?.debug("Unsubscribed from channel \(message.topic)")
status = .unsubscribed
}
await socket.removeChannel(self)
logger?.debug("Unsubscribed from channel \(message.topic)")
status = .unsubscribed

case .error:
logger?.debug(
Expand Down Expand Up @@ -551,9 +509,7 @@ public final class RealtimeChannel: Sendable {
filter: filter
)

mutableState.withValue {
$0.clientChanges.append(config)
}
clientChanges.append(config)

let id = callbackManager.addPostgresCallback(filter: config, callback: callback)
return Subscription { [weak callbackManager, logger] in
Expand All @@ -578,18 +534,14 @@ public final class RealtimeChannel: Sendable {
private func push(_ message: RealtimeMessage) async -> PushStatus {
let push = Push(channel: self, message: message)
if let ref = message.ref {
mutableState.withValue {
$0.pushes[ref] = push
}
pushes[ref] = push
}
return await push.send()
}

private func didReceiveReply(ref: String, status: String) {
Task {
let push = mutableState.withValue {
$0.pushes.removeValue(forKey: ref)
}
let push = pushes.removeValue(forKey: ref)
await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
}
}
Expand Down
Loading