Skip to content

Commit

Permalink
Ping/Pong heartbeat logic (#137)
Browse files Browse the repository at this point in the history
* progress

* impl

* minor adjustments

* change logic
  • Loading branch information
hiroshihorie committed Nov 25, 2022
1 parent 1f5959f commit 80d9d97
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 5 deletions.
84 changes: 81 additions & 3 deletions Sources/LiveKit/Core/SignalClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {
private var webSocket: WebSocket?
private var latestJoinResponse: Livekit_JoinResponse?

private var pingIntervalTimer: DispatchQueueTimer?
private var pingTimeoutTimer: DispatchQueueTimer?

init() {
super.init()

Expand All @@ -66,7 +69,8 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {

guard let self = self else { return }

if oldState.connectionState != state.connectionState {
// connectionState did update
if state.connectionState != oldState.connectionState {
self.log("\(oldState.connectionState) -> \(state.connectionState)")
}

Expand Down Expand Up @@ -148,8 +152,11 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {

_state.mutate { $0.connectionState = .disconnected(reason: reason) }

pingIntervalTimer = nil
pingTimeoutTimer = nil

if let socket = webSocket {
socket.cleanUp(reason: reason)
socket.cleanUp(reason: reason, notify: false)
socket.onMessage = nil
socket.onDisconnect = nil
self.webSocket = nil
Expand Down Expand Up @@ -287,6 +294,7 @@ private extension SignalClient {
case .join(let joinResponse):
responseQueueState = .suspended
latestJoinResponse = joinResponse
restartPingTimer()
notify { $0.signalClient(self, didReceive: joinResponse) }
_state.mutate { $0.joinResponseCompleter.set(value: joinResponse) }

Expand Down Expand Up @@ -346,7 +354,7 @@ private extension SignalClient {
case .refreshToken(let token):
notify { $0.signalClient(self, didUpdate: token) }
case .pong(let r):
log("pong: \(r)")
onReceivedPong(r)
}
}
}
Expand Down Expand Up @@ -591,6 +599,7 @@ internal extension SignalClient {
return sendRequest(r)
}

@discardableResult
func sendLeave() -> Promise<Void> {
log()

Expand All @@ -601,6 +610,7 @@ internal extension SignalClient {
return sendRequest(r)
}

@discardableResult
func sendSimulate(scenario: SimulateScenario) -> Promise<Void> {
log()

Expand All @@ -615,6 +625,74 @@ internal extension SignalClient {

return sendRequest(r)
}

@discardableResult
private func sendPing() -> Promise<Void> {
log("ping/pong: sending...")

let r = Livekit_SignalRequest.with {
$0.ping = Int64(Date().timeIntervalSince1970)
}

return sendRequest(r)
}
}

// MARK: - Server ping/pong logic

private extension SignalClient {

func onPingIntervalTimer() {

guard let jr = latestJoinResponse else { return }

sendPing().then { [weak self] in

guard let self = self else { return }

if self.pingTimeoutTimer == nil {
// start timeout timer

self.pingTimeoutTimer = {
let timer = DispatchQueueTimer(timeInterval: TimeInterval(jr.pingTimeout), queue: self.queue)
timer.handler = { [weak self] in
guard let self = self else { return }
self.log("ping/pong timed out", .error)
self.cleanUp(reason: .networkError(SignalClientError.serverPingTimedOut()))
}
timer.resume()
return timer
}()
}
}
}

func onReceivedPong(_ r: Int64) {

log("ping/pong received pong from server")
// clear timeout timer
pingTimeoutTimer = nil
}

func restartPingTimer() {
// always suspend first
pingIntervalTimer = nil
pingTimeoutTimer = nil
// check received joinResponse already
guard let jr = latestJoinResponse,
// check server supports ping/pong
jr.pingTimeout > 0,
jr.pingInterval > 0 else { return }

log("ping/pong starting with interval: \(jr.pingInterval), timeout: \(jr.pingTimeout)")

pingIntervalTimer = {
let timer = DispatchQueueTimer(timeInterval: TimeInterval(jr.pingInterval), queue: queue)
timer.handler = { [weak self] in self?.onPingIntervalTimer() }
timer.resume()
return timer
}()
}
}

internal extension Livekit_SignalRequest {
Expand Down
2 changes: 2 additions & 0 deletions Sources/LiveKit/Errors.swift
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public enum SignalClientError: LiveKitError {
case close(message: String? = nil)
case connect(message: String? = nil)
case timedOut(message: String? = nil)
case serverPingTimedOut(message: String? = nil)

public var description: String {
switch self {
Expand All @@ -110,6 +111,7 @@ public enum SignalClientError: LiveKitError {
case .close(let message): return buildDescription("close", message)
case .connect(let message): return buildDescription("connect", message)
case .timedOut(let message): return buildDescription("timedOut", message)
case .serverPingTimedOut(let message): return buildDescription("serverPingTimedOut", message)
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions Sources/LiveKit/Support/WebSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ internal class WebSocket: NSObject, URLSessionWebSocketDelegate, Loggable {
return connectPromise!
}

internal func cleanUp(reason: DisconnectReason?) {
internal func cleanUp(reason: DisconnectReason?, notify: Bool = true) {

log("reason: \(String(describing: reason))")

Expand All @@ -102,7 +102,9 @@ internal class WebSocket: NSObject, URLSessionWebSocketDelegate, Loggable {
connectPromise = nil
}

onDisconnect?(reason)
if notify {
onDisconnect?(reason)
}
}

public func send(data: Data) -> Promise<Void> {
Expand Down

0 comments on commit 80d9d97

Please sign in to comment.