Skip to content

Commit

Permalink
Merge pull request #28 from botlabs-gg/improved-reconnecting
Browse files Browse the repository at this point in the history
Add non-resume reconnect handling
  • Loading branch information
DRSchlaubi committed Oct 12, 2023
2 parents dc88890 + f73feb2 commit 21b6e13
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ public interface Link {
*/
public suspend fun onDisconnected()

/**
* Called internally when this link is connected or reconnected to a new node without resuming, thereby creating a
* new session.
*/
public suspend fun onNewSession()

/**
* Destroys this link (will no longer be usable).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,21 @@ public abstract class AbstractLavakord internal constructor(
guildId: ULong,
event: VoiceState
) {
link.node.updatePlayer(guildId, request = PlayerUpdate(voice = event.toOmissible()))
(link as AbstractLink).onVoiceServerUpdate(event)
}

/**
* Abstract function to create a new [Link] for this [guild][guildId] using this [node].
*/
protected abstract fun buildNewLink(guildId: ULong, node: Node): Link

/** Called on websocket connect without resuming */
internal suspend fun onNewSession(node: Node) {
if (!options.link.autoReconnect) return
linksMap.values.filter { it.node == node }.forEach {
launch {
it.onNewSession()
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,38 @@
package dev.schlaubi.lavakord.audio.internal

import dev.arbjerg.lavalink.protocol.v4.PlayerUpdate
import dev.arbjerg.lavalink.protocol.v4.VoiceState
import dev.arbjerg.lavalink.protocol.v4.toOmissible
import dev.schlaubi.lavakord.audio.Link
import dev.schlaubi.lavakord.audio.Node
import dev.schlaubi.lavakord.audio.player.Player
import dev.schlaubi.lavakord.rest.destroyPlayer
import dev.schlaubi.lavakord.rest.updatePlayer

/**
* Abstract implementation of [Link].
*/
public abstract class AbstractLink(final override val node: Node, final override val guildId: ULong) : Link {
public abstract class AbstractLink(node: Node, final override val guildId: ULong) : Link {

final override var node: Node = node
private set

override val player: Player = WebsocketPlayer(node as NodeImpl, guildId)
abstract override val lavakord: AbstractLavakord
override var lastChannelId: ULong? = null
override var state: Link.State = Link.State.NOT_CONNECTED
private var cachedVoiceState: VoiceState? = null

override suspend fun onDisconnected() {
state = Link.State.NOT_CONNECTED
node.destroyPlayer(guildId)
cachedVoiceState = null
}

override suspend fun onNewSession() {
cachedVoiceState?.let {
node.updatePlayer(guildId, request = PlayerUpdate(voice = it.toOmissible()))
}
}

override suspend fun destroy() {
Expand All @@ -29,4 +45,9 @@ public abstract class AbstractLink(final override val node: Node, final override
lavakord.removeDestroyedLink(this)
state = Link.State.DESTROYED
}

internal suspend fun onVoiceServerUpdate(update: VoiceState) {
cachedVoiceState = update
node.updatePlayer(guildId, request = PlayerUpdate(voice = update.toOmissible()))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.ktor.websocket.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
Expand Down Expand Up @@ -118,30 +119,37 @@ internal class NodeImpl(

retry.reset()

LOG.debug { "Successfully connected to node: $name ($host)" }
LOG.info { "Successfully connected to node: $name ($host)" }

while (!session.incoming.isClosedForReceive) {
try {
onEvent(session.receiveDeserialized())
} catch (e: WebsocketDeserializeException) {
LOG.warn(e) { "An error occurred whilst decoding incoming websocket packet" }
} catch (e: ClosedReceiveChannelException) {
break
} catch (e: Exception) {
LOG.warn(e) { "An exception occurred whilst decoding incoming websocket packet" }
}
}
val reason = session.closeReason.await()
if (reason?.knownReason == CloseReason.Codes.NORMAL) return

available = false
LOG.warn { "Disconnected from websocket for: $reason. Music will continue playing if we can reconnect within the next $resumeTimeout seconds" }
reconnect(resume = true)
val reason = session.closeReason.await()
val resumeAgain = resume && reason?.knownReason != CloseReason.Codes.NORMAL
if (resumeAgain) {
LOG.warn { "Disconnected from websocket for: $reason. Music will continue playing if we can reconnect within the next $resumeTimeout seconds" }
} else {
LOG.warn { "Disconnected from websocket for: $reason. Not resuming." }
}
reconnect(resume = resumeAgain)
}

private suspend fun reconnect(e: Throwable? = null, resume: Boolean = false) {
LOG.error(e) { "Exception whilst trying to connect. Reconnecting" }
if (retry.hasNext) {
LOG.error { "Exception whilst trying to connect: '${e?.message}'. Reconnecting" }
retry.retry()
connect(resume)
} else {
lavakord.removeNode(this)
error("Could not reconnect to websocket after to many attempts")
throw IllegalStateException("Could not reconnect to websocket after too many attempts", e)
}
}

Expand Down Expand Up @@ -173,6 +181,14 @@ internal class NodeImpl(
is Message.PlayerUpdateEvent -> (lavakord.getLink(event.guildId).player as WebsocketPlayer)
.provideState(event.state)

is Message.EmittedEvent.WebSocketClosedEvent -> {
// These codes represent an invalid session
// See https://discord.com/developers/docs/topics/opcodes-and-status-codes#voice-voice-close-event-codes
if (event.code == 4004 || event.code == 4006 || event.code == 4009 || event.code == 4014) {
lavakord.getLink(event.guildId).onDisconnected()
}
}

is Message.StatsEvent -> {
LOG.debug { "Received node statistics for $name: $event" }
lastStatsEvent = event
Expand All @@ -185,6 +201,7 @@ internal class NodeImpl(
is Message.ReadyEvent -> {
available = true
sessionId = event.sessionId
lavakord.onNewSession(this)
updateSession(
SessionUpdate(
resuming = true.toOmissible(),
Expand Down

0 comments on commit 21b6e13

Please sign in to comment.