Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ping/Pong heartbeat logic #137

Merged
merged 6 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 77 additions & 4 deletions Sources/LiveKit/Core/SignalClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {
private var webSocket: WebSocket?
private var latestJoinResponse: Livekit_JoinResponse?

private var pingIntervalTimer: DispatchQueueTimer?
private var pingTimeoutTimer: DispatchQueueTimer?

init() {
super.init()

Expand All @@ -66,7 +69,8 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {

guard let self = self else { return }

if oldState.connectionState != state.connectionState {
// connectionState did update
if state.connectionState != oldState.connectionState {
self.log("\(oldState.connectionState) -> \(state.connectionState)")
}

Expand Down Expand Up @@ -106,7 +110,6 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {
return WebSocket.connect(url: url,
onMessage: self.onWebSocketMessage,
onDisconnect: { reason in
self.webSocket = nil
self.cleanUp(reason: reason)
})
}.then(on: queue) { (webSocket: WebSocket) -> Void in
Expand Down Expand Up @@ -145,8 +148,11 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {

_state.mutate { $0.connectionState = .disconnected(reason: reason) }

pingIntervalTimer = nil
pingTimeoutTimer = nil

if let socket = webSocket {
socket.cleanUp(reason: reason)
socket.cleanUp(reason: reason, notify: false)
socket.onMessage = nil
socket.onDisconnect = nil
self.webSocket = nil
Expand Down Expand Up @@ -284,6 +290,7 @@ private extension SignalClient {
case .join(let joinResponse):
responseQueueState = .suspended
latestJoinResponse = joinResponse
restartPingTimer()
notify { $0.signalClient(self, didReceive: joinResponse) }
_state.mutate { $0.joinResponseCompleter.set(value: joinResponse) }

Expand Down Expand Up @@ -343,7 +350,7 @@ private extension SignalClient {
case .refreshToken(let token):
notify { $0.signalClient(self, didUpdate: token) }
case .pong(let r):
log("pong: \(r)")
onReceivedPong(r)
}
}
}
Expand Down Expand Up @@ -584,6 +591,7 @@ internal extension SignalClient {
return sendRequest(r)
}

@discardableResult
func sendLeave() -> Promise<Void> {
log()

Expand All @@ -594,6 +602,7 @@ internal extension SignalClient {
return sendRequest(r)
}

@discardableResult
func sendSimulate(scenario: SimulateScenario) -> Promise<Void> {
log()

Expand All @@ -608,6 +617,70 @@ internal extension SignalClient {

return sendRequest(r)
}

@discardableResult
private func sendPing() -> Promise<Void> {
log("ping/pong: sending...")

let r = Livekit_SignalRequest.with {
$0.ping = Int64(Date().timeIntervalSince1970)
}

return sendRequest(r)
}
}

// MARK: - Server ping/pong logic

private extension SignalClient {

func onPingIntervalTimer() {

guard let jr = latestJoinResponse else { return }

pingIntervalTimer?.suspend()
sendPing()

pingTimeoutTimer = {
let timer = DispatchQueueTimer(timeInterval: TimeInterval(jr.pingTimeout), queue: queue)
timer.handler = { [weak self] in
guard let self = self else { return }
self.log("ping/pong timed out", .error)
self.cleanUp(reason: .networkError(SignalClientError.serverPingTimedOut()))
}
timer.resume()
return timer
}()
}

func onReceivedPong(_ r: Int64) {

log("ping/pong received pong from server")
// clear timeout timer
pingTimeoutTimer = nil
// carry on...
pingIntervalTimer?.resume()
}

func restartPingTimer() {
// always suspend first
pingIntervalTimer = nil
pingTimeoutTimer = nil
// check received joinResponse already
guard let jr = latestJoinResponse,
// check server supports ping/pong
jr.pingTimeout > 0,
jr.pingInterval > 0 else { return }

log("ping/pong starting with interval: \(jr.pingInterval)")

pingIntervalTimer = {
hiroshihorie marked this conversation as resolved.
Show resolved Hide resolved
let timer = DispatchQueueTimer(timeInterval: TimeInterval(jr.pingInterval), queue: queue)
timer.handler = { [weak self] in self?.onPingIntervalTimer() }
timer.resume()
return timer
}()
}
}

internal extension Livekit_SignalRequest {
Expand Down
2 changes: 2 additions & 0 deletions Sources/LiveKit/Errors.swift
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public enum SignalClientError: LiveKitError {
case close(message: String? = nil)
case connect(message: String? = nil)
case timedOut(message: String? = nil)
case serverPingTimedOut(message: String? = nil)

public var description: String {
switch self {
Expand All @@ -110,6 +111,7 @@ public enum SignalClientError: LiveKitError {
case .close(let message): return buildDescription("close", message)
case .connect(let message): return buildDescription("connect", message)
case .timedOut(let message): return buildDescription("timedOut", message)
case .serverPingTimedOut(let message): return buildDescription("serverPingTimedOut", message)
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions Sources/LiveKit/Support/WebSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ internal class WebSocket: NSObject, URLSessionWebSocketDelegate, Loggable {
return connectPromise!
}

internal func cleanUp(reason: DisconnectReason?) {
internal func cleanUp(reason: DisconnectReason?, notify: Bool = true) {

log("reason: \(String(describing: reason))")

Expand All @@ -102,7 +102,9 @@ internal class WebSocket: NSObject, URLSessionWebSocketDelegate, Loggable {
connectPromise = nil
}

onDisconnect?(reason)
if notify {
onDisconnect?(reason)
}
}

public func send(data: Data) -> Promise<Void> {
Expand Down