Skip to content

Commit

Permalink
Fix a regression that results in a deadlock
Browse files Browse the repository at this point in the history
This refactoring f4558f6 effectively removed the fix
introduced in #244 and the client
started deadlocking if a keep alive was sent during reconnects. To avoid
deadlocks the callback will be now scheduled asynchronously on the callback queue if
there is an error due to a reconnect. For all other cases ReconnectableConnection will
rely on the fact that the underling HttpConnection already schedules callbacks to be
executed asynchronously.

Fixes: #264
  • Loading branch information
moozzyk committed Jan 4, 2023
1 parent 6109088 commit 0e7a67e
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Sources/SignalRClient/HubConnectionBuilder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public class HubConnectionBuilder {
return HttpConnection(url: url, options: httpConnectionOptionsCopy, transportFactory: transportFactory, logger: logger)
}

return ReconnectableConnection(connectionFactory: connectionFactory, reconnectPolicy: reconnectPolicy, logger: logger)
return ReconnectableConnection(connectionFactory: connectionFactory, reconnectPolicy: reconnectPolicy, callbackQueue: httpConnectionOptions.callbackQueue, logger: logger)
}

private func createLegacyHttpConnection(transportFactory: TransportFactory) -> HttpConnection {
Expand Down
9 changes: 7 additions & 2 deletions Sources/SignalRClient/ReconnectableConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import Foundation

internal class ReconnectableConnection: Connection {
private let connectionQueue = DispatchQueue(label: "SignalR.reconnection.queue")
private let callbackQueue: DispatchQueue

private let connectionFactory: () -> Connection
private let reconnectPolicy: ReconnectPolicy
Expand Down Expand Up @@ -37,11 +38,12 @@ internal class ReconnectableConnection: Connection {
return underlyingConnection.inherentKeepAlive
}

init(connectionFactory: @escaping () -> Connection, reconnectPolicy: ReconnectPolicy, logger: Logger) {
init(connectionFactory: @escaping () -> Connection, reconnectPolicy: ReconnectPolicy, callbackQueue: DispatchQueue, logger: Logger) {
self.connectionFactory = connectionFactory
self.reconnectPolicy = reconnectPolicy
self.logger = logger
self.underlyingConnection = connectionFactory()
self.callbackQueue = callbackQueue
}

func start() {
Expand All @@ -58,7 +60,10 @@ internal class ReconnectableConnection: Connection {
logger.log(logLevel: .info, message: "Received send request")
guard state != .reconnecting else {
// TODO: consider buffering
sendDidComplete(SignalRError.connectionIsReconnecting)
// Never synchronously respond to avoid upstream deadlocks based on async assumptions
callbackQueue.async {
sendDidComplete(SignalRError.connectionIsReconnecting)
}
return
}
underlyingConnection.send(data: data, sendDidComplete: sendDidComplete)
Expand Down
47 changes: 42 additions & 5 deletions Tests/SignalRClientTests/ReconnectableConnectionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import XCTest
@testable import SignalRClient

class ReconnectableConnectionTests: XCTestCase {
private let callbackQueue = DispatchQueue(label: "SignalR.test.connection.callbackQueue")

public func testThatConnectionDoesNotReconnectIfReconnectPolicyReturnsNever() {
let didCloseExpectation = expectation(description: "connection closed")

let testConnection = TestConnection()
let delegate = TestConnectionDelegate()
let reconnectableConnection = ReconnectableConnection(connectionFactory: {return testConnection}, reconnectPolicy: NoReconnectPolicy(), logger: PrintLogger())
let reconnectableConnection = ReconnectableConnection(connectionFactory: {return testConnection}, reconnectPolicy: NoReconnectPolicy(), callbackQueue: callbackQueue, logger: PrintLogger())

delegate.connectionDidOpenHandler = { connection in
testConnection.delegate?.connectionDidClose(error: SignalRError.invalidOperation(message: "error"))
Expand All @@ -37,7 +39,7 @@ class ReconnectableConnectionTests: XCTestCase {

let testConnection = TestConnection()
let delegate = TestConnectionDelegate()
let reconnectableConnection = ReconnectableConnection(connectionFactory: {return testConnection}, reconnectPolicy: DefaultReconnectPolicy(retryIntervals: [.milliseconds(10)]), logger: PrintLogger())
let reconnectableConnection = ReconnectableConnection(connectionFactory: {return testConnection}, reconnectPolicy: DefaultReconnectPolicy(retryIntervals: [.milliseconds(10)]), callbackQueue: callbackQueue, logger: PrintLogger())

delegate.connectionDidOpenHandler = { connection in
didOpenExpectation.fulfill()
Expand Down Expand Up @@ -79,7 +81,7 @@ class ReconnectableConnectionTests: XCTestCase {

let testConnection = TestConnection()
let delegate = TestConnectionDelegate()
let reconnectableConnection = ReconnectableConnection(connectionFactory: {return testConnection}, reconnectPolicy: DefaultReconnectPolicy(retryIntervals: [.milliseconds(10), .milliseconds(10), .milliseconds(10)]), logger: PrintLogger())
let reconnectableConnection = ReconnectableConnection(connectionFactory: {return testConnection}, reconnectPolicy: DefaultReconnectPolicy(retryIntervals: [.milliseconds(10), .milliseconds(10), .milliseconds(10)]), callbackQueue: callbackQueue, logger: PrintLogger())

delegate.connectionDidOpenHandler = { connection in
didOpenExpectation.fulfill()
Expand Down Expand Up @@ -124,7 +126,7 @@ class ReconnectableConnectionTests: XCTestCase {

let testConnection = TestConnection()
let delegate = TestConnectionDelegate()
let reconnectableConnection = ReconnectableConnection(connectionFactory: {return testConnection}, reconnectPolicy: DefaultReconnectPolicy(retryIntervals: [.milliseconds(10)]), logger: PrintLogger())
let reconnectableConnection = ReconnectableConnection(connectionFactory: {return testConnection}, reconnectPolicy: DefaultReconnectPolicy(retryIntervals: [.milliseconds(10)]), callbackQueue: callbackQueue, logger: PrintLogger())

delegate.connectionDidOpenHandler = { _ in
testConnection.delegate?.connectionDidClose(error: SignalRError.invalidOperation(message: "forcing reconnect"))
Expand All @@ -147,11 +149,46 @@ class ReconnectableConnectionTests: XCTestCase {
waitForExpectations(timeout: 5 /*seconds*/)
}

public func testThatSendingDuringReconnectDoesNotCauseDeadlock() {
let didCloseExpectation = expectation(description: "connection closed")
let sendDidFail = expectation(description: "send failed")

let testConnection = TestConnection()
let delegate = TestConnectionDelegate()
let reconnectableConnection = ReconnectableConnection(connectionFactory: {return testConnection}, reconnectPolicy: DefaultReconnectPolicy(retryIntervals: [.milliseconds(10)]), callbackQueue: callbackQueue, logger: PrintLogger())

let tmpQueue = DispatchQueue(label: "SignalR.test.temp.queue")

delegate.connectionDidOpenHandler = { _ in
testConnection.delegate?.connectionDidClose(error: SignalRError.invalidOperation(message: "forcing reconnect"))
tmpQueue.async {
reconnectableConnection.send(data: "Should fail".data(using: .utf8)!) { error in
tmpQueue.sync {
XCTAssertNotNil(error)
XCTAssertEqual(String(describing: error!), String(describing: SignalRError.connectionIsReconnecting))
reconnectableConnection.stop(stopError: nil)
sendDidFail.fulfill()
}
}
}
}

delegate.connectionDidCloseHandler = { error in
XCTAssertNil(error)
didCloseExpectation.fulfill()
}

reconnectableConnection.delegate = delegate
reconnectableConnection.start()

waitForExpectations(timeout: 5 /*seconds*/)
}

public func testReconnectableConnectionForwardsInherentKeepAliveFromConnection() {
for inherentKeepAlive in [true, false] {
let testConnection = TestConnection()
testConnection.inherentKeepAlive = inherentKeepAlive
let reconnectableConnection = ReconnectableConnection(connectionFactory: {return testConnection}, reconnectPolicy: DefaultReconnectPolicy(retryIntervals: [.milliseconds(10)]), logger: PrintLogger())
let reconnectableConnection = ReconnectableConnection(connectionFactory: {return testConnection}, reconnectPolicy: DefaultReconnectPolicy(retryIntervals: [.milliseconds(10)]), callbackQueue: callbackQueue, logger: PrintLogger())
XCTAssertEqual(inherentKeepAlive, reconnectableConnection.inherentKeepAlive)
}
}
Expand Down

0 comments on commit 0e7a67e

Please sign in to comment.