From dfe09bc804a06a06743884cbf56c5890409e9a87 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Fri, 19 Apr 2024 22:44:15 -0300 Subject: [PATCH] feat(realtime): add closure based methods (#345) * feat(realtime): add closure based methods * feat(realtime): add closure based method for broadcast * test: add test for closure based methods * fix ObservationTokenTests --- .../Realtime/RealtimeChannel+AsyncAwait.swift | 125 +++++++++++++++ Sources/Realtime/V2/PushV2.swift | 25 +-- Sources/Realtime/V2/RealtimeChannelV2.swift | 146 +++++++----------- Sources/Realtime/V2/RealtimeClientV2.swift | 16 +- Sources/_Helpers/EventEmitter.swift | 19 ++- Tests/RealtimeTests/RealtimeTests.swift | 33 +++- .../_HelpersTests/ObservationTokenTests.swift | 8 +- 7 files changed, 254 insertions(+), 118 deletions(-) create mode 100644 Sources/Realtime/RealtimeChannel+AsyncAwait.swift diff --git a/Sources/Realtime/RealtimeChannel+AsyncAwait.swift b/Sources/Realtime/RealtimeChannel+AsyncAwait.swift new file mode 100644 index 00000000..e5e85244 --- /dev/null +++ b/Sources/Realtime/RealtimeChannel+AsyncAwait.swift @@ -0,0 +1,125 @@ +// +// RealtimeChannel+AsyncAwait.swift +// +// +// Created by Guilherme Souza on 17/04/24. +// + +import Foundation + +extension RealtimeChannelV2 { + /// Listen for clients joining / leaving the channel using presences. + public func presenceChange() -> AsyncStream { + let (stream, continuation) = AsyncStream.makeStream() + + let subscription = onPresenceChange { + continuation.yield($0) + } + + continuation.onTermination = { _ in + subscription.cancel() + } + + return stream + } + + /// Listen for postgres changes in a channel. + public func postgresChange( + _: InsertAction.Type, + schema: String = "public", + table: String? = nil, + filter: String? = nil + ) -> AsyncStream { + postgresChange(event: .insert, schema: schema, table: table, filter: filter) + .compactMap { $0.wrappedAction as? InsertAction } + .eraseToStream() + } + + /// Listen for postgres changes in a channel. + public func postgresChange( + _: UpdateAction.Type, + schema: String = "public", + table: String? = nil, + filter: String? = nil + ) -> AsyncStream { + postgresChange(event: .update, schema: schema, table: table, filter: filter) + .compactMap { $0.wrappedAction as? UpdateAction } + .eraseToStream() + } + + /// Listen for postgres changes in a channel. + public func postgresChange( + _: DeleteAction.Type, + schema: String = "public", + table: String? = nil, + filter: String? = nil + ) -> AsyncStream { + postgresChange(event: .delete, schema: schema, table: table, filter: filter) + .compactMap { $0.wrappedAction as? DeleteAction } + .eraseToStream() + } + + /// Listen for postgres changes in a channel. + public func postgresChange( + _: SelectAction.Type, + schema: String = "public", + table: String? = nil, + filter: String? = nil + ) -> AsyncStream { + postgresChange(event: .select, schema: schema, table: table, filter: filter) + .compactMap { $0.wrappedAction as? SelectAction } + .eraseToStream() + } + + /// Listen for postgres changes in a channel. + public func postgresChange( + _: AnyAction.Type, + schema: String = "public", + table: String? = nil, + filter: String? = nil + ) -> AsyncStream { + postgresChange(event: .all, schema: schema, table: table, filter: filter) + } + + private func postgresChange( + event: PostgresChangeEvent, + schema: String, + table: String?, + filter: String? + ) -> AsyncStream { + let (stream, continuation) = AsyncStream.makeStream() + let subscription = _onPostgresChange( + event: event, + schema: schema, + table: table, + filter: filter + ) { + continuation.yield($0) + } + continuation.onTermination = { _ in + subscription.cancel() + } + return stream + } + + /// Listen for broadcast messages sent by other clients within the same channel under a specific `event`. + public func broadcastStream(event: String) -> AsyncStream { + let (stream, continuation) = AsyncStream.makeStream() + + let subscription = onBroadcast(event: event) { + continuation.yield($0) + } + + continuation.onTermination = { _ in + subscription.cancel() + } + + return stream + } + + /// Listen for broadcast messages sent by other clients within the same channel under a specific `event`. + @available(*, deprecated, renamed: "broadcastStream(event:)") + public func broadcast(event: String) -> AsyncStream { + broadcastStream(event: event) + } +} diff --git a/Sources/Realtime/V2/PushV2.swift b/Sources/Realtime/V2/PushV2.swift index 8d64ca97..54ec2b87 100644 --- a/Sources/Realtime/V2/PushV2.swift +++ b/Sources/Realtime/V2/PushV2.swift @@ -20,28 +20,15 @@ actor PushV2 { } func send() async -> PushStatus { - do { - try await channel?.socket?.ws.send(message) + await channel?.socket?.push(message) - if channel?.config.broadcast.acknowledgeBroadcasts == true { - return await withCheckedContinuation { - receivedContinuation = $0 - } + if channel?.config.broadcast.acknowledgeBroadcasts == true { + return await withCheckedContinuation { + receivedContinuation = $0 } - - return .ok - } catch { - await channel?.socket?.config.logger?.debug( - """ - Failed to send message: - \(message) - - Error: - \(error) - """ - ) - return .error } + + return .ok } func didReceive(status: PushStatus) { diff --git a/Sources/Realtime/V2/RealtimeChannelV2.swift b/Sources/Realtime/V2/RealtimeChannelV2.swift index 0fcb9466..4ab3d340 100644 --- a/Sources/Realtime/V2/RealtimeChannelV2.swift +++ b/Sources/Realtime/V2/RealtimeChannelV2.swift @@ -15,6 +15,8 @@ public struct RealtimeChannelConfig: Sendable { } public actor RealtimeChannelV2 { + public typealias Subscription = ObservationToken + public enum Status: Sendable { case unsubscribed case subscribing @@ -340,94 +342,85 @@ public actor RealtimeChannelV2 { } /// Listen for clients joining / leaving the channel using presences. - public func presenceChange() -> AsyncStream { - let (stream, continuation) = AsyncStream.makeStream() - - let id = callbackManager.addPresenceCallback { - continuation.yield($0) - } - - let logger = logger - - continuation.onTermination = { [weak callbackManager] _ in + public func onPresenceChange( + _ callback: @escaping @Sendable (any PresenceAction) -> Void + ) -> Subscription { + let id = callbackManager.addPresenceCallback(callback: callback) + return Subscription { [weak callbackManager, logger] in logger?.debug("Removing presence callback with id: \(id)") callbackManager?.removeCallback(id: id) } - - return stream } /// Listen for postgres changes in a channel. - public func postgresChange( + public func onPostgresChange( _: InsertAction.Type, schema: String = "public", table: String? = nil, - filter: String? = nil - ) -> AsyncStream { - postgresChange(event: .insert, schema: schema, table: table, filter: filter) - .compactMap { $0.wrappedAction as? InsertAction } - .eraseToStream() + filter: String? = nil, + callback: @escaping @Sendable (InsertAction) -> Void + ) -> Subscription { + _onPostgresChange( + event: .insert, + schema: schema, + table: table, + filter: filter + ) { + guard case let .insert(action) = $0 else { return } + callback(action) + } } /// Listen for postgres changes in a channel. - public func postgresChange( + public func onPostgresChange( _: UpdateAction.Type, schema: String = "public", table: String? = nil, - filter: String? = nil - ) -> AsyncStream { - postgresChange(event: .update, schema: schema, table: table, filter: filter) - .compactMap { $0.wrappedAction as? UpdateAction } - .eraseToStream() + filter: String? = nil, + callback: @escaping @Sendable (UpdateAction) -> Void + ) -> Subscription { + _onPostgresChange( + event: .update, + schema: schema, + table: table, + filter: filter + ) { + guard case let .update(action) = $0 else { return } + callback(action) + } } /// Listen for postgres changes in a channel. - public func postgresChange( + public func onPostgresChange( _: DeleteAction.Type, schema: String = "public", table: String? = nil, - filter: String? = nil - ) -> AsyncStream { - postgresChange(event: .delete, schema: schema, table: table, filter: filter) - .compactMap { $0.wrappedAction as? DeleteAction } - .eraseToStream() - } - - /// Listen for postgres changes in a channel. - public func postgresChange( - _: SelectAction.Type, - schema: String = "public", - table: String? = nil, - filter: String? = nil - ) -> AsyncStream { - postgresChange(event: .select, schema: schema, table: table, filter: filter) - .compactMap { $0.wrappedAction as? SelectAction } - .eraseToStream() - } - - /// Listen for postgres changes in a channel. - public func postgresChange( - _: AnyAction.Type, - schema: String = "public", - table: String? = nil, - filter: String? = nil - ) -> AsyncStream { - postgresChange(event: .all, schema: schema, table: table, filter: filter) + filter: String? = nil, + callback: @escaping @Sendable (DeleteAction) -> Void + ) -> Subscription { + _onPostgresChange( + event: .delete, + schema: schema, + table: table, + filter: filter + ) { + guard case let .delete(action) = $0 else { return } + callback(action) + } } - private func postgresChange( + func _onPostgresChange( event: PostgresChangeEvent, schema: String, table: String?, - filter: String? - ) -> AsyncStream { + filter: String?, + callback: @escaping @Sendable (AnyAction) -> Void + ) -> Subscription { precondition( status != .subscribed, "You cannot call postgresChange after joining the channel" ) - let (stream, continuation) = AsyncStream.makeStream() - let config = PostgresJoinConfig( event: event, schema: schema, @@ -437,44 +430,23 @@ public actor RealtimeChannelV2 { clientChanges.append(config) - let id = callbackManager.addPostgresCallback(filter: config) { action in - continuation.yield(action) - } - - let logger = logger - - continuation.onTermination = { [weak callbackManager] _ in + let id = callbackManager.addPostgresCallback(filter: config, callback: callback) + return Subscription { [weak callbackManager, logger] in logger?.debug("Removing postgres callback with id: \(id)") callbackManager?.removeCallback(id: id) } - - return stream } - /// Listen for broadcast messages sent by other clients within the same channel under a specific - /// `event`. - public func broadcastStream(event: String) -> AsyncStream { - let (stream, continuation) = AsyncStream.makeStream() - - let id = callbackManager.addBroadcastCallback(event: event) { - continuation.yield($0) - } - - let logger = logger - - continuation.onTermination = { [weak callbackManager] _ in + /// Listen for broadcast messages sent by other clients within the same channel under a specific `event`. + public func onBroadcast( + event: String, + callback: @escaping @Sendable (JSONObject) -> Void + ) -> Subscription { + let id = callbackManager.addBroadcastCallback(event: event, callback: callback) + return Subscription { [weak callbackManager, logger] in logger?.debug("Removing broadcast callback with id: \(id)") callbackManager?.removeCallback(id: id) } - - return stream - } - - /// Listen for broadcast messages sent by other clients within the same channel under a specific - /// `event`. - @available(*, deprecated, renamed: "broadcastStream(event:)") - public func broadcast(event: String) -> AsyncStream { - broadcastStream(event: event) } @discardableResult diff --git a/Sources/Realtime/V2/RealtimeClientV2.swift b/Sources/Realtime/V2/RealtimeClientV2.swift index c3f838b8..f0a3f023 100644 --- a/Sources/Realtime/V2/RealtimeClientV2.swift +++ b/Sources/Realtime/V2/RealtimeClientV2.swift @@ -117,6 +117,9 @@ public actor RealtimeClientV2 { subscriptions = [:] } + /// Connects the socket. + /// + /// Suspends until connected. public func connect() async { await connect(reconnect: false) } @@ -284,7 +287,7 @@ public actor RealtimeClientV2 { pendingHeartbeatRef = makeRef() - await send( + await push( RealtimeMessageV2( joinRef: nil, ref: pendingHeartbeatRef?.description, @@ -305,6 +308,8 @@ public actor RealtimeClientV2 { status = .disconnected } + /// Sets the JWT access token used for channel subscription authorization and Realtime RLS. + /// - Parameter token: A JWT string. public func setAuth(_ token: String?) async { accessToken = token @@ -328,7 +333,14 @@ public actor RealtimeClientV2 { } } - func send(_ message: RealtimeMessageV2) async { + /// Push out a message if the socket is connected. + /// - Parameter message: The message to push through the socket. + public func push(_ message: RealtimeMessageV2) async { + guard status == .connected else { + config.logger?.warning("Trying to push a message while socket is not connected. This is not supported yet.") + return + } + do { try await ws.send(message) } catch { diff --git a/Sources/_Helpers/EventEmitter.swift b/Sources/_Helpers/EventEmitter.swift index 3e24c12b..a47c002e 100644 --- a/Sources/_Helpers/EventEmitter.swift +++ b/Sources/_Helpers/EventEmitter.swift @@ -9,10 +9,19 @@ import ConcurrencyExtras import Foundation public final class ObservationToken: Sendable { - let _onRemove = LockIsolated((@Sendable () -> Void)?.none) + let _onCancel = LockIsolated((@Sendable () -> Void)?.none) + package init(_ onCancel: (@Sendable () -> Void)? = nil) { + _onCancel.setValue(onCancel) + } + + @available(*, deprecated, renamed: "cancel") public func remove() { - _onRemove.withValue { + cancel() + } + + public func cancel() { + _onCancel.withValue { if $0 == nil { return } @@ -23,7 +32,7 @@ public final class ObservationToken: Sendable { } deinit { - remove() + cancel() } } @@ -53,7 +62,7 @@ package final class EventEmitter: Sendable { let token = ObservationToken() let key = ObjectIdentifier(token) - token._onRemove.setValue { [weak self] in + token._onCancel.setValue { [weak self] in self?.listeners.withValue { $0[key] = nil } @@ -86,7 +95,7 @@ package final class EventEmitter: Sendable { } continuation.onTermination = { _ in - token.remove() + token.cancel() } } } diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index 414deb9d..5c628527 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -33,7 +33,38 @@ final class RealtimeTests: XCTestCase { ) } - func testBehavior() async { + func testBehavior_Closure() async { + let channel = await sut.channel("public:messages") + _ = await channel.onPostgresChange(InsertAction.self, table: "messages") { _ in } + _ = await channel.onPostgresChange(UpdateAction.self, table: "messages") { _ in } + _ = await channel.onPostgresChange(DeleteAction.self, table: "messages") { _ in } + + let statusChange = await sut.statusChange + + await connectSocketAndWait() + + let status = await statusChange.prefix(3).collect() + XCTAssertEqual(status, [.disconnected, .connecting, .connected]) + + let messageTask = await sut.messageTask + XCTAssertNotNil(messageTask) + + let heartbeatTask = await sut.heartbeatTask + XCTAssertNotNil(heartbeatTask) + + let subscription = Task { + await channel.subscribe() + } + await Task.megaYield() + ws.mockReceive(.messagesSubscribed) + + // Wait until channel subscribed + await subscription.value + + XCTAssertNoDifference(ws.sentMessages.value, [.subscribeToMessages]) + } + + func testBehavior_AsyncAwait() async { let channel = await sut.channel("public:messages") _ = await channel.postgresChange(InsertAction.self, table: "messages") _ = await channel.postgresChange(UpdateAction.self, table: "messages") diff --git a/Tests/_HelpersTests/ObservationTokenTests.swift b/Tests/_HelpersTests/ObservationTokenTests.swift index 62d53856..d00eaa1d 100644 --- a/Tests/_HelpersTests/ObservationTokenTests.swift +++ b/Tests/_HelpersTests/ObservationTokenTests.swift @@ -15,14 +15,14 @@ final class ObservationTokenTests: XCTestCase { let handle = ObservationToken() let onRemoveCallCount = LockIsolated(0) - handle._onRemove.setValue { + handle._onCancel.setValue { onRemoveCallCount.withValue { $0 += 1 } } - handle.remove() - handle.remove() + handle.cancel() + handle.cancel() XCTAssertEqual(onRemoveCallCount.value, 1) } @@ -31,7 +31,7 @@ final class ObservationTokenTests: XCTestCase { var handle: ObservationToken? = ObservationToken() let onRemoveCallCount = LockIsolated(0) - handle?._onRemove.setValue { + handle?._onCancel.setValue { onRemoveCallCount.withValue { $0 += 1 }