Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Sources/Realtime/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
@MainActor
private var mutableState = MutableState()

let topic: String
public let topic: String

@MainActor var config: RealtimeChannelConfig
@MainActor public private(set) var config: RealtimeChannelConfig

let logger: (any SupabaseLogger)?
let socket: any RealtimeClientProtocol
Expand Down
74 changes: 38 additions & 36 deletions Sources/Realtime/RealtimeClientV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import Foundation
#endif

/// Factory function for returning a new WebSocket connection.
typealias WebSocketTransport = @Sendable (_ url: URL, _ headers: [String: String]) async throws ->
typealias WebSocketTransport =
@Sendable (_ url: URL, _ headers: [String: String]) async throws ->
any WebSocket

protocol RealtimeClientProtocol: AnyObject, Sendable {
Expand Down Expand Up @@ -84,10 +85,7 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol {
///
/// You can also use ``onHeartbeat(_:)`` for a closure based method.
public var heartbeat: AsyncStream<HeartbeatStatus> {
AsyncStream(
heartbeatSubject.values.compactMap { $0 }
as AsyncCompactMapSequence<AsyncStream<HeartbeatStatus?>, HeartbeatStatus>
)
AsyncStream(heartbeatSubject.values.compactMap { $0 })
}

/// Listen for connection status changes.
Expand Down Expand Up @@ -366,48 +364,52 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol {
}

private func listenForMessages() {
let messageTask = Task { [weak self] in
guard let self, let conn = self.conn else { return }

do {
for await event in conn.events {
if Task.isCancelled { return }

switch event {
case .binary:
self.options.logger?.error("Unsupported binary event received.")
break
case .text(let text):
let data = Data(text.utf8)
let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data)
await onMessage(message)
mutableState.withValue {
$0.messageTask?.cancel()
$0.messageTask = Task { [weak self] in
guard let self, let conn = self.conn else { return }

case let .close(code, reason):
onClose(code: code, reason: reason)
do {
for await event in conn.events {
if Task.isCancelled { return }

switch event {
case .binary:
self.options.logger?.error("Unsupported binary event received.")
break
case .text(let text):
let data = Data(text.utf8)
let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data)
await onMessage(message)

if Task.isCancelled {
return
}

case .close(let code, let reason):
onClose(code: code, reason: reason)
}
}
} catch {
onError(error)
}
} catch {
onError(error)
}
}
mutableState.withValue {
$0.messageTask = messageTask
}
}

private func startHeartbeating() {
let heartbeatTask = Task { [weak self, options] in
while !Task.isCancelled {
try? await _clock.sleep(for: options.heartbeatInterval)
if Task.isCancelled {
break
mutableState.withValue {
$0.heartbeatTask?.cancel()
$0.heartbeatTask = Task { [weak self, options] in
while !Task.isCancelled {
try? await _clock.sleep(for: options.heartbeatInterval)
if Task.isCancelled {
break
}
await self?.sendHeartbeat()
}
await self?.sendHeartbeat()
}
}
mutableState.withValue {
$0.heartbeatTask = heartbeatTask
}
}

private func sendHeartbeat() async {
Expand Down
187 changes: 187 additions & 0 deletions Tests/RealtimeTests/RealtimeTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,193 @@ final class RealtimeTests: XCTestCase {
let token = "sb-token"
await sut.setAuth(token)
}

// MARK: - Task Lifecycle Tests

func testListenForMessagesCancelsExistingTask() async {
server.onEvent = { @Sendable [server] event in
guard let msg = event.realtimeMessage else { return }

if msg.event == "heartbeat" {
server?.send(
RealtimeMessageV2(
joinRef: msg.joinRef,
ref: msg.ref,
topic: "phoenix",
event: "phx_reply",
payload: ["response": [:]]
)
)
}
}

await sut.connect()

// Get the first message task
let firstMessageTask = sut.mutableState.messageTask
XCTAssertNotNil(firstMessageTask)
XCTAssertFalse(firstMessageTask?.isCancelled ?? true)

// Trigger reconnection which will call listenForMessages again
sut.disconnect()
await sut.connect()

// Verify the old task was cancelled
XCTAssertTrue(firstMessageTask?.isCancelled ?? false)

// Verify a new task was created
let secondMessageTask = sut.mutableState.messageTask
XCTAssertNotNil(secondMessageTask)
XCTAssertFalse(secondMessageTask?.isCancelled ?? true)
}

func testStartHeartbeatingCancelsExistingTask() async {
server.onEvent = { @Sendable [server] event in
guard let msg = event.realtimeMessage else { return }

if msg.event == "heartbeat" {
server?.send(
RealtimeMessageV2(
joinRef: msg.joinRef,
ref: msg.ref,
topic: "phoenix",
event: "phx_reply",
payload: ["response": [:]]
)
)
}
}

await sut.connect()

// Get the first heartbeat task
let firstHeartbeatTask = sut.mutableState.heartbeatTask
XCTAssertNotNil(firstHeartbeatTask)
XCTAssertFalse(firstHeartbeatTask?.isCancelled ?? true)

// Trigger reconnection which will call startHeartbeating again
sut.disconnect()
await sut.connect()

// Verify the old task was cancelled
XCTAssertTrue(firstHeartbeatTask?.isCancelled ?? false)

// Verify a new task was created
let secondHeartbeatTask = sut.mutableState.heartbeatTask
XCTAssertNotNil(secondHeartbeatTask)
XCTAssertFalse(secondHeartbeatTask?.isCancelled ?? true)
}

func testMessageProcessingRespectsCancellation() async {
let messagesProcessed = LockIsolated(0)

server.onEvent = { @Sendable [server] event in
guard let msg = event.realtimeMessage else { return }

if msg.event == "heartbeat" {
server?.send(
RealtimeMessageV2(
joinRef: msg.joinRef,
ref: msg.ref,
topic: "phoenix",
event: "phx_reply",
payload: ["response": [:]]
)
)
}
}

await sut.connect()

// Send multiple messages
for i in 1...3 {
server.send(
RealtimeMessageV2(
joinRef: nil,
ref: "\(i)",
topic: "test-topic",
event: "test-event",
payload: ["index": .double(Double(i))]
)
)
messagesProcessed.withValue { $0 += 1 }
}

await Task.megaYield()

// Disconnect to cancel message processing
sut.disconnect()

// Try to send more messages after disconnect (these should not be processed)
for i in 4...6 {
server.send(
RealtimeMessageV2(
joinRef: nil,
ref: "\(i)",
topic: "test-topic",
event: "test-event",
payload: ["index": .double(Double(i))]
)
)
}

await Task.megaYield()

// Verify that the message task was cancelled
XCTAssertTrue(sut.mutableState.messageTask?.isCancelled ?? false)
}

func testMultipleReconnectionsHandleTaskLifecycleCorrectly() async {
server.onEvent = { @Sendable [server] event in
guard let msg = event.realtimeMessage else { return }

if msg.event == "heartbeat" {
server?.send(
RealtimeMessageV2(
joinRef: msg.joinRef,
ref: msg.ref,
topic: "phoenix",
event: "phx_reply",
payload: ["response": [:]]
)
)
}
}

var previousMessageTasks: [Task<Void, Never>?] = []
var previousHeartbeatTasks: [Task<Void, Never>?] = []

// Test multiple connect/disconnect cycles
for _ in 1...3 {
await sut.connect()

let messageTask = sut.mutableState.messageTask
let heartbeatTask = sut.mutableState.heartbeatTask

XCTAssertNotNil(messageTask)
XCTAssertNotNil(heartbeatTask)
XCTAssertFalse(messageTask?.isCancelled ?? true)
XCTAssertFalse(heartbeatTask?.isCancelled ?? true)

previousMessageTasks.append(messageTask)
previousHeartbeatTasks.append(heartbeatTask)

sut.disconnect()

// Verify tasks were cancelled after disconnect
XCTAssertTrue(messageTask?.isCancelled ?? false)
XCTAssertTrue(heartbeatTask?.isCancelled ?? false)
}

// Verify all previous tasks were properly cancelled
for task in previousMessageTasks {
XCTAssertTrue(task?.isCancelled ?? false)
}

for task in previousHeartbeatTasks {
XCTAssertTrue(task?.isCancelled ?? false)
}
}
}

extension RealtimeMessageV2 {
Expand Down
Loading