From 7a52e22dc7d185708de76eba3b1b885973ca16d9 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 13 Sep 2022 16:40:51 +0900 Subject: [PATCH 1/4] progress --- Sources/LiveKit/Core/SignalClient.swift | 35 +++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/Sources/LiveKit/Core/SignalClient.swift b/Sources/LiveKit/Core/SignalClient.swift index 8a7b832b4..0a3d7068f 100644 --- a/Sources/LiveKit/Core/SignalClient.swift +++ b/Sources/LiveKit/Core/SignalClient.swift @@ -56,6 +56,8 @@ internal class SignalClient: MulticastDelegate { private var webSocket: WebSocket? private var latestJoinResponse: Livekit_JoinResponse? + private lazy var pingTimer = DispatchQueueTimer(timeInterval: 1, queue: queue) + init() { super.init() @@ -66,12 +68,23 @@ internal class SignalClient: MulticastDelegate { guard let self = self else { return } - if oldState.connectionState != state.connectionState { + // connectionState did update + if state.connectionState != oldState.connectionState { self.log("\(oldState.connectionState) -> \(state.connectionState)") + + if case .connected = state.connectionState { + self.pingTimer.restart() + } else { + self.pingTimer.suspend() + } } self.notify { $0.signalClient(self, didMutate: state, oldState: oldState) } } + + pingTimer.handler = { [weak self] in + self?.onPingTimer() + } } deinit { @@ -203,6 +216,11 @@ internal class SignalClient: MulticastDelegate { throw: { EngineError.timedOut(message: "server didn't respond to addTrack request") }) } } + + func onPingTimer() { + // + sendPing() + } } // MARK: - Private @@ -343,7 +361,7 @@ private extension SignalClient { case .refreshToken(let token): notify { $0.signalClient(self, didUpdate: token) } case .pong(let r): - log("pong: \(r)") + log("ping/pong: received from server \(r)") } } } @@ -584,6 +602,7 @@ internal extension SignalClient { return sendRequest(r) } + @discardableResult func sendLeave() -> Promise { log() @@ -594,6 +613,7 @@ internal extension SignalClient { return sendRequest(r) } + @discardableResult func sendSimulate(scenario: SimulateScenario) -> Promise { log() @@ -608,6 +628,17 @@ internal extension SignalClient { return sendRequest(r) } + + @discardableResult + func sendPing() -> Promise { + log("ping/pong: sending...") + + let r = Livekit_SignalRequest.with { + $0.ping = Int64(Date().timeIntervalSince1970) + } + + return sendRequest(r) + } } internal extension Livekit_SignalRequest { From 30acdb28897db412616eab6cb780da996e3a62df Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Thu, 15 Sep 2022 21:51:18 +0900 Subject: [PATCH 2/4] impl --- Sources/LiveKit/Core/SignalClient.swift | 81 +++++++++++++++++++------ Sources/LiveKit/Errors.swift | 2 + Sources/LiveKit/Support/WebSocket.swift | 6 +- 3 files changed, 68 insertions(+), 21 deletions(-) diff --git a/Sources/LiveKit/Core/SignalClient.swift b/Sources/LiveKit/Core/SignalClient.swift index 0a3d7068f..ff2dbee17 100644 --- a/Sources/LiveKit/Core/SignalClient.swift +++ b/Sources/LiveKit/Core/SignalClient.swift @@ -56,7 +56,8 @@ internal class SignalClient: MulticastDelegate { private var webSocket: WebSocket? private var latestJoinResponse: Livekit_JoinResponse? - private lazy var pingTimer = DispatchQueueTimer(timeInterval: 1, queue: queue) + private var pingIntervalTimer: DispatchQueueTimer? + private var pingTimeoutTimer: DispatchQueueTimer? init() { super.init() @@ -71,20 +72,10 @@ internal class SignalClient: MulticastDelegate { // connectionState did update if state.connectionState != oldState.connectionState { self.log("\(oldState.connectionState) -> \(state.connectionState)") - - if case .connected = state.connectionState { - self.pingTimer.restart() - } else { - self.pingTimer.suspend() - } } self.notify { $0.signalClient(self, didMutate: state, oldState: oldState) } } - - pingTimer.handler = { [weak self] in - self?.onPingTimer() - } } deinit { @@ -119,7 +110,6 @@ internal class SignalClient: MulticastDelegate { return WebSocket.connect(url: url, onMessage: self.onWebSocketMessage, onDisconnect: { reason in - self.webSocket = nil self.cleanUp(reason: reason) }) }.then(on: queue) { (webSocket: WebSocket) -> Void in @@ -158,8 +148,11 @@ internal class SignalClient: MulticastDelegate { _state.mutate { $0.connectionState = .disconnected(reason: reason) } + pingIntervalTimer = nil + pingTimeoutTimer = nil + if let socket = webSocket { - socket.cleanUp(reason: reason) + socket.cleanUp(reason: reason, notify: false) socket.onMessage = nil socket.onDisconnect = nil self.webSocket = nil @@ -216,11 +209,6 @@ internal class SignalClient: MulticastDelegate { throw: { EngineError.timedOut(message: "server didn't respond to addTrack request") }) } } - - func onPingTimer() { - // - sendPing() - } } // MARK: - Private @@ -302,6 +290,7 @@ private extension SignalClient { case .join(let joinResponse): responseQueueState = .suspended latestJoinResponse = joinResponse + restartPingTimer() notify { $0.signalClient(self, didReceive: joinResponse) } _state.mutate { $0.joinResponseCompleter.set(value: joinResponse) } @@ -361,7 +350,7 @@ private extension SignalClient { case .refreshToken(let token): notify { $0.signalClient(self, didUpdate: token) } case .pong(let r): - log("ping/pong: received from server \(r)") + onReceivedPong(r) } } } @@ -641,6 +630,60 @@ internal extension SignalClient { } } +internal extension SignalClient { + + func onPingIntervalTimer() { + + guard let jr = latestJoinResponse, + // check server supports ping/pong + jr.pingTimeout > 0, + jr.pingInterval > 0 else { return } + + pingIntervalTimer?.suspend() + sendPing() + + pingTimeoutTimer = { + let timer = DispatchQueueTimer(timeInterval: TimeInterval(jr.pingTimeout), queue: queue) + timer.handler = { [weak self] in + guard let self = self else { return } + self.log("ping/pong timed out", .error) + self.cleanUp(reason: .networkError(SignalClientError.serverPingTimedOut())) + } + timer.resume() + return timer + }() + } + + func onReceivedPong(_ r: Int64) { + + log("ping/pong received pong from server") + // clear timeout timer + pingTimeoutTimer = nil + // carry on... + pingIntervalTimer?.resume() + } + + func restartPingTimer() { + // always suspend first + pingIntervalTimer = nil + pingTimeoutTimer = nil + // check received joinResponse already + guard let jr = latestJoinResponse, + // check server supports ping/pong + jr.pingTimeout > 0, + jr.pingInterval > 0 else { return } + + log("ping/pong starting with interval: \(jr.pingInterval)") + + pingIntervalTimer = { + let timer = DispatchQueueTimer(timeInterval: TimeInterval(jr.pingInterval), queue: queue) + timer.handler = { [weak self] in self?.onPingIntervalTimer() } + timer.resume() + return timer + }() + } +} + internal extension Livekit_SignalRequest { func canEnqueue() -> Bool { diff --git a/Sources/LiveKit/Errors.swift b/Sources/LiveKit/Errors.swift index e05567b4f..6964ed465 100644 --- a/Sources/LiveKit/Errors.swift +++ b/Sources/LiveKit/Errors.swift @@ -102,6 +102,7 @@ public enum SignalClientError: LiveKitError { case close(message: String? = nil) case connect(message: String? = nil) case timedOut(message: String? = nil) + case serverPingTimedOut(message: String? = nil) public var description: String { switch self { @@ -110,6 +111,7 @@ public enum SignalClientError: LiveKitError { case .close(let message): return buildDescription("close", message) case .connect(let message): return buildDescription("connect", message) case .timedOut(let message): return buildDescription("timedOut", message) + case .serverPingTimedOut(let message): return buildDescription("serverPingTimedOut", message) } } } diff --git a/Sources/LiveKit/Support/WebSocket.swift b/Sources/LiveKit/Support/WebSocket.swift index 5713fd1e2..994804dc8 100644 --- a/Sources/LiveKit/Support/WebSocket.swift +++ b/Sources/LiveKit/Support/WebSocket.swift @@ -81,7 +81,7 @@ internal class WebSocket: NSObject, URLSessionWebSocketDelegate, Loggable { return connectPromise! } - internal func cleanUp(reason: DisconnectReason?) { + internal func cleanUp(reason: DisconnectReason?, notify: Bool = true) { log("reason: \(String(describing: reason))") @@ -102,7 +102,9 @@ internal class WebSocket: NSObject, URLSessionWebSocketDelegate, Loggable { connectPromise = nil } - onDisconnect?(reason) + if notify { + onDisconnect?(reason) + } } public func send(data: Data) -> Promise { From a06aad64c3c37e7e8e1193933a580899ed9a4817 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Thu, 15 Sep 2022 21:55:23 +0900 Subject: [PATCH 3/4] minor adjustments --- Sources/LiveKit/Core/SignalClient.swift | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/Sources/LiveKit/Core/SignalClient.swift b/Sources/LiveKit/Core/SignalClient.swift index ff2dbee17..8daa25eac 100644 --- a/Sources/LiveKit/Core/SignalClient.swift +++ b/Sources/LiveKit/Core/SignalClient.swift @@ -619,7 +619,7 @@ internal extension SignalClient { } @discardableResult - func sendPing() -> Promise { + private func sendPing() -> Promise { log("ping/pong: sending...") let r = Livekit_SignalRequest.with { @@ -630,14 +630,13 @@ internal extension SignalClient { } } -internal extension SignalClient { +// MARK: - Server ping/pong logic + +private extension SignalClient { func onPingIntervalTimer() { - guard let jr = latestJoinResponse, - // check server supports ping/pong - jr.pingTimeout > 0, - jr.pingInterval > 0 else { return } + guard let jr = latestJoinResponse else { return } pingIntervalTimer?.suspend() sendPing() @@ -671,7 +670,7 @@ internal extension SignalClient { guard let jr = latestJoinResponse, // check server supports ping/pong jr.pingTimeout > 0, - jr.pingInterval > 0 else { return } + jr.pingInterval > 0 else { return } log("ping/pong starting with interval: \(jr.pingInterval)") From 6cf871b12bb90a7d914aec1d496660b33ca93f7d Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Fri, 25 Nov 2022 23:09:11 +0900 Subject: [PATCH 4/4] change logic --- Sources/LiveKit/Core/SignalClient.swift | 34 ++++++++++++++----------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/Sources/LiveKit/Core/SignalClient.swift b/Sources/LiveKit/Core/SignalClient.swift index 7d77e7a86..3da86b4ea 100644 --- a/Sources/LiveKit/Core/SignalClient.swift +++ b/Sources/LiveKit/Core/SignalClient.swift @@ -646,19 +646,25 @@ private extension SignalClient { guard let jr = latestJoinResponse else { return } - pingIntervalTimer?.suspend() - sendPing() - - pingTimeoutTimer = { - let timer = DispatchQueueTimer(timeInterval: TimeInterval(jr.pingTimeout), queue: queue) - timer.handler = { [weak self] in - guard let self = self else { return } - self.log("ping/pong timed out", .error) - self.cleanUp(reason: .networkError(SignalClientError.serverPingTimedOut())) + sendPing().then { [weak self] in + + guard let self = self else { return } + + if self.pingTimeoutTimer == nil { + // start timeout timer + + self.pingTimeoutTimer = { + let timer = DispatchQueueTimer(timeInterval: TimeInterval(jr.pingTimeout), queue: self.queue) + timer.handler = { [weak self] in + guard let self = self else { return } + self.log("ping/pong timed out", .error) + self.cleanUp(reason: .networkError(SignalClientError.serverPingTimedOut())) + } + timer.resume() + return timer + }() } - timer.resume() - return timer - }() + } } func onReceivedPong(_ r: Int64) { @@ -666,8 +672,6 @@ private extension SignalClient { log("ping/pong received pong from server") // clear timeout timer pingTimeoutTimer = nil - // carry on... - pingIntervalTimer?.resume() } func restartPingTimer() { @@ -680,7 +684,7 @@ private extension SignalClient { jr.pingTimeout > 0, jr.pingInterval > 0 else { return } - log("ping/pong starting with interval: \(jr.pingInterval)") + log("ping/pong starting with interval: \(jr.pingInterval), timeout: \(jr.pingTimeout)") pingIntervalTimer = { let timer = DispatchQueueTimer(timeInterval: TimeInterval(jr.pingInterval), queue: queue)