Skip to content

Commit

Permalink
Allow configuring a queue for HubConnection callbacks
Browse files Browse the repository at this point in the history
By default the main queue is configured
  • Loading branch information
moozzyk committed Oct 16, 2022
1 parent a6e8d25 commit f4558f6
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 73 deletions.
2 changes: 1 addition & 1 deletion Examples/HubSamplePhone/ViewController.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import SignalRClient

class ViewController: UIViewController, UITableViewDelegate, UITableViewDataSource {
// Update the Url accordingly
private let serverUrl = "http://192.168.86.115:5000/chat" // /chat or /chatLongPolling or /chatWebSockets
private let serverUrl = "http://192.168.86.250:5000/chat" // /chat or /chatLongPolling or /chatWebSockets
private let dispatchQueue = DispatchQueue(label: "hubsamplephone.queue.dispatcheueuq")

private var chatHubConnection: HubConnection?
Expand Down
75 changes: 49 additions & 26 deletions Sources/SignalRClient/HubConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class HubConnection {
private let keepAliveIntervalInSeconds: Double?
private var keepAlivePingTask: DispatchWorkItem?

private let callbackQueue: DispatchQueue

/**
Allows setting a delegate that will be notified about connection lifecycle events
Expand Down Expand Up @@ -61,6 +63,7 @@ public class HubConnection {
self.connection = connection
self.hubProtocol = hubProtocol
self.keepAliveIntervalInSeconds = hubConnectionOptions.keepAliveInterval
self.callbackQueue = hubConnectionOptions.callbackQueue
self.logger = logger
self.hubConnectionQueue = DispatchQueue(label: "SignalR.hubconnection.queue")
}
Expand Down Expand Up @@ -157,11 +160,15 @@ public class HubConnection {
if error == nil {
self.resetKeepAlive()
}
sendDidComplete(error)
self.callbackQueue.async {
sendDidComplete(error)
}
})
} catch {
logger.log(logLevel: .error, message: "Sending to server side hub method '\(method)' failed. Error: \(error)")
sendDidComplete(error)
self.callbackQueue.async {
sendDidComplete(error)
}
}
}

Expand Down Expand Up @@ -208,7 +215,7 @@ public class HubConnection {
return
}

let invocationHandler = InvocationHandler<T>(logger: logger, invocationDidComplete: invocationDidComplete)
let invocationHandler = InvocationHandler<T>(logger: logger, callbackQueue: callbackQueue, invocationDidComplete: invocationDidComplete)

_ = invoke(invocationHandler: invocationHandler, method: method, arguments: arguments)
}
Expand Down Expand Up @@ -242,7 +249,7 @@ public class HubConnection {
return StreamHandle(invocationId: "")
}

let streamInvocationHandler = StreamInvocationHandler<T>(logger: logger, streamItemReceived: streamItemReceived, invocationDidComplete: invocationDidComplete)
let streamInvocationHandler = StreamInvocationHandler<T>(logger: logger, callbackQueue: callbackQueue, streamItemReceived: streamItemReceived, invocationDidComplete: invocationDidComplete)

let id = invoke(invocationHandler: streamInvocationHandler, method: method, arguments: arguments)

Expand All @@ -265,7 +272,9 @@ public class HubConnection {

if streamHandle.invocationId == "" {
logger.log(logLevel: .error, message: "Invalid stream handle")
cancelDidFail(SignalRError.invalidOperation(message: "Invalid stream handle."))
callbackQueue.async {
cancelDidFail(SignalRError.invalidOperation(message: "Invalid stream handle."))
}
return
}

Expand All @@ -275,14 +284,18 @@ public class HubConnection {
connection.send(data: cancelInvocationData, sendDidComplete: {error in
if let e = error {
self.logger.log(logLevel: .error, message: "Sending cancellation of server side streaming hub returned error: \(e)")
cancelDidFail(e)
self.callbackQueue.async {
cancelDidFail(e)
}
} else {
self.resetKeepAlive()
}
})
} catch {
logger.log(logLevel: .error, message: "Sending cancellation of server side streaming hub method failed: \(error)")
cancelDidFail(error)
self.callbackQueue.async {
cancelDidFail(error)
}
}
}

Expand Down Expand Up @@ -320,15 +333,17 @@ public class HubConnection {
_ = pendingCalls.removeValue(forKey: invocationId)
}

Util.dispatchToMainThread {
callbackQueue.async {
invocationHandler.raiseError(error: error)
}
}

private func ensureConnectionStarted(errorHandler: (Error)->Void) -> Bool {
private func ensureConnectionStarted(errorHandler: @escaping (Error)->Void) -> Bool {
guard handshakeStatus.isHandled else {
logger.log(logLevel: .error, message: "Attempting to send data before connection has been started.")
errorHandler(SignalRError.invalidOperation(message: "Attempting to send data before connection has been started."))
callbackQueue.async {
errorHandler(SignalRError.invalidOperation(message: "Attempting to send data before connection has been started."))
}
return false
}
return true
Expand All @@ -349,13 +364,19 @@ public class HubConnection {
// TODO: (BUG) if this fails when reconnecting the callback should not be called and there
// will be no further reconnect attempts
logger.log(logLevel: .error, message: "Parsing handshake response failed: \(e)")
delegate?.connectionDidFailToOpen(error: e)
callbackQueue.async {
self.delegate?.connectionDidFailToOpen(error: e)
}
return
}
if originalHandshakeStatus.isReconnect {
delegate?.connectionDidReconnect()
callbackQueue.async {
self.delegate?.connectionDidReconnect()
}
} else {
delegate?.connectionDidOpen(hubConnection: self)
callbackQueue.async {
self.delegate?.connectionDidOpen(hubConnection: self)
}
resetKeepAlive()
}
}
Expand Down Expand Up @@ -391,7 +412,7 @@ public class HubConnection {
}

if serverInvocationHandler != nil {
Util.dispatchToMainThread {
callbackQueue.async {
serverInvocationHandler!.processCompletion(completionMessage: message)
}
} else {
Expand All @@ -406,11 +427,9 @@ public class HubConnection {
}

if serverInvocationHandler != nil {
Util.dispatchToMainThread {
if let error = serverInvocationHandler!.processStreamItem(streamItemMessage: message) {
self.logger.log(logLevel: .error, message: "Processing stream item failed: \(error)")
self.failInvocationWithError(invocationHandler: serverInvocationHandler!, invocationId: message.invocationId, error: error)
}
if let error = serverInvocationHandler!.processStreamItem(streamItemMessage: message) {
logger.log(logLevel: .error, message: "Processing stream item failed: \(error)")
failInvocationWithError(invocationHandler: serverInvocationHandler!, invocationId: message.invocationId, error: error)
}
} else {
logger.log(logLevel: .error, message: "Could not find callback with id \(message.invocationId)")
Expand All @@ -425,7 +444,7 @@ public class HubConnection {
}

if callback != nil {
Util.dispatchToMainThread {
callbackQueue.async {
do {
try callback!(ArgumentExtractor(clientInvocationMessage: message))
} catch {
Expand Down Expand Up @@ -453,21 +472,25 @@ public class HubConnection {
logger.log(logLevel: .info, message: "Terminating \(invocationHandlers.count) pending hub methods")
let invocationError = error ?? SignalRError.hubInvocationCancelled
for serverInvocationHandler in invocationHandlers {
Util.dispatchToMainThread {
serverInvocationHandler.raiseError(error: invocationError)
}
serverInvocationHandler.raiseError(error: invocationError)
}
handshakeStatus = .needsHandling(false)
delegate?.connectionDidClose(error: error)
callbackQueue.async {
self.delegate?.connectionDidClose(error: error)
}
}

fileprivate func connectionDidFailToOpen(error: Error) {
delegate?.connectionDidFailToOpen(error: error)
callbackQueue.async {
self.delegate?.connectionDidFailToOpen(error: error)
}
}

fileprivate func connectionWillReconnect(error: Error) {
handshakeStatus = .needsHandling(true)
delegate?.connectionWillReconnect(error: error)
callbackQueue.async {
self.delegate?.connectionWillReconnect(error: error)
}
}

fileprivate func connectionDidReconnect() {
Expand Down
9 changes: 4 additions & 5 deletions Sources/SignalRClient/ReconnectableConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ internal class ReconnectableConnection: Connection {
logger.log(logLevel: .info, message: "Received send request")
guard state != .reconnecting else {
// TODO: consider buffering
Util.dispatchToMainThread {
sendDidComplete(SignalRError.connectionIsReconnecting)
}
sendDidComplete(SignalRError.connectionIsReconnecting)
return
}
underlyingConnection.send(data: data, sendDidComplete: sendDidComplete)
Expand Down Expand Up @@ -117,11 +115,12 @@ internal class ReconnectableConnection: Connection {
logger.log(logLevel: .debug, message: "nextAttemptInterval: \(nextAttemptInterval), RetryContext: \(retryContext)")
if nextAttemptInterval != .never {
logger.log(logLevel: .debug, message: "Scheduling reconnect attempt at: \(nextAttemptInterval)")
// TODO: can this cause problems because event handlers are dispatched to main queue as well (via `Util.dispatchToMainThread`)
// TODO: not great but running on the connectionQueue deadlocks
DispatchQueue.main.asyncAfter(deadline: .now() + nextAttemptInterval) {
self.startInternal()
}
// TODO: again, running on a random (possibly main) queue
// running on a random (possibly main) queue but HubConnection will
// dispatch to the configured queue
if (retryContext.failedAttemptsCount == 0) {
delegate?.connectionWillReconnect(error: retryContext.error)
}
Expand Down
12 changes: 7 additions & 5 deletions Sources/SignalRClient/ServerInvocationHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ internal class InvocationHandler<T: Decodable>: ServerInvocationHandler {
private let logger: Logger
private let invocationDidComplete: (T?, Error?) -> Void

init(logger: Logger, invocationDidComplete: @escaping (T?, Error?) -> Void) {
init(logger: Logger, callbackQueue: DispatchQueue, invocationDidComplete: @escaping (T?, Error?) -> Void) {
self.logger = logger
self.invocationDidComplete = invocationDidComplete
self.invocationDidComplete = {result, error in
callbackQueue.async { invocationDidComplete(result, error)}
}
}

func createInvocationMessage(invocationId: String, method: String, arguments: [Encodable], streamIds: [String]?) -> HubMessage {
Expand Down Expand Up @@ -70,10 +72,10 @@ internal class StreamInvocationHandler<T: Decodable>: ServerInvocationHandler {
private let streamItemReceived: (T) -> Void
private let invocationDidComplete: (Error?) -> Void

init(logger: Logger, streamItemReceived: @escaping (T) -> Void, invocationDidComplete: @escaping (Error?) -> Void) {
init(logger: Logger, callbackQueue: DispatchQueue, streamItemReceived: @escaping (T) -> Void, invocationDidComplete: @escaping (Error?) -> Void) {
self.logger = logger
self.streamItemReceived = streamItemReceived
self.invocationDidComplete = invocationDidComplete
self.streamItemReceived = { item in callbackQueue.async { streamItemReceived(item) } }
self.invocationDidComplete = { error in callbackQueue.async { invocationDidComplete(error) } }
}

func createInvocationMessage(invocationId: String, method: String, arguments: [Encodable], streamIds: [String]?) -> HubMessage {
Expand Down
15 changes: 0 additions & 15 deletions Sources/SignalRClient/Util.swift

This file was deleted.

0 comments on commit f4558f6

Please sign in to comment.