Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(functions): add experimental invoke with streamed responses #346

Merged
merged 4 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 94 additions & 28 deletions Sources/Functions/FunctionsClient.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import _Helpers
import Foundation
@preconcurrency import Foundation

#if canImport(FoundationNetworking)
import FoundationNetworking
Expand All @@ -20,21 +20,23 @@ public actor FunctionsClient {
var headers: [String: String]
/// The Region to invoke the functions in.
let region: String?
/// The fetch handler used to make requests.
let fetch: FetchHandler

private let http: HTTPClient

/// Initializes a new instance of `FunctionsClient`.
///
/// - Parameters:
/// - url: The base URL for the functions.
/// - headers: Headers to be included in the requests. (Default: empty dictionary)
/// - region: The Region to invoke the functions in.
/// - logger: SupabaseLogger instance to use.
/// - fetch: The fetch handler used to make requests. (Default: URLSession.shared.data(for:))
@_disfavoredOverload
public init(
url: URL,
headers: [String: String] = [:],
region: String? = nil,
logger: (any SupabaseLogger)? = nil,
fetch: @escaping FetchHandler = { try await URLSession.shared.data(for: $0) }
) {
self.url = url
Expand All @@ -43,7 +45,7 @@ public actor FunctionsClient {
self.headers["X-Client-Info"] = "functions-swift/\(version)"
}
self.region = region
self.fetch = fetch
http = HTTPClient(logger: logger, fetchHandler: fetch)
}

/// Initializes a new instance of `FunctionsClient`.
Expand All @@ -52,20 +54,16 @@ public actor FunctionsClient {
/// - url: The base URL for the functions.
/// - headers: Headers to be included in the requests. (Default: empty dictionary)
/// - region: The Region to invoke the functions in.
/// - logger: SupabaseLogger instance to use.
/// - fetch: The fetch handler used to make requests. (Default: URLSession.shared.data(for:))
public init(
url: URL,
headers: [String: String] = [:],
region: FunctionRegion? = nil,
logger: (any SupabaseLogger)? = nil,
fetch: @escaping FetchHandler = { try await URLSession.shared.data(for: $0) }
) {
self.url = url
self.headers = headers
if headers["X-Client-Info"] == nil {
self.headers["X-Client-Info"] = "functions-swift/\(version)"
}
self.region = region?.rawValue
self.fetch = fetch
self.init(url: url, headers: headers, region: region?.rawValue, logger: logger, fetch: fetch)
}

/// Updates the authorization header.
Expand All @@ -92,10 +90,10 @@ public actor FunctionsClient {
options: FunctionInvokeOptions = .init(),
decode: (Data, HTTPURLResponse) throws -> Response
) async throws -> Response {
let (data, response) = try await rawInvoke(
let response = try await rawInvoke(
functionName: functionName, invokeOptions: options
)
return try decode(data, response)
return try decode(response.data, response.response)
}

/// Invokes a function and decodes the response as a specific type.
Expand Down Expand Up @@ -130,33 +128,101 @@ public actor FunctionsClient {
private func rawInvoke(
functionName: String,
invokeOptions: FunctionInvokeOptions
) async throws -> (Data, HTTPURLResponse) {
) async throws -> Response {
var request = Request(
path: functionName,
method: .post,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is causing a major bug in the library. Upon upgrading our client version, we can no longer make GET/PUT/DELETE requests.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @jareddr thanks for pointing out the issue, a PR with the fix #367

I'll get it merged and released ASAP

headers: invokeOptions.headers.merging(headers) { invoke, _ in invoke },
body: invokeOptions.body
)

if let region = invokeOptions.region ?? region {
request.headers["x-region"] = region
}

let response = try await http.fetch(request, baseURL: url)

guard 200 ..< 300 ~= response.statusCode else {
throw FunctionsError.httpError(code: response.statusCode, data: response.data)
}

let isRelayError = response.response.value(forHTTPHeaderField: "x-relay-error") == "true"
if isRelayError {
throw FunctionsError.relayError
}

return response
}

/// Invokes a function with streamed response.
///
/// Function MUST return a `text/event-stream` content type for this method to work.
///
/// - Parameters:
/// - functionName: The name of the function to invoke.
/// - invokeOptions: Options for invoking the function.
/// - Returns: A stream of Data.
///
/// - Warning: Experimental method.
/// - Note: This method doesn't use the same underlying `URLSession` as the remaining methods in the library.
public func _invokeWithStreamedResponse(
_ functionName: String,
options invokeOptions: FunctionInvokeOptions = .init()
) -> AsyncThrowingStream<Data, any Error> {
let (stream, continuation) = AsyncThrowingStream<Data, any Error>.makeStream()
let delegate = StreamResponseDelegate(continuation: continuation)

let session = URLSession(configuration: .default, delegate: delegate, delegateQueue: nil)

let url = url.appendingPathComponent(functionName)
var urlRequest = URLRequest(url: url)
urlRequest.allHTTPHeaderFields = invokeOptions.headers.merging(headers) { invoke, _ in invoke }
urlRequest.httpMethod = (invokeOptions.method ?? .post).rawValue
urlRequest.httpBody = invokeOptions.body

let region = invokeOptions.region ?? region
if let region {
urlRequest.setValue(region, forHTTPHeaderField: "x-region")
let task = session.dataTask(with: urlRequest) { data, response, _ in
guard let httpResponse = response as? HTTPURLResponse else {
continuation.finish(throwing: URLError(.badServerResponse))
return
}

guard 200 ..< 300 ~= httpResponse.statusCode else {
let error = FunctionsError.httpError(code: httpResponse.statusCode, data: data ?? Data())
continuation.finish(throwing: error)
return
}

let isRelayError = httpResponse.value(forHTTPHeaderField: "x-relay-error") == "true"
if isRelayError {
continuation.finish(throwing: FunctionsError.relayError)
}
}

let (data, response) = try await fetch(urlRequest)
task.resume()

guard let httpResponse = response as? HTTPURLResponse else {
throw URLError(.badServerResponse)
}
continuation.onTermination = { _ in
task.cancel()

guard 200 ..< 300 ~= httpResponse.statusCode else {
throw FunctionsError.httpError(code: httpResponse.statusCode, data: data)
// Hold a strong reference to delegate until continuation terminates.
_ = delegate
}

let isRelayError = httpResponse.value(forHTTPHeaderField: "x-relay-error") == "true"
if isRelayError {
throw FunctionsError.relayError
}
return stream
}
}

final class StreamResponseDelegate: NSObject, URLSessionDataDelegate, Sendable {
let continuation: AsyncThrowingStream<Data, any Error>.Continuation

init(continuation: AsyncThrowingStream<Data, any Error>.Continuation) {
self.continuation = continuation
}

func urlSession(_: URLSession, dataTask _: URLSessionDataTask, didReceive data: Data) {
continuation.yield(data)
}

return (data, httpResponse)
func urlSession(_: URLSession, task _: URLSessionTask, didCompleteWithError error: (any Error)?) {
continuation.finish(throwing: error)
}
}
1 change: 1 addition & 0 deletions Sources/Supabase/SupabaseClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public final class SupabaseClient: @unchecked Sendable {
url: functionsURL,
headers: defaultHeaders,
region: options.functions.region,
logger: options.global.logger,
fetch: fetchWithAuth
)

Expand Down
Loading