diff --git a/Sources/LiveKit/Core/SignalClient.swift b/Sources/LiveKit/Core/SignalClient.swift index 936c4d16f..3da86b4ea 100644 --- a/Sources/LiveKit/Core/SignalClient.swift +++ b/Sources/LiveKit/Core/SignalClient.swift @@ -56,6 +56,9 @@ internal class SignalClient: MulticastDelegate { private var webSocket: WebSocket? private var latestJoinResponse: Livekit_JoinResponse? + private var pingIntervalTimer: DispatchQueueTimer? + private var pingTimeoutTimer: DispatchQueueTimer? + init() { super.init() @@ -66,7 +69,8 @@ internal class SignalClient: MulticastDelegate { guard let self = self else { return } - if oldState.connectionState != state.connectionState { + // connectionState did update + if state.connectionState != oldState.connectionState { self.log("\(oldState.connectionState) -> \(state.connectionState)") } @@ -148,8 +152,11 @@ internal class SignalClient: MulticastDelegate { _state.mutate { $0.connectionState = .disconnected(reason: reason) } + pingIntervalTimer = nil + pingTimeoutTimer = nil + if let socket = webSocket { - socket.cleanUp(reason: reason) + socket.cleanUp(reason: reason, notify: false) socket.onMessage = nil socket.onDisconnect = nil self.webSocket = nil @@ -287,6 +294,7 @@ private extension SignalClient { case .join(let joinResponse): responseQueueState = .suspended latestJoinResponse = joinResponse + restartPingTimer() notify { $0.signalClient(self, didReceive: joinResponse) } _state.mutate { $0.joinResponseCompleter.set(value: joinResponse) } @@ -346,7 +354,7 @@ private extension SignalClient { case .refreshToken(let token): notify { $0.signalClient(self, didUpdate: token) } case .pong(let r): - log("pong: \(r)") + onReceivedPong(r) } } } @@ -591,6 +599,7 @@ internal extension SignalClient { return sendRequest(r) } + @discardableResult func sendLeave() -> Promise { log() @@ -601,6 +610,7 @@ internal extension SignalClient { return sendRequest(r) } + @discardableResult func sendSimulate(scenario: SimulateScenario) -> Promise { log() @@ -615,6 +625,74 @@ internal extension SignalClient { return sendRequest(r) } + + @discardableResult + private func sendPing() -> Promise { + log("ping/pong: sending...") + + let r = Livekit_SignalRequest.with { + $0.ping = Int64(Date().timeIntervalSince1970) + } + + return sendRequest(r) + } +} + +// MARK: - Server ping/pong logic + +private extension SignalClient { + + func onPingIntervalTimer() { + + guard let jr = latestJoinResponse else { return } + + sendPing().then { [weak self] in + + guard let self = self else { return } + + if self.pingTimeoutTimer == nil { + // start timeout timer + + self.pingTimeoutTimer = { + let timer = DispatchQueueTimer(timeInterval: TimeInterval(jr.pingTimeout), queue: self.queue) + timer.handler = { [weak self] in + guard let self = self else { return } + self.log("ping/pong timed out", .error) + self.cleanUp(reason: .networkError(SignalClientError.serverPingTimedOut())) + } + timer.resume() + return timer + }() + } + } + } + + func onReceivedPong(_ r: Int64) { + + log("ping/pong received pong from server") + // clear timeout timer + pingTimeoutTimer = nil + } + + func restartPingTimer() { + // always suspend first + pingIntervalTimer = nil + pingTimeoutTimer = nil + // check received joinResponse already + guard let jr = latestJoinResponse, + // check server supports ping/pong + jr.pingTimeout > 0, + jr.pingInterval > 0 else { return } + + log("ping/pong starting with interval: \(jr.pingInterval), timeout: \(jr.pingTimeout)") + + pingIntervalTimer = { + let timer = DispatchQueueTimer(timeInterval: TimeInterval(jr.pingInterval), queue: queue) + timer.handler = { [weak self] in self?.onPingIntervalTimer() } + timer.resume() + return timer + }() + } } internal extension Livekit_SignalRequest { diff --git a/Sources/LiveKit/Errors.swift b/Sources/LiveKit/Errors.swift index e05567b4f..6964ed465 100644 --- a/Sources/LiveKit/Errors.swift +++ b/Sources/LiveKit/Errors.swift @@ -102,6 +102,7 @@ public enum SignalClientError: LiveKitError { case close(message: String? = nil) case connect(message: String? = nil) case timedOut(message: String? = nil) + case serverPingTimedOut(message: String? = nil) public var description: String { switch self { @@ -110,6 +111,7 @@ public enum SignalClientError: LiveKitError { case .close(let message): return buildDescription("close", message) case .connect(let message): return buildDescription("connect", message) case .timedOut(let message): return buildDescription("timedOut", message) + case .serverPingTimedOut(let message): return buildDescription("serverPingTimedOut", message) } } } diff --git a/Sources/LiveKit/Support/WebSocket.swift b/Sources/LiveKit/Support/WebSocket.swift index 5713fd1e2..994804dc8 100644 --- a/Sources/LiveKit/Support/WebSocket.swift +++ b/Sources/LiveKit/Support/WebSocket.swift @@ -81,7 +81,7 @@ internal class WebSocket: NSObject, URLSessionWebSocketDelegate, Loggable { return connectPromise! } - internal func cleanUp(reason: DisconnectReason?) { + internal func cleanUp(reason: DisconnectReason?, notify: Bool = true) { log("reason: \(String(describing: reason))") @@ -102,7 +102,9 @@ internal class WebSocket: NSObject, URLSessionWebSocketDelegate, Loggable { connectPromise = nil } - onDisconnect?(reason) + if notify { + onDisconnect?(reason) + } } public func send(data: Data) -> Promise {