Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 94 additions & 15 deletions Sources/Realtime/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {

/// Subscribes to the channel.
public func subscribeWithError() async throws {
logger?.debug("Starting subscription to channel '\(topic)' (attempt 1/\(socket.options.maxRetryAttempts))")
logger?.debug(
"Starting subscription to channel '\(topic)' (attempt 1/\(socket.options.maxRetryAttempts))"
)

status = .subscribing

Expand Down Expand Up @@ -248,6 +250,87 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
)
}

/// Sends a broadcast message explicitly via REST API.
///
/// This method always uses the REST API endpoint regardless of WebSocket connection state.
/// Useful when you want to guarantee REST delivery or when gradually migrating from implicit REST fallback.
///
/// - Parameters:
/// - event: The name of the broadcast event.
/// - message: Message payload (required).
/// - timeout: Optional timeout interval. If not specified, uses the socket's default timeout.
/// - Returns: `true` if the message was accepted (HTTP 202), otherwise throws an error.
/// - Throws: An error if the access token is missing, payload is missing, or the request fails.
public func httpSend(
event: String,
message: some Codable,
timeout: TimeInterval? = nil
) async throws {
try await httpSend(event: event, message: JSONObject(message), timeout: timeout)
}

/// Sends a broadcast message explicitly via REST API.
///
/// This method always uses the REST API endpoint regardless of WebSocket connection state.
/// Useful when you want to guarantee REST delivery or when gradually migrating from implicit REST fallback.
///
/// - Parameters:
/// - event: The name of the broadcast event.
/// - message: Message payload as a `JSONObject` (required).
/// - timeout: Optional timeout interval. If not specified, uses the socket's default timeout.
/// - Returns: `true` if the message was accepted (HTTP 202), otherwise throws an error.
/// - Throws: An error if the access token is missing, payload is missing, or the request fails.
public func httpSend(
event: String,
message: JSONObject,
timeout: TimeInterval? = nil
) async throws {
guard let accessToken = await socket._getAccessToken() else {
throw RealtimeError("Access token is required for httpSend()")
}

var headers: HTTPFields = [.contentType: "application/json"]
if let apiKey = socket.options.apikey {
headers[.apiKey] = apiKey
}
headers[.authorization] = "Bearer \(accessToken)"

let body = try await JSONEncoder().encode(
BroadcastMessagePayload(
messages: [
BroadcastMessagePayload.Message(
topic: topic,
event: event,
payload: message,
private: config.isPrivate
)
]
)
)

let request = HTTPRequest(
url: socket.broadcastURL,
method: .post,
headers: headers,
body: body
)

let response = try await withTimeout(interval: timeout ?? socket.options.timeoutInterval) { [self] in
await Result {
try await socket.http.send(request)
}
}.get()

guard response.statusCode == 202 else {
// Try to parse error message from response body
var errorMessage = HTTPURLResponse.localizedString(forStatusCode: response.statusCode)
if let errorBody = try? response.decoded(as: [String: String].self) {
errorMessage = errorBody["error"] ?? errorBody["message"] ?? errorMessage
}
throw RealtimeError(errorMessage)
}
}

/// Send a broadcast message with `event` and a `Codable` payload.
/// - Parameters:
/// - event: Broadcast message event.
Expand All @@ -263,6 +346,12 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
@MainActor
public func broadcast(event: String, message: JSONObject) async {
if status != .subscribed {
logger?.warning(
"Realtime broadcast() is automatically falling back to REST API. "
+ "This behavior will be deprecated in the future. "
+ "Please use httpSend() explicitly for REST delivery."
)

var headers: HTTPFields = [.contentType: "application/json"]
if let apiKey = socket.options.apikey {
headers[.apiKey] = apiKey
Expand All @@ -271,17 +360,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
headers[.authorization] = "Bearer \(accessToken)"
}

struct BroadcastMessagePayload: Encodable {
let messages: [Message]

struct Message: Encodable {
let topic: String
let event: String
let payload: JSONObject
let `private`: Bool
}
}

let task = Task { [headers] in
_ = try? await socket.http.send(
HTTPRequest(
Expand Down Expand Up @@ -543,7 +621,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
table: table,
filter: filter
) {
guard case let .insert(action) = $0 else { return }
guard case .insert(let action) = $0 else { return }
callback(action)
}
}
Expand All @@ -562,7 +640,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
table: table,
filter: filter
) {
guard case let .update(action) = $0 else { return }
guard case .update(let action) = $0 else { return }
callback(action)
}
}
Expand All @@ -581,7 +659,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
table: table,
filter: filter
) {
guard case let .delete(action) = $0 else { return }
guard case .delete(let action) = $0 else { return }
callback(action)
}
}
Expand Down Expand Up @@ -673,3 +751,4 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
}
}

11 changes: 11 additions & 0 deletions Sources/Realtime/Types.swift
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,14 @@ extension HTTPField.Name {
public enum LogLevel: String, Sendable {
case info, warn, error
}

struct BroadcastMessagePayload: Encodable {
let messages: [Message]

struct Message: Encodable {
let topic: String
let event: String
let payload: JSONObject
let `private`: Bool
}
}
Loading
Loading